
Handle corrupted data from Salesforce

Salesforce sends a notification when a change to a Salesforce record occurs as part of a create, update, delete, or undelete operation. However, if there is corrupt data in Salesforce, it sends a gap event instead of a change event. These gap events need to be handled properly to avoid discrepancies between Salesforce reports and internal dashboards. This recipe demonstrates how to process Salesforce change data and filter out the corrupt events so that a downstream application can process and reconcile them for accurate reporting and analytics.

Step by step

Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application. ksqlDB supports a SQL language for processing the data in real time (and will soon support connector integration for reading and writing data to other data sources and sinks). Execute the recipe with the provided SQL commands using the Confluent Cloud ksqlDB editor.

Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. This recipe shows a sample connector configuration below; if you use more than one connector, create each one separately. You can also substitute your own connectors to connect to any supported data source.

Use Avro so ksqlDB can automatically detect the schema.

{
  "connector.class"            : "SalesforceCdcSource",
  "name"                       : "SalesforceCdcSourceConnector",
  "kafka.api.key"              : "<my-kafka-api-key>",
  "kafka.api.secret"           : "<my-kafka-api-secret>",
  "kafka.topic"                : "sfdc.cdc.raw",
  "salesforce.username"        : "<my-sfdc-username>",
  "salesforce.password"        : "<my-sfdc-password>",
  "salesforce.password.token"  : "<sfdc-password-token>",
  "salesforce.consumer.key"    : "<sfdc-consumer-key>",
  "salesforce.consumer.secret" : "<sfdc-consumer-secret>",
  "salesforce.cdc.name"        : "AccountChangeEvent",
  "output.data.format"         : "AVRO",
  "tasks.max"                  : "1"
}
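
Once the connector is running, you can optionally verify that change events are arriving before registering any streams. The statement below is a quick sanity check to run from the ksqlDB editor; it assumes the topic name configured for the connector above.

-- Print raw change events from the connector's topic (stop the query once records appear)
PRINT 'sfdc.cdc.raw' FROM BEGINNING;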

ksqlDB code


NOTE: The Confluent Cloud Console does not allow you to execute this code in the ksqlDB editor as a single block. This limitation will be removed in the next release, but until then, copy and paste each statement into the editor and execute them one at a time.


SET 'auto.offset.reset' = 'earliest';

-- Register the stream of SFDC CDC Opportunities
CREATE STREAM stream_sfdc_cdc_opportunity_raw
WITH (
  KAFKA_TOPIC = 'sfdc.cdc.raw',
  VALUE_FORMAT = 'AVRO',
  PARTITIONS = 6
);

-- Create a new stream with Replay ID and Change Event Header for just Gap Events
CREATE STREAM stream_sfdc_cdc_opportunity_change_log AS
  SELECT
    STREAM_SFDC_CDC_OPPORTUNITY_RAW.REPLAYID AS REPLAYID,
    STREAM_SFDC_CDC_OPPORTUNITY_RAW.CHANGEEVENTHEADER AS CHANGEEVENTHEADER
  FROM stream_sfdc_cdc_opportunity_raw
  WHERE UCASE(STREAM_SFDC_CDC_OPPORTUNITY_RAW.CHANGEEVENTHEADER->CHANGETYPE) LIKE 'GAP%'
  EMIT CHANGES;
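
With the filtered stream in place, a downstream application (or an ad hoc check in the ksqlDB editor) can consume just the gap events. The push query below is a minimal sketch of such a check; it emits each gap event's Replay ID and change type as it arrives.

-- Continuously emit gap events from the filtered stream
SELECT
  REPLAYID,
  CHANGEEVENTHEADER->CHANGETYPE AS CHANGETYPE
FROM stream_sfdc_cdc_opportunity_change_log
EMIT CHANGES;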

Cleanup

To clean up the ksqlDB resources created by this recipe, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is also deleted asynchronously.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
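
For this recipe, that means dropping the two streams registered above. The statements below are one way to do it; the raw stream's topic is left in place here because it is written by the connector, but you can add DELETE TOPIC if you want to remove it as well.

-- Drop the derived stream and its backing topic, then the raw stream (the connector's topic is kept)
DROP STREAM IF EXISTS stream_sfdc_cdc_opportunity_change_log DELETE TOPIC;
DROP STREAM IF EXISTS stream_sfdc_cdc_opportunity_raw;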

If you also created connectors, you'll need to remove those as well.