org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
	at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
GsonDeserializer<BalanceDebezium> debeziumDeserializer = new GsonDeserializer<>(BalanceDebezium.class);
SpecificAvroSerializer<gabriel.avro.Balance> balanceSerializer = new SpecificAvroSerializer<>(client);

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("BalanceSource", Serdes.String().deserializer(), debeziumDeserializer, "debezium.balance")
       .addProcessor("BalanceProcessor", () -> balanceProcessor, "BalanceSource")
       .addSink("BalanceSink", "sink.balance", Serdes.String().serializer(), balanceSerializer, "BalanceProcessor");

Hi guys,
My guess is that you are using different de/serializers for the key and the value. When you read the topic with kafka-avro-console-consumer, it implicitly uses the Avro deserializer for both key and value. But you probably use the String serializer for the key, so when you run the consumer with print.key=true, it tries to deserialize the string key as Avro, which is obviously not possible.
I could tell you more if you share the config file for the JDBC connector, specifically the key and value de/serializers.
Jozef
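For anyone hitting the same error: the "Unknown magic byte!" message comes from the Confluent wire format, in which every Avro-serialized message starts with a 0x0 magic byte followed by a 4-byte schema ID. A plain String-serialized key has no such prefix, so the Avro deserializer rejects its first byte. A minimal sketch of that check (the class and method names here are illustrative, not Confluent's actual implementation):

```java
import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent wire format: [magic byte 0x0][4-byte schema id][Avro payload]
    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema id if the bytes follow the Confluent wire format,
    // otherwise throws, mimicking the "Unknown magic byte!" failure.
    static int schemaId(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        // An Avro-serialized message: magic byte + schema id 42 + payload
        byte[] avro = ByteBuffer.allocate(6).put(MAGIC_BYTE).putInt(42).put((byte) 1).array();
        System.out.println(schemaId(avro)); // prints 42

        // A String-serialized key like "a" has no such prefix, so it fails
        try {
            schemaId("a".getBytes());
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "Unknown magic byte!"
        }
    }
}
```

This is why reading the topic with print.key=true is a quick diagnostic: if the key was not written with the Avro serializer, the console consumer fails on the key's first byte exactly as the sink connector does.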
From: confluent-platform@googlegroups.com <confluent-platform@googlegroups.com> on behalf of Samuel Roux <rouxs...@gmail.com>
GsonDeserializer<BalanceDebezium> debeziumDeserializer = new GsonDeserializer<>(BalanceDebezium.class);
CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
SpecificAvroSerializer<Balance> balanceSerializer = new SpecificAvroSerializer<>(client);
SpecificAvroSerializer<Key> keySerializer = new SpecificAvroSerializer<>(client);

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("BalanceSource", Serdes.String().deserializer(), debeziumDeserializer, TOPIC_DEBEZIUM_BALANCE)
       .addProcessor("BalanceProcessor", () -> balanceProcessor, "BalanceSource")
       .addSink("BalanceSink", TOPIC_BALANCE, keySerializer, balanceSerializer, "BalanceProcessor");

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/sink
connection.user=root
connection.password=root
topics=balance
table.name.format=balance
pk.mode=record_value
pk.fields=accountId
insert.mode=upsert
auto.create=true

Hi Gabriel,
Are you sure you serialize the keys in Avro format to the Kafka topic? If so, you should be able to read them with kafka-avro-console-consumer and the property print.key=true.
If you cannot read them, you are serializing them with a different serializer and you need to specify it in your config file for schema-registry (not jdbc, sorry for the confusion); see etc/schema-registry-standalone.properties for inspiration and look for key.converter.
Jozef
key.converter=org.apache.kafka.connect.storage.StringConverter
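For context, that line sits alongside the value converter settings in the Connect worker properties file. A sketch of the relevant section, assuming a local Schema Registry at http://localhost:8081 (a standard Confluent layout, not necessarily the poster's actual file):

```properties
# Keys are plain strings, values are Avro registered in Schema Registry
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

With this pairing, the sink must not try to deserialize the key as Avro; the StringConverter handles it, which matches writing keys with the plain String serializer in the Streams topology above.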