Debezium Engine - Blocking snapshots not producing data when no transactions present (Postgres)


David Ešner

Jan 30, 2024, 7:58:31 AM
to debezium
Hi,

I am attempting to use Blocking Snapshot signals to force a resync of a table (after a config change) on connector restart, and in some cases the snapshot does not produce any data (Postgres connector).

Context:
We are running Debezium Engine directly with a custom ChangeConsumer processing the events. We intend to run the connector in a micro-batch manner, so it is stopped manually after some simple conditions are met and then continues from the saved offset when restarted.

Everything works as expected; only when we change the connector configuration to add a table (or want a manual resync) do we produce the Blocking Snapshot event. It is registered, but it doesn't produce any data in some circumstances.

The snapshot produces data when:
  • Some transaction was produced in the monitored tables between the last and the current execution.
  • Some transaction was produced during the last execution.
I noticed that in cases where there is no lastCommitLsn/lastCompletelyProcessedLsn in the offset, the snapshot does not produce any data. The stream syncing works fine even in this case.

I am most likely missing some logic of how the snapshot / offsets work. 

Configuration:
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=/path-to-config/state.dat
offset.flush.interval.ms=0
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=postgres
snapshot.mode=initial
schema.include.list=inventory
table.include.list=inventory.customers,inventory.orders,inventory.products
publication.autocreate.mode=filtered
plugin.name=pgoutput
snapshot.max.threads=1
signal.enabled.channels=file
signal.file=/path-to-config/signal-file.jsonl


Signal:
{"id":"d139b9b7-7777-4547-917d-11111111111", "type":"execute-snapshot", "data":{"type":"BLOCKING", "data-collections": ["inventory.products"]}}
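For scripting, a signal line like the one above can be appended to the file channel programmatically. The sketch below is illustrative, not part of the setup above: the path handling and per-signal UUIDs are assumptions (a fresh `id` per signal avoids accidental reuse across signals).

```python
import json
import uuid
from pathlib import Path

def append_snapshot_signal(signal_file, tables, snapshot_type="BLOCKING"):
    """Append one execute-snapshot signal line to the JSONL file channel.

    `signal_file` is the path configured as `signal.file`; `tables` is a
    list of fully qualified table names, e.g. ["inventory.products"].
    """
    signal = {
        "id": str(uuid.uuid4()),  # fresh id per signal
        "type": "execute-snapshot",
        "data": {"type": snapshot_type, "data-collections": list(tables)},
    }
    with Path(signal_file).open("a", encoding="utf-8") as f:
        f.write(json.dumps(signal) + "\n")
    return signal
```

Each call appends exactly one well-formed JSON line, which is the format the file signal channel expects.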

Below I am attaching the logs from the run where the snapshot worked and from the one where it didn't.


skipped_snapshot.log
snapshot_complete_after_event.log

jiri.p...@gmail.com

Jan 31, 2024, 2:47:54 AM
to debezium
Hi,

thanks for the report. Would it be possible to share the code implementing DebeziumEngine so we can exclude the possibility of an issue in it?

Thanks

Jiri

David Ešner

Jan 31, 2024, 3:39:43 AM
to debezium
Hi Jiri, thanks a lot for the quick response! The prototype code is available here

However, in the meantime I tried to test the same scenario with the latest Debezium server (debezium-server-dist-2.5.1.Final) to rule out issues in our custom implementation and the results were the same.

Here's also my testing debezium server config:
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.schema.whitelist=inventory
debezium.source.table.include.list=inventory.customers,inventory.products
debezium.source.plugin.name=pgoutput
debezium.source.topic.prefix=testcdc

debezium.source.signal.enabled.channels=file
debezium.source.signal.file=/debezium-server/conf/signal-file.jsonl
debezium.sink.type=http
debezium.sink.http.url=http://localhost:8000/log_messages
debezium.sink.http.headers.Content-Type=application/json
debezium.format.value=json
debezium.format.key=json
quarkus.log.level=debug
quarkus.log.console.json=false


Also there may be something else wrong with it. I tried to change the signal type to INCREMENTAL instead:
`{"id":"d139b9b7-7777-4547-917d-11111111111", "type":"execute-snapshot", "data":{"type":"INCREMENTAL", "data-collections": ["inventory.products"]}}`

And this time it failed: io.debezium.DebeziumException: Incremental snapshot is not properly configured, either sinalling data collection is not provided or connector-specific snapshotting not set

I wasn't able to find out why.

Thank you


On Wednesday, January 31, 2024 at 8:47:54 UTC+1, jiri.p...@gmail.com wrote:

jiri.p...@gmail.com

Jan 31, 2024, 5:19:29 AM
to debezium
For incremental snapshots it is mandatory to have the source signal channel enabled. It is used for internal deduplication logic.

J.

jiri.p...@gmail.com

Jan 31, 2024, 5:26:28 AM
to debezium
BTW, I wouldn't be surprised if the root cause is the issue https://issues.redhat.com/browse/DBZ-7312

J.

David Ešner

Feb 1, 2024, 1:42:48 AM
to debezium
-- For incremental snapshots it is mandatory to have source signal channel enabled. It is used for an internal deduplication logic

Thank you so much for your advice, I didn't realise that.

I did some more experimenting and I found out the following:

The incremental snapshots work when I enable the source channel (and send the signal through it), e.g. create the data collection and adjust the config like this:
debezium.source.signal.enabled.channels=source,file
debezium.source.signal.data.collection=inventory.debezium_signal
  • The incremental snapshots then cannot be requested via the file channel signal in this case. It fails with the same error (Incremental snapshot is not properly configured, either sinalling data collection is not provided or connector-specific snapshotting not set)
  • The original issue with the Blocking snapshot is present only when I send the signals via the file channel. If I use the source data collection (table), then it performs the blocking snapshot as expected even if no data was produced between the past executions (connector restarts).
    • If I use the file channel, then it behaves as described in the original problem.



On Wednesday, January 31, 2024 at 11:26:28 UTC+1, jiri.p...@gmail.com wrote:

David Esner

Feb 1, 2024, 1:43:17 AM
to debezium
For incremental snapshots it is mandatory to have source signal channel enabled. It is used for an internal deduplication logic

Thank you very much, I didn't realise that. I followed the documentation, created the signal table, and enabled the source signal channel as shown below, but I still get the same error. Am I still missing something?

debezium.source.signal.enabled.channels=source,file
debezium.source.signal.data.collection=inventory.debezium_signal
debezium.source.signal.file=/debezium-server/conf/signal-file.jsonl

jiri.p...@gmail.com

Feb 1, 2024, 1:44:49 AM
to debezium
Could you please share the complete log again now with the new setting?

J.

David Ešner

Feb 1, 2024, 1:54:32 AM
to debe...@googlegroups.com
I am sorry for the duplicated message. I accidentally used another account and thought it just didn't go through :/. 

So the current state is that I can perform both incremental and blocking snapshots via signalling table. The problem is with the file signal channel as described above.

My current config:
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=/debezium_engine_wrapper/testing_config/state.dat
offset.flush.interval.ms=0
connector.class=io.debezium.connector.postgresql.PostgresConnector
database.hostname=localhost
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=postgres
snapshot.mode=initial
schema.include.list=inventory
table.include.list=inventory.customers,inventory.orders,inventory.products
publication.autocreate.mode=filtered
plugin.name=pgoutput
snapshot.max.threads=1
signal.enabled.channels=file,source
signal.data.collection=inventory.debezium_signal
signal.file=/debezium_engine_wrapper/testing_config/signal-file.jsonl

I am attaching 2 log files: one where the blocking snapshot worked via the signal table (this fixes the original problem), and one where the incremental snapshot failed via the file channel signal (it works via the signal table).


On Thu, Feb 1, 2024 at 7:44 AM jiri.p...@gmail.com <jiri.p...@gmail.com> wrote:
incremental_via_file.log
blocking_via_signal_table.log

jiri.p...@gmail.com

Feb 1, 2024, 2:21:39 AM
to debezium
It is really strange, as the configuration looks to be perfectly correct :-(

Could you please try to use Debezium 2.5.1.Final (from the logs you still use 2.4.1.Final) and also try to enable `TRACE` level logging for the `io.debezium` category.
We need only the incremental snapshot case now.

Thanks

Jiri

David Ešner

Feb 1, 2024, 2:31:11 AM
to debezium
OK, including the log where I used the latest version and triggered the incremental snapshot via the file channel. The configuration remained the same.


On Thursday, February 1, 2024 at 8:21:39 UTC+1, jiri.p...@gmail.com wrote:
incremental_via_file.log

David Ešner

Feb 1, 2024, 2:33:53 AM
to debezium
Oh, sorry, I forgot to set the log level to trace. Attaching a new log with TRACE level.


On Thursday, February 1, 2024 at 8:31:11 UTC+1, David Ešner wrote:
incremental_via_file.log

jiri.p...@gmail.com

Feb 1, 2024, 2:49:50 AM
to debezium
So I believe you've found a bug, or more precisely a timing issue.

The source channel is enabled only after streaming has started. The file-based channel is started asynchronously and can start executing signals before the streaming phase is running.

To validate the hypothesis:

1) Stop the connector
2) Clear file with signals
3) Start connector
4) Wait for the streaming to start - there will be a message like `Retrieved latest position from stored offset...` or `No previous LSN found in Kafka, ...`
5) Trigger the snapshot via file

If it is now working, we have identified the problem and can fix it.
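The steps above can be sketched as a small script. This is a hedged sketch under assumptions: the log path and signal file path are placeholders, and the marker strings are taken from the messages quoted above (exact wording may differ between Debezium versions).

```python
import json
import time
import uuid
from pathlib import Path

# Log lines that indicate the streaming phase has started (step 4 above).
STREAMING_MARKERS = (
    "Retrieved latest position from stored offset",
    "No previous LSN found",
)

def streaming_started(log_path):
    """Return True once the connector log shows the streaming phase."""
    p = Path(log_path)
    if not p.exists():
        return False
    text = p.read_text(errors="replace")
    return any(marker in text for marker in STREAMING_MARKERS)

def signal_after_streaming(log_path, signal_file, tables, timeout_s=60):
    """Write the execute-snapshot signal only after streaming is running."""
    deadline = time.monotonic() + timeout_s
    while not streaming_started(log_path):
        if time.monotonic() > deadline:
            raise TimeoutError("streaming phase did not start in time")
        time.sleep(1)
    # Step 5: only now trigger the snapshot via the file channel.
    signal = {
        "id": str(uuid.uuid4()),
        "type": "execute-snapshot",
        "data": {"type": "INCREMENTAL", "data-collections": list(tables)},
    }
    with Path(signal_file).open("a", encoding="utf-8") as f:
        f.write(json.dumps(signal) + "\n")
```

The point of the delay is exactly the hypothesis above: the file channel must not deliver the signal before the streaming phase is running.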

Jiri

David Ešner

Feb 1, 2024, 9:30:43 AM
to debezium
Hi Jiri,

it seems you are right. I can confirm that when I insert the INCREMENTAL snapshot signal later into the file, it is processed just fine.

Thank you

On Thursday, February 1, 2024 at 8:49:50 UTC+1, jiri.p...@gmail.com wrote:

Mario Fiore Vitale

Feb 1, 2024, 9:32:28 AM
to debezium
Hi,

can you please open an issue at https://issues.redhat.com/projects/DBZ/issues?

Thanks. 

David Ešner

Feb 2, 2024, 3:29:30 AM
to debezium
Submitted here: https://issues.redhat.com/browse/DBZ-7441

Until the fix lands, I can easily work around this problem by delaying the signal written to the file or by using the table-based source signalling.
 
Thank you very much for all your help!



On Thursday, February 1, 2024 at 15:32:28 UTC+1, mvi...@redhat.com wrote:

David Ešner

Mar 5, 2024, 5:22:41 AM
to debezium
Hi, if I may reuse this thread: after further testing we wanted to return to the BLOCKING snapshots after table addition, and it still does not work for us as expected. The original issue, where the blocking snapshot is registered but not executed, remains.

I observed that the blocking snapshot works only after a successful incremental snapshot is performed. This is my testing scenario to replicate the issue (note I am using file signalling):

1. Run Debezium Server until init sync is finished
2. Stop
3. Run Debezium Server
4. Signal INCREMENTAL snapshot of table customers -> The changes are captured properly
5. Signal BLOCKING snapshot of table customers -> The changes are captured properly
6. Signal BLOCKING snapshot of table customers -> The changes are captured properly
7. Stop Debezium Server
8. Start Debezium Server
9. Signal BLOCKING snapshot of table customers -> The changes are not captured
10. Signal INCREMENTAL snapshot of table customers -> The changes are captured properly
11. Stop Debezium Server
12. Start Debezium Server
13. Signal BLOCKING snapshot of table customers -> The changes are captured properly

From what I observed, the BLOCKING snapshots stop working after the connector is stopped unless the last snapshot was INCREMENTAL. If the last snapshot wasn't incremental, then after restart the BLOCKING snapshots start working only after an INCREMENTAL snapshot is triggered.

BLOCKING snapshot file event:
{"id":"d139b9b7-7777-4547-917d-111111111", "type":"execute-snapshot", "data":{"type":"BLOCKING", "data-collections": ["inventory.customers"]}}

INCREMENTAL snapshot file event:
{"id":"d139b9b7-7777-4547-917d-111111111", "type":"execute-snapshot", "data":{"type":"INCREMENTAL", "data-collections": ["inventory.customers"]}}

Debezium Server config:
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.offset.storage.file.filename=/debezium-server/data/offsets.dat
debezium.source.database.hostname=localhost
debezium.source.database.port=5432
debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.schema.whitelist=inventory
debezium.source.table.include.list=inventory.customers,inventory.products
debezium.source.topic.prefix=testcdc
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=/debezium-server/data/schema.dat
debezium.sink.type=http
debezium.sink.http.url=http://localhost:8000/log_messages
debezium.sink.http.headers.Content-Type=application/json
debezium.format.value=json
debezium.format.key=json
debezium.source.signal.enabled.channels=source,file
debezium.source.signal.data.collection=inventory.debezium_signals
debezium.source.signal.file=/debezium-server/conf/signal-file.jsonl
quarkus.log.level=trace
quarkus.log.console.json=false
debezium.source.snapshot.mode=initial
debezium.source.snapshot.locking.mode=none
debezium.errors.max.retries=1

On Friday, February 2, 2024 at 9:29:30 UTC+1, David Ešner wrote:

Mario Fiore Vitale

Mar 5, 2024, 5:55:05 AM
to debe...@googlegroups.com
Hi David, 
can you please share the logs for the not working scenario?



--

Mario Fiore Vitale

Senior Software Engineer

Red Hat

David Ešner

Mar 5, 2024, 6:37:26 AM
to debezium
Of course, sorry, I forgot to include it. Attaching the log file. It's only debug though, as the `quarkus.log.level=trace` configuration option of Debezium Server is not changing the level to trace for me.

On Tuesday, March 5, 2024 at 11:55:05 UTC+1, mvi...@redhat.com wrote:
log_blocking.log

Mario Fiore Vitale

Mar 6, 2024, 3:47:26 AM
to debezium
Can you please try to trigger the blocking snapshot using the source channel? 

David Ešner

Mar 6, 2024, 4:37:07 AM
to debezium
I just tested it and signals via the source channel are processed just fine.

On Wednesday, March 6, 2024 at 9:47:26 UTC+1, mvi...@redhat.com wrote:

Mario Fiore Vitale

Mar 6, 2024, 5:28:55 AM
to debezium
Hi,

It seems that https://issues.redhat.com/browse/DBZ-7441 resolved the issue with the incremental snapshot but not with the blocking snapshot. I opened https://issues.redhat.com/browse/DBZ-7606 to keep track of it.
Meanwhile you can use the source channel as a workaround.

Thanks,
Mario.

Mario Fiore Vitale

Mar 6, 2024, 9:12:17 AM
to debezium
I'll try to reproduce it, but it would really help if you can send a thread dump.

David Ešner

Mar 6, 2024, 9:21:09 AM
to debezium
I am not sure how to obtain a thread dump. Could you point me to how to do that?
On Wednesday, March 6, 2024 at 15:12:17 UTC+1, mvi...@redhat.com wrote:

Mario Fiore Vitale

Mar 6, 2024, 9:28:03 AM
to debezium
Before going to the thread dump, can you please send a blocking snapshot signal using the file channel and then try to insert/modify/delete in a monitored table (i.e. customers or products) and see if it will start the blocking snapshot?
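To script the check above, one hedged option is to rewrite a single row to its current value: Postgres still writes a WAL record for such an UPDATE, which should be enough of a change in a monitored table. The table and primary-key column names below are assumptions based on the tutorial schema, not part of the thread's setup.

```python
def wal_nudge_sql(table="inventory.customers", pk_col="id"):
    """Build an UPDATE that rewrites one row to its current value.

    Postgres logs this to the WAL even though no value changes, so it
    produces an event in the monitored table without altering data.
    """
    return (
        f"UPDATE {table} SET {pk_col} = {pk_col} "
        f"WHERE {pk_col} = (SELECT min({pk_col}) FROM {table});"
    )
```

The returned statement can be run against the monitored database with psql after writing the blocking snapshot signal to the file.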

David Ešner

Mar 6, 2024, 10:07:28 AM
to debezium
Yes, you are on to something. When I signal the blocking snapshot nothing happens, but when I create some event in the monitored table, it performs the previously requested blocking snapshot and then captures the new events.

On Wednesday, March 6, 2024 at 15:28:03 UTC+1, mvi...@redhat.com wrote:

Mario Fiore Vitale

Mar 6, 2024, 11:11:08 AM
to debe...@googlegroups.com
Well, let's take the thread dump and see. Try to have a look at https://www.baeldung.com/java-thread-dump; I suggest redirecting the output to a file and capturing it after you see in the logs that the signal is received. In this case you need to use the file channel.
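A minimal sketch of the capture, assuming a JDK is installed (jstack ships with it); the PID and output path are placeholders you'd substitute for the Debezium JVM:

```python
import subprocess

def jstack_command(pid):
    """Command line for a thread dump; -l also prints lock information."""
    return ["jstack", "-l", str(pid)]

def capture_thread_dump(pid, out_path):
    """Run jstack against the target JVM and redirect the dump to a file."""
    with open(out_path, "w", encoding="utf-8") as out:
        subprocess.run(jstack_command(pid), stdout=out, check=True)
```

The JVM PID can be found with `jps` (also part of the JDK), and the resulting file can be attached to the thread.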

Mario.


c hj

Apr 22, 2024, 1:58:19 PM
to debezium

Hi,

Hope you are all doing well!

I am using Debezium Engine 2.6.1 Final, and Debezium Postgres Connector 2.6.1 Final, and I have been facing the same issue.

While trying to perform an ad-hoc blocking snapshot using the file signal channel, for tables that have only snapshot events and no changes in the database after the replication slot was created, the ad-hoc snapshot would not work.

In the database, I have 3 tables: products, items, and test, however in the table.include.list I have included only 2 of them as follows:

props.setProperty("table.include.list", "public.products,public.items");

Started the engine and all the snapshot events were retrieved successfully. (So far, no changes in the database).

I sent a signal through the signal file, to perform an ad-hoc for the products table, but nothing happened.

 (Signal file: {"id":"unique-signal-id-0-file","type":"execute-snapshot","data":{"type":"BLOCKING","data-collections":["public.products"]}})

Then, I added 1 row in the “test” table (which is not in the table.include.list) and the blocking ad-hoc snapshot of the products table that I requested earlier started right after this change.
(Screenshot from the log file: https://prnt.sc/ecuNjn8WLZJd )

Then, right after that, I performed another ad-hoc, for the “items” table, and it was performed successfully. (Processing signal line: {"id":"unique-signal-id-2","type":"execute-snapshot","data":{"type":"BLOCKING","data-collections":["public.items"]}})

As it has already been mentioned in this email thread, it looks like the ad-hoc would not be performed if there is no change in the transaction log.

For your convenience, I have attached the TRACE log file as well as thread dumps.

Shouldn’t the ad-hoc work even though there are no changes in the database?

Looking forward to your reply!

threaddump-blocking_snapshot_no_events.tdump
debezium_log.log
threaddump-added_events_in_db.tdump

Chris Cranford

Apr 22, 2024, 2:08:18 PM
to debe...@googlegroups.com
Hi,

Could you please raise a Jira [1] issue? You shouldn't need to insert a row to trigger the snapshot.

Thanks,
Chris

[1]: https://issues.redhat.com/projects/DBZ

c hj

Apr 25, 2024, 12:46:47 PM
to debezium
Hi,
Thank you for your reply!

I just wanted to let you know that I opened the Jira issue, and you can find it in the following link: