Part 1 of this two-part post demonstrated how to build a Type 2 Slowly Changing Dimension (SCD) using Snowflake’s Stream functionality to set up a stream and insert data. Now, let’s automate the stream and have it run on a schedule. First, you’ll update some data and then manually process it. Then, you’ll delete data and set up automatic processing.

Updating Data in a Table

You’ve seen how inserts work. Now, let’s try updates.

To start, run the following transaction to update two records:

begin;
update nation
set n_comment = 'New comment for Brazil', update_timestamp = current_timestamp()::timestamp_ntz
where n_nationkey = 2;

update nation
set n_comment = 'New comment for Canada', update_timestamp = current_timestamp()::timestamp_ntz
where n_nationkey = 3;
commit;

View the data in the NATION table by running the following command:

select * from nation where n_nationkey in (1, 2,3);

In the following figure, notice the updated values in the N_COMMENT and UPDATE_TIMESTAMP columns for N_NATIONKEY 2 and N_NATIONKEY 3.

Next, let’s look at what the stream captured. Updates generate two rows in a stream.

Run the following command:

select * from nation_table_changes;

In the following figure, notice the METADATA$ISUPDATE column contains TRUE. You updated two records in the NATION table, so there are four rows in the stream.

Next, merge the stream data into the NATION_HISTORY table by executing the MERGE statement again:

-- MERGE statement that uses the CHANGE_DATA view to load data into the NATION_HISTORY table
merge into nation_history nh -- Target table to merge changes from NATION into
using nation_change_data m -- CHANGE_DATA is a view that holds the logic that determines what to insert/update into the NATION_HISTORY table.
   on nh.n_nationkey = m.n_nationkey -- n_nationkey and start_time determine whether there is a unique record in the NATION_HISTORY table
   and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update -- Indicates the record has been updated and is no longer current and the end_time needs to be stamped
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when matched and m.dml_type = 'D' then update -- Deletes are essentially logical deletes. The record is stamped and no newer version is inserted
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert -- Inserting a new n_nationkey and updating an existing one both result in an Insert
           (n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
    values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, 
m.country_code, m.start_time, m.end_time, m.current_flag);

The MERGE statement updated two records in the NATION_HISTORY table and inserted two more, as shown in the following figure:

If you view the data in the NATION_HISTORY table, you’ll now see 27 records instead of 25, as shown in the following figure:

select * from nation_history;
ΩΩ

As shown in the following figure, filtering for only the N_NATIONKEY values you updated shows how records are stamped for START_TIME, END_TIME, and CURRENT_TIME.

select * from nation_history where n_nationkey in (2,3) order by n_nationkey, start_time;

Again, because you executed a DML statement using the stream on the NATION table, the stream was purged to ensure  you don’t process the same changed data twice. To verify this, run the following command:

select * from nation_table_changes;

The following figure shows that the stream has been purged:

Creating a Task

Manually executing the MERGE command is starting to get tiresome. Luckily you don’t have to execute it manually. This is where tasks come into play.

Using a task, you can schedule the MERGE statement to run on a recurring basis and execute only if there is data in the NATION_TABLE_CHANGES stream.

If you haven’t done so already, the following are the steps you can follow to create a TASKADMIN role. As mentioned in Part 1, I am running this as SYSADMIN so the TASKADMIN role is granted to SYSADMIN below.

--Set up TASKADMIN role
use role securityadmin;
create role taskadmin;
-- Set the active role to ACCOUNTADMIN before granting the EXECUTE TASK privilege to TASKADMIN
use role accountadmin;
grant execute task on account to role taskadmin;

-- Set the active role to SECURITYADMIN to show that this role can grant a role to another role 
use role securityadmin;
grant role taskadmin to role sysadmin;

Once SYSADMIN has been granted the TASKADMIN role, you can create a task to automate the MERGE.

Tasks require a warehouse to run, so let’s create a task warehouse if one doesn’t exist:

create warehouse if not exists task_warehouse with warehouse_size = 'XSMALL' auto_suspend = 120;

Finally, run the following SQL command to create the task. You should receive no errors. This task will execute every minute and run only if the NATION_TABLE_CHANGES stream has data in it.

