Stream a Changefeed to a Confluent Cloud Kafka Cluster

CockroachDB Enterprise changefeeds can stream change data out to Apache Kafka with different configuration settings and options. Confluent Cloud provides a fully managed service for running Apache Kafka as well as the Confluent Cloud Schema Registry.

A schema registry is a repository for schemas, which allows you to share and manage schemas between different services. Confluent Cloud Schema Registries map to Kafka topics in your Confluent Cloud environment.

In this tutorial, you will set up a changefeed to stream data from CockroachDB to a Kafka cluster in Confluent Cloud. You will also connect a Schema Registry that retrieves the schemas from your changefeed's messages.

The workflow involves creating and connecting the following:

  1. Confluent Cloud Kafka cluster
  2. Confluent Schema Registry
  3. Changefeed streaming to your Confluent Cloud Kafka cluster

Before you begin

You will need the following set up before starting this tutorial:

  • A CockroachDB cluster. You can use a CockroachDB Cloud or CockroachDB Self-Hosted cluster. If you are using CockroachDB Serverless or CockroachDB Dedicated, see the Quickstart with CockroachDB guide. For CockroachDB Self-Hosted clusters, see the install page.
  • A Confluent Cloud account. See Confluent's Get started page for details.
  • The Confluent CLI. See Install Confluent CLI to set this up. This tutorial uses v3.3.0 of the Confluent CLI. Note that you can also complete the steps in this tutorial in Confluent's Cloud console.

This tutorial uses the Cockroach Labs movr workload as an example database.

Step 1. Create a Confluent Cloud Kafka cluster

In this step, you'll use the Confluent CLI to create and configure a Kafka cluster.

First, ensure you are logged in to Confluent Cloud:

confluent login --save

These instructions use the --save flag to store your username and password to a local file for convenience during this tutorial, but you can omit this flag if you would prefer to manually authenticate yourself each time.

List the environments in your Confluent Cloud account:

confluent environment list

If you haven't created an environment explicitly, this command will list a default environment. You can use the default environment for this tutorial.

However, if you would prefer to create an environment, run the following command with a name for your environment:

confluent environment create {ENVIRONMENT NAME}

Set the environment that you would like to create your cluster in, using the environment's ID, which starts with env-:

confluent environment use {ENVIRONMENT ID}

Next, create a Kafka cluster:

confluent kafka cluster create movr-confluent-tutorial --cloud "gcp" --region "us-east1"

Here the name of the cluster is movr-confluent-tutorial, but you can change this for your cluster.

Note that the --cloud and --region flags are required when running the create command. See Confluent's documentation on confluent kafka cluster create.

The create command returns your new cluster's details, with a format similar to the following:

+---------------+--------------------------------------------------------+
| ID            | lkc-{ID}                                             |
| Name          | movr-confluent-tutorial                                |
| Type          | BASIC                                                  |
| Ingress       |                                                    100 |
| Egress        |                                                    100 |
| Storage       | 5 TB                                                   |
| Provider      | gcp                                                    |
| Availability  | single-zone                                            |
| Region        | us-east1                                               |
| Status        | PROVISIONING                                           |
| Endpoint      | SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092 |
| API Endpoint  | https://pkac-ew1dj.us-east1.gcp.confluent.cloud        |
| REST Endpoint | https://pkc-4yyd6.us-east1.gcp.confluent.cloud:443     |
+---------------+--------------------------------------------------------+

You'll need this information later in the tutorial, but you can also access this status at any time with the following command:

confluent kafka cluster describe {CLUSTER ID}

Note:

It can take up to 5 minutes for your Kafka cluster to provision. The Status field in the cluster's details will change from PROVISIONING to UP once your Kafka cluster is ready.
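If you want to check the Status field from a script rather than by eye, the following is a minimal sketch that parses the two-column table shown above. It assumes the human-readable table layout from this tutorial, which may differ between Confluent CLI versions. The function names and the sample text are illustrative only and are not part of the Confluent CLI.

```python
def parse_cluster_details(table_text: str) -> dict:
    """Parse a two-column CLI table (| Field | value |) into a dict."""
    details = {}
    for line in table_text.splitlines():
        line = line.strip()
        # Skip border rows such as +-----+-----+ and blank lines.
        if not line.startswith("|"):
            continue
        cells = [cell.strip() for cell in line.strip("|").split("|")]
        if len(cells) == 2:
            details[cells[0]] = cells[1]
    return details


def is_cluster_ready(table_text: str) -> bool:
    """Return True once the Status field reads UP."""
    return parse_cluster_details(table_text).get("Status") == "UP"


# Shortened sample of the output format shown in this tutorial.
SAMPLE_OUTPUT = """\
+----------+--------------------------------------------------------+
| Name     | movr-confluent-tutorial                                |
| Status   | PROVISIONING                                           |
| Endpoint | SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092 |
+----------+--------------------------------------------------------+
"""

print(is_cluster_ready(SAMPLE_OUTPUT))
```

You could feed this function the text printed by the describe command and retry until it returns True.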

Step 2. Create a cluster API key and secret

In this step, you'll create an API key and secret for your Kafka cluster, which your changefeed will need to connect to the cluster.

Create the API key for your Kafka cluster:

confluent api-key create --resource {CLUSTER ID}

You will receive output displaying your API key and secret.

Then, to make the consumer setup easier later in the tutorial, you can store the API key locally and set it as your active API key:

confluent api-key store --resource {CLUSTER ID}

confluent api-key use {API KEY} --resource {CLUSTER ID}

The store command prompts you to enter your API key and secret. Use the --force flag if you already have a key stored in your local environment.

Step 3. Create Kafka topics

Next, you'll create the Kafka topics for your changefeed messages.

Ensure you have the correct active Kafka cluster:

confluent kafka cluster use {CLUSTER ID}

Set Kafka cluster "lkc-{ID}" as the active cluster for environment "env-{ID}".

Run the following to create a topic:

confluent kafka topic create users

users will be the topic name for this tutorial. If needed, you can change the topic name for your purposes and run the previous command for each topic you would like to create.

Tip:

If you are using a Dedicated Confluent Cloud cluster, you can enable auto topic creation. For further detail, see Enable automatic topic creation.

Step 4. Create a Confluent Schema Registry

In this step, you'll create the Schema Registry in your environment.

Enable the Schema Registry for the active environment:

confluent schema-registry cluster enable --cloud "gcp" --geo "us"

The --cloud and --geo flags are required with this enable command. See the confluent schema-registry cluster enable docs for more detail. To match the Kafka cluster setup for this tutorial, the cloud is set to Google Cloud Platform and the geo to the US.

You will receive output showing the Schema Registry's ID and its endpoint URL:

+--------------+----------------------------------------------------+
| Id           | lsrc-816zp7                                        |
| Endpoint URL | https://psrc-x77pq.us-central1.gcp.confluent.cloud |
+--------------+----------------------------------------------------+

Step 5. Create a Schema Registry API key and secret

Next, generate an API key and secret for the Schema Registry using the ID from your output:

confluent api-key create --resource {SCHEMA REGISTRY ID}

The output will display your API key and secret. You'll need these to create your Kafka consumer and start your changefeed.

Step 6. Create a Kafka consumer

In this step, you'll start a Kafka consumer for the changefeed messages.

Run the following command to create a consumer:

confluent kafka topic consume users \
 --value-format avro \
 --from-beginning \
 --schema-registry-endpoint {SCHEMA REGISTRY ENDPOINT URL} \
 --schema-registry-api-key {SCHEMA REGISTRY API KEY} \
 --schema-registry-api-secret {SCHEMA REGISTRY SECRET}

In this command, you need to pass the following Schema Registry details:

  • The endpoint URL from the output in Step 4
  • The API key and secret from Step 5

For this command to run successfully, ensure that confluent kafka cluster describe {CLUSTER ID} returns a Status of UP.

Your terminal will wait for messages after this command has run successfully.

Tip:

Run confluent schema-registry cluster describe to access details for the Schema Registry, if needed.

Step 7. Prepare your CockroachDB cluster

To create your changefeed, you'll prepare your CockroachDB cluster with the movr workload and enable rangefeeds.

In a new terminal window, initiate the movr workload for your cluster:

cockroach workload init movr {"CONNECTION STRING"}

Then, run the workload to generate some data:

cockroach workload run movr --duration=1m {"CONNECTION STRING"}

Start a SQL session for your CockroachDB cluster:

cockroach sql --url {"CONNECTION STRING"}

Set your organization name and Enterprise license key that you received via email:

SET CLUSTER SETTING cluster.organization = '<organization name>';

SET CLUSTER SETTING enterprise.license = '<secret>';

Before you can create an Enterprise changefeed, it is necessary to enable rangefeeds on your cluster:

SET CLUSTER SETTING kv.rangefeed.enabled = true;

Step 8. Create a changefeed

Before running the CREATE CHANGEFEED statement, you must URL-encode both the cluster's and the Schema Registry's API secret key.
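One way to URL-encode a secret is with Python's standard library, as in the minimal sketch below. The secret shown is a made-up placeholder and the helper name is illustrative. Passing safe="" matters because quote() leaves "/" unencoded by default.

```python
from urllib.parse import quote


def url_encode_secret(secret: str) -> str:
    """Percent-encode every reserved character, including "/"."""
    return quote(secret, safe="")


# Made-up placeholder value, not a real secret.
example_secret = "abc/DEF+ghi=="
print(url_encode_secret(example_secret))  # prints abc%2FDEF%2Bghi%3D%3D
```

Run the same helper on both the cluster secret and the Schema Registry secret before placing them in the changefeed statement.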

Create the changefeed with the following statement:

CREATE CHANGEFEED FOR TABLE users INTO "kafka://{KAFKA ENDPOINT}?tls_enabled=true&sasl_enabled=true&sasl_user={CLUSTER API KEY}&sasl_password={URL-ENCODED CLUSTER SECRET KEY}&sasl_mechanism=PLAIN" WITH updated, format = avro, confluent_schema_registry = "https://{SCHEMA REGISTRY API KEY}:{URL-ENCODED SCHEMA REGISTRY SECRET KEY}@{SCHEMA REGISTRY ENDPOINT URL}:443";

To connect to the Kafka cluster, take the host and port from the Endpoint in your cluster details and precede them with the kafka:// scheme in place of SASL_SSL://. For example, an Endpoint of SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092 becomes kafka://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092.

Because the Kafka cluster uses SASL authentication, you need to pass the following parameters, which include the cluster API key and secret you created in Step 2:

  • tls_enabled=true
  • sasl_enabled=true
  • sasl_user={CLUSTER API KEY}
  • sasl_password={URL-ENCODED CLUSTER SECRET KEY}
  • sasl_mechanism=PLAIN
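The Kafka sink URI with the parameters listed above can be assembled programmatically, which avoids hand-editing a long query string. The following is a minimal sketch. The function name and the key and secret values are hypothetical, and the endpoint is the example one from this tutorial.

```python
from urllib.parse import quote, urlencode


def build_kafka_sink_uri(endpoint: str, api_key: str, api_secret: str) -> str:
    """Build a kafka:// sink URI from a Confluent Cloud cluster Endpoint."""
    # Drop a scheme prefix such as SASL_SSL:// and keep only host:port.
    host_port = endpoint.split("://", 1)[-1]
    # quote_via=quote with safe="" percent-encodes the raw secret,
    # including characters such as "/", "+", and "=".
    params = urlencode(
        {
            "tls_enabled": "true",
            "sasl_enabled": "true",
            "sasl_user": api_key,
            "sasl_password": api_secret,
            "sasl_mechanism": "PLAIN",
        },
        quote_via=quote,
        safe="",
    )
    return f"kafka://{host_port}?{params}"


# Hypothetical key and secret, for illustration only.
print(
    build_kafka_sink_uri(
        "SASL_SSL://pkc-4yyd6.us-east1.gcp.confluent.cloud:9092",
        "MYKEY",
        "s3cr/et+",
    )
)
```

Pass the raw (not yet encoded) secret to this function, since it performs the URL-encoding itself.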

Use the following options to define the format and schema registry:

  • format = avro
  • confluent_schema_registry = "https://{SCHEMA REGISTRY API KEY}:{URL-ENCODED SCHEMA REGISTRY SECRET KEY}@{SCHEMA REGISTRY ENDPOINT URL}:443". The schema registry uses basic authentication, so its credentials go in the user-info part of the URL rather than in query parameters as they do in the Kafka URL.

    Tip:

    Use the timeout={duration} query parameter (duration string) in your Confluent Schema Registry URI to change the default timeout for contacting the schema registry. By default, the timeout is 30 seconds.

    To form the URL, you need the following:

    • Schema Registry API Key created in Step 5.
    • URL-encoded Schema Registry secret key created in Step 5.
    • The Endpoint URL from the Schema Registry's details created in Step 4, without its leading https:// because the scheme already begins the URL. Make sure to add the :443 port to the end. For example, psrc-x77pq.us-central1.gcp.confluent.cloud:443.
  • Any other options you need to configure your changefeed. See Options for a list of all available Enterprise changefeed options.
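The confluent_schema_registry URL described in the options above can be assembled as in the following minimal sketch. The function name and credentials are hypothetical. The optional timeout argument corresponds to the timeout={duration} query parameter mentioned in the tip, and the "10s" value in the usage note is an assumed example of a duration string.

```python
from typing import Optional
from urllib.parse import quote


def build_schema_registry_url(
    endpoint_url: str,
    api_key: str,
    api_secret: str,
    timeout: Optional[str] = None,
) -> str:
    """Build a basic-auth Schema Registry URL for confluent_schema_registry."""
    # Keep only the host: strip https:// and any trailing slash.
    host = endpoint_url.split("://", 1)[-1].rstrip("/")
    # Append the :443 port when the endpoint does not already carry one.
    if ":" not in host:
        host = f"{host}:443"
    user = quote(api_key, safe="")
    password = quote(api_secret, safe="")  # URL-encode the raw secret
    url = f"https://{user}:{password}@{host}"
    if timeout is not None:
        url += f"?timeout={timeout}"
    return url


# Hypothetical key and secret, for illustration only.
print(
    build_schema_registry_url(
        "https://psrc-x77pq.us-central1.gcp.confluent.cloud",
        "SRKEY",
        "a/b+c",
    )
)
```

Calling the function with timeout="10s" appends ?timeout=10s to the resulting URL.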

Step 9. Verify the output

Move to the terminal window in which you started the Kafka consumer. As the changefeed runs, you will see change data messages similar to the following:

. . .
{"after":{"users":{"name":{"string":"Michael Clark"},"address":{"string":"85957 Ashley Junctions"},"credit_card":{"string":"4144089313"},"id":{"string":"d84cf3b6-7029-4d4d-aa81-e5caa9cce09e"},"city":{"string":"seattle"}}},"updated":{"string":"1659643584586630201.0000000000"}}
{"after":{"users":{"address":{"string":"17068 Christopher Isle"},"credit_card":{"string":"6664835435"},"id":{"string":"11b99275-92ce-4244-be61-4dae21973f87"},"city":{"string":"amsterdam"},"name":{"string":"John Soto"}}},"updated":{"string":"1659643585384406152.0000000000"}}
{"after":{"users":{"id":{"string":"a4666991-0292-4b00-8df0-d807c10eded5"},"city":{"string":"boston"},"name":{"string":"Anthony Snow"},"address":{"string":"74151 Carrillo Ramp"},"credit_card":{"string":"2630730025"}}},"updated":{"string":"1659643584990243411.0000000000"}}
{"updated":{"string":"1659643584877025654.0000000000"},"after":{"users":{"city":{"string":"seattle"},"name":{"string":"Tanya Holmes"},"address":{"string":"19023 Murphy Mall Apt. 79"},"credit_card":{"string":"6549598808"},"id":{"string":"434d4827-945f-4c7a-8d10-05c03e3bbeeb"}}}}
. . .
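To work with these messages in a script, the sketch below unwraps the Avro type wrappers (such as {"string": ...}) and splits the updated value into its parts. It assumes that updated is a decimal string whose integer part is wall-clock time in nanoseconds since the Unix epoch and whose fractional part is a logical counter. The function names are illustrative, and the message is a trimmed copy of the first sample above.

```python
import json
from datetime import datetime, timezone


def unwrap_avro_row(row: dict) -> dict:
    """Turn {"name": {"string": "x"}} into {"name": "x"}."""
    flat = {}
    for column, value in row.items():
        if isinstance(value, dict) and len(value) == 1:
            flat[column] = next(iter(value.values()))
        else:
            flat[column] = value  # for example, a null column
    return flat


def parse_updated(message: str):
    """Return (UTC wall time, leftover nanoseconds, logical counter)."""
    record = json.loads(message)
    wall_nanos, _, logical = record["updated"]["string"].partition(".")
    seconds, nanos = divmod(int(wall_nanos), 1_000_000_000)
    wall_time = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return wall_time, nanos, int(logical)


# Trimmed copy of the first sample message.
MESSAGE = (
    '{"after":{"users":{"name":{"string":"Michael Clark"},'
    '"city":{"string":"seattle"}}},'
    '"updated":{"string":"1659643584586630201.0000000000"}}'
)

print(unwrap_avro_row(json.loads(MESSAGE)["after"]["users"]))
print(parse_updated(MESSAGE))
```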

You can also view the messages for your cluster in the Confluent Cloud console in the Topics sidebar under the Messages tab.

Users topic messages in the Confluent Cloud console.

You can use the Schema tab to view the schema for a specific topic.

Users value schema in the Confluent Cloud console.
