Callback on MySQL Snapshot Completion with Debezium Embedded

Alan Shreve

Jul 20, 2017, 11:49:45 AM
to debe...@googlegroups.com
Hi there -

I'm attempting to use the Debezium embedded library with a MySQL database to keep the full state of the database in application memory (it's a small dataset: a single table with fewer than 100k rows).

The application server has a hard requirement: it must hold all of the state before it can process any requests. The problem I've run into is that there is no way to register a callback to be notified when the snapshot is complete. Workarounds I've considered include:

1) Waiting for the first non-snapshot record. This introduces a non-deterministic delay, because the application can't tell that the snapshot has completed until the table is next modified, and a table can go a long period without any mutations.

2) Forking the code. I'd rather avoid this so that I can keep pulling in the latest fixes.

3) Using a terrible, terrible hack for prototyping purposes: I've registered a log4j appender that watches for the "Completed writing all snapshot records" log message. It's obviously no good for production, because log output isn't a supported API and it races when multiple connectors are running.
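A minimal sketch of workaround 1, assuming a single table keyed by a string primary key. `RowChange` and `TableReplica` are hypothetical names, not Debezium classes, and the `snapshot` flag is an assumed stand-in for however a record indicates that it came from the snapshot rather than the binlog. Request handlers block on a `CountDownLatch` that is released only when the first non-snapshot record arrives, which shows why the delay is non-deterministic: a table that is never modified never releases the latch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified change record (NOT Debezium's SourceRecord).
// 'snapshot' is an assumed flag marking records produced by the initial snapshot.
final class RowChange {
    final String key;
    final String value;       // null means the row was deleted
    final boolean snapshot;

    RowChange(String key, String value, boolean snapshot) {
        this.key = key;
        this.value = value;
        this.snapshot = snapshot;
    }
}

// Hypothetical in-memory replica of one table, keyed by primary key.
final class TableReplica {
    private final Map<String, String> rows = new ConcurrentHashMap<>();
    private final CountDownLatch ready = new CountDownLatch(1);

    // Called on the engine's consumer thread for every record, in order.
    void apply(RowChange change) {
        if (change.value == null) {
            rows.remove(change.key);
        } else {
            rows.put(change.key, change.value);
        }
        // Workaround 1: the first non-snapshot record implies the snapshot finished.
        // If the table is never modified, the latch is never released.
        if (!change.snapshot) {
            ready.countDown();
        }
    }

    // Request handlers call this before serving; returns false on timeout or interrupt.
    boolean awaitReady(long timeout, TimeUnit unit) {
        try {
            return ready.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    String get(String key) {
        return rows.get(key);
    }
}
```

A request handler would call `awaitReady` with a timeout and reject or retry the request when it returns `false`.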

Am I missing an easier way to do this? If not, would you be receptive to a patch for it, and if so, where is the most appropriate place in the code to add this functionality?

thank you!

Alan Shreve

Jul 20, 2017, 2:20:22 PM
to debe...@googlegroups.com
So I've discovered SnapshotMode.INITIAL_ONLY, which appears to be exactly what I need, except that the embedded engine then spins forever:

2017-07-20 17:57:33 DEBUG EmbeddedEngine:648 - Embedded engine returned from polling task for records
2017-07-20 17:57:33 DEBUG EmbeddedEngine:684 - Received no records from the task
2017-07-20 17:57:33 DEBUG EmbeddedEngine:646 - Embedded engine is polling task for records on thread Thread[pool-4-thread-1,5,main]
2017-07-20 17:57:33 DEBUG EmbeddedEngine:648 - Embedded engine returned from polling task for records
2017-07-20 17:57:33 DEBUG EmbeddedEngine:684 - Received no records from the task
2017-07-20 17:57:33 DEBUG EmbeddedEngine:646 - Embedded engine is polling task for records on thread Thread[pool-4-thread-1,5,main]
2017-07-20 17:57:33 DEBUG EmbeddedEngine:648 - Embedded engine returned from polling task for records
2017-07-20 17:57:33 DEBUG EmbeddedEngine:684 - Received no records from the task
2017-07-20 17:57:33 DEBUG EmbeddedEngine:646 - Embedded engine is polling task for records on thread Thread[pool-4-thread-1,5,main]

etc ...

I'd be happy to submit a patch with a little guidance from the maintainers on how best to break that loop. Some possibilities I'm considering are:

- Making ChainedReader return a sentinel record and modifying EmbeddedEngine to check for it
- Making ChainedReader throw an InterruptedException
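The sentinel option is essentially the "poison pill" pattern. A minimal sketch, assuming records reach the polling loop through a `BlockingQueue` and the reader enqueues a marker after the last snapshot record; `SentinelLoop`, `SENTINEL`, and `drainUntilSentinel` are hypothetical names, not part of `ChainedReader` or `EmbeddedEngine`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the sentinel-record option (not Debezium code).
final class SentinelLoop {
    // A unique object compared by identity, so no real record can be mistaken for it.
    static final Object SENTINEL = new Object();

    // Hands records to the consumer until the sentinel arrives, then runs onComplete once.
    // Returns true if the sentinel was seen, false if the thread was interrupted first.
    static boolean drainUntilSentinel(BlockingQueue<Object> queue,
                                      Consumer<Object> consumer,
                                      Runnable onComplete) {
        try {
            while (true) {
                Object record = queue.take(); // blocks until a record or the sentinel arrives
                if (record == SENTINEL) {
                    onComplete.run();
                    return true;
                }
                consumer.accept(record);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

The completion callback runs on the polling thread only after every preceding record has been handed to the consumer, and a genuine interrupt stays distinguishable from normal completion.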

Alan Shreve

Jul 20, 2017, 2:54:13 PM
to debe...@googlegroups.com
Here's the patch I eventually settled on to make this work. I'm happy to open a DBZ ticket to get it merged if it's a reasonable approach.

diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
index 9482e24..d60e759 100644
--- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
+++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlConnectorTask.java
@@ -197,6 +197,10 @@ public synchronized void start(Map<String, String> props) {

     @Override
     public List<SourceRecord> poll() throws InterruptedException {
+        if (readers.state() == Reader.State.STOPPED) {
+            throw new InterruptedException();
+        }
+
         Reader currentReader = readers;
         if (currentReader == null) {
             return null;
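A simplified model of what the patch does, assuming the engine's polling loop stops once `poll()` throws `InterruptedException` and then lets the application react. `ReaderState`, `SnapshotReader`, `PatchedTask`, and `EngineLoop` are hypothetical stand-ins, not Debezium's `Reader`, `MySqlConnectorTask`, or `EmbeddedEngine`. Unlike the patch, the sketch reads the field once and null-checks it before calling `state()`, because the original method already treats a null reader as a possible state.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for the reader and its lifecycle state (NOT Debezium classes).
enum ReaderState { RUNNING, STOPPED }

interface SnapshotReader {
    ReaderState state();
    List<String> poll();
}

final class PatchedTask {
    private volatile SnapshotReader readers;

    PatchedTask(SnapshotReader readers) {
        this.readers = readers;
    }

    // Mirrors the patched poll(): once the reader has stopped, signal completion by throwing.
    List<String> poll() throws InterruptedException {
        SnapshotReader current = readers; // read the volatile field once
        if (current == null) {
            return null;
        }
        if (current.state() == ReaderState.STOPPED) {
            throw new InterruptedException();
        }
        return current.poll();
    }
}

final class EngineLoop {
    // Simplified engine loop: polls until poll() throws, then runs the completion callback.
    static void run(PatchedTask task, Consumer<String> consumer, Runnable onComplete) {
        while (true) {
            List<String> records;
            try {
                records = task.poll();
            } catch (InterruptedException e) {
                // In this model the exception means "reader finished", so the thread's
                // interrupt status is deliberately not restored.
                break;
            }
            if (records != null) {
                records.forEach(consumer);
            }
        }
        onComplete.run();
    }
}
```

One trade-off of this option: the loop cannot tell a completion signal from a genuine thread interrupt, whereas a sentinel record keeps the two distinct.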