-- Create a task to schedule the MERGE statement
create or replace task populate_nation_history warehouse = task_warehouse schedule = '1 minute' when system$stream_has_data('nation_table_changes')
as   
merge into nation_history nh
using nation_change_data m
   on nh.n_nationkey = m.n_nationkey
   and nh.start_time = m.start_time
when matched and m.dml_type = 'U' then update
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when matched and m.dml_type = 'D' then update
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when not matched and m.dml_type = 'I' then insert
           (n_nationkey, n_name, n_regionkey, n_comment, 
country_code, start_time, end_time, current_flag)
    values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, 
m.country_code, m.start_time, m.end_time, m.current_flag);

After you have created the task, you can check its status using the following command:

show tasks;

The following figure shows the task is suspended. By default, a task is suspended when it is created.

Run the following to alter the task to resume it:

-- Resume the task
alter task populate_nation_history resume;
show tasks;

The following figure shows the task has been started.

You can query to see when the task will next run:

select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state 
from table(information_schema.task_history()) where state = 'SCHEDULED' order by completed_time desc;

The following figure shows the task will run again in 32 seconds. You will probably see a different number as the results are specific to your task. The result could even be a negative number if the task has started running.

Deleting Data in a Table

So far, you’ve inserted and updated data.  You’ve also created a task to automate the process. Next up, delete data from the NATION table by running the following command:

-- Delete data
delete from nation
where n_nationkey in (3,7);

The following figure shows that two rows were deleted.

Looking at the NATION table, you can see there are only 23 records. N_NATIONKEY 3 and N_NATIONKEY 7 are no longer in the table, as shown in the following figure.

Run the following command to look at the NATION_TABLE_CHANGES stream:

select * from nation_table_changes;

The following figure shows two records were deleted:

Run the following command to see when the task to execute the MERGE will run again:

select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state 
from table(information_schema.task_history()) where state = 'SCHEDULED' order by completed_time desc;

The following figure shows the task will run in 24 seconds, so let’s wait until it executes to check the NATION_HISTORY table.

When the time is up, query the NATION_TABLE_CHANGES stream to verify the data in it has been processed:

select * from nation_table_changes;

The following figure shows the stream contains no data, indicating the data has been processed:

Run the following command to query the NATION_HISTORY table to see the status of N_NATIONKEY 3 and N_NATIONKEY 7, which you deleted from the NATION table earlier:

select * from nation_history where n_nationkey in (3,7) order by n_nationkey, start_time;

The following figure shows that even though you deleted N_NATIONKEY 3 and N_NATIONKEY 7 from the NATION table and the record is no longer current (indicated by the CURRENT_FLAG column containing 0), the record wasn’t deleted from the history. The END_TIME column shows when the record was marked as deleted.

Simultaneously Inserting, Updating, and Deleting Data

So far, you’ve performed individual DML actions on the NATION table. Next, run the following command to insert Colombia (N_NATIONKEY 26), update Indonesia (N_NATIONKEY 9), and delete Saudi Arabia (N_NATIONKEY 20) all at once:

-- Insert, update, delete in one pass
begin;
insert into nation values(26, 'COLOMBIA', 1, 'New country', 'CO', current_timestamp()::timestamp_ntz);

update nation
set n_comment = 'New comment for Indonesia', update_timestamp = 
current_timestamp()::timestamp_ntz
where n_nationkey = 9;

delete from nation
where n_nationkey in (20);
commit;

Run the following command to view all the changes in the NATION_TABLE_CHANGES stream:

select * from nation_table_changes;

The following figure shows the changes:

Next, run the following command to view the data in the NATION table for N_NATIONKEY 26, N_NATIONKEY 9, and N_NATIONKEY 20:

select * from nation where n_nationkey in (26,9,20);

The following figure shows the data in the NATION table for N_NATIONKEY 9 and N_NATIONKEY 26, but there is no data for N_NATIONKEY 20, because you deleted Saudi Arabia:

Wait for the task to execute the stream and then run the following command to look at the NATION_HISTORY table:

select * from nation_history where n_nationkey in (26,9,20);

The following figure shows the history in the NATION_HISTORY table for N_NATIONKEY 9, N_NATIONKEY 20, and N_NATIONKEY 26:

Wrapping Up

That’s all it takes to set up the process for building a Type 2 SCD in Snowflake. You could also follow the same steps to build a Type 2 SCD and then modify to meet your specific needs.