Stream a Changefeed to Snowflake

On this page Carat arrow pointing down

While CockroachDB is an excellent system of record, it also needs to coexist with other systems. For example, you might want to keep your data mirrored in full-text indexes, analytics engines, or big data pipelines.

This page demonstrates how to use an Enterprise changefeed to stream row-level changes to Snowflake, an online analytical processing (OLAP) database.

Note:

Snowflake is optimized for inserts and batch rewrites over streaming updates. This tutorial sets up a changefeed to stream data to S3 with Snowpipe sending changes to Snowflake. Snowpipe imports previously unseen files and does not address uniqueness for primary keys, which means that target tables in Snowflake can contain multiple records per primary key.

This tutorial focuses on inserts into Snowflake. However, for some workarounds to address multiple records on the same primary key in Snowflake tables, refer to Remove multiple records from Snowflake target tables.

Before you begin

Before you begin, make sure you have:

Step 1. Create a cluster

If you have not done so already, create a cluster.

Step 2. Configure your cluster

  1. Connect to the built-in SQL shell as a user with admin privileges, replacing the placeholders in the client connection string with the correct username, password, and path to the ca.cert:

    icon/buttons/copy
    cockroach sql \
    --url='postgres://<username>:<password>@<global host>:26257?sslmode=verify-full&sslrootcert=certs/ca.crt'
    
    Note:

    For more information on connecting to your cluster, refer to Connect to your CockroachDB Dedicated Cluster or Connect to your CockroachDB Serverless Cluster.

  2. Enable rangefeeds. Note that rangefeeds are enabled by default on Serverless clusters:

    icon/buttons/copy
    SET CLUSTER SETTING kv.rangefeed.enabled = true;
    

Step 3. Create a database

  1. In the built-in SQL shell, create a database called cdc_test:

    icon/buttons/copy
    CREATE DATABASE cdc_test;
    
  2. Set it as the default:

    icon/buttons/copy
    SET DATABASE = cdc_test;
    

Step 4. Create tables

Before you can start a changefeed, you need to create at least one table for the changefeed to target. The targeted table's rows are referred to as the "watched rows".

Create a table called order_alerts to target:

icon/buttons/copy
CREATE TABLE order_alerts (
    id   INT PRIMARY KEY,
    name STRING
);

Step 5. Create an S3 bucket in the AWS Console

Every change to a watched row is emitted as a record in a configurable format (i.e., JSON for cloud storage sinks). To configure an AWS S3 bucket as the cloud storage sink:

  1. Log in to your AWS S3 Console.

  2. Create an S3 bucket where streaming updates from the watched tables will be collected.

    You will need the name of the S3 bucket when you create your changefeed. Ensure you have a set of IAM credentials with write access on the S3 bucket that you will use during changefeed setup.

Step 6. Create an enterprise changefeed

Back in the built-in SQL shell, create an enterprise changefeed. Replace the placeholders with your AWS access key ID and AWS secret access key:

icon/buttons/copy
CREATE CHANGEFEED FOR TABLE order_alerts
    INTO 's3://{bucket name}?AWS_ACCESS_KEY_ID={access key ID}&AWS_SECRET_ACCESS_KEY={secret access key}'
    WITH
        updated;

Refer to the Cloud Storage Authentication page for more detail on authenticating to Amazon S3 and other cloud providers.

        job_id
+--------------------+
  000000000000000000
(1 row)

You will receive the changefeed's job ID that you can use to manage the changefeed if needed.

Step 7. Insert data into the tables

  1. In the built-in SQL shell, insert data into the order_alerts table that the changefeed is targeting:

    icon/buttons/copy
    INSERT INTO order_alerts
        VALUES
            (1, 'Order received'),
            (2, 'Order processed');
    
  2. Navigate back to the S3 bucket to confirm that the data is now streaming to the bucket. A new date-based directory should display on the Objects tab.

    Note:

    If your changefeed is running but data is not displaying in your S3 bucket, you might have to debug your changefeed.

