Automate instant payment verifications

As digital transactions become the norm, financial institutions must screen customer payment requests in real time for suspicious activity and verify each payment against regulatory restrictions before processing it. This recipe shows you how to validate payments against available funds and anti-money laundering (AML) policies.

[Diagram: payment verification pipeline]

Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB provides a SQL language for processing data in real time (and will soon support connector integration for reading and writing data to other data sources and sinks). Execute the recipe with the provided SQL commands using the Confluent Cloud ksqlDB editor.
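
Before running the recipe, you can sanity-check your environment from the editor with a few standard ksqlDB metadata statements; these simply list what already exists in your cluster:

-- List the topics, streams, and tables visible to this ksqlDB cluster
SHOW TOPICS;
SHOW STREAMS;
SHOW TABLES;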

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows sample connector configurations below; create each one separately. You can also substitute your own connectors to connect to any supported data source.

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-payment-status-check-customers",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "customers",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-payment-status-check-payments",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "payments",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-payment-status-check-aml_status",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "aml_status",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}

{
  "connector.class"          : "PostgresSource",
  "name"                     : "recipe-payment-status-check-funds_status",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "5432",
  "connection.user"          : "postgres",
  "connection.password"      : "<my-database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "funds_status",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UTC",
  "tasks.max"                : "1"
}
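
Alternatively, if connector management from ksqlDB is available to you, the same configuration can be expressed as a CREATE SOURCE CONNECTOR statement and run from the ksqlDB editor. A sketch for the customers connector, reusing the placeholder values from the JSON above (repeat for the payments, aml_status, and funds_status tables):

CREATE SOURCE CONNECTOR `recipe-payment-status-check-customers` WITH (
  "connector.class"       = 'PostgresSource',
  "kafka.api.key"         = '<my-kafka-api-key>',
  "kafka.api.secret"      = '<my-kafka-api-secret>',
  "connection.host"       = '<my-database-endpoint>',
  "connection.port"       = '5432',
  "connection.user"       = 'postgres',
  "connection.password"   = '<my-database-password>',
  "db.name"               = '<db-name>',
  "table.whitelist"       = 'customers',
  "timestamp.column.name" = 'created_at',
  "output.data.format"    = 'JSON',
  "db.timezone"           = 'UTC',
  "tasks.max"             = '1'
);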

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to run this recipe without external dependencies, no worries! The next section shows how to test the recipe by inserting mock data into the streams with the ksqlDB INSERT INTO statement.

ksqlDB code

Now you can process the data in a variety of ways.


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- Register the initial streams and tables from the Kafka topics
CREATE STREAM PAYMENTS (
  PAYMENT_ID INTEGER KEY,
  CUSTID INTEGER,
  ACCOUNTID INTEGER,
  AMOUNT INTEGER,
  BANK VARCHAR
) WITH (
  KAFKA_TOPIC = 'payments',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE STREAM aml_status (
  PAYMENT_ID INTEGER,
  BANK VARCHAR,
  STATUS VARCHAR
) WITH (
  KAFKA_TOPIC = 'aml_status',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE STREAM funds_status (
  PAYMENT_ID INTEGER,
  REASON_CODE VARCHAR,
  STATUS VARCHAR
) WITH (
  KAFKA_TOPIC = 'funds_status',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

CREATE TABLE customers (
  ID INTEGER PRIMARY KEY, 
  FIRST_NAME VARCHAR, 
  LAST_NAME VARCHAR, 
  EMAIL VARCHAR, 
  GENDER VARCHAR, 
  STATUS360 VARCHAR
) WITH (
  KAFKA_TOPIC = 'customers',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Enrich Payments stream with Customers table
CREATE STREAM enriched_payments AS SELECT
  p.payment_id AS payment_id,
  p.custid AS customer_id,
  p.accountid,
  p.amount,
  p.bank,
  c.first_name,
  c.last_name,
  c.email,
  c.status360
  FROM payments p LEFT JOIN customers c ON p.custid = c.id;

-- Combine the status streams
CREATE STREAM payment_statuses AS SELECT
  payment_id,
  status,
  'AML' AS source_system
  FROM aml_status;

INSERT INTO payment_statuses SELECT payment_id, status, 'FUNDS' AS source_system FROM funds_status;

-- Combine payment and status events within a 1-hour window
CREATE STREAM payments_with_status AS SELECT
  ep.payment_id AS payment_id,
  ep.accountid,
  ep.amount,
  ep.bank,
  ep.first_name,
  ep.last_name,
  ep.email,
  ep.status360,
  ps.status,
  ps.source_system
  FROM enriched_payments ep LEFT JOIN payment_statuses ps WITHIN 1 HOUR ON ep.payment_id = ps.payment_id;

-- Aggregate data to the final table
CREATE TABLE payments_final AS SELECT
  payment_id,
  HISTOGRAM(status) AS status_counts,
  COLLECT_LIST('{ "system" : "' + source_system + '", "status" : "' + status + '"}') AS service_status_list
  FROM payments_with_status
  WHERE status IS NOT NULL
  GROUP BY payment_id;
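
Once events start flowing (for example, after the mock inserts below), you can watch verification results arrive by running a push query against the final table. This is for inspection only and is not part of the pipeline:

-- Stream every update to the payment verification aggregate as it happens
SELECT payment_id, status_counts, service_status_list
  FROM payments_final
  EMIT CHANGES;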

If you did not connect to a real data source in the Read the data in section above, you can instead use ksqlDB to insert mock events into the source topics with the following statements:

-- Customer Data
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (10,'Brena','Tollerton','btollerton9@furl.net','Female','silver');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (9,'Even','Tinham','etinham8@facebook.com','Male','silver');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (8,'Patti','Rosten','prosten7@ihg.com','Female','silver');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (7,'Fay','Huc','fhuc6@quantcast.com','Female','bronze');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (6,'Robinet','Leheude','rleheude5@reddit.com','Female','platinum');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (5,'Hansiain','Coda','hcoda4@senate.gov','Male','platinum');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (4,'Hashim','Rumke','hrumke3@sohu.com','Male','platinum');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (3,'Mariejeanne','Cocci','mcocci2@techcrunch.com','Female','bronze');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (2,'Ruthie','Brockherst','rbrockherst1@ow.ly','Female','platinum');
INSERT INTO customers (id, FIRST_NAME, LAST_NAME, EMAIL, GENDER, STATUS360) VALUES (1,'Rica','Blaisdell','rblaisdell0@rambler.ru','Female','bronze');

-- Payment Instruction Data
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (1,1,1234000,100,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (3,2,1234100,200,'Barclays Bank');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (5,3,1234200,300,'BNP Paribas');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (7,4,1234300,400,'Wells Fargo');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (9,5,1234400,500,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (11,6,1234500,600,'Royal Bank of Canada');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (13,7,1234600,700,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (15,8,1234700,800,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (17,9,1234800,900,'DBS');
INSERT INTO payments (PAYMENT_ID, CUSTID, ACCOUNTID, AMOUNT, BANK) VALUES (19,10,1234900,1000,'United Overseas Bank');

-- AML Status Data
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (1,'Wells Fargo','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (3,'Commonwealth Bank of Australia','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (5,'Deutsche Bank','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (7,'DBS','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (9,'United Overseas Bank','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (11,'Citi','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (13,'Commonwealth Bank of Australia','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (15,'Barclays Bank','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (17,'United Overseas Bank','OK');
INSERT INTO aml_status(PAYMENT_ID,BANK,STATUS) VALUES (19,'Royal Bank of Canada','OK');

-- Funds Status Data
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (1,'00','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (3,'99','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (5,'30','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (7,'00','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (9,'00','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (11,'00','NOT OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (13,'30','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (15,'00','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (17,'10','OK');
INSERT INTO funds_status(PAYMENT_ID,REASON_CODE,STATUS) VALUES (19,'10','OK');
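
Once these inserts have been processed, a pull query is a quick way to spot-check a single payment. Payment 11 is a useful probe because its funds check returned NOT OK, so its status counts should show one OK (from AML) and one NOT OK (from funds). Pull queries must filter on the table's key:

-- Look up the verification summary for a single payment by its key
SELECT * FROM payments_final WHERE payment_id = 11;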

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below, substituting the stream or table name as appropriate. Including the DELETE TOPIC clause also deletes the topic backing the stream or table, asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
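
For this recipe, that works out to the teardown sketched below, in reverse order of creation so that nothing is dropped while a downstream object still reads from it. Note that you may first need to terminate any persistent queries that reference these objects:

DROP TABLE IF EXISTS payments_final DELETE TOPIC;
DROP STREAM IF EXISTS payments_with_status DELETE TOPIC;
DROP STREAM IF EXISTS payment_statuses DELETE TOPIC;
DROP STREAM IF EXISTS enriched_payments DELETE TOPIC;
DROP STREAM IF EXISTS funds_status DELETE TOPIC;
DROP STREAM IF EXISTS aml_status DELETE TOPIC;
DROP STREAM IF EXISTS payments DELETE TOPIC;
DROP TABLE IF EXISTS customers DELETE TOPIC;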

If you also created connectors, you'll need to remove those as well.
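
If your ksqlDB cluster supports connector management, you can drop them from the editor with DROP CONNECTOR; otherwise, delete them from the Confluent Cloud Console. A sketch for one of the four (repeat for each):

DROP CONNECTOR `recipe-payment-status-check-customers`;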