Welcome to the live session!
At Confluent we developed ksqlDB, the database purpose-built for stream processing applications. ksqlDB is built on top of Kafka Streams, a powerful Java library for enriching, transforming, and processing real-time streams of data. Having Kafka Streams at its core means ksqlDB is built on well-designed and easily understood layers of abstraction, so beginners and experts alike can unlock and fully leverage the power of Kafka in a fun and accessible way.
Click Create topic on my own or, if you have already created a topic, click the + Add topic button at the top right of the table.
Confluent offers 120+ pre-built connectors, enabling you to modernize your entire data architecture even faster. These connectors also give you peace of mind with enterprise-grade security, reliability, compatibility, and support.
{
  "name": "DatagenSourceConnector_0",
  "config": {
    "connector.class": "DatagenSource",
    "name": "DatagenSourceConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<add_your_api_key>",
    "kafka.api.secret": "<add_your_api_secret_key>",
    "kafka.topic": "ratings",
    "output.data.format": "AVRO",
    "quickstart": "RATINGS",
    "tasks.max": "1"
  }
}
Click Create topic on my own or, if you have already created a topic, click the + Add topic button at the top right of the table.
{
  "name": "MySqlCdcSourceConnector_0",
  "config": {
    "connector.class": "MySqlCdcSource",
    "name": "MySqlCdcSourceConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<add_your_api_key>",
    "kafka.api.secret": "<add_your_api_secret_key>",
    "database.hostname": "<will_be_given_during_lab>",
    "database.port": "3306",
    "database.user": "<will_be_given_during_lab>",
    "database.password": "<will_be_given_during_lab>",
    "database.server.name": "mysql",
    "database.ssl.mode": "preferred",
    "snapshot.mode": "when_needed",
    "output.data.format": "AVRO",
    "after.state.only": "true",
    "tasks.max": "1"
  }
}
Click Create topic on my own or, if you have already created a topic, click the + Add topic button at the top right of the table.
{
  "name": "DatagenSourceConnector_1",
  "config": {
    "connector.class": "DatagenSource",
    "name": "DatagenSourceConnector_1",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<add_your_api_key>",
    "kafka.api.secret": "<add_your_api_secret_key>",
    "kafka.topic": "users",
    "output.data.format": "AVRO",
    "quickstart": "USERS",
    "tasks.max": "1"
  }
}
You should have a valid project in Google Cloud in order to complete this lab.
Dataset ID: confluent_bigquery_demo
Data location: same as your Confluent Cloud cluster (in this lab, us-west4).
For detailed instructions refer to our documentation.
Name: confluent-bucket-demo
Location type: Region. Pick the same region as your Confluent Cloud cluster (in this lab we use us-west4).
Storage class: Standard
For detailed instructions refer to our documentation.
Instance name: confluent-bigtable-demo
Instance ID: <auto_generated>
Storage type: SSD
Location: Region. Pick the same region as your Confluent Cloud cluster (in this lab we use us-west4).
For detailed instructions refer to our documentation.
You need to create a GCP Service Account so the Confluent Connectors can access GCP resources. For the purpose of this lab, we will create 1 Service Account and assign 3 roles to it. However, you should always follow best practices for production workloads.
Use the search bar and search for IAM & Admin and click on the result.
Expand the left-hand menu, go to Service Accounts, click Create Service Account, and enter the following:
Service account name: confluent-demo
Description: Account to be used during Confluent Cloud Live Lab
Role: BigQuery Admin, Bigtable Administrator, Storage Admin
Click Grant and then Done.
Once the account is created, click the three dots in the Actions column and select Manage Keys.
Click Add Key and then Create new key.
Select JSON as the key type. Keep this key somewhere safe as you will need it in later steps. The key resembles the example below:
{
  "type": "service_account",
  "project_id": "confluent-842583",
  "private_key_id": "...omitted...",
  "private_key": "-----BEGIN PRIVATE ...omitted... =\n-----END PRIVATE KEY-----\n",
  "client_email": "confluent2@confluent-842583.iam.gserviceaccount.com",
  "client_id": "...omitted...",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/confluent2%40confluent-842583.iam.gserviceaccount.com"
}
Now that you have data flowing through Confluent, you can easily build stream processing applications using ksqlDB. You can continuously transform, enrich, join, and aggregate your data using simple SQL syntax, and gain value from your data directly in Confluent in real time. ksqlDB is also a fully managed service within Confluent Cloud with a 99.9% uptime SLA, so you can focus on developing services and building your data pipeline while letting Confluent manage your resources for you.
With ksqlDB, you have the ability to leverage streams and tables from your topics in Confluent. A stream in ksqlDB is a topic with a schema and it records the history of what has happened in the world as a sequence of events.
You can interact with ksqlDB through the Editor. You can create a stream by using the CREATE STREAM statement and a table using the CREATE TABLE statement. If you’re interested in learning more about ksqlDB and the differences between streams and tables, I recommend reading these two blogs here and here or watching the ksqlDB 101 course on the Confluent Developer website.
To write streaming queries against topics, you will need to register the topics with ksqlDB as a stream and/or table.
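Before registering a topic, you can optionally peek at its raw records directly in the ksqlDB editor with a PRINT statement. This is just a sanity check; the LIMIT value here is arbitrary.
PRINT 'ratings' FROM BEGINNING LIMIT 3;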
Create a stream from the ratings topic.
CREATE STREAM RATINGS_OG WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');
Verify the RATINGS_OG stream by running the following query.
SELECT * FROM RATINGS_OG EMIT CHANGES;
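If you would like to confirm the columns and types that ksqlDB derived from the Avro schema, you can optionally describe the stream (DESCRIBE RATINGS_OG EXTENDED; also shows runtime statistics).
DESCRIBE RATINGS_OG;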
Create a new stream that filters out ratings submitted through any test channel.
CREATE STREAM RATINGS_LIVE AS
SELECT *
FROM RATINGS_OG
WHERE LCASE(CHANNEL) NOT LIKE '%test%'
EMIT CHANGES;
Verify the RATINGS_LIVE stream by running the following query.
SELECT * FROM RATINGS_LIVE EMIT CHANGES;
Stop the running query by clicking on Stop.
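As an optional check that the filter works, you can run a quick aggregation over the new stream; no channel containing the word test should appear in the results. This is also a push query, so stop it the same way when you are done.
SELECT CHANNEL, COUNT(*) AS EVENT_COUNT
FROM RATINGS_LIVE
GROUP BY CHANNEL
EMIT CHANGES;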
Create a stream from the customers topic (mysql.demo.CUSTOMERS_INFO) populated by the MySQL CDC connector.
CREATE STREAM CUSTOMERS_INFORMATION
WITH (KAFKA_TOPIC ='mysql.demo.CUSTOMERS_INFO',
KEY_FORMAT ='JSON',
VALUE_FORMAT='AVRO');
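Optionally, you can verify that change events from MySQL are arriving in the new stream before building the table (stop the query when you are done).
SELECT * FROM CUSTOMERS_INFORMATION EMIT CHANGES;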
Create a customers table based on the CUSTOMERS_INFORMATION stream you just created.
CREATE TABLE CUSTOMERS WITH (FORMAT='AVRO') AS
SELECT id AS customer_id,
LATEST_BY_OFFSET(first_name) AS first_name,
LATEST_BY_OFFSET(last_name) AS last_name,
LATEST_BY_OFFSET(dob) AS dob,
LATEST_BY_OFFSET(email) AS email,
LATEST_BY_OFFSET(gender) AS gender,
LATEST_BY_OFFSET(club_status) AS club_status
FROM CUSTOMERS_INFORMATION
GROUP BY id;
Verify the customers table by running the following query.
SELECT * FROM CUSTOMERS;
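Because CUSTOMERS is a table, you can also issue a pull query to look up the current state for a single key. The id value below is only an example; replace it with an id that exists in your customers data (and quote it if your id column is a string).
SELECT * FROM CUSTOMERS WHERE CUSTOMER_ID = 1;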
Now that we have a stream of ratings data and customer information, we can perform a join query to enrich our data stream.
Create a new stream by running the following statement.
CREATE STREAM RATINGS_WITH_CUSTOMER_DATA WITH (KAFKA_TOPIC='ratings-enriched') AS
SELECT C.CUSTOMER_ID,
C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
C.DOB,
C.GENDER,
C.CLUB_STATUS,
C.EMAIL,
R.RATING_ID,
R.MESSAGE,
R.STARS,
R.CHANNEL,
TIMESTAMPTOSTRING(R.ROWTIME,'yyyy-MM-dd''T''HH:mm:ss.SSSZ') AS RATING_TS
FROM RATINGS_LIVE R
INNER JOIN CUSTOMERS C
ON R.USER_ID = C.CUSTOMER_ID
EMIT CHANGES;
SELECT * FROM RATINGS_WITH_CUSTOMER_DATA EMIT CHANGES;
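The enriched stream is also ready for aggregation. As an optional illustration (not part of the pipeline you deploy below), the following push query counts ratings per club status:
SELECT CLUB_STATUS,
COUNT(*) AS RATING_COUNT
FROM RATINGS_WITH_CUSTOMER_DATA
GROUP BY CLUB_STATUS
EMIT CHANGES;
When you are done exploring, continue with the BigQuery sink connector configuration below.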
{
  "name": "BigQuerySinkConnector_0",
  "config": {
    "topics": "ratings-enriched",
    "input.data.format": "AVRO",
    "connector.class": "BigQuerySink",
    "name": "BigQuerySinkConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<add_your_api_key>",
    "kafka.api.secret": "<add_your_api_secret_key>",
    "keyfile": "<add_the_JSON_key_created_earlier>",
    "project": "<add_your_gcp_project_id>",
    "datasets": "confluent_bigquery_demo",
    "auto.create.tables": "true",
    "auto.update.schemas": "true",
    "tasks.max": "1",
    "transforms": "Transform,Transform3",
    "transforms.Transform.type": "org.apache.kafka.connect.transforms.Cast$Value",
    "transforms.Transform.spec": "DOB:string",
    "transforms.Transform3.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.Transform3.fields": "DOB",
    "transforms.Transform3.replacement": "<xxxx-xx-xx>"
  }
}
Since the date of birth is stored as a DATE and we want to replace it with a string pattern, we will achieve our goal in a two-step process. First, we will cast the date of birth from DATE to String, then we will replace that String value with a pattern we have pre-defined. For more information on Single Message Transforms (SMTs) refer to our documentation or watch the series by Robin Moffatt, staff developer advocate at Confluent, here.
We only want to store the RATINGS_LIVE stream in Cloud Storage and not the customers’ information.
{
  "name": "GcsSinkConnector_0",
  "config": {
    "topics": "pksqlc**-RATINGS_LIVE",
    "input.data.format": "AVRO",
    "connector.class": "GcsSink",
    "name": "GcsSinkConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<add_your_api_key>",
    "kafka.api.secret": "<add_your_api_secret_key>",
    "gcs.credentials.config": "<add_the_JSON_key_created_earlier>",
    "gcs.bucket.name": "confluent-bucket-demo",
    "output.data.format": "JSON",
    "time.interval": "HOURLY",
    "flush.size": "1000",
    "tasks.max": "1"
  }
}
We will combine the userid and regionid fields to make the rowkey in BigTable, and use a Single Message Transform to convert the registertime field to a String.
{
  "name": "BigTableSinkConnector_0",
  "config": {
    "topics": "users",
    "input.data.format": "AVRO",
    "input.key.format": "STRING",
    "connector.class": "BigTableSink",
    "name": "BigTableSinkConnector_0",
    "kafka.auth.mode": "KAFKA_API_KEY",
    "kafka.api.key": "<add_your_api_key>",
    "kafka.api.secret": "<add_your_api_secret_key>",
    "gcp.bigtable.credentials.json": "<add_the_JSON_key_created_earlier>",
    "gcp.bigtable.project.id": "<add_your_gcp_project_id>",
    "gcp.bigtable.instance.id": "confluent-bigtable-demo",
    "insert.mode": "UPSERT",
    "table.name.format": "confluent-${topic}",
    "bigtable.row.key.definition": "userid,regionid",
    "bigtable.row.key.delimiter": "#",
    "auto.create.tables": "true",
    "tasks.max": "1",
    "transforms": "Transform",
    "transforms.Transform.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.Transform.target.type": "string",
    "transforms.Transform.field": "registertime",
    "transforms.Transform.format": "yyyy-MM-dd"
  }
}
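If you want to see the userid and regionid values that end up forming the row keys, you can optionally print a few records from the users topic in the ksqlDB editor (a quick check; the LIMIT is arbitrary).
PRINT 'users' FROM BEGINNING LIMIT 3;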
Deleting the resources you created during this lab will prevent you from incurring additional charges.