We have Kafka and Kafka Connect deployed via AWS MSK (3.4.0) and AWS MSK Connect (2.7.1).
We are attempting to take an ad hoc source snapshot via the Debezium MongoDB connector (
https://debezium.io/documentation/reference/2.3/connectors/mongodb.html#mongodb-performing-a-snapshot).
After the snapshot is initiated, the connector collects one batch of data (we see a spike in the number of messages for the topic) and then stops. We receive the following errors:
**Error**
> 2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] [2023-07-28
> 21:43:41,205] ERROR [mongodb-default|task-0] Producer
> failure (io.debezium.pipeline.ErrorHandler:57)
**MSK Connect Debezium Configuration**
```
connector_configuration = {
"tasks.max" : 5
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.connection.string" : "${local.mongodb_secrets.CONNECTION}/?replicaSet=rs0&authSource=admin&directConnection=true",
"topic.prefix" : "mongodb.default",
"snapshot.max.threads" : 4
"capture.mode" : "change_streams_update_full_with_pre_image"
//"database.include.list" : "test-kafka"
"collection.include.list" : "steamspy.allgamesbyowners,steam.ccu"
"signal.enabled.channels" : "source,kafka"
"signal.data.collection" : "debezium.signals"
"signal.kafka.topic" : "debezium.signals.channel"
"signal.kafka.bootstrap.servers" : module.prod_msk_default.bootstrap_brokers_sasl_iam
"signal.consumer.security.protocol" : "SASL_SSL"
"signal.consumer.sasl.mechanism" : "AWS_MSK_IAM"
"signal.consumer.sasl.jaas.config" : "software.amazon.msk.auth.iam.IAMLoginModule required;"
"signal.consumer.sasl.client.callback.handler.class" : "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
}
```
**Source Signal Sent**
```
db.getCollection("signals").insertOne({"type":"execute-snapshot","data": {"data-collections": ["steam.ccu"], "type": "INCREMENTAL"}})
```
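Because the configuration above also enables the `kafka` signal channel, the same request could presumably be sent as a record on `debezium.signals.channel` instead. Below is a minimal sketch of how that record might be built. It assumes the Debezium 2.3 convention that the record key matches `topic.prefix` and the value is the JSON signal body. `build_kafka_signal` is a hypothetical helper, and producing the record with an IAM-authenticated client is left out.

```python
import json


def build_kafka_signal(topic_prefix, collections, snapshot_type="INCREMENTAL"):
    """Return (key, value) bytes for an execute-snapshot signal record.

    Assumption: the Kafka signal channel expects the record key to equal
    the connector's topic.prefix and the value to be the JSON signal body.
    """
    value = {
        "type": "execute-snapshot",
        "data": {"data-collections": list(collections), "type": snapshot_type},
    }
    return topic_prefix.encode("utf-8"), json.dumps(value).encode("utf-8")


# Values taken from the connector configuration and the source signal above.
key, value = build_kafka_signal("mongodb.default", ["steam.ccu"])
print(key.decode(), value.decode())
```

The payload mirrors the document inserted into the signal collection, so either channel should request the same incremental snapshot.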
**Full Stack Trace**
```
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] [2023-07-28 21:43:4mongodb-default|task-0] Opened connection [connectionId{localValue:11, serverValue:25819743}] to mongo.acme.com:27017 (org.mongodb.driver.connection:71)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] [2023-07-28 21:43:41,205] ERROR [mongodb-default|task-0] Producer failure (io.debezium.pipeline.ErrorHandler:57)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] java.lang.NullPointerException
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ConcurrentHashMap.containsKey(ConcurrentHashMap.java:964)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.Collections$SetFromMap.contains(Collections.java:5564)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.SourceInfo.isInitialSyncOngoing(SourceInfo.java:408)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.SourceInfo.snapshot(SourceInfo.java:473)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.AbstractSourceInfoStructMaker.commonStruct(AbstractSourceInfoStructMaker.java:64)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:39)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:14)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.AbstractSourceInfo.struct(AbstractSourceInfo.java:77)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.CommonOffsetContext.getSourceInfo(CommonOffsetContext.java:24)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.buildNotificationWith(IncrementalSnapshotNotificationService.java:154)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.notifyInProgress(IncrementalSnapshotNotificationService.java:130)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.lambda$readChunk$8(MongoDbIncrementalSnapshotChangeEventSource.java:344)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.notifyReplicaSets(MongoDbIncrementalSnapshotChangeEventSource.java:439)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.init(MongoDbIncrementalSnapshotChangeEventSource.java:263)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.init(MongoDbIncrementalSnapshotChangeEventSource.java:57)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$initStreamEvents$3(ChangeEventSourceCoordinator.java:221)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.Optional.ifPresent(Optional.java:183)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:221)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:203)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:172)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.lang.Thread.run(Thread.java:829)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] [2023-07-28 21:43:4mongodb-default|task-0] Connected metrics set to 'false' (io.debezium.pipeline.ChangeEventSourceCoordinator:282)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] [2023-07-28 21:43:4mongodb-default|task-0] WorkerSmongodb-default-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:485)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] [2023-07-28 21:43:4mongodb-default|task-0] WorkerSmongodb-default-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:502)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] [2023-07-28 21:43:41mongodb-default|task-0] WorkerSmongodb-default-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:72)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:125)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.lang.Thread.run(Thread.java:829)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] Caused by: java.lang.NullPointerException
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ConcurrentHashMap.containsKey(ConcurrentHashMap.java:964)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.Collections$SetFromMap.contains(Collections.java:5564)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.SourceInfo.isInitialSyncOngoing(SourceInfo.java:408)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.SourceInfo.snapshot(SourceInfo.java:473)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.AbstractSourceInfoStructMaker.commonStruct(AbstractSourceInfoStructMaker.java:64)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:39)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:14)
2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.AbstractSourceInfo.struct(AbstractSourceInfo.java:77)
```
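To make the trace above easier to scan, here is a small sketch that strips the CloudWatch prefix and reports the first exception together with the first `io.debezium` frame beneath it. The sample lines are abridged from the trace above. `first_debezium_frame` and the regular expressions are hypothetical helpers written for this post, not part of any Debezium or Kafka Connect tooling.

```python
import re

# CloudWatch prefix as seen in the log above: "<timestamp> [Worker-<id>] ".
PREFIX = re.compile(r"^\S+ \[Worker-[^\]]+\] ")
# A Java exception header, optionally introduced by "Caused by: ".
EXCEPTION = re.compile(r"^(?:Caused by: )?([\w.$]+(?:Exception|Error))\b")
# A stack frame that belongs to Debezium rather than the JDK.
DEBEZIUM_FRAME = re.compile(r"^at (io\.debezium\.\S+)")


def first_debezium_frame(lines):
    """Return (exception, frame): the first exception header found and the
    first io.debezium frame that follows it (frame is None if absent)."""
    exception = None
    for raw in lines:
        text = PREFIX.sub("", raw.strip())
        if exception is None:
            match = EXCEPTION.match(text)
            if match:
                exception = match.group(1)
            continue
        match = DEBEZIUM_FRAME.match(text)
        if match:
            return exception, match.group(1)
    return exception, None


SAMPLE = [
    "2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] java.lang.NullPointerException",
    "2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)",
    "2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ConcurrentHashMap.containsKey(ConcurrentHashMap.java:964)",
    "2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at java.base/java.util.Collections$SetFromMap.contains(Collections.java:5564)",
    "2023-07-28T21:43:41.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.SourceInfo.isInitialSyncOngoing(SourceInfo.java:408)",
]

print(first_debezium_frame(SAMPLE))
```

On these lines the helper points at `SourceInfo.isInitialSyncOngoing`, which is reached through a `ConcurrentHashMap`-backed set lookup. `ConcurrentHashMap` does not accept null keys, which is consistent with the three JDK frames at the top of the trace.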
Kafka Connect then attempts to restart the task and throws the following error:
```
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] [2023-07-28 21:47:42mongodb-default|task-0] WorkerSmongodb-default-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:191)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:72)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:125)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at java.base/java.lang.Thread.run(Thread.java:829)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] Caused by: java.lang.NullPointerException
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at java.base/java.util.concurrent.ConcurrentHashMap.containsKey(ConcurrentHashMap.java:964)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at java.base/java.util.Collections$SetFromMap.contains(Collections.java:5564)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.SourceInfo.isInitialSyncOngoing(SourceInfo.java:408)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.SourceInfo.snapshot(SourceInfo.java:473)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.AbstractSourceInfoStructMaker.commonStruct(AbstractSourceInfoStructMaker.java:64)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:39)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbSourceInfoStructMaker.struct(MongoDbSourceInfoStructMaker.java:14)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.AbstractSourceInfo.struct(AbstractSourceInfo.java:77)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.CommonOffsetContext.getSourceInfo(CommonOffsetContext.java:24)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.buildNotificationWith(IncrementalSnapshotNotificationService.java:154)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.notification.IncrementalSnapshotNotificationService.notifyInProgress(IncrementalSnapshotNotificationService.java:130)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.lambda$readChunk$8(MongoDbIncrementalSnapshotChangeEventSource.java:344)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.notifyReplicaSets(MongoDbIncrementalSnapshotChangeEventSource.java:439)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.readChunk(MongoDbIncrementalSnapshotChangeEventSource.java:342)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.init(MongoDbIncrementalSnapshotChangeEventSource.java:263)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource.init(MongoDbIncrementalSnapshotChangeEventSource.java:57)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$initStreamEvents$3(ChangeEventSourceCoordinator.java:221)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at java.base/java.util.Optional.ifPresent(Optional.java:183)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.initStreamEvents(ChangeEventSourceCoordinator.java:221)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(ChangeEventSourceCoordinator.java:203)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.executeChangeEventSources(ChangeEventSourceCoordinator.java:172)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] at io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java:118)
2023-07-28T21:47:42.000Z [Worker-0c6eca6069786095a] ... 5 more
```