Analyze datacenter power usage

If your business provides cloud infrastructure across multiple data centers with isolated tenants, you need a way to accurately monitor and invoice each customer for the resources they consume. These data centers often draw large amounts of electricity and are built with smart electrical panels that control the power supplies to multiple customer tenants. This recipe demonstrates how to bill each customer accurately by capturing and analyzing telemetry data from those smart panels.

Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB provides a SQL dialect for processing data in real time (connector integration for reading from and writing to other data sources and sinks is coming soon). Execute the recipe by running the provided SQL statements in the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows the sample connector configurations below; each connector should be created separately. You can also substitute your own connectors to connect to any supported data source.

Our datacenter power analysis applications require data from two different sources: customer tenant information and smart control panel readings.

{
  "connector.class"       : "MySqlCdcSource",
  "name"                  : "Customer_Tenant_Source",
  "kafka.api.key"         : "<my-kafka-api-key>",
  "kafka.api.secret"      : "<my-kafka-api-secret>",
  "database.hostname"     : "<db-hostname>",
  "database.port"         : "3306",
  "database.user"         : "<db-user>",
  "database.password"     : "<db-password>",
  "database.server.name"  : "mysql",
  "database.whitelist"    : "customer",
  "table.includelist"     : "customer.tenant",
  "snapshot.mode"         : "initial",
  "output.data.format"    : "AVRO",
  "tasks.max"             : "1"
}

{
  "connector.class"       : "MqttSource",
  "name"                  : "Smart_Panel_Source",
  "kafka.api.key"         : "<my-kafka-api-key>",
  "kafka.api.secret"      : "<my-kafka-api-secret>",
  "kafka.topic"           : "panel-readings",
  "mqtt.server.uri"       : "tcp://<mqtt-server-hostname>:1881",
  "mqtt.topics"           : "<mqtt-topic>",
  "tasks.max"             : "1"
}

Optional: If you cannot connect to a real data source with properly formatted data, or if you just want to execute this recipe without external dependencies, no worries! In the next section, you'll see how to test the recipe by inserting mock data into the streams using the ksqlDB command INSERT INTO.

ksqlDB code


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- Create a Table for the captured tenant occupancy events
CREATE TABLE tenant_occupancy (
  tenant_id VARCHAR PRIMARY KEY,
  customer_id BIGINT
) WITH (
  KAFKA_TOPIC = 'tenant-occupancy',
  PARTITIONS = 6,
  KEY_FORMAT = 'JSON',
  VALUE_FORMAT = 'JSON'
);

-- Create a Stream for the power control panel telemetry data.
--   tenant_kwh_usage is reset by the device every month
CREATE STREAM panel_power_readings (
  panel_id BIGINT,
  tenant_id VARCHAR,
  panel_current_utilization DOUBLE,
  tenant_kwh_usage BIGINT
) WITH (
  KAFKA_TOPIC = 'panel-readings',
  PARTITIONS = 6,
  KEY_FORMAT = 'JSON',
  VALUE_FORMAT = 'JSON'
);

-- Create a filtered Stream of panel readings registering power usage >= 85%
--  good for determining panels which are drawing a high electrical load
CREATE STREAM overloaded_panels AS 
  SELECT panel_id, tenant_id, panel_current_utilization 
    FROM panel_power_readings 
    WHERE panel_current_utilization >= 0.85
  EMIT CHANGES;

-- Create a stream of billable power events;
--  the tenant_kwh_usage field is the aggregate amount of energy (kWh) used
--  in the current month
CREATE STREAM billable_power AS 
  SELECT 
      FORMAT_TIMESTAMP(FROM_UNIXTIME(panel_power_readings.ROWTIME), 'yyyy-MM') 
        AS billable_month,
      tenant_occupancy.customer_id as customer_id,
      tenant_occupancy.tenant_id as tenant_id, 
      panel_power_readings.tenant_kwh_usage as tenant_kwh_usage
    FROM panel_power_readings
    INNER JOIN tenant_occupancy ON 
      panel_power_readings.tenant_id = tenant_occupancy.tenant_id
  EMIT CHANGES;

-- Create a table that can be queried for billing reports
CREATE TABLE billable_power_report WITH (KEY_FORMAT = 'JSON') AS
  SELECT customer_id, tenant_id, billable_month, MAX(tenant_kwh_usage) as kwh
    FROM billable_power
    GROUP BY tenant_id, customer_id, billable_month;

If you did not connect to a real data source in the previous section, Read the data in, you can use ksqlDB to insert example events into the source topics with the following statements:

-- tenant_id is in the form of a resource name used to indicate the 
--  data center provider, country, regional locale, and tenant id
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:eqix:us:chi1:12', 924);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:eqix:us:chi1:10', 243);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:kddi:eu:ber1:15', 924);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:kddi:eu:ber1:20', 123);
INSERT INTO tenant_occupancy (tenant_id, customer_id) VALUES ('dc:kddi:cn:hnk2:11', 243);

-- power readings contain two distinct readings: the current total utilization
--  of the panel, and the month-to-date energy usage (kWh) for the referenced tenant
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:eqix:us:chi1:12', 1.05, 1034);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (2, 'dc:eqix:us:chi1:10', 0.85, 867);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:kddi:eu:ber1:15', 0.54, 345);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (2, 'dc:kddi:eu:ber1:20', 0.67, 288);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:kddi:cn:hnk2:11', 1.11, 1119);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:eqix:us:chi1:12', 1.01, 1134);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (2, 'dc:eqix:us:chi1:10', 0.75, 898);
INSERT INTO panel_power_readings (panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage) VALUES (1, 'dc:kddi:cn:hnk2:11', 1.10, 1201);
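
After inserting the mock events, you can verify that records are flowing through the pipeline with a push query against the report table:

SELECT * FROM billable_power_report EMIT CHANGES;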

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). Including the DELETE TOPIC clause also deletes, asynchronously, the topic backing the stream or table.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
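
Applied to this recipe, that means dropping dependent objects before the sources they are derived from (you may first need to TERMINATE any persistent queries that read from or write to them):

DROP TABLE IF EXISTS billable_power_report DELETE TOPIC;
DROP STREAM IF EXISTS billable_power DELETE TOPIC;
DROP STREAM IF EXISTS overloaded_panels DELETE TOPIC;
DROP STREAM IF EXISTS panel_power_readings DELETE TOPIC;
DROP TABLE IF EXISTS tenant_occupancy DELETE TOPIC;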

If you also created connectors, you'll need to remove those as well.
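
If the connectors are visible to your ksqlDB application (an assumption; connectors created in the Confluent Cloud Console can also be deleted there), a sketch for dropping them from the ksqlDB editor, using the names defined above:

DROP CONNECTOR IF EXISTS `Customer_Tenant_Source`;
DROP CONNECTOR IF EXISTS `Smart_Panel_Source`;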

Explanation

Typically, customer information is sourced from an existing database. As customer occupancy changes, tables in the database are updated, and the changes can be streamed into Kafka using Kafka Connect with change data capture (CDC). The MySqlCdcSource configuration shown earlier could be used to capture changes from the customer database's tenant table into the Kafka cluster. This connector is available fully managed on Confluent Cloud.

Telemetry data may be sourced into Kafka in a variety of ways. MQTT is a popular protocol for Internet of Things (IoT) devices, and smart electrical panels may provide this functionality out of the box. The MQTT connector is available fully managed on Confluent Cloud.

The current state of customer tenant occupancy can be represented with a ksqlDB TABLE. Events streamed into the tenant-occupancy topic represent a customer (customer_id) beginning an occupancy of a particular tenant (tenant_id). As events are observed on the tenant-occupancy topic, the table will model the current set of tenant occupants.

CREATE TABLE tenant_occupancy (
  tenant_id VARCHAR PRIMARY KEY,
  customer_id BIGINT
) WITH (
  KAFKA_TOPIC = 'tenant-occupancy',
  PARTITIONS = 6,
  KEY_FORMAT = 'JSON',
  VALUE_FORMAT = 'JSON'
);

You can query this table to determine which customer occupies which tenant.

SELECT * FROM tenant_occupancy EMIT CHANGES;

When a customer leaves a tenant, the source system needs to send a tombstone record (an event with a valid tenant_id key and a null value). ksqlDB processes the tombstone by removing the row with the given key from the table. For example, a tombstone keyed on 'dc:eqix:us:chi1:12' would remove that tenant's occupancy row.

Panel sensor readings can be streamed directly into a topic or sourced from an upstream system. A STREAM captures the power readings as they flow from the smart panel into Kafka. Each event contains a panel identifier and the associated tenant, in addition to two power readings.

CREATE STREAM panel_power_readings (
  panel_id BIGINT,
  tenant_id VARCHAR,
  panel_current_utilization DOUBLE,
  tenant_kwh_usage BIGINT
) WITH (
  KAFKA_TOPIC = 'panel-readings',
  PARTITIONS = 6,
  KEY_FORMAT = 'JSON',
  VALUE_FORMAT = 'JSON'
);
  • panel_current_utilization represents the percentage of the panel's total capacity currently in use and is useful for business continuity monitoring
  • tenant_kwh_usage provides the total amount of energy (kWh) consumed by the tenant in the current month
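
To inspect these readings as they arrive, you can run a push query against the stream:

SELECT panel_id, tenant_id, panel_current_utilization, tenant_kwh_usage
  FROM panel_power_readings
  EMIT CHANGES;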

The following statement provides a simple way to determine when a panel is overloaded:

CREATE STREAM overloaded_panels AS 
  SELECT panel_id, tenant_id, panel_current_utilization 
    FROM panel_power_readings 
    WHERE panel_current_utilization >= 0.85
  EMIT CHANGES;

This command filters the panel power readings for instances where utilization is 85% or higher. This stream could be used in a monitoring or alerting context to notify on-call personnel of a potential issue with the power supplies to the datacenter.
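
For example, a monitoring service could subscribe to the filtered stream with a push query and raise an alert for each event that arrives:

SELECT * FROM overloaded_panels EMIT CHANGES;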

To provide billing reports, a STREAM is created that joins the panel sensor readings with the customer tenant information. Functions are used to create a billable month indicator along with the necessary fields from the joined stream and table.

CREATE STREAM billable_power AS 
  SELECT 
      FORMAT_TIMESTAMP(FROM_UNIXTIME(panel_power_readings.ROWTIME), 'yyyy-MM') 
        AS billable_month,
      tenant_occupancy.customer_id as customer_id,
      tenant_occupancy.tenant_id as tenant_id, 
      panel_power_readings.tenant_kwh_usage as tenant_kwh_usage
    FROM panel_power_readings
    INNER JOIN tenant_occupancy ON 
      panel_power_readings.tenant_id = tenant_occupancy.tenant_id
  EMIT CHANGES;

Finally, the billable_power_report aggregates the billable_power stream into a TABLE that can be queried to create reports by month, customer, and tenant.

CREATE TABLE billable_power_report WITH (KEY_FORMAT = 'JSON') AS
  SELECT customer_id, tenant_id, billable_month, MAX(tenant_kwh_usage) as kwh
    FROM billable_power
    GROUP BY tenant_id, customer_id, billable_month;
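
With the report table in place, a pull query can retrieve the current billing data. Depending on your ksqlDB version and configuration, filtering on only part of the key (as below) may require table scans to be enabled; the customer ID used here is one of the mock values inserted earlier:

SELECT customer_id, tenant_id, billable_month, kwh
  FROM billable_power_report
  WHERE customer_id = 924;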