Unable to run INCREMENTAL snapshot with MongoDB on DEBEZIUM=2.7


Artemiy Kozyr

Jun 13, 2024, 9:00:19 AM
to debezium
Hi team,

I am trying to run an ad-hoc incremental snapshot for MongoDB, but so far without success.

Steps I took:

1. Here's my connector config:

```
{
  "name": "source__mongodb_analytics__snapshot_initial",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "${file:secrets/source__mongodb_analytics.properties:uri}",
    "tasks.max": "4",
    "topic.prefix": "mongodb",
    "topic.naming.strategy": "io.debezium.schema.DefaultTopicNamingStrategy",
    "topic.delimiter": "__",

    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 4,
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.compression.type": "zstd",

    "filters.match.mode": "regex",
    "database.include.list": "app_etl_data_prod",
    "collection.include.list": "app_etl_data_prod.cars,__debezium_signals",

    "capture.mode": "change_streams_update_full",
    "capture.scope": "deployment",
    "tombstones.on.delete": "false",
    "snapshot.mode": "initial",
    "snapshot.max.threads": 4,
    "snapshot.fetch.size": 10000,
    "max.batch.size": 2048,
    "max.queue.size": 8192,
    "max.queue.size.in.bytes": 5000000,

    "connect.max.attempts": 8,
    "incremental.snapshot.chunk.size": 10000,
    "incremental.snapshot.watermarking.strategy": "insert_insert",

    "notification.enabled.channels": "sink,log,jmx",
    "notification.sink.topic.name": "__debezium_notifications",
    "signal.enabled.channels": "kafka",
    "signal.kafka.topic": "__debezium_signals",
    "signal.kafka.bootstrap.servers": "kafka:9092",

    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "mongodb__(.*)__(.*)",
    "transforms.route.replacement": "$1_$2",

    "errors.tolerance": "none",
    "errors.log.enable": "true",
    "errors.log.include.message": "true",
    "errors.retry.timeout": "0"
  }
}
```

2. Here's the signal message:

```
echo 'mongodb:{"type": "execute-snapshot","data": {"type": "incremental","data-collections": ["app_etl_data_prod.cars"],"additional-conditions": [{"data-collection": "app_etl_data_prod.cars","filter": "color='blue' AND brand='MyBrand'"}]}}' | \
kcat -b kafka:9092 -t __debezium_signals -K: -P
```
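Note that because the whole `echo` argument is wrapped in single quotes, the shell strips the inner quotes around `'blue'` and `'MyBrand'`, which matches the unquoted filter that appears in the log further down. One way to sidestep shell quoting entirely is to build the signal payload programmatically; a minimal Python sketch (the `mongodb` key matches `topic.prefix`, and the `kcat` invocation is assumed to stay the same):

```python
import json

# Build the execute-snapshot signal payload; the single quotes in the
# filter no longer interact with shell quoting.
payload = {
    "type": "execute-snapshot",
    "data": {
        "type": "incremental",
        "data-collections": ["app_etl_data_prod.cars"],
        "additional-conditions": [
            {
                "data-collection": "app_etl_data_prod.cars",
                "filter": "color='blue' AND brand='MyBrand'",
            }
        ],
    },
}

# One key:value line in the format expected by `kcat -K:`
line = "mongodb:" + json.dumps(payload)
print(line)
```

The printed line can then be piped into the same `kcat -b kafka:9092 -t __debezium_signals -K: -P` command.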

3. Here's the output log (Kafka Connect):

```
2024-06-13 12:43:20,927 WARN MongoDB|mongodb|streaming Action execute-snapshot failed. The signal SignalRecord{id='null', type='execute-snapshot', data='{
"type" : "incremental",
"data-collections" : [ "app_etl_data_prod.cars" ],
"additional-conditions" : [ {
"data-collection" : "app_etl_data_prod.cars",
"filter" : "color=blue AND brand=MyBrand"
} ]
}', additionalData={channelOffset=21}} may not have been processed. [io.debezium.pipeline.signal.SignalProcessor]
io.debezium.DebeziumException: Database error while executing incremental snapshot for table 'DataCollection{id=app_etl_data_prod.cars, additionalCondition=color=blue AND brand=MyBrand, surrogateKey=}'
at io.debezium.connector.mongodb.snapshot.MongoDbIncrementalSnapshotChangeEventSource.readChunk(MongoDbIncrementalSnapshotChangeEventSource.java:343)
at io.debezium.connector.mongodb.snapshot.MongoDbIncrementalSnapshotChangeEventSource.addDataCollectionNamesToSnapshot(MongoDbIncrementalSnapshotChangeEventSource.java:427)
at io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot.arrived(ExecuteSnapshot.java:78)
at io.debezium.pipeline.signal.SignalProcessor.processSignal(SignalProcessor.java:191)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:658)
at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
at io.debezium.pipeline.signal.SignalProcessor.lambda$process$3(SignalProcessor.java:143)
at io.debezium.pipeline.signal.SignalProcessor.executeWithSemaphore(SignalProcessor.java:165)
at io.debezium.pipeline.signal.SignalProcessor.process(SignalProcessor.java:138)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.debezium.DebeziumException: Error while attempting to emit window open for chunk '9ecc6d80-9389-4e01-8bd4-69bbf9cb61e4'
at io.debezium.connector.mongodb.connection.MongoDbConnections.lambda$eventSourcingErrorHandler$1(MongoDbConnections.java:53)
at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:111)
at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:88)
at io.debezium.connector.mongodb.snapshot.MongoDbIncrementalSnapshotChangeEventSource.emitWindowOpen(MongoDbIncrementalSnapshotChangeEventSource.java:216)
at io.debezium.connector.mongodb.snapshot.MongoDbIncrementalSnapshotChangeEventSource.readChunk(MongoDbIncrementalSnapshotChangeEventSource.java:290)
... 24 more
Caused by: java.lang.NullPointerException
at io.debezium.connector.mongodb.snapshot.MongoDbIncrementalSnapshotChangeEventSource.lambda$emitWindowOpen$0(MongoDbIncrementalSnapshotChangeEventSource.java:219)
at io.debezium.connector.mongodb.connection.MongoDbConnection.lambda$execute$0(MongoDbConnection.java:89)
at io.debezium.connector.mongodb.connection.MongoDbConnection.execute(MongoDbConnection.java:105)
... 27 more
```

Key log output messages:

```
io.debezium.DebeziumException: Database error while executing incremental snapshot for table 'DataCollection{id=app_etl_data_prod.cars, additionalCondition=color=blue AND brand=MyBrand, surrogateKey=}'

Caused by: io.debezium.DebeziumException: Error while attempting to emit window open for chunk '9ecc6d80-9389-4e01-8bd4-69bbf9cb61e4'

Caused by: java.lang.NullPointerException
```

The root cause appears to be the NullPointerException.

4. I am using MongoDB connector version 2.7.0.Beta2:

```
    {
        "class": "io.debezium.connector.mongodb.MongoDbConnector",
        "type": "source",
        "version": "2.7.0.Beta2"
    },
```

5. I also get these offsets:

```
{
    "offsets": [
        {
            "offset": {
                "incremental_snapshot_collections": "[{\"incremental_snapshot_collections_id\":\"app_etl_data_prod.cars\",\"incremental_snapshot_collections_additional_condition\":\"(color=blue AND brand=MyBrand)\"}]",
                "incremental_snapshot_correlation_id": null,
                "incremental_snapshot_maximum_key": "aced000570",
                "incremental_snapshot_primary_key": "aced000570",
                "ord": 169,
                "resume_token": "82666AEBCB000000A92B022C0100296E5A1004CBD817ABF3AC4B12B075519463159D5B46645F696400646640DF2B0B613B465C4DC7500004",
                "sec": 1718283211
            },
            "partition": {
                "server_id": "mongodb"
            }
        }
    ]
}
```

Is there a way I can overcome this? Any suggestions are appreciated.

Thank you.

Mario Fiore Vitale

Jun 14, 2024, 3:56:20 AM
to debezium
Hi, 

when you use incremental snapshots, the `source` signal channel is required, along with the `signal.data.collection` property [1]. Can you try with this configuration?
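A minimal sketch of the relevant properties, assuming a dedicated signaling collection in the captured database (the name `app_etl_data_prod.debezium_signals` here is just an example; it must be a real, writable collection that the connector captures):

```json
{
  "signal.enabled.channels": "source,kafka",
  "signal.data.collection": "app_etl_data_prod.debezium_signals",
  "signal.kafka.topic": "__debezium_signals",
  "signal.kafka.bootstrap.servers": "kafka:9092"
}
```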
