Hi all!
I am creating a streaming application that works something like so:
1. Events (let's call them orders) are published directly to a Kafka topic (orders).
2. I want to enrich the events with lookup data from a database table that has been streamed via Debezium to another Kafka topic (customers).
3. There is a stream created over the orders topic, and a table created over the customers topic.
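Roughly, the stream and table are declared like this (a ksqlDB sketch with simplified placeholder names `orders_stream` and `customers_table`; my actual DDL differs slightly):

```sql
-- Sketch: stream over the orders topic
CREATE STREAM orders_stream (
    orderkey INT,
    customerkey STRING,
    value INT
) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

-- Sketch: table over the Debezium-fed customers topic,
-- keyed by the table's primary key
CREATE TABLE customers_table (
    customerkey STRING PRIMARY KEY,
    customername STRING
) WITH (KAFKA_TOPIC='customers', KEY_FORMAT='JSON', VALUE_FORMAT='JSON');
```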
The format of the events looks like so:
order
{
    orderkey int,
    customerkey string,
    value int
}
customer
{
    customerkey string,
    customername string
}
In the underlying customer table, `customerkey` is the primary key.
The database is SQL Server, and I use the SQL Server Debezium source connector (version: 1.9.3.Final) to stream the data to the topic.
I get the data from the database to the topic just fine. The problem is the format of the message key (the table's primary key). Let's say one row in the database looks like this:

customerkey    customername
1000-5001      Acme Corp
Depending on which key converter I use, the underlying data in the topic ends up looking like:
// StringConverter (i.e. KEY_FORMAT='KAFKA')
key: [Struct{Custom@3472325019167633789/7309988044373569840], value: {"CustomerKey":"1000-5001","CustomerName":"Acme Corp"}
// JsonConverter (i.e. KEY_FORMAT='JSON')
key: {"CustomerKey":"1000-5001"}, value: {"CustomerKey":"1000-5001","CustomerName":"Acme Corp"}
In my streaming application, I want to join the orders `customerkey` to the customers `customerkey`. With the key looking like it does, the join does not work.
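The join I am trying is, roughly (ksqlDB sketch; `orders_stream` and `customers_table` are placeholder names for my stream and table):

```sql
-- Sketch: stream-table join on customerkey
SELECT o.orderkey, o.value, c.customername
FROM orders_stream o
JOIN customers_table c
  ON o.customerkey = c.customerkey
EMIT CHANGES;
```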
I have looked high and low for a solution to this, and I came across the `extractKeyfromStruct` and the type `ExtractField$Key`. I configured my connector like so:
<<<<<<<<<<<<<<<<<<<<<
{
  "name": "customer-extract",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    [snipped database info],
    "transforms": "unwrap,reroute,extractKeyfromStruct",
    "transforms.extractKeyfromStruct.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKeyfromStruct.field": "CUSTOMERKEY ",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
    "transforms.reroute.key.enforce.uniqueness": "false",
    "transforms.reroute.topic.regex": "azuresql.dbo.tb_(.*)",
    "transforms.reroute.topic.replacement": "customers_cdc_extract",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "key.converter.schemas.enable": "true",
    "include.schema.changes": "false"
  }
}
>>>>>>>>>>>>>>>>>>>
After I deploy the configuration and check the connector's status, I get an error that `CUSTOMERKEY ` is unknown:
<<<<<<<<<<<<<<<<<<
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalArgumentException: Unknown field: CUSTOMERKEY
    at org.apache.kafka.connect.transforms.ExtractField.apply(ExtractField.java:65)
    at ...
>>>>>>>>>>>>>>>>>>
I have tried various capitalizations of the field name: `CustomerKey` (as in the database), `customerkey`, and `CUSTOMERKEY` as above. Each time I get the same `Unknown field` error.
I am not using the Schema Registry, but as you can see in the connector config, I have schemas enabled for both key and value (I have also tried with schemas disabled; same error).
Right now I don't know what else to try, so I'm asking the experts!
Thanks!
Niels