MYSQL to HIVE


Ayan Mukhuty

Mar 18, 2021, 1:13:14 AM
to debezium
Hi Everyone,

I have recently created a pipeline for CDC using Debezium MySQL connector and HDFS 2.0 sink connector.

Change data flows from MySQL to Kafka and then to Hadoop HDFS, and on top of the HDFS data I am creating an external Hive table. The resulting Hive table is of struct (complex) type, with fields such as before, after, etc.

Can anyone help me identify whether there is any way to turn this complex table in Hive into a normal (flat) table?
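
For context, each Debezium change event value is an envelope roughly like the following (the field values here are purely illustrative), which is why an external Hive table defined directly on the raw HDFS data ends up with struct-typed before/after columns:

{
  "before": { "id": 101, "name": "old name" },
  "after": { "id": 101, "name": "new name" },
  "source": { "db": "testdb", "table": "persons", "file": "mysql-bin.000003", "pos": 4567 },
  "op": "u",
  "ts_ms": 1616058794000
}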

Regards,
Ayan

jiri.p...@gmail.com

Mar 18, 2021, 1:18:15 AM
to debezium

Ayan Mukhuty

Mar 19, 2021, 3:11:34 AM
to debezium
I have gone through this, but I still have some doubts. Is there any document I can refer to for transforming the struct data types into normal table values?

Regards,
Ayan

Gunnar Morling

Mar 19, 2021, 3:33:47 AM
to debe...@googlegroups.com
The documentation Jiri pointed to is pretty comprehensive, actually. Have you tried configuring this SMT? If so, what's the observed behavior vs. what you expected?
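
As a sketch, the minimal form of that configuration adds just two properties to the sink connector config (everything else in the sink config stays as it is):

  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"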

--Gunnar



Ayan Mukhuty

Mar 22, 2021, 2:32:37 AM
to debezium
Hi,

I have tried the SMT feature and have been partly successful, thanks to you guys, but I still have some issues and would like to understand what is wrong with my configuration.

I am able to stream the change events for update and create only; I was not able to stream delete changes with the SMT I used (I have not used tombstones). I was even able to insert the timestamp, and the connector works fine, but only for update and create.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" datanode26-htrunk:8083/connectors/ -d '{
  "name": "hdfs-sink4",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.testdb.persons2",
    "hdfs.url": "hdfs://172.16.87.40:8020",
    "flush.size": "1",
    "name": "hdfs-sink4",
    "transforms": "unwrap,insertTS",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "value.converter.schemas.enable": "false",
    "auto.create": "true",
    "auto.evolve": "true",
    "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertTS.timestamp.field": "Time"
  }
}'

Now I have tried following the documentation for deletes, "https://debezium.io/documentation/reference/1.2/configuration/event-flattening.html#extract-new-record-state-delete-handling-mode", but I am not able to configure the sink connector and its status seems to be degraded.

I have used the following sink configuration this time.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" datanode26-htrunk:8083/connectors/ -d '{
  "name": "hdfs-sink5",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.testdb.persons3",
    "hdfs.url": "hdfs://172.16.87.40:8020",
    "flush.size": "1",
    "name": "hdfs-sink5",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.add.fields": "op,table,lsn",
    "transforms.unwrap.add.headers": "db",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'

And the status gives the error below:

{"id":0,"state":"FAILED","worker_id":"172.23.26.69:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)\n\tat org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:519)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.IllegalArgumentException: Unexpected field name: lsn\n\tat io.debezium.transforms.ExtractNewRecordState$FieldReference.getSchema(ExtractNewRecordState.java:383)\n\tat io.debezium.transforms.ExtractNewRecordState.updateSchema(ExtractNewRecordState.java:268)\n\tat io.debezium.transforms.ExtractNewRecordState.makeUpdatedSchema(ExtractNewRecordState.java:261)\n\tat io.debezium.transforms.ExtractNewRecordState.lambda$addFields$2(ExtractNewRecordState.java:229)\n\tat java.util.concurrent.ConcurrentMap.computeIfAbsent(ConcurrentMap.java:324)\n\tat io.debezium.transforms.ExtractNewRecordState.addFields(ExtractNewRecordState.java:228)\n\tat io.debezium.transforms.ExtractNewRecordState.apply(ExtractNewRecordState.java:177)\n\tat org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)\n\t... 14 more\n"}


Please let me know if I missed anything.

Regards,
Ayan

jiri.p...@gmail.com

Mar 22, 2021, 3:35:38 AM
to debezium
Hi,

LSN is for PostgreSQL; MySQL does not have it. Please see the `transforms.unwrap.add.fields` config. You should use MySQL-related fields only.
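
As a sketch (assuming the Debezium 1.2 field names, with the MySQL source struct fields file and pos standing in for the binlog position), the add.fields list could reference envelope and MySQL source metadata instead of lsn:

  "transforms.unwrap.add.fields": "op,table,source.ts_ms,source.file,source.pos",
  "transforms.unwrap.add.headers": "db"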

J.

Ayan Mukhuty

Mar 22, 2021, 4:13:52 AM
to debezium
Thanks a lot. By removing lsn I was able to register the connector, and the update and create events were working fine until I pushed a delete record into the source table, at which point the status went to degraded.


Please find the details below. Any idea why?

]# curl -H "Accept:application/json" datanode26-htrunk:8083/connectors/hdfs-sink5/tasks/0/status
{"id":0,"state":"FAILED","worker_id":"172.23.26.69:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.RuntimeException: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported\n\tat io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:406)\n\tat io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:386)\n\tat io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:124)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)\n\t... 10 more\nCaused by: org.apache.kafka.connect.errors.SchemaProjectorException: Switch between schema-based and schema-less data is not supported\n\tat io.confluent.connect.storage.schema.StorageSchemaCompatibility.validateAndCheck(StorageSchemaCompatibility.java:150)\n\tat io.confluent.connect.storage.schema.StorageSchemaCompatibility.shouldChangeSchema(StorageSchemaCompatibility.java:320)\n\tat io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:360)\n\t... 13 more\n"}

Regards,
Ayan

jiri.p...@gmail.com

Mar 22, 2021, 4:35:23 AM
to debezium
Hi,

I'd say this is a bug/issue in the HDFS sink connector itself; please see https://github.com/confluentinc/kafka-connect-hdfs/issues/239. So you must drop the tombstones.

Ayan Mukhuty

Mar 22, 2021, 5:42:11 AM
to debezium
Can you tell me what I need to add to the connector config so that the tombstones get dropped? I have not been able to figure out how to do that.

jiri.p...@gmail.com

Mar 22, 2021, 6:00:11 AM
to debezium
That's a transformation option, not a connector config option, and you are intentionally keeping the tombstones present - see "transforms.unwrap.drop.tombstones": "false".
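
Concretely, a sketch of the relevant transform settings for this sink, given the HDFS connector issue above, would be to drop the tombstones while still rewriting deletes:

  "transforms.unwrap.drop.tombstones": "true",
  "transforms.unwrap.delete.handling.mode": "rewrite"

(drop.tombstones defaults to true, so simply removing the "false" override has the same effect.)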

J.

Ayan Mukhuty

Mar 24, 2021, 2:10:17 AM
to debezium
Thanks a lot for the solution.

I have deployed the connectors as tested, for one of our production MySQL databases, and after some time I am getting the errors below.
curl -H "Accept:application/json" 172.23.26.69:8083/connectors/exact-test/status
{"name":"exact-test","connector":{"state":"RUNNING","worker_id":"172.23.26.69:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"172.23.26.69:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:282)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:336)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:264)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1522947 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n"}],"type":"source"}

Could you please help me with how I can solve this error?

Regards,
Ayan 

jiri.p...@gmail.com

Mar 24, 2021, 2:50:19 AM
to debezium
Hi,

please do exactly what the error message hints at: either increase `max.request.size` at the broker level or configure it in the Kafka Connect worker.

J.

Ayan Mukhuty

Apr 16, 2021, 8:06:20 AM
to debezium
Hi,

Since I registered the source connector using a REST POST call, which file should I consider as the worker file in order to change the default value of max.request.size?

Regards,
Ayan  

jiri.p...@gmail.com

Apr 18, 2021, 11:51:22 PM
to debezium
Hi,

please check the connect-distributed.properties file in the Kafka Connect conf directory.
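
As a sketch, the worker-level producer override in connect-distributed.properties would look like this (the size below is just an illustrative value; depending on the setup, the broker/topic limit message.max.bytes may also need to be raised to match):

  producer.max.request.size=2097152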

J.
