Hi,
I have tried the SMT feature and am partly successful, thanks to you all, but I still have some issues and would like to understand what is wrong with my configuration.
With the first configuration below I am able to stream create and update events, and I was even able to insert a timestamp field, but delete changes are not streamed (I have not used tombstones here). The connector works fine only for create and update.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  datanode26-htrunk:8083/connectors/ -d '{
    "name": "hdfs-sink4",
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "tasks.max": "1",
      "topics": "dbserver1.testdb.persons2",
      "hdfs.url": "hdfs://172.16.87.40:8020",
      "flush.size": "1",
      "name": "hdfs-sink4",
      "transforms": "unwrap,insertTS",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "value.converter.schemas.enable": "false",
      "auto.create": "true",
      "auto.evolve": "true",
      "transforms.insertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
      "transforms.insertTS.timestamp.field": "Time"
    }
  }'
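For context, this is my understanding of what the unwrap transform is doing to the events (a hand-worked sketch with a simplified envelope, not actual connector code, so please correct me if it is wrong):

```python
# Simplified Debezium change-event envelope for an UPDATE (my assumption of
# the shape; real events carry more source metadata).
event = {
    "before": {"id": 1, "name": "old"},
    "after":  {"id": 1, "name": "new"},
    "source": {"db": "testdb", "table": "persons2"},
    "op": "u",
}

def unwrap(envelope):
    """Mimic ExtractNewRecordState: keep only the 'after' state.
    For a delete (op == 'd'), 'after' is None, which is presumably why my
    deletes need extra handling (drop, rewrite, or tombstones)."""
    return envelope["after"]

print(unwrap(event))  # {'id': 1, 'name': 'new'}
```

That matches what I see: creates and updates come through flattened, while deletes (whose "after" is null) do not show up at all.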
To try to capture the delete events as well, I used the following sink configuration this time:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  datanode26-htrunk:8083/connectors/ -d '{
    "name": "hdfs-sink5",
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "tasks.max": "1",
      "topics": "dbserver1.testdb.persons3",
      "hdfs.url": "hdfs://172.16.87.40:8020",
      "flush.size": "1",
      "name": "hdfs-sink5",
      "transforms": "unwrap",
      "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
      "transforms.unwrap.drop.tombstones": "false",
      "transforms.unwrap.add.fields": "op,table,lsn",
      "transforms.unwrap.add.headers": "db",
      "transforms.unwrap.delete.handling.mode": "rewrite"
    }
  }'
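I retrieved the task status through the standard Connect REST status endpoint (same worker host I used to create the connector; output below is from this call):

```shell
# Query the status of the hdfs-sink5 connector and its tasks
curl -s datanode26-htrunk:8083/connectors/hdfs-sink5/status
```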
The status shows the task has failed with the following error:
Task 0 is FAILED on worker 172.23.26.69:8083 with this trace:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:196)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:122)
	at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:519)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:472)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Unexpected field name: lsn
	at io.debezium.transforms.ExtractNewRecordState$FieldReference.getSchema(ExtractNewRecordState.java:383)
	at io.debezium.transforms.ExtractNewRecordState.updateSchema(ExtractNewRecordState.java:268)
	at io.debezium.transforms.ExtractNewRecordState.makeUpdatedSchema(ExtractNewRecordState.java:261)
	at io.debezium.transforms.ExtractNewRecordState.lambda$addFields$2(ExtractNewRecordState.java:229)
	at java.util.concurrent.ConcurrentMap.computeIfAbsent(ConcurrentMap.java:324)
	at io.debezium.transforms.ExtractNewRecordState.addFields(ExtractNewRecordState.java:228)
	at io.debezium.transforms.ExtractNewRecordState.apply(ExtractNewRecordState.java:177)
	at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:146)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:180)
	... 14 more
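My current guess, which I would like confirmed: the source here is MySQL (the dbserver1.testdb.* topics come from that connector), and lsn is a Postgres/SQL Server source field, so add.fields cannot resolve it against a MySQL source block. A small sketch of that reasoning, where the MySQL source field list is my assumption:

```python
# My hypothesis, sketched out. The field names below are my assumption of
# what a Debezium MySQL "source" block contains; please correct me if wrong.
mysql_source_fields = {
    "version", "connector", "name", "ts_ms", "snapshot", "db", "table",
    "server_id", "gtid", "file", "pos", "row", "thread", "query",
}
requested = ["op", "table", "lsn"]  # my transforms.unwrap.add.fields value
# "op" comes from the envelope itself; the others are looked up in "source".
unresolvable = [f for f in requested if f != "op" and f not in mysql_source_fields]
print(unresolvable)  # ['lsn']
```

If that is right, would replacing lsn with file,pos (or simply dropping it) be the recommended fix for a MySQL source?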
Please let me know if I have missed anything.
Regards,
Ayan