Debezium Oracle Connector and Signal table


Johan Vandevenne

Jun 22, 2022, 2:37:50 AM
to debezium
Hello,

I'm currently evaluating Debezium with the Oracle connector, version 1.9.3. I'm trying to trigger snapshots using the signaling table, but I can't get it to work.
I've created the table using the script:

CREATE TABLE <tableName> (id VARCHAR(<varcharValue>) PRIMARY KEY, type VARCHAR(<varcharValue>) NOT NULL, data VARCHAR(<varcharValue>) NULL);

I've enabled supplemental logging:
ALTER TABLE <tablename> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

I've added the table to both the "table.include.list" and the "signal.data.collection" properties.

In the connector's debug log I can see that it detects changes in the table when I insert a record; however, the supplied actions (log or execute-snapshot) are not executed.

What could be the cause of this?

kind regards
Johan

Nathan Smit

Jun 22, 2022, 3:04:57 AM
to debezium
Hey there,

Could you share your config? If your signal table is picking up inserts and you're not getting errors, make sure the signal.data.collection property is formatted like this: <databaseName>.<schemaName>.<tableName>

This is what my config looks like, for example:
table.include.list=DEBEZIUM.DEBEZIUM_SIGNAL
signal.data.collection=GLODCP.DEBEZIUM.DEBEZIUM_SIGNAL

If that doesn't work, it'd also help to see the insert you're running.

Johan Vandevenne

Jun 22, 2022, 3:21:50 AM
to debezium
Hello,

Here's my config:

{
  "name" : "Debezium-johan",
  "config" : {
    "connector.class" : "io.debezium.connector.oracle.OracleConnector",
    "database.dbname" : "cdbdocd",
    "database.history.consumer.bootstrap.servers" : "xxx:9092",
    "database.history.consumer.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"xxx\";",
    "database.history.consumer.sasl.mechanism" : "PLAIN",
    "database.history.consumer.security.protocol" : "SASL_PLAINTEXT",
    "database.history.consumer.ssl.endpoint.identification.algorithm" : "https",
    "database.history.kafka.bootstrap.servers" : "xxx:9092",
    "database.history.kafka.topic" : "dbz-1-schema-changes.inventory",
    "database.history.producer.bootstrap.servers" : "xxx:9092",
    "database.history.producer.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"xxx\";",
    "database.history.producer.sasl.mechanism" : "PLAIN",
    "database.history.producer.security.protocol" : "SASL_PLAINTEXT",
    "database.history.producer.ssl.endpoint.identification.algorithm" : "https",
    "database.history.sasl.jaas.config" : "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"xxx\";",
    "database.history.sasl.mechanism" : "PLAIN",
    "database.history.security.protocol" : "SASL_PLAINTEXT",
    "database.history.ssl.endpoint.identification.algorithm" : "https",
    "database.hostname" : "xxx",
    "database.password" : "dbz",
    "database.pdb.name" : "documend",
    "database.port" : "1521",
    "database.server.name" : "documend",
    "database.tablename.case.insensitive" : "true",
    "database.url" : "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=xxx)(PORT=1521))(CONNECT_DATA=(SERVER=dedicated)(SERVICE_NAME=xxx)))",
    "database.user" : "c##dbzuser",
    "errors.log.enable" : "true",
    "errors.log.include.messages" : "true",
    "internal.database.oracle.version" : "12+",
    "internal.log.mining.log.query.max.retries" : "10",
    "internal.log.mining.query.logs.for.snapshot.offset" : "false",
    "key.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable" : "true",
    "log.mining.archive.log.only.scn.poll.interval.ms" : "30000",
    "name" : "Debezium-johan",
    "rac.nodes" : "10.30.24.197,10.30.24.198",
    "signal.data.collection" : "cdbdocd.ADMPZ90894.DEBEZIUM_SIGNAL",
    "table.include.list" : "ADMPZ90894.MYORDERS,ADMPZ90894.PRODUCTS,ADMPZ90894.CUSTOMERS,ADMPZ90894.ORDERITEM,ADMPZ90894.DEBEZIUM_SIGNAL",
    "time.precision.mode" : "connect",
    "value.converter" : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable" : "true"
  }
}

This is the insert statement:
INSERT INTO ADMPZ90894.DEBEZIUM_SIGNAL (id, type, data) VALUES('ad-hoc-11', 'log', '{"data-collections": ["ADMPZ90894.MYORDERS"],"type":"incremental"}');
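
For reference, the two signal types mentioned in this thread take different "data" payloads, as far as the Debezium signaling documentation describes them: "log" expects a "message" field, while "execute-snapshot" expects "data-collections" and "type". The statement above pairs type 'log' with an execute-snapshot style payload. Below is a minimal Python sketch that builds one INSERT per signal type. The helper name signal_insert, the id 'log-1', and the message text are made up for illustration, and the name in "data-collections" is copied from the statement above without confirming which qualification the connector expects.

```python
import json


def signal_insert(table: str, signal_id: str, signal_type: str, data: dict) -> str:
    """Build an INSERT statement for a signal row (hypothetical helper)."""
    payload = json.dumps(data)
    # Double any single quotes so the JSON survives as a SQL string literal.
    escaped = payload.replace("'", "''")
    return (
        f"INSERT INTO {table} (id, type, data) "
        f"VALUES ('{signal_id}', '{signal_type}', '{escaped}')"
    )


SIGNAL_TABLE = "ADMPZ90894.DEBEZIUM_SIGNAL"

# A 'log' signal: the payload carries only a message.
log_stmt = signal_insert(SIGNAL_TABLE, "log-1", "log", {"message": "signal received"})

# An 'execute-snapshot' signal: the payload names the tables and the snapshot type.
snap_stmt = signal_insert(
    SIGNAL_TABLE,
    "ad-hoc-11",
    "execute-snapshot",
    {"data-collections": ["ADMPZ90894.MYORDERS"], "type": "incremental"},
)

print(log_stmt)
print(snap_stmt)
```

String formatting is used here only to show the shape of the rows; in a real client, bind parameters would be the safer choice.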

Nathan Smit

Jun 22, 2022, 3:44:47 AM
to debezium
That looks fine to me. Maybe a dumb suggestion: have you tried upper-casing the db name in signal.data.collection? So instead use "signal.data.collection" : "CDBDOCD.ADMPZ90894.DEBEZIUM_SIGNAL",

Otherwise you may need to put the connector into debug mode to get a clearer idea of why the signal isn't being picked up.

Johan Vandevenne

Jun 22, 2022, 4:03:58 AM
to debe...@googlegroups.com
Hello Nathan, 

I tried your suggestion but still no luck. I'm already running the connector in DEBUG mode. I can see the record is picked up by the change processor, but nothing happens:

[2022-06-22 09:59:08,521] DEBUG Fetching results for SCN [101019099, 101019144] (io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor)
[2022-06-22 09:59:08,717] DEBUG Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=documend}, sourceOffset={commit_scn=101019123, transaction_id=null, snapshot_scn=99657009, scn=101019121}} ConnectRecord{topic='documend.ADMPZ90894.DEBEZIUM_SIGNAL', kafkaPartition=null, key=Struct{ID=ad-hoc-13}, keySchema=Schema{documend.ADMPZ90894.DEBEZIUM_SIGNAL.Key:STRUCT}, value=Struct{before=Struct{ID=ad-hoc-13,TYPE=log,DATA=*******************************},source=Struct{version=1.9.3.Final,connector=oracle,name=documend,ts_ms=1655884746000,db=DOCUMEND,schema=ADMPZ90894,table=DEBEZIUM_SIGNAL,txId=110000001da40000,scn=101019121,commit_scn=101019123},op=d,ts_ms=1655884748717}, valueSchema=Schema{documend.ADMPZ90894.DEBEZIUM_SIGNAL.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=[ConnectHeader(key=__debezium.newkey, value=Struct{ID=ad-hoc-14}, schema=Schema{documend.ADMPZ90894.DEBEZIUM_SIGNAL.Key:STRUCT})])}]' (io.debezium.connector.base.ChangeEventQueue)
[2022-06-22 09:59:08,717] DEBUG Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=documend}, sourceOffset={commit_scn=101019123, transaction_id=null, snapshot_scn=99657009, scn=101019121}} ConnectRecord{topic='documend.ADMPZ90894.DEBEZIUM_SIGNAL', kafkaPartition=null, key=Struct{ID=ad-hoc-13}, keySchema=Schema{documend.ADMPZ90894.DEBEZIUM_SIGNAL.Key:STRUCT}, value=null, valueSchema=null, timestamp=null, headers=ConnectHeaders(headers=[ConnectHeader(key=__debezium.newkey, value=Struct{ID=ad-hoc-14}, schema=Schema{documend.ADMPZ90894.DEBEZIUM_SIGNAL.Key:STRUCT})])}]' (io.debezium.connector.base.ChangeEventQueue)
[2022-06-22 09:59:08,717] DEBUG Enqueuing source record 'DataChangeEvent [record=SourceRecord{sourcePartition={server=documend}, sourceOffset={commit_scn=101019123, transaction_id=null, snapshot_scn=99657009, scn=101019121}} ConnectRecord{topic='documend.ADMPZ90894.DEBEZIUM_SIGNAL', kafkaPartition=null, key=Struct{ID=ad-hoc-14}, keySchema=Schema{documend.ADMPZ90894.DEBEZIUM_SIGNAL.Key:STRUCT}, value=Struct{after=Struct{ID=ad-hoc-14,TYPE=log,DATA=*******************************},source=Struct{version=1.9.3.Final,connector=oracle,name=documend,ts_ms=1655884746000,db=DOCUMEND,schema=ADMPZ90894,table=DEBEZIUM_SIGNAL,txId=110000001da40000,scn=101019121,commit_scn=101019123},op=c,ts_ms=1655884748717}, valueSchema=Schema{documend.ADMPZ90894.DEBEZIUM_SIGNAL.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=[ConnectHeader(key=__debezium.oldkey, value=Struct{ID=ad-hoc-13}, schema=Schema{documend.ADMPZ90894.DEBEZIUM_SIGNAL.Key:STRUCT})])}]' (io.debezium.connector.base.ChangeEventQueue)
[2022-06-22 09:59:08,717] DEBUG Counters{rows=9, stuckCount=0, dmlCount=1, ddlCount=0, insertCount=0, updateCount=1, deleteCount=0, commitCount=4, rollbackCount=0, tableMetadataCount=0}. (io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor)
[2022-06-22 09:59:08,717] DEBUG Processed in 3 ms. Lag: 2717. Offset SCN: 101019121, Offset Commit SCN: 101019123, Active Transactions: 0, Sleep: 3000 (io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor)
[2022-06-22 09:59:08,724] DEBUG Oracle Session UGA 8.33MB (max = 25.04MB), PGA 62.39MB (max = 149.2MB) (io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource)
[2022-06-22 09:59:09,003] DEBUG checking for more records... (io.debezium.connector.base.ChangeEventQueue)
[2022-06-22 09:59:09,003] DEBUG no records available yet, sleeping a bit... (io.debezium.connector.base.ChangeEventQueue)
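
One detail in these log lines is worth comparing against the config: the "source" struct reports db=DOCUMEND, while the posted "signal.data.collection" starts with cdbdocd. A rough Python sketch of that comparison follows. The function names are hypothetical, the sample line is a trimmed copy of the "source" struct logged above, and the case-insensitive comparison is an assumption, since the thread does not establish how the connector treats case here.

```python
import re


def source_collection(log_line: str):
    """Return (db, schema, table) from the first source=Struct{...}, or None."""
    match = re.search(
        r"source=Struct\{[^}]*?\bdb=([^,}]+),schema=([^,}]+),table=([^,}]+)",
        log_line,
    )
    return match.groups() if match else None


def matches_signal_collection(log_line: str, configured: str) -> bool:
    """Compare the logged db.schema.table with the configured signal collection."""
    parts = source_collection(log_line)
    if parts is None:
        return False
    # Assumption: compare case-insensitively; the connector may be stricter.
    return ".".join(parts).upper() == configured.upper()


# Trimmed copy of the source struct from the DEBUG output above.
sample = (
    "source=Struct{version=1.9.3.Final,connector=oracle,name=documend,"
    "ts_ms=1655884746000,db=DOCUMEND,schema=ADMPZ90894,table=DEBEZIUM_SIGNAL,"
    "txId=110000001da40000,scn=101019121,commit_scn=101019123}"
)

print(source_collection(sample))
print(matches_signal_collection(sample, "cdbdocd.ADMPZ90894.DEBEZIUM_SIGNAL"))
```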


Johan Vandevenne

Jun 22, 2022, 4:48:19 AM
to debe...@googlegroups.com
Hello,

I figured it out.
Instead of specifying 
"signal.data.collection" : "cdbdocd.ADMPZ90894.DEBEZIUM_SIGNAL",

I had to specify:
"signal.data.collection" : "documend.ADMPZ90894.DEBEZIUM_SIGNAL",

where "documend" is the PDB name (database.pdb.name) and "cdbdocd" is the database name (database.dbname).

kind regards
Johan

Nathan Smit

Jun 22, 2022, 5:50:08 AM
to debezium
That's great! I was wondering whether running with a PDB changes which db needs to be specified there (I'm on 19c, but we don't use PDBs currently). It's perhaps worth creating a pull request to clarify this in the documentation.

Chris Cranford

Jun 22, 2022, 10:18:16 AM
to debe...@googlegroups.com, Nathan Smit
Hi Nathan & Johan -

So for any others who may stumble onto this thread, the Oracle connector has two key configuration options:

    database.dbname
    database.pdb.name

If you supply a PDB configuration, then apart from "database.dbname" (which always refers to the root database), all references to a database refer to the PDB. When you do not use PDBs (that is, no CDB multi-tenancy), you don't supply the "database.pdb.name" option, and all references to the database name align with the value supplied in the "database.dbname" property. It's a bit confusing, and it's most definitely an area we want to improve on. If there are any documentation clarifications you'd like to see, we'd welcome those.

HTH,
CC
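
The rule described above can be sketched in a few lines of Python: use "database.pdb.name" as the database part of "signal.data.collection" when it is set, and fall back to "database.dbname" otherwise. This is only an illustration of the rule as stated in this thread, not connector code. The function name is hypothetical, and the second example assumes that the "database.dbname" behind the GLODCP config shown earlier is GLODCP, which the thread implies but does not state.

```python
def signal_data_collection(config: dict, schema: str, table: str) -> str:
    """Build the signal.data.collection value from connector config (illustrative)."""
    # Prefer the PDB name when one is configured; otherwise use database.dbname.
    database = config.get("database.pdb.name") or config["database.dbname"]
    return f"{database}.{schema}.{table}"


# PDB setup, values taken from the config posted in this thread.
with_pdb = {"database.dbname": "cdbdocd", "database.pdb.name": "documend"}
pdb_value = signal_data_collection(with_pdb, "ADMPZ90894", "DEBEZIUM_SIGNAL")

# Non-PDB setup; the dbname here is an assumption (see the note above).
without_pdb = {"database.dbname": "GLODCP"}
plain_value = signal_data_collection(without_pdb, "DEBEZIUM", "DEBEZIUM_SIGNAL")

print(pdb_value)
print(plain_value)
```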