Hi Team,
I am trying to trigger an incremental snapshot using a Kafka signal. This is my connector configuration:
curl -X POST localhost:8080/connectors \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
-d '{
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"slot.name": "debezium_test_avro",
"tasks.max": "1",
"retriable.restart.connector.wait.ms": "30000",
"schema.include.list": "public",
"slot.max.retries": "30",
"slot.retry.delay.ms": "300000",
"topic.prefix": "pg.test.avro",
"binary.handling.mode": "bytes",
"heartbeat.action.query": "UPDATE dbz_heartbeat SET heartbeat_time = NOW();",
"signal.enabled.channels":"source,kafka",
"signal.kafka.topic":"signal-topic",
"signal.kafka.bootstrap.servers":"kafka-kafka-bootstrap:9092",
"signal.data.collection":"dbz_signal",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"errors.log.enable": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"database.dbname": "test",
"database.user": "XXX",
"sanitize.field.names": "true",
"slot.drop.on.stop": "XXX",
"time.precision.mode": "connect",
"heartbeat.interval.ms": "60000",
"column.include.list": "^.*$",
"database.port": "5432",
"plugin.name": "pgoutput",
"value.converter.schema.registry.url": "XXXX",
"database.hostname": "XXX",
"database.password": "XXX",
"name": "postgres.test_avro_connector",
"errors.tolerance": "all",
"table.include.list": "public.employee,public.department",
"key.converter.schema.registry.url": "XXXX",
"snapshot.mode": "initial"
},
"name": "postgres.test_avro_connector"
}'
And this is the signal message I send:
echo 'pg.test.avro#{"type":"execute-snapshot","data": {"data-collections": ["public.employee"], "type": "INCREMENTAL"}}'| kafkactl produce signal-topic --separator=#
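As a minimal sketch of how the line above is put together: the part before the separator is the record key, which as far as I understand must equal the connector's topic.prefix, and the part after it is the JSON signal value. The variable names and the build_signal_line helper are hypothetical and only illustrate the format. The snippet prints the line and does not contact Kafka.

```shell
#!/bin/sh
# Values copied from the configuration and message above (assumed unchanged).
TOPIC_PREFIX='pg.test.avro'   # connector's topic.prefix, used as the record key
SIGNAL_TOPIC='signal-topic'   # connector's signal.kafka.topic
SEPARATOR='#'                 # key/value separator passed to kafkactl

# Record value: the JSON payload of the signal.
SIGNAL_VALUE='{"type":"execute-snapshot","data": {"data-collections": ["public.employee"], "type": "INCREMENTAL"}}'

# build_signal_line KEY VALUE (hypothetical helper): print KEY, separator, VALUE on one line.
build_signal_line() {
    printf '%s%s%s\n' "$1" "$SEPARATOR" "$2"
}

# The key must not contain the separator, or the line would be split in the wrong place.
case "$TOPIC_PREFIX" in
    *"$SEPARATOR"*) echo "key contains the separator" >&2; exit 1 ;;
esac

SIGNAL_LINE=$(build_signal_line "$TOPIC_PREFIX" "$SIGNAL_VALUE")
echo "$SIGNAL_LINE"

# To send it, pipe the line to the same command as above:
#   echo "$SIGNAL_LINE" | kafkactl produce "$SIGNAL_TOPIC" --separator="$SEPARATOR"
```

Building the key from the same variable as topic.prefix keeps the two from drifting apart if the prefix is changed later.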
There are no errors in the logs. This is what I found in the logs:
2 records sent during previous 00:01:00.133, last recorded offset of {server=pg.test.avro} partition is {incremental_snapshot_correlation_id=null, lsn_proc=593052184, messageType=UPDATE, lsn_commit=593049744, lsn=593052184, incremental_snapshot_maximum_key=aced0005757200135b4c6a6176612e6c616e672e4f626a6563743b90ce589f1073296c0200007870000000017372000e6a6176612e6c616e672e4c6f6e673b8be490cc8f23df0200014a000576616c7565787200106a6176612e6c616e672e4e756d62657286ac951d0b94e08b0200007870000000000000000b, txId=692241, incremental_snapshot_collections=[{"incremental_snapshot_collections_id":"public.employee","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null},{"incremental_snapshot_collections_id":"public.employee","incremental_snapshot_collections_additional_condition":null,"incremental_snapshot_collections_surrogate_key":null}], ts_usec=1752940032072277, incremental_snapshot_primary_key=aced000570}
I am not seeing any snapshot messages in the Kafka topic for the table. Please suggest.
Regards,
Renu Yadav