
Flag Unhealthy IoT Devices

Organizations are turning to the Internet of Things (IoT) for immediately actionable insights into the health and performance of their devices. However, each device can emit high volumes of telemetry data, making it difficult to analyze the data accurately and determine, in real time, if and when something needs attention. This recipe shows you how to process and coalesce that telemetry data using ksqlDB and flag devices that warrant more investigation.


Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB supports SQL syntax for processing data in real time (and will soon support connector integration for reading from and writing to other data sources and sinks). Execute the recipe by running the provided SQL statements in the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows a sample connector configuration below; if there is more than one, create each connector separately. You can also substitute your own connectors to connect to any supported data source.

In this example, the telemetry events are stored in Postgres database tables. The connector reads from the tables and writes the data into Kafka topics in Confluent Cloud.

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-postgres-iot",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "alarms, throughputs",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}
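
For reference, the connector configuration above assumes two Postgres tables shaped roughly like the following. This DDL is an illustrative sketch, not part of the recipe: the column types are assumptions, and created_at is the column the connector uses for timestamp-based incremental reads.

-- Hypothetical source tables (assumed for illustration; adapt to your schema)
CREATE TABLE alarms (
  device_id  VARCHAR NOT NULL,
  alarm_name VARCHAR,
  code       INT,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE TABLE throughputs (
  device_id  VARCHAR NOT NULL,
  throughput DOUBLE PRECISION,
  created_at TIMESTAMP NOT NULL DEFAULT NOW()
);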

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to execute this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the streams, using the ksqlDB command INSERT INTO.

ksqlDB code

In this example, one stream of data reports device throughput values and another reports alarms. The following stream processing app flags the devices that need to be investigated: those whose throughput is below the acceptable threshold while their alarm code is nonzero.


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- Create table with latest state of alarms
CREATE TABLE alarms (
  device_id STRING PRIMARY KEY,
  alarm_name STRING,
  code INT
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'alarms',
  PARTITIONS = 6
);

-- Create stream of throughputs 
CREATE STREAM throughputs (
  device_id STRING KEY,
  throughput DOUBLE
) WITH (
  VALUE_FORMAT = 'JSON',
  KAFKA_TOPIC = 'throughputs',
  PARTITIONS = 6
);

-- Create new stream of critical issues to investigate
-- where throughputs are below threshold 1000.0 and alarm code is not 0
CREATE STREAM critical_issues WITH (KAFKA_TOPIC = 'critical_issues') AS
  SELECT
    t.device_id,
    t.throughput,
    a.alarm_name,
    a.code
  FROM throughputs t
  LEFT JOIN alarms a ON t.device_id = a.device_id
  WHERE throughput < 1000.0 AND a.code != 0;
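
Once these statements have run, you can sanity-check the schema of the derived stream with a standard ksqlDB command:

DESCRIBE critical_issues;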

If you did not connect to a real data source in the Read the data in section above, you can instead use ksqlDB to insert example events into the source topics with the following statements:

INSERT INTO alarms (device_id, alarm_name, code) VALUES ('d1', 'CHANNEL_CREATE', 0);
INSERT INTO alarms (device_id, alarm_name, code) VALUES ('d2', 'CHANNEL_CREATE', 42);
INSERT INTO alarms (device_id, alarm_name, code) VALUES ('d3', 'CHANNEL_CREATE', 0);

INSERT INTO throughputs (device_id, throughput) VALUES ('d1', 2000.0);
INSERT INTO throughputs (device_id, throughput) VALUES ('d2', 900.0);
INSERT INTO throughputs (device_id, throughput) VALUES ('d3', 500.0);
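
To verify the pipeline end to end, run a push query against the derived stream. With the mock events above, only device d2 should appear, since it is the only one with throughput below 1000.0 and a nonzero alarm code (d3 is below the threshold, but its alarm code is 0):

SELECT * FROM critical_issues EMIT CHANGES;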

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substituting the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is also deleted, asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
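
For this recipe, that means dropping the derived stream first (it is fed by the other two), then its inputs. If ksqlDB reports that a persistent query still reads from or writes to a source, find its ID with SHOW QUERIES; and stop it with TERMINATE <query_id>; before dropping:

DROP STREAM IF EXISTS critical_issues DELETE TOPIC;
DROP STREAM IF EXISTS throughputs DELETE TOPIC;
DROP TABLE IF EXISTS alarms DELETE TOPIC;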

If you also created connectors, you'll need to remove those as well.