Intro to Kafka and Snowflake Managed Iceberg
Overview
This guide provides a comprehensive walkthrough for establishing a local data streaming pipeline from open-source Apache Kafka to Snowflake-managed Iceberg tables. We will configure the Snowflake Kafka Connector to leverage Snowpipe Streaming, enabling efficient, near real-time data ingestion while seamlessly handling schema evolution. This setup facilitates a robust and adaptable data flow, ensuring that changes in your Kafka topics are automatically reflected in your Snowflake-managed Iceberg tables.
Prerequisites
Before proceeding, ensure you have the following:
- A Snowflake account
- Visual Studio Code (VSCode) installed.
- Cloud Provider Amazon S3 or Azure Blob storage to write with the iceberg files.
What You’ll Learn
- Local installation and configuration of Apache Kafka on macOS.
- Configuration of the Snowflake Kafka Connector for Snowpipe Streaming.
- Implementation of data ingestion and schema evolution for Iceberg tables using the Snowflake Kafka Connector.
What You’ll Build
- A streaming pipeline through kafka to Snowflake-managed iceberg table
Setup
Example: USE ROLE ACCOUNTADMIN; CREATE OR REPLACE EXTERNAL VOLUME iceberg_external_volume_s3 STORAGE_LOCATIONS = ( ( NAME = 'my-s3-us-west-2' STORAGE_PROVIDER = 'S3' STORAGE_BASE_URL = 's3://<s3-bucket>/iceberg' STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<aws account>:role/icerberg_table_access_role' STORAGE_AWS_EXTERNAL_ID = 'icerberg_table_access_id' ) );
- Login to snowsight and run the sql statements to create database,schema and Iceberg table:
SET PWD = 'Test1234567'; SET USER = 'demo_user'; SET DB = 'demo_db'; SET SCHEMA = 'demo_schema' USE ROLE ACCOUNTADMIN; -- CREATE USERS CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD COMMENT='STREAMING USER'; -- GRANTS GRANT ROLE SYSADMIN TO USER IDENTIFIER($USER); -- CREATE DATABASE AND SCHEMA CREATE DATABASE IDENTIFIER($DB); CREATE SCHEMA IDENTIFIER($SCHEMA); -- CREATE ICEBERG TABLE USE DATABASE IDENTIFIER($DB); USE SCHEMA IDENTIFIER($SCHEMA); CREATE OR REPLACE ICEBERG TABLE emp_iceberg_tab ( record_metadata OBJECT() ) EXTERNAL_VOLUME = 'iceberg_external_volume_s3' CATALOG = 'SNOWFLAKE' BASE_LOCATION = 'emp_iceberg_tab';
- Enable schema evolution on the table
Snowflake enables seamless handling of evolving semi-structured data. As data sources add new columns, Snowflake automatically updates table structures to reflect these changes, including the addition of new columns. This eliminates the need for manual schema adjustments. More Info Document
alter ICEBERG table emp_iceberg_tab set ENABLE_SCHEMA_EVOLUTION =true;
- Create a key-pair to be used for authenticating(We will not used username and password) with Snowflake user by following Document
To generate an encrypted version, use the following command, which omits -nocrypt:
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8
The commands generate a private key in PEM format.
-----BEGIN ENCRYPTED PRIVATE KEY----- MIIE6T... -----END ENCRYPTED PRIVATE KEY-----
Generate a public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
The command generates the public key in PEM format.
-----BEGIN PUBLIC KEY----- MIIBIj... -----END PUBLIC KEY-----
Assign the public key to a Snowflake user
use role accountadmin; alter user demo_user set rsa_public_key='< pubKey >';
Setup Local Kafka
-
Download kafka in your local mac from here. This tutorial used kafka version 2.8.1.
-
Start zookeeper in new terminal
cd kafka_2.13-2.8.1/bin ./zookeeper-server-start.sh ../config/zookeeper.properties

- Start Kafka server in new terminal
cd kafka_2.13-2.8.1/bin ./kafka-server-start.sh ../config/server.properties

- Download snowflake-kafka-connector from here

- Copy kafka connector jar file into kafka_2.13-2.8.1/libs/ folder
- Create kafka topic
cd kafka_2.13-2.8.1/bin ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic demo_topic
- Run kafka producer in console mode and produce some records
cd kafka_2.13-2.8.1/bin ./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo_topic
- Sample Records
Sample records: {"emp_id":100,"first_name":"Keshav","last_name":"Lodhi","designation":"DataEngineer"} {"emp_id":101,"first_name":"Ashish","last_name":"kumar","designation":"Solution Architect"} {"emp_id":102,"first_name":"Anup","last_name":"moncy","designation":"Solution Architect"}
- validate records produced in previous step
cd kafka_2.13-2.8.1/bin ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo_topic --from-beginning
- You should be able to see all 3 records from above output.
- Run snowflake kafka connector in distributed mode in new terminal
- Run this command to start kafka connector on your machine
cd kafka_2.13-2.8.1/bin ./connect-distributed.sh <full_path>/kafka_2.13-2.8.1/config/connect-distributed.properties
- create configuration for connector
cd kafka_2.13-2.8.1/config/ vi SF_connect1.json { "name":"demoiceberg", "config":{ "snowflake.ingestion.method":"SNOWPIPE_STREAMING", "snowflake.streaming.iceberg.enabled":true, "snowflake.enable.schematization":true, "snowflake.role.name":"sysadmin", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":false, "value.converter.schemas.enable":false, "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector", "tasks.max":"8", "topics":"demo_topic", "snowflake.topic2table.map":" demo_topic:emp_iceberg_tab", "buffer.count.records":"10000", "buffer.flush.time":"60", "buffer.size.bytes":"5000000", "snowflake.url.name":"https://xxxxx-nrb47395.snowflakecomputing.com", "snowflake.user.name":"demo_user", "snowflake.database.name":"demo_db", "snowflake.schema.name":"demo_schema", "snowflake.private.key":"MIIFDjBABgkqh****", "snowflake.private.key.passphrase":"***" } }
- Update the following configuration values based on your setup:
snowflake.role.name → You can set this to SYSADMIN for demo purposes. topics → This should be the Kafka topic name created in Step 5. snowflake.topic2table.map → Map your Kafka topic name to the Iceberg table name created earlier. snowflake.url.name → Enter the URL of your Snowflake account. snowflake.user.name → Specify your Snowflake username. snowflake.database.name → Use the database name created in the "Create Snowflake Managed Iceberg Table" step. snowflake.schema.name → Use the schema name created in the "Create Snowflake Managed Iceberg Table" step. snowflake.private.key → Copy the content of the private key generated in the "Create Snowflake Managed Iceberg Table" step. snowflake.private.key.passphrase → Enter the passphrase for the encrypted private key file created in the same step.
- Execute configuration
curl -X POST -H "Content-Type: application/json" --data @<full_path>kafka_2.13-2.8.1/config/SF_connect1.json http://localhost:8083/connectors
Schema evolution and Validate data
- Check iceberg table by login into snowsight
select * from emp_iceberg_tab;

- Now you will see 4 columns detected by the connector
- EMP_ID NUMBER(19,0) - DESIGNATION VARCHAR(16777216) - LAST_NAME VARCHAR(16777216) - FIRST_NAME VARCHAR(16777216)
- Producer few more record in kafka topic
{"emp_id":102,"first_name":"Anup","last_name":"moncy","designation":"Solution Architect","company":"Snowflake"}
- Validate the data and schema of Iceberg table
select * from emp_iceberg_tab;

- Now you will see 5 columns detected by the connector
EMP_ID NUMBER(19,0) DESIGNATION VARCHAR(16777216) LAST_NAME VARCHAR(16777216) FIRST_NAME VARCHAR(16777216) COMPANY VARCHAR(16777216)
Conclusion And Resources
What You Learned
- Local installation and configuration of Apache Kafka.
- Configuration of the Snowflake Kafka Connector for Snowpipe Streaming in distributed mode.
- Implementation of data ingestion and schema evolution for Iceberg tables using the Snowflake Kafka Connector.
Related Resources
This content is provided as is, and is not maintained on an ongoing basis. It may be out of date with current Snowflake instances