We've noticed an issue with how LogMiner CDC events from the Oracle signal table are received, which leads to an exception in the signal processing code.
What appears to be happening is that when we execute a single INSERT statement against the CDC table in Oracle, the Debezium Oracle connector receives two separate events: a CREATE immediately followed by an UPDATE. The CREATE has 2 of the 3 fields set correctly, but the 'data' field is set to "LM_EMPTY_STRING". The UPDATE that follows has the 'data' field set to the value we wrote in the INSERT statement. The signal processing code throws an exception because the string "LM_EMPTY_STRING" cannot be parsed as JSON, and the UPDATE is then ignored because only CREATE events are considered for signal processing.
Based on some cursory reading, splitting an INSERT into separate CREATE and UPDATE events appears to be "normal" Oracle behaviour when dealing with larger VARCHAR2 columns. We originally defined the DATA column as 2048 bytes, then reduced it to 255 bytes, but the behaviour remained unchanged. We could reduce it further, but then it would be difficult to write the JSON needed to trigger snapshots for multiple tables in one operation without exceeding the column size.
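To put the column-size trade-off in numbers, here is a small sketch that builds the DATA payload of an incremental `execute-snapshot` signal (the `data-collections`/`type` format from the Debezium signalling documentation) for several tables and checks its length against a column limit. The class name, the table names and the assumption that table names need no JSON escaping are illustrative only; the exact data-collection identifier format the Oracle connector expects should be checked in its documentation. For a `VARCHAR2(n CHAR)` column the limit counts characters, whereas BYTE semantics count encoded bytes (UTF-8 is assumed here).

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

class SignalPayloadSize {

    // Builds the DATA payload of an incremental execute-snapshot signal.
    // Assumes table names contain no characters that would need JSON escaping.
    static String executeSnapshotData(List<String> dataCollections) {
        String collections = dataCollections.stream()
                .map(name -> "\"" + name + "\"")
                .collect(Collectors.joining(", "));
        return "{\"data-collections\": [" + collections + "], \"type\": \"incremental\"}";
    }

    // Checks whether the payload fits a VARCHAR2 column of the given size.
    // CHAR semantics count characters; BYTE semantics count UTF-8 encoded bytes.
    static boolean fits(String payload, int columnSize, boolean charSemantics) {
        int length = charSemantics
                ? payload.codePointCount(0, payload.length())
                : payload.getBytes(StandardCharsets.UTF_8).length;
        return length <= columnSize;
    }

    public static void main(String[] args) {
        // Hypothetical table names, used only to illustrate the payload size.
        List<String> tables = List.of("MY_SCHEMA.ORDERS", "MY_SCHEMA.ORDER_LINES", "MY_SCHEMA.CUSTOMERS");
        String data = executeSnapshotData(tables);
        System.out.println(data);
        System.out.println("characters: " + data.length());
        System.out.println("fits VARCHAR2(255 CHAR): " + fits(data, 255, true));
    }
}
```

Each additional table adds its quoted name plus a `, ` separator, so shrinking the column quickly limits how many tables can be snapshotted with a single signal row.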
I'm by no means an Oracle expert, so perhaps there is an Oracle setting that avoids this, or a Debezium connector setting that handles it (although I didn't see anything in the code that would handle such a scenario).
Our CDC table is defined as follows:
```
SQL> describe CDC_SIGNAL;
 Name                                      Null?    Type
 ----------------------------------------- -------- ----------------------------
 ID                                        NOT NULL VARCHAR2(42 CHAR)
 TYPE                                      NOT NULL VARCHAR2(32 CHAR)
 DATA                                               VARCHAR2(2048 CHAR)
```
These are the two separate CDC events received:
CREATE:
```
2024-04-10 13:35:30,644 TRACE | debezium-oracleconnector-test-oracle-change-event-source-coordinator | io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:282)ogger{55} | Received change record Struct{after=Struct{ID=1012,TYPE=log,DATA=LM_EMPTY_STRING},source=Struct{version=2.6.0.Final,connector=oracle,name=test-oracle,ts_ms=1712756127000,db=OTNE,ts_us=1712756127000000,ts_ns=1712756127000000000,schema=MY_SCHEMA,table=CDC_SIGNAL,txId=05000d00970a0000,scn=4017799,commit_scn=4017832,rs_id=0x0002df.0002a957.0170,ssn=0,redo_thread=1,user_name=MY_SCHEMA},op=c,ts_ms=1712756130644,ts_us=1712756130644233,ts_ns=1712756130644233711} for CREATE operation on key Struct{ID=1012} with context OracleOffsetContext [scn=4017799, commit_scn=["4017832:1:05000d00970a0000"], lcr_position=null]
```
UPDATE:
```
2024-04-10 13:35:30,659 TRACE | debezium-oracleconnector-test-oracle-change-event-source-coordinator | io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:282)ogger{55} | Received change record Struct{before=Struct{ID=1012,TYPE=log},after=Struct{ID=1012,TYPE=log,DATA={"message": "Signal message at offset {}"}},source=Struct{version=2.6.0.Final,connector=oracle,name=test-oracle,ts_ms=1712756127000,db=OTNE,ts_us=1712756127000000,ts_ns=1712756127000000000,schema=MY_SCHEMA,table=CDC_SIGNAL,txId=05000d00970a0000,scn=4017799,commit_scn=4017832,rs_id=0x0002df.0002a958.0150,ssn=0,redo_thread=1,user_name=MY_SCHEMA},op=u,ts_ms=1712756130659,ts_us=1712756130659681,ts_ns=1712756130659681284} for UPDATE operation on key Struct{ID=1012} with context OracleOffsetContext [scn=4017799, commit_scn=["4017832:1:05000d00970a0000"], lcr_position=null]
```
And this is the exception that is raised:
```
2024-04-10 13:35:30,647 DEBUG | debezium-oracleconnector-test-oracle-change-event-source-coordinator | io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:181)ogger{55} | Received signal id = '1012', type = 'log', data = 'LM_EMPTY_STRING'
2024-04-10 13:35:30,656 WARN | debezium-oracleconnector-test-oracle-change-event-source-coordinator | io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:195)ogger{55} | Signal '1012' has been received but the data 'LM_EMPTY_STRING' cannot be parsed
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'LM_EMPTY_STRING': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (String)"LM_EMPTY_STRING"; line: 1, column: 16]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:2082)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:808)
at io.debezium.document.JacksonReader.parseDocument(JacksonReader.java:115)
at io.debezium.document.JacksonReader.parse(JacksonReader.java:102)
at io.debezium.document.JacksonReader.read(JacksonReader.java:57)
at io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:189)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:596)
at io.debezium.pipeline.signal.SignalProcessor.lambda$processSourceSignal$4(SignalProcessor.java:155)
at io.debezium.pipeline.signal.SignalProcessor.executeWithSemaphore(SignalProcessor.java:165)
at io.debezium.pipeline.signal.SignalProcessor.processSourceSignal(SignalProcessor.java:149)
at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:290)
at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:79)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:47)
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:271)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.lambda$handleCommit$1(AbstractLogMinerEventProcessor.java:564)
at io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer.dispatchChangeEvent(TransactionCommitConsumer.java:383)
at io.debezium.connector.oracle.logminer.processor.TransactionCommitConsumer.accept(TransactionCommitConsumer.java:117)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleCommit(AbstractLogMinerEventProcessor.java:580)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:395)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:320)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:241)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:246)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:61)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:280)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:140)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
```
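Both events above carry the same txId (05000d00970a0000), SCN (4017799) and key (ID=1012), which suggests one possible way to handle this. The following is only a sketch of that idea, not Debezium code and not an existing connector option: a CREATE whose DATA equals LM_EMPTY_STRING is held back, and the UPDATE with the same transaction and key supplies the real DATA. `SignalEvent` and `SignalCoalescer` are hypothetical names, and the sketch assumes the UPDATE always arrives in the same transaction as its CREATE, as it does in the log above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SignalCoalescer {

    // Hypothetical, simplified view of a change event on the signal table (not a Debezium type).
    record SignalEvent(String txId, String op, String id, String type, String data) {}

    // Placeholder observed in the DATA column of the first (CREATE) event.
    static final String LM_EMPTY_STRING = "LM_EMPTY_STRING";

    // CREATE events still carrying the placeholder, keyed by transaction id and signal id.
    private final Map<String, SignalEvent> pending = new HashMap<>();

    // Returns a complete signal, or null when the event is not (yet) a usable signal.
    SignalEvent accept(SignalEvent event) {
        String key = event.txId() + "/" + event.id();
        if ("c".equals(event.op())) {
            if (LM_EMPTY_STRING.equals(event.data())) {
                pending.put(key, event); // wait for the UPDATE that carries DATA
                return null;
            }
            return event; // ordinary INSERT with DATA already populated
        }
        if ("u".equals(event.op())) {
            SignalEvent create = pending.remove(key);
            if (create != null) {
                // Keep ID and TYPE from the CREATE, take DATA from the follow-up UPDATE.
                return new SignalEvent(create.txId(), "c", create.id(), create.type(), event.data());
            }
        }
        return null; // unrelated UPDATEs and DELETEs are not treated as signals
    }

    public static void main(String[] args) {
        SignalCoalescer coalescer = new SignalCoalescer();
        // The two events from the trace log, reduced to the relevant fields.
        List<SignalEvent> received = List.of(
                new SignalEvent("05000d00970a0000", "c", "1012", "log", LM_EMPTY_STRING),
                new SignalEvent("05000d00970a0000", "u", "1012", "log",
                        "{\"message\": \"Signal message at offset {}\"}"));
        for (SignalEvent event : received) {
            SignalEvent signal = coalescer.accept(event);
            if (signal != null) {
                System.out.println("signal id=" + signal.id() + " type=" + signal.type() + " data=" + signal.data());
            }
        }
    }
}
```

Fed the two events from the log, it emits a single signal with id 1012, type log and the JSON message as its data. Keying on transaction id plus signal id keeps signals that reuse an ID in different transactions from being merged.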
Regards,
Allain