Hi community,
I’m running a MongoDB → Kafka → Target DB migration and have run into an issue with Debezium’s MongoDB connector (MongoDB 4.4, Debezium 2.5). snapshot.mode is set to never so that I can trigger snapshots on demand via signals. CDC ran fine, and incremental snapshots were also working correctly.
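For reference, I trigger the on-demand snapshot with an execute-snapshot signal sent to the Kafka signal topic. The message key has to match topic.prefix (source-ott-caching4 here), and the value looks roughly like the following; the collection name is just a placeholder, and as far as I know the data-collections id format for MongoDB is "<database>.<collection>":

{
  "type": "execute-snapshot",
  "data": {
    "data-collections": ["ott_caching.my_collection"],
    "type": "incremental"
  }
}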
Problem:
After the connector completed its initial incremental snapshot, I left the connector running to continue CDC until cutover. A few days later, CDC silently stopped.
- Connector status still shows RUNNING.
- No errors or warnings in the logs.
- Offsets are not advancing.
- Oplog changes are not being consumed (see the heartbeat check below).
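A quick way to confirm the stall: with heartbeat.interval.ms at 1000, the heartbeat topic should receive a message roughly every second while the connector is healthy. Assuming the default <topic.heartbeat.prefix>.<topic.prefix> topic naming, the consumer below goes silent when CDC stops:

kafka-console-consumer.sh --bootstrap-server kafka:29092 \
  --topic __ott-caching4.source-ott-caching4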
I tried triggering another incremental snapshot. The Kafka Connect logs show that the connector received the signal, and Debezium metrics report a snapshot in progress. But nothing actually happens: no scans, no reads, no progress. It just hangs indefinitely. Restarting the connector did not help.
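If this happens again, I plan to capture a thread dump of the Connect worker to see where the snapshot thread is stuck, something along these lines (the pgrep pattern assumes a distributed worker started via ConnectDistributed; adjust for your deployment):

jstack $(pgrep -f ConnectDistributed) | grep -i -A 10 debezium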
As a workaround, I deleted the connector and recreated it with the same configuration. That allowed it to snapshot again and sync the two clusters, and it’s working for now. But I still can’t pinpoint what the actual problem was, since there were zero errors or warnings in the logs, which makes it very hard to troubleshoot.
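The delete/recreate itself was just the Kafka Connect REST API, roughly as below (the worker address and the config file name are specific to my setup):

curl -X DELETE http://localhost:8083/connectors/source-ott-caching-4-f1
curl -X PUT -H "Content-Type: application/json" \
  --data @source-config.json \
  http://localhost:8083/connectors/source-ott-caching-4-f1/config

For completeness, both connector configs are below (connection strings redacted). Source connector: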
{
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"collection.include.list": "",
"signal.kafka.bootstrap.servers": "kafka:29092",
"mongodb.connection.string": "",
"incremental.snapshot.chunk.size": "40000",
"tasks.max": "1",
"incremental.snapshot.watermarking.strategy": "insert_insert",
"signal.kafka.topic": "ott_caching4_signals",
"heartbeat.interval.ms": "1000",
"signal.enabled.channels": "source,kafka",
"topic.heartbeat.prefix": "__ott-caching4",
"incremental.snapshot.allow.schema.changes": "true",
"key.converter.schemas.enable": "true",
"topic.prefix": "source-ott-caching4",
"producer.override.max.request.size": "52428800",
"cursor.max.await.time.ms": "15000",
"value.converter.schemas.enable": "true",
"signal.data.collection": "cdc.ott_caching4_signals",
"name": "source-ott-caching-4-f1",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.include.list": "ott_caching",
"snapshot.mode": "never"
}
Sink connector:
{
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"errors.log.include.messages": "true",
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneDefaultStrategy",
"tasks.max": "1",
"namespace.mapper.value.collection.field": "source.collection",
"update.one.upsert": "true",
"delete.on.null.values": "true",
"collection": "",
"topics.regex": "source-ott-caching4\\..*",
"namespace.mapper": "com.mongodb.kafka.connect.sink.namespace.mapping.FieldPathNamespaceMapper",
"namespace.mapper.value.database.field": "source.db",
"change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.mongodb.ChangeStreamHandler",
"dead.letter.queue.topic.name": "dlq-debezium-sink-errors",
"key.converter.schemas.enable": "true",
"database": "sink-fallback-db",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy",
"connection.uri": "",
"value.converter.schemas.enable": "true",
"name": "sink-ott-caching4",
"errors.tolerance": "all",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.log.enable": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
}
Thank you in advance.