Data for Breakfast Around the World

Drive impact across your organization with data and agentic intelligence.

Snowflake for DevelopersGuidesIntro to Kafka and Snowflake Managed Iceberg

Intro to Kafka and Snowflake Managed Iceberg

Apache Iceberg
Ashish Kumar

Overview

This guide provides a comprehensive walkthrough for establishing a local data streaming pipeline from open-source Apache Kafka to Snowflake-managed Iceberg tables. We will configure the Snowflake Kafka Connector to leverage Snowpipe Streaming, enabling efficient, near real-time data ingestion while seamlessly handling schema evolution. This setup facilitates a robust and adaptable data flow, ensuring that changes in your Kafka topics are automatically reflected in your Snowflake-managed Iceberg tables.

Prerequisites

Before proceeding, ensure you have the following:

  • A Snowflake account
  • Visual Studio Code (VSCode) installed.
  • Cloud Provider Amazon S3 or Azure Blob storage to write with the iceberg files.

What You’ll Learn

  • Local installation and configuration of Apache Kafka on macOS.
  • Configuration of the Snowflake Kafka Connector for Snowpipe Streaming.
  • Implementation of data ingestion and schema evolution for Iceberg tables using the Snowflake Kafka Connector.

What You’ll Build

  • A streaming pipeline through kafka to Snowflake-managed iceberg table

Setup

  1. Configure an external volume by following this Document or if you are using Azure Blob you can follow this Document
Example:

USE ROLE ACCOUNTADMIN;

CREATE OR REPLACE EXTERNAL VOLUME iceberg_external_volume_s3
   STORAGE_LOCATIONS =
      (
         (
            NAME = 'my-s3-us-west-2'
            STORAGE_PROVIDER = 'S3'
            STORAGE_BASE_URL = 's3://<s3-bucket>/iceberg'
            STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<aws account>:role/icerberg_table_access_role'
            STORAGE_AWS_EXTERNAL_ID = 'icerberg_table_access_id'
         )
      );
  1. Login to snowsight and run the sql statements to create database,schema and Iceberg table:
SET PWD = 'Test1234567';
SET USER = 'demo_user';
SET DB = 'demo_db';
SET SCHEMA = 'demo_schema'

USE ROLE ACCOUNTADMIN;

-- CREATE USERS
CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD  COMMENT='STREAMING USER';

-- GRANTS
GRANT ROLE SYSADMIN TO USER IDENTIFIER($USER);

-- CREATE DATABASE AND SCHEMA
CREATE DATABASE IDENTIFIER($DB);
CREATE SCHEMA IDENTIFIER($SCHEMA);

-- CREATE ICEBERG TABLE
USE DATABASE IDENTIFIER($DB);
USE SCHEMA IDENTIFIER($SCHEMA);

CREATE OR REPLACE ICEBERG TABLE emp_iceberg_tab (
    record_metadata OBJECT()
  )
  EXTERNAL_VOLUME = 'iceberg_external_volume_s3'
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'emp_iceberg_tab';
  1. Enable schema evolution on the table

Snowflake enables seamless handling of evolving semi-structured data. As data sources add new columns, Snowflake automatically updates table structures to reflect these changes, including the addition of new columns. This eliminates the need for manual schema adjustments. More Info Document

alter ICEBERG table emp_iceberg_tab set ENABLE_SCHEMA_EVOLUTION  =true;
  1. Create a key-pair to be used for authenticating(We will not used username and password) with Snowflake user by following Document

To generate an encrypted version, use the following command, which omits -nocrypt:

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

The commands generate a private key in PEM format.

-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIE6T...
-----END ENCRYPTED PRIVATE KEY-----

Generate a public key

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

The command generates the public key in PEM format.

-----BEGIN PUBLIC KEY-----
MIIBIj...
-----END PUBLIC KEY-----

Assign the public key to a Snowflake user

use role accountadmin;
alter user demo_user set rsa_public_key='< pubKey >';

Setup Local Kafka

  1. Download kafka in your local mac from here. This tutorial used kafka version 2.8.1.

  2. Start zookeeper in new terminal

cd kafka_2.13-2.8.1/bin
./zookeeper-server-start.sh ../config/zookeeper.properties
  1. Start Kafka server in new terminal
cd kafka_2.13-2.8.1/bin
./kafka-server-start.sh ../config/server.properties
  1. Download snowflake-kafka-connector from here
  • Copy kafka connector jar file into kafka_2.13-2.8.1/libs/ folder
  1. Create kafka topic
