Detect and analyze SSH attacks
There are lots of ways SSH can be abused, but one of the most straightforward ways to detect suspicious activity is to monitor for rejected logins. This recipe processes Syslog data to detect failed logins, streaming out the username and IP address of each rejected attempt. With ksqlDB, you can filter and react to unwanted events in real time to minimize damage, rather than performing historical analysis of Syslog data from cold storage.
Step by step
Set up your environment
Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB supports a SQL language for processing the data in real time (and will soon support connector integration for reading and writing data to other data sources and sinks). Execute the recipe with the provided SQL commands using the Confluent Cloud ksqlDB editor.
Read the data in
Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud.
However, if you want to stream specific events that don't have an available connector in Confluent Cloud, you can run your own connector and send the data to your Kafka cluster.
This recipe shows how you can run your own connector locally.
To stream Syslog data into a Kafka topic called syslog, create the Dockerfile below to bundle a Kafka Connect worker with the kafka-connect-syslog connector (the base image tag is illustrative; pin the version you want):

# Kafka Connect worker base image; adjust the tag as needed
FROM confluentinc/cp-server-connect-base:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-syslog:1.3.4
Build the custom Docker image with this command:
docker build \
-t localbuild/connect_distributed_with_syslog:1.3.4 \
-f Dockerfile .
Next, create a docker-compose.yml file with the following content, substituting your Confluent Cloud connection information. Only the security-related settings from the recipe are shown; the service skeleton around them (image, ports for the Connect REST API and the Syslog listener) is a minimal sketch, and a complete worker needs additional CONNECT_* settings such as converters and internal topic configuration:

---
version: '2'
services:
  connect:
    image: localbuild/connect_distributed_with_syslog:1.3.4
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
      - "1514:1514"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: < BOOTSTRAP SERVER >
      # Connect worker
      CONNECT_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      # Connect producer
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      # Connect consumer
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
Run the container with this:

docker-compose up -d
Create a Syslog Source connector configuration file called connector-syslog.config with the following content (the name/config nesting is the format the Kafka Connect REST API expects when creating a connector):

{
  "name" : "recipe-syslog-source",
  "config" : {
    "connector.class" : "io.confluent.connect.syslog.SyslogSourceConnector",
    "syslog.port" : "<port>",
    "syslog.listener" : "TCP",
    "kafka.topic" : "syslog",
    "tasks.max" : "1"
  }
}
Submit that connector to the connect worker:
curl -X POST -H "Content-Type: application/json" --data @connector-syslog.config http://localhost:8083/connectors
Now you should have Syslog messages being written to the
syslog topic in Confluent Cloud.
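As a quick sanity check that events are arriving, you can inspect the raw topic from the Confluent Cloud ksqlDB editor with a PRINT statement (the LIMIT value here is arbitrary):

```sql
-- Inspect the first few raw Syslog events landing in the topic
PRINT 'syslog' FROM BEGINNING LIMIT 3;
```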
Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to execute this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the streams using the ksqlDB INSERT INTO statement.
Process the Syslog events by flagging events with invalid users, stripping out all of the other unnecessary fields, and creating a stream of just the relevant information. There are many ways to customize the resulting stream to fit your business needs; this example also demonstrates how to enrich the stream with a new field, FACILITY_DESCRIPTION, containing human-readable content.
The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block.
This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.
SET 'auto.offset.reset' = 'earliest';
-- Extract relevant fields from log messages
CREATE STREAM syslog (
  -- Column list inferred from the connector output and the INSERT examples below
  ts TIMESTAMP,
  host VARCHAR,
  facility INT,
  message VARCHAR,
  remote_address VARCHAR
) WITH (
  KAFKA_TOPIC = 'syslog',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);
-- Create actionable stream of SSH attacks, filtering syslog messages where user is invalid,
-- and enriched with user and IP
CREATE STREAM ssh_attacks AS
  SELECT
    FORMAT_TIMESTAMP(ts, 'yyyy-MM-dd HH:mm:ss') AS syslog_timestamp,
    CASE WHEN facility = 0 THEN 'kernel messages'
         WHEN facility = 1 THEN 'user-level messages'
         WHEN facility = 2 THEN 'mail system'
         WHEN facility = 3 THEN 'system daemons'
         WHEN facility = 4 THEN 'security/authorization messages'
         WHEN facility = 5 THEN 'messages generated internally by syslogd'
         WHEN facility = 6 THEN 'line printer subsystem'
    END AS facility_description,
    SPLIT(REPLACE(message, 'Invalid user ', ''), ' from ')[1] AS attack_user,
    remote_address AS attack_ip
  FROM syslog
  WHERE message LIKE 'Invalid user%';
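If it helps to reason about the transformation, the filter and field extraction can be mirrored in plain Python. This is an illustrative sketch only; the names FACILITY_DESCRIPTIONS and parse_ssh_attack are invented here and are not part of the recipe or of ksqlDB:

```python
# Illustrative Python mirror of the ssh_attacks transformation
# (invented names; not part of the recipe or of ksqlDB).

FACILITY_DESCRIPTIONS = {
    0: "kernel messages",
    1: "user-level messages",
    2: "mail system",
    3: "system daemons",
    4: "security/authorization messages",
    5: "messages generated internally by syslogd",
    6: "line printer subsystem",
}

def parse_ssh_attack(facility, message, remote_address):
    """Apply the same filter and field extraction as the ssh_attacks stream."""
    # WHERE message LIKE 'Invalid user%'
    if not message.startswith("Invalid user"):
        return None
    # REPLACE(message, 'Invalid user ', '') then SPLIT(..., ' from ');
    # note ksqlDB arrays are 1-indexed, Python lists are 0-indexed.
    attack_user = message.replace("Invalid user ", "").split(" from ")[0]
    return {
        "facility_description": FACILITY_DESCRIPTIONS.get(facility),
        "attack_user": attack_user,
        "attack_ip": remote_address,
    }
```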
If you did not have a real data source to connect to in the previous section (Read the data in), you can use ksqlDB to insert example events into the source topics with the following statements:
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-07-14T00:10:10.300Z', 'asgard.example.com', 0, 'Line protocol on Interface GigabitEthernet0/1, changed state to up', NULL);
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-05T20:11:13.138Z', 'asgard.example.com', 0, 'partition health measures for /var did not suffice - still using 96% of partition space', NULL);
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:11.333Z', 'asgard.example.com', 4, 'Invalid user asmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:12.333Z', 'asgard.example.com', 4, 'Invalid user bsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:13.333Z', 'asgard.example.com', 4, 'Invalid user csmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:14.333Z', 'asgard.example.com', 4, 'Invalid user dsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:15.333Z', 'asgard.example.com', 4, 'Invalid user esmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-08T02:11:16.333Z', 'asgard.example.com', 4, 'Invalid user fsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES ('2021-08-05T20:13:13.138Z', 'asgard.example.com', 0, 'partition health measures for /var did not suffice - still using 96% of partition space', NULL);
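With the mock events inserted, you can run a push query in the ksqlDB editor to confirm that only the invalid-user events survive the filter:

```sql
-- Continuously emit detected SSH attacks as they arrive
SELECT * FROM ssh_attacks EMIT CHANGES;
```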
To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate).
Including the DELETE TOPIC clause also deletes the topic backing the stream or table (asynchronously).
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, you'll need to remove those as well (for the self-managed connector in this recipe, via the Kafka Connect REST API).