Stream Data Out of CockroachDB Using Changefeeds

Warning:
CockroachDB v21.1 is no longer supported. For more details, see the Release Support Policy.

Change data capture (CDC) provides efficient, distributed, row-level change feeds into a configurable sink for downstream processing such as reporting, caching, or full-text indexing.

What is change data capture?

While CockroachDB is an excellent system of record, it also needs to coexist with other systems. For example, you might want to keep your data mirrored in full-text indexes, analytics engines, or big data pipelines.

The main feature of CDC is the changefeed, which targets an allowlist of tables, called the "watched rows". There are two implementations of changefeeds:

Core changefeeds:

  • Useful for prototyping or quick testing.
  • Available in all products.
  • Streams indefinitely until the underlying SQL connection is closed.
  • Create with EXPERIMENTAL CHANGEFEED FOR.
  • Watches one or multiple tables in a comma-separated list. Emits every change to a "watched" row as a record.
  • Create the changefeed and cancel it by closing the connection.

Enterprise changefeeds:

  • Recommended for production use.
  • Available in CockroachDB Dedicated or with an Enterprise license in CockroachDB.
  • Maintains a connection to the configured sink.
  • Create with CREATE CHANGEFEED.
  • Watches one or multiple tables in a comma-separated list. Emits every change to a "watched" row as a record in a configurable format (JSON or Avro) to a configurable sink (Kafka).
  • Manage the changefeed with CREATE, PAUSE, RESUME, and CANCEL, as well as monitor and debug.

Considerations

  • It is necessary to enable rangefeeds for changefeeds to work.
  • Changefeeds do not share internal buffers, so each running changefeed will increase total memory usage. To watch multiple tables, we recommend creating a changefeed with a comma-separated list of tables.
  • Many DDL queries (including TRUNCATE, DROP TABLE, and queries that add a column family) will cause errors on a changefeed watching the affected tables. You will need to start a new changefeed.
  • Partial or intermittent sink unavailability may impact changefeed stability. If a sink is unavailable, messages cannot be sent, which means that a changefeed's high-water mark timestamp is at risk of falling behind the cluster's garbage collection window. Throughput and latency can be affected once the sink is available again. However, ordering guarantees will still hold for as long as a changefeed remains active.
  • When an IMPORT INTO statement is run, any current changefeed jobs targeting that table will fail.

Enable rangefeeds

Changefeeds connect to a long-lived request (i.e., a rangefeed), which pushes changes as they happen. This reduces the latency of row changes, as well as reduces transaction restarts on tables being watched by a changefeed for some workloads.

Rangefeeds must be enabled for a changefeed to work. To enable the cluster setting:

> SET CLUSTER SETTING kv.rangefeed.enabled = true;

Any created changefeed will error until this setting is enabled. Note that enabling rangefeeds currently has a small performance cost (about a 5-10% increase in latencies), whether or not the rangefeed is being used in a changefeed.

The kv.closed_timestamp.target_duration cluster setting can be used with changefeeds. Resolved timestamps will always be behind by at least this setting's duration; however, decreasing the duration leads to more transaction restarts in your cluster, which can affect performance.

Ordering guarantees

  • In most cases, each version of a row will be emitted once. However, some infrequent conditions (e.g., node failures, network partitions) will cause them to be repeated. This gives our changefeeds an at-least-once delivery guarantee.

  • Once a row has been emitted with some timestamp, no previously unseen versions of that row will be emitted with a lower timestamp. That is, you will never see a new change for that row at an earlier timestamp.

    For example, if you ran the following:

    > CREATE TABLE foo (id INT PRIMARY KEY DEFAULT unique_rowid(), name STRING);
    > CREATE CHANGEFEED FOR TABLE foo INTO 'kafka://localhost:9092' WITH UPDATED;
    > INSERT INTO foo VALUES (1, 'Carl');
    > UPDATE foo SET name = 'Petee' WHERE id = 1;
    

    You'd expect the changefeed to emit:

    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    

    It is also possible that the changefeed emits an out-of-order duplicate of an earlier value that you already saw:

    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    

    However, you will never see an output like the following (i.e., an out-of-order row that you've never seen before):

    [1] {"__crdb__": {"updated": <timestamp 2>}, "id": 1, "name": "Petee"}
    [1] {"__crdb__": {"updated": <timestamp 1>}, "id": 1, "name": "Carl"}
    
  • If a row is modified more than once in the same transaction, only the last change will be emitted.

  • Rows are sharded between Kafka partitions by the row’s primary key.

  • The UPDATED option adds an "updated" timestamp to each emitted row. You can also use the RESOLVED option to emit "resolved" timestamp messages to each Kafka partition. A "resolved" timestamp is a guarantee that no (previously unseen) rows with a lower update timestamp will be emitted on that partition.

    For example:

    {"__crdb__": {"updated": "1532377312562986715.0000000000"}, "id": 1, "name": "Petee H"}
    {"__crdb__": {"updated": "1532377306108205142.0000000000"}, "id": 2, "name": "Carl"}
    {"__crdb__": {"updated": "1532377358501715562.0000000000"}, "id": 3, "name": "Ernie"}
    {"__crdb__":{"resolved":"1532379887442299001.0000000000"}}
    {"__crdb__":{"resolved":"1532379888444290910.0000000000"}}
    {"__crdb__":{"resolved":"1532379889448662988.0000000000"}}
    ...
    {"__crdb__":{"resolved":"1532379922512859361.0000000000"}}
    {"__crdb__": {"updated": "1532379923319195777.0000000000"}, "id": 4, "name": "Lucky"}
    
  • With duplicates removed, an individual row is emitted in the same order as the transactions that updated it. However, this is not true for updates to two different rows, even two rows in the same table.

    To compare two different rows for happens-before, compare the "updated" timestamp. This works across anything in the same cluster (e.g., tables, nodes, etc.).

    Resolved timestamp notifications on every Kafka partition can be used to provide strong ordering and global consistency guarantees by buffering records in between timestamp closures. Use the "resolved" timestamp to see every row that changed at a certain time.

    The complexity with timestamps is necessary because CockroachDB supports transactions that can affect any part of the cluster, and it is not possible to horizontally divide the transaction log into independent changefeeds. For more information about this, read our blog post on CDC.
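
The guarantees above can be sketched as consumer-side logic. The following is a minimal sketch, not CockroachDB's own tooling: it assumes messages from a single Kafka partition in the "__crdb__" format shown above, and the names parse_hlc and OrderedBuffer are hypothetical. It drops duplicates by remembering the highest "updated" timestamp seen per key, and it holds rows back until a "resolved" message shows that nothing earlier can still arrive.

```python
import json


def parse_hlc(ts):
    """Split a timestamp such as '1532377312562986715.0000000000' into
    (wall-clock nanoseconds, logical counter) so it compares exactly."""
    wall, _, logical = ts.partition(".")
    return int(wall), int(logical or 0)


class OrderedBuffer:
    """Hypothetical helper: removes duplicates and releases rows in timestamp order."""

    def __init__(self):
        self.latest = {}   # key -> highest "updated" timestamp seen so far
        self.pending = []  # (timestamp, key, row) tuples waiting for a resolved message

    def feed(self, key, value):
        """Process one message and return the rows that are now safe to apply."""
        msg = json.loads(value)
        meta = msg["__crdb__"]
        if "resolved" in meta:
            resolved = parse_hlc(meta["resolved"])
            # Rows strictly below the resolved timestamp can no longer be
            # preceded by an unseen row on this partition.
            ready = [p for p in self.pending if p[0] < resolved]
            self.pending = [p for p in self.pending if p[0] >= resolved]
            return sorted(ready, key=lambda p: p[:2])
        ts = parse_hlc(meta["updated"])
        if key in self.latest and ts <= self.latest[key]:
            return []  # a version at or below the newest one seen is a duplicate
        self.latest[key] = ts
        self.pending.append((ts, key, msg))
        return []
```

Comparing timestamps as integer pairs rather than floats avoids losing precision on the 19-digit nanosecond component.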

Delete messages

Deleting a row will result in a changefeed outputting the primary key of the deleted row and a null value. For example, with default options, deleting the row with primary key 5 will output:

[5] {"after": null}

In some unusual situations you may receive a delete message for a row without first seeing an insert message. For example, if an attempt is made to delete a row that does not exist, you may or may not get a delete message because the changefeed behavior is undefined to allow for optimizations at the storage layer. Similarly, if there are multiple writes to a row within a single transaction, only the last one will propagate to a changefeed. This means that creating and deleting a row within the same transaction will never result in an insert message, but may result in a delete message.
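
Because a delete message can arrive for a row the consumer has never seen, downstream code should treat deletes as idempotent. A minimal sketch, assuming the default JSON format shown above and a hypothetical in-memory mirror keyed by primary key (apply_message is not part of any CockroachDB client):

```python
import json


def apply_message(mirror, key, value):
    """Apply one changefeed message to an in-memory mirror of the table."""
    pk = tuple(json.loads(key))  # for example, '[5]' becomes (5,)
    after = json.loads(value)["after"]
    if after is None:
        # Delete message: the row may never have been inserted into the
        # mirror, so removing a missing key must not raise an error.
        mirror.pop(pk, None)
    else:
        mirror[pk] = after


mirror = {}
apply_message(mirror, "[5]", '{"after": {"id": 5, "name": "Lucky"}}')
apply_message(mirror, "[5]", '{"after": null}')
apply_message(mirror, "[7]", '{"after": null}')  # delete without a prior insert
print(mirror)  # {}
```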

Avro schema changes

To ensure that the Avro schemas that CockroachDB publishes will work with the schema compatibility rules used by the Confluent schema registry, CockroachDB emits all fields in Avro as nullable unions. This ensures that Avro and Confluent consider the schemas to be both backward- and forward-compatible, since the Confluent Schema Registry has a different set of rules than Avro for schemas to be backward- and forward-compatible.

Note that the original CockroachDB column definition is also included in the schema as a doc field, so it's still possible to distinguish between a NOT NULL CockroachDB column and a NULL CockroachDB column.

Schema changes with column backfill

When schema changes with column backfill (e.g., adding a column with a default, adding a computed column, adding a NOT NULL column, dropping a column) are made to watched rows, the changefeed will emit some duplicates during the backfill. When it finishes, CockroachDB outputs all watched rows using the new schema. When using Avro, rows that have been backfilled by a schema change are always re-emitted.

For an example of a schema change with column backfill, start with the changefeed created in the "Create a changefeed connected to Kafka" example below:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}

Add a column to the watched table:

> ALTER TABLE office_dogs ADD COLUMN likes_treats BOOL DEFAULT TRUE;

The changefeed emits duplicate records 1, 2, and 3 before outputting the records using the new schema:

[1] {"id": 1, "name": "Petee H"}
[2] {"id": 2, "name": "Carl"}
[3] {"id": 3, "name": "Ernie"}
[1] {"id": 1, "name": "Petee H"}  # Duplicate
[2] {"id": 2, "name": "Carl"}     # Duplicate
[3] {"id": 3, "name": "Ernie"}    # Duplicate
[1] {"id": 1, "likes_treats": true, "name": "Petee H"}
[2] {"id": 2, "likes_treats": true, "name": "Carl"}
[3] {"id": 3, "likes_treats": true, "name": "Ernie"}

When using the schema_change_policy = nobackfill option, the changefeed will still emit duplicate records for the table that is being altered. In the preceding output, the records marked as # Duplicate will still emit with this option, but not the new schema records.

Note:

Changefeeds will emit NULL values for VIRTUAL computed columns and not the column's computed value.

Changefeeds on regional by row tables

Changefeeds are supported on regional by row tables. When working with changefeeds on regional by row tables, it is necessary to consider the following:

  • Setting a table to REGIONAL BY ROW will have an impact on the changefeed's output as a result of the schema change. The backfill and future updated or inserted rows will emit output that includes the newly added crdb_region column as part of the schema. Therefore, it is necessary to ensure that programs consuming the changefeed can manage the new format of the primary keys.

    Note:

    If the schema_change_policy changefeed option is configured to stop, the backfill will cause the changefeed to fail.

  • Changing a row's region will appear as an insert and delete in the emitted changefeed output. For example, in the following output in which the region has been updated to us-east1, the insert messages are emitted followed by the delete messages:

    . . .
    {"after": {"city": "washington dc", "crdb_region": "us-east1", "creation_time": "2019-01-02T03:04:05", "current_location": "52372 Katherine Plains", "ext": {"color": "black"}, "id": "54a69217-35ee-4000-8000-0000000001f0", "owner_id": "3dcc63f1-4120-4c00-8000-0000000004b7", "status": "in_use", "type": "scooter"}, "updated": "1632241564629087669.0000000000"}
    {"after": {"city": "washington dc", "crdb_region": "us-east1", "creation_time": "2019-01-02T03:04:05", "current_location": "75024 Patrick Bridge", "ext": {"color": "black"}, "id": "54d242e6-bdc8-4400-8000-0000000001f1", "owner_id": "3ab9f559-b3d0-4c00-8000-00000000047b", "status": "in_use", "type": "scooter"}, "updated": "1632241564629087669.0000000000"}
    {"after": {"city": "washington dc", "crdb_region": "us-east1", "creation_time": "2019-01-02T03:04:05", "current_location": "45597 Jackson Inlet", "ext": {"brand": "Schwinn", "color": "red"}, "id": "54fdf3b6-45a1-4c00-8000-0000000001f2", "owner_id": "4339c0eb-edfa-4400-8000-000000000521", "status": "in_use", "type": "bike"}, "updated": "1632241564629087669.0000000000"}
    {"after": {"city": "washington dc", "crdb_region": "us-east1", "creation_time": "2019-01-02T03:04:05", "current_location": "18336 Katherine Port", "ext": {"color": "yellow"}, "id": "5529a485-cd7b-4000-8000-0000000001f3", "owner_id": "452bd3c3-6113-4000-8000-000000000547", "status": "in_use", "type": "scooter"}, "updated": "1632241564629087669.0000000000"}
    {"after": null, "updated": "1632241564629087669.0000000000"}
    {"after": null, "updated": "1632241564629087669.0000000000"}
    {"after": null, "updated": "1632241564629087669.0000000000"}
    {"after": null, "updated": "1632241564629087669.0000000000"}
    . . .
    

See the changefeed responses section for more general information on the messages emitted from a changefeed.

Create a changefeed (Core)

A core changefeed streams row-level changes to the client indefinitely until the underlying connection is closed or the changefeed is canceled.

To create a core changefeed:

> EXPERIMENTAL CHANGEFEED FOR name;

For more information, see CHANGEFEED FOR.

Configure a changefeed (Enterprise)

An Enterprise changefeed streams row-level changes in a configurable format to a configurable sink (i.e., Kafka or a cloud storage sink). You can create, pause, resume, cancel, monitor, and debug an Enterprise changefeed.

Create

To create an Enterprise changefeed:

> CREATE CHANGEFEED FOR TABLE table_name, table_name2 INTO '{scheme}://{host}:{port}?{query_parameters}';

Note:

Parameters should always be URI-encoded before they are included in the changefeed's URI, as they often contain special characters. Use JavaScript's encodeURIComponent function or Go's url.QueryEscape function to URI-encode the parameters. Other languages provide similar functions to URI-encode special characters.
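
As an illustration in another language, Python's urllib.parse.quote does the same job. This is a minimal sketch: build_sink_uri is a hypothetical helper, and the parameter values (a topic prefix and a truncated base64 certificate) are made up for the example.

```python
from urllib.parse import quote


def build_sink_uri(scheme, host, port, params):
    """Assemble a sink URI, URI-encoding every parameter name and value."""
    query = "&".join(
        f"{quote(name, safe='')}={quote(str(value), safe='')}"
        for name, value in params.items()
    )
    return f"{scheme}://{host}:{port}?{query}"


# Hypothetical values; base64 text often ends in "=", which must be encoded.
uri = build_sink_uri(
    "kafka", "localhost", 9092,
    {"topic_prefix": "cdc_", "ca_cert": "LS0tLS1CRUdJTg=="},
)
print(uri)  # kafka://localhost:9092?topic_prefix=cdc_&ca_cert=LS0tLS1CRUdJTg%3D%3D
```

Passing safe='' matters here: by default quote leaves "/" unencoded, which is not what you want inside a query parameter value.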

For more information, see CREATE CHANGEFEED.

Pause

To pause an Enterprise changefeed:

> PAUSE JOB job_id;

For more information, see PAUSE JOB.

Resume

To resume a paused Enterprise changefeed:

> RESUME JOB job_id;

For more information, see RESUME JOB.

Cancel

To cancel an Enterprise changefeed:

> CANCEL JOB job_id;

For more information, see CANCEL JOB.

Configuring all changefeeds

It is useful to be able to pause all running changefeeds during troubleshooting, testing, or when a decrease in CPU load is needed.

To pause all running changefeeds:

PAUSE JOBS (SELECT job_id FROM [SHOW JOBS] WHERE job_type='CHANGEFEED' AND status IN ('running'));

This will change the status for each of the running changefeeds to paused, which can be verified with SHOW JOBS.

To resume all paused changefeeds:

RESUME JOBS (SELECT job_id FROM [SHOW JOBS] WHERE job_type='CHANGEFEED' AND status IN ('paused'));

This will resume the changefeeds and update the status for each of the changefeeds to running.

Monitor a changefeed

Note:

Monitoring is only available for Enterprise changefeeds.

Changefeed progress is exposed as a high-water timestamp that advances as the changefeed progresses. This is a guarantee that all changes before or at the timestamp have been emitted. You can monitor a changefeed:

  • On the Changefeed Dashboard of the DB Console.
  • On the Jobs page of the DB Console. Hover over the high-water timestamp to view the system time.
  • Using crdb_internal.jobs:

    > SELECT * FROM crdb_internal.jobs WHERE job_id = <job_id>;
    
            job_id       |  job_type  |                              description                               | ... |      high_water_timestamp      | error | coordinator_id
    +--------------------+------------+------------------------------------------------------------------------+ ... +--------------------------------+-------+----------------+
      383870400694353921 | CHANGEFEED | CREATE CHANGEFEED FOR TABLE office_dogs INTO 'kafka://localhost:9092' | ... | 1537279405671006870.0000000000 |       |              1
    (1 row)
    
  • Setting up an alert on the changefeed.max_behind_nanos metric to track when a changefeed's high-water mark timestamp is at risk of falling behind the cluster's garbage collection window. For more information, see Monitoring and Alerting.

Note:

You can use the high-water timestamp to start a new changefeed where another ended.
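
The high-water timestamp is a wall-clock time in nanoseconds followed by a logical counter. The sketch below converts it to a readable UTC time and builds a statement that starts a new changefeed from that point using the cursor option of CREATE CHANGEFEED. The helper names are hypothetical, and the table and sink are taken from the sample output above.

```python
import re
from datetime import datetime, timezone

HLC_RE = re.compile(r"^(\d+)\.(\d+)$")


def hlc_to_datetime(ts):
    """Convert a timestamp like '1537279405671006870.0000000000' to UTC."""
    match = HLC_RE.match(ts)
    if match is None:
        raise ValueError(f"not a changefeed timestamp: {ts!r}")
    seconds, nanos = divmod(int(match.group(1)), 1_000_000_000)
    # Integer arithmetic keeps sub-second precision that a float would lose.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=nanos // 1000
    )


def resume_statement(table, sink, high_water):
    """Build a CREATE CHANGEFEED statement that starts at high_water."""
    if HLC_RE.match(high_water) is None:
        raise ValueError(f"not a changefeed timestamp: {high_water!r}")
    return (
        f"CREATE CHANGEFEED FOR TABLE {table} INTO '{sink}' "
        f"WITH cursor = '{high_water}';"
    )


hw = "1537279405671006870.0000000000"
print(hlc_to_datetime(hw).isoformat())
print(resume_statement("office_dogs", "kafka://localhost:9092", hw))
```

The table name and sink are interpolated as-is, so this sketch is only suitable for trusted, operator-supplied values.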

Debug a changefeed

Using logs

For Enterprise changefeeds, use log information to debug connection issues (e.g., kafka: client has run out of available brokers to talk to (Is your cluster reachable?)). Debug by looking for lines in the logs with [kafka-producer] in them:

I190312 18:56:53.535646 585 vendor/github.com/Shopify/sarama/client.go:123  [kafka-producer] Initializing new client
I190312 18:56:53.535714 585 vendor/github.com/Shopify/sarama/client.go:724  [kafka-producer] client/metadata fetching metadata for all topics from broker localhost:9092
I190312 18:56:53.536730 569 vendor/github.com/Shopify/sarama/broker.go:148  [kafka-producer] Connected to broker at localhost:9092 (unregistered)
I190312 18:56:53.537661 585 vendor/github.com/Shopify/sarama/client.go:500  [kafka-producer] client/brokers registered new broker #0 at 172.16.94.87:9092
I190312 18:56:53.537686 585 vendor/github.com/Shopify/sarama/client.go:170  [kafka-producer] Successfully initialized new client
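
To pull these lines out of a larger log, a short filter is enough. This is a minimal sketch with a hypothetical helper name; it assumes the layout shown above, where the first character of each line is the severity (I for info) and the message follows the [kafka-producer] tag.

```python
def kafka_producer_events(log_lines):
    """Return (severity, message) pairs for lines tagged [kafka-producer]."""
    events = []
    for line in log_lines:
        if "[kafka-producer]" not in line:
            continue
        severity = line[0]  # first character of the line, e.g. "I"
        message = line.split("[kafka-producer]", 1)[1].strip()
        events.append((severity, message))
    return events


sample = [
    "I190312 18:56:53.535646 585 vendor/github.com/Shopify/sarama/client.go:123  [kafka-producer] Initializing new client",
    "I190312 18:56:53.600000 12 some/other/file.go:1  unrelated line",
]
print(kafka_producer_events(sample))  # [('I', 'Initializing new client')]
```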

Using SHOW JOBS

For Enterprise changefeeds, you can check the status by using:

SELECT * FROM [SHOW JOBS] WHERE job_type='CHANGEFEED';

Or:

SELECT * FROM crdb_internal.jobs WHERE job_type='CHANGEFEED';

For more information, see SHOW JOBS.

Using the DB Console

On the Custom Chart debug page of the DB Console:

  1. To add a chart, click Add Chart.
  2. Select changefeed.error_retries from the Metric Name dropdown menu.

    A graph of changefeed restarts due to retryable errors will display.

Usage examples

Create a core changefeed

In this example, you'll set up a core changefeed for a single-node cluster.

  1. In a terminal window, start cockroach:

    $ cockroach start-single-node \
    --insecure \
    --listen-addr=localhost \
    --background
    
  2. As the root user, open the built-in SQL client:

    $ cockroach sql \
    --url="postgresql://root@127.0.0.1:26257?sslmode=disable" \
    --format=csv
    
    Note:

    Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client; however, result buffering is automatically turned off for core changefeeds. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a CANCEL QUERY statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default.

    Note:

    To determine how wide the columns need to be, the default table display format in cockroach sql buffers the results it receives from the server before printing them to the console. When consuming core changefeed data using cockroach sql, it's important to use a display format like csv that does not buffer its results. To set the display format, use the --format=csv flag when starting the built-in SQL client, or set the \set display_format=csv option once the SQL client is open.

  3. Enable the kv.rangefeed.enabled cluster setting:

    > SET CLUSTER SETTING kv.rangefeed.enabled = true;
    
  4. Create table foo:

    > CREATE TABLE foo (a INT PRIMARY KEY);
    
  5. Insert a row into the table:

    > INSERT INTO foo VALUES (0);
    
  6. Start the core changefeed:

    > EXPERIMENTAL CHANGEFEED FOR foo;
    
    table,key,value
    foo,[0],"{""after"": {""a"": 0}}"
    
  7. In a new terminal, add another row:

    $ cockroach sql --insecure -e "INSERT INTO foo VALUES (1)"
    
  8. Back in the terminal where the core changefeed is streaming, the following output has appeared:

    foo,[1],"{""after"": {""a"": 1}}"
    

    Note that records may take a couple of seconds to display in the core changefeed.

  9. To stop streaming the changefeed, enter CTRL+C into the terminal where the changefeed is running.

  10. To stop cockroach, run:

    $ cockroach quit --insecure
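
The CSV rows printed in steps 6 and 8 of this example can be parsed with standard tooling. The following is a minimal sketch that assumes the --format=csv output shown above, with the key and value columns each holding a JSON document; parse_changefeed_csv is a hypothetical helper.

```python
import csv
import io
import json


def parse_changefeed_csv(text):
    """Turn core changefeed CSV output into (table, key, row) tuples."""
    rows = []
    for record in csv.DictReader(io.StringIO(text)):
        key = json.loads(record["key"])      # '[0]' becomes [0]
        value = json.loads(record["value"])  # doubled quotes are undone by csv
        rows.append((record["table"], key, value["after"]))
    return rows


output = '''table,key,value
foo,[0],"{""after"": {""a"": 0}}"
foo,[1],"{""after"": {""a"": 1}}"
'''
for table, key, row in parse_changefeed_csv(output):
    print(table, key, row)
# foo [0] {'a': 0}
# foo [1] {'a': 1}
```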
    

Create a core changefeed using Avro

In this example, you'll set up a core changefeed for a single-node cluster that emits Avro records. CockroachDB's Avro binary encoding convention uses the Confluent Schema Registry to store Avro schemas.

  1. Use the cockroach start-single-node command to start a single-node cluster:

    $ cockroach start-single-node \
    --insecure \
    --listen-addr=localhost \
    --background
    
  2. Download and extract the Confluent Open Source platform.

  3. Move into the extracted confluent-<version> directory and start Confluent:

    $ ./bin/confluent local services start
    

    Only zookeeper, kafka, and schema-registry are needed. To troubleshoot Confluent, see their docs and the Quick Start Guide.

  4. As the root user, open the built-in SQL client:

    $ cockroach sql --url="postgresql://root@127.0.0.1:26257?sslmode=disable" --format=csv
    
    Note:

    Because core changefeeds return results differently than other SQL statements, they require a dedicated database connection with specific settings around result buffering. In normal operation, CockroachDB improves performance by buffering results server-side before returning them to a client; however, result buffering is automatically turned off for core changefeeds. Core changefeeds also have different cancellation behavior than other queries: they can only be canceled by closing the underlying connection or issuing a CANCEL QUERY statement on a separate connection. Combined, these attributes of changefeeds mean that applications should explicitly create dedicated connections to consume changefeed data, instead of using a connection pool as most client drivers do by default.

    Note:

    To determine how wide the columns need to be, the default table display format in cockroach sql buffers the results it receives from the server before printing them to the console. When consuming core changefeed data using cockroach sql, it's important to use a display format like csv that does not buffer its results. To set the display format, use the --format=csv flag when starting the built-in SQL client, or set the \set display_format=csv option once the SQL client is open.

  5. Enable the kv.rangefeed.enabled cluster setting:

    > SET CLUSTER SETTING kv.rangefeed.enabled = true;
    
  6. Create table bar:

    > CREATE TABLE bar (a INT PRIMARY KEY);
    
  7. Insert a row into the table:

    > INSERT INTO bar VALUES (0);
    
  8. Start the core changefeed:

    > EXPERIMENTAL CHANGEFEED FOR bar WITH format = experimental_avro, confluent_schema_registry = 'http://localhost:8081';
    
    table,key,value
    bar,\000\000\000\000\001\002\000,\000\000\000\000\002\002\002\000
    
  9. In a new terminal, add another row:

    $ cockroach sql --insecure -e "INSERT INTO bar VALUES (1)"
    
  10. Back in the terminal where the core changefeed is streaming, the output will appear:

    bar,\000\000\000\000\001\002\002,\000\000\000\000\002\002\002\002
    

    Note that records may take a couple of seconds to display in the core changefeed.

  11. To stop streaming the changefeed, enter CTRL+C into the terminal where the changefeed is running.

  12. To stop cockroach, run:

    $ cockroach quit --insecure
    
  13. To stop Confluent, move into the extracted confluent-<version> directory and stop Confluent:

    $ ./bin/confluent local services stop
    

    To terminate all Confluent processes, use:

    $ ./bin/confluent local destroy
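
The binary keys and values printed in steps 8 and 10 follow the Confluent wire format: one zero "magic" byte, a 4-byte big-endian schema ID, then the Avro-encoded payload. The sketch below only separates the header from the payload; it assumes the \000-style sequences in the output are octal escapes of the raw bytes, and split_confluent_frame is a hypothetical helper. Decoding the payload itself requires the schema stored in the registry under the returned ID.

```python
import struct


def split_confluent_frame(raw):
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    # ">bI" is a big-endian signed byte followed by an unsigned 32-bit integer.
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, raw[5:]


# Key and value of the first row from step 8, written as Python byte literals.
key = b"\000\000\000\000\001\002\000"
value = b"\000\000\000\000\002\002\002\000"

print(split_confluent_frame(key))    # (1, b'\x02\x00')
print(split_confluent_frame(value))  # (2, b'\x02\x02\x00')
```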
    

Create a changefeed connected to Kafka

Note:

CREATE CHANGEFEED is an Enterprise-only feature. For the core version, see the CHANGEFEED FOR example above.

In this example, you'll set up a changefeed for a single-node cluster that is connected to a Kafka sink. The changefeed will watch two tables.

  1. If you do not already have one, request a trial Enterprise license.

  2. Use the cockroach start-single-node command to start a single-node cluster:

    $ cockroach start-single-node --insecure --listen-addr=localhost --background
    
  3. Download and extract the Confluent Open Source platform (which includes Kafka).

  4. Move into the extracted confluent-<version> directory and start Confluent:

    $ ./bin/confluent local services start
    

    Only zookeeper and kafka are needed. To troubleshoot Confluent, see their docs and the Quick Start Guide.

  5. Create two Kafka topics:

    $ ./bin/kafka-topics \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic office_dogs
    
    $ ./bin/kafka-topics \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic employees
    
    Note:

    You are expected to create any Kafka topics with the necessary number of replications and partitions. Topics can be created manually or Kafka brokers can be configured to automatically create topics with a default partition count and replication factor.

  6. As the root user, open the built-in SQL client:

    $ cockroach sql --insecure
    
  7. Set your organization name and Enterprise license key that you received via email:

    > SET CLUSTER SETTING cluster.organization = '<organization name>';
    
    > SET CLUSTER SETTING enterprise.license = '<secret>';
    
  8. Enable the kv.rangefeed.enabled cluster setting:

    > SET CLUSTER SETTING kv.rangefeed.enabled = true;
    
  9. Create a database called cdc_demo:

    > CREATE DATABASE cdc_demo;
    
  10. Set the database as the default:

    > SET DATABASE = cdc_demo;
    
  11. Create a table and add data:

    > CREATE TABLE office_dogs (
         id INT PRIMARY KEY,
         name STRING);
    
    > INSERT INTO office_dogs VALUES
       (1, 'Petee'),
       (2, 'Carl');
    
    > UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
    
  12. Create another table and add data:

    > CREATE TABLE employees (
         dog_id INT REFERENCES office_dogs (id),
         employee_name STRING);
    
    > INSERT INTO employees VALUES
       (1, 'Lauren'),
       (2, 'Spencer');
    
  13. Start the changefeed:

    > CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://localhost:9092';
    
    
            job_id       
    +--------------------+
      360645287206223873
    (1 row)
    

    This will start up the changefeed in the background and return the job_id. The changefeed writes to Kafka.

  14. In a new terminal, move into the extracted confluent-<version> directory and start watching the Kafka topics:

    $ ./bin/kafka-console-consumer \
    --bootstrap-server=localhost:9092 \
    --from-beginning \
    --whitelist 'office_dogs|employees'
    
    {"after": {"id": 1, "name": "Petee H"}}
    {"after": {"id": 2, "name": "Carl"}}
    {"after": {"dog_id": 1, "employee_name": "Lauren", "rowid": 528514320239329281}}
    {"after": {"dog_id": 2, "employee_name": "Spencer", "rowid": 528514320239362049}}
    

    The initial scan displays the state of the tables as of when the changefeed started (therefore, the initial value of "Petee" is omitted).

    Note:

    This example only prints the value. To print both the key and value of each message in the changefeed (e.g., to observe what happens with DELETEs), use the --property print.key=true flag.

  15. Back in the SQL client, insert more data:

    > INSERT INTO office_dogs VALUES (3, 'Ernie');
    
  16. Back in the terminal where you're watching the Kafka topics, the following output has appeared:

    {"after": {"id": 3, "name": "Ernie"}}
    
  17. When you are done, exit the SQL shell (\q).

  18. To stop cockroach, run:

    $ cockroach quit --insecure
    
  19. To stop Kafka, move into the extracted confluent-<version> directory and stop Confluent:

    $ ./bin/confluent local services stop
    

Create a changefeed connected to Kafka using Avro

Note:

CREATE CHANGEFEED is an Enterprise-only feature. For the core version, see the CHANGEFEED FOR example above.

In this example, you'll set up a changefeed for a single-node cluster that is connected to a Kafka sink and emits Avro records. The changefeed will watch two tables.

  1. If you do not already have one, request a trial Enterprise license.

  2. Use the cockroach start-single-node command to start a single-node cluster:

    $ cockroach start-single-node --insecure --listen-addr=localhost --background
    
  3. Download and extract the Confluent Open Source platform (which includes Kafka).

  4. Move into the extracted confluent-<version> directory and start Confluent:

    $ ./bin/confluent local services start
    

    Only zookeeper, kafka, and schema-registry are needed. To troubleshoot Confluent, see their docs and the Quick Start Guide.

  5. Create two Kafka topics:

    $ ./bin/kafka-topics \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic office_dogs
    
    $ ./bin/kafka-topics \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic employees
    
    Note:

    You are expected to create any Kafka topics with the necessary number of replications and partitions. Topics can be created manually or Kafka brokers can be configured to automatically create topics with a default partition count and replication factor.

  6. As the root user, open the built-in SQL client:

    $ cockroach sql --insecure
    
  7. Set your organization name and Enterprise license key that you received via email:

    > SET CLUSTER SETTING cluster.organization = '<organization name>';
    
    > SET CLUSTER SETTING enterprise.license = '<secret>';
    
  8. Enable the kv.rangefeed.enabled cluster setting:

    > SET CLUSTER SETTING kv.rangefeed.enabled = true;
    
  9. Create a database called cdc_demo:

    > CREATE DATABASE cdc_demo;
    
  10. Set the database as the default:

    > SET DATABASE = cdc_demo;
    
  11. Create a table and add data:

    > CREATE TABLE office_dogs (
         id INT PRIMARY KEY,
         name STRING);
    
    > INSERT INTO office_dogs VALUES
       (1, 'Petee'),
       (2, 'Carl');
    
    > UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
    
  12. Create another table and add data:

    > CREATE TABLE employees (
         dog_id INT REFERENCES office_dogs (id),
         employee_name STRING);
    
    > INSERT INTO employees VALUES
       (1, 'Lauren'),
       (2, 'Spencer');
    
  13. Start the changefeed:

    > CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'kafka://localhost:9092' WITH format = experimental_avro, confluent_schema_registry = 'http://localhost:8081';
    
            job_id       
    +--------------------+
      360645287206223873
    (1 row)
    

    This starts the changefeed in the background and returns the job_id. The changefeed writes to Kafka, with Avro schemas registered in the Confluent Schema Registry.

  14. In a new terminal, move into the extracted confluent-<version> directory and start watching the Kafka topics:

    $ ./bin/kafka-avro-console-consumer \
    --bootstrap-server=localhost:9092 \
    --from-beginning \
    --whitelist 'office_dogs|employees'
    
    {"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee H"}}}}
    {"after":{"office_dogs":{"id":{"long":2},"name":{"string":"Carl"}}}}
    {"after":{"employees":{"dog_id":{"long":1},"employee_name":{"string":"Lauren"},"rowid":{"long":528537452042682369}}}}
    {"after":{"employees":{"dog_id":{"long":2},"employee_name":{"string":"Spencer"},"rowid":{"long":528537452042747905}}}}
    

    The initial scan displays the state of the table as of when the changefeed started (therefore, the initial value of "Petee" is omitted).

    Note:

    This example only prints the value. To print both the key and value of each message in the changefeed (e.g., to observe what happens with DELETEs), use the --property print.key=true flag.

  15. Back in the SQL client, insert more data:

    > INSERT INTO office_dogs VALUES (3, 'Ernie');
    
  16. In the terminal where you're watching the Kafka topics, the following output appears:

    {"after":{"office_dogs":{"id":{"long":3},"name":{"string":"Ernie"}}}}
    
  17. When you are done, exit the SQL shell (\q).

  18. To stop cockroach, run:

    $ cockroach quit --insecure
    
  19. To stop Kafka, move into the extracted confluent-<version> directory and stop Confluent:

    $ ./bin/confluent local services stop
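
The consumer output in step 14 uses Avro's JSON encoding, which wraps each non-null union value in a single-key object naming its type (for example, {"long":1}). The following Python sketch shows one way a downstream script might flatten those lines into plain rows. The function names are hypothetical, the list of wrapper types is an assumption that covers only common primitives, and the handling of a null "after" value (expected for DELETEs) is likewise an assumption rather than documented behavior. A table with a single column named after an Avro type (such as "string") would be misread by this heuristic.

```python
import json

# Avro JSON encoding wraps non-null union values as {"<type>": value}.
# Assumption: only these primitive branch names need unwrapping.
AVRO_PRIMITIVE_BRANCHES = {"long", "int", "string", "boolean", "float", "double", "bytes"}


def unwrap(value):
    """Recursively strip single-key primitive union wrappers."""
    if isinstance(value, dict):
        if len(value) == 1:
            (branch, inner), = value.items()
            if branch in AVRO_PRIMITIVE_BRANCHES:
                return unwrap(inner)
        return {key: unwrap(item) for key, item in value.items()}
    if isinstance(value, list):
        return [unwrap(item) for item in value]
    return value


def parse_message(line):
    """Return (table_name, row) for one consumer output line.

    Returns (None, None) when "after" is null or missing, which this
    sketch assumes corresponds to a deleted row.
    """
    after = json.loads(line).get("after")
    if after is None:
        return None, None
    # The record is wrapped in a single-key object named after the table.
    (table, record), = after.items()
    return table, unwrap(record)


if __name__ == "__main__":
    # Sample lines copied from the consumer output shown in step 14.
    sample = [
        '{"after":{"office_dogs":{"id":{"long":1},"name":{"string":"Petee H"}}}}',
        '{"after":{"employees":{"dog_id":{"long":1},"employee_name":{"string":"Lauren"},"rowid":{"long":528537452042682369}}}}',
    ]
    for line in sample:
        print(parse_message(line))
```

In practice, you could pipe the output of kafka-avro-console-consumer into a script like this and call parse_message on each line read from standard input.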
    

Create a changefeed connected to a cloud storage sink

Note:

CREATE CHANGEFEED is an Enterprise-only feature. For the core version, see the CHANGEFEED FOR example above.

Warning:

This is an experimental feature. The interface and output are subject to change.

In this example, you'll set up a changefeed for a single-node cluster that is connected to an AWS S3 sink. The changefeed watches two tables. You can follow the same steps with any other supported cloud storage provider.

  1. If you do not already have one, request a trial Enterprise license.

  2. Use the cockroach start-single-node command to start a single-node cluster:

    $ cockroach start-single-node --insecure --listen-addr=localhost --background
    
  3. As the root user, open the built-in SQL client:

    $ cockroach sql --insecure
    
  4. Set your organization name and Enterprise license key that you received via email:

    > SET CLUSTER SETTING cluster.organization = '<organization name>';
    
    > SET CLUSTER SETTING enterprise.license = '<secret>';
    
  5. Enable the kv.rangefeed.enabled cluster setting:

    > SET CLUSTER SETTING kv.rangefeed.enabled = true;
    
  6. Create a database called cdc_demo:

    > CREATE DATABASE cdc_demo;
    
  7. Set the database as the default:

    > SET DATABASE = cdc_demo;
    
  8. Create a table and add data:

    > CREATE TABLE office_dogs (
         id INT PRIMARY KEY,
         name STRING);
    
    > INSERT INTO office_dogs VALUES
       (1, 'Petee'),
       (2, 'Carl');
    
    > UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;
    
  9. Create another table and add data:

    > CREATE TABLE employees (
         dog_id INT REFERENCES office_dogs (id),
         employee_name STRING);
    
    > INSERT INTO employees VALUES
       (1, 'Lauren'),
       (2, 'Spencer');
    
  10. Start the changefeed:

    > CREATE CHANGEFEED FOR TABLE office_dogs, employees INTO 'experimental-s3://example-bucket-name/test?AWS_ACCESS_KEY_ID=enter_key_here&AWS_SECRET_ACCESS_KEY=enter_key_here' WITH updated, resolved = '10s';
    
            job_id       
    +--------------------+
      360645287206223873
    (1 row)
    

    This starts the changefeed in the background and returns the job_id. The changefeed writes to the S3 bucket named in the sink URI.

  11. Monitor your changefeed on the DB Console. For more information, see Changefeeds Dashboard.

  12. When you are done, exit the SQL shell (\q).

  13. To stop cockroach, run:

    $ cockroach quit --insecure
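
The sink URI in step 10 carries the AWS credentials as query parameters. Secret access keys can contain characters such as / and +, which have special meaning in a URL, so it is safer to percent-encode the values before pasting them into the statement. The following Python sketch shows one way to assemble the URI and the CREATE CHANGEFEED statement. The helper names s3_sink_uri and create_changefeed_sql are hypothetical, and the credential values are placeholders, not real keys.

```python
from urllib.parse import quote, urlencode


def s3_sink_uri(bucket, path, access_key_id, secret_access_key):
    """Build an experimental-s3:// sink URI with percent-encoded credentials."""
    params = urlencode(
        {
            "AWS_ACCESS_KEY_ID": access_key_id,
            "AWS_SECRET_ACCESS_KEY": secret_access_key,
        },
        quote_via=quote,  # encode spaces as %20 rather than +
        safe="",          # also encode "/" inside the values
    )
    return f"experimental-s3://{bucket}/{quote(path.lstrip('/'))}?{params}"


def create_changefeed_sql(tables, sink_uri, options):
    """Render the CREATE CHANGEFEED statement used in step 10.

    Table names and options are inserted as-is, so pass only trusted values.
    """
    escaped_uri = sink_uri.replace("'", "''")  # escape quotes for the SQL literal
    return (
        f"CREATE CHANGEFEED FOR TABLE {', '.join(tables)} "
        f"INTO '{escaped_uri}' WITH {options};"
    )


if __name__ == "__main__":
    # Placeholder credentials for illustration only.
    uri = s3_sink_uri("example-bucket-name", "test", "EXAMPLEKEYID", "abc/def+ghi")
    print(create_changefeed_sql(["office_dogs", "employees"], uri, "updated, resolved = '10s'"))
```

You can paste the printed statement into the SQL client from step 3. Because the statement embeds credentials, avoid writing it to shared logs or shell history.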
    

Known limitations

See also

