ERROR records-stream-producer -exception while streaming logical changes


Paolo Importuni

Nov 6, 2023, 8:43:59 AM
to debezium
Hi,
I'm very new to Kafka and Debezium.
I'm following this tutorial:
https://github.com/mtpatter/postgres-kafka-demo
I set up the Postgres database, added two tables and copied some values into them.
I then added the KSQL streams and tables, and the source and sink connectors for Postgres.

After adding more data to the source table, the streaming stops, and in the Connect log I see this error:


 ERROR  Postgres|dbserver1|records-stream-producer  unexpected exception while streaming logical changes   [io.debezium.connector.postgresql.RecordsStreamProducer]
java.lang.IllegalArgumentException: Invalid identifier:
        at io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:68)
        at io.debezium.text.TokenStream.start(TokenStream.java:445)
        at io.debezium.relational.TableIdParser.parse(TableIdParser.java:28)
....

 ERROR  ||  WorkerSourceTask{id=postgres-source-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]

If I check the connector status via the REST API, I see that the state of the related task is "FAILED", and there is no way to restart it:

{
    "name": "postgres-source",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "connect:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.\n\tat io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)\n\tat io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)\n\tat io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:156)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.IllegalArgumentException: Invalid identifier: \n\tat io.debezium.relational.TableIdParser$TableIdTokenizer.tokenize(TableIdParser.java:68)\n\tat io.debezium.text.TokenStream.start(TokenStream.java:445)\n\tat io.debezium.relational.TableIdParser.parse(TableIdParser.java:28)\n\tat io.debezium.relational.TableId.parse(TableId.java:39)\n\tat io.debezium.connector.postgresql.PostgresSchema.parse(PostgresSchema.java:218)\n\tat io.debezium.connector.postgresql.RecordsStreamProducer.process(RecordsStreamProducer.java:238)\n\tat io.debezium.connector.postgresql.RecordsStreamProducer.lambda$streamChanges$1(RecordsStreamProducer.java:131)\n\tat io.debezium.connector.postgresql.connection.pgproto.PgProtoMessageDecoder.processMessage(PgProtoMessageDecoder.java:48)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:265)\n\tat io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:250)\n\tat io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:131)\n\tat io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:117)\n\t... 5 more\n"
        }
    ],
    "type": "source"
}

I searched a lot, but I cannot understand what makes the task fail, nor why I cannot restart the failing task.
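
For reference, a failed Connect task can normally be restarted through the worker's REST API with a call along these lines (host and port taken from the worker_id above; use localhost and the mapped port if calling from outside the Docker network). This is only a sketch, and here the task apparently just ends up failing again:

# restart task 0 of the postgres-source connector; returns no body on success
curl -X POST http://connect:8083/connectors/postgres-source/tasks/0/restart
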
Thanks





Chris Cranford

Nov 6, 2023, 11:12:48 AM
to debe...@googlegroups.com
Hi Paolo -

That's a very strange error. Can you please tell us what version of PostgreSQL you are connecting to, which version of Debezium you're using, and your connector configuration?

Thanks,
Chris

Paolo Importuni

Nov 6, 2023, 2:16:28 PM
to debezium
Hi Chris, 

postgres version is:

postgres=# select version();
                                                            version
---------------------------------------------------------------------------------------------------------------------------------
 PostgreSQL 11.21 (Debian 11.21-1.pgdg110+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit



I do not know exactly which command I can use to get the current Debezium version, but by checking the CHANGELOG file in the Debezium image I see:
## 0.9.0.Final
January 5th, 2019 [Detailed release notes](https://issues.jboss.org/secure/ReleaseNote.jspa?projectId=12317320&version=12340275)
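
A quick way to see which connector plugins (and versions) the Connect worker has actually loaded is the Connect REST API itself; a sketch, assuming the worker is reachable at connect:8083 (or at localhost with the mapped port):

# Connect worker version
curl -s http://connect:8083/
# installed connector plugins and their versions; the Debezium Postgres connector shows up here
curl -s http://connect:8083/connector-plugins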


My connector config is:

"name": "postgres-source",
  "config": {"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max":"1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "students",
    "database.server.name": "dbserver1",
    "database.whitelist": "students",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.students",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "value.converter.schema.registry.url": "http://schema-registry:8082",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}
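
For completeness, this is the JSON that gets submitted when the connector is registered; a minimal sketch of that call, assuming the config above is saved in a file named postgres-source.json (a hypothetical name) and the REST API is reachable at connect:8083:

curl -X POST -H "Content-Type: application/json" --data @postgres-source.json http://connect:8083/connectors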

Thanks for your help
Paolo

jiri.p...@gmail.com

Nov 7, 2023, 12:18:24 AM
to debezium
Are you following the demo in the sense that if I try it I'll see the same behaviour, or are you using your own setup?

Also, I'd definitely recommend moving away from the Debezium 0.10 that is used in the demo; it is kind of stone age now. Please drop the database.whitelist config option and replace it with schema.include.list, setting it to the schema that contains the tables you want to capture.
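
In the config posted above, that change amounts to replacing the line

    "database.whitelist": "students",

with something like the following ("public" is only an assumption here; use the schema that actually contains the tables, and note that schema.include.list requires a reasonably recent Debezium version), then updating the connector, e.g. via PUT /connectors/postgres-source/config:

    "schema.include.list": "public",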

J.

Paolo Importuni

Nov 7, 2023, 2:56:11 AM
to debezium
Hi J.

Are you following the demo in the sense that if I try it I'll see the same behaviour, or are you using your own setup?

Yes, I'm following it step by step. I only changed a couple of server ports to avoid conflicts and added the KSQL client image. I'm attaching my docker-compose .yml file and postgresql.conf.

Also, I'd definitely recommend moving away from the Debezium 0.10 that is used in the demo; it is kind of stone age now.

OK, how do I get rid of that? Can I simply replace the mtpatter debezium-connect image with a stock "debezium/connect" one?
 
Please drop the database.whitelist config option and replace it with schema.include.list, setting it to the schema that contains the tables you want to capture.

OK, I did that, but the same error is thrown.
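
One way to double-check that the new option actually took effect on the running connector is to read its config back from the worker; a sketch, same host and port as before:

curl -s http://connect:8083/connectors/postgres-source/config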
 

Thanks
Paolo

 
docker-compose.yml
postgresql.conf

jiri.p...@gmail.com

Nov 7, 2023, 5:57:38 AM
to debezium

Paolo Importuni

Nov 7, 2023, 12:10:41 PM
to debezium
Hi,


I changed my Dockerfile as follows:

#FROM debezium/connect:0.10
FROM debezium/connect-base:1.5

# https://github.com/debezium/debezium-examples/blob/master/unwrap-smt/debezium-jdbc-es/Dockerfile

ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc \
    KAFKA_CONNECT_ES_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-elasticsearch


# Deploy PostgreSQL JDBC Driver
#RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-42.1.4.jar
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-42.6.0.jar

# Deploy Kafka Connect JDBC
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR && \
    curl -sO http://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/5.1.2/kafka-connect-jdbc-5.1.2.jar
 

and built the image again, but nothing changed; the error remains.
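
One sanity check at this point is to confirm that the rebuilt image, and therefore the newer connector, is really what the Connect container runs; a sketch, assuming the compose service is named connect and the REST port is mapped to 8083 on the host:

# rebuild and recreate only the Connect service so the new image is actually picked up
docker-compose build connect
docker-compose up -d --force-recreate connect

# then check the version of the loaded Debezium Postgres connector plugin
curl -s http://localhost:8083/connector-plugins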

Thanks
Paolo