Optimize Fleet Management

More and more, fleet management relies on knowing vehicle availability and location in real time, and on integrating that information with data from vehicle telematics. This enables businesses to improve operational efficiency by optimizing travel routes, lowering fuel consumption, and automating service schedules. This recipe combines a stream of fleet locations with individual vehicle information, giving organizations a consolidated, real-time view of their entire fleet.

Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB provides a SQL interface for processing data in real time (and will soon support connector integration for reading and writing data to other data sources and sinks). Execute the recipe with the provided SQL commands using the Confluent Cloud ksqlDB editor.
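
As an optional sanity check before moving on, you can run a metadata statement in the ksqlDB editor to confirm the application can see your cluster's topics:

SHOW TOPICS;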

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows the sample connector configurations below; create each connector separately. You can also substitute your own connectors to connect to any supported data source.

{
  "connector.class"          : "MongoDbAtlasSource",
  "name"                     : "recipe-mongodb-fleet_description",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<database-host-address>",
  "connection.user"          : "<database-username>",
  "connection.password"      : "<database-password>",
  "database"                 : "<database-name>",
  "collection"               : "<database-collection-name>",
  "poll.await.time.ms"       : "5000",
  "poll.max.batch.size"      : "1000",
  "copy.existing"            : "true",
  "output.data.format"       : "JSON"
  "tasks.max"                : "1"
}

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-postgres-fleet_location",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "fleet_location",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}
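
If you prefer to manage connectors from ksqlDB itself, the same configuration can be wrapped in a CREATE SOURCE CONNECTOR statement once connector integration is available in your ksqlDB cluster. This sketch mirrors the PostgreSQL configuration above; the placeholder values are assumptions you should replace with your own:

-- a minimal sketch: create the PostgreSQL source connector from ksqlDB
CREATE SOURCE CONNECTOR `recipe-postgres-fleet_location` WITH (
  'connector.class'       = 'PostgresSource',
  'kafka.api.key'         = '<my-kafka-api-key>',
  'kafka.api.secret'      = '<my-kafka-api-secret>',
  'connection.host'       = '<my-database-endpoint>',
  'connection.port'       = '5432',
  'connection.user'       = 'postgres',
  'connection.password'   = '<my-database-password>',
  'db.name'               = '<db-name>',
  'table.whitelist'       = 'fleet_location',
  'timestamp.column.name' = 'created_at',
  'output.data.format'    = 'JSON',
  'db.timezone'           = 'UTC',
  'tasks.max'             = '1'
);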

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to execute this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the streams, using the ksqlDB command INSERT INTO.

ksqlDB code

This application will enrich the fleet telemetry events with details about the associated vehicle.


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- create stream of locations
CREATE STREAM locations (
  vehicle_id INT,
  latitude DOUBLE,
  longitude DOUBLE,
  timestamp VARCHAR
) WITH (
  KAFKA_TOPIC = 'locations',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- fleet lookup table
CREATE TABLE fleet (
  vehicle_id INT PRIMARY KEY,
  driver_id INT,
  license BIGINT
) WITH (
  KAFKA_TOPIC = 'descriptions',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- enrich fleet location stream with more fleet information
CREATE STREAM fleet_location_enhanced AS
  SELECT
    l.vehicle_id,
    latitude,
    longitude,
    timestamp,
    f.driver_id,
    f.license
  FROM locations l
  LEFT JOIN fleet f ON l.vehicle_id = f.vehicle_id;

If you did not have a real data source to connect to in the previous section, Read the data in, you can now use ksqlDB to insert example events into the locations stream and fleet table with the following statements:

INSERT INTO locations (vehicle_id, latitude, longitude, timestamp) VALUES (5401, 16.7587537, 96.2482149, '2021-09-23T10:50:00.000Z');
INSERT INTO locations (vehicle_id, latitude, longitude, timestamp) VALUES (5401, 16.004175, 120.7806412, '2021-09-27T06:39:00.000Z');
INSERT INTO locations (vehicle_id, latitude, longitude, timestamp) VALUES (5402, 32.755613, 22.6377432, '2021-09-25T20:22:00.000Z');

INSERT INTO fleet (vehicle_id, driver_id, license) VALUES (5401, 847383, 8852693196);
INSERT INTO fleet (vehicle_id, driver_id, license) VALUES (5402, 922947, 1255144201);
INSERT INTO fleet (vehicle_id, driver_id, license) VALUES (5403, 435309, 2132311746);
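
To verify the enrichment, run a push query against the derived stream. With the mock data above, each location event for vehicles 5401 and 5402 should come back joined with its driver_id and license; a location event for a vehicle with no matching row in fleet would carry NULL values for those columns, since the join is a LEFT JOIN:

-- continuously emit enriched location events as they arrive
SELECT * FROM fleet_location_enhanced EMIT CHANGES;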

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substitute your stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is also deleted, asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, you'll need to remove those as well; one way to do that from ksqlDB is shown below.
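
Connectors can be deleted from the Confluent Cloud Console. If you created them through ksqlDB instead, a DROP CONNECTOR statement removes them; the names below match the sample configurations in this recipe:

DROP CONNECTOR IF EXISTS `recipe-mongodb-fleet_description`;
DROP CONNECTOR IF EXISTS `recipe-postgres-fleet_location`;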