Getting Started with Snowpipe Streaming and Amazon MSK
Overview
Snowflake's Snowpipe streaming capabilities are designed for rowsets with variable arrival frequency. They focus on lower latency and cost for smaller data sets. This lets data workers stream rows into Snowflake without requiring files, and with a more attractive cost/latency profile.
Here are some of the use cases that can benefit from Snowpipe streaming:
- IoT time-series data ingestion
- CDC streams from OLTP systems
- Log ingestion from SIEM systems
- Ingestion into ML feature stores
In our demo, we will use real-time commercial flight data over the San Francisco Bay Area from the OpenSky Network to illustrate the solution using Snowflake's Snowpipe streaming and MSK (Amazon Managed Streaming for Apache Kafka).
The architecture diagram below shows the deployment. An MSK cluster and a Linux EC2 instance (jumphost) will be provisioned in private subnets of an AWS VPC. The Linux jumphost will host the Kafka producer and Snowpipe streaming via Kafka Connect.
The Kafka producer calls the data source's REST API and receives time-series data in JSON format. This data is then ingested into the Kafka cluster before being picked up by the Kafka connector and delivered to a Snowflake table. The data in the Snowflake table can be visualized in real time with AMG (Amazon Managed Grafana) and Streamlit. The historical data can also be analyzed by BI tools like Amazon QuickSight. Please note that this demo does not cover the visualization aspect. We will have a future Quickstart demo that focuses on visualization.


Prerequisites
- Familiarity with Snowflake, basic SQL knowledge, Snowsight UI and Snowflake objects
- Familiarity with AWS Services (e.g. EC2, MSK, etc), Networking and the Management Console
- Basic knowledge of Python and Linux shell scripting
What You'll Need Before the Lab
To participate in the virtual hands-on lab, attendees need the following resources.
- A Snowflake Enterprise account in your preferred AWS region with ACCOUNTADMIN access
- An AWS account with Administrator Access
- Create your own VPC and subnets (this is optional if you have an existing VPC with subnets you can leverage; please refer to this AWS document for the MSK networking topology)
- In the AWS account, create a VPC, preferably in the same region as the Snowflake account
- In the VPC, create subnets and attach an internet gateway to allow egress traffic to the internet, using a route table and a security group for outbound traffic. The subnets can be public or private; private subnets also need a NAT gateway to allow egress traffic to the internet. Public subnets are sufficient for this lab.
- If you have decided to create your own VPC and subnets, for your convenience, click here to deploy a VPC with a pair of public and private subnets, an internet gateway, and a NAT gateway.
What You'll Learn
- Using MSK (Amazon Managed Streaming for Apache Kafka)
- Connecting to EC2 instances with AWS Systems Manager Session Manager, an alternative to SSH if your instance is in a private subnet
- Using SnowSQL, the command line client for connecting to Snowflake to execute SQL queries and perform all DDL and DML operations, including loading data into and unloading data out of database tables.
- Using Snowflake to query tables populated with time-series data
What You'll Build
- A provisioned Kafka cluster
- Kafka producers and connectors
- Topics in a Kafka cluster
- A Snowflake database for hosting real-time flight data
Create a provisioned Kafka cluster and a Linux jumphost in AWS
1. Create an MSK cluster and an EC2 instance
The MSK cluster is created in a VPC managed by Amazon. We will deploy our Kafka clients in our own VPC and use security groups to ensure the communications between the MSK cluster and clients are secure.
First, click here to launch a provisioned MSK cluster. Note that the default AWS region is us-west-2 (Oregon); feel free to select the region where you prefer to deploy the environment.
Click Next at the Create stack page.
Set the Stack name or modify the default value to customize it to your identity. Leave the default Kafka version as is. For MSKSecurityGroupId, we recommend using the default security group in your VPC. In the drop-down menu, pick two subnets; they can be either public or private, depending on the network layout of your VPC. Leave TLSMutualAuthentication as false and the jumphost instance type and AMI ID as default before clicking Next.
See below sample screen capture for reference.

Leave everything as default in the Configure stack options page and click Next.
In the Review page, click Submit.
In about 10-30 minutes, depending on your AWS region, the CloudFormation template provisions an MSK cluster with two brokers. It also provisions a Linux EC2 instance in the subnet you selected. We will use this instance to run the producer and the Kafka connector with the Snowpipe streaming SDK.
2. Configure the Linux session for timeout and default shell
In this step we need to connect to the EC2 instance in order to interact with the MSK cluster.
Go to the AWS Systems Manager console in the same region where you set up the MSK cluster.
Click Session Manager on the left pane.

Next, we will set the preferred shell to bash.
Click the Preferences tab.

Click the Edit button.

Go to the General preferences section and type in 60 minutes for the idle session timeout value.

Scroll further down to the Linux shell profile section and type in /bin/bash before clicking the Save button.

3. Connect to the Linux EC2 instance console
Go back to the Session tab and click the Start session button.

Select the jumphost under Target instances.
Its name should be <Cloudformation stack name>-jumphost. Select it and click Start session.

4. Create a key-pair to be used for authenticating with Snowflake programmatically
Create a key pair in the AWS Session Manager console by executing the following commands. You will be prompted for an encryption password; remember this passphrase, as you will need it later.
cd $HOME
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8
See below example screenshot:

Next, we will create a public key by running the following command. You will be prompted to type in the passphrase you used in the step above.
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
See below example screenshot:

Next, we will print out the public key string in the format that Snowflake expects, with the header and footer lines and all line breaks removed.
grep -v KEY rsa_key.pub | tr -d '\n' > pub.Key
cat pub.Key
See below example screenshot:
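As a minimal sketch of the same formatting step, the snippet below strips the PEM marker lines and joins the remaining lines into one string. The function names pem_body and is_base64_line are hypothetical helpers introduced here for illustration, and the key body in the example is placeholder text rather than a real key. Unlike grep -v KEY, the filter matches only the -----BEGIN and -----END marker lines, so a body line that happens to contain the letters KEY would not be dropped.

```shell
# Hypothetical helper: print the base64 body of a PEM file on one line.
pem_body() {
  # Drop the BEGIN/END marker lines, then delete carriage returns and line breaks.
  grep -v -e '-----BEGIN' -e '-----END' "$1" | tr -d '\r\n'
}

# Hypothetical check: succeed only if the value is non-empty and contains
# nothing but base64 characters (so no spaces or line breaks).
is_base64_line() {
  case "$1" in
    ''|*[!A-Za-z0-9+/=]*) return 1 ;;
    *) return 0 ;;
  esac
}

# Example with a throwaway file; the body is placeholder text, not a real key.
tmp=$(mktemp)
printf '%s\n' '-----BEGIN PUBLIC KEY-----' 'QUJD' 'REVG' '-----END PUBLIC KEY-----' > "$tmp"
body=$(pem_body "$tmp")
rm -f "$tmp"
echo "$body"
```

In the lab, running is_base64_line "$(cat $HOME/pub.Key)" before pasting the key into Snowflake would be one way to catch a file that still contains line breaks or header text.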

5. Install the Kafka connector for Snowpipe streaming
Run the following commands to install the Kafka connector and the Snowpipe streaming SDK:
passwd=changeit  # Use the default password for the Java keystore; you should change it after finishing the lab
directory=$HOME/snowpipe-streaming  # Installation directory

cd $HOME
mkdir -p $directory
cd $directory
pwd=`pwd`
sudo yum clean all
sudo yum -y install openssl vim-common java-1.8.0-openjdk-devel.x86_64 gzip tar jq python3-pip
wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar xvfz kafka_2.12-2.8.1.tgz -C $pwd
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar -O $pwd/kafka_2.12-2.8.1/libs/aws-msk-iam-auth-1.1.1-all.jar
rm -rf $pwd/kafka_2.12-2.8.1.tgz
cd /tmp && cp /usr/lib/jvm/java-openjdk/jre/lib/security/cacerts kafka.client.truststore.jks
cd /tmp && keytool -genkey -keystore kafka.client.keystore.jks -validity 300 -storepass $passwd -keypass $passwd -dname "CN=snowflake.com" -alias snowflake -storetype pkcs12

# Snowflake Kafka connector
wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.8.2/snowflake-kafka-connector-1.8.2.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-kafka-connector-1.8.2.jar

# Snowpipe streaming SDK
wget https://repo1.maven.org/maven2/net/snowflake/snowflake-ingest-sdk/1.0.3-beta/snowflake-ingest-sdk-1.0.3-beta.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-ingest-sdk-1.0.3-beta.jar
wget https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.15/snowflake-jdbc-3.13.15.jar -O $pwd/kafka_2.12-2.8.1/libs/snowflake-jdbc-3.13.15.jar
wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar -O $pwd/kafka_2.12-2.8.1/libs/bc-fips-1.0.1.jar
wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar -O $pwd/kafka_2.12-2.8.1/libs/bcpkix-fips-1.0.3.jar
6. Retrieve the broker string from the MSK cluster.
Go to the MSK console and click on the newly created MSK cluster; it should have the substring MSKCluster in its name.

Click View client information.

Copy the Private endpoint for the TLS authentication type.

Now switch back to the Session Manager window and execute the following command, replacing <broker string> with the copied values.
export BS=<broker string>
Now run the following command to add BS as an environment variable so it is recognized across the Linux sessions.
echo "export BS=$BS" >> ~/.bashrc
See the following example screen capture.
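Because every later command depends on $BS, it can help to sanity-check the value before continuing. The sketch below assumes the broker string is a comma-separated list of host:port pairs with no spaces; valid_broker_string is a hypothetical helper, and the sample hostnames are made up for illustration.

```shell
# Hypothetical helper: succeed only if the argument is a comma-separated
# list of host:port pairs with no spaces or other stray characters.
valid_broker_string() {
  case "$1" in
    *[!A-Za-z0-9.,:-]*) return 1 ;;  # reject anything outside the expected characters
  esac
  printf '%s\n' "$1" | grep -Eq '^[A-Za-z0-9.-]+:[0-9]+(,[A-Za-z0-9.-]+:[0-9]+)*$'
}

# The sample value below is invented; in the lab you would pass "$BS".
sample='b-1.example.internal:9094,b-2.example.internal:9094'
if valid_broker_string "$sample"; then
  echo "broker string looks well-formed"
else
  echo "broker string is malformed" >&2
fi
```

A check like this only validates the shape of the string; it does not confirm that the brokers are reachable.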

7. Create a configuration file connect-standalone.properties for the Kafka connector
Run the following commands to generate a configuration file connect-standalone.properties in directory $HOME/snowpipe-streaming/scripts for the client to authenticate with the Kafka cluster.
dir=$HOME/snowpipe-streaming/scripts
mkdir -p $dir && cd $dir
cat << EOF > $dir/connect-standalone.properties
#************CREATING SNOWFLAKE Connector****************
bootstrap.servers=$BS

#************SNOWFLAKE VALUE CONVERSION****************
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

#************SNOWFLAKE ****************
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

#*********** FOR SSL ****************
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
ssl.truststore.password=changeit
ssl.enabled.protocols=TLSv1.1,TLSv1.2

consumer.security.protocol=SSL
consumer.ssl.truststore.location=/tmp/kafka.client.truststore.jks
consumer.ssl.truststore.password=changeit
consumer.ssl.enabled.protocols=TLSv1.1,TLSv1.2
EOF
A configuration file connect-standalone.properties is created in directory $HOME/snowpipe-streaming/scripts.
8. Create a security client.properties configuration file for the producer
Run the following commands to create a security configuration file client.properties for the MSK cluster.
dir=$HOME/snowpipe-streaming/scripts
cat << EOF > $dir/client.properties
security.protocol=SSL
ssl.truststore.location=/tmp/kafka.client.truststore.jks
ssl.truststore.password=changeit
ssl.enabled.protocols=TLSv1.1,TLSv1.2
EOF
A configuration file client.properties is created in directory $HOME/snowpipe-streaming/scripts.
9. Create a streaming topic called “streaming” in the MSK cluster
Now we can run the following commands to create a Kafka topic on the MSK cluster to stream our data.
$HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-topics.sh --bootstrap-server $BS --command-config $HOME/snowpipe-streaming/scripts/client.properties --create --topic streaming --partitions 2 --replication-factor 2
You should see the response Created topic streaming if it is successful.
To describe the topic, run the following command:
$HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-topics.sh --bootstrap-server $BS --command-config $HOME/snowpipe-streaming/scripts/client.properties --describe --topic streaming
The output should show two partitions with a replication factor of 2 for the streaming topic.
See below example screenshot:

Configure Snowflake to communicate with the MSK cluster
1. Creating user, role, and database
First, log in to your Snowflake account as a power user with the ACCOUNTADMIN role. Then run the following SQL commands in a worksheet to create the user, database, and role that we will use in the lab.
-- Set default value for multiple variables
-- For purpose of this workshop, it is recommended to use these defaults during the exercise to avoid errors
-- You should change them after the workshop
SET PWD = 'Test1234567';
SET USER = 'STREAMING_USER';
SET DB = 'MSK_STREAMING_DB';
SET WH = 'MSK_STREAMING_WH';
SET ROLE = 'MSK_STREAMING_RL';

USE ROLE ACCOUNTADMIN;

-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD COMMENT='STREAMING USER';

-- CREATE ROLES
CREATE OR REPLACE ROLE IDENTIFIER($ROLE);

-- CREATE DATABASE AND WAREHOUSE
CREATE DATABASE IF NOT EXISTS IDENTIFIER($DB);
USE IDENTIFIER($DB);
CREATE OR REPLACE WAREHOUSE IDENTIFIER($WH) WITH WAREHOUSE_SIZE = 'SMALL';

-- GRANTS
GRANT CREATE WAREHOUSE ON ACCOUNT TO ROLE IDENTIFIER($ROLE);
GRANT ROLE IDENTIFIER($ROLE) TO USER IDENTIFIER($USER);
GRANT OWNERSHIP ON DATABASE IDENTIFIER($DB) TO ROLE IDENTIFIER($ROLE);
GRANT USAGE ON WAREHOUSE IDENTIFIER($WH) TO ROLE IDENTIFIER($ROLE);

-- SET DEFAULTS
ALTER USER IDENTIFIER($USER) SET DEFAULT_ROLE=$ROLE;
ALTER USER IDENTIFIER($USER) SET DEFAULT_WAREHOUSE=$WH;
Now log out of Snowflake and sign back in as the default user streaming_user we just created, with the associated password (default: Test1234567).
Run the following SQL commands in a worksheet:
SET DB = 'MSK_STREAMING_DB';
SET SCHEMA = 'MSK_STREAMING_SCHEMA';

USE IDENTIFIER($DB);
CREATE OR REPLACE SCHEMA IDENTIFIER($SCHEMA);
2. Install SnowSQL (optional but highly recommended)
SnowSQL is the command line client for connecting to Snowflake to execute SQL queries and perform all DDL and DML operations, including loading data into and unloading data out of database tables.
First, in the Snowflake worksheet, replace <pubKey> with the content of the file $HOME/pub.Key (see step 4 in the previous section) in the following SQL command and execute it.
use role accountadmin;
alter user streaming_user set rsa_public_key='<pubKey>';
See below example screenshot:

Now we can install SnowSQL. Execute the following commands on the Linux Session Manager console:
curl https://sfc-repo.snowflakecomputing.com/snowsql/bootstrap/1.2/linux_x86_64/snowsql-1.2.24-linux_x86_64.bash -o /tmp/snowsql-1.2.24-linux_x86_64.bash
echo -e "~/bin \n y" > /tmp/ans
bash /tmp/snowsql-1.2.24-linux_x86_64.bash < /tmp/ans
See below example screenshot:

Next set the environment variable for Snowflake Private Key Phrase:
export SNOWSQL_PRIVATE_KEY_PASSPHRASE=<key phrase you set up when running openssl previously>
Note that you should add the command above in the ~/.bashrc file to preserve this environment variable across sessions.
echo "export SNOWSQL_PRIVATE_KEY_PASSPHRASE=$SNOWSQL_PRIVATE_KEY_PASSPHRASE" >> ~/.bashrc
Now you can execute this command to interact with Snowflake:
$HOME/bin/snowsql -a <Snowflake Account Name> --region <AWS region where Snowflake is located> -u streaming_user --private-key-path $HOME/rsa_key.p8 -d msk_streaming_db -s msk_streaming_schema
See below example screenshot:

Press Ctrl-D to exit the SnowSQL session.
You can edit the ~/.snowsql/config file to set default parameters and eliminate the need to specify them every time you run snowsql.
At this point, the Snowflake setup is complete.
Configure Kafka connector to Snowflake with Snowpipe streaming SDK
1. Run the following commands to collect various connection parameters for the Kafka connector
cd $HOME
outf=/tmp/params
# The heredoc delimiter is quoted so that variables are evaluated when the script runs, not when it is written
cat << 'EOF' > /tmp/get_params
a=''
until [ ! -z "$a" ]
do
  read -p "Input Snowflake account URL: e.g. gwa123456 ==> " a
done

r=''
until [ ! -z "$r" ]
do
  read -p "Input Snowflake account region: e.g. us-west-2 ==> " r
done

if [[ $r == "us-west-2" ]]
then
  echo export clstr_url=$a.snowflakecomputing.com > $outf
  export clstr_url=$a.snowflakecomputing.com
else
  echo export clstr_url=$a.$r.snowflakecomputing.com > $outf
  export clstr_url=$a.$r.snowflakecomputing.com
fi

read -p "Snowflake cluster user name: default: streaming_user ==> " user
if [[ $user == "" ]]
then
  user="streaming_user"
fi
echo export user=$user >> $outf
export user=$user

pass=''
until [ ! -z "$pass" ]
do
  read -p "Private key passphrase ==> " pass
done
echo export key_pass=$pass >> $outf
export key_pass=$pass

read -p "Full path to your Snowflake private key file, default: $HOME/rsa_key.p8 ==> " p8
if [[ $p8 == "" ]]
then
  p8="$HOME/rsa_key.p8"
fi

priv_key=`cat $p8 | grep -v PRIVATE | tr -d '\n'`
echo export priv_key=$priv_key >> $outf
export priv_key=$priv_key
cat $outf >> $HOME/.bashrc
EOF
. /tmp/get_params
See below example screen capture.
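The script above builds the Snowflake hostname from the account identifier and the region, leaving out the region segment when the region is us-west-2. The sketch below isolates that one rule so it can be checked on its own. snowflake_host is a hypothetical function name, and the account value is the example from the prompt, not a real account. It mirrors only the rule used in this lab and does not cover other Snowflake URL formats.

```shell
# Hypothetical helper mirroring the script's rule: omit the region segment
# for us-west-2 and include it for every other region.
snowflake_host() {
  account=$1
  region=$2
  if [ "$region" = "us-west-2" ]; then
    printf '%s.snowflakecomputing.com\n' "$account"
  else
    printf '%s.%s.snowflakecomputing.com\n' "$account" "$region"
  fi
}

# Example calls with the placeholder account from the prompt above.
snowflake_host gwa123456 us-west-2
snowflake_host gwa123456 us-east-1
```

Comparing the output with the clstr_url value written to /tmp/params is a quick way to confirm the answers typed at the prompts.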

2. Run the following commands to create a Snowflake Kafka connect property configuration file:
dir=$HOME/snowpipe-streaming/scripts
cat << EOF > $dir/snowflakeconnectorMSK.properties
name=snowpipeStreaming
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=4
topics=streaming
snowflake.private.key.passphrase=$key_pass
snowflake.database.name=MSK_STREAMING_DB
snowflake.schema.name=MSK_STREAMING_SCHEMA
snowflake.topic2table.map=streaming:MSK_STREAMING_TBL
buffer.count.records=10000
buffer.flush.time=5
buffer.size.bytes=20000000
snowflake.url.name=$clstr_url
snowflake.user.name=$user
snowflake.private.key=$priv_key
snowflake.role.name=MSK_STREAMING_RL
snowflake.ingestion.method=snowpipe_streaming
value.converter.schemas.enable=false
jmx=true
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
errors.tolerance=all
EOF
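Values such as $key_pass, $clstr_url, $user, and $priv_key are expanded at the moment the file is written, so a variable that is unset in the current session silently produces an empty setting. The following sketch shows one way to detect that. empty_props is a hypothetical helper, and the sample file contents are illustrative only.

```shell
# Hypothetical helper: print every key whose value is empty in a
# key=value properties file, and return non-zero if any are found.
empty_props() {
  # Skip comments and blank lines, then keep lines with nothing but
  # optional whitespace after the first "=".
  found=$(grep -v -e '^[[:space:]]*#' -e '^[[:space:]]*$' "$1" \
    | grep -E '^[^=]+=[[:space:]]*$' | cut -d= -f1)
  if [ -n "$found" ]; then
    printf '%s\n' "$found"
    return 1
  fi
  return 0
}

# Example with a throwaway file in which one value is missing.
tmp=$(mktemp)
printf '%s\n' 'name=snowpipeStreaming' 'snowflake.url.name=' 'snowflake.user.name=streaming_user' > "$tmp"
missing=$(empty_props "$tmp") || echo "empty values: $missing"
rm -f "$tmp"
```

In the lab, the same check could be pointed at $HOME/snowpipe-streaming/scripts/snowflakeconnectorMSK.properties before starting the connector.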
Putting it all together
Finally, we are ready to start ingesting data into the Snowflake table.
1. Start the Kafka Connector for Snowpipe streaming
Go back to the Linux console and execute the following commands to start the Kafka connector.
$HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/connect-standalone.sh $HOME/snowpipe-streaming/scripts/connect-standalone.properties $HOME/snowpipe-streaming/scripts/snowflakeconnectorMSK.properties

2. Start the producer that will ingest real-time data to the MSK cluster
Start a new Linux session by following step 3 in the section named Create a provisioned Kafka cluster and a Linux jumphost in AWS, then run the following command.
curl --connect-timeout 5 -k https://ecs-alb-1504531980.us-west-2.elb.amazonaws.com:8502/opensky | $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-console-producer.sh --broker-list $BS --producer.config $HOME/snowpipe-streaming/scripts/client.properties --topic streaming
You should see response similar to screen capture below if everything works well.

Note that in the script above, the producer queries a REST API that provides real-time flight data over the San Francisco Bay Area in JSON format. The data includes information such as timestamps, ICAO numbers, flight IDs, destination airports, longitude, latitude, and altitude of the aircraft. The data is ingested into the streaming topic on the MSK cluster and then picked up by the Snowpipe streaming Kafka connector, which delivers it directly into the Snowflake table msk_streaming_db.msk_streaming_schema.msk_streaming_tbl.

Query ingested data in Snowflake
Now, switch back to the Snowflake console and make sure that you are signed in as the default user streaming_user.
The data should have been streamed into a table, ready for further processing.
1. Query the raw data
To verify that data has been streamed into Snowflake, execute the following SQL commands.
use msk_streaming_db;
use schema msk_streaming_schema;
show channels in table msk_streaming_tbl;

Now run the following query on the table.
select * from msk_streaming_tbl;
You should see there are two columns in the table: RECORD_METADATA and RECORD_CONTENT as shown in the screen capture below.

The RECORD_CONTENT column is a JSON array that needs to be flattened.
2. Flatten the raw JSON data
Now execute the following SQL commands to flatten the raw JSONs and create a materialized view with multiple columns based on the key names.
create or replace materialized view flights_vw
  as select
    f.value:utc::timestamp_ntz ts_utc,
    CONVERT_TIMEZONE('UTC','America/Los_Angeles',ts_utc::timestamp_ntz) as ts_pt,
    f.value:alt::integer alt,
    f.value:dest::string dest,
    f.value:orig::string orig,
    f.value:id::string id,
    f.value:icao::string icao,
    f.value:lat::float lat,
    f.value:lon::float lon,
    st_geohash(to_geography(ST_MAKEPOINT(lon, lat)),12) geohash,
    year(ts_pt) yr,
    month(ts_pt) mo,
    day(ts_pt) dd,
    hour(ts_pt) hr
  FROM msk_streaming_tbl,
  Table(Flatten(msk_streaming_tbl.record_content)) f;
The SQL commands create a view, convert timestamps to different time zones, and use Snowflake's Geohash function to generate geohashes that can be used in time-series visualization tools like Grafana.
Let's query the view flights_vw now.
select * from flights_vw;

3. Stream real-time flight data continuously to Snowflake
We can now write a loop to stream the flight data continuously into Snowflake.
Note that there is a 30-second sleep between the queries. The data source in this workshop does not update more often than every 30 seconds, so polling more frequently will not return any new data. Feel free to shorten the interval for data sources that update more frequently.
Go back to the Linux session and run the following script.
while true
do
  curl --connect-timeout 5 -k https://ecs-alb-1504531980.us-west-2.elb.amazonaws.com:8502/opensky | $HOME/snowpipe-streaming/kafka_2.12-2.8.1/bin/kafka-console-producer.sh --broker-list $BS --producer.config $HOME/snowpipe-streaming/scripts/client.properties --topic streaming
  sleep 30
done
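The loop above runs until it is interrupted and does not report failed requests. As a rough sketch of a bounded alternative, the helper below runs a command a fixed number of times with a pause between runs and counts the failures. run_every is a hypothetical name, and the example uses the placeholder command true instead of the real producer pipeline, which you would wrap in a shell function of your own.

```shell
# Hypothetical helper: run a command a fixed number of times, pausing
# between runs, and report how many runs returned a non-zero status.
# Usage: run_every <interval-seconds> <count> <command> [args...]
run_every() {
  interval=$1
  count=$2
  shift 2
  failures=0
  i=0
  while [ "$i" -lt "$count" ]; do
    "$@" || failures=$((failures + 1))
    i=$((i + 1))
    # Pause only when another run follows.
    if [ "$i" -lt "$count" ]; then
      sleep "$interval"
    fi
  done
  echo "runs=$count failures=$failures"
}

# Example: three runs of a placeholder command with no pause.
run_every 0 3 true
```

For the lab's data source, an interval of 30 would match the sleep used in the loop above.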
You can now go back to the Snowflake worksheet and run a select count(1) from flights_vw query every 30 seconds to verify that the row count is indeed increasing.
Cleanup
When you are done with the demo, to tear down the AWS resources, simply go to the CloudFormation console.
Select the CloudFormation stack you deployed for the MSK cluster at the start of the demo, then click the Delete button. All the resources that were deployed previously, such as EC2 instances, MSK clusters, roles, etc., will be cleaned up.
See example screen capture below.

After the MSK cluster is deleted, you will also need to delete the CloudFormation stack for the VPC if you created your own at the very beginning of the lab.
For Snowflake cleanup, execute the following SQL commands.
USE ROLE ACCOUNTADMIN;

DROP DATABASE MSK_STREAMING_DB;
DROP WAREHOUSE MSK_STREAMING_WH;
DROP ROLE MSK_STREAMING_RL;

-- Drop the streaming user
DROP USER IF EXISTS STREAMING_USER;
Conclusions
In this lab, we built a demo to show how to ingest time-series data using Snowpipe streaming and Kafka with low latency. We demonstrated this using a self-managed Kafka connector on an EC2 instance. However, for a production environment, we recommend using Amazon MSK Connect, which offers scalability and resilience through the AWS infrastructure.
Related Resources
This content is provided as is, and is not maintained on an ongoing basis. It may be out of date with current Snowflake instances