The Debezium PostgreSQL connector creates and publishes to a separate topic for each database table. I am trying to determine whether it's possible for events for all tables to be routed to a single topic instead of a topic per table, which would be desirable for a number of reasons.
After talking to Randall Hauch and Ewen Cheslack-Postava at the Kafka Summit, it sounded like this might be achievable using the RegexRouter transform. However, after giving this a quick test, I found that it does not work: since the schema for each table is different, when a schema is registered with the Schema Registry (I am also using the AvroConverter), the transformed topic name is used instead of the original/unique topic name, which I believe would have allowed this to work.
Does anyone know if it’s possible to do this (or another way)?
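For reference, the routing I tried looks roughly like this (the regex and the target topic name `Node1.public.onetopic` are illustrative, taken from my test setup):

```json
{
  "transforms": "route",
  "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.route.regex": "Node1\\.public\\.(.*)",
  "transforms.route.replacement": "Node1.public.onetopic"
}
```

With this in place, every table's events are routed to the single topic, which is what produces the incompatible-schema error in the log below.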
[2017-05-16 15:16:52,110] ERROR Task debezium-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Node1.public.onetopic
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"Key","namespace":"Node1.public.weather","fields":[{"name":"city","type":"string"}],"connect.name":"Node1.public.weather.Key"}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:245)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:237)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:103)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2017-05-16 15:16:52,120] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:142)
[2017-05-16 15:16:52,120] INFO Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:689)
--
You received this message because you are subscribed to the Google Groups "Confluent Platform" group.
To unsubscribe from this group and stop receiving emails from it, send an email to confluent-platform+unsub...@googlegroups.com.
To post to this group, send email to confluent-platform@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/confluent-platform/5b85b972-8000-403b-9e59-a6ba2d7636bb%40googlegroups.com.
The main drawback is that it won't integrate with everything using the standard AvroConverter. In particular, there are ways this can break Kafka Connect downstream due to the way the AvroConverter determines version information from the schema registry based on the topic, but you'll have registered the schema under a different topic/subject. However, normal downstream consumers will still be able to decode the messages since they just rely on the schema ID.

Also, of course, there is the overhead of maintaining a forked version of the converter.

-Ewen
On Thu, May 18, 2017 at 3:07 PM, Jim Glennon <jim.g...@enterprisedb.com> wrote:
Ugh, I should proofread better. That should have read:

Thanks for the reply. Since my original post, I used the AvroConverter source as a starting point and made a new converter in which I changed fromConnectData to pass the original topic name (which I get from the Connect schema name) to the Serializer. With this approach, the Avro schemas are still registered as if in different topics, but the messages are published to a single topic. So far it seems to be working fine and accomplishing what I want.

Are there any drawbacks that you can think of to using this approach?

Thanks.
~jim
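For anyone curious, recovering the original topic from the Connect schema name is just string handling: Debezium names the key/value schemas `<topic>.Key` / `<topic>.Value` (as seen in the log above, e.g. `Node1.public.weather.Key`), and the standard Avro serializer appends `-key` / `-value` to form the subject. A minimal sketch of that derivation (class and method names are mine, not from the actual converter; the real change also has to thread the derived subject through to the serializer):

```java
public class SubjectFromSchemaName {

    // Derive the Schema Registry subject from the Connect schema name
    // rather than the (rewritten) topic. Hypothetical helper illustrating
    // the approach; not code from the actual AvroConverter.
    static String subjectFor(String schemaName, boolean isKey) {
        // Debezium-style schema names end in ".Key" or ".Value".
        String suffix = isKey ? ".Key" : ".Value";
        String originalTopic = schemaName.endsWith(suffix)
                ? schemaName.substring(0, schemaName.length() - suffix.length())
                : schemaName;
        // The standard subject naming convention is "<topic>-key" / "<topic>-value".
        return originalTopic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        // e.g. "Node1.public.weather.Key" -> "Node1.public.weather-key"
        System.out.println(subjectFor("Node1.public.weather.Key", true));
        System.out.println(subjectFor("Node1.public.weather.Value", false));
    }
}
```

Each table's schema then lands under its own subject even though all the records go to one topic, which is why the 409 incompatibility error no longer occurs.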
Can you elaborate on this? How/when does the AvroConverter do this? I took a look at the code and it wasn't apparent where this is being done.
~jim
It's actually the AvroDeserializer that does it, but only the AvroConverter currently triggers that path. The lookup is done here: https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L142-L163

-Ewen
On Mon, May 22, 2017 at 1:29 PM, Jim Glennon <jim.g...@enterprisedb.com> wrote:
Can you elaborate on this? How/when does the AvroConverter do this? I took a look at the code and it wasn't apparent where this is being done.
~jim