Avro Serialization - Error deserializing Avro message for id -1 | Unknown magic byte!


Gabriel Queiroz

unread,
Dec 7, 2016, 9:42:34 AM12/7/16
to Confluent Platform
Hi,

I have a question about Avro serialization using TopologyBuilder. I am trying to consume a topic in JSON and produce it to another topic in Avro using TopologyBuilder.

Right now I am able to do that, and I can consume from the topic using `kafka-avro-console-consumer` and see the produced messages there without a problem.

The problem is that I want to use this topic with the JDBC Sink connector, but I am receiving the following error:

org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro: 
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:357)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:239)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:172)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:143)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
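For anyone hitting this: Confluent's Avro framing prepends a magic byte (0) and a 4-byte big-endian schema id to every serialized record; "Unknown magic byte!" means the first byte of the record was not 0, and the id is reported as -1 because the header was never successfully read. A minimal, self-contained sketch of that check (not the actual Confluent code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class WireFormatCheck {
    // Confluent-framed Avro records start with magic byte 0x0 followed by
    // a 4-byte big-endian schema id; the Avro payload comes after that.
    static int schemaId(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        if (buf.get() != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        // A framed record for schema id 42 (Avro payload omitted).
        System.out.println(schemaId(new byte[] {0x0, 0, 0, 0, 42})); // 42

        // Plain UTF-8 bytes, e.g. from a StringSerializer, fail the check.
        try {
            schemaId("some-key".getBytes(StandardCharsets.UTF_8));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Unknown magic byte!
        }
    }
}
```

So any record (key or value) that was not written through an Avro serializer will trigger exactly this exception when an AvroConverter tries to read it.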

I think that it may be caused by the way that I am serializing the message to Avro...

Below is how I am trying to serialize to Avro with my TopologyBuilder:

CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);

GsonDeserializer<BalanceDebezium> debeziumDeserializer = new GsonDeserializer<>(BalanceDebezium.class);
SpecificAvroSerializer<gabriel.avro.Balance> balanceSerializer = new SpecificAvroSerializer<>(client);

TopologyBuilder builder = new TopologyBuilder();

builder
    .addSource("BalanceSource", Serdes.String().deserializer(), debeziumDeserializer, "debezium.balance")
    .addProcessor("BalanceProcessor", () -> balanceProcessor, "BalanceSource")
    .addSink("BalanceSink", "sink.balance", Serdes.String().serializer(), balanceSerializer, "BalanceProcessor");

I've got the `SpecificAvroSerializer` from the Wikipedia Avro feed example.

Does anyone have an example of how to use an Avro serializer with TopologyBuilder?

Obs:
  • I tried creating a producer with the same schema directly in the terminal to see what happens, and the sink connector was able to deserialize the message, create the table, and insert into the MySQL database.
  • The main idea is to get a message produced in JSON by Debezium, normalize it, and then send it to another topic in Avro so the sink can insert it into the database. This workaround seems necessary since the sink cannot understand messages produced by Debezium. Basically the same problem addressed in this topic: https://groups.google.com/forum/#!topic/confluent-platform/-y1_27z2Zao.

Thank you very much,
Gabriel Queiroz.

Samuel Roux

unread,
Dec 8, 2016, 11:34:15 AM12/8/16
to Confluent Platform
Hi,

I think I'm facing the same issue.

I was trying to use Elastic Sink and got the "Unknown magic byte!" error.

One thing you can try is consuming your topic again with kafka-avro-console-consumer, but with the property "print.key=true". This should show you the same error you've encountered in the JDBC Sink connector.

Right now, my guess (though I'm not sure of it) is that in the schema registry I have the "myTopic-value" schema but not the "myTopic-key" one. If someone could tell me whether I'm on the wrong track, that would be appreciated.

Regards,

Samuel Roux

Jozef Koval

unread,
Dec 9, 2016, 5:37:12 AM12/9/16
to Confluent Platform

Hi guys,


My guess is that you use different de/serializers for the key and the value. When you read the topic with kafka-avro-console-consumer, it implicitly uses Avro de/serializers. But you probably use the String serializer for the key, so when you call it with print.key=true, it tries to deserialize the string key as Avro, which obviously is not possible.


I could tell more if you share the config file for the JDBC connector, specifically the key and value de/serializers.


Jozef


Samuel Roux

unread,
Dec 9, 2016, 5:42:53 AM12/9/16
to confluent...@googlegroups.com
Hi,

Thanks for your answer, that seems to work for me.

Anyway, I've got another issue that I think is due to my Kafka Streams app. Both the key and the value try to use the same schema in the schema registry.

Do you have any idea why?

Regards,

Samuel


Gabriel Queiroz

unread,
Dec 9, 2016, 8:27:22 AM12/9/16
to Confluent Platform
Hi Jozef,

Thank you very much for your answer!

Indeed I was using different serializers for the key and the value (the String serializer for the key, and SpecificAvroSerializer for the value). I could make it work by passing a null key, so the consumer doesn't try to deserialize the key (the Sink connector was able to create the table and insert the values from the topic).

But when I try to use the Avro serializer for the key (with a specific schema for the key), both the key and the value try to use the same schema in the schema registry. I think that is exactly the same problem Samuel talked about.

This is how I am trying to use the Avro serializer for the key. My balance processor just transforms the GSON object into an Avro object.
        
GsonDeserializer<BalanceDebezium> debeziumDeserializer = new GsonDeserializer<>(BalanceDebezium.class);

CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://localhost:8081", 100);
SpecificAvroSerializer<Balance> balanceSerializer = new SpecificAvroSerializer<>(client);
SpecificAvroSerializer<Key> keySerializer = new SpecificAvroSerializer<>(client);

TopologyBuilder builder = new TopologyBuilder();

builder
    .addSource("BalanceSource", Serdes.String().deserializer(), debeziumDeserializer, TOPIC_DEBEZIUM_BALANCE)
    .addProcessor("BalanceProcessor", () -> balanceProcessor, "BalanceSource")
    .addSink("BalanceSink", TOPIC_BALANCE, keySerializer, balanceSerializer, "BalanceProcessor");
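One guess about the shared-schema symptom (unverified against your exact versions): Confluent's Avro serializers pick the Schema Registry subject based on the `isKey` flag passed to `Serializer.configure(...)`. A serializer constructed by hand and handed straight to the builder may never be configured, so both could register under the same subject. With the default subject naming, the convention is just:

```java
public class SubjectNaming {
    // Default subject naming used by Confluent's Avro serializers:
    // the topic name plus "-key" or "-value", chosen by the isKey
    // flag the serializer receives in configure().
    static String subject(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        System.out.println(subject("sink.balance", true));   // sink.balance-key
        System.out.println(subject("sink.balance", false));  // sink.balance-value
    }
}
```

If that is the cause, calling something like `keySerializer.configure(serdeConfig, true)` and `balanceSerializer.configure(serdeConfig, false)` before `addSink` would separate the `-key` and `-value` subjects (hypothetical call sketch; check the serializer's API for the exact config map it expects).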

I am using the JDBC Sink connector, and I didn't set any specific configuration for the key and value de/serializers:

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

connection.url=jdbc:mysql://localhost:3306/sink
connection.user=root
connection.password=root

topics=balance

table.name.format=balance
pk.mode=record_value
pk.fields=accountId
insert.mode=upsert
auto.create=true
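For comparison, the converters a standalone Connect worker applies are set in the worker properties file rather than in the connector config above; with a default Confluent install it typically contains something like this (file location and values may differ in your setup):

```
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
```

If `key.converter` is the AvroConverter, every key on the topic must carry the Avro framing, or the worker fails with exactly the "Unknown magic byte!" error.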

How should I create the key when producing Avro messages? I can share other configuration files if you want.

Thank you very much for your help!

Have a great day,
Gabriel Queiroz.

Jozef Koval

unread,
Dec 9, 2016, 8:55:03 AM12/9/16
to Confluent Platform

Hi Gabriel,


Are you sure you serialize keys in Avro format to the Kafka topic? If yes, you should be able to read them with kafka-avro-console-consumer and the property print.key=true.

If you cannot read them, you serialize them with a different serializer and you need to specify it in your config file for the schema registry (not JDBC, sorry for the confusion); see etc/schema-registry-standalone.properties for inspiration, and look for key.converter.


Jozef


Gabriel Queiroz

unread,
Dec 9, 2016, 10:55:14 AM12/9/16
to Confluent Platform, jozef...@xura.com
Hi Jozef,

Thank you very much, it works now. I checked my connect-avro-standalone.properties and I was using the Avro key converter. I don't necessarily need to use Avro for the key to make it work with the JDBC Sink connector, so I changed it to the String converter and it worked!

If anyone else just wants to use a String key, this worked for me:
key.converter=org.apache.kafka.connect.storage.StringConverter

The key.converter was using Avro, so... how should I serialize a key in Avro if I need to? Should I create a schema for the key and a different schema for the value, passing specific serializers for each one?

Thank you very much for your help! Have a great day!
Gabriel Queiroz.
