Optimize Omni-channel Inventory

Having an up-to-date, real-time view of inventory on every item is essential in today's online marketplaces. This helps businesses maintain the optimum level of inventory—not too much and not too little—so that they can meet customer demand while minimizing inventory holding costs. This recipe demonstrates how to track and update inventory in real time, so you always have an up-to-date snapshot of your stock for both your customers and merchandising teams.

Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB provides a SQL interface for processing data in real time and can manage connectors that read data from, and write data to, other systems. Execute the recipe with the provided SQL commands using the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows a sample connector configuration below. You can also substitute your own connector to connect to any supported data source.

For this recipe, we are interested in every event that affects an item's quantity. Together these form a stream of events, where each event adds inventory or removes it.

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-postgres-inventory",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "inventory",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}
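If you prefer to manage the connector from ksqlDB itself rather than through the Confluent Cloud Console, the same configuration can be expressed as a `CREATE SOURCE CONNECTOR` statement. This is a sketch: the connector name `RECIPE_POSTGRES_INVENTORY` is illustrative, the placeholder values must be replaced with your own, and the properties mirror the JSON configuration above.

```sql
-- Sketch: create the Postgres source connector from ksqlDB.
-- Replace the <...> placeholders with your own values.
CREATE SOURCE CONNECTOR RECIPE_POSTGRES_INVENTORY WITH (
  'connector.class'       = 'PostgresSource',
  'kafka.api.key'         = '<my-kafka-api-key>',
  'kafka.api.secret'      = '<my-kafka-api-secret>',
  'connection.host'       = '<my-database-endpoint>',
  'connection.port'       = '5432',
  'connection.user'       = 'postgres',
  'connection.password'   = '<my-database-password>',
  'db.name'               = '<db-name>',
  'table.whitelist'       = 'inventory',
  'timestamp.column.name' = 'created_at',
  'output.data.format'    = 'JSON',
  'db.timezone'           = 'UTC',
  'tasks.max'             = '1'
);
```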

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to run this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the stream with the ksqlDB INSERT INTO statement.

ksqlDB code

Create a ksqlDB TABLE, which is a mutable, partitioned collection that models change over time and represents what is true as of now.


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- Create stream of inventory
CREATE STREAM inventory_stream (
  id STRING KEY,
  item STRING,
  quantity INTEGER
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'inventory',
  PARTITIONS = 6
);

-- Create stateful table with up-to-date information of inventory availability
CREATE TABLE inventory_stream_table
    WITH (KAFKA_TOPIC = 'inventory_table') AS
  SELECT
    item,
    SUM(quantity) AS item_quantity
  FROM inventory_stream
  GROUP BY item
  EMIT CHANGES;

If you did not connect to a real data source in the Read the data in section above, you can use ksqlDB to insert example events into the source topic with the following statements:

INSERT INTO inventory_stream (id, item, quantity) VALUES ('1', 'Apple Magic Mouse 2', 10);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('2', 'iPhoneX', 25);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('3', 'MacBookPro13', 100);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('4', 'iPad4', 20);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('5', 'Apple Pencil', 10);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('2', 'iPhoneX', 10);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('4', 'iPad4', -20);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('3', 'MacBookPro13', 10);
INSERT INTO inventory_stream (id, item, quantity) VALUES ('4', 'iPad4', 20);
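To verify the aggregation, you can query the table (the queries below are a sketch of the two query styles ksqlDB offers). Given the events above, MacBookPro13 should show a quantity of 110 (100 + 10) and iPad4 a quantity of 20 (20 - 20 + 20).

```sql
-- Pull query: look up the current quantity for one item.
-- (item is the table's key, set by the GROUP BY above.)
SELECT item, item_quantity
  FROM inventory_stream_table
  WHERE item = 'MacBookPro13';

-- Push query: watch every inventory update as it happens.
SELECT item, item_quantity
  FROM inventory_stream_table
  EMIT CHANGES;
```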

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substituting the stream or table name, as appropriate). Including the DELETE TOPIC clause also deletes the topic backing the stream or table, asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
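Applied to this recipe's objects, a concrete cleanup might look like the following; the table is dropped first because it is derived from the stream:

```sql
DROP TABLE IF EXISTS inventory_stream_table DELETE TOPIC;
DROP STREAM IF EXISTS inventory_stream DELETE TOPIC;
```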

If you also created connectors, you'll need to remove those as well.