Step 8. Configure Snowflake

  1. Log in to Snowflake as a user with read and write access to a cluster.

  2. Navigate to the Worksheets page and select a worksheet.

  3. Create a table to store the data to be ingested:

    icon/buttons/copy
    CREATE TABLE order_alerts (
       changefeed_record VARIANT
      );
    

    This will store all of the data in a single VARIANT column as JSON. You can then access this field with valid JSON and query the column as if it were a table.

  4. Run the statement.

  5. In the worksheet, create a stage called cdc-stage, which tells Snowflake where your data files reside in S3. Replace the placeholders with your AWS access key ID and AWS secret access key:

    icon/buttons/copy
    CREATE STAGE cdc_stage url='s3://changefeed-example/' credentials=(aws_key_id='<KEY>' aws_secret_key='<SECRET_KEY>') file_format = (type = json);
    
  6. In the worksheet, create a Snowpipe called cdc-pipe, which tells Snowflake to auto-ingest data:

    icon/buttons/copy
    CREATE PIPE cdc_pipe auto_ingest = TRUE as COPY INTO order_alerts FROM @cdc_stage;
    
    Note:

    Auto-ingest in Snowflake works with AWS, Azure, and Google Cloud Storage.

  7. In the worksheet, view the Snowpipe:

    icon/buttons/copy
    SHOW PIPES;
    
  8. Copy the ARN of the SQS queue for your stage, which displays in the notification_channel column. You will use this information to configure the S3 bucket.

Step 9. Configure the S3 bucket

  1. Navigate back to your S3 bucket.

  2. Configure an event notification for the S3 bucket. In the Properties tab, click Create event notification. Use the following parameters:

    • Event name: Name of the event notification (e.g., Auto-ingest Snowflake).
    • Event types: Select the All object create events.
    • Destination: Select SQS Queue.
    • Specify SQS queue: Select Enter SQS queue ARN from the drop-down.
    • SQS queue ARN: Paste the SQS queue name from the SHOW PIPES output (from Step 8).
  3. Navigate back to Snowflake.

  4. Ingest the data from your stage:

    icon/buttons/copy
    ALTER PIPE cdc_pipe refresh;
    
  5. To view the data in Snowflake, query the order_alerts table:

    icon/buttons/copy
    SELECT * FROM order_alerts;
    

    The ingested rows will display in the Results panel. It may take a few minutes for the data to load into the Snowflake cluster. To check on the progress of data being written to the destination table, refer to Snowflake's documentation on querying a stage.

Your changefeed is now streaming to Snowflake.

Remove multiple records from Snowflake target tables

In some cases, you may need to remove multiple records for the same primary key caused by updates to a row. Although it is not possible to remove duplicates with Snowpipe before they arrive at the target table, Snowflake offers some features that you can use to automate removing multiple records from the table downstream.

The following points outline two potential workarounds. For detailed instructions on each feature, we recommend reading the linked Snowflake documentation:

  • Use Snowflake streams and tasks to write into a new de-duplicated table.

    • Set up a stream that will watch for changes on the table that Snowpipe uploads into. The stream will create a snapshot of the changes at a specific point in time.
    • Create the new table in Snowflake that will hold the de-duplicated entries using the stream's METADATA$ACTION column.
    • Create a task to run a SQL statement that will pull data from the stream and merge it into a new table for the "unique" entries. You can set this task to run when there are new records in the stream and by a cron job schedule.

    Refer to Snowflake's examples on creating a stream.

  • Use Snowflake materialized views to maintain a de-duplicated table.

    • Create a materialized view that includes a selection query partitioning on the primary key with the Snowflake QUALIFY command.

    For example, in your materialized view statement, query the required columns and partition, rank, and select only the first row for each primary key:

    icon/buttons/copy
    SELECT * FROM order_alerts QUALIFY row_number() OVER (PARTITION BY id ORDER BY modified DESC) = 1;
    
    Note:

    Materialized views are an Enterprise feature in Snowflake.

Known limitations

  • Snowflake cannot filter streaming updates by table. Because of this, we recommend creating a changefeed that watches only one table.
  • Snowpipe is unaware of CockroachDB resolved timestamps. This means CockroachDB transactions will not be loaded atomically and partial transactions can briefly be returned from Snowflake.
  • Snowpipe works best with append-only workloads, as Snowpipe lacks native ETL capabilities to perform updates to data. You may need to pre-process data before uploading it to Snowflake.

Refer to the Create and Configure Changefeeds page for more general changefeed known limitations.


Yes No
On this page

Yes No