Your First Steps with Snowpipe

Author: Torsten Grabs

Please note that Snowpipe is in public preview in the US West Region of AWS. Auto Ingest will be available by February 2018. 

This blog post walks you through your first steps with deploying Snowpipe. We start with the steps for configuring Snowpipe so that it continuously loads data from Amazon S3 into a target table in Snowflake. Next, we explore the easy-to-configure mode of Snowpipe, where S3 event notifications sent to a dedicated queue in Amazon Simple Queue Service (SQS) inform Snowpipe about new data that is ready for loading. Finally, this post explains how to exert tighter programmatic control over when and what notifications are sent using the Snowpipe REST API.

If you haven’t read our blog post about what Snowpipe is, click here to understand how Snowpipe works and the business benefits it provides. Otherwise, keep reading to learn how to configure Snowpipe.

Snowpipe one-time configuration in SQL

The following SQL statements show the one-time configuration for setting up Snowpipe. They include familiar DDL, such as creating an external stage and a new table, as well as the creation of a pipe, which is a new database object in Snowflake.

In the example below, we use a VARIANT column in Snowflake to store incoming data. Semi-structured data types in Snowflake are particularly well-suited for streaming or continuous data loading scenarios. They allow for seamless schema evolution in your application, over time, without making any changes to the Snowflake table schema. Snowflake automatically applies an efficient columnar representation to the data while it’s being loaded.

This makes it easy to leave the incoming continuous data in its original format. Note that converting the data into a different format, such as Parquet, is not needed for efficient loading or querying in Snowflake:

use role snowpipe_role;
use database snowpipe;

create or replace stage snowpipe.public.snowstage
    url='s3://snowpipe-demo/'
    credentials = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...' );
show stages;

-- Create target table for JSON data
create or replace table snowpipe.public.snowtable(jsontext variant);
show tables;

-- Create a pipe to ingest JSON data
create or replace pipe snowpipe.public.snowpipe auto_ingest=true as
    copy into snowpipe.public.snowtable
    from @snowpipe.public.snowstage
    file_format = (type = 'JSON');
show pipes;

The key database concept that Snowpipe introduces to Snowflake’s SQL language is called a “pipe”. Think of a pipe as a wrapper around Snowflake’s familiar COPY statement for data loading. A Snowpipe pipe continuously looks for new data, then loads it using the COPY statement in the pipe definition.

In the current example, the pipe is defined with AUTO_INGEST=true, which tells Snowflake to use an SQS queue in AWS to receive event notifications from an S3 bucket about new data that is ready to load. The name of the SQS queue is shown in a new column within the results of the SHOW STAGES command. This queue name is used to configure S3 event notifications.

Snowpipe one-time configuration in AWS S3

With Snowpipe auto-ingest, available in February 2018, the remaining step requires configuring event notifications for the S3 bucket so the pipe recognizes when there is new data available for loading. Auto-ingest relies on SQS queues to deliver the notifications from S3 to Snowpipe. All Snowpipe SQS queues are created and managed by Snowflake so you don’t have to worry about managing yet another AWS service for your application.

The name of the SQS queue is available from the SHOW STAGES command in a new column called NOTIFICATION_CHANNEL. The following screenshot shows the event notification configuration for the S3 bucket. Note the use of the name (or ARN, to be precise) of the SQS queue at the bottom of the dialog box, along with the check mark for the ObjectCreate notifications.

[Screenshot: S3 event notification configuration dialog]

After saving this notification configuration, place new files in the bucket. Snowflake will receive automatic notifications about the files from S3. Then, Snowpipe will automatically begin loading them into whatever table you have defined as the target in your pipe configuration.
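The same notification setup can also be scripted instead of clicked through in the console. The following is a minimal sketch, not an official procedure: the helper name build_notification_config and the queue ARN are hypothetical placeholders, and the dictionary follows the notification configuration shape that the AWS SDK for Python (boto3) accepts. Use the queue ARN that Snowflake reports in the NOTIFICATION_CHANNEL column.

```python
import json


def build_notification_config(queue_arn, prefix=None):
    """Build an S3 notification configuration that sends ObjectCreated
    events to the given SQS queue, optionally limited to a key prefix."""
    queue_config = {
        "QueueArn": queue_arn,
        "Events": ["s3:ObjectCreated:*"],
    }
    if prefix:
        queue_config["Filter"] = {
            "Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}
        }
    return {"QueueConfigurations": [queue_config]}


# Placeholder ARN for illustration only (an assumption, not a real queue).
EXAMPLE_QUEUE_ARN = "arn:aws:sqs:us-west-2:123456789012:example-snowpipe-queue"

config = build_notification_config(EXAMPLE_QUEUE_ARN)
print(json.dumps(config, indent=2))

# With boto3 installed and AWS credentials configured, the dictionary could
# be applied to the bucket from the example like this (not executed here):
#   import boto3
#   boto3.client("s3").put_bucket_notification_configuration(
#       Bucket="snowpipe-demo", NotificationConfiguration=config)
```

Keep in mind that applying a notification configuration this way replaces the bucket's existing configuration, so any notifications already defined on the bucket would need to be included in the same dictionary.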

Continuously delivering data

Snowpipe requires placing data into S3 before loading. This design choice takes advantage of the rich ecosystem of tools designed for storing data in S3. Snowpipe’s continued use of S3 as the stage for data loading allows you to use the tool of your choice for delivering data to an S3 bucket. This includes services such as AWS Kinesis with Firehose, and Kafka with its S3 connector. Both are popular choices for transporting continuous and streaming data.

The architectural take-away from this is important: Snowflake can automatically and continuously load data delivered to S3 from Kinesis or Kafka.

Querying continuously loading data

A simple, but useful, example query for Snowpipe tracks the arrival of rows over time with a JSON field called 'created_at':

select
    date_trunc('MINUTE', to_timestamp(jsontext:created_at)),
    count(*)
from snowpipe.public.snowtable
group by date_trunc('MINUTE', to_timestamp(jsontext:created_at));

This illustrates how many rows arrived each minute, serving as a starting point for more expressive analytics over time windows.

It’s worth pointing out the powerful date, time and windowing functions that Snowflake’s SQL language provides. They make it easy to analyze data as it arrives over time and to group it into time windows for examining trends.
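To make the semantics of the per-minute query concrete, here is a small Python sketch of the same grouping logic applied to a few in-memory rows. It is only an illustration of what the query computes, not of how Snowflake executes it. The function name rows_per_minute and the sample values are made up for this example, and the sketch assumes that created_at holds epoch seconds.

```python
from collections import Counter
from datetime import datetime, timezone


def rows_per_minute(rows):
    """Count rows per minute, truncating each row's 'created_at'
    (assumed to be epoch seconds) to the start of its minute."""
    counts = Counter()
    for row in rows:
        ts = datetime.fromtimestamp(int(row["created_at"]), tz=timezone.utc)
        counts[ts.replace(second=0, microsecond=0)] += 1
    return dict(counts)


# Hypothetical sample rows standing in for the JSON documents in the table.
sample_rows = [
    {"created_at": 1512086400},  # 2017-12-01 00:00:00 UTC
    {"created_at": 1512086430},  # 2017-12-01 00:00:30 UTC
    {"created_at": 1512086465},  # 2017-12-01 00:01:05 UTC
]

for minute, count in sorted(rows_per_minute(sample_rows).items()):
    print(minute.isoformat(), count)
```

The first two sample rows fall into the same minute and the third into the next one, which mirrors how the GROUP BY on the truncated timestamp collapses rows in the SQL query.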

Full control for Snowpipe

Using S3 event notifications to tell Snowpipe about new data to load from an S3 bucket may not be appropriate in some use cases. Consider an application that first needs to perform some processing in AWS Lambda before performing the load into Snowflake. In that case, we recommend timing notifications to Snowpipe more tightly.

Snowpipe allows for defining pipes with AUTO_INGEST set to ‘false’ while using exactly the same DDL, as shown above, to create stages and pipes. Without auto-ingest, however, an application needs to invoke a REST API with the file name, or a set of file names, to tell Snowpipe that the files are ready for loading. For instance, you can call the Snowpipe REST API after finishing the preprocessing from within your AWS Lambda code.
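For readers who want to see roughly what such a notification looks like on the wire, here is a hedged sketch that only builds the request and does not send it. It assumes the insertFiles endpoint shape described in the Snowpipe REST API documentation, so please verify the details there before relying on it. The helper name build_insert_files_request, the account name myaccount, and the file names are placeholders introduced for this example. A real request also needs an Authorization header carrying a key-pair based token, which is omitted here.

```python
import json
import uuid
from urllib.parse import quote


def build_insert_files_request(account, pipe, file_paths, request_id=None):
    """Return (url, body) for a request that tells Snowpipe which staged
    files are ready to load. Nothing is sent over the network."""
    request_id = request_id or str(uuid.uuid4())
    url = (
        f"https://{account}.snowflakecomputing.com"
        f"/v1/data/pipes/{quote(pipe)}/insertFiles?requestId={request_id}"
    )
    # One entry per staged file; paths are relative to the stage location.
    body = json.dumps({"files": [{"path": p} for p in file_paths]})
    return url, body


url, body = build_insert_files_request(
    account="myaccount",                 # placeholder account name
    pipe="snowpipe.public.snowpipe",     # pipe from the DDL example
    file_paths=["events-0001.json", "events-0002.json"],  # placeholders
)
print(url)
print(body)
```

Passing several paths in one call keeps the number of requests low when a preprocessing step produces many files at once.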

In general, any application capable of invoking a REST API can notify Snowpipe about new files. To make developing against the REST API easier, SDKs for both Python and Java are available. The following code snippet shows an example of the proxy-like abstraction that the Python SDK provides. Note how the ingest_files call can consume a set of files in addition to just a single file:

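# Requires the Python SDK (published on PyPI as snowflake-ingest).
from snowflake.ingest import SimpleIngestManager, StagedFile

# Proxy object for a single pipe; fill in your own connection details.
ingest_manager = SimpleIngestManager(account='...',
                                     user='...',
                                     pipe='...',
                                     private_key=...)

# Notify Snowpipe about one or more staged files in a single call.
# filename is the path of a staged file; None means the size is unknown.
staged_file_list = [StagedFile(filename, None)]
resp = ingest_manager.ingest_files(staged_file_list)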

You can find the latest SDK versions on PyPI or Maven Central by using the search terms “Snowflake ingest”.
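To tie this back to the AWS Lambda scenario, the following sketch shows one way a Lambda function could turn an incoming S3 event into the list of file paths to hand to Snowpipe. It is an illustration under assumptions rather than a reference implementation: the names staged_paths_from_s3_event, handler, and notify are hypothetical, and notify stands in for whatever call you use to reach Snowpipe, for example a thin wrapper around ingest_files. The event layout follows the standard S3 event notification format, in which object keys arrive URL-encoded.

```python
from urllib.parse import unquote_plus


def staged_paths_from_s3_event(event, stage_prefix=""):
    """Extract object keys from an S3 event and make them relative to
    the stage location by stripping an optional key prefix."""
    paths = []
    for record in event.get("Records", []):
        # S3 URL-encodes keys in event notifications (spaces become '+').
        key = unquote_plus(record["s3"]["object"]["key"])
        if stage_prefix and key.startswith(stage_prefix):
            key = key[len(stage_prefix):]
        paths.append(key)
    return paths


def handler(event, context, notify=print):
    """Lambda-style entry point: preprocess, then notify Snowpipe."""
    paths = staged_paths_from_s3_event(event)
    # Application-specific preprocessing would run here, before the
    # notification, so Snowpipe only hears about files that are ready.
    notify(paths)
    return {"notified": len(paths)}


# Hypothetical event with a single, URL-encoded object key.
sample_event = {
    "Records": [{"s3": {"object": {"key": "incoming/my+file%281%29.json"}}}]
}
print(staged_paths_from_s3_event(sample_event, stage_prefix="incoming/"))
```

Injecting notify as a parameter keeps the handler easy to test locally without Snowflake credentials; in a deployed function it would be replaced by the SDK call shown earlier.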

Try Snowpipe today

Please give Snowpipe a spin today and let us know your feedback. Snowpipe using REST-based notifications is now available in US West. You can find the documentation and information on how to get started here.

Snowpipe with auto-ingest using SQS will be available for preview in December, 2017. If you are interested in participating in a private preview for this capability, please let us know here.

You can also try Snowflake for free. Sign up and receive $400 worth of free usage. You can create a sandbox or launch a production implementation from the same Snowflake environment.