
Enrich orders with change data capture (CDC)

Change data capture (CDC) plays a vital role in ensuring that recently changed data is quickly ingested, transformed, and made available to downstream analytics platforms and applications. If you have transactional events being written to a database, such as sales orders from a marketplace, you can use CDC to capture and denormalize those change events into a single table of enriched data that is easier to query and consume. This recipe demonstrates the principle by streaming order data from SQL Server, denormalizing it, and writing the result to Snowflake.


Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB provides a SQL interface for processing data in real time (and will soon support connector integration for reading from and writing to other data sources and sinks). Execute the recipe by running the provided SQL statements in the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows sample connector configurations below, each of which should be created separately. You can also substitute your own connectors to connect to any supported data source.

Change data capture (CDC) for orders is being read from a SQL Server database, and the customer data is being read from Oracle.

{
  "connector.class"          : "SqlServerCdcSource",
  "name"                     : "recipe-sqlservercdc-orders",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "database.hostname"        : "<database-hostname>",
  "database.port"            : "1433",
  "database.user"            : "<database-username>",
  "database.password"        : "<database-password>",
  "database.dbname"          : "<database-name>",
  "database.server.name"     : "sql",
  "table.include.list"       : "<table_name>",
  "snapshot.mode"            : "initial",
  "output.data.format"       : "JSON",
  "tasks.max"                : "1"
}

{
  "connector.class"          : "OracleDatabaseSource",
  "name"                     : "recipe-oracle-customers",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "topic.prefix"             : "oracle_",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "1521",
  "connection.user"          : "<database-username>",
  "connection.password"      : "<database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "CUSTOMERS",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}
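
Once both connectors are running, you can sanity-check that change events are arriving by printing the underlying topics from the ksqlDB editor. The topic names below are only examples inferred from the configurations above (the SQL Server CDC connector derives topic names from database.server.name plus the schema and table, and the Oracle connector prepends topic.prefix); substitute the names you actually see in your cluster.

-- Peek at the raw change events produced by the connectors
-- (topic names are assumptions; adjust to match your environment)
PRINT 'sql.dbo.ORDERS' FROM BEGINNING LIMIT 3;
PRINT 'oracle_CUSTOMERS' FROM BEGINNING LIMIT 3;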

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to execute this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the streams using the ksqlDB statement INSERT INTO.

ksqlDB code

This streams the user orders and denormalizes the data by joining facts (orders) with the dimension (customer).


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- Create stream of orders
CREATE STREAM orders (
  order_id BIGINT,
  customer_id BIGINT,
  item VARCHAR,
  order_total_usd DECIMAL(10,2)
) WITH (
  KAFKA_TOPIC = 'orders',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Register the customer data topic as a table
CREATE TABLE customers (
  id BIGINT PRIMARY KEY,
  first_name VARCHAR,
  last_name VARCHAR,
  email VARCHAR
) WITH (
  KAFKA_TOPIC = 'CUST_RAW_STREAM',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Denormalize data, joining facts (orders) with the dimension (customer)
CREATE STREAM orders_enriched AS
  SELECT
    c.id AS customer_id,
    o.order_id AS order_id,
    o.item AS item,
    o.order_total_usd AS order_total_usd,
    CONCAT(c.first_name, ' ', c.last_name) AS full_name,
    c.email AS email
  FROM orders o
    LEFT JOIN customers c
    ON o.customer_id = c.id;
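
If you want to confirm what was created, DESCRIBE shows the columns and serialization format of the new stream (a standard ksqlDB statement; the name matches the stream created above):

-- Inspect the schema of the enriched stream
DESCRIBE ORDERS_ENRICHED;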

If you did not have a real data source to connect to in the previous section, Read the data in, you can instead use ksqlDB to insert example events into the source topics with the following statements:

INSERT INTO orders (order_id, customer_id, item, order_total_usd) VALUES (44697328, 375, 'book', 29.99);
INSERT INTO orders (order_id, customer_id, item, order_total_usd) VALUES (44697329, 375, 'guitar', 215.99);
INSERT INTO orders (order_id, customer_id, item, order_total_usd) VALUES (44697330, 983, 'thermometer', 12.99);
INSERT INTO orders (order_id, customer_id, item, order_total_usd) VALUES (44697331, 983, 'scarf', 32.99);
INSERT INTO orders (order_id, customer_id, item, order_total_usd) VALUES (44697332, 375, 'doormat', 15.99);
INSERT INTO orders (order_id, customer_id, item, order_total_usd) VALUES (44697333, 983, 'clippers', 65.99);

INSERT INTO customers (id, first_name, last_name, email) VALUES (375, 'Janice', 'Smith', 'jsmith@mycompany.com');
INSERT INTO customers (id, first_name, last_name, email) VALUES (983, 'George', 'Mall', 'gmall@mycompany.com');
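
To verify the enrichment, you can run a push query against the new stream; each order should now carry the matching customer's full name and email. This query is illustrative only and can be stopped at any time:

-- Stream the enriched orders as they are produced
SELECT order_id, item, order_total_usd, full_name, email
  FROM orders_enriched
  EMIT CHANGES;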

Write the data out

Any downstream application or database can receive the denormalized data.

{
  "connector.class"          : "SnowflakeSink",
  "name"                     : "recipe-snowflake-orders_enriched",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "topics"                   : "ORDERS_ENRICHED",
  "input.data.format"        : "JSON",
  "snowflake.url.name"       : "https://wm83168.us-central1.gcp.snowflakecomputing.com:443",
  "snowflake.user.name"      : "<login-username>",
  "snowflake.private.key"    : "<private-key>",
  "snowflake.database.name"  : "<database-name>",
  "snowflake.schema.name"    : "<schema-name>",
  "tasks.max"                : "1"
}

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). Including the DELETE TOPIC clause also deletes the topic backing the stream or table, asynchronously.

1
2
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
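
For this recipe, that means dropping the dependent stream first and then its sources; the names below match the objects created above. If your topics were populated by the connectors rather than the mock INSERT statements, you may prefer to omit DELETE TOPIC and keep the underlying data.

DROP STREAM IF EXISTS ORDERS_ENRICHED DELETE TOPIC;
DROP STREAM IF EXISTS ORDERS DELETE TOPIC;
DROP TABLE IF EXISTS CUSTOMERS DELETE TOPIC;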

If you also created connectors, you'll need to remove those as well.