Debezium with Infinispan problems

Jiri Kulhanek

Oct 31, 2023, 5:29:45 AM
to debezium
Hello,

We are trying to run Debezium on top of a production Oracle DB with high data throughput and some long-running transactions. We are using Debezium 2.4.0.Final embedded in our application.
We aim for a design where embedded Debezium runs in a streaming fashion without consuming huge amounts of memory. To that end, we have already discovered that we need to use Infinispan to avoid caching whole transactions in memory until they are committed.
However, with that came a myriad of other problems with Infinispan. Much of it boils down to proper configuration, and there is also a source-code PR we're planning to contribute once we test it properly. What is currently killing our solution, though, are intermittent errors from Infinispan along these lines:

Mining session stopped due to error.
java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Error reading header from 248:4376702 | 0
	at org.infinispan.util.concurrent.CompletionStages.join(CompletionStages.java:87)
	at org.infinispan.stream.impl.AbstractCacheStream.performPublisherOperation(AbstractCacheStream.java:200)
	at org.infinispan.stream.impl.DistributedCacheStream.collect(DistributedCacheStream.java:279)
	at org.infinispan.util.AbstractDelegatingCacheStream.collect(AbstractDelegatingCacheStream.java:280)
	at io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.getTransactionKeysWithPrefix(AbstractInfinispanLogMinerEventProcessor.java:153)
	at io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.removeEventWithRowId(AbstractInfinispanLogMinerEventProcessor.java:104)
	at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleDataEvent(AbstractLogMinerEventProcessor.java:1040)
	at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:366)
	at io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.processRow(AbstractInfinispanLogMinerEventProcessor.java:163)
	at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:289)
	at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:219)
	at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:241)
	at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:62)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:272)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
	at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: Error reading header from 248:4376702 | 0
	at org.infinispan.persistence.sifs.IndexNode$LeafNode.getHeaderAndKey(IndexNode.java:1115)
	at org.infinispan.persistence.sifs.IndexNode.recursiveNode(IndexNode.java:1307)
	at org.infinispan.persistence.sifs.IndexNode.recursiveNode(IndexNode.java:1249)
	at org.infinispan.persistence.sifs.IndexNode.recursiveNode(IndexNode.java:1249)
	at org.infinispan.persistence.sifs.IndexNode.lambda$publish$6(IndexNode.java:1227)
	at org.infinispan.reactive.FlowableCreate$BaseEmitter.attemptSubscribe(FlowableCreate.java:338)
	at org.infinispan.reactive.FlowableCreate$BaseEmitter.request(FlowableCreate.java:367)
	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.setSubscription(SubscriptionArbiter.java:87)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapInner.onSubscribe(FlowableConcatMap.java:556)
	at org.infinispan.reactive.FlowableCreate.subscribeActual(FlowableCreate.java:88)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableDefer.subscribeActual(FlowableDefer.java:42)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.drain(FlowableConcatMap.java:318)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$BaseConcatMapSubscriber.onSubscribe(FlowableConcatMap.java:124)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromArray.subscribeActual(FlowableFromArray.java:39)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap.subscribeActual(FlowableConcatMap.java:66)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.internal.jdk8.FlowableMapOptional.subscribeActual(FlowableMapOptional.java:47)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatArray$ConcatArraySubscriber.onComplete(FlowableConcatArray.java:141)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatArray.subscribeActual(FlowableConcatArray.java:41)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableDefer.subscribeActual(FlowableDefer.java:42)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
	at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
	at io.reactivex.rxjava3.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:324)
	at io.reactivex.rxjava3.internal.schedulers.ExecutorScheduler$ExecutorWorker.runEager(ExecutorScheduler.java:289)
	at io.reactivex.rxjava3.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:250)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
	at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
	at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1982)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1486)
	at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1377)

Do you have any experience with this? How can we avoid it?
Unfortunately, a Google search for the error does not return anything.

Secondly, I'd also like to ask about the recentTransactions the EventProcessor is collecting. We have cases where the oldestSCN in the buffers is very old and the recentTransactions collection gets huge (millions of transactions). This leads to hundreds of MBs of memory consumption even with Infinispan (some indexes are still kept in memory). The root cause seems to be that some transaction gets stuck in the active list. We don't see it on the DB side, so I'm wondering whether Debezium can miss the commit event.
While investigating this, I discovered that log.mining.transaction.retention.ms is not implemented for the Infinispan EventProcessor. Why is that? What is the solution to the problem described above?
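For context on what such a retention option would do: the idea is a periodic sweep that abandons any still-active transaction older than log.mining.transaction.retention.ms. The sketch below is a simplified illustration of that idea, not Debezium's actual implementation; `Tx` and every name in it are invented for this example.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Simplified sketch of a transaction-retention sweep; NOT Debezium's code.
public class RetentionSweep {
    record Tx(String id, long startMillis) {}

    // Evicts transactions that started more than retentionMs before "now"
    // and returns how many were removed from the active set.
    static int evictOldTransactions(Map<String, Tx> active, long nowMillis, long retentionMs) {
        int evicted = 0;
        Iterator<Map.Entry<String, Tx>> it = active.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue().startMillis() > retentionMs) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        Map<String, Tx> active = new HashMap<>();
        active.put("tx1", new Tx("tx1", 0L));      // stale, should be evicted
        active.put("tx2", new Tx("tx2", 90_000L)); // recent, should survive
        int evicted = evictOldTransactions(active, 100_000L, 60_000L);
        System.out.println(evicted + " " + active.size()); // prints "1 1"
    }
}
```

The trade-off, of course, is that an abandoned transaction that later commits is lost, which is exactly why it was introduced only as an OutOfMemory safeguard.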

Below is my complete Debezium config for reference, and also some graphs to give perspective.

Any guidance will be highly appreciated.

Thanks,
Jiri

connector.class = io.debezium.connector.oracle.OracleConnector
   offset.storage.jdbc.password = ********
   schema.history.internal.jdbc.password = ********
   log.mining.buffer.infinispan.cache.global = <infinispan><threads><blocking-bounded-queue-thread-pool max-threads=\"10\" name=\"debeziumcdcservice\" keepalive-time=\"10000\" queue-length=\"5000\"/></threads></infinispan>
   schema.history.internal.jdbc.schema.history.table.ddl = CREATE TABLE %s(id VARCHAR(36) NOT NULL,history_data CLOB,history_data_seq INTEGER,record_insert_ts TIMESTAMP NOT NULL,record_insert_seq INTEGER NOT NULL)
   offset.storage.jdbc.url = ******
   log.mining.strategy = online_catalog
   schema.history.internal.store.only.captured.tables.ddl = true
   schema.history.internal.store.only.captured.databases.ddl = true
   topic.prefix = test_run_123
   decimal.handling.mode = STRING
   offset.storage.file.filename =
   signal.data.collection = PRC02PRD.c##debezuser.DEBEZIUM_SIGNAL
   schema.history.internal.jdbc.schema.history.table.name = DEBEZIUM_DATABASE_HISTORY
   offset.storage.jdbc.offset.table.name = DEBEZIUM_OFFSET_STORAGE
   errors.retry.delay.initial.ms = 300
   schema.history.internal.jdbc.user = c##debezuser
   value.converter = org.apache.kafka.connect.json.JsonConverter
   log.mining.buffer.infinispan.cache.schema_changes = <local-cache name=\"schema_changes\"><persistence passivation=\"false\">\t<file-store read-only=\"false\" preload=\"true\" shared=\"false\" compaction-threshold=\"0.5\">\t\t<data max-file-size=\"16777216\" path=\"/egateshare/data/test_run_123/infinispan/infinispan-schema_changes-data-89431\"/>\t\t<index path=\"/egateshare/data/test_run_123/infinispan/infinispan-schema_changes-index-89431\"/>\t</file-store></persistence><memory max-count=\"10000\"/></local-cache>
   schema.history.internal.jdbc.url = ******
   key.converter = org.apache.kafka.connect.json.JsonConverter
   offset.commit.policy = io.debezium.engine.spi.OffsetCommitPolicy$PeriodicCommitOffsetPolicy
   database.user = c##debezuser
   database.dbname = svcPRC02PRD
   offset.storage = io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore
   database.pdb.name = PRC02PRD
   log.mining.buffer.type = infinispan_embedded
   database.url =******
   offset.flush.timeout.ms = 5000
   errors.retry.delay.max.ms = 10000
   schema.history.internal.skip.unparseable.ddl = true
   offset.flush.interval.ms = 60000
   schema.history.internal = io.debezium.storage.jdbc.history.JdbcSchemaHistory
   log.mining.buffer.infinispan.cache.processed_transactions = <local-cache name=\"processed_transactions\"><persistence passivation=\"false\">\t<file-store read-only=\"false\" preload=\"true\" shared=\"false\" compaction-threshold=\"0.5\">\t\t<data max-file-size=\"16777216\" path=\"/egateshare/data/test_run_123/infinispan/infinispan-processed_transactions-data-89431\"/>\t\t<index path=\"/egateshare/data/test_run_123/infinispan/infinispan-processed_transactions-index-89431\"/>\t</file-store></persistence><memory max-count=\"10000\"/></local-cache>
   schema.history.internal.jdbc.schema.history.table.select = SELECT * FROM %s where ROWNUM <= 1
   errors.max.retries = -1
   log.mining.query.filter.mode = in
   database.password = ********
   name = debezium-oracle:debezium_interfaceId_89431
   log.mining.buffer.infinispan.cache.events = <local-cache name=\"events\"><persistence passivation=\"false\">\t<file-store read-only=\"false\" preload=\"true\" shared=\"false\" compaction-threshold=\"0.5\">\t\t<data max-file-size=\"16777216\" path=\"/egateshare/data/test_run_123/infinispan/infinispan-events-data-89431\"/>\t\t<index path=\"/egateshare/data/test_run_123/infinispan/infinispan-events-index-89431\"/>\t</file-store></persistence><memory max-count=\"10000\"/></local-cache>
   offset.storage.jdbc.user = c##debezuser
   log.mining.buffer.infinispan.cache.transactions = <local-cache name=\"transactions\"><persistence passivation=\"true\">\t<file-store read-only=\"false\" preload=\"true\" shared=\"false\" compaction-threshold=\"0.5\">\t\t<data max-file-size=\"16777216\" path=\"/egateshare/data/test_run_123/infinispan/infinispan-transactions-data-89431\"/>\t\t<index path=\"/egateshare/data/test_run_123/infinispan/infinispan-transactions-index-89431\"/>\t</file-store></persistence><memory max-count=\"10000\"/></local-cache>
   table.include.list = ****
   snapshot.mode = schema_only
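For readability, this is the events cache definition from the config above with the escaping removed (purely a restatement of what is already there, not a proposed change; the other caches differ only in the cache name and paths, and the transactions cache uses passivation="true"):

```xml
<local-cache name="events">
  <persistence passivation="false">
    <file-store read-only="false" preload="true" shared="false" compaction-threshold="0.5">
      <data max-file-size="16777216" path="/egateshare/data/test_run_123/infinispan/infinispan-events-data-89431"/>
      <index path="/egateshare/data/test_run_123/infinispan/infinispan-events-index-89431"/>
    </file-store>
  </persistence>
  <memory max-count="10000"/>
</local-cache>
```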
[attachment: 2023-10-31 10_11_31-MDW Dashboard.png]

Chris Cranford

Nov 1, 2023, 2:46:21 AM
to debe...@googlegroups.com
Hi Jiri -

Are you using Infinispan in embedded mode, or with a remote cluster?  I have a current PR [1] which already addresses a number of performance optimizations around Infinispan, in particular situations where the underlying iterators for the Infinispan data set weren't being managed properly.  I had some discussions with the Infinispan team and we discovered that the local embedded caches deal with this properly but the RemoteCache implementation did not.  The aforementioned PR applies the fixes for both, in case there are potential loopholes in embedded mode, so I'd suggest integrating it into your PoC; that could potentially help.

To your second question: the main reason the retention configuration was added was as a safeguard to keep the heap-based processor from hitting OutOfMemory errors.  With the non-heap implementation this shouldn't really be necessary, but I can certainly see that when something is amiss, having that option could prove helpful.  We'd certainly welcome a PR to introduce it.

As to your findings with the stuck transaction: are you on Oracle RAC or standalone?  Does your configuration enable LOB?  And finally, since you have Infinispan in the picture: if you could ask the DBA to give you a LogMiner dump of the stuck transaction and compare it with the details in the Infinispan cache to determine which SCN(s) were missed, perhaps we can compare that against the connector logs to determine what could have caused it.  As your data loads are high, I'm not sure whether you can afford DEBUG logging, but if the situation is frequent enough, perhaps you can grab such a log for a given transaction so we have all the detail from the connector's point of view, then bounce that against the differences between the DBA's LogMiner output and the Infinispan caches.
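To illustrate the comparison Chris describes (a toy sketch; the SCN values below are invented): take the SCNs the DBA's LogMiner dump shows for the stuck transaction, remove those present in the Infinispan caches, and whatever remains was never buffered by the connector.

```java
import java.util.Set;
import java.util.TreeSet;

// Toy sketch of the LogMiner-dump vs. Infinispan-cache comparison.
// All SCN values here are invented for illustration.
public class ScnDiff {
    // SCNs present in the dump but absent from the cache were never buffered.
    static Set<Long> missedScns(Set<Long> fromLogMinerDump, Set<Long> inInfinispanCache) {
        Set<Long> missed = new TreeSet<>(fromLogMinerDump);
        missed.removeAll(inInfinispanCache);
        return missed;
    }

    public static void main(String[] args) {
        Set<Long> dump = Set.of(1001L, 1002L, 1003L, 1004L);
        Set<Long> cached = Set.of(1001L, 1002L, 1004L);
        System.out.println(missedScns(dump, cached)); // prints "[1003]"
    }
}
```

With the missed SCN(s) in hand, the corresponding connector log lines (ideally at DEBUG) would show whether the row was mined and dropped or never mined at all.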

Thanks,
Chris

[1]: https://github.com/debezium/debezium/pull/4944

Jiri Kulhanek

Nov 1, 2023, 3:42:20 AM
to debezium
Hi Chris,

great to see this PR.  I'm happy to pull it in and run our tests with it.  We use Infinispan in embedded mode; however, I'm not sure what kind of loopholes to watch for.
Here is the optimization which made a really huge difference for our case: https://github.com/debezium/debezium/compare/main...koszta5:debezium:inMemoryCacheToSpeedUpInfiniSpan
Developed by my colleague, this enabled the throughput we have now; without it, Debezium was not able to catch up at all with the data from the source. Another improvement was to use LogMiner filtering on the DB side (log.mining.query.filter.mode = in).
Can you please review our code change?
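The core idea of the linked change can be sketched like this (a simplified illustration, not the actual patch; `IndexedCache` and its method names are invented here): mirror every put/remove into a heap-side key index so membership look-ups never have to scan the persistent store.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of a heap-side index kept in sync with a persistent cache.
// The HashMap stands in for the (slow, file-backed) Infinispan cache.
public class IndexedCache<K, V> {
    private final Map<K, V> backingStore = new HashMap<>(); // placeholder for Infinispan
    private final Set<K> keyIndex = new HashSet<>();        // heap index, mirrored on every write

    public void put(K key, V value) {
        backingStore.put(key, value); // write-through to the store
        keyIndex.add(key);            // mirror the key on the heap
    }

    public void remove(K key) {
        backingStore.remove(key);
        keyIndex.remove(key);
    }

    // Cheap membership test that never touches the backing store.
    public boolean containsKey(K key) {
        return keyIndex.contains(key);
    }

    // Only hits the store when the index says the key exists.
    public V get(K key) {
        return keyIndex.contains(key) ? backingStore.get(key) : null;
    }

    public static void main(String[] args) {
        IndexedCache<String, Integer> cache = new IndexedCache<>();
        cache.put("tx1", 1);
        System.out.println(cache.containsKey("tx1")); // prints "true"
        cache.remove("tx1");
        System.out.println(cache.containsKey("tx1")); // prints "false"
    }
}
```

The speed-up comes from turning a per-row cache scan into an O(1) heap look-up, at the cost of keeping one key per open transaction in memory.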

To your other questions: We are on Oracle RAC and don't have LOB enabled.

I will try to get the Oracle dump next time this happens so we can run those comparisons. Right now, however, I have trouble reproducing it, mostly because my test runs usually crash on the "Error reading header from 248:4376702" exception after several hours. Do you know what this exception signals and how to prevent it? You can see my Infinispan settings in the config I provided; do you see anything suspicious? I have the transactions cache using passivation (otherwise it was getting too big in memory), and I set <memory max-count=\"10000\"/> on all caches, because otherwise I was getting a "Too many records for this key (short overflow)" exception from Infinispan.

Thanks,
Jiri

Chris Cranford

Nov 1, 2023, 3:58:00 AM
to debe...@googlegroups.com
Hi Jiri -

That's an interesting idea, using the heap for that, though I'm curious what impact, if any, that has when the connector is stopped and restarted mid-transaction, since that cache isn't persistent.

Looking at the Infinispan source, this error appears when decoding the entry header in index file 0, at offset 248, with a file size of 4,376,702 bytes. I've put the question to the Infinispan team; once I hear a response I'll let you know, or you're welcome to join the conversation here [1] on their Zulip.

Thanks,
Chris

[1]: https://infinispan.zulipchat.com/#narrow/stream/118645-infinispan/topic/Error.3A.20.22reading.20header.20from.20248.3A4376702.22/near/399675625

Chris Cranford

Nov 1, 2023, 5:28:30 AM
to debe...@googlegroups.com
Jiri -

Can you supply your full cache configuration either here or in the Zulip chat link provided below?

Thanks,
Chris

Jiri Kulhanek

Nov 1, 2023, 2:53:11 PM
to debezium
Hi Chris,

I ran Debezium with your branch DBZ-7047 and it does not help with the slow performance we are seeing.
In the charts below, the first shows your PR alone and the second shows our modification with the in-memory map for transaction look-ups (on top of your branch).
By the way, you are right that our solution might have trouble during a restart. Good point. But it should be easily solvable; e.g. the in-memory map can be restored from the Infinispan cache, which is still there.
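That restore-on-restart idea could look roughly like the sketch below (illustrative only; `rebuildIndex` and the names are invented): since the Infinispan file store preloads the surviving entries, the heap-side key index can simply be rebuilt from its key set at startup.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of rebuilding a heap-side key index from a persistent cache
// after a restart. The Map stands in for the preloaded Infinispan cache.
public class IndexRestore {
    static Set<String> rebuildIndex(Map<String, byte[]> persistentCache) {
        // One pass over the persisted keys restores the whole index.
        return new HashSet<>(persistentCache.keySet());
    }

    public static void main(String[] args) {
        Map<String, byte[]> survived = Map.of("tx42", new byte[0], "tx43", new byte[0]);
        Set<String> index = rebuildIndex(survived);
        System.out.println(index.size()); // prints "2"
    }
}
```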

[attachment: 2023-11-01 19_51_34-DebeziumCustom - Dashboards - Grafana — Mozilla Firefox.png]

Thanks,
Jiri

Chris Cranford

Nov 2, 2023, 7:45:22 AM
to debe...@googlegroups.com
Hi Jiri -

Did William and the ISPN team give you any other great information yesterday to help? I know the thread got quite long with technical details.

Chris

Jiri Kulhanek

Nov 2, 2023, 10:32:41 AM
to debezium
Hi Chris,

the ISPN team recommended upgrading (there is a fix for this in 4.0.18 and more in the upcoming 4.0.20) and disabling the expiration reaper.
So I ran my test with the 4.0.20.Final version and this setting under the <local-cache> element: <expiration interval="-1"/>.

Nevertheless, that was not very successful so far. I got this error from Infinispan:

Error encountered with index, SIFS may not operate properly.
java.lang.RuntimeException: java.lang.NegativeArraySizeException: -1
	at org.infinispan.persistence.sifs.IndexNode.commonPrefix(IndexNode.java:989)

Moreover, Debezium did not shut itself down after this error. Instead it got stuck here (the ...-change-event-source-coordinator thread is parked):

java.base@11.0.19/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1868)
java.base@11.0.19/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2021)
org.infinispan.commons.util.concurrent.CompletableFutures.await(CompletableFutures.java:130)
org.infinispan.interceptors.impl.SimpleAsyncInvocationStage.get(SimpleAsyncInvocationStage.java:36)
org.infinispan.interceptors.impl.AsyncInterceptorChainImpl.invoke(AsyncInterceptorChainImpl.java:249)
org.infinispan.cache.impl.InvocationHelper.doInvoke(InvocationHelper.java:323)
org.infinispan.cache.impl.InvocationHelper.invoke(InvocationHelper.java:111)
org.infinispan.cache.impl.InvocationHelper.invoke(InvocationHelper.java:93)
org.infinispan.cache.impl.CacheImpl.remove(CacheImpl.java:690)
org.infinispan.cache.impl.CacheImpl.remove(CacheImpl.java:684)
org.infinispan.cache.impl.EncoderCache.remove(EncoderCache.java:786)
io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.removeEventsWithTransaction(AbstractInfinispanLogMinerEventProcessor.java:415)
io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.removeTransactionAndEventsFromCache(AbstractInfinispanLogMinerEventProcessor.java:200)
io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.removeTransactionAndEventsFromCache(AbstractInfinispanLogMinerEventProcessor.java:48)
io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleCommit(AbstractLogMinerEventProcessor.java:557)
io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:340)
io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.processRow(AbstractInfinispanLogMinerEventProcessor.java:170)
io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:292)
io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:220)
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:247)
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:61)
io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:272)
io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)

Thanks,
Jiri

Jiri Kulhanek

Nov 2, 2023, 10:33:27 AM
to debezium
Correction: I'm running 4.0.19.Final, as 4.0.20 is not ready yet.

Jiri Kulhanek

Nov 3, 2023, 6:23:46 AM
to debezium
Hi Chris,

an update: there was another error with 4.0.19 (something introduced in 4.0.18), so I downgraded to 4.0.17 and ran with just the expiration reaper disabled.
However, that did not help either; the original problem is still there after some 15 hours and 1 million records collected.
I have asked in the ISPN forum again.