I am using the Kafka Connect MongoDB connector
(https://github.com/DataReply/kafka-connect-mongodb).
I started the Schema Registry and Kafka Connect, and both the source and the sink connector start up fine.
I want to send messages to Kafka with my own producer and have the sink connector store the data in MongoDB.
However, when my producer sends a message to Kafka, the connector throws the error below.
Can you please help me resolve this issue?
ERROR Thread WorkerSinkTask-mongodb-sink-connector-0 exiting with uncaught exception: (org.apache.kafka.connect.util.ShutdownableThread:84)
org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Exception in thread "WorkerSinkTask-mongodb-sink-connector-0" org.apache.kafka.connect.errors.DataException: Failed to deserialize data to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:175)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.iteration(WorkerSinkTaskThread.java:90)
at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:58)
at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
This is my producer:
    void producer() {
        Properties props = new Properties();
        props.put("serializer.class", "io.confluent.kafka.serializers.KafkaAvroEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, Object> producer = new Producer<String, Object>(new ProducerConfig(props));
        String key = "key1";
        String userSchema = "{\"type\":\"record\"," +
                "\"name\":\"myrecorda\"," +
                "\"fields\":[{\"name\":\"f1a\",\"type\":\"string\"}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericRecord avroRecord = new GenericData.Record(schema);
        avroRecord.put("f1a", "aaaaavalue1");
        KeyedMessage<String, Object> message = new KeyedMessage<String, Object>("ronnie", key, avroRecord);
        producer.send(message);
        System.out.println("ProducerRecord============= : " + message.toString());
    }
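
To narrow down where the "Unknown magic byte!" comes from, it may help to check the raw bytes of the key and the value separately. The sketch below assumes the Confluent wire format, in which a serialized message starts with a magic byte of 0 followed by a 4-byte big-endian schema ID. The class name WireFormatCheck and its methods are hypothetical helpers for illustration, not part of any Kafka or Confluent library. If the Connect worker uses the AvroConverter for keys as well (the worker config is not shown here, so this is only a guess), a key written with StringEncoder would fail this check.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    // Hypothetical diagnostic helper; not part of Kafka or the Confluent libraries.
    public class WireFormatCheck {
        // Assumed Confluent wire format: 1 magic byte (0) + 4-byte schema ID + Avro payload.
        private static final byte MAGIC_BYTE = 0x0;
        private static final int HEADER_SIZE = 5;

        // True if the payload is long enough for the header and starts with the magic byte.
        static boolean looksLikeConfluentAvro(byte[] payload) {
            return payload != null
                    && payload.length >= HEADER_SIZE
                    && payload[0] == MAGIC_BYTE;
        }

        // Reads the schema ID from bytes 1..4 (big-endian, ByteBuffer's default order).
        static int schemaId(byte[] payload) {
            if (!looksLikeConfluentAvro(payload)) {
                throw new IllegalArgumentException("Payload does not start with the magic byte");
            }
            return ByteBuffer.wrap(payload, 1, 4).getInt();
        }

        public static void main(String[] args) {
            // A key as a plain string encoder would write it: raw UTF-8 bytes, no header.
            byte[] plainStringKey = "key1".getBytes(StandardCharsets.UTF_8);
            // A hand-built framed payload: magic byte 0, schema ID 7, then some body bytes.
            byte[] framed = {0, 0, 0, 0, 7, 2, 'a'};

            System.out.println("plain string key framed? " + looksLikeConfluentAvro(plainStringKey));
            System.out.println("framed payload framed?   " + looksLikeConfluentAvro(framed));
            System.out.println("schema id of framed:     " + schemaId(framed));
        }
    }

Running the same check against the actual key and value bytes read from the topic would show which of the two the converter cannot decode.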