Using Single Message Transforms with Debezium PostgreSQL Connector


Jim Glennon

May 16, 2017, 11:55:51 AM
to Confluent Platform

The Debezium PostgreSQL connector creates and publishes to a separate topic for each database table. I am trying to determine whether events for all tables can instead be routed to a single topic. There are a number of reasons why this is desirable, including:

  • reducing the number of topics in use, since the practical limit is on the order of 3,000 to 4,000 topics
  • the need to maintain transaction order, which cannot be preserved across multiple topics

After talking to Randall Hauch and Ewen Cheslack-Postava at the Kafka Summit, it sounded like this might be achievable using the RegexRouter transform. However, a quick test showed that it does not work with the AvroConverter: since the schema for each table is different, each schema is registered with the Schema Registry under the transformed topic name, instead of the original per-table topic name (which I believe would have allowed this to work).
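
For reference, the transform setup I mean is along these lines (the Node1 server name and onetopic target match the error log in my next message; the exact regex is illustrative, not my verbatim config):

  "transforms": "route",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "Node1\\.public\\.(.*)",
  "transforms.route.replacement": "Node1.public.onetopic"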


Does anyone know whether this is possible, with this approach or another?

Jim Glennon

May 16, 2017, 1:01:38 PM
to Confluent Platform
A little more info on this. In my test, I configured the Debezium connector to route events for all DB tables to the same topic, then performed inserts on two tables. This is the error encountered when the connector tries to publish the message for the second table's insert.

[2017-05-16 15:16:52,110] ERROR Task debezium-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Node1.public.onetopic
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Key","namespace":"Node1.public.weather","fields":[{"name":"city","type":"string"}],"connect.name":"Node1.public.weather.Key"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:245)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:237)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:103)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2017-05-16 15:16:52,120] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-05-16 15:16:52,120] INFO Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:689)

Ewen Cheslack-Postava

May 18, 2017, 5:44:39 PM
to Confluent Platform
If you put data from different tables into the same topic, you'll have to accept that they will have different schemas that will be incompatible -- after all, the structure of each table is going to be different. The approach you are using should work, you'll just need to disable the compatibility check in the schema registry. See http://docs.confluent.io/current/schema-registry/docs/api.html#compatibility and http://docs.confluent.io/current/schema-registry/docs/api.html#put--config for how to do this.
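
Concretely, disabling the check for one subject is a single PUT against the registry's /config resource. A sketch, assuming the registry is at localhost:8081 and the default topic-name subject strategy (so the value subject for topic Node1.public.onetopic is Node1.public.onetopic-value):

  curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
       --data '{"compatibility": "NONE"}' \
       http://localhost:8081/config/Node1.public.onetopic-value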

Note that this will make writing an application to use this data more difficult. You won't be able to use SpecificRecords (which leaves you with the usable, but less simple and elegant, GenericRecord interface), and your application will have to understand a bunch of different schemas since they'll all be mixed together in the same topic.
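
A minimal sketch of what consuming such a mixed topic looks like with GenericRecord, dispatching on each record's own schema (broker/registry addresses, group id, and the onetopic name are all illustrative):

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class MixedTopicReader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // illustrative
        props.put("group.id", "cdc-reader");                // illustrative
        props.put("key.deserializer", KafkaAvroDeserializer.class.getName());
        props.put("value.deserializer", KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("onetopic"));
            while (true) {
                for (ConsumerRecord<GenericRecord, GenericRecord> rec
                        : consumer.poll(Duration.ofSeconds(1))) {
                    // The topic mixes tables, so dispatch on the record's own schema,
                    // e.g. "Node1.public.weather.Envelope" vs. some other table's type.
                    String schemaName = rec.value().getSchema().getFullName();
                    System.out.println(schemaName + " -> " + rec.value());
                }
            }
        }
    }
}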

-Ewen


Jim Glennon

May 18, 2017, 6:06:06 PM
to Confluent Platform
Hi Ewen, 

Thanks for the reply. Since my original post, I used the AvroConverter source as a starting point and made a new converter in which I changed fromConnectData to pass the original topic name (which I get from the Connect schema name) to the Serializer. With this approach, the Avro schemas are still registered as if they were in different topics, but the messages are published to a single topic. So far it seems to be working fine and accomplishing what I want.
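
In outline, the change amounts to something like this sketch (done here by subclassing rather than copying the source as I did, and the class name is made up):

import io.confluent.connect.avro.AvroConverter;
import org.apache.kafka.connect.data.Schema;

// Sketch only: register schemas under the Connect schema's name (which
// preserves the original per-table topic) while the record itself is
// still produced to the single topic chosen by the SMT.
public class SchemaNameRoutingAvroConverter extends AvroConverter {
    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // The converter only uses `topic` to derive the Schema Registry
        // subject, so substituting the schema name changes which subject
        // the schema is registered under, not which topic is written to.
        String subjectTopic = (schema != null && schema.name() != null)
                ? schema.name()   // e.g. "Node1.public.weather.Envelope"
                : topic;
        return super.fromConnectData(subjectTopic, schema, value);
    }
}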

Are there any drawbacks that you can think of to using this approach?

Thanks.

~jim


Ewen Cheslack-Postava

May 18, 2017, 6:50:00 PM
to Confluent Platform
The main drawback is that it won't integrate with everything that uses the standard AvroConverter. In particular, this can break Kafka Connect downstream: the AvroConverter determines version information from the schema registry based on the topic, but you'll have registered the schema under a different topic/subject. Normal downstream consumers will still be able to decode the messages, though, since they rely only on the schema ID.

Also, of course there is the overhead of maintaining a forked version of the converter.

-Ewen


Jim Glennon

May 22, 2017, 4:29:19 PM
to Confluent Platform
Can you elaborate on this? How/when does the AvroConverter do this? I took a look at the code and it wasn't apparent where this is being done.

~jim



Ewen Cheslack-Postava

May 22, 2017, 11:26:00 PM
to Confluent Platform
It's actually the AvroDeserializer that does it, but only the AvroConverter currently triggers that path. The lookup is done here: https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L142-L163
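
In outline (a paraphrase, not the exact source), the lookup that the forked converter would trip over is:

// Paraphrase of the linked AbstractKafkaAvroDeserializer path: Connect
// asks the registry which version of the subject the writer schema
// corresponds to, deriving the subject from the topic it consumed from.
String subject = topic + (isKey ? "-key" : "-value");
int version = schemaRegistry.getVersion(subject, writerSchema);
// With the fork, the schema was registered under the original per-table
// subject, so this topic-derived lookup fails or finds the wrong version.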

-Ewen


Jim Glennon

May 23, 2017, 9:04:09 AM
to Confluent Platform
OK, I see. So just to confirm: if we are only using the deserializer with the KafkaConsumer, this wouldn't pose a problem for us?

~jim



Randall Hauch

May 24, 2017, 1:19:40 PM
to Confluent Platform
Debezium provides a ByLogicalTableRouter SMT that can be used to do exactly this, and it can modify the message keys to include the table names or other field values. See also DBZ-121 for a bit of history and examples.
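
A configuration along these lines, reusing the Node1 names from earlier in the thread (by default the SMT also appends a __dbz__physicalTableIdentifier field to each key so keys from different tables don't collide):

  "transforms": "Reroute",
  "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.Reroute.topic.regex": "Node1\\.public\\.(.*)",
  "transforms.Reroute.topic.replacement": "Node1.public.onetopic"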

This routing works fine if you're using the JsonConverter or another converter that treats each message independently. But if you're using the AvroConverter, all of this is still subject to the conflicting Avro schema issues that Ewen mentioned.

Regards,

Randall