SignalProcessor thread synchronization


Kate Galieva

Nov 15, 2023, 11:12:38 PM
to debezium
Hey Debezium team!

I'm trying to understand the high-level plan for coordinating the binlog thread and the signal-processing thread.
AbstractIncrementalSnapshotContext is used by two threads: the main binlog reading thread and the SignalProcessor thread.
The SignalProcessor thread can call AbstractIncrementalSnapshotContext's stopSnapshot while the binlog thread is in the middle of AbstractIncrementalSnapshotChangeEventSource.readChunk().
This is the exception I'm getting:

 org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:67)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:428)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$28(MySqlStreamingChangeEventSource.java:980)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1263)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1089)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:648)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:949)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 7 more
Caused by: java.lang.NullPointerException: Cannot invoke "io.debezium.pipeline.source.snapshot.incremental.DataCollection.getId()" because the return value of "io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext.currentDataCollectionId()" is null
at io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.notifyInProgress(IncrementalSnapshotNotificationService.java:132)
at io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotChangeEventSource.readChunk(AbstractIncrementalSnapshotChangeEventSource.java:399)
at io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource.readUntilGtidChange(MySqlReadOnlyIncrementalSnapshotChangeEventSource.java:127)
at io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource.processTransactionCommittedEvent(MySqlReadOnlyIncrementalSnapshotChangeEventSource.java:166)
at io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotChangeEventSource.processTransactionCommittedEvent(MySqlReadOnlyIncrementalSnapshotChangeEventSource.java:80)
at io.debezium.pipeline.EventDispatcher.dispatchTransactionCommittedEvent(EventDispatcher.java:335)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleTransactionCompletion(MySqlStreamingChangeEventSource.java:665)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:962)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:408)
... 6 more


Technically, the race condition can lead to incorrect queries as well. For example, if the context changes in the middle of readChunk(), the additional condition will be taken from a different collection than the one cached in currentTable:
final String selectStatement = buildChunkQuery(currentTable, context.currentDataCollectionId().getAdditionalCondition());      
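
To illustrate the hazard, here is a minimal sketch (not Debezium code) of reading the shared reference exactly once and deriving both the table and the additional condition from that single read. CollectionRef, SnapshotContextSketch and ChunkReaderSketch are hypothetical stand-ins that I made up for the example; only the method names stopSnapshot and currentDataCollectionId mirror the real ones.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a data collection entry; not the real Debezium class.
final class CollectionRef {
    final String id;
    final String additionalCondition;

    CollectionRef(String id, String additionalCondition) {
        this.id = id;
        this.additionalCondition = additionalCondition;
    }
}

// Hypothetical, simplified context shared by the binlog thread and the signal thread.
final class SnapshotContextSketch {
    private final AtomicReference<CollectionRef> current = new AtomicReference<>();

    void start(CollectionRef collection) {
        current.set(collection);
    }

    // May be called from the signal thread at any time.
    void stopSnapshot() {
        current.set(null);
    }

    CollectionRef currentDataCollectionId() {
        return current.get();
    }
}

final class ChunkReaderSketch {
    // Returns the chunk query, or null if the snapshot was stopped before the read.
    static String buildChunkQueryOnce(SnapshotContextSketch context) {
        // Single read: every later use refers to this local, so a concurrent
        // stopSnapshot() can neither null it out nor swap it mid-method.
        CollectionRef collection = context.currentDataCollectionId();
        if (collection == null) {
            return null;
        }
        String query = "SELECT * FROM " + collection.id;
        if (collection.additionalCondition != null) {
            query += " WHERE " + collection.additionalCondition;
        }
        return query;
    }
}
```

This only removes the NullPointerException and the mismatch between table and condition inside one call; it does not by itself decide what should happen to a chunk that is already in flight when the stop arrives.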

Could you please share your thoughts on thread synchronization for SignalProcessor?

Thanks!

jiri.p...@gmail.com

Nov 16, 2023, 1:17:49 AM
to debezium
Hi,

Maybe stopping should be done in a similar way to pausing. Instead of directly changing the internal state, stopSnapshot would just set a flag indicating that the snapshot is going to be stopped.

snapshotRunning is, IMHO, called from only one thread. This method would also check the flag and reset the internal state. It would also need to be renamed to make clear that it not only checks the state but also changes it.
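
A rough sketch of that first option, with made-up names (FlaggedSnapshotContext, requestStop, applyPendingStopAndCheckRunning are all hypothetical, not existing Debezium API): the signal thread only flips a flag, and the thread that owns the snapshot state applies it the next time it asks whether the snapshot is running.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified context; not the real AbstractIncrementalSnapshotContext.
final class FlaggedSnapshotContext {
    // The only field written by the signal thread.
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);

    // Assumed to be touched only by the streaming (binlog) thread.
    private String currentCollection;

    void start(String collection) {
        currentCollection = collection;
    }

    // Signal thread: record the request, leave the internal state alone.
    void requestStop() {
        stopRequested.set(true);
    }

    // Streaming thread: replaces a plain "is running" check. It first applies
    // a pending stop, then reports whether a snapshot is still in progress.
    boolean applyPendingStopAndCheckRunning() {
        if (stopRequested.compareAndSet(true, false)) {
            currentCollection = null;
        }
        return currentCollection != null;
    }

    String currentCollection() {
        return currentCollection;
    }
}
```

Because the state is reset only inside the check, a chunk that has already started keeps a consistent view until the streaming thread reaches the next check.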

The other option is to modify the readChunk method to check whether the stopping flag is set. If it is, then just set the state and return.
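
The second option could look roughly like this. Again, ChunkLoopSketch and its members are hypothetical names for illustration only, and the real readChunk does far more than count chunks.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a chunk reader that honours a stop flag on entry.
final class ChunkLoopSketch {
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    // Assumed to be owned by the streaming thread.
    private String currentCollection = "inventory.orders";
    private int chunksRead;

    // Signal thread: only sets the flag.
    void requestStop() {
        stopping.set(true);
    }

    // Streaming thread: returns true if a chunk was read, false if the
    // snapshot is stopped (or was just stopped by a pending request).
    boolean readChunk() {
        if (stopping.compareAndSet(true, false)) {
            currentCollection = null; // set the stopped state and bail out
            return false;
        }
        if (currentCollection == null) {
            return false;
        }
        chunksRead++; // placeholder for the actual chunk query and emit
        return true;
    }

    int chunksRead() {
        return chunksRead;
    }
}
```

Compared with the first option, the check lives in readChunk itself, so no existing method has to be renamed, but any other code path that reads the state would need the same check.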

WDYT?

J.