How to Load Terabytes into Snowflake – Speeds, Feeds and Techniques
April 30, 2018
Author: Stuart Ozer
Engineering, How to Use Snowflake, Snowflake Technology
We often get these questions from customers facing an initial data load into Snowflake or a large-scale daily data ingestion: “What’s the fastest way to load terabytes of data?” and “What incoming data format do you recommend?” Here’s an example of a data load that provides answers to both of those questions, and more.
We recently used data from the 10TB TPCDS Benchmark data set to explore a few alternatives. This data is available to all Snowflake customers through the database named SNOWFLAKE_SAMPLE_DATA, schema TPCDS_SF10TCL.
The largest table in that database is STORE_SALES, containing 28.8 billion rows representing 4.7 TB of uncompressed data, roughly 164 bytes per row. Snowflake compresses this down to 1.3 TB internally. The table contains five years of daily transaction history and 23 columns split between integer and decimal data.
We loaded three different source data formats for this table:
- CSV files gzipped
- Date-partitioned Parquet files (snappy compressed)
- Date-partitioned ORC files (snappy compressed)
When loading Parquet and ORC into Snowflake, you have the choice of storing entire rows within a Snowflake VARIANT, or extracting the individual columns into a structured schema. We tested both approaches for load performance. But for data with a fixed schema such as TPCDS, we prefer to store it structured.
Partitioned Parquet and ORC are interesting in other ways. When using HIVE partitioning for these formats within a data-lake environment, the value of the partitioning data column is typically represented by a portion of the file name, rather than by a value inside the data itself. This means that during data loading, we must capture and potentially manipulate the file name by referencing Snowflake’s METADATA$FILENAME property when using the COPY command.
First, let’s look at the raw performance of loading the data using a Snowflake 2X-large cluster:
| Source Format | Target Layout | Load Time (sec) | TB/Hr (uncompressed) |
|---|---|---|---|
| Parquet (Snappy comp) | Semi-structured | 3518 | 4.8 |
| Parquet (Snappy comp) | Structured | 3095 | 5.4 |
| ORC (Snappy comp) | Semi-structured | 3845 | 4.4 |
| ORC (Snappy comp) | Structured | 2820 | 6.0 |
A few points jump right out:
- Loading from gzipped CSV, at an impressive 15 TB/hour, is roughly 2.5 to 3.5 times faster than loading from ORC and Parquet. While 5-6 TB/hour is decent if your data is originally in ORC or Parquet, don’t go out of your way to create ORC or Parquet files from CSV in the hope that they will load into Snowflake faster.
- Loading data into a fully structured (columnarized) schema is faster than landing it into a VARIANT: in the table above, load time drops by about 12% for Parquet and about 27% for ORC.
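As a quick sanity check on the table, the TB/hour column can be recomputed from the load times. The sketch below assumes the 4.7 TB uncompressed size quoted earlier and decimal terabytes; because that size is itself rounded, the recomputed rates may differ from the table in the last digit.

```python
# Uncompressed size of STORE_SALES as quoted in the article (rounded, decimal TB).
UNCOMPRESSED_TB = 4.7

# (source format, target layout) -> load time in seconds, copied from the table.
LOAD_TIMES_SEC = {
    ("Parquet", "Semi-structured"): 3518,
    ("Parquet", "Structured"): 3095,
    ("ORC", "Semi-structured"): 3845,
    ("ORC", "Structured"): 2820,
}


def tb_per_hour(seconds, size_tb=UNCOMPRESSED_TB):
    """Convert a load time in seconds into a TB/hour rate."""
    return size_tb / (seconds / 3600.0)


# Recompute the rate for every row of the table.
rates = {key: tb_per_hour(sec) for key, sec in LOAD_TIMES_SEC.items()}

for (fmt, layout), rate in rates.items():
    print(f"{fmt:8s} {layout:16s} {rate:4.1f} TB/hr")
```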
When we tested loading the same data using different warehouse sizes, we found that load speed was directly proportional to the scale of the warehouse, as expected. For example, a 3X-large warehouse, which is twice the scale of a 2X-large, loaded the same CSV data at a rate of 28 TB/hour. Conversely, an X-large loaded at ~7 TB/hour, and a large loaded at ~3.5 TB/hour. This means you will spend about the same number of Snowflake credits to load a given data set regardless of the cluster size you use, as long as you suspend the warehouse when done to avoid idle time.
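The credit argument can be checked with a little arithmetic. The sketch below uses the CSV load rates quoted above and assumes the usual credits-per-hour schedule in which each warehouse size costs twice the previous one (8 for Large up to 64 for 3X-Large). That schedule is not stated in this article, so treat it as an assumption and substitute the rates for your own account.

```python
# Uncompressed size of STORE_SALES as quoted in the article (rounded, decimal TB).
UNCOMPRESSED_TB = 4.7

# CSV load rates in TB/hour reported in the article, by warehouse size.
CSV_RATE_TB_PER_HR = {
    "Large": 3.5,
    "X-Large": 7.0,
    "2X-Large": 15.0,
    "3X-Large": 28.0,
}

# ASSUMPTION (not from the article): credits billed per hour for each size,
# doubling with every step up in warehouse size.
CREDITS_PER_HR = {
    "Large": 8,
    "X-Large": 16,
    "2X-Large": 32,
    "3X-Large": 64,
}


def credits_to_load(size):
    """Credits consumed to load the table once, ignoring idle time."""
    hours = UNCOMPRESSED_TB / CSV_RATE_TB_PER_HR[size]
    return hours * CREDITS_PER_HR[size]


credits = {size: credits_to_load(size) for size in CSV_RATE_TB_PER_HR}

for size, used in credits.items():
    print(f"{size:9s} {used:5.1f} credits")
```

Under these assumptions every size lands within a few percent of the others, so a larger warehouse mainly buys a shorter wall-clock time rather than a lower or higher bill.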
Load rates for your own data files may differ based on a number of factors:
- Location of your S3 buckets – For our test, both our Snowflake deployment and S3 buckets were located in us-west-2
- Number and types of columns – A larger number of columns may require more time relative to number of bytes in the files.
- Gzip Compression efficiency – More data read from S3 per uncompressed byte may lead to longer load times.
(In all cases, be sure to use a sufficient number of load files to keep all loading threads busy. For a 2X-large, there are 256 such threads, and we had ~2000 load files to cover the five years of history.)
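To see why file count matters, here is a deliberately simplified model in which each loading thread handles one file at a time. That scheduling model is an assumption for illustration only; the 256 threads and ~2000 files are the figures quoted above.

```python
import math


def load_waves(n_files, n_threads):
    """Rounds of work needed if every thread loads one file per round."""
    return math.ceil(n_files / n_threads)


def idle_threads(n_files, n_threads):
    """Threads left with nothing to do when there are fewer files than threads."""
    return max(0, n_threads - n_files)


# Figures from the article: a 2X-large with 256 threads and about 2000 files.
print(load_waves(2000, 256), idle_threads(2000, 256))

# A hypothetical load split into only 100 files leaves most threads idle.
print(load_waves(100, 256), idle_threads(100, 256))
```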
Best Practices for Parquet and ORC
While we are considering Parquet and ORC, let’s look at the technique we used to populate the fully-structured version of STORE_SALES using partitioned Parquet data.
First, consider that the date partitioned Parquet files reside in an S3 bucket with the following prefix naming conventions, where the highlighted integer is one of the values of the partitioning keys. In STORE_SALES, it is an integer surrogate key for the sold_date column named ss_sold_date_sk:
If the data files were originally generated by HIVE, there will also be a prefix representing data for which the partitioning key is NULL:
Finally, HIVE will create a series of 0-byte “tag” files in the bucket that need to be ignored during ingestion. These files have a format in the pattern:
As we noted earlier, the data files themselves do not contain a column or value for ss_sold_date_sk even though it is part of the table’s definition. Instead, the value must be derived from the prefix name in S3.
To handle ingestion from this bucket, we first defined an external stage for Snowflake as:
create or replace stage parquet_test
  url='s3://<my_bucket>/tpcds/10tb_parquet/'
  credentials = (aws_key_id=…,aws_secret_key=…)
  FILE_FORMAT = (TYPE = 'PARQUET');
Finally, our command to load all of the Parquet data into the fully structured STORE_SALES table will look like this:
copy into STORE_SALES from (
  select
    NULLIF(
      regexp_replace ( METADATA$FILENAME, '.*\\=(.*)\\/.*', '\\1'),
      '__HIVE_DEFAULT_PARTITION__'
    ) as ss_sold_date_sk,
    $1:ss_sold_time_sk as ss_sold_time_sk,
    $1:ss_item_sk as ss_item_sk,
    $1:ss_customer_sk as ss_customer_sk,
    $1:ss_cdemo_sk as ss_cdemo_sk,
    $1:ss_hdemo_sk as ss_hdemo_sk,
    $1:ss_addr_sk as ss_addr_sk,
    $1:ss_store_sk as ss_store_sk,
    $1:ss_promo_sk as ss_promo_sk,
    $1:ss_ticket_number as ss_ticket_number,
    $1:ss_quantity as ss_quantity,
    $1:ss_wholesale_cost as ss_wholesale_cost,
    $1:ss_list_price as ss_list_price,
    $1:ss_sales_price as ss_sales_price,
    $1:ss_ext_discount_amt as ss_ext_discount_amt,
    $1:ss_ext_sales_price as ss_ext_sales_price,
    $1:ss_ext_wholesale_cost as ss_ext_wholesale_cost,
    $1:ss_ext_list_price as ss_ext_list_price,
    $1:ss_ext_tax as ss_ext_tax,
    $1:ss_coupon_amt as ss_coupon_amt,
    $1:ss_net_paid as ss_net_paid,
    $1:ss_net_paid_inc_tax as ss_net_paid_inc_tax,
    $1:ss_net_profit as ss_net_profit
  from @parquet_test/store_sales/
)
pattern= '.*/.*/.*/ss_sold_date_sk=.*/.*' ;
Notice that we are using the “transform” feature of the COPY command to parse and manipulate the semi-structured Parquet format. The main body of the COPY includes extraction of the labeled fields contained in the Parquet data, mapping them directly to the corresponding column in STORE_SALES. E.g. in the expression:
$1:ss_net_paid as ss_net_paid,
$1 refers to the contents of the single column representing an entire Parquet row of input data as a set of key-value pairs, and $1:ss_net_paid represents the value associated with the ss_net_paid key in that row.
Let’s take a closer look at two expressions from the script above: the one that derives ss_sold_date_sk from the file name, and the pattern clause at the end.
The first expression,
NULLIF( regexp_replace ( METADATA$FILENAME, '.*\\=(.*)\\/.*', '\\1'), '__HIVE_DEFAULT_PARTITION__' )
is used to populate the ss_sold_date_sk column, which is the value used to partition the input data. The REGEXP_REPLACE function transforms the fully qualified S3 file name into just the integer value representing the date key embedded in the prefix. It does this by capturing the characters in the file path that follow the ‘=’ sign, up to the next ‘/’. NULLIF is used to replace the partitions named __HIVE_DEFAULT_PARTITION__ with the value NULL for the date key.
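The same extraction can be tried outside Snowflake to see what the regular expression does. The sketch below mirrors the NULLIF and regexp_replace combination with Python's re module, which is a stand-in and not the Snowflake implementation. The file paths are hypothetical examples of the partitioned layout described above, and the backslash escapes before ‘=’ and ‘/’ are dropped because neither character needs escaping in Python.

```python
import re

# Name Hive gives to the partition that holds rows with a NULL key.
HIVE_NULL_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_key(filename):
    """Return the partition value embedded in a file path, or None for the NULL partition."""
    # Greedy '.*' runs to the last '=', the group captures up to the last '/',
    # and the whole path is replaced by the captured group.
    value = re.sub(r".*=(.*)/.*", r"\1", filename)
    # Equivalent of NULLIF(value, '__HIVE_DEFAULT_PARTITION__').
    return None if value == HIVE_NULL_PARTITION else value


# Hypothetical paths, for illustration only.
dated = "tpcds/10tb_parquet/store_sales/ss_sold_date_sk=2451132/part-00000.parquet"
null_part = "tpcds/10tb_parquet/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/part-00000.parquet"

print(partition_key(dated))
print(partition_key(null_part))
```

Note that the value comes back as a string; in the COPY statement it is converted when it is written into the target column.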
The final expression,
pattern= '.*/.*/.*/ss_sold_date_sk=.*/.*'
serves as a filter on the input files, forcing COPY to ignore the 0-byte placeholder files in the bucket.
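To get a feel for how such a filter behaves, the pattern from the COPY statement can be tested against sample paths. This sketch uses Python's re.fullmatch as a stand-in for Snowflake's matching, which is an assumption. Both paths are made up for illustration, including the name of the placeholder object.

```python
import re

# Pattern copied from the COPY statement.
COPY_PATTERN = re.compile(r".*/.*/.*/ss_sold_date_sk=.*/.*")


def is_loaded(path):
    """True if the path matches the pattern and would be picked up by the load."""
    return COPY_PATTERN.fullmatch(path) is not None


# Hypothetical data file inside a partition prefix.
data_file = "tpcds/10tb_parquet/store_sales/ss_sold_date_sk=2451132/part-00000.parquet"

# Hypothetical 0-byte placeholder sitting next to the partition prefix.
# It has no '/' after the partition value, so the pattern cannot match it.
placeholder = "tpcds/10tb_parquet/store_sales/ss_sold_date_sk=2451132_placeholder"

print(is_loaded(data_file))
print(is_loaded(placeholder))
```

The key design point is the trailing `/.*`: only objects that live inside a partition prefix qualify, which excludes anything stored at the prefix level itself.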
Loading ORC entails exactly the same process, changing only the FILE_FORMAT definition in the CREATE STAGE command to TYPE = 'ORC'.
Loading data into Snowflake is fast and flexible. You get the greatest speed when working with CSV files, but Snowflake’s expressiveness in handling semi-structured data allows even complex partitioning schemes for existing ORC and Parquet data sets to be easily ingested into fully structured Snowflake tables.