Understand user behavior with clickstream data

Analyzing clickstream data lets businesses optimize webpages and gauge the effectiveness of their web presence by better understanding users’ click activity and navigation patterns. Because clickstream data often arrives in large volumes, stream processing is a natural fit: data is processed for analysis as soon as it is ingested. This recipe shows how to measure key statistics on visitor activity over a given time frame, such as how many pages users view, how long they engage with the website, and more.

[Image: Grafana dashboard visualizing clickstream activity]

Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB supports a SQL language for processing the data in real time (and will soon support connector integration for reading and writing data to other data sources and sinks). Execute the recipe with the provided SQL commands using the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows sample connector configurations below, each of which should be created separately. You can also substitute your own connectors to connect to any supported data source.

This recipe creates simulated data with the Datagen connector.

{
  "name"                     : "Datagen_users",
  "connector.class"          : "DatagenSource",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "kafka.topic"              : "clickstream_users",
  "quickstart"               : "CLICKSTREAM_USERS",
  "maxInterval"              : "10",
  "tasks.max"                : "1",
  "output.data.format"       : "JSON"
}

{
  "name"                     : "Datagen_clicks",
  "connector.class"          : "DatagenSource",
  "kafka.api.key"            : "<my-kafka-api-key>",
  "kafka.api.secret"         : "<my-kafka-api-secret>",
  "kafka.topic"              : "clickstream",
  "quickstart"               : "CLICKSTREAM",
  "maxInterval"              : "30",
  "tasks.max"                : "1",
  "output.data.format"       : "JSON"
}
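
Once both connectors are running, you can confirm that records are arriving by printing a few messages from each topic in the ksqlDB editor (a quick check; the topic names match the kafka.topic values above):

PRINT 'clickstream' FROM BEGINNING LIMIT 3;
PRINT 'clickstream_users' FROM BEGINNING LIMIT 3;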

Optional: To simulate a real-world scenario in which user sessions don't stay open indefinitely but close after a period of inactivity, you can pause and resume the Datagen_clicks connector.

ksqlDB code

Now you can process the data in a variety of ways: enrich the clickstream with user information, analyze errors, aggregate the data into time windows, and more.


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- stream of user clicks:
CREATE STREAM clickstream (
  _time BIGINT,
  time VARCHAR,
  ip VARCHAR,
  request VARCHAR,
  status INT,
  userid INT,
  bytes BIGINT,
  agent VARCHAR
) WITH (
  KAFKA_TOPIC = 'clickstream',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);

-- users lookup table:
CREATE TABLE web_users (
  user_id VARCHAR PRIMARY KEY,
  registered_At BIGINT,
  username VARCHAR,
  first_name VARCHAR,
  last_name VARCHAR,
  city VARCHAR,
  level VARCHAR
) WITH (
  KAFKA_TOPIC = 'clickstream_users',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);

-- Build materialized stream views:

-- enrich click-stream with more user information:
CREATE STREAM user_clickstream AS
  SELECT
    u.user_id,
    u.username,
    ip,
    u.city,
    request,
    status,
    bytes
  FROM clickstream c
  LEFT JOIN web_users u ON cast(c.userid AS VARCHAR) = u.user_id;

-- Build materialized table views:

-- Table of html pages per minute for each user:
CREATE TABLE pages_per_min AS
  SELECT
    userid AS k1,
    AS_VALUE(userid) AS userid,
    WINDOWSTART AS EVENT_TS,
    COUNT(*) AS pages
  FROM clickstream WINDOW HOPPING (SIZE 60 SECOND, ADVANCE BY 5 SECOND)
  WHERE request LIKE '%html%'
  GROUP BY userid;

-- User sessions table - 30 seconds of inactivity expires the session
-- Table counts number of events within the session
CREATE TABLE click_user_sessions AS
  SELECT
    username AS K,
    AS_VALUE(username) AS username,
    WINDOWEND AS EVENT_TS,
    COUNT(*) AS events
  FROM user_clickstream WINDOW SESSION (30 SECOND)
  GROUP BY username;

-- Number of errors per minute for status codes > 400,
-- using a HAVING filter to keep only codes seen more than 5 times
CREATE TABLE errors_per_min_alert WITH (KAFKA_TOPIC = 'errors_per_min_alert') AS
  SELECT
    status AS k1,
    AS_VALUE(status) AS status,
    WINDOWSTART AS EVENT_TS,
    COUNT(*) AS errors
  FROM clickstream WINDOW HOPPING (SIZE 60 SECOND, ADVANCE BY 20 SECOND)
  WHERE status > 400
  GROUP BY status
  HAVING COUNT(*) > 5 AND COUNT(*) IS NOT NULL;

-- Enriched user activity table:
-- count clicks per user, IP, and city over a tumbling window
CREATE TABLE user_ip_activity WITH (KEY_FORMAT = 'JSON', KAFKA_TOPIC = 'user_ip_activity') AS
  SELECT
    username AS k1,
    ip AS k2,
    city AS k3,
    AS_VALUE(username) AS username,
    WINDOWSTART AS EVENT_TS,
    AS_VALUE(ip) AS ip,
    AS_VALUE(city) AS city,
    COUNT(*) AS count
  FROM user_clickstream WINDOW TUMBLING (SIZE 60 SECOND)
  GROUP BY username, ip, city
  HAVING COUNT(*) > 1;
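
Before wiring up a sink, you can sanity-check the aggregations with a transient push query in the ksqlDB editor, for example against the user_ip_activity table (LIMIT keeps the query from running indefinitely):

SELECT * FROM user_ip_activity EMIT CHANGES LIMIT 5;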

Write the data out

After processing the data, send it to Elasticsearch.

{
  "connector.class"    : "ElasticsearchSink",
  "name"               : "recipe-elasticsearch-analyzed_clickstream",
  "input.data.format"  : "JSON",
  "kafka.api.key"      : "<my-kafka-api-key>",
  "kafka.api.secret"   : "<my-kafka-api-secret>",
  "topics"             : "USER_IP_ACTIVITY, ERRORS_PER_MIN_ALERT",
  "connection.url"     : "<elasticsearch-URI>",
  "connection.username": "<elasticsearch-username>",
  "connection.password": "<elasticsearch-password>",
  "type.name"          : "type.name:kafkaconnect",
  "key.ignore"         : "true",
  "schema.ignore"      : "true",
  "tasks.max"          : "1"
}
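
The topics property must list the exact names of the topics backing the output tables (user_ip_activity and errors_per_min_alert, as set in the WITH clauses above). If in doubt, you can confirm the names from the ksqlDB editor before creating the sink:

SHOW TOPICS;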

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substitute your stream or table names as appropriate). Including the DELETE TOPIC clause also deletes the topic backing the stream or table, asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
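
For this recipe, the objects created above can be dropped in reverse dependency order, roughly as follows (a sketch; if ksqlDB reports persistent queries still writing to an object, terminate them first with TERMINATE <query_id>;):

-- Drop the derived tables and stream first, then the source stream and lookup table
DROP TABLE IF EXISTS user_ip_activity DELETE TOPIC;
DROP TABLE IF EXISTS errors_per_min_alert DELETE TOPIC;
DROP TABLE IF EXISTS click_user_sessions DELETE TOPIC;
DROP TABLE IF EXISTS pages_per_min DELETE TOPIC;
DROP STREAM IF EXISTS user_clickstream DELETE TOPIC;
DROP STREAM IF EXISTS clickstream DELETE TOPIC;
DROP TABLE IF EXISTS web_users DELETE TOPIC;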

If you also created connectors, remember to remove those as well.