Hello,
We are trying to run Debezium on top of a production Oracle database with high data throughput and some long-running transactions. We are using Debezium 2.4.0.Final embedded in our application.
We are aiming for a design in which embedded Debezium runs in a streaming-like fashion without consuming a huge amount of memory. To that end, we have already found that we need to use Infinispan so that each transaction is not cached entirely in memory until it is committed.
However, this led to a myriad of other problems with Infinispan. Much of it comes down to proper configuration, and we also have a source-code PR that we plan to contribute once we have tested it properly. What is currently breaking our solution, though, are intermittent errors from Infinispan along these lines:
Mining session stopped due to error.java.lang.RuntimeException: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Error reading header from 248:4376702 | 0
at org.infinispan.util.concurrent.CompletionStages.join(CompletionStages.java:87)
at org.infinispan.stream.impl.AbstractCacheStream.performPublisherOperation(AbstractCacheStream.java:200)
at org.infinispan.stream.impl.DistributedCacheStream.collect(DistributedCacheStream.java:279)
at org.infinispan.util.AbstractDelegatingCacheStream.collect(AbstractDelegatingCacheStream.java:280)
at io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.getTransactionKeysWithPrefix(AbstractInfinispanLogMinerEventProcessor.java:153)
at io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.removeEventWithRowId(AbstractInfinispanLogMinerEventProcessor.java:104)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.handleDataEvent(AbstractLogMinerEventProcessor.java:1040)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processRow(AbstractLogMinerEventProcessor.java:366)
at io.debezium.connector.oracle.logminer.processor.infinispan.AbstractInfinispanLogMinerEventProcessor.processRow(AbstractInfinispanLogMinerEventProcessor.java:163)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.processResults(AbstractLogMinerEventProcessor.java:289)
at io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor.process(AbstractLogMinerEventProcessor.java:219)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:241)
at io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:62)
at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:272)
at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:197)
at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:137)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: Error reading header from 248:4376702 | 0
at org.infinispan.persistence.sifs.IndexNode$LeafNode.getHeaderAndKey(IndexNode.java:1115)
at org.infinispan.persistence.sifs.IndexNode.recursiveNode(IndexNode.java:1307)
at org.infinispan.persistence.sifs.IndexNode.recursiveNode(IndexNode.java:1249)
at org.infinispan.persistence.sifs.IndexNode.recursiveNode(IndexNode.java:1249)
at org.infinispan.persistence.sifs.IndexNode.lambda$publish$6(IndexNode.java:1227)
at org.infinispan.reactive.FlowableCreate$BaseEmitter.attemptSubscribe(FlowableCreate.java:338)
at org.infinispan.reactive.FlowableCreate$BaseEmitter.request(FlowableCreate.java:367)
at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.setSubscription(SubscriptionArbiter.java:87)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapInner.onSubscribe(FlowableConcatMap.java:556)
at org.infinispan.reactive.FlowableCreate.subscribeActual(FlowableCreate.java:88)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableDefer.subscribeActual(FlowableDefer.java:42)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.drain(FlowableConcatMap.java:318)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$BaseConcatMapSubscriber.onSubscribe(FlowableConcatMap.java:124)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableFromArray.subscribeActual(FlowableFromArray.java:39)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap.subscribeActual(FlowableConcatMap.java:66)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
at io.reactivex.rxjava3.internal.jdk8.FlowableMapOptional.subscribeActual(FlowableMapOptional.java:47)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatArray$ConcatArraySubscriber.onComplete(FlowableConcatArray.java:141)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatArray.subscribeActual(FlowableConcatArray.java:41)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableDefer.subscribeActual(FlowableDefer.java:42)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15917)
at io.reactivex.rxjava3.core.Flowable.subscribe(Flowable.java:15863)
at io.reactivex.rxjava3.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
at io.reactivex.rxjava3.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:324)
at io.reactivex.rxjava3.internal.schedulers.ExecutorScheduler$ExecutorWorker.runEager(ExecutorScheduler.java:289)
at io.reactivex.rxjava3.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:250)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1736)
at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1982)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1486)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1377)
Do you have any experience with this? How can we avoid it?
Unfortunately, a Google search does not return anything.
Secondly, I would also like to ask about the recentTransactions collection that the EventProcessor maintains. In some cases, the oldest SCN in the buffers is very old and the recentTransactions collection grows huge (millions of transactions). This consumes hundreds of MBs of memory even with Infinispan, because some indexes are still kept in memory. The root cause seems to be a transaction that gets stuck in the active list. We do not see that transaction on the database side, so I am wondering whether Debezium can miss a COMMIT event.
While investigating this, I discovered that
log.mining.transaction.retention.ms is not implemented for the Infinispan EventProcessor. Is there a reason for that? And what is the recommended way to deal with the stuck-transaction problem described above?
Below is my complete Debezium configuration for reference, along with a few graphs to give some perspective.
Any guidance will be highly appreciated.
Thanks,
Jiri
connector.class = io.debezium.connector.oracle.OracleConnector
offset.storage.jdbc.password = ********
schema.history.internal.jdbc.password = ********
log.mining.buffer.infinispan.cache.global =
<infinispan> <threads>
<blocking-bounded-queue-thread-pool max-threads=\"10\" name=\"debeziumcdcservice\" keepalive-time=\"10000\" queue-length=\"5000\" />
</threads></infinispan>
schema.history.internal.jdbc.schema.history.table.ddl = CREATE TABLE %s(id VARCHAR(36) NOT NULL,history_data CLOB,history_data_seq INTEGER,record_insert_ts TIMESTAMP NOT NULL,record_insert_seq INTEGER NOT NULL)
offset.storage.jdbc.url = ******
log.mining.strategy = online_catalog
schema.history.internal.store.only.captured.tables.ddl = true
schema.history.internal.store.only.captured.databases.ddl = true
topic.prefix = test_run_123
decimal.handling.mode = STRING
offset.storage.file.filename =
signal.data.collection = PRC02PRD.c##debezuser.DEBEZIUM_SIGNAL
schema.history.internal.jdbc.schema.history.table.name = DEBEZIUM_DATABASE_HISTORY
offset.storage.jdbc.offset.table.name = DEBEZIUM_OFFSET_STORAGE
errors.retry.delay.initial.ms = 300
schema.history.internal.jdbc.user = c##debezuser
value.converter = org.apache.kafka.connect.json.JsonConverter
log.mining.buffer.infinispan.cache.schema_changes = <local-cache name=\"schema_changes\"><persistence passivation=\"false\">\t<file-store read-only=\"false\" preload=\"true\" shared=\"false\" compaction-threshold=\"0.5\">\t\t<data max-file-size=\"16777216\" path=\"/egateshare/data/test_run_123/infinispan/infinispan-schema_changes-data-89431\"/>\t\t<index path=\"/egateshare/data/test_run_123/infinispan/infinispan-schema_changes-index-89431\"/>\t</file-store></persistence><memory max-count=\"10000\"/></local-cache>
schema.history.internal.jdbc.url = ******
key.converter = org.apache.kafka.connect.json.JsonConverter
offset.commit.policy = io.debezium.engine.spi.OffsetCommitPolicy$PeriodicCommitOffsetPolicy
database.user = c##debezuser
database.dbname = svcPRC02PRD
offset.storage = io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore
database.pdb.name = PRC02PRD
log.mining.buffer.type = infinispan_embedded
database.url =******
offset.flush.timeout.ms = 5000
errors.retry.delay.max.ms = 10000
schema.history.internal.skip.unparseable.ddl = true
offset.flush.interval.ms = 60000
schema.history.internal = io.debezium.storage.jdbc.history.JdbcSchemaHistory
log.mining.buffer.infinispan.cache.processed_transactions = <local-cache name=\"processed_transactions\"><persistence passivation=\"false\">\t<file-store read-only=\"false\" preload=\"true\" shared=\"false\" compaction-threshold=\"0.5\">\t\t<data max-file-size=\"16777216\" path=\"/egateshare/data/test_run_123/infinispan/infinispan-processed_transactions-data-89431\"/>\t\t<index path=\"/egateshare/data/test_run_123/infinispan/infinispan-processed_transactions-index-89431\"/>\t</file-store></persistence><memory max-count=\"10000\"/></local-cache>
schema.history.internal.jdbc.schema.history.table.select = SELECT * FROM %s where ROWNUM <= 1
errors.max.retries = -1
log.mining.query.filter.mode = in
database.password = ********
name = debezium-oracle:debezium_interfaceId_89431
log.mining.buffer.infinispan.cache.events = <local-cache name=\"events\"><persistence passivation=\"false\">\t<file-store read-only=\"false\" preload=\"true\" shared=\"false\" compaction-threshold=\"0.5\">\t\t<data max-file-size=\"16777216\" path=\"/egateshare/data/test_run_123/infinispan/infinispan-events-data-89431\"/>\t\t<index path=\"/egateshare/data/test_run_123/infinispan/infinispan-events-index-89431\"/>\t</file-store></persistence><memory max-count=\"10000\"/></local-cache>
offset.storage.jdbc.user = c##debezuser
log.mining.buffer.infinispan.cache.transactions = <local-cache name=\"transactions\"><persistence passivation=\"true\">\t<file-store read-only=\"false\" preload=\"true\" shared=\"false\" compaction-threshold=\"0.5\">\t\t<data max-file-size=\"16777216\" path=\"/egateshare/data/test_run_123/infinispan/infinispan-transactions-data-89431\"/>\t\t<index path=\"/egateshare/data/test_run_123/infinispan/infinispan-transactions-index-89431\"/>\t</file-store></persistence><memory max-count=\"10000\"/></local-cache>
table.include.list = ****
snapshot.mode = schema_only 