Failed to deserialize data to Avro


RonnieXie

May 26, 2016, 10:25:54 PM
to Confluent Platform
    I use Kafka Connect MongoDB (https://github.com/DataReply/kafka-connect-mongodb).
    I started the schema registry, and the Connect source and sink are OK.
    I want to send messages to Kafka with my own producer and store the data in MongoDB via the Connect sink.
    But when I send messages to Kafka with my producer, Connect throws the error below.
    Can you please help me resolve this issue?
    
    ERROR Thread WorkerSinkTask-mongodb-sink-connector-0 exiting with uncaught exception:  (org.apache.kafka.connect.util.ShutdownableThread:84)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro: 
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Exception in thread "WorkerSinkTask-mongodb-sink-connector-0" org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro: 
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

       This is my producer:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

void producer() {
    Properties props = new Properties();
    // Avro-encode record values through the schema registry.
    props.put("serializer.class", "io.confluent.kafka.serializers.KafkaAvroEncoder");
    props.put("key.serializer.class", "kafka.serializer.StringEncoder");
    props.put("metadata.broker.list", "172.16.70.162:9092");
    props.put("schema.registry.url", "http://172.16.70.162:8081");

    Producer<String, Object> producer = new Producer<String, Object>(new ProducerConfig(props));

    // Build a GenericRecord for a one-field record schema.
    String key = "key1";
    String userSchema = "{\"type\":\"record\"," +
                        "\"name\":\"myrecorda\"," +
                        "\"fields\":[{\"name\":\"f1a\",\"type\":\"string\"}]}";
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(userSchema);
    GenericRecord avroRecord = new GenericData.Record(schema);
    avroRecord.put("f1a", "aaaaavalue1");

    // Send to the "ronnie" topic.
    KeyedMessage<String, Object> message = new KeyedMessage<String, Object>("ronnie", key, avroRecord);
    producer.send(message);
    System.out.println("ProducerRecord============= : " + message.toString());
}

Ewen Cheslack-Postava

May 27, 2016, 12:15:26 AM
to Confluent Platform
It looks like there's some invalid data in the topic. Is it possible that during previous testing you published data in some other format into the topic?

-Ewen
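For reference, the "Unknown magic byte!" check that fails here comes from the Confluent Avro wire format: each message serialized by the Confluent Avro serializer starts with a magic byte 0x00, followed by a 4-byte big-endian schema-registry ID, then the Avro-encoded payload. Data written to the topic in any other format (plain strings, JSON, etc.) fails this check on the first byte. A minimal sketch of that framing check (class and method names below are my own, not Confluent's):

```java
import java.nio.ByteBuffer;

public class WireFormatCheck {
    // Confluent Avro wire format: 1 magic byte (0x00), then a 4-byte
    // big-endian schema-registry ID, then the Avro-encoded payload.
    static final byte MAGIC_BYTE = 0x0;

    // Returns the schema ID, or throws if the message was not produced
    // with the Confluent Avro serializer (the "Unknown magic byte!" case).
    static int schemaId(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // A well-formed Confluent-framed message with schema ID 21:
        byte[] good = {0x00, 0x00, 0x00, 0x00, 0x15, /* payload */ 0x02};
        System.out.println(schemaId(good)); // prints 21

        // Plain JSON bytes (e.g. from a console producer) start with
        // something other than 0x00 and trigger the error above:
        byte[] bad = "{\"f1a\":\"value\"}".getBytes();
        try {
            schemaId(bad);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints Unknown magic byte!
        }
    }
}
```

This is why Ewen's question matters: even one message produced in another format during earlier testing will crash the sink task when it reaches that offset.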


RonnieXie

May 27, 2016, 1:16:13 AM
to Confluent Platform
Kafka Connect MongoDB may have its own format. The owner of Kafka Connect MongoDB on GitHub said that for every message, a SourceRecord is created with the following schema. But I don't know how to produce messages in the same format:
{
  "type": "record",
  "name": "schemaname",
  "fields": [
    {
      "name": "timestamp",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "order",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "operation",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "database",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "object",
      "type": [
        "null",
        "string"
      ]
    }
  ],
  "connect.name": "stillmongotesting"
}
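If it helps, a record matching that schema can be built with Avro's GenericRecord API, the same way the producer above builds its record. The field names below come from the schema in this message; the values are made-up placeholders, not anything the connector actually emits:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class MongoSinkRecord {
    // The source connector's record schema, as posted above.
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"schemaname\",\"fields\":["
        + "{\"name\":\"timestamp\",\"type\":[\"null\",\"int\"]},"
        + "{\"name\":\"order\",\"type\":[\"null\",\"int\"]},"
        + "{\"name\":\"operation\",\"type\":[\"null\",\"string\"]},"
        + "{\"name\":\"database\",\"type\":[\"null\",\"string\"]},"
        + "{\"name\":\"object\",\"type\":[\"null\",\"string\"]}]}";

    public static GenericRecord build() {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        GenericRecord record = new GenericData.Record(schema);
        // Each field is a ["null", ...] union, so a value may be either
        // null or the non-null branch's type. Placeholder values:
        record.put("timestamp", 1464315954);
        record.put("order", 1);
        record.put("operation", "i");
        record.put("database", "mydb.mycollection");
        record.put("object", "{\"f1a\": \"aaaaavalue1\"}");
        return record;
    }
}
```

A record built this way and sent with the Avro serializer would at least match the schema the sink was written against.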

On Friday, May 27, 2016 at 12:15:26 PM UTC+8, Ewen Cheslack-Postava wrote: