please help "serialization format issue"


Max Li

Apr 24, 2019, 1:50:22 PM4/24/19
to debezium
source DB: MongoDB
Kafka: Confluent 5.12
target: Elasticsearch

From MongoDB to Kafka, it is working well:


[oracle@houbiap5 confluent5]$ bin/kafka-console-consumer --bootstrap-server houbiap5:9092 --topic customers --from-beginning
{"after":null,"patch":null,"source":{"version":"0.9.4.Final","connector":"mongodb","name":"mongodbTest","rs":"mongo01","ns":"inventory.customers","sec":1556119323,"ord":1,"h":1255747003695718904,"initsync":false},"op":"d","ts_ms":1556119323326}
null
{"after":null,"patch":null,"source":{"version":"0.9.4.Final","connector":"mongodb","name":"mongodbTest","rs":"mongo01","ns":"inventory.customers","sec":1556119323,"ord":2,"h":-3344312220964857180,"initsync":false},"op":"d","ts_ms":1556119323328}
null
{"after":null,"patch":null,"source":{"version":"0.9.4.Final","connector":"mongodb","name":"mongodbTest","rs":"mongo01","ns":"inventory.customers","sec":1556119323,"ord":3,"h":8650281562576212760,"initsync":false},"op":"d","ts_ms":1556119323328}
null
{"after":null,"patch":null,"source":{"version":"0.9.4.Final","connector":"mongodb","name":"mongodbTest","rs":"mongo01","ns":"inventory.customers","sec":1556119323,"ord":4,"h":7307862537196435761,"initsync":false},"op":"d","ts_ms":1556119323328}
null
{"after":null,"patch":null,"source":{"version":"0.9.4.Final","connector":"mongodb","name":"mongodbTest","rs":"mongo01","ns":"inventory.customers","sec":1556119323,"ord":5,"h":7141663475881294335,"initsync":false},"op":"d","ts_ms":1556119323328}
null
{"after":"{\"_id\" : {\"$numberLong\" : \"1001\"},\"first_name\" : \"Sally\",\"last_name\" : \"Thomas\",\"email\" : \"sally....@acme.com\"}","patch":null,"source":{"version":"0.9.4.Final","connector":"mongodb","name":"mongodbTest","rs":"mongo01","ns":"inventory.customers","sec":1556119356,"ord":1,"h":-8203394498065317421,"initsync":false},"op":"c","ts_ms":1556119356396}
{"after":"{\"_id\" : {\"$numberLong\" : \"1002\"},\"first_name\" : \"George\",\"last_name\" : \"Bailey\",\"email\" : \"gba...@foobar.com\"}","patch":null,"source":{"version":"0.9.4.Final","connector":"mongodb","name":"mongodbTest","rs":"mongo01","ns":"inventory.customers","sec":1556119356,"ord":2,"h":-6732911301588244805,"initsync":false},"op":"c","ts_ms":1556119356396}
{"after":"{\"_id\" : {\"$numberLong\" : \"1003\"},\"first_name\" : \"Edward\",\"last_name\" : \"Walker\",\"email\" : \"e...@walker.com\"}","patch":null,"source":{"version":"0.9.4.Final","connector":"mongodb","name":"mongodbTest","rs":"mongo01","ns":"inventory.customers","sec":1556119356,"ord":3,"h":5731956300969233689,"initsync":false},"op":"c","ts_ms":1556119356396}
{"after":"{\"_id\" : {\"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"an...@noanswer.org\"}","patch":null,"source":{"version":"0.9.4.Final","connector":"mongodb","name":"mongodbTest","rs":"mongo01","ns":"inventory.customers","sec":1556119356,"ord":4,"h":3324121584721164025,"initsync":false},"op":"c","ts_ms":1556119356396}

I am using AvroConverter in mongo-source.json:
 "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://houbiap5:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://houbiap5:8081",

so the data should be in Avro format in Kafka.
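One thing worth noting: messages serialized with the Confluent AvroConverter are binary and would print as unreadable garbage in a plain kafka-console-consumer, so readable JSON output like the above suggests the topic may actually contain JSON rather than Avro. A minimal local sanity check (a sketch, not from the thread; assumes `python3` is available):

```shell
# Paste one line of the console-consumer output into a file and test whether
# it parses as JSON. Confluent-framed Avro bytes would NOT parse as JSON text.
echo '{"after":null,"op":"d","ts_ms":1556119323326}' > record.txt
if python3 -m json.tool record.txt > /dev/null 2>&1; then
  echo "parses as JSON - topic is probably not Avro"
else
  echo "not JSON - could be Avro-encoded"
fi
```

If the line parses, the source connector's Avro converter configuration was likely not picked up when the topic was populated.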

[oracle@houbiap5 dba]$ cat mongo-es-sink.json
{
    "name": "mongo-es-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "customers",

        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://houbiap5:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://houbiap5:8081",

        "connection.url": "http://houtestdb5:9200",

        "key.ignore": "true",
        "type.name": "kafka-connect"
    }
}
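As a side note (this does not fix the root cause): Kafka Connect sink connectors default to `errors.tolerance=none`, so a single record that fails deserialization kills the task with "Tolerance exceeded in error handler". A hedged sketch of config that routes bad records to a dead-letter-queue topic instead, assuming Kafka 2.0+ / Confluent 5.x (the topic name `dlq-customers` is an example):

```json
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-customers",
"errors.deadletterqueue.context.headers.enable": "true"
```

This keeps the task running and preserves the failing records (with failure context in headers) for inspection.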


Now when I launch the Elasticsearch sink, I get the following errors:

[oracle@houbiap5 dba]$ curl -s http://houbiap5:8083/connectors/mongo-es-sink/status |jq
{
  "name": "mongo-es-sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.154.54.83:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "10.154.54.83:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic customers to Avro: \n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 
13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
    }
  ],
  "type": "sink"
}
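The "Unknown magic byte!" error means the first byte of the record was not the `0x00` magic byte that Confluent's Avro serializer always writes. A small illustration of that wire format (a sketch using a simulated record, not data from this cluster):

```shell
# Confluent Avro wire format: 1 magic byte (0x00) + 4-byte big-endian schema
# registry ID, followed by the Avro-encoded payload.
printf '\x00\x00\x00\x00\x07avro-payload' > frame.bin

# First byte must be 0 for the AvroConverter to accept the record.
magic=$(od -An -tu1 -j0 -N1 frame.bin | tr -d ' ')

# Next four bytes are the schema ID, big-endian (here the simulated ID 7).
set -- $(od -An -tu1 -j1 -N4 frame.bin)
schema_id=$(( ($1 << 24) | ($2 << 16) | ($3 << 8) | $4 ))

echo "magic=$magic schema_id=$schema_id"
```

A JSON record starts with `{` (byte 123), not `0x00`, which is exactly what triggers this exception when an Avro converter reads a JSON-populated topic.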

Max Li

Apr 24, 2019, 2:23:40 PM4/24/19
to debezium
I tried many things, including replacing the Avro converter in mongo-es-sink.json with the String/JSON converter, but I still get the same error.


Max Li

Apr 24, 2019, 2:26:28 PM4/24/19
to debezium
Following this link: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained, which says:

The solution is to check the source topic’s serialization format, and either switch Kafka Connect’s sink connector to use the correct converter, or switch the upstream format to Avro (which is a good idea). If the upstream topic is populated by Kafka Connect, then you can configure the source connector’s converter as follows:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",

It doesn't work.

Jiri Pechanec

Apr 25, 2019, 11:52:33 PM4/25/19
to debezium
What's the config of the ES sink? Also, isn't Avro enabled by default by the Connect instance you are using?

J.

