
Build a dynamic pricing strategy

As consumers increasingly transact digitally and online comparison shopping has become common practice, implementing a dynamic pricing strategy is essential to stay competitive. This recipe helps you keep track of pricing trends and statistics, such as the lowest, highest, and average prices over a given timeframe, so both buyers and sellers can make dynamic offers based on historical sales activity.


Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB provides a SQL language for processing data in real time (and will soon support connector integration for reading and writing data to other data sources and sinks). Execute the recipe with the provided SQL commands using the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows sample connector configurations below; each connector should be created separately. You can also substitute your own connectors to connect to any supported data source.

For this recipe, we are interested in capturing each marketplace sale event for an item, specifically its price. This creates a stream of events on which real-time stream processing can keep state and calculate pricing statistics.

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-postgres-pricing",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "sales",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-postgres-items",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "items",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}
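
If you prefer to manage connectors from ksqlDB itself, the same configuration can be submitted with a CREATE SOURCE CONNECTOR statement, provided your ksqlDB cluster is permitted to manage Confluent Cloud connectors. The following is a sketch for the sales connector only; the connector name recipe_postgres_pricing is just an example, and the placeholder values are the same as in the JSON configuration above.

-- Sketch: create the sales source connector from the ksqlDB editor
-- (connector name and placeholder values are examples)
CREATE SOURCE CONNECTOR recipe_postgres_pricing WITH (
  'connector.class'       = 'PostgresSource',
  'kafka.api.key'         = '<my-kafka-api-key>',
  'kafka.api.secret'      = '<my-kafka-api-secret>',
  'connection.host'       = '<my-database-endpoint>',
  'connection.port'       = '5432',
  'connection.user'       = 'postgres',
  'connection.password'   = '<my-database-password>',
  'db.name'               = '<db-name>',
  'table.whitelist'       = 'sales',
  'timestamp.column.name' = 'created_at',
  'output.data.format'    = 'JSON',
  'db.timezone'           = 'UTC',
  'tasks.max'             = '1'
);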

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to execute this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the streams using the ksqlDB INSERT INTO statement.

ksqlDB code


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- Create stream of sales
CREATE STREAM sales (
  item_id INT KEY,
  seller_id STRING,
  price DOUBLE
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'sales',
  PARTITIONS = 6
);

-- Create table of items
CREATE TABLE items (
  item_id INT PRIMARY KEY,
  item_name STRING
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'items',
  PARTITIONS = 6
);

-- Calculate minimum, maximum, and average price, per item, and join with item name
CREATE TABLE sales_stats WITH (KEY_FORMAT = 'JSON') AS
SELECT S.item_id,
       I.item_name,
       MIN(price) AS price_min,
       MAX(price) AS price_max,
       AVG(price) AS price_avg
FROM sales S
INNER JOIN items I ON S.item_id = I.item_id
GROUP BY S.item_id, I.item_name
EMIT CHANGES;
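
The sales_stats table above aggregates over all history. If you want the statistics restricted to a given timeframe instead, you can add a window to the aggregation. The following is a sketch, assuming a 7-day tumbling window suits your marketplace; the table name sales_stats_7d is just an example.

-- Sketch: same statistics, but computed per 7-day tumbling window
CREATE TABLE sales_stats_7d WITH (KEY_FORMAT = 'JSON') AS
SELECT S.item_id,
       I.item_name,
       MIN(price) AS price_min,
       MAX(price) AS price_max,
       AVG(price) AS price_avg
FROM sales S
INNER JOIN items I ON S.item_id = I.item_id
WINDOW TUMBLING (SIZE 7 DAYS)
GROUP BY S.item_id, I.item_name
EMIT CHANGES;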

If you did not connect to a real data source in the Read the data in section above, you can use ksqlDB to insert example events into the source topics with the following statements:

INSERT INTO items (item_id, item_name) VALUES (1, 'Pikachu card');
INSERT INTO items (item_id, item_name) VALUES (2, 'Charizard card');
INSERT INTO items (item_id, item_name) VALUES (3, 'Mew card');

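-- seller_id is omitted from these mock records, so it will be NULL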
INSERT INTO sales (item_id, price) VALUES (1, 10.00); 
INSERT INTO sales (item_id, price) VALUES (2, 20.00); 
INSERT INTO sales (item_id, price) VALUES (3, 30.00); 
INSERT INTO sales (item_id, price) VALUES (1, 12.00); 
INSERT INTO sales (item_id, price) VALUES (1, 17.00); 
INSERT INTO sales (item_id, price) VALUES (3, 26.00); 
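
Once the mock data is in, you can check the aggregation with a push query against the sales_stats table. This is just a quick sanity check; stop the query in the editor when you are done.

-- Watch the per-item statistics update as sales arrive
SELECT * FROM sales_stats EMIT CHANGES;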

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is also deleted, asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
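
For this recipe, that means dropping the sales_stats table along with the sales stream and items table, for example as shown below. Note that ksqlDB will refuse to drop these objects while the persistent query that populates sales_stats is still running, so terminate it first.

-- Terminate the persistent query writing to sales_stats first, e.g. TERMINATE <query_id>;
DROP TABLE IF EXISTS sales_stats DELETE TOPIC;
DROP STREAM IF EXISTS sales DELETE TOPIC;
DROP TABLE IF EXISTS items DELETE TOPIC;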

If you also created connectors, you'll need to remove those as well.