Notify Passengers of Flight Updates

Worse than having a flight delayed is not being notified about the important changes that come with it, such as new boarding times, cancellations, gate changes, and estimated arrivals. This recipe shows how ksqlDB can help airlines combine passenger, flight booking, and current flight plan data to immediately alert a passenger about flight updates in real time.

Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB supports a SQL language for processing the data in real time (and will soon support connector integration for reading and writing data to other data sources and sinks). Execute the recipe with the provided SQL commands using the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows sample connector configuration(s) below, each of which should be created separately. You can also substitute your own connectors to connect to any supported data source.

This example pulls in data from different tables for customers, flights, flight updates, and bookings.

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-postgres-aviation",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "customers,flights,flight_updates,bookings",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to run this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the streams using the ksqlDB INSERT INTO statement.

ksqlDB code

This ksqlDB application joins customer flight booking data with any flight updates to provide a stream of notifications to passengers.


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

CREATE TABLE customers (ID             INT     PRIMARY KEY
                       , NAME           VARCHAR
                       , ADDRESS        VARCHAR
                       , EMAIL          VARCHAR
                       , PHONE          VARCHAR
                       , LOYALTY_STATUS VARCHAR)
              WITH (KAFKA_TOPIC = 'customers'
                   , FORMAT = 'AVRO'
                   , PARTITIONS = 6
);

CREATE TABLE flights (ID               INT     PRIMARY KEY
                       , ORIGIN        VARCHAR
                       , DESTINATION   VARCHAR
                       , CODE          VARCHAR
                       , SCHEDULED_DEP TIMESTAMP
                       , SCHEDULED_ARR TIMESTAMP)
              WITH (KAFKA_TOPIC = 'flights'
                   , FORMAT = 'AVRO'
                   , PARTITIONS = 6
);

CREATE TABLE bookings (ID            INT     PRIMARY KEY
                       , CUSTOMER_ID INT
                       , FLIGHT_ID   INT)
              WITH (KAFKA_TOPIC = 'bookings'
                   , FORMAT = 'AVRO'
                   , PARTITIONS = 6
);

CREATE TABLE customer_bookings AS 
  SELECT C.*, B.ID, B.FLIGHT_ID
  FROM   bookings B
          INNER JOIN customers C
              ON B.CUSTOMER_ID = C.ID;

CREATE TABLE customer_flights 
  WITH (KAFKA_TOPIC = 'customer_flights') AS
  SELECT CB.*, F.*
  FROM   customer_bookings CB
          INNER JOIN flights F
              ON CB.FLIGHT_ID = F.ID;

CREATE STREAM cf_stream WITH (KAFKA_TOPIC = 'customer_flights', FORMAT = 'AVRO');

CREATE STREAM cf_rekey WITH (KAFKA_TOPIC = 'cf_rekey') AS 
  SELECT F_ID                 AS FLIGHT_ID
        , CB_C_ID             AS CUSTOMER_ID
        , CB_C_NAME           AS CUSTOMER_NAME
        , CB_C_ADDRESS        AS CUSTOMER_ADDRESS
        , CB_C_EMAIL          AS CUSTOMER_EMAIL
        , CB_C_PHONE          AS CUSTOMER_PHONE
        , CB_C_LOYALTY_STATUS AS CUSTOMER_LOYALTY_STATUS
        , F_ORIGIN            AS FLIGHT_ORIGIN
        , F_DESTINATION       AS FLIGHT_DESTINATION
        , F_CODE              AS FLIGHT_CODE
        , F_SCHEDULED_DEP     AS FLIGHT_SCHEDULED_DEP
        , F_SCHEDULED_ARR     AS FLIGHT_SCHEDULED_ARR
  FROM cf_stream
  PARTITION BY F_ID;

CREATE TABLE customer_flights_rekeyed 
  (FLIGHT_ID INT PRIMARY KEY) 
  WITH (KAFKA_TOPIC = 'cf_rekey', FORMAT = 'AVRO');

CREATE STREAM flight_updates (ID          INT KEY
                            , FLIGHT_ID   INT
                            , UPDATED_DEP TIMESTAMP
                            , REASON      VARCHAR
                             )
              WITH (KAFKA_TOPIC = 'flight_updates'
                   , FORMAT = 'AVRO'
                   , PARTITIONS = 6
);

CREATE STREAM customer_flight_updates AS
  SELECT  CUSTOMER_NAME
      , FU.REASON      AS FLIGHT_CHANGE_REASON 
      , FU.UPDATED_DEP AS FLIGHT_UPDATED_DEP
      , FLIGHT_SCHEDULED_DEP 
      , CUSTOMER_EMAIL
      , CUSTOMER_PHONE
      , FLIGHT_DESTINATION
      , FLIGHT_CODE
  FROM flight_updates FU
        INNER JOIN customer_flights_rekeyed CB
        ON FU.FLIGHT_ID = CB.FLIGHT_ID
  EMIT CHANGES;

If you did not have a real data source to connect to in the Read the data in section above, you can now use ksqlDB to insert example events into the source topics with the following statements:

INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (1, 'Gleda Lealle', '93 Express Point', 'glealle0@senate.gov', '+351 831 301 6746', 'Silver');
INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (2, 'Gilly Crocombe', '332 Blaine Avenue', 'gcrocombe1@homestead.com', '+33 203 565 3736', 'Silver');
INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (3, 'Astrix Aspall', '56 Randy Place', 'aaspall2@ebay.co.uk', '+33 679 296 6645', 'Gold');
INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (4, 'Ker Omond', '23255 Tennessee Court', 'komond3@usnews.com', '+33 515 323 0170', 'Silver');
INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (5, 'Arline Synnott', '144 Ramsey Avenue', 'asynnott4@theatlantic.com', '+62 953 759 8885', 'Bronze');

INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (1, 'LBA', 'AMS', '642',  '2021-11-18T06:04:00', '2021-11-18T06:48:00');
INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (2, 'LBA', 'LHR', '9607', '2021-11-18T07:36:00', '2021-11-18T08:05:00');
INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (3, 'AMS', 'TXL', '7968', '2021-11-18T08:11:00', '2021-11-18T10:41:00');
INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (4, 'AMS', 'OSL', '496',  '2021-11-18T11:20:00', '2021-11-18T13:25:00');
INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (5, 'LHR', 'JFK', '9230', '2021-11-18T10:36:00', '2021-11-18T19:07:00');

INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (1,2,1);
INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (2,1,1);
INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (3,5,3);
INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (4,4,2);

INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (1, 2, '2021-11-18T09:00:00.000', 'Cabin staff unavailable');
INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (2, 3, '2021-11-19T14:00:00.000', 'Mechanical checks');
INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (3, 1, '2021-11-19T08:10:09.000', 'Icy conditions');

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below, substituting the stream or table name as appropriate. By including the DELETE TOPIC clause, the Kafka topic backing the stream or table is also deleted asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
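As a concrete (but optional) sequence for this particular recipe, after terminating any running persistent queries, the objects created above could be dropped in reverse order of creation. Note that cf_stream shares its topic with customer_flights, and customer_flights_rekeyed shares its topic with cf_rekey, so DELETE TOPIC appears only once per topic:

```sql
-- Drop derived objects first, then the tables and streams they are built on.
DROP STREAM IF EXISTS customer_flight_updates DELETE TOPIC;
DROP TABLE  IF EXISTS customer_flights_rekeyed;           -- topic dropped with cf_rekey below
DROP STREAM IF EXISTS cf_rekey DELETE TOPIC;
DROP STREAM IF EXISTS cf_stream;                          -- topic dropped with customer_flights below
DROP TABLE  IF EXISTS customer_flights DELETE TOPIC;
DROP TABLE  IF EXISTS customer_bookings DELETE TOPIC;
DROP STREAM IF EXISTS flight_updates DELETE TOPIC;
DROP TABLE  IF EXISTS bookings DELETE TOPIC;
DROP TABLE  IF EXISTS flights DELETE TOPIC;
DROP TABLE  IF EXISTS customers DELETE TOPIC;
```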

If you also created connectors, you'll need to remove those as well.

Explanation

Create and populate the underlying tables

ksqlDB supports tables and streams as objects. Both are backed by Kafka topics. Here we're going to create three tables in a normalized data model to hold information about our customers, their bookings, and the flights.

First off, let's create a table that will hold data about our customers:

CREATE TABLE customers (ID             INT     PRIMARY KEY
                       , NAME           VARCHAR
                       , ADDRESS        VARCHAR
                       , EMAIL          VARCHAR
                       , PHONE          VARCHAR
                       , LOYALTY_STATUS VARCHAR)
              WITH (KAFKA_TOPIC='customers'
                   , FORMAT='AVRO'
                   , PARTITIONS=6);

This will store the data in a Kafka topic. In practice, you would probably populate this directly from your application or a feed from your database using Kafka Connect. For simplicity, here we'll just load some data directly:

INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (1, 'Gleda Lealle', '93 Express Point', 'glealle0@senate.gov', '+351 831 301 6746', 'Silver');
INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (2, 'Gilly Crocombe', '332 Blaine Avenue', 'gcrocombe1@homestead.com', '+33 203 565 3736', 'Silver');
INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (3, 'Astrix Aspall', '56 Randy Place', 'aaspall2@ebay.co.uk', '+33 679 296 6645', 'Gold');
INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (4, 'Ker Omond', '23255 Tennessee Court', 'komond3@usnews.com', '+33 515 323 0170', 'Silver');
INSERT INTO customers (ID, NAME, ADDRESS, EMAIL, PHONE, LOYALTY_STATUS) VALUES (5, 'Arline Synnott', '144 Ramsey Avenue', 'asynnott4@theatlantic.com', '+62 953 759 8885', 'Bronze');

Next, we'll create a table of flights and associated bookings for our customers.

CREATE TABLE flights (ID               INT     PRIMARY KEY
                       , ORIGIN        VARCHAR
                       , DESTINATION   VARCHAR
                       , CODE          VARCHAR
                       , SCHEDULED_DEP TIMESTAMP
                       , SCHEDULED_ARR TIMESTAMP)
              WITH (KAFKA_TOPIC='flights'
                   , FORMAT='AVRO'
                   , PARTITIONS=6);

CREATE TABLE bookings (ID            INT     PRIMARY KEY
                       , CUSTOMER_ID INT
                       , FLIGHT_ID   INT)
              WITH (KAFKA_TOPIC='bookings'
                   , FORMAT='AVRO'
                   , PARTITIONS=6);

For these two tables, let's add some data. As before, this would usually come directly from your application or a stream of data from another system integrated through Kafka Connect.

INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (1, 'LBA', 'AMS', '642',  '2021-11-18T06:04:00', '2021-11-18T06:48:00');
INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (2, 'LBA', 'LHR', '9607', '2021-11-18T07:36:00', '2021-11-18T08:05:00');
INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (3, 'AMS', 'TXL', '7968', '2021-11-18T08:11:00', '2021-11-18T10:41:00');
INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (4, 'AMS', 'OSL', '496',  '2021-11-18T11:20:00', '2021-11-18T13:25:00');
INSERT INTO flights (ID, ORIGIN, DESTINATION, CODE, SCHEDULED_DEP, SCHEDULED_ARR) VALUES (5, 'LHR', 'JFK', '9230', '2021-11-18T10:36:00', '2021-11-18T19:07:00');
INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (1,2,1);
INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (2,1,1);
INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (3,5,3);
INSERT INTO bookings (ID, CUSTOMER_ID, FLIGHT_ID) VALUES (4,4,2);

Denormalize the data

To give us a single view of the passenger/flight data, we'll denormalize the three tables into one. First, we join the customers to the bookings they've made and build a new table as a result:

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE customer_bookings AS 
  SELECT C.*, B.ID, B.FLIGHT_ID
  FROM   bookings B
          INNER JOIN customers C
              ON B.CUSTOMER_ID = C.ID;

From here, we join to details of the flights themselves:

SET 'auto.offset.reset' = 'earliest';

CREATE TABLE customer_flights 
  WITH (KAFKA_TOPIC='customer_flights') AS
  SELECT CB.*, F.*
  FROM   customer_bookings CB
          INNER JOIN flights F
              ON CB.FLIGHT_ID=F.ID;

At this stage, we can query the data held in the tables to show which customers are booked on which flights:

SET 'auto.offset.reset' = 'earliest';

SELECT  CB_C_NAME           AS NAME
      , CB_C_EMAIL          AS EMAIL
      , CB_C_LOYALTY_STATUS AS LOYALTY_STATUS
      , F_ORIGIN            AS ORIGIN
      , F_DESTINATION       AS DESTINATION
      , F_CODE              AS CODE
      , F_SCHEDULED_DEP     AS SCHEDULED_DEP 
FROM customer_flights
EMIT CHANGES;      
+---------------+------------------------+---------------+-------+------------+-----+------------------------+
|NAME           |EMAIL                   |LOYALTY_STATUS |ORIGIN |DESTINATION |CODE |SCHEDULED_DEP           |
+---------------+------------------------+---------------+-------+------------+-----+------------------------+
|Gilly Crocombe |gcrocombe1@homestead.com|Silver         |LBA    |AMS         |642  |2021-11-18T06:04:00.000 |
|Ker Omond      |komond3@usnews.com      |Silver         |LBA    |LHR         |9607 |2021-11-18T07:36:00.000 |
|Gleda Lealle   |glealle0@senate.gov     |Silver         |LBA    |AMS         |642  |2021-11-18T06:04:00.000 |
|Ker Omond      |komond3@usnews.com      |Silver         |AMS    |TXL         |7968 |2021-11-18T08:11:00.000 |

The last step in denormalizing the data is to set the key of the table to the flight ID so that it can be joined to the flight updates (which we'll get to below).

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM cf_stream WITH (KAFKA_TOPIC='customer_flights', FORMAT='AVRO');

CREATE STREAM cf_rekey WITH (KAFKA_TOPIC='cf_rekey') AS 
  SELECT F_ID                 AS FLIGHT_ID
        , CB_C_ID             AS CUSTOMER_ID
        , CB_C_NAME           AS CUSTOMER_NAME
        , CB_C_ADDRESS        AS CUSTOMER_ADDRESS
        , CB_C_EMAIL          AS CUSTOMER_EMAIL
        , CB_C_PHONE          AS CUSTOMER_PHONE
        , CB_C_LOYALTY_STATUS AS CUSTOMER_LOYALTY_STATUS
        , F_ORIGIN            AS FLIGHT_ORIGIN
        , F_DESTINATION       AS FLIGHT_DESTINATION
        , F_CODE              AS FLIGHT_CODE
        , F_SCHEDULED_DEP     AS FLIGHT_SCHEDULED_DEP
        , F_SCHEDULED_ARR     AS FLIGHT_SCHEDULED_ARR
  FROM cf_stream
  PARTITION BY F_ID;

CREATE TABLE customer_flights_rekeyed 
  (FLIGHT_ID INT PRIMARY KEY) 
  WITH (KAFKA_TOPIC='cf_rekey', FORMAT='AVRO');

We now have the same data as in the customer_flights table, but keyed on FLIGHT_ID.
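If you want to sanity-check the rekeyed table before moving on, a transient push query against it (not part of the recipe itself) will show the changelog as it is built; the column choice and LIMIT here are just illustrative:

```sql
SET 'auto.offset.reset' = 'earliest';

SELECT FLIGHT_ID, CUSTOMER_NAME, FLIGHT_CODE
FROM customer_flights_rekeyed
EMIT CHANGES
LIMIT 4;
```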

Add a stream of flight updates

In the flights table above, we have the scheduled departure time of a flight (SCHEDULED_DEP). Now, let's introduce a stream of events that any flight changes will be written to. Again, we're populating it directly, but in the real world it'll be coming from somewhere else—perhaps Kafka Connect pulling the data from a JMS queue (or any of the other hundreds of supported sources).

CREATE STREAM flight_updates (ID          INT KEY
                            , FLIGHT_ID   INT
                            , UPDATED_DEP TIMESTAMP
                            , REASON      VARCHAR
                             )
              WITH (KAFKA_TOPIC='flight_updates'
                   , FORMAT='AVRO'
                   , PARTITIONS=6);

Join data

By joining our customer flight booking data with any flight updates, we can provide a stream of notifications to passengers. Many platforms exist for delivering push notifications, whether bespoke in-app or via a third-party messaging tool. ksqlDB can integrate with these using its REST interface, its native Java client, or one of the several community-supported clients.
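As an illustrative sketch of the REST integration (not part of the recipe), an application could consume the customer_flight_updates stream over ksqlDB's /query endpoint. The endpoint URL below is a placeholder for your own ksqlDB server, authentication is omitted, and the line-by-line response parsing is simplified on the assumption that each row arrives on its own line:

```python
import json
import urllib.request

# Placeholder endpoint: substitute your ksqlDB server's address.
KSQLDB_ENDPOINT = "http://localhost:8088/query"

def build_query_payload(sql: str) -> dict:
    """Build the JSON body expected by ksqlDB's /query REST endpoint."""
    return {
        "ksql": sql,
        "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"},
    }

def stream_notifications():
    """Run the push query and yield each flight-update row as it arrives."""
    payload = build_query_payload(
        "SELECT CUSTOMER_NAME, FLIGHT_CHANGE_REASON, FLIGHT_UPDATED_DEP, "
        "CUSTOMER_EMAIL, FLIGHT_CODE FROM customer_flight_updates EMIT CHANGES;"
    )
    req = urllib.request.Request(
        KSQLDB_ENDPOINT,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
    )
    with urllib.request.urlopen(req) as resp:
        for raw in resp:
            # The response is a streamed JSON array; skip brackets/blank
            # lines and strip trailing commas before parsing each row.
            line = raw.strip().rstrip(b",")
            if line in (b"", b"[", b"]"):
                continue
            yield json.loads(line)

if __name__ == "__main__":
    for row in stream_notifications():
        print(row)  # hand each update off to your notification platform here
```

From here, each yielded row could be fanned out to email, SMS, or an in-app messaging service of your choice.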

In one ksqlDB window, run the following ksqlDB query to return customer details with flight updates. This is the same query that you would run from your application, and it runs continuously.

SELECT  CUSTOMER_NAME
      , FU.REASON      AS FLIGHT_CHANGE_REASON 
      , FU.UPDATED_DEP AS FLIGHT_UPDATED_DEP
      , FLIGHT_SCHEDULED_DEP 
      , CUSTOMER_EMAIL
      , CUSTOMER_PHONE
      , FLIGHT_DESTINATION
      , FLIGHT_CODE
  FROM flight_updates FU
        INNER JOIN customer_flights_rekeyed CB
        ON FU.FLIGHT_ID = CB.FLIGHT_ID
EMIT CHANGES;

In another ksqlDB window, add some data to the flight update stream:

INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (1, 2, '2021-11-18T09:00:00.000', 'Cabin staff unavailable');
INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (2, 3, '2021-11-19T14:00:00.000', 'Mechanical checks');
INSERT INTO flight_updates (ID, FLIGHT_ID, UPDATED_DEP, REASON) VALUES (3, 1, '2021-11-19T08:10:09.000', 'Icy conditions');

In the original window, you will see the details of which passengers are impacted by which flight changes:

+---------------+------------------------+--------------------+----------------------+---------------------------+------------------+-------------------+------------+
|CUSTOMER_NAME  |FLIGHT_CHANGE_REASON    |FLIGHT_UPDATED_DEP  |FLIGHT_SCHEDULED_DEP  |CUSTOMER_EMAIL             |CUSTOMER_PHONE    |FLIGHT_DESTINATION |FLIGHT_CODE |
+---------------+------------------------+--------------------+----------------------+---------------------------+------------------+-------------------+------------+
|Gleda Lealle   |Icy conditions          |2021-11-19T08:10:09 |2021-11-18T06:04:00   |glealle0@senate.gov        |+351 831 301 6746 |AMS                |642         |
|Ker Omond      |Cabin staff unavailable |2021-11-18T09:00:00 |2021-11-18T07:36:00   |komond3@usnews.com         |+33 515 323 0170  |LHR                |9607        |
|Arline Synnott |Mechanical checks       |2021-11-19T14:00:00 |2021-11-18T08:11:00   |asynnott4@theatlantic.com  |+62 953 759 8885  |TXL                |7968        |