
Detect unusual credit card activity

One way many financial institutions detect fraud is to check for unusual activity over a short period of time, raising a red flag so they can promptly alert their customers and confirm any recent unexpected purchases. Fraud can involve stolen credit cards, forged checks and account numbers, duplicate transactions, and more. This recipe analyzes a customer’s typical credit card spend and flags the account as a possible case of credit card theft when spending is excessive.

Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB provides a SQL language for processing data in real time (and will soon support connector integration for reading and writing data to other data sources and sinks). Execute the recipe with the provided SQL commands using the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows sample connector configuration(s) below, each of which should be created separately. You can also substitute your own connectors to connect to any supported data source.

{
  "connector.class"          : "OracleDatabaseSource",
  "name"                     : "recipe-oracle-transactions-cc",
  "connector.class"          : "OracleDatabaseSource",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "1521",
  "connection.user"          : "<database-username>",
  "connection.password"      : "<database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "TRANSACTIONS",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UCT",
  "tasks.max"                : "1"
}

{
  "connector.class"          : "OracleDatabaseSource",
  "name"                     : "recipe-oracle-customers-cc",
  "connector.class"          : "OracleDatabaseSource",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "connection.host"          : "<my-database-endpoint>",
  "connection.port"          : "1521",
  "connection.user"          : "<database-username>",
  "connection.password"      : "<database-password>",
  "db.name"                  : "<db-name>",
  "table.whitelist"          : "CUSTOMERS",
  "timestamp.column.name"    : "created_at",
  "output.data.format"       : "JSON",
  "db.timezone"              : "UCT",
  "tasks.max"                : "1"
}
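
Once the connectors are running, you can optionally confirm that records are arriving before building anything on them. As a quick sanity check (not part of the recipe itself), print a few records from each topic in the ksqlDB editor; the topic names below match what the streams in the next section expect:

-- Inspect a few records from each source topic
PRINT 'FD_transactions' FROM BEGINNING LIMIT 3;
PRINT 'FD_customers' FROM BEGINNING LIMIT 3;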

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to execute this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the streams using the ksqlDB command INSERT INTO.

ksqlDB code


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- Create the stream of customer data
CREATE STREAM fd_cust_raw_stream (
  ID BIGINT, 
  FIRST_NAME VARCHAR, 
  LAST_NAME VARCHAR, 
  EMAIL VARCHAR, 
  AVG_CREDIT_SPEND DOUBLE
) WITH (
  KAFKA_TOPIC = 'FD_customers',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Repartition the customer data stream by ID to prepare for the join later
CREATE STREAM fd_customer_rekeyed WITH (KAFKA_TOPIC = 'fd_customer_rekeyed') AS
  SELECT * 
  FROM fd_cust_raw_stream 
  PARTITION BY ID;

-- Register the partitioned customer data topic as a table
CREATE TABLE fd_customers (
  ID BIGINT PRIMARY KEY,
  FIRST_NAME VARCHAR, 
  LAST_NAME VARCHAR, 
  EMAIL VARCHAR, 
  AVG_CREDIT_SPEND DOUBLE
) WITH (
  KAFKA_TOPIC = 'fd_customer_rekeyed',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Create the stream of transactions
CREATE STREAM fd_transactions (
  ACCOUNT_ID BIGINT,
  TIMESTAMP VARCHAR,
  CARD_TYPE VARCHAR,
  AMOUNT DOUBLE,
  IP_ADDRESS VARCHAR,
  TRANSACTION_ID VARCHAR
) WITH (
  KAFKA_TOPIC = 'FD_transactions',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Join the transactions to customer information
CREATE STREAM fd_transactions_enriched WITH (KAFKA_TOPIC = 'transactions_enriched') AS
  SELECT
    T.ACCOUNT_ID,
    T.CARD_TYPE,
    T.AMOUNT, 
    C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME, 
    C.AVG_CREDIT_SPEND 
  FROM fd_transactions T
  INNER JOIN fd_customers C
  ON T.ACCOUNT_ID = C.ID;

-- Aggregate the stream of transactions for each account ID using a two-hour
-- tumbling window, and filter for accounts in which the total spend in a
-- two-hour period is greater than the customer’s average:
CREATE TABLE fd_possible_stolen_card WITH (KAFKA_TOPIC = 'FD_possible_stolen_card', KEY_FORMAT = 'JSON') AS
  SELECT
    TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss Z') AS WINDOW_START, 
    T.ACCOUNT_ID,
    T.CARD_TYPE,
    SUM(T.AMOUNT) AS TOTAL_CREDIT_SPEND, 
    T.FULL_NAME,
    MAX(T.AVG_CREDIT_SPEND) AS AVG_CREDIT_SPEND 
  FROM fd_transactions_enriched T
  WINDOW TUMBLING (SIZE 2 HOURS) 
  GROUP BY T.ACCOUNT_ID, T.CARD_TYPE, T.FULL_NAME 
  HAVING SUM(T.AMOUNT) > MAX(T.AVG_CREDIT_SPEND);

In the previous section Read the data in, if you did not have a real data source to connect to, you can now use ksqlDB to insert example events into the source topics with the following statements:

INSERT INTO fd_cust_raw_stream (id, first_name, last_name, email, avg_credit_spend) VALUES (6011000990139424, 'Janice', 'Smith', 'jsmith@mycompany.com', 500.00);
INSERT INTO fd_cust_raw_stream (id, first_name, last_name, email, avg_credit_spend) VALUES (3530111333300000, 'George', 'Mall', 'gmall@mycompany.com', 20.00);

INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (6011000990139424, '2021-09-23T10:50:00.000Z', 'visa', 542.99, '192.168.44.1', '3985757');
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (6011000990139424, '2021-09-23T10:50:01.000Z', 'visa', 611.48, '192.168.44.1', '8028435');
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (3530111333300000, '2021-09-23T10:50:00.000Z', 'mastercard', 10.31, '192.168.101.3', '1695780');
INSERT INTO fd_transactions (account_id, timestamp, card_type, amount, ip_address, transaction_id) VALUES (3530111333300000, '2021-09-23T10:50:00.000Z', 'mastercard', 5.37, '192.168.101.3', '1695780');
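
With these example rows, Janice Smith's two transactions total 1,154.47 against an average spend of 500.00, so her account should be flagged, while George Mall's total of 15.68 stays under his 20.00 average and should not. To watch flagged accounts as results are produced, you can run a push query against the table in the ksqlDB editor (a minimal sketch; add a WHERE clause if you want to narrow it to a single account):

-- Emit every account flagged within a two-hour window as results arrive
SELECT * FROM fd_possible_stolen_card EMIT CHANGES;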

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is also deleted asynchronously.

1
2
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
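
For this recipe, drop the derived objects before the streams and tables they are built on. The following is a sketch assuming the names used above; if ksqlDB reports dependent persistent queries, list them with SHOW QUERIES; and TERMINATE them first:

DROP TABLE IF EXISTS fd_possible_stolen_card DELETE TOPIC;
DROP STREAM IF EXISTS fd_transactions_enriched DELETE TOPIC;
-- fd_customers shares its backing topic with fd_customer_rekeyed, so delete that topic only once
DROP TABLE IF EXISTS fd_customers;
DROP STREAM IF EXISTS fd_customer_rekeyed DELETE TOPIC;
DROP STREAM IF EXISTS fd_transactions DELETE TOPIC;
DROP STREAM IF EXISTS fd_cust_raw_stream DELETE TOPIC;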

If you also created connectors, you'll need to remove those as well.