Hi,
I'm very new to Kafka and Debezium.
I'm following this tutorial:
https://github.com/mtpatter/postgres-kafka-demo
I set up the Postgres DB, added two tables and copied some values.
I added the KSQL streams and tables, and the source and sink connectors to Postgres.
After I add more data to the source table, the streaming stops, and when I check the Connect log I see this error:
ERROR Postgres|dbserver1|records-stream-producer unexpected exception while streaming logical changes [io.debezium.connector.postgresql.RecordsStreamProducer]
java.lang.IllegalArgumentException: Invalid identifier:
at io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:68)
at io.debezium.text.TokenStream.start(TokenStream.java:445)
at io.debezium.relational.TableIdParser.parse(TableIdParser.java:28)
.... ERROR || WorkerSourceTask{id=postgres-source-0} Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
If I check the Connector status via the REST API, I see that the related task status is "FAILED" and there is no way to restart it.
{
"name": "postgres-source",
"connector": {
"state": "RUNNING",
"worker_id": "connect:8083"
},
"tasks": [
{
"id": 0,
"state": "FAILED",
"worker_id": "connect:8083",
"trace": "org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.\n\tat io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)\n\tat io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:156)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.IllegalArgumentException: Invalid identifier: \n\tat io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:68)\n\tat io.debezium.text.TokenStream.start(TokenStream.java:445)\n\tat io.debezium.relational.TableIdParser.parse(TableIdParser.java:28)\n\tat io.debezium.relational.TableId.parse(TableId.java:39)\n\tat io.debezium.connector.postgresql.PostgresSchema.parse(PostgresSchema.java:218)\n\tat io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:238)\n\tat io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:131)\n\tat io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder.processMessage(PgProtoMessageDecoder.java:48)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:265)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:250)\n\tat io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:131)\n\tat io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:117)\n\t... 5 more\n"
}
],
"type": "source"
}
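For reference, this is roughly how the status check above and a restart of the failed task can be issued against the Kafka Connect REST API (`GET /connectors/{name}/status` and `POST /connectors/{name}/tasks/{id}/restart`). It is only a minimal sketch: the base URL `http://localhost:8083` and the helper function names are assumptions made for illustration and do not come from the tutorial.

```python
import json
import urllib.request

# Assumption: the Connect worker's REST API is reachable here; adjust host/port.
CONNECT_URL = "http://localhost:8083"


def failed_task_ids(status):
    """Return the ids of all tasks reported as FAILED in a status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def task_restart_url(base_url, connector, task_id):
    """Build the URL for restarting one task (sent as POST with an empty body)."""
    return f"{base_url}/connectors/{connector}/tasks/{task_id}/restart"


def get_status(base_url, connector):
    """GET /connectors/{name}/status and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/connectors/{connector}/status") as resp:
        return json.load(resp)


def restart_failed_tasks(base_url, connector):
    """POST a restart request for every FAILED task and return their ids."""
    ids = failed_task_ids(get_status(base_url, connector))
    for task_id in ids:
        req = urllib.request.Request(
            task_restart_url(base_url, connector, task_id), data=b"", method="POST"
        )
        urllib.request.urlopen(req).close()
    return ids
```

With the status shown above, `restart_failed_tasks(CONNECT_URL, "postgres-source")` would send a restart request for task 0.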
I searched a lot, but I cannot understand either what makes the task fail or why I cannot restart the failed task.
Thanks