Signal table insertion not resulting in anything

Vera van Mondfrans

May 25, 2021, 4:18:27 AM
to debezium

Hi,

I've recently upgraded to 1.6.0.Beta1 and am eager to use the new signaling feature.

I have created the following table:

CREATE TABLE prd_meta.debezium_signals (
  `id` TEXT NOT NULL,
  `type` TEXT NOT NULL,
  `data` TEXT NOT NULL
);


Added the following to my configuration:

signal.data.collection: "prd_meta.debezium_signals"

And ran the following query:

INSERT INTO
  prd_meta.debezium_signals
VALUES(
  'ad-hoc-1',
  'execute-snapshot',
  '{"data-collections": ["cli_VerasSpaceAgency.resource_entries"]}'
)

However, the MySQL connector doesn't show any output and no snapshot is starting. I'm also not sure where to look for debugging information regarding this. Any idea what I'm forgetting?

Regards, Vera van Mondfrans

jiri.p...@gmail.com

May 25, 2021, 4:30:09 AM
to debezium
Hi,

could you please share your connector configuration and the Kafka Connect log?
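In the meantime, to see whether the signal is even being read, you can raise the signal logging to DEBUG. A minimal sketch, assuming the stock connect-log4j.properties that ships with Kafka Connect (the io.debezium.pipeline.signal package is where signal processing lives):

log4j.logger.io.debezium.pipeline.signal=DEBUG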

J.

Vera van Mondfrans

May 25, 2021, 6:15:42 AM
to debezium
Hi,

I've attached a log file, and here's my configuration, too. FYI, I use Strimzi to manage this, which works like a charm :)


    database.history.kafka.bootstrap.servers:  kafka-cluster-kafka-bootstrap:9092
    database.history.kafka.topic:              debezium.database-changes
    database.hostname:                         10.6.0.209
    database.password:                         ${file:/opt/kafka/external-configuration/kafka-mysql-connector-config/__encrypt__debezium-mysql-credentials.properties:mysql_password}
    database.port:                             3306
    database.server.id:                        184054
    database.server.name:                      cloudsql
    database.user:                             ${file:/opt/kafka/external-configuration/kafka-mysql-connector-config/__encrypt__debezium-mysql-credentials.properties:mysql_username}
    signal.data.collection:                    prd_meta.debezium_signals
    snapshot.locking.mode:                     none
    snapshot.mode:                             when_needed
    table.whitelist:                           .*\.( ?breakdown_progress| ?krs_breakdown| ?krs_company| ?krs_legal_entity| ?krs_organisational_unit| ?krs_position| ?krs_project| ?krs_relation| ?krs_resource_org_unit| ?krs_resource_position| ?krs_resource_role| ?krs_resource_skill| ?krs_skill_levels| ?krs_skill| ?resource_entries| ?resource_entry_group_recurrence| )$
    transforms:                                route
    transforms.route.language:                 jsr223.groovy
    transforms.route.null.handling.mode:       drop
    transforms.route.topic.expression:         topic == 'cloudsql' ? null : topic.replaceAll(
  /cloudsql\..+\./,
  value.op == 'r' ? 'debezium.resync.' : 'debezium.'
)
    transforms.route.type:  io.debezium.transforms.ContentBasedRouter
[attachment: debezium.log]

Vera van Mondfrans

May 25, 2021, 6:20:36 AM
to debezium
I also just now noticed the following:

2021-05-25 10:08:32,121 WARN Using configuration property "table.whitelist" is deprecated and will be removed in future versions. Please use "table.include.list" instead. (io.debezium.config.Configuration) [task-thread-mysql-connector-0]


Quick question: can we still use regular expressions in table.include.list?

jiri.p...@gmail.com

May 25, 2021, 6:20:47 AM
to debezium
Hi,

could you please add the signal table prd_meta.debezium_signals to the table whitelist?
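Something like this, for example (a sketch; the placeholder stands for your existing expression, and to answer your other question: table.include.list, the new name, accepts the same comma-separated regular expressions as table.whitelist):

table.include.list: prd_meta\.debezium_signals,<your existing expression>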

J.

Vera van Mondfrans

May 26, 2021, 5:30:07 AM
to debezium
Hi,

I've added it, and that does seem to fix it. However, with the above insert the connector now fails fatally with this error:

 2021-05-26 09:26:47,698 INFO WorkerSourceTask{id=mysql-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask) [task-thread-mysql-connector-0]
 2021-05-26 09:26:47,699 ERROR WorkerSourceTask{id=mysql-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [ta
 org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
     at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
     at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:369)
     at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1118)
     at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:966)
     at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
     at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
     at java.base/java.lang.Thread.run(Thread.java:834)
 Caused by: io.debezium.DebeziumException: Error processing binlog event
     ... 6 more
 Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1622021207, file=mysql-bin.000355, pos=54595416, incremental_snapshot_maximum_key=aced000570, gtids=8193a141
     at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
     at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$3(MySqlStreamingChangeEventSource.java:681)
     at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:730)
     at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:680)
     at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:352)
     ... 5 more
 Caused by: io.debezium.DebeziumException: Database error while executing incremental snapshot
     at io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource.readChunk(SignalBasedIncrementalSnapshotChangeEventSource.java:257)
     at io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource.addDataCollectionNamesToSnapshot(SignalBasedIncrementalSnapshotChangeEventSource.java:273)
     at io.debezium.pipeline.signal.ExecuteSnapshot.arrived(ExecuteSnapshot.java:64)
     at io.debezium.pipeline.signal.Signal.process(Signal.java:136)
     at io.debezium.pipeline.signal.Signal.process(Signal.java:180)
     at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:227)
     at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:77)
     at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
     at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
     ... 9 more
 Caused by: java.sql.SQLIntegrityConstraintViolationException: Column 'data' cannot be null
     at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117)
     at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
     at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
     at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953)
     at com.mysql.cj.jdbc.ClientPreparedStatement.execute(ClientPreparedStatement.java:370)
     at io.debezium.jdbc.JdbcConnection.prepareUpdate(JdbcConnection.java:764)
     at io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource.emitWindowOpen(SignalBasedIncrementalSnapshotChangeEventSource.java:135)
     at io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource.readChunk(SignalBasedIncrementalSnapshotChangeEventSource.java:211)
     ... 17 more
 2021-05-26 09:26:47,700 INFO Stopping down connector (io.debezium.connector.common.BaseSourceTask) [task-thread-mysql-connector-0]

Perhaps the docs could include the correct table definition? The way it's written right now, I assumed all columns should be non-nullable.

Vera van Mondfrans

May 26, 2021, 5:42:31 AM
to debezium
I made the data column nullable, and now it's working!
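For reference, the working definition is the same as before, just with data nullable:

CREATE TABLE prd_meta.debezium_signals (
  `id` TEXT NOT NULL,
  `type` TEXT NOT NULL,
  `data` TEXT NULL
);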

I do think it'd be very nice to document message types and their data payloads, however. Right now I'm just guessing, though I might also just have missed some docs :)

Is there a way to trigger a full snapshot with this?

jiri.p...@gmail.com

May 27, 2021, 6:04:12 AM
to debezium
Thanks for the feedback; I've created https://issues.redhat.com/browse/DBZ-3568

Regarding the full snapshot: it is not possible yet, but as you can see from the structure of the message, it is planned.
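Concretely, that structure is the optional type field in the data payload; as far as I know, INCREMENTAL is the only value supported today:

INSERT INTO prd_meta.debezium_signals VALUES('ad-hoc-2', 'execute-snapshot', '{"data-collections": ["cli_VerasSpaceAgency.resource_entries"], "type": "INCREMENTAL"}')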

J.

Thiago Dantas

Jun 21, 2021, 11:46:19 AM
to debezium
Is there something wrong with the Docker image debezium/connect:1.6?
I'm trying exactly the same thing and not seeing any log messages regarding signaling.

My connector config:
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
transforms.unwrap.delete.handling.mode=rewrite
max.queue.size=20000
topic.creation.default.partitions=1
tasks.max=1
database.history.kafka.topic=data_eng.dbTransactions.history
transforms=unwrap
database.history.consumer.security.protocol=SSL
table.include.list=dbo.AuthorizeRequestCore,dbo.debezium_signal
topic.creation.default.replication.factor=2
signal.data.collection=dbo.debezium_signal
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
converters=timestampConverter
snapshot.fetch.size=10240
database.user=**
database.dbname=dbTransactions
timestampConverter.type=oryanmoshe.kafka.connect.util.TimestampConverter
database.history.producer.security.protocol=SSL
acks=1
database.history.kafka.bootstrap.servers=**
time.precision.mode=connect
database.server.name=data_eng__db_transactions
snapshot.isolation.mode=read_uncommitted
database.port=1433
topic.creation.enable=true
database.hostname=**
database.password=**
transforms.unwrap.add.fields=op,source.ts_ms
max.batch.size=10000
snapshot.mode=schema_only

CREATE TABLE dbo.debezium_signal (id VARCHAR(32) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
insert into dbo.debezium_signal
values (substring(convert(varchar(50), newid()),1, 32), 'log', '{"message": "Signal message at offset {}"}');
INSERT INTO dbo.debezium_signal
VALUES (substring(convert(varchar(50), newid()),1, 32), 'execute-snapshot', '{"data-collections": ["dbo.AuthorizeRequestCore"]}');

The only log messages I see are regular CDC log messages


2021-06-21 15:44:48,713 INFO || 1 records sent during previous 00:11:45.5, last recorded offset: {transaction_id=null, event_serial_no=1, commit_lsn=000248de:00001cdb:0003, change_lsn=000248de:00001cdb:0002} [io.debezium.connector.common.BaseSourceTask]
2021-06-21 15:44:52,537 INFO || WorkerSourceTask{id=data_eng__db_transactions-0} Committing offsets [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-06-21 15:44:52,538 INFO || WorkerSourceTask{id=data_eng__db_transactions-0} flushing 0 outstanding messages for offset commit [org.apache.kafka.connect.runtime.WorkerSourceTask]
2021-06-21 15:44:52,543 INFO || WorkerSourceTask{id=data_eng__db_transactions-0} Finished commitOffsets successfully in 5 ms [org.apache.kafka.connect.runtime.WorkerSourceTask]

Nathan Kuhl

Jun 28, 2021, 5:37:11 PM
to debezium
I'm also noticing that inserting records into the signal table isn't triggering an incremental snapshot in the Debezium-PostgreSQL connector. I've included my configuration below. I've checked the connector logs and the only messages I see are regular CDC messages, like the above post^^.

{
  "name": "dbz-test-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "plugin.name": "wal2json",
    "database.hostname": "127.0.0.1",
    "database.port": "5432",
    "database.user": "nathankuhl",
    "database.password": "*******",
    "database.dbname" : "postgres",
    "database.server.name": "DBTestServer",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "value.converter.schemas.enable": "false",
    "slot.name": "de_debezium",
    "table.include.list": "public.accounts",
    "signal.data.collection": "public.debezium_signals_alt"
  }
}

Here's the create table DDL I'm using for the signal data collection:

create table public.debezium_signals_alt (
  id varchar PRIMARY KEY,
  signal_type varchar NOT NULL,
  message varchar NULL
);

And here's the insert statement:
INSERT INTO public.debezium_signals_alt(id, signal_type, message) VALUES('ad-hoc-2', 'execute-snapshot', '{"data-collections": ["public.accounts"]}');

Have I missed any additional configurations?

Thiago Dantas

Jun 28, 2021, 6:35:17 PM
to debe...@googlegroups.com
Nathan, add your signal table to table.include.list.
I got mine working; it's kinda quirky with SQL Server, but it works really well so far.
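In your config that would be something like:

"table.include.list": "public.accounts,public.debezium_signals_alt"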


Nathan Kuhl

Jun 29, 2021, 11:07:11 AM
to debezium
Thanks! It's now working. It would be helpful if the documentation were updated with that tip.

Thiago Dantas

Jun 29, 2021, 11:11:02 AM
to debe...@googlegroups.com
It is said on https://debezium.io/documentation/reference/configuration/signalling.html:

"The signalling data collection must be explicitly added among captured data collections."

But I agree the documentation can be improved overall; we should remember it is a brand-new feature.

Nathan Kuhl

Jun 29, 2021, 11:29:42 AM
to debezium
Ahh, there it is. They might consider using the name of the parameter, "table.include.list", to be perfectly clear.

Gunnar Morling

Jun 29, 2021, 11:54:15 AM
to debezium
Yes, good point. Fancy sending a PR for updating the docs? That said, I'm wondering whether the signalling table shouldn't be implicitly part of the included tables. @Jiri, I vaguely remember us talking about this before; is there any reason for not doing this?

Fly Yang

Jul 1, 2021, 12:37:46 AM
to debezium
I use signals to perform incremental snapshots in Oracle and nothing happens. The database is Oracle 19.3, Debezium 1.6.0.CR1. The connector configuration is as follows:
{
  "connector.class": "io.debezium.connector.oracle.OracleConnector",
  "database.user": "c##dbzlogminer",
  "database.dbname": "idatadb",
  "tasks.max": "1",
  "database.pdb.name": "jydb",
  "database.history.kafka.bootstrap.servers": "172.21.224.66:9092,172.21.224.67:9092,172.21.224.68:9092",
  "database.history.kafka.topic": "jydb-changes.inventory",
  "database.server.name": "JYDB",
  "schema.include.list": "jydb",
  "heartbeat.interval.ms": "60000",
  "database.port": "1521",
  "decimal.handling.mode": "string",
  "database.hostname": "172.21.224.66",
  "database.password": "xxxxx",
  "signal.data.collection": "jydb.debezium_signal",
  "lob.enabled": "true",
  "heartbeat.topics.prefix": "__connector_heartbeat",
  "database.history.store.only.captured.tables.ddl": "true",
  "table.include.list": "JYDB.SECUMAIN,JYDB.DEBEZIUM_SIGNAL,JYDB.FR_TEST,JYDB.A_TEST"
}
Steps:
1. CREATE TABLE jydb.debezium_signal (id VARCHAR(32) PRIMARY KEY, type VARCHAR(32) NOT NULL, data VARCHAR(2048) NULL);
2. insert into jydb.debezium_signal
values('924e3ff8-2245-43ca-ba77-2af9affa','log','{"message": "Signal message at offset {}"}');
3. INSERT INTO jydb.debezium_signal
VALUES(
  'a24e3ff8-2245-43ca-ba77-2af9afqb',
  'execute-snapshot',
  '{"data-collections": ["jydb.fr_test"]}'
);

Nothing happens in the log. Is my configuration incorrect?

Chris Cranford

Jul 1, 2021, 2:45:27 AM
to debe...@googlegroups.com, Fly Yang
Hi Fly Yang -

The incremental snapshot feature is not yet available for the Oracle connector; see https://issues.redhat.com/browse/DBZ-3692. We hope to deliver this as part of the upcoming quarter's release cadence.

Thanks,
Chris


Shiva Nagesh

Oct 8, 2021, 9:18:36 AM
to debezium
Hi Dan,

We are using the Debezium SQL Server connector 1.6.2.Final in a container and followed the steps mentioned in the documentation:
1) Created the connector with one table: loads fine.
2) Added the rest of the tables from the database: the connector only reads the new data.
3) Created the signal table in the SQL Server database with the mentioned schema.
4) Updated the connector to add signal.data.collection (used database.schema.signal_table) and added the table to table.include.list (used schema.signal_table).
5) Restarted the connector.
6) Inserted a record into the signal table to trigger a snapshot for one of the tables added in step 2.

But somehow the snapshot is not being triggered. Is there something we are missing?

Thiago Dantas

Oct 8, 2021, 9:29:22 AM
to debezium
Hi Shiva, a few things to check:

Is the signal table CDC enabled? (sys.sp_cdc_enable_table; see the sketch after this list)
Can you check the container logs to see if Debezium is picking up the signal table in CDC mode?
Can you try signaling for a log message while you're reading the logs? insert into SIGNAL_TABLE values (newid(), 'log', '{"message": "Im Alive"}')
Can you share your full execute-snapshot signal insert?
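For the first point, enabling CDC on the signal table is the standard SQL Server procedure; a sketch (assuming no gating role; adjust @role_name for your security setup):

EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'debezium_signal', @role_name = NULL;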

Shiva Nagesh

Oct 8, 2021, 10:19:56 AM
to debe...@googlegroups.com
Hi Dan, 

We finally got the snapshot triggered by changing the schema name and DB name to match their case-sensitive forms.

But the connector then failed with the exception below:

org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:291)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:59)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:159)
    at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:122)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, event_serial_no=1, incremental_snapshot_maximum_key=aced000570, commit_lsn=00001e8a:00000cbf:0003, change_lsn=00001e8a:00000cbf:0002, incremental_snapshot_collections=DevTest_BI.dbo.table1,DevTest_BI.dbo.table2, incremental_snapshot_primary_key=aced000570}
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStreamingChangeEventSource.java:270)
    at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:613)
    at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:285)
    at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:170)
    ... 8 more
Caused by: java.lang.NullPointerException
    at io.debezium.util.ColumnUtils.toArray(ColumnUtils.java:41)
    at io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource.lambda$readChunk$2(SignalBasedIncrementalSnapshotChangeEventSource.java:230)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:649)
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:517)
    at io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource.readChunk(SignalBasedIncrementalSnapshotChangeEventSource.java:225)
    at io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource.addDataCollectionNamesToSnapshot(SignalBasedIncrementalSnapshotChangeEventSource.java:278)
    at io.debezium.pipeline.signal.ExecuteSnapshot.arrived(ExecuteSnapshot.java:64)
    at io.debezium.pipeline.signal.Signal.process(Signal.java:136)
    at io.debezium.pipeline.signal.Signal.process(Signal.java:180)
    at io.debezium.pipeline.EventDispatcher$2.changeRecord(EventDispatcher.java:227)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:77)
    at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
    at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
    ... 12 more

Can you please help us figure out if we are doing anything wrong?

Jose Riego Valenzuela

Oct 8, 2021, 11:02:22 AM
to debezium
I'm having a similar issue, not being able to trigger an ad-hoc snapshot, but using the Kafka topic signal method.
I hope it's OK to post here, but I can create a new post if it's considered unrelated.

This is part of the source connector config:
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"table.whitelist": "sourcedb.mytable",
"snapshot.new.tables": "parallel",
"database.whitelist": "sourcedb",
"signal.kafka.bootstrap.servers": "****",
"signal.kafka.topic": "eda-connect-signals",
"database.server.name": "eda_source",
"name": "source-debezium-mytable",
"snapshot.mode": "schema_only"

And this is me trying to send a signal to the topic:
echo 'source-debezium-mytable|{"type":"execute-snapshot","data": {"data-collections": ["sourcedb.mytable"], "type": "INCREMENTAL"}}' | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server ${KAFKA_BOOTSTRAP} --producer.config ${KAFKA_PROPERTIES} --topic eda-connect-signals --property "parse.key=true" --property "key.separator=|"

I have tried lots of variations but I'm definitely missing something. Thanks!
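Edit: re-reading the signalling docs, I believe the message key has to match database.server.name (eda_source here), not the connector name; if I'm reading that right, it would be:

echo 'eda_source|{"type":"execute-snapshot","data": {"data-collections": ["sourcedb.mytable"], "type": "INCREMENTAL"}}' | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server ${KAFKA_BOOTSTRAP} --producer.config ${KAFKA_PROPERTIES} --topic eda-connect-signals --property "parse.key=true" --property "key.separator=|"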

jiri.p...@gmail.com

Oct 10, 2021, 11:36:44 PM
to debezium

Shiva Nagesh

Oct 11, 2021, 6:37:17 AM
to debe...@googlegroups.com
Can we use the read-only property for SQL Server as well for doing incremental snapshots?


jiri.p...@gmail.com

Oct 11, 2021, 11:56:35 PM
to debezium
Hi,

no, it is a MySQL-only feature. But in the future we intend to provide signal-sending support via a Kafka topic, and probably via REST, for other connectors too.

J.

Thiago Dantas

Oct 13, 2021, 12:26:31 PM
to debezium
Shiva, can you try changing snapshot.mode to schema_only?
That's what I use, and it works.

Jose Riego Valenzuela

Oct 14, 2021, 4:27:56 AM
to debezium
Am I correct in assuming the incremental snapshot should replicate the whole table?
We have a big table to replicate, and I finally managed to get it to pick up the signal to start the snapshot. I'm trying this with snapshot.mode set to schema_only.
It starts, but it completes in a few minutes when it should take hours to get all the data. We are missing older information in this table.

Does the incremental snapshot go through the whole table, or just whatever is available right now in the binlogs?

Aqhil Mohammad

Oct 30, 2023, 10:08:15 AM
to debezium
Hello Dan,

Sorry to revive this thread. I'm at the same stage as above; however, when running an incremental snapshot I'm not getting any data.

Steps performed:

CDC enabled on signal table
Signal table added in table.include.list property

INSERT INTO dbo.debezium_signal (id, type, data) VALUES('ad-hoc-6', 'execute-snapshot', '{"data-collections": ["AAF_CDC.dbo.currency"],"type":"incremental", "additional-condition":"date >= ''2023-10-26'' and date < ''2023-10-30''"}');

Do we require the signal topic to be created? It seems so:

[WARN] 2023-10-30 13:44:35,639 [kafka-producer-network-thread | connector-producer-cdna-sql-source-connector-0] org.apache.kafka.clients.NetworkClient handleSuccessfulResponse - [Producer clientId=connector-producer-cdna-sql-source-connector-0] Error while fetching metadata with correlation id 65467 : {stc-con-grp-3-AAF_CDC-dbo-debezium_signal=UNKNOWN_TOPIC_OR_PARTITION}
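If so, and since auto topic creation may be disabled on our brokers, I'll try creating the topic from the log line up front; something like this (partition and replication values are placeholders for our cluster defaults):

bin/kafka-topics.sh --create --bootstrap-server <broker:9092> --topic stc-con-grp-3-AAF_CDC-dbo-debezium_signal --partitions 1 --replication-factor 3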