cd kafka_2.13-2.8.1/bin
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic demo_topic
  1. Run kafka producer in console mode and produce some records
cd kafka_2.13-2.8.1/bin
./kafka-console-producer.sh --bootstrap-server localhost:9092 --topic demo_topic
  • Sample Records
Sample records:
{"emp_id":100,"first_name":"Keshav","last_name":"Lodhi","designation":"DataEngineer"}
{"emp_id":101,"first_name":"Ashish","last_name":"kumar","designation":"Solution Architect"}
{"emp_id":102,"first_name":"Anup","last_name":"moncy","designation":"Solution Architect"}
  1. validate records produced in previous step
cd kafka_2.13-2.8.1/bin
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo_topic --from-beginning
  • You should be able to see all 3 records from above output.
  1. Run snowflake kafka connector in distributed mode in new terminal
  • Run this command to start kafka connector on your machine
cd kafka_2.13-2.8.1/bin
./connect-distributed.sh <full_path>/kafka_2.13-2.8.1/config/connect-distributed.properties
  1. create configuration for connector
cd kafka_2.13-2.8.1/config/

vi SF_connect1.json

{
    "name":"demoiceberg",
    "config":{
    "snowflake.ingestion.method":"SNOWPIPE_STREAMING",
    "snowflake.streaming.iceberg.enabled":true,
    "snowflake.enable.schematization":true,
    "snowflake.role.name":"sysadmin",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":false,
    "value.converter.schemas.enable":false,
    "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max":"8",
    "topics":"demo_topic",
    "snowflake.topic2table.map":" demo_topic:emp_iceberg_tab",
    "buffer.count.records":"10000",
    "buffer.flush.time":"60",
    "buffer.size.bytes":"5000000",
    "snowflake.url.name":"https://xxxxx-nrb47395.snowflakecomputing.com",
    "snowflake.user.name":"demo_user",
    "snowflake.database.name":"demo_db",
    "snowflake.schema.name":"demo_schema",
    "snowflake.private.key":"MIIFDjBABgkqh****",
    "snowflake.private.key.passphrase":"***"
    }
}
  • Update the following configuration values based on your setup:
snowflake.role.name → You can set this to SYSADMIN for demo purposes.
topics → This should be the Kafka topic name created in Step 5.
snowflake.topic2table.map → Map your Kafka topic name to the Iceberg table name created earlier.
snowflake.url.name → Enter the URL of your Snowflake account.
snowflake.user.name → Specify your Snowflake username.
snowflake.database.name → Use the database name created in the "Create Snowflake Managed Iceberg Table" step.
snowflake.schema.name → Use the schema name created in the "Create Snowflake Managed Iceberg Table" step.
snowflake.private.key → Copy the content of the private key generated in the "Create Snowflake Managed Iceberg Table" step.
snowflake.private.key.passphrase → Enter the passphrase for the encrypted private key file created in the same step.
  1. Execute configuration
curl -X POST -H "Content-Type: application/json" --data @<full_path>kafka_2.13-2.8.1/config/SF_connect1.json http://localhost:8083/connectors

Schema evolution and Validate data

  1. Check iceberg table by login into snowsight
select * from emp_iceberg_tab;
  • Now you will see 4 columns detected by the connector
- EMP_ID	NUMBER(19,0)
- DESIGNATION	VARCHAR(16777216)
- LAST_NAME	VARCHAR(16777216)
- FIRST_NAME	VARCHAR(16777216)
  1. Producer few more record in kafka topic
{"emp_id":102,"first_name":"Anup","last_name":"moncy","designation":"Solution Architect","company":"Snowflake"}
  1. Validate the data and schema of Iceberg table
select * from emp_iceberg_tab;
  • Now you will see 5 columns detected by the connector
EMP_ID	NUMBER(19,0)
DESIGNATION	VARCHAR(16777216)
LAST_NAME	VARCHAR(16777216)
FIRST_NAME	VARCHAR(16777216)
COMPANY	VARCHAR(16777216)

Conclusion And Resources

What You Learned

  • Local installation and configuration of Apache Kafka.
  • Configuration of the Snowflake Kafka Connector for Snowpipe Streaming in distributed mode.
  • Implementation of data ingestion and schema evolution for Iceberg tables using the Snowflake Kafka Connector.

Related Resources

Updated 2025-12-20

This content is provided as is, and is not maintained on an ongoing basis. It may be out of date with current Snowflake instances