Oracle CDC events from source signal table causing exceptions in signal processing


Allain Legacy

Apr 10, 2024, 11:03:58 AM
to debezium
We've noticed an issue with how LogMiner CDC events are received from the signal table in Oracle that leads to an exception in the signal-processing code.

What appears to be happening is that a single INSERT statement into the signal table in Oracle arrives at the Debezium Oracle connector as two separate events: a CREATE immediately followed by an UPDATE. The CREATE has 2 of the 3 fields set properly, but the 'data' field is set to "LM_EMPTY_STRING"; the UPDATE that follows carries the 'data' value we actually wrote in the INSERT statement. The signal-processing code throws an exception because the string "LM_EMPTY_STRING" cannot be parsed as JSON, and the UPDATE is then ignored because only CREATE events are considered for signal processing.

Based on some cursory reading, this splitting of an INSERT into separate CREATE and UPDATE events appears to be "normal" Oracle behaviour when dealing with larger VARCHAR2 attributes. We had originally defined the column as 2048 bytes, then reduced it to 255 bytes, but the behaviour remained unchanged. We could reduce it further, but then it would be difficult to fit the JSON required to trigger snapshots for multiple tables in one operation, as we'd exceed the column size.

I'm by no means an Oracle expert, so perhaps there is an Oracle setting to avoid this, or a Debezium connector setting to handle it (although I didn't see anything in the code that could handle such a scenario).


Our signal table is defined as follows:

```
SQL> describe CDC_SIGNAL;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 ID                                        NOT NULL VARCHAR2(42 CHAR)
 TYPE                                      NOT NULL VARCHAR2(32 CHAR)
 DATA                                               VARCHAR2(2048 CHAR)

```
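
For context, the kind of signal INSERT we issue looks roughly like this (a sketch only; the ID value and table list are made up, but the payload follows the documented execute-snapshot signal format):

```
-- Hypothetical example: the DATA payload grows with each additional
-- table, which is why shrinking the column below 255 bytes is
-- impractical for us.
INSERT INTO MY_SCHEMA.CDC_SIGNAL (ID, TYPE, DATA)
VALUES ('signal-1012',
        'execute-snapshot',
        '{"data-collections": ["MY_SCHEMA.TABLE_A", "MY_SCHEMA.TABLE_B"], "type": "incremental"}');
COMMIT;
```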

These are the two separate CDC events received:

CREATE:
```
2024-04-10 13:35:30,644 TRACE | debezium-oracleconnector-test-oracle-change-event-source-coordinator | io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:282) | Received change record Struct{after=Struct{ID=1012,TYPE=log,DATA=LM_EMPTY_STRING},source=Struct{version=2.6.0.Final,connector=oracle,name=test-oracle,ts_ms=1712756127000,db=OTNE,ts_us=1712756127000000,ts_ns=1712756127000000000,schema=MY_SCHEMA,table=CDC_SIGNAL,txId=05000d00970a0000,scn=4017799,commit_scn=4017832,rs_id=0x0002df.0002a957.0170,ssn=0,redo_thread=1,user_name=MY_SCHEMA},op=c,ts_ms=1712756130644,ts_us=1712756130644233,ts_ns=1712756130644233711} for CREATE operation on key Struct{ID=1012} with context OracleOffsetContext [scn=4017799, commit_scn=["4017832:1:05000d00970a0000"], lcr_position=null]
```


UPDATE:
```
2024-04-10 13:35:30,659 TRACE | debezium-oracleconnector-test-oracle-change-event-source-coordinator | io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:282) | Received change record Struct{before=Struct{ID=1012,TYPE=log},after=Struct{ID=1012,TYPE=log,DATA={"message": "Signal message at offset {}"}},source=Struct{version=2.6.0.Final,connector=oracle,name=test-oracle,ts_ms=1712756127000,db=OTNE,ts_us=1712756127000000,ts_ns=1712756127000000000,schema=MY_SCHEMA,table=CDC_SIGNAL,txId=05000d00970a0000,scn=4017799,commit_scn=4017832,rs_id=0x0002df.0002a958.0150,ssn=0,redo_thread=1,user_name=MY_SCHEMA},op=u,ts_ms=1712756130659,ts_us=1712756130659681,ts_ns=1712756130659681284} for UPDATE operation on key Struct{ID=1012} with context OracleOffsetContext [scn=4017799, commit_scn=["4017832:1:05000d00970a0000"], lcr_position=null]
```

And this is the exception raised:

```
2024-04-10 13:35:30,647 DEBUG | debezium-oracleconnector-test-oracle-change-event-source-coordinator | io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:181) | Received signal id = '1012', type = 'log', data = 'LM_EMPTY_STRING'
2024-04-10 13:35:30,656  WARN | debezium-oracleconnector-test-oracle-change-event-source-coordinator | io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:195) | Signal '1012' has been received but the data 'LM_EMPTY_STRING' cannot be parsed
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'LM_EMPTY_STRING': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"LM_EMPTY_STRING"; line: 1, column: 16]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2082)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:808)
        at io.debezium.document.JacksonReader.parseDocument(JacksonReader.java:115)
        at io.debezium.document.JacksonReader.parse(JacksonReader.java:102)
        at io.debezium.document.JacksonReader.read(JacksonReader.java:57)
        at io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:189)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
        at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
        at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
        at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
        at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
        at io.debezium.pipeline.signal.SignalProcessor.lambda$processSourceSignal$4(SignalProcessor.java:155)
        at io.debezium.pipeline.signal.SignalProcessor.executeWithSemaphore(SignalProcessor.java:165)
        at io.debezium.pipeline.signal.SignalProcessor.processSourceSignal(SignalProcessor.java:149)
        at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:290)
        at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:79)
        at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
        at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:271)
        at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.lambda$handleCommit$1(AbstractLogMinerEventProcessor.java:564)
        at io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer.dispatchChangeEvent(TransactionCommitConsumer.java:383)
        at io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer.accept(TransactionCommitConsumer.java:117)
        at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleCommit(AbstractLogMinerEventProcessor.java:580)
        at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:395)
        at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:320)
        at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:241)
        at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:246)
        at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:61)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:280)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
        at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)

```

Regards,
Allain

Chris Cranford

Apr 10, 2024, 11:07:54 AM
to debe...@googlegroups.com
Hi Allain -

I know that on Exadata, if this is an extended VARCHAR field, it is stored in the same logical way that CLOB data is stored. For the connector to correctly read the data and merge the operations into a single logical event, you will need to enable the `lob.enabled` feature. We handle the CLOB, BLOB, and XML data types differently because of how they're materialized in LogMiner's data stream, and since extended strings on Exadata work similarly, the feature must be enabled for them as well; otherwise you will receive multiple events for tables with such columns, as the insert is technically a multi-step process that gets recorded as multiple steps in the redo logs.
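
For clarity, in the connector configuration that would look roughly like this (a sketch; everything besides `lob.enabled` is taken from names visible in your logs):

```
# Sketch only -- adjust the signal table to your deployment;
# lob.enabled is the relevant toggle here.
connector.class=io.debezium.connector.oracle.OracleConnector
signal.data.collection=MY_SCHEMA.CDC_SIGNAL
lob.enabled=true
```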

Thanks,
Chris


Allain Legacy

May 15, 2024, 2:56:00 PM
to debezium
Hi Chris,
Any other thoughts on this? Using lob.enabled=true did not address the issue. We are still unable to get signal processing to work because the INSERT into the signal table is still being split into separate INSERT and UPDATE events.

Allain

Chris Cranford

May 15, 2024, 9:31:48 PM
to debe...@googlegroups.com
Hi Allain -

Can you share the log entries logged by the AbstractLogMinerEventProcessor when TRACE is enabled and those events are first detected? 
I'm specifically looking for these log entries:

    DML: LogMinerEventRow{ ... }
        INSERT INTO ...
    DML: LogMinerEventRow{ ... }
        UPDATE ...

I can perhaps try to build a PR, but I'm afraid you would need to test it for us, as we don't have a way to test against Exadata.

Thanks,
Chris

Allain Legacy

Jun 8, 2024, 8:29:29 PM
to debezium
Thanks Chris,
We decided to take the easier path and simply switch to the Kafka signaling method, which avoids this issue.
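
In case it helps anyone else, the switch amounted to roughly the following connector settings (a sketch; the topic and broker names are placeholders for our environment):

```
# Sketch only -- topic and bootstrap servers are placeholders.
signal.enabled.channels=kafka
signal.kafka.topic=debezium-signals
signal.kafka.bootstrap.servers=kafka:9092
```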

Allain
