
Detect and analyze SSH attacks

There are lots of ways SSH can be abused, but one of the most straightforward ways to detect suspicious activity is to monitor for rejected logins. This recipe processes Syslog data to detect failed logins and streams out the username and IP address of each rejected attempt. With ksqlDB, you can filter and react to unwanted events in real time to minimize damage, rather than performing historical analysis of Syslog data from cold storage.

[Image: SSH attack detection]

Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB provides a SQL language for processing data in real time (and will soon support connector integration for reading and writing data to other data sources and sinks). Execute the recipe by running the provided SQL commands in the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and sinks in the cloud. However, if you want to stream specific events that don't have an available connector in Confluent Cloud, you can run your own connector and send the data to your Kafka cluster. This recipe shows how to run your own connector locally.

To stream syslog data into a Kafka topic called syslog, create the Dockerfile below to bundle a Kafka Connect worker with the kafka-connect-syslog connector:

FROM confluentinc/cp-server-connect-base:7.0.1

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-syslog:1.3.4

Build the custom Docker image with this command:

docker build \
   -t localbuild/connect_distributed_with_syslog:1.3.4 \
   -f Dockerfile .

Next, create a docker-compose.yml file with the following content, substituting your Confluent Cloud connection information:

---
version: '2'
services:

  connect:
    image: localbuild/connect_distributed_with_syslog:1.3.4
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: < BOOTSTRAP SERVER >
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect"
      CONNECT_CONFIG_STORAGE_TOPIC: recipe-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: recipe-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: recipe-connect-status
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_SASL_MECHANISM: PLAIN
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG > 
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN

Start the Connect worker container:

docker-compose up -d

Create a Syslog Source connector configuration file called connector-syslog.config:

{
  "connector.class"          : "io.confluent.connect.syslog.SyslogSourceConnector",
  "name"                     : "recipe-syslog-source",
  "syslog.port"              : "<port>",
  "syslog.listener"          : "TCP",
  "kafka.topic"              : "syslog",
  "tasks.max"                : "1"
}

Submit the connector configuration to the Connect worker:

curl -X POST -H "Content-Type: application/json" --data @connector-syslog.config http://localhost:8083/connectors

Now you should have Syslog messages being written to the syslog topic in Confluent Cloud.
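
One quick way to confirm that events are arriving is to print the topic from the ksqlDB editor (PRINT is a standard ksqlDB statement; the LIMIT clause just keeps the output short):

PRINT 'syslog' FROM BEGINNING LIMIT 3;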

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to execute this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the streams, using the ksqlDB command INSERT INTO.

ksqlDB code

Process the Syslog events by flagging events with invalid users, stripping out the unnecessary fields, and producing a stream of just the relevant information. There are many ways to customize the resulting stream to fit your business needs; this example also demonstrates how to enrich the stream with a new field, FACILITY_DESCRIPTION, containing human-readable content.


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- Extract relevant fields from log messages
CREATE STREAM syslog (
  ts varchar, 
  host varchar,
  facility int,
  message varchar,
  remote_address varchar 
) WITH (
  KAFKA_TOPIC = 'syslog',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Create actionable stream of SSH attacks, filtering syslog messages where user is invalid,
-- and enriched with user and IP
CREATE STREAM ssh_attacks AS
  SELECT
    -- ts arrives as a string, so parse it into a TIMESTAMP before reformatting
    FORMAT_TIMESTAMP(PARSE_TIMESTAMP(ts, 'yyyy-MM-dd''T''HH:mm:ss.SSSX'), 'yyyy-MM-dd HH:mm:ss') AS syslog_timestamp,
    host,
    facility,
    CASE WHEN facility = 0 THEN 'kernel messages'
         WHEN facility = 1 THEN 'user-level messages'
         WHEN facility = 2 THEN 'mail system'
         WHEN facility = 3 THEN 'system daemons'
         WHEN facility = 4 THEN 'security/authorization messages'
         WHEN facility = 5 THEN 'messages generated internally by syslogd'
         WHEN facility = 6 THEN 'line printer subsystem'
         ELSE '<unknown>'
       END AS facility_description,
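    -- Extract the username from messages like 'Invalid user admin from 192.168.10.83'
    -- (if ' from ' is absent, as in the mock data below, SPLIT returns the whole
    -- remainder as the first element; ksqlDB arrays are 1-indexed)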
    SPLIT(REPLACE(message, 'Invalid user ', ''), ' from ')[1] AS attack_user,
    remote_address AS attack_ip
  FROM syslog
  WHERE message LIKE 'Invalid user%'
  EMIT CHANGES;
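
Once these statements have run, you can optionally confirm the schema of the derived stream with DESCRIBE:

DESCRIBE ssh_attacks;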

If you did not connect to a real data source in the Read the data in section above, you can use ksqlDB to insert example events into the source topic with the following statements:

INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-07-14T00:10:10.300Z', 'asgard.example.com', 0, 'Line protocol on Interface GigabitEthernet0/1, changed state to up', NULL);
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-05T20:11:13.138Z', 'asgard.example.com', 0, 'partition health measures for /var did not suffice - still using 96% of partition space', NULL);
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:11.333Z', 'asgard.example.com', 4, 'Invalid user asmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:12.333Z', 'asgard.example.com', 4, 'Invalid user bsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:13.333Z', 'asgard.example.com', 4, 'Invalid user csmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:14.333Z', 'asgard.example.com', 4, 'Invalid user dsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:15.333Z', 'asgard.example.com', 4, 'Invalid user esmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:16.333Z', 'asgard.example.com', 4, 'Invalid user fsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-05T20:13:13.138Z', 'asgard.example.com', 0, 'partition health measures for /var did not suffice - still using 96% of partition space', NULL);
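
As a quick sanity check, query the derived stream: with the mock events above, only the six Invalid user rows should be returned, each with the attacking username and source IP extracted:

SELECT * FROM ssh_attacks EMIT CHANGES LIMIT 6;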

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substituting the stream or table name, as appropriate). Including the DELETE TOPIC clause also deletes the topic backing the stream or table, asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well: a connector submitted through the Connect REST API, as above, can be removed with an HTTP DELETE request to the same /connectors endpoint (for example, curl -X DELETE http://localhost:8083/connectors/recipe-syslog-source), and the local worker can be stopped with docker-compose down.