Debezium JDBC sink connector appending 'PUBLIC.' for target oracle tables


Hrishikesh Dutta Gupta

Feb 27, 2024, 12:06:41 AM
to debezium
Hi,

I am trying to push CDC changes from my source Postgres database to target Oracle (11g) tables through Kafka. I am using the Debezium Postgres source connector (Docker image debezium/connect:1.8) and the Debezium JDBC sink connector (with all the required jars included), along with a Kafka broker. I have verified that the source connector works fine: the topic 'idcstate' gets created and contains the Debezium records.

I am using the source and sink connector configurations below. My issue is that the sink connector throws the exception below and does not insert any records. Why is it prepending 'PUBLIC.' to the table name 'idcstate' in the MERGE query? I have already provided the required connection.username and connection.password in the sink connector configuration. Can anyone please help me with this issue? Let me know if you need any more inputs.

Source Connector:

{
  "name": "source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "****",
    "database.port": "***",
    "database.user": "***",
    "database.password": "***",
    "database.dbname": "****",
    "database.server.name": "****",
    "table.include.list": "prodfxiapp1.idcstate",
    "transforms": "topicRename",
    "transforms.topicRename.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.topicRename.regex": ".*\\.(.*)",
    "transforms.topicRename.replacement": "$1"
  }
}
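For clarity, the RegexRouter transform above strips everything up to the last dot, so a topic named '<server>.<schema>.<table>' is renamed to just the table name. A small sketch of the same regex behaviour (the server and schema names here are only placeholders, not my real values):

```python
import re

def route_topic(topic: str) -> str:
    # Same pattern as transforms.topicRename.regex: ".*\.(.*)".
    # The greedy ".*" consumes up to the LAST dot, so group 1 is the
    # final segment. Like RegexRouter, a non-matching topic is kept as-is.
    m = re.fullmatch(r".*\.(.*)", topic)
    return m.group(1) if m else topic

print(route_topic("myserver.prodfxiapp1.idcstate"))  # -> idcstate
print(route_topic("idcstate"))                       # -> idcstate (no match, unchanged)
```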

Sink Connector:

{
  "name": "sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:oracle:thin:@***1521/***",
    "connection.username": "DW_STAGE",
    "connection.password": "***",
    "topics": "idcstate",
    "schema.evolution": "basic",
    "quote.identifiers": "false",
    "insert.mode": "upsert",
    "primary.key.mode": "record_key",
    "primary.key.fields": "id"
  }
}
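One thing I was wondering about (this is just a guess on my part, I have not confirmed it fixes the PUBLIC. prefix): whether explicitly setting the sink's table.name.format property would make the connector target my DW_STAGE schema instead of deriving one itself, for example:

```json
  "table.name.format": "DW_STAGE.${topic}"
```

If anyone knows whether this property is the right way to control the schema qualifier here, please let me know.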


Exception in the connector logs:

2024-02-27 03:52:32,624 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Subscribed to topic(s): idcstate [org.apache.kafka.clients.consumer.KafkaConsumer]
2024-02-27 03:52:32,640 INFO || Starting JdbcSinkConnectorConfig with configuration: [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,641 INFO || connector.class = io.debezium.connector.jdbc.JdbcSinkConnector [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,641 INFO || connection.password = ******** [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || primary.key.mode = record_key [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || tasks.max = 1 [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || topics = idcstate [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || connection.username = DW_STAGE [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || quote.identifiers = false [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || schema.evolution = basic [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || task.class = io.debezium.connector.jdbc.JdbcSinkConnectorTask [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || name = sink-connector [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || primary.key.fields = id [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || connection.url = jdbc:oracle:thin:@*****:1521/*** [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,642 INFO || insert.mode = upsert [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:32,772 INFO || HHH000412: Hibernate ORM core version 6.1.7.Final [org.hibernate.Version]
2024-02-27 03:52:33,217 INFO || HHH000130: Instantiating explicit connection provider: org.hibernate.c3p0.internal.C3P0ConnectionProvider [org.hibernate.engine.jdbc.connections.internal.ConnectionProviderInitiator]
2024-02-27 03:52:33,221 INFO || HHH010002: C3P0 using driver: null at URL: jdbc:oracle:thin:@****:1521/** [org.hibernate.orm.connections.pooling.c3p0]
2024-02-27 03:52:33,222 INFO || HHH10001001: Connection properties: {password=****, user=DW_STAGE} [org.hibernate.orm.connections.pooling.c3p0]
2024-02-27 03:52:33,222 INFO || HHH10001003: Autocommit mode: false [org.hibernate.orm.connections.pooling.c3p0]
2024-02-27 03:52:33,222 WARN || HHH10001006: No JDBC Driver class was specified by property hibernate.connection.driver_class [org.hibernate.orm.connections.pooling.c3p0]
2024-02-27 03:52:33,233 INFO || MLog clients using slf4j logging. [com.mchange.v2.log.MLog]
2024-02-27 03:52:33,277 INFO || Initializing c3p0-0.9.5.5 [built 11-December-2019 22:18:33 -0800; debug? true; trace: 10] [com.mchange.v2.c3p0.C3P0Registry]
2024-02-27 03:52:33,305 INFO || HHH10001007: JDBC isolation level: <unknown> [org.hibernate.orm.connections.pooling.c3p0]
2024-02-27 03:52:33,320 INFO || Initializing c3p0 pool... com.mchange.v2.c3p0.PoolBackedDataSource@94b1be16 [ connectionPoolDataSource -> com.mchange.v2.c3p0.WrapperConnectionPoolDataSource@5772b6e2 [ acquireIncrement -> 32, acquireRetryAttempts -> 30, acquireRetryDelay -> 1000, autoCommitOnClose -> false, automaticTestTable -> null, breakAfterAcquireFailure -> false, checkoutTimeout -> 0, connectionCustomizerClassName -> null, connectionTesterClassName -> com.mchange.v2.c3p0.impl.DefaultConnectionTester, contextClassLoaderSource -> caller, debugUnreturnedConnectionStackTraces -> false, factoryClassLocation -> null, forceIgnoreUnresolvedTransactions -> false, forceSynchronousCheckins -> false, identityToken -> 1bqvnr9b11skawvcsz1zer|2724045f, idleConnectionTestPeriod -> 0, initialPoolSize -> 5, maxAdministrativeTaskTime -> 0, maxConnectionAge -> 0, maxIdleTime -> 0, maxIdleTimeExcessConnections -> 0, maxPoolSize -> 32, maxStatements -> 0, maxStatementsPerConnection -> 0, minPoolSize -> 5, nestedDataSource -> com.mchange.v2.c3p0.DriverManagerDataSource@4a905b63 [ description -> null, driverClass -> null, factoryClassLocation -> null, forceUseNamedDriverClass -> false, identityToken -> 1bqvnr9b11skawvcsz1zer|7759bfd7, jdbcUrl -> jdbc:oracle:thin:@*****:1521/***, properties -> {password=******, user=******} ], preferredTestQuery -> null, privilegeSpawnedThreads -> false, propertyCycle -> 0, statementCacheNumDeferredCloseThreads -> 0, testConnectionOnCheckin -> false, testConnectionOnCheckout -> false, unreturnedConnectionTimeout -> 0, usesTraditionalReflectiveProxies -> false; userOverrides: {} ], dataSourceName -> null, extensions -> {}, factoryClassLocation -> null, identityToken -> 1bqvnr9b11skawvcsz1zer|58cc6ca9, numHelperThreads -> 3 ] [com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource]
2024-02-27 03:52:33,733 INFO || HHH000400: Using dialect: org.hibernate.dialect.OracleDialect [SQL dialect]
2024-02-27 03:52:34,479 INFO || HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform] [org.hibernate.engine.transaction.jta.platform.internal.JtaPlatformInitiator]
2024-02-27 03:52:34,501 INFO || Using dialect io.debezium.connector.jdbc.dialect.oracle.OracleDatabaseDialect [io.debezium.connector.jdbc.dialect.DatabaseDialectResolver]
2024-02-27 03:52:34,568 INFO || Database version 11.2.0 [io.debezium.connector.jdbc.JdbcChangeEventSink]
2024-02-27 03:52:34,568 INFO || WorkerSinkTask{id=sink-connector-0} Sink task finished initialization and start [org.apache.kafka.connect.runtime.WorkerSinkTask]
2024-02-27 03:52:34,574 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Cluster ID: 1sJWvb3HTVaL9hF5TdTmfQ [org.apache.kafka.clients.Metadata]
2024-02-27 03:52:34,575 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Discovered group coordinator kafka:29092 (id: 2147483646 rack: null) [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:34,576 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] (Re-)joining group [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:34,587 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Request joining group due to: need to re-join with the given member-id [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:34,587 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] (Re-)joining group [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:37,597 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Successfully joined group with generation Generation{generationId=1, memberId='connector-consumer-sink-connector-0-2c411905-a6c7-4ace-863b-efcefabda43d', protocol='range'} [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:37,603 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Finished assignment for group at generation 1: {connector-consumer-sink-connector-0-2c411905-a6c7-4ace-863b-efcefabda43d=Assignment(partitions=[idcstate-0])} [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:37,612 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Successfully synced group in generation Generation{generationId=1, memberId='connector-consumer-sink-connector-0-2c411905-a6c7-4ace-863b-efcefabda43d', protocol='range'} [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:37,612 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Notifying assignor about the new Assignment(partitions=[idcstate-0]) [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:37,612 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Adding newly assigned partitions: idcstate-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:37,630 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Found no committed offset for partition idcstate-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:52:37,636 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Resetting offset for partition idcstate-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:29092 (id: 1 rack: null)], epoch=0}}. [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2024-02-27 03:52:38,766 WARN || SQL Error: 903, SQLState: 42000 [org.hibernate.engine.jdbc.spi.SqlExceptionHelper]
2024-02-27 03:52:38,766 ERROR || ORA-00903: invalid table name
[org.hibernate.engine.jdbc.spi.SqlExceptionHelper]
2024-02-27 03:52:38,780 ERROR || Failed to process record: Failed to process a sink record [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-27 03:52:43,466 INFO || WorkerSourceTask{id=source-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
2024-02-27 03:53:38,786 ERROR || WorkerSinkTask{id=sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure [org.apache.kafka.connect.runtime.WorkerSinkTask]
org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:78)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:71)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:87)
... 11 more
Caused by: jakarta.persistence.PersistenceException: Converting `org.hibernate.exception.SQLGrammarException` to JPA `PersistenceException` : JDBC exception executing SQL [MERGE INTO PUBLIC.IDCSTATE USING (SELECT ? id, ? shortname, ? longname, ? readingsname, ? type, ? version, ? description, ? namespaceid, ? status, ? statuslastmodified, ? displaykey, ? guid, ? clsfid, ? createddate, ? modifieddate, ? workflowstatemapid, ? lastmodifiedbyid FROM dual) INCOMING ON (PUBLIC.IDCSTATE.id=INCOMING.id) WHEN MATCHED THEN UPDATE SET PUBLIC.IDCSTATE.shortname=INCOMING.shortname,PUBLIC.IDCSTATE.longname=INCOMING.longname,PUBLIC.IDCSTATE.readingsname=INCOMING.readingsname,PUBLIC.IDCSTATE.type=INCOMING.type,PUBLIC.IDCSTATE.version=INCOMING.version,PUBLIC.IDCSTATE.description=INCOMING.description,PUBLIC.IDCSTATE.namespaceid=INCOMING.namespaceid,PUBLIC.IDCSTATE.status=INCOMING.status,PUBLIC.IDCSTATE.statuslastmodified=INCOMING.statuslastmodified,PUBLIC.IDCSTATE.displaykey=INCOMING.displaykey,PUBLIC.IDCSTATE.guid=INCOMING.guid,PUBLIC.IDCSTATE.clsfid=INCOMING.clsfid,PUBLIC.IDCSTATE.createddate=INCOMING.createddate,PUBLIC.IDCSTATE.modifieddate=INCOMING.modifieddate,PUBLIC.IDCSTATE.workflowstatemapid=INCOMING.workflowstatemapid,PUBLIC.IDCSTATE.lastmodifiedbyid=INCOMING.lastmodifiedbyid WHEN NOT MATCHED THEN INSERT (shortname,longname,readingsname,type,version,description,namespaceid,status,statuslastmodified,displaykey,guid,clsfid,createddate,modifieddate,workflowstatemapid,lastmodifiedbyid,id) VALUES (INCOMING.shortname,INCOMING.longname,INCOMING.readingsname,INCOMING.type,INCOMING.version,INCOMING.description,INCOMING.namespaceid,INCOMING.status,INCOMING.statuslastmodified,INCOMING.displaykey,INCOMING.guid,INCOMING.clsfid,INCOMING.createddate,INCOMING.modifieddate,INCOMING.workflowstatemapid,INCOMING.lastmodifiedbyid,INCOMING.id)]
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:165)
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:175)
at org.hibernate.query.spi.AbstractQuery.executeUpdate(AbstractQuery.java:654)
at io.debezium.connector.jdbc.JdbcChangeEventSink.writeUpsert(JdbcChangeEventSink.java:257)
at io.debezium.connector.jdbc.JdbcChangeEventSink.write(JdbcChangeEventSink.java:216)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:68)
... 12 more
Caused by: org.hibernate.exception.SQLGrammarException: JDBC exception executing SQL [MERGE INTO PUBLIC.IDCSTATE USING (SELECT ? id, ? shortname, ? longname, ? readingsname, ? type, ? version, ? description, ? namespaceid, ? status, ? statuslastmodified, ? displaykey, ? guid, ? clsfid, ? createddate, ? modifieddate, ? workflowstatemapid, ? lastmodifiedbyid FROM dual) INCOMING ON (PUBLIC.IDCSTATE.id=INCOMING.id) WHEN MATCHED THEN UPDATE SET PUBLIC.IDCSTATE.shortname=INCOMING.shortname,PUBLIC.IDCSTATE.longname=INCOMING.longname,PUBLIC.IDCSTATE.readingsname=INCOMING.readingsname,PUBLIC.IDCSTATE.type=INCOMING.type,PUBLIC.IDCSTATE.version=INCOMING.version,PUBLIC.IDCSTATE.description=INCOMING.description,PUBLIC.IDCSTATE.namespaceid=INCOMING.namespaceid,PUBLIC.IDCSTATE.status=INCOMING.status,PUBLIC.IDCSTATE.statuslastmodified=INCOMING.statuslastmodified,PUBLIC.IDCSTATE.displaykey=INCOMING.displaykey,PUBLIC.IDCSTATE.guid=INCOMING.guid,PUBLIC.IDCSTATE.clsfid=INCOMING.clsfid,PUBLIC.IDCSTATE.createddate=INCOMING.createddate,PUBLIC.IDCSTATE.modifieddate=INCOMING.modifieddate,PUBLIC.IDCSTATE.workflowstatemapid=INCOMING.workflowstatemapid,PUBLIC.IDCSTATE.lastmodifiedbyid=INCOMING.lastmodifiedbyid WHEN NOT MATCHED THEN INSERT (shortname,longname,readingsname,type,version,description,namespaceid,status,statuslastmodified,displaykey,guid,clsfid,createddate,modifieddate,workflowstatemapid,lastmodifiedbyid,id) VALUES (INCOMING.shortname,INCOMING.longname,INCOMING.readingsname,INCOMING.type,INCOMING.version,INCOMING.description,INCOMING.namespaceid,INCOMING.status,INCOMING.statuslastmodified,INCOMING.displaykey,INCOMING.guid,INCOMING.clsfid,INCOMING.createddate,INCOMING.modifieddate,INCOMING.workflowstatemapid,INCOMING.lastmodifiedbyid,INCOMING.id)]
at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:64)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:56)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
at org.hibernate.sql.exec.internal.StandardJdbcMutationExecutor.execute(StandardJdbcMutationExecutor.java:97)
at org.hibernate.query.sql.internal.NativeNonSelectQueryPlanImpl.executeUpdate(NativeNonSelectQueryPlanImpl.java:78)
at org.hibernate.query.sql.internal.NativeQueryImpl.doExecuteUpdate(NativeQueryImpl.java:820)
at org.hibernate.query.spi.AbstractQuery.executeUpdate(AbstractQuery.java:643)
... 15 more
Caused by: java.sql.SQLSyntaxErrorException: ORA-00903: invalid table name

at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:509)
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:461)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1104)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:553)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:269)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:655)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:270)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:91)
at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:970)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1205)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3666)
at oracle.jdbc.driver.T4CPreparedStatement.executeInternal(T4CPreparedStatement.java:1426)
at oracle.jdbc.driver.OraclePreparedStatement.executeLargeUpdate(OraclePreparedStatement.java:3756)
at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3736)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1063)
at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeUpdate(NewProxyPreparedStatement.java:1502)
at org.hibernate.sql.exec.internal.StandardJdbcMutationExecutor.execute(StandardJdbcMutationExecutor.java:84)
... 18 more
Caused by: Error : 903, Position : 11, Sql = MERGE INTO PUBLIC.IDCSTATE USING (SELECT :1 id, :2 shortname, :3 longname, :4 readingsname, :5 type, :6 version, :7 description, :8 namespaceid, :9 status, :10 statuslastmodified, :11 displaykey, :12 guid, :13 clsfid, :14 createddate, :15 modifieddate, :16 workflowstatemapid, :17 lastmodifiedbyid FROM dual) INCOMING ON (PUBLIC.IDCSTATE.id=INCOMING.id) WHEN MATCHED THEN UPDATE SET PUBLIC.IDCSTATE.shortname=INCOMING.shortname,PUBLIC.IDCSTATE.longname=INCOMING.longname,PUBLIC.IDCSTATE.readingsname=INCOMING.readingsname,PUBLIC.IDCSTATE.type=INCOMING.type,PUBLIC.IDCSTATE.version=INCOMING.version,PUBLIC.IDCSTATE.description=INCOMING.description,PUBLIC.IDCSTATE.namespaceid=INCOMING.namespaceid,PUBLIC.IDCSTATE.status=INCOMING.status,PUBLIC.IDCSTATE.statuslastmodified=INCOMING.statuslastmodified,PUBLIC.IDCSTATE.displaykey=INCOMING.displaykey,PUBLIC.IDCSTATE.guid=INCOMING.guid,PUBLIC.IDCSTATE.clsfid=INCOMING.clsfid,PUBLIC.IDCSTATE.createddate=INCOMING.createddate,PUBLIC.IDCSTATE.modifieddate=INCOMING.modifieddate,PUBLIC.IDCSTATE.workflowstatemapid=INCOMING.workflowstatemapid,PUBLIC.IDCSTATE.lastmodifiedbyid=INCOMING.lastmodifiedbyid WHEN NOT MATCHED THEN INSERT (shortname,longname,readingsname,type,version,description,namespaceid,status,statuslastmodified,displaykey,guid,clsfid,createddate,modifieddate,workflowstatemapid,lastmodifiedbyid,id) VALUES (INCOMING.shortname,INCOMING.longname,INCOMING.readingsname,INCOMING.type,INCOMING.version,INCOMING.description,INCOMING.namespaceid,INCOMING.status,INCOMING.statuslastmodified,INCOMING.displaykey,INCOMING.guid,INCOMING.clsfid,INCOMING.createddate,INCOMING.modifieddate,INCOMING.workflowstatemapid,INCOMING.lastmodifiedbyid,INCOMING.id), OriginalSql = MERGE INTO PUBLIC.IDCSTATE USING (SELECT ? id, ? shortname, ? longname, ? readingsname, ? type, ? version, ? description, ? namespaceid, ? status, ? statuslastmodified, ? displaykey, ? guid, ? clsfid, ? createddate, ? modifieddate, ? 
workflowstatemapid, ? lastmodifiedbyid FROM dual) INCOMING ON (PUBLIC.IDCSTATE.id=INCOMING.id) WHEN MATCHED THEN UPDATE SET PUBLIC.IDCSTATE.shortname=INCOMING.shortname,PUBLIC.IDCSTATE.longname=INCOMING.longname,PUBLIC.IDCSTATE.readingsname=INCOMING.readingsname,PUBLIC.IDCSTATE.type=INCOMING.type,PUBLIC.IDCSTATE.version=INCOMING.version,PUBLIC.IDCSTATE.description=INCOMING.description,PUBLIC.IDCSTATE.namespaceid=INCOMING.namespaceid,PUBLIC.IDCSTATE.status=INCOMING.status,PUBLIC.IDCSTATE.statuslastmodified=INCOMING.statuslastmodified,PUBLIC.IDCSTATE.displaykey=INCOMING.displaykey,PUBLIC.IDCSTATE.guid=INCOMING.guid,PUBLIC.IDCSTATE.clsfid=INCOMING.clsfid,PUBLIC.IDCSTATE.createddate=INCOMING.createddate,PUBLIC.IDCSTATE.modifieddate=INCOMING.modifieddate,PUBLIC.IDCSTATE.workflowstatemapid=INCOMING.workflowstatemapid,PUBLIC.IDCSTATE.lastmodifiedbyid=INCOMING.lastmodifiedbyid WHEN NOT MATCHED THEN INSERT (shortname,longname,readingsname,type,version,description,namespaceid,status,statuslastmodified,displaykey,guid,clsfid,createddate,modifieddate,workflowstatemapid,lastmodifiedbyid,id) VALUES (INCOMING.shortname,INCOMING.longname,INCOMING.readingsname,INCOMING.type,INCOMING.version,INCOMING.description,INCOMING.namespaceid,INCOMING.status,INCOMING.statuslastmodified,INCOMING.displaykey,INCOMING.guid,INCOMING.clsfid,INCOMING.createddate,INCOMING.modifieddate,INCOMING.workflowstatemapid,INCOMING.lastmodifiedbyid,INCOMING.id), Error Msg = ORA-00903: invalid table name

at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:513)
... 34 more
2024-02-27 03:53:38,809 ERROR || WorkerSinkTask{id=sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:241)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:78)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
... 10 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:71)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:87)
... 11 more
Caused by: jakarta.persistence.PersistenceException: Converting `org.hibernate.exception.SQLGrammarException` to JPA `PersistenceException` : JDBC exception executing SQL [MERGE INTO PUBLIC.IDCSTATE USING (SELECT ? id, ? shortname, ? longname, ? readingsname, ? type, ? version, ? description, ? namespaceid, ? status, ? statuslastmodified, ? displaykey, ? guid, ? clsfid, ? createddate, ? modifieddate, ? workflowstatemapid, ? lastmodifiedbyid FROM dual) INCOMING ON (PUBLIC.IDCSTATE.id=INCOMING.id) WHEN MATCHED THEN UPDATE SET PUBLIC.IDCSTATE.shortname=INCOMING.shortname,PUBLIC.IDCSTATE.longname=INCOMING.longname,PUBLIC.IDCSTATE.readingsname=INCOMING.readingsname,PUBLIC.IDCSTATE.type=INCOMING.type,PUBLIC.IDCSTATE.version=INCOMING.version,PUBLIC.IDCSTATE.description=INCOMING.description,PUBLIC.IDCSTATE.namespaceid=INCOMING.namespaceid,PUBLIC.IDCSTATE.status=INCOMING.status,PUBLIC.IDCSTATE.statuslastmodified=INCOMING.statuslastmodified,PUBLIC.IDCSTATE.displaykey=INCOMING.displaykey,PUBLIC.IDCSTATE.guid=INCOMING.guid,PUBLIC.IDCSTATE.clsfid=INCOMING.clsfid,PUBLIC.IDCSTATE.createddate=INCOMING.createddate,PUBLIC.IDCSTATE.modifieddate=INCOMING.modifieddate,PUBLIC.IDCSTATE.workflowstatemapid=INCOMING.workflowstatemapid,PUBLIC.IDCSTATE.lastmodifiedbyid=INCOMING.lastmodifiedbyid WHEN NOT MATCHED THEN INSERT (shortname,longname,readingsname,type,version,description,namespaceid,status,statuslastmodified,displaykey,guid,clsfid,createddate,modifieddate,workflowstatemapid,lastmodifiedbyid,id) VALUES (INCOMING.shortname,INCOMING.longname,INCOMING.readingsname,INCOMING.type,INCOMING.version,INCOMING.description,INCOMING.namespaceid,INCOMING.status,INCOMING.statuslastmodified,INCOMING.displaykey,INCOMING.guid,INCOMING.clsfid,INCOMING.createddate,INCOMING.modifieddate,INCOMING.workflowstatemapid,INCOMING.lastmodifiedbyid,INCOMING.id)]
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:165)
at org.hibernate.internal.ExceptionConverterImpl.convert(ExceptionConverterImpl.java:175)
at org.hibernate.query.spi.AbstractQuery.executeUpdate(AbstractQuery.java:654)
at io.debezium.connector.jdbc.JdbcChangeEventSink.writeUpsert(JdbcChangeEventSink.java:257)
at io.debezium.connector.jdbc.JdbcChangeEventSink.write(JdbcChangeEventSink.java:216)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:68)
... 12 more
Caused by: org.hibernate.exception.SQLGrammarException: JDBC exception executing SQL [MERGE INTO PUBLIC.IDCSTATE USING (SELECT ? id, ? shortname, ? longname, ? readingsname, ? type, ? version, ? description, ? namespaceid, ? status, ? statuslastmodified, ? displaykey, ? guid, ? clsfid, ? createddate, ? modifieddate, ? workflowstatemapid, ? lastmodifiedbyid FROM dual) INCOMING ON (PUBLIC.IDCSTATE.id=INCOMING.id) WHEN MATCHED THEN UPDATE SET PUBLIC.IDCSTATE.shortname=INCOMING.shortname,PUBLIC.IDCSTATE.longname=INCOMING.longname,PUBLIC.IDCSTATE.readingsname=INCOMING.readingsname,PUBLIC.IDCSTATE.type=INCOMING.type,PUBLIC.IDCSTATE.version=INCOMING.version,PUBLIC.IDCSTATE.description=INCOMING.description,PUBLIC.IDCSTATE.namespaceid=INCOMING.namespaceid,PUBLIC.IDCSTATE.status=INCOMING.status,PUBLIC.IDCSTATE.statuslastmodified=INCOMING.statuslastmodified,PUBLIC.IDCSTATE.displaykey=INCOMING.displaykey,PUBLIC.IDCSTATE.guid=INCOMING.guid,PUBLIC.IDCSTATE.clsfid=INCOMING.clsfid,PUBLIC.IDCSTATE.createddate=INCOMING.createddate,PUBLIC.IDCSTATE.modifieddate=INCOMING.modifieddate,PUBLIC.IDCSTATE.workflowstatemapid=INCOMING.workflowstatemapid,PUBLIC.IDCSTATE.lastmodifiedbyid=INCOMING.lastmodifiedbyid WHEN NOT MATCHED THEN INSERT (shortname,longname,readingsname,type,version,description,namespaceid,status,statuslastmodified,displaykey,guid,clsfid,createddate,modifieddate,workflowstatemapid,lastmodifiedbyid,id) VALUES (INCOMING.shortname,INCOMING.longname,INCOMING.readingsname,INCOMING.type,INCOMING.version,INCOMING.description,INCOMING.namespaceid,INCOMING.status,INCOMING.statuslastmodified,INCOMING.displaykey,INCOMING.guid,INCOMING.clsfid,INCOMING.createddate,INCOMING.modifieddate,INCOMING.workflowstatemapid,INCOMING.lastmodifiedbyid,INCOMING.id)]
at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:64)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:56)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
at org.hibernate.sql.exec.internal.StandardJdbcMutationExecutor.execute(StandardJdbcMutationExecutor.java:97)
at org.hibernate.query.sql.internal.NativeNonSelectQueryPlanImpl.executeUpdate(NativeNonSelectQueryPlanImpl.java:78)
at org.hibernate.query.sql.internal.NativeQueryImpl.doExecuteUpdate(NativeQueryImpl.java:820)
at org.hibernate.query.spi.AbstractQuery.executeUpdate(AbstractQuery.java:643)
... 15 more
Caused by: java.sql.SQLSyntaxErrorException: ORA-00903: invalid table name

at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:509)
at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:461)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1104)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:553)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:269)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:655)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:270)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:91)
at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:970)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1205)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3666)
at oracle.jdbc.driver.T4CPreparedStatement.executeInternal(T4CPreparedStatement.java:1426)
at oracle.jdbc.driver.OraclePreparedStatement.executeLargeUpdate(OraclePreparedStatement.java:3756)
at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3736)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1063)
at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeUpdate(NewProxyPreparedStatement.java:1502)
at org.hibernate.sql.exec.internal.StandardJdbcMutationExecutor.execute(StandardJdbcMutationExecutor.java:84)
... 18 more
Caused by: Error : 903, Position : 11, Sql = MERGE INTO PUBLIC.IDCSTATE USING (SELECT :1 id, :2 shortname, :3 longname, :4 readingsname, :5 type, :6 version, :7 description, :8 namespaceid, :9 status, :10 statuslastmodified, :11 displaykey, :12 guid, :13 clsfid, :14 createddate, :15 modifieddate, :16 workflowstatemapid, :17 lastmodifiedbyid FROM dual) INCOMING ON (PUBLIC.IDCSTATE.id=INCOMING.id) WHEN MATCHED THEN UPDATE SET PUBLIC.IDCSTATE.shortname=INCOMING.shortname,PUBLIC.IDCSTATE.longname=INCOMING.longname,PUBLIC.IDCSTATE.readingsname=INCOMING.readingsname,PUBLIC.IDCSTATE.type=INCOMING.type,PUBLIC.IDCSTATE.version=INCOMING.version,PUBLIC.IDCSTATE.description=INCOMING.description,PUBLIC.IDCSTATE.namespaceid=INCOMING.namespaceid,PUBLIC.IDCSTATE.status=INCOMING.status,PUBLIC.IDCSTATE.statuslastmodified=INCOMING.statuslastmodified,PUBLIC.IDCSTATE.displaykey=INCOMING.displaykey,PUBLIC.IDCSTATE.guid=INCOMING.guid,PUBLIC.IDCSTATE.clsfid=INCOMING.clsfid,PUBLIC.IDCSTATE.createddate=INCOMING.createddate,PUBLIC.IDCSTATE.modifieddate=INCOMING.modifieddate,PUBLIC.IDCSTATE.workflowstatemapid=INCOMING.workflowstatemapid,PUBLIC.IDCSTATE.lastmodifiedbyid=INCOMING.lastmodifiedbyid WHEN NOT MATCHED THEN INSERT (shortname,longname,readingsname,type,version,description,namespaceid,status,statuslastmodified,displaykey,guid,clsfid,createddate,modifieddate,workflowstatemapid,lastmodifiedbyid,id) VALUES (INCOMING.shortname,INCOMING.longname,INCOMING.readingsname,INCOMING.type,INCOMING.version,INCOMING.description,INCOMING.namespaceid,INCOMING.status,INCOMING.statuslastmodified,INCOMING.displaykey,INCOMING.guid,INCOMING.clsfid,INCOMING.createddate,INCOMING.modifieddate,INCOMING.workflowstatemapid,INCOMING.lastmodifiedbyid,INCOMING.id), OriginalSql = MERGE INTO PUBLIC.IDCSTATE USING (SELECT ? id, ? shortname, ? longname, ? readingsname, ? type, ? version, ? description, ? namespaceid, ? status, ? statuslastmodified, ? displaykey, ? guid, ? clsfid, ? createddate, ? modifieddate, ? 
workflowstatemapid, ? lastmodifiedbyid FROM dual) INCOMING ON (PUBLIC.IDCSTATE.id=INCOMING.id) WHEN MATCHED THEN UPDATE SET PUBLIC.IDCSTATE.shortname=INCOMING.shortname,PUBLIC.IDCSTATE.longname=INCOMING.longname,PUBLIC.IDCSTATE.readingsname=INCOMING.readingsname,PUBLIC.IDCSTATE.type=INCOMING.type,PUBLIC.IDCSTATE.version=INCOMING.version,PUBLIC.IDCSTATE.description=INCOMING.description,PUBLIC.IDCSTATE.namespaceid=INCOMING.namespaceid,PUBLIC.IDCSTATE.status=INCOMING.status,PUBLIC.IDCSTATE.statuslastmodified=INCOMING.statuslastmodified,PUBLIC.IDCSTATE.displaykey=INCOMING.displaykey,PUBLIC.IDCSTATE.guid=INCOMING.guid,PUBLIC.IDCSTATE.clsfid=INCOMING.clsfid,PUBLIC.IDCSTATE.createddate=INCOMING.createddate,PUBLIC.IDCSTATE.modifieddate=INCOMING.modifieddate,PUBLIC.IDCSTATE.workflowstatemapid=INCOMING.workflowstatemapid,PUBLIC.IDCSTATE.lastmodifiedbyid=INCOMING.lastmodifiedbyid WHEN NOT MATCHED THEN INSERT (shortname,longname,readingsname,type,version,description,namespaceid,status,statuslastmodified,displaykey,guid,clsfid,createddate,modifieddate,workflowstatemapid,lastmodifiedbyid,id) VALUES (INCOMING.shortname,INCOMING.longname,INCOMING.readingsname,INCOMING.type,INCOMING.version,INCOMING.description,INCOMING.namespaceid,INCOMING.status,INCOMING.statuslastmodified,INCOMING.displaykey,INCOMING.guid,INCOMING.clsfid,INCOMING.createddate,INCOMING.modifieddate,INCOMING.workflowstatemapid,INCOMING.lastmodifiedbyid,INCOMING.id), Error Msg = ORA-00903: invalid table name

at oracle.jdbc.driver.T4CTTIoer11.processError(T4CTTIoer11.java:513)
... 34 more
2024-02-27 03:53:38,814 INFO || Closing session. [io.debezium.connector.jdbc.JdbcChangeEventSink]
2024-02-27 03:53:38,816 INFO || Closing the session factory [io.debezium.connector.jdbc.JdbcChangeEventSink]
2024-02-27 03:53:38,828 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Revoke previously assigned partitions idcstate-0 [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:53:38,829 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Member connector-consumer-sink-connector-0-2c411905-a6c7-4ace-863b-efcefabda43d sending LeaveGroup request to coordinator kafka:29092 (id: 2147483646 rack: null) due to the consumer is being closed [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:53:38,831 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Resetting generation due to: consumer pro-actively leaving the group [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:53:38,832 INFO || [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Request joining group due to: consumer pro-actively leaving the group [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-27 03:53:38,839 INFO || Metrics scheduler closed [org.apache.kafka.common.metrics.Metrics]
2024-02-27 03:53:38,839 INFO || Closing reporter org.apache.kafka.common.metrics.JmxReporter [org.apache.kafka.common.metrics.Metrics]
2024-02-27 03:53:38,840 INFO || Metrics reporters closed [org.apache.kafka.common.metrics.Metrics]
2024-02-27 03:53:38,844 INFO || App info kafka.consumer for connector-consumer-sink-connector-0 unregistered [org.apache.kafka.common.utils.AppInfoParser]
2024-02-27 03:53:43,482 INFO || WorkerSourceTask{id=source-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
2024-02-27 03:54:43,483 INFO || WorkerSourceTask{id=source-connector-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]

Regards
 Hrishi

Chris Cranford

unread,
Feb 27, 2024, 7:31:35 AM2/27/24
to debe...@googlegroups.com
Hi Hrishi -

So "public" is the default schema in PostgreSQL. Is it possible some records in your topic were captured while the table was still in "public", before it moved to "prodfxiapp1"? On the sink side, since we're talking about Oracle, unless the DW_STAGE user's default schema is PUBLIC, I would have expected the prefix to read "DW_STAGE." rather than "PUBLIC.".
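If it helps narrow things down, here is a hypothetical diagnostic (standard Oracle data dictionary views, run as any user that can see the objects) to check which owner an unqualified IDCSTATE resolves to on the sink side, and whether a PUBLIC synonym exists that could explain the qualifier in the generated MERGE:

```sql
-- Hypothetical diagnostic: which schemas own an object named IDCSTATE,
-- and is there a synonym (possibly owned by PUBLIC) pointing at it?
SELECT owner, object_type FROM all_objects WHERE object_name = 'IDCSTATE';
SELECT owner, table_owner, table_name FROM all_synonyms WHERE synonym_name = 'IDCSTATE';
```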

Can you please enable TRACE logging, attempt to process a message again, and provide the logs? I'd like to take a look and verify whether we have a bug.

Thanks,
Chris
--
You received this message because you are subscribed to the Google Groups "debezium" group.
To unsubscribe from this group and stop receiving emails from it, send an email to debezium+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/debezium/fa69b7bb-8355-4fc5-9fa6-530e351ebfafn%40googlegroups.com.

Hrishikesh Dutta Gupta

unread,
Feb 27, 2024, 11:16:48 PM2/27/24
to debezium
Hi Chris,

  Thanks for the reply. No, the tables were not moved. One thing I'd like to bring to your attention is that the target Oracle server has multiple schemas, which may be interconnected; but since I have already specified the target schema in the sink config, I don't think that should cause an issue. I tried to enable TRACE logging by calling the REST APIs below, but I don't see any additional detail in the connector logs:

curl -s -X PUT http://localhost:8083/admin/loggers/io.debezium -H "Content-Type:application/json" -d '{"level": "TRACE"}'

curl -s -X PUT http://localhost:8083/admin/loggers/org.hibernate -H "Content-Type:application/json" -d '{"level": "TRACE"}'

curl -s -X PUT http://localhost:8083/admin/loggers/oracle.jdbc -H "Content-Type:application/json" -d '{"level": "TRACE"}'
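As a sanity check (assuming the same worker at localhost:8083 as in the PUT calls above), Kafka Connect also exposes GET /admin/loggers/<name> to read a logger's runtime level back, which shows whether the overrides reached this worker. A minimal sketch that just builds the three read-back commands:

```shell
# Build the read-back commands for the three loggers overridden above.
# (Assumes the worker's REST/admin listener is at localhost:8083; an empty
#  or 404 response from these GETs would suggest the PUTs hit a different
#  worker/container than the one writing these logs.)
base_url="http://localhost:8083/admin/loggers"
for logger in io.debezium org.hibernate oracle.jdbc; do
  echo "curl -s ${base_url}/${logger}"
done
```

Note that, as far as I know, these runtime overrides apply only to the worker you PUT to and are lost when the container restarts, so they need to be re-applied after a restart.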


Connector logs:


2024-02-28 04:02:41,687 INFO   ||  AbstractConfig values:
   [org.apache.kafka.common.config.AbstractConfig]
2024-02-28 04:02:41,694 INFO   ||  [Worker clientId=connect-1, groupId=1] Connector sink-connector config updated   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,697 INFO   ||  [Worker clientId=connect-1, groupId=1] Rebalance started   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-28 04:02:41,697 INFO   ||  [Worker clientId=connect-1, groupId=1] (Re-)joining group   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-28 04:02:41,700 INFO   ||  172.21.0.1 - - [28/Feb/2024:04:02:41 +0000] "POST /connectors/ HTTP/1.1" 201 466 "-" "curl/7.29.0" 23   [org.apache.kafka.connect.runtime.rest.RestServer]
2024-02-28 04:02:41,700 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=14, memberId='connect-1-e277e4d4-eb85-442b-9511-8d8266676b0c', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-28 04:02:41,704 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=14, memberId='connect-1-e277e4d4-eb85-442b-9511-8d8266676b0c', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-28 04:02:41,705 INFO   ||  [Worker clientId=connect-1, groupId=1] Joined group at generation 14 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-e277e4d4-eb85-442b-9511-8d8266676b0c', leaderUrl='http://localhost:8083/', offset=15, connectorIds=[sink-connector, source-connector], taskIds=[source-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,705 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 15   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,705 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting connector sink-connector   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,705 INFO   ||  Creating connector sink-connector of type io.debezium.connector.jdbc.JdbcSinkConnector   [org.apache.kafka.connect.runtime.Worker]
2024-02-28 04:02:41,706 INFO   ||  SinkConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = sink-connector
predicates = []
tasks.max = 1
topics = [idcstate]
topics.regex =
transforms = []
value.converter = null
   [org.apache.kafka.connect.runtime.SinkConnectorConfig]
2024-02-28 04:02:41,706 INFO   ||  EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = sink-connector
predicates = []
tasks.max = 1
topics = [idcstate]
topics.regex =
transforms = []
value.converter = null
   [org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2024-02-28 04:02:41,707 INFO   ||  Instantiated connector sink-connector with version 2.6.0.Alpha2 of type class io.debezium.connector.jdbc.JdbcSinkConnector   [org.apache.kafka.connect.runtime.Worker]
2024-02-28 04:02:41,707 INFO   ||  Finished creating connector sink-connector   [org.apache.kafka.connect.runtime.Worker]
2024-02-28 04:02:41,707 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,709 INFO   ||  SinkConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = sink-connector
predicates = []
tasks.max = 1
topics = [idcstate]
topics.regex =
transforms = []
value.converter = null
   [org.apache.kafka.connect.runtime.SinkConnectorConfig]
2024-02-28 04:02:41,709 INFO   ||  EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = sink-connector
predicates = []
tasks.max = 1
topics = [idcstate]
topics.regex =
transforms = []
value.converter = null
   [org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2024-02-28 04:02:41,721 INFO   ||  [Worker clientId=connect-1, groupId=1] Tasks [sink-connector-0] configs updated   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,724 INFO   ||  [Worker clientId=connect-1, groupId=1] Rebalance started   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-28 04:02:41,724 INFO   ||  [Worker clientId=connect-1, groupId=1] (Re-)joining group   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-28 04:02:41,726 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully joined group with generation Generation{generationId=15, memberId='connect-1-e277e4d4-eb85-442b-9511-8d8266676b0c', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-28 04:02:41,730 INFO   ||  [Worker clientId=connect-1, groupId=1] Successfully synced group in generation Generation{generationId=15, memberId='connect-1-e277e4d4-eb85-442b-9511-8d8266676b0c', protocol='sessioned'}   [org.apache.kafka.connect.runtime.distributed.WorkerCoordinator]
2024-02-28 04:02:41,730 INFO   ||  [Worker clientId=connect-1, groupId=1] Joined group at generation 15 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-e277e4d4-eb85-442b-9511-8d8266676b0c', leaderUrl='http://localhost:8083/', offset=17, connectorIds=[sink-connector, source-connector], taskIds=[sink-connector-0, source-connector-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,730 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting connectors and tasks using config offset 17   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,731 INFO   ||  [Worker clientId=connect-1, groupId=1] Starting task sink-connector-0   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,732 INFO   ||  Creating task sink-connector-0   [org.apache.kafka.connect.runtime.Worker]
2024-02-28 04:02:41,732 INFO   ||  ConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = sink-connector
predicates = []
tasks.max = 1
transforms = []
value.converter = null
   [org.apache.kafka.connect.runtime.ConnectorConfig]
2024-02-28 04:02:41,733 INFO   ||  EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = sink-connector
predicates = []
tasks.max = 1
transforms = []
value.converter = null
   [org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2024-02-28 04:02:41,733 INFO   ||  TaskConfig values:
task.class = class io.debezium.connector.jdbc.JdbcSinkConnectorTask
   [org.apache.kafka.connect.runtime.TaskConfig]
2024-02-28 04:02:41,733 INFO   ||  Instantiated task sink-connector-0 with version 2.6.0.Alpha2 of type io.debezium.connector.jdbc.JdbcSinkConnectorTask   [org.apache.kafka.connect.runtime.Worker]
2024-02-28 04:02:41,734 INFO   ||  JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = true
   [org.apache.kafka.connect.json.JsonConverterConfig]
2024-02-28 04:02:41,734 INFO   ||  Set up the key converter class org.apache.kafka.connect.json.JsonConverter for task sink-connector-0 using the worker config   [org.apache.kafka.connect.runtime.Worker]
2024-02-28 04:02:41,734 INFO   ||  JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = true
   [org.apache.kafka.connect.json.JsonConverterConfig]
2024-02-28 04:02:41,734 INFO   ||  Set up the value converter class org.apache.kafka.connect.json.JsonConverter for task sink-connector-0 using the worker config   [org.apache.kafka.connect.runtime.Worker]
2024-02-28 04:02:41,734 INFO   ||  Set up the header converter class org.apache.kafka.connect.storage.SimpleHeaderConverter for task sink-connector-0 using the worker config   [org.apache.kafka.connect.runtime.Worker]
2024-02-28 04:02:41,735 INFO   ||  Initializing: org.apache.kafka.connect.runtime.TransformationChain{}   [org.apache.kafka.connect.runtime.Worker]
2024-02-28 04:02:41,735 INFO   ||  SinkConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = sink-connector
predicates = []
tasks.max = 1
topics = [idcstate]
topics.regex =
transforms = []
value.converter = null
   [org.apache.kafka.connect.runtime.SinkConnectorConfig]
2024-02-28 04:02:41,736 INFO   ||  EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.debezium.connector.jdbc.JdbcSinkConnector
errors.deadletterqueue.context.headers.enable = false
errors.deadletterqueue.topic.name =
errors.deadletterqueue.topic.replication.factor = 3
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = sink-connector
predicates = []
tasks.max = 1
topics = [idcstate]
topics.regex =
transforms = []
value.converter = null
   [org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig]
2024-02-28 04:02:41,736 INFO   ||  ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = earliest
bootstrap.servers = [kafka:29092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = connector-consumer-sink-connector-0
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = connect-sink-connector
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
   [org.apache.kafka.clients.consumer.ConsumerConfig]
2024-02-28 04:02:41,744 INFO   ||  These configurations '[metrics.context.connect.kafka.cluster.id, metrics.context.connect.group.id]' were supplied but are not used yet.   [org.apache.kafka.clients.consumer.ConsumerConfig]
2024-02-28 04:02:41,744 INFO   ||  Kafka version: 3.6.1   [org.apache.kafka.common.utils.AppInfoParser]
2024-02-28 04:02:41,744 INFO   ||  Kafka commitId: 5e3c2b738d253ff5   [org.apache.kafka.common.utils.AppInfoParser]
2024-02-28 04:02:41,744 INFO   ||  Kafka startTimeMs: 1709092961744   [org.apache.kafka.common.utils.AppInfoParser]
2024-02-28 04:02:41,747 INFO   ||  [Worker clientId=connect-1, groupId=1] Finished starting connectors and tasks   [org.apache.kafka.connect.runtime.distributed.DistributedHerder]
2024-02-28 04:02:41,748 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Subscribed to topic(s): idcstate   [org.apache.kafka.clients.consumer.KafkaConsumer]
2024-02-28 04:02:41,750 INFO   ||  Starting JdbcSinkConnectorConfig with configuration:   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     connector.class = io.debezium.connector.jdbc.JdbcSinkConnector   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     primary.key.mode = record_key   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     connection.password = ********   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     topics = idcstate   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     tasks.max = 1   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     connection.username = DW_STAGE1   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     quote.identifiers = false   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     schema.evolution = basic   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     task.class = io.debezium.connector.jdbc.JdbcSinkConnectorTask   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     name = sink-connector   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     primary.key.fields = id   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     connection.url = jdbc:oracle:thin:@*****:1521/TEST   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,750 INFO   ||     insert.mode = upsert   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:02:41,776 INFO   ||  HHH000130: Instantiating explicit connection provider: org.hibernate.c3p0.internal.C3P0ConnectionProvider   [org.hibernate.engine.jdbc.connections.internal.ConnectionProviderInitiator]
2024-02-28 04:02:41,776 INFO   ||  HHH010002: C3P0 using driver: null at URL: jdbc:oracle:thin:@*****:1521/TEST   [org.hibernate.orm.connections.pooling.c3p0]
2024-02-28 04:02:41,776 INFO   ||  HHH10001001: Connection properties: {password=****, user=DW_STAGE1}   [org.hibernate.orm.connections.pooling.c3p0]
2024-02-28 04:02:41,776 INFO   ||  HHH10001003: Autocommit mode: false   [org.hibernate.orm.connections.pooling.c3p0]
2024-02-28 04:02:41,776 WARN   ||  HHH10001006: No JDBC Driver class was specified by property hibernate.connection.driver_class   [org.hibernate.orm.connections.pooling.c3p0]
2024-02-28 04:02:41,818 INFO   ||  HHH10001007: JDBC isolation level: <unknown>   [org.hibernate.orm.connections.pooling.c3p0]
2024-02-28 04:02:41,827 INFO   ||  Initializing c3p0 pool... com.mchange.v2.c3p0.PoolBackedDataSource@94dd0914 [ connectionPoolDataSource -> com.mchange.v2.c3p0.WrapperConnectionPoolDataSource@63de8183 [ acquireIncrement -> 32, acquireRetryAttempts -> 30, acquireRetryDelay -> 1000, autoCommitOnClose -> false, automaticTestTable -> null, breakAfterAcquireFailure -> false, checkoutTimeout -> 0, connectionCustomizerClassName -> null, connectionTesterClassName -> com.mchange.v2.c3p0.impl.DefaultConnectionTester, contextClassLoaderSource -> caller, debugUnreturnedConnectionStackTraces -> false, factoryClassLocation -> null, forceIgnoreUnresolvedTransactions -> false, forceSynchronousCheckins -> false, identityToken -> 1bqvnr9b11tzu3v41wdm3wu|3571e7d3, idleConnectionTestPeriod -> 0, initialPoolSize -> 5, maxAdministrativeTaskTime -> 0, maxConnectionAge -> 0, maxIdleTime -> 0, maxIdleTimeExcessConnections -> 0, maxPoolSize -> 32, maxStatements -> 0, maxStatementsPerConnection -> 0, minPoolSize -> 5, nestedDataSource -> com.mchange.v2.c3p0.DriverManagerDataSource@dbf8c097 [ description -> null, driverClass -> null, factoryClassLocation -> null, forceUseNamedDriverClass -> false, identityToken -> 1bqvnr9b11tzu3v41wdm3wu|65f2f1b5, jdbcUrl -> jdbc:oracle:thin:@*****:1521/TEST, properties -> {password=******, user=******} ], preferredTestQuery -> null, privilegeSpawnedThreads -> false, propertyCycle -> 0, statementCacheNumDeferredCloseThreads -> 0, testConnectionOnCheckin -> false, testConnectionOnCheckout -> false, unreturnedConnectionTimeout -> 0, usesTraditionalReflectiveProxies -> false; userOverrides: {} ], dataSourceName -> null, extensions -> {}, factoryClassLocation -> null, identityToken -> 1bqvnr9b11tzu3v41wdm3wu|2686fe8c, numHelperThreads -> 3 ]   [com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource]
2024-02-28 04:02:41,876 INFO   ||  HHH000400: Using dialect: org.hibernate.dialect.OracleDialect   [SQL dialect]
2024-02-28 04:02:41,936 INFO   ||  HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]   [org.hibernate.engine.transaction.jta.platform.internal.JtaPlatformInitiator]
2024-02-28 04:02:41,938 INFO   ||  Using dialect io.debezium.connector.jdbc.dialect.oracle.OracleDatabaseDialect   [io.debezium.connector.jdbc.dialect.DatabaseDialectResolver]
2024-02-28 04:02:41,951 INFO   ||  Database TimeZone: +00:00 (database), GMT (session)   [io.debezium.connector.jdbc.dialect.GeneralDatabaseDialect]
2024-02-28 04:02:41,951 INFO   ||  Database version 11.2.0   [io.debezium.connector.jdbc.JdbcChangeEventSink]
2024-02-28 04:02:41,951 INFO   ||  WorkerSinkTask{id=sink-connector-0} Sink task finished initialization and start   [org.apache.kafka.connect.runtime.WorkerSinkTask]
2024-02-28 04:02:41,951 INFO   ||  WorkerSinkTask{id=sink-connector-0} Executing sink task   [org.apache.kafka.connect.runtime.WorkerSinkTask]
2024-02-28 04:02:41,957 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Cluster ID: 1MB_ZvQxTg69nqqPmkEqKQ   [org.apache.kafka.clients.Metadata]
2024-02-28 04:02:41,957 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Discovered group coordinator kafka:29092 (id: 2147483646 rack: null)   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:41,958 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] (Re-)joining group   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:41,962 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Request joining group due to: need to re-join with the given member-id: connector-consumer-sink-connector-0-83ef4160-9b0f-47a2-aa07-1620d3773f39   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:41,962 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:41,962 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] (Re-)joining group   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:44,967 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Successfully joined group with generation Generation{generationId=1, memberId='connector-consumer-sink-connector-0-83ef4160-9b0f-47a2-aa07-1620d3773f39', protocol='range'}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:44,968 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Finished assignment for group at generation 1: {connector-consumer-sink-connector-0-83ef4160-9b0f-47a2-aa07-1620d3773f39=Assignment(partitions=[idcstate-0])}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:44,970 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Successfully synced group in generation Generation{generationId=1, memberId='connector-consumer-sink-connector-0-83ef4160-9b0f-47a2-aa07-1620d3773f39', protocol='range'}   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:44,971 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Notifying assignor about the new Assignment(partitions=[idcstate-0])   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:44,971 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Adding newly assigned partitions: idcstate-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:44,972 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Found no committed offset for partition idcstate-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:02:44,976 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Resetting offset for partition idcstate-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:29092 (id: 1 rack: null)], epoch=0}}.   [org.apache.kafka.clients.consumer.internals.SubscriptionState]
2024-02-28 04:02:45,448 WARN   ||  SQL Error: 903, SQLState: 42000   [org.hibernate.engine.jdbc.spi.SqlExceptionHelper]
2024-02-28 04:02:45,448 ERROR  ||  ORA-00903: invalid table name
   [org.hibernate.engine.jdbc.spi.SqlExceptionHelper]
2024-02-28 04:02:45,450 ERROR  ||  Failed to process record: Failed to process a sink record   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]

org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
at java.base/java.util.HashMap.forEach(HashMap.java:1337)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.hibernate.exception.SQLGrammarException: error executing work
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:89)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:56)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:309)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:962)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:949)
at io.debezium.connector.jdbc.RecordWriter.write(RecordWriter.java:51)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:203)
... 17 more
Caused by: java.sql.BatchUpdateException: ORA-00903: invalid table name
at oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10891)
at oracle.jdbc.driver.OraclePreparedStatement.executeBatchFromQueue(OraclePreparedStatement.java:10469)
at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:10018)
at oracle.jdbc.driver.OracleStatement.executeBatch(OracleStatement.java:5420)
at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:289)
at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeBatch(NewProxyPreparedStatement.java:2544)
at io.debezium.connector.jdbc.RecordWriter.lambda$processBatch$0(RecordWriter.java:91)
at org.hibernate.jdbc.WorkExecutor.executeWork(WorkExecutor.java:37)
at org.hibernate.internal.AbstractSharedSessionContract.lambda$doWork$3(AbstractSharedSessionContract.java:946)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:304)
... 21 more
2024-02-28 04:03:28,107 INFO   ||  [Producer clientId=connector-producer-source-connector-0] Node -1 disconnected.   [org.apache.kafka.clients.NetworkClient]
2024-02-28 04:03:41,746 ERROR  ||  JDBC sink connector failure   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]

org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
at java.base/java.util.HashMap.forEach(HashMap.java:1337)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.hibernate.exception.SQLGrammarException: error executing work
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:89)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:56)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:309)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:962)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:949)
at io.debezium.connector.jdbc.RecordWriter.write(RecordWriter.java:51)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:203)
... 17 more
Caused by: java.sql.BatchUpdateException: ORA-00903: invalid table name
at oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10891)
at oracle.jdbc.driver.OraclePreparedStatement.executeBatchFromQueue(OraclePreparedStatement.java:10469)
at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:10018)
at oracle.jdbc.driver.OracleStatement.executeBatch(OracleStatement.java:5420)
at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:289)
at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeBatch(NewProxyPreparedStatement.java:2544)
at io.debezium.connector.jdbc.RecordWriter.lambda$processBatch$0(RecordWriter.java:91)
at org.hibernate.jdbc.WorkExecutor.executeWork(WorkExecutor.java:37)
at org.hibernate.internal.AbstractSharedSessionContract.lambda$doWork$3(AbstractSharedSessionContract.java:946)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:304)
... 21 more
2024-02-28 04:03:41,747 ERROR  ||  WorkerSinkTask{id=sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: JDBC sink connector failure   [org.apache.kafka.connect.runtime.WorkerSinkTask]

org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:96)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
at java.base/java.util.HashMap.forEach(HashMap.java:1337)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
... 12 more
Caused by: org.hibernate.exception.SQLGrammarException: error executing work
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:89)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:56)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:309)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:962)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:949)
at io.debezium.connector.jdbc.RecordWriter.write(RecordWriter.java:51)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:203)
... 17 more
Caused by: java.sql.BatchUpdateException: ORA-00903: invalid table name
at oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10891)
at oracle.jdbc.driver.OraclePreparedStatement.executeBatchFromQueue(OraclePreparedStatement.java:10469)
at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:10018)
at oracle.jdbc.driver.OracleStatement.executeBatch(OracleStatement.java:5420)
at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:289)
at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeBatch(NewProxyPreparedStatement.java:2544)
at io.debezium.connector.jdbc.RecordWriter.lambda$processBatch$0(RecordWriter.java:91)
at org.hibernate.jdbc.WorkExecutor.executeWork(WorkExecutor.java:37)
at org.hibernate.internal.AbstractSharedSessionContract.lambda$doWork$3(AbstractSharedSessionContract.java:946)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:304)
... 21 more
2024-02-28 04:03:41,748 ERROR  ||  WorkerSinkTask{id=sink-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: JDBC sink connector failure
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:96)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
... 11 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to process a sink record
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:210)
at io.debezium.connector.jdbc.JdbcChangeEventSink.lambda$flushBuffers$2(JdbcChangeEventSink.java:188)
at java.base/java.util.HashMap.forEach(HashMap.java:1337)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffers(JdbcChangeEventSink.java:188)
at io.debezium.connector.jdbc.JdbcChangeEventSink.execute(JdbcChangeEventSink.java:149)
at io.debezium.connector.jdbc.JdbcSinkConnectorTask.put(JdbcSinkConnectorTask.java:103)
... 12 more
Caused by: org.hibernate.exception.SQLGrammarException: error executing work
at org.hibernate.exception.internal.SQLStateConversionDelegate.convert(SQLStateConversionDelegate.java:89)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:56)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:109)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:95)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:309)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:962)
at org.hibernate.internal.AbstractSharedSessionContract.doWork(AbstractSharedSessionContract.java:949)
at io.debezium.connector.jdbc.RecordWriter.write(RecordWriter.java:51)
at io.debezium.connector.jdbc.JdbcChangeEventSink.flushBuffer(JdbcChangeEventSink.java:203)
... 17 more
Caused by: java.sql.BatchUpdateException: ORA-00903: invalid table name
at oracle.jdbc.driver.OraclePreparedStatement.generateBatchUpdateException(OraclePreparedStatement.java:10891)
at oracle.jdbc.driver.OraclePreparedStatement.executeBatchFromQueue(OraclePreparedStatement.java:10469)
at oracle.jdbc.driver.OraclePreparedStatement.executeLargeBatch(OraclePreparedStatement.java:10018)
at oracle.jdbc.driver.OracleStatement.executeBatch(OracleStatement.java:5420)
at oracle.jdbc.driver.OracleStatementWrapper.executeBatch(OracleStatementWrapper.java:289)
at com.mchange.v2.c3p0.impl.NewProxyPreparedStatement.executeBatch(NewProxyPreparedStatement.java:2544)
at io.debezium.connector.jdbc.RecordWriter.lambda$processBatch$0(RecordWriter.java:91)
at org.hibernate.jdbc.WorkExecutor.executeWork(WorkExecutor.java:37)
at org.hibernate.internal.AbstractSharedSessionContract.lambda$doWork$3(AbstractSharedSessionContract.java:946)
at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.coordinateWork(JdbcCoordinatorImpl.java:304)
... 21 more
2024-02-28 04:03:41,749 INFO   ||  Closing session.   [io.debezium.connector.jdbc.JdbcChangeEventSink]
2024-02-28 04:03:41,750 INFO   ||  Closing the session factory   [io.debezium.connector.jdbc.JdbcSinkConnectorTask]
2024-02-28 04:03:41,758 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Revoke previously assigned partitions idcstate-0   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:03:41,758 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Member connector-consumer-sink-connector-0-83ef4160-9b0f-47a2-aa07-1620d3773f39 sending LeaveGroup request to coordinator kafka:29092 (id: 2147483646 rack: null) due to the consumer is being closed   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:03:41,758 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Resetting generation and member id due to: consumer pro-actively leaving the group   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:03:41,758 INFO   ||  [Consumer clientId=connector-consumer-sink-connector-0, groupId=connect-sink-connector] Request joining group due to: consumer pro-actively leaving the group   [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator]
2024-02-28 04:03:42,145 INFO   ||  Metrics scheduler closed   [org.apache.kafka.common.metrics.Metrics]
2024-02-28 04:03:42,145 INFO   ||  Closing reporter org.apache.kafka.common.metrics.JmxReporter   [org.apache.kafka.common.metrics.Metrics]
2024-02-28 04:03:42,145 INFO   ||  Metrics reporters closed   [org.apache.kafka.common.metrics.Metrics]
2024-02-28 04:03:42,149 INFO   ||  App info kafka.consumer for connector-consumer-sink-connector-0 unregistered   [org.apache.kafka.common.utils.AppInfoParser]


Regards
 Hrishi

Hrishikesh Dutta Gupta

unread,
Feb 28, 2024, 6:17:40 AM2/28/24
to debezium
Hi,

  We had another in-depth look at this. The Debezium sink connector throws 'invalid table name' against the DW_STAGE schema of the 'TEST' Oracle instance. In the same instance we also have Oracle CDC running against a few tables of another schema (prodfxiapp1), where prodfxiapp1 is the publisher and DW_STAGE is the subscriber.
That CDC installation is part of the existing workflow. Our source used to be Oracle, and we now want to move it to Postgres without changing the Oracle target at this point, hence we are using the Debezium source and sink connectors along with Kafka to bring data from Postgres to Oracle and keep the rest of the workflow as it is.
We no longer see this issue with the Debezium sink connector once we pointed it at a new Oracle instance on the same DB server, but we need the Oracle CDC to remain part of the workflow. The sink connector should ideally just push the Kafka topic data into the target Oracle tables, right? That's why I think the sink connector logs will be the most useful place to see why it is actually failing with 'invalid table name'. Can you please share some pointers? Let me know if you need any more information.
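For reference, the Debezium JDBC sink derives the destination table name from the topic by default, and exposes options to pin it explicitly. A minimal sketch of the relevant sink settings (the connection values and table name here are placeholders, and option names should be verified against the connector version you deployed):

```json
{
  "name": "sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:oracle:thin:@host:1521/service",
    "connection.username": "DW_STAGE",
    "connection.password": "***",
    "table.name.format": "IDCSTATE",
    "quote.identifiers": "false"
  }
}
```

Setting `table.name.format` to a fixed name (rather than the default `${topic}`) removes any guesswork about what table the generated MERGE statement targets.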

Regards
 Hrishi

Hrishikesh Dutta Gupta

unread,
Mar 6, 2024, 12:29:00 AM3/6/24
to debezium
Hi @Chris Cranford (Naros), we have figured out what was causing the problem. In the target Oracle schema, a public synonym had been forcefully created so that other DB users on the same server could access these tables (a project-specific requirement). We tweaked the logic to use private synonyms instead, and the Debezium sink connector is working fine now. Thanks for your help.
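For anyone hitting the same thing, the change was essentially the following (the schema, table, and grantee names here are illustrative, not our actual objects):

```sql
-- Drop the public synonym that was shadowing the table name
-- and confusing name resolution for the sink's MERGE statement
DROP PUBLIC SYNONYM IDCSTATE;

-- Recreate it as a private synonym inside each consuming schema instead
CREATE SYNONYM OTHER_USER.IDCSTATE FOR DW_STAGE.IDCSTATE;
```

A private synonym is visible only to the schema that owns it, so it cannot collide with how the connector resolves the unqualified table name.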

Regards
 Hrishi