Filtering MongoDB collections in the change stream pipeline or in Kafka Connect

Lucio Catinelli
May 4, 2022, 4:13:34 PM
to debezium
Hi everyone

I was using the Debezium MongoDB connector for CDC, filtering a few low-volume collections, with capture.mode set to change_streams_update_full. When the connector started, there was much more network traffic between MongoDB and the Kafka Connect instance than I expected for such small, low-activity collections. So I analyzed and tested the Debezium code and noticed that when the change stream is opened, the pipeline has no namespace filter, only a filter on the operation type (https://github.com/debezium/debezium/blob/b2a4524c940adc3c9de67c5f240fb55c57733f1a/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/MongoDbStreamingChangeEventSource.java#L304). As a result, every change document in MongoDB is transferred to the Kafka Connect instance where Debezium runs, and only once it arrives there does Debezium apply its database and collection filter logic.
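
For illustration, the client-side pattern described above can be sketched as follows: every event reaches the connector first, and only then is its namespace tested against the include list. This is a minimal, dependency-free sketch, not Debezium's actual code; the class ClientSideNamespaceFilter is hypothetical, and it assumes the include list is a comma-separated list of regular expressions that must match the whole "db.collection" name, which is how Debezium documents collection.include.list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of client-side namespace filtering (not Debezium's actual code).
 * Assumption: the include list is a comma-separated list of regular expressions
 * that must match the whole "db.collection" namespace.
 */
public class ClientSideNamespaceFilter {

    private final List<Pattern> includePatterns = new ArrayList<>();

    ClientSideNamespaceFilter(String includeList) {
        for (String regex : includeList.split(",")) {
            String trimmed = regex.trim();
            if (!trimmed.isEmpty()) {
                includePatterns.add(Pattern.compile(trimmed));
            }
        }
    }

    /** Returns true if the namespace fully matches at least one include pattern. */
    boolean isIncluded(String namespace) {
        for (Pattern pattern : includePatterns) {
            if (pattern.matcher(namespace).matches()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        ClientSideNamespaceFilter nsFilter = new ClientSideNamespaceFilter("inventory\\.orders");
        // Without a server-side "ns" stage, all of these events have already crossed
        // the network; the connector can only decide which ones to keep.
        List<String> received = List.of(
                "inventory.orders", "logs.events", "logs.events", "inventory.orders", "logs.events");
        long kept = received.stream().filter(nsFilter::isIncluded).count();
        System.out.println("received=" + received.size() + " kept=" + kept);
    }
}
```

Running main prints received=5 kept=2: three of the five events were transferred only to be discarded, which is the overhead a server-side filter would avoid.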

I also looked at the MongoDB Kafka connector and saw that it has a pipeline configuration property and a recommended way to stream multiple collections (see https://www.mongodb.com/docs/kafka-connector/current/source-connector/usage-examples/multiple-sources/).

In the end, to solve my issue I modified the Debezium code and built a custom version that filters collections in the change stream pipeline. But I wanted to ask whether there is a reason for not having that filter in Debezium's change stream pipeline, and also which approach you think is best and why.

Thanks!

Chris Cranford
May 5, 2022, 9:51:16 AM
to debe...@googlegroups.com, Lucio Catinelli
Hi Lucio -

Does your implementation continue to use the client to watch for changes, so that it remains compatible with MongoDB 4.0? If so, would you be willing to open a Jira issue and send a pull request with the changes?

Thanks,
Chris

Lucio Catinelli
May 5, 2022, 2:34:50 PM
to debezium
Hi Chris,

Thank you for your response. My implementation adds a filter to the change stream pipeline similar to the operation type filter ("Filters.in("operationType", getChangeStreamSkippedOperationsFilter())" in the link I attached earlier), but on the "ns" field. That field is present in change events in MongoDB 4.0 (https://www.mongodb.com/docs/v4.0/reference/change-events/) and in the latest version, which is why I'm fairly sure it is compatible. The change adds a filter like this: "Filters.in("ns",Arrays.asList(nsList))". I tested it on MongoDB 4.2.5.
I can open a Jira issue and send the pull request. I also think the way "nsList" is generated could be improved, but we can discuss that in the Jira issue.
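
To make the proposed server-side filter concrete: in MongoDB change events, "ns" is an embedded document of the form { db: ..., coll: ... }, not a "db.collection" string, so a filter on "ns" like the one above only matches if each element of nsList is such a document. The sketch below is a dependency-free illustration that turns literal "db.collection" names into the corresponding $match stage as a JSON string. The class NamespaceMatchStage and its methods are hypothetical, and the sketch assumes plain names rather than the regular expressions Debezium's include lists accept, which is one reason generating nsList is not trivial.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: builds a change stream $match stage that keeps only listed namespaces. */
public class NamespaceMatchStage {

    /** Splits "db.collection" at the first dot: database names cannot contain '.', collection names can. */
    static String[] splitNamespace(String namespace) {
        int dot = namespace.indexOf('.');
        if (dot <= 0 || dot == namespace.length() - 1) {
            throw new IllegalArgumentException("Expected <db>.<collection>, got: " + namespace);
        }
        return new String[] { namespace.substring(0, dot), namespace.substring(dot + 1) };
    }

    /** Minimal JSON string quoting; assumes names contain no control characters. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Builds {"$match": {"ns": {"$in": [{"db": ..., "coll": ...}, ...]}}}. */
    static String buildNsMatchStage(List<String> namespaces) {
        List<String> nsDocs = new ArrayList<>();
        for (String namespace : namespaces) {
            String[] parts = splitNamespace(namespace);
            // "db" must come before "coll": equality on embedded documents is order-sensitive.
            nsDocs.add("{\"db\": " + quote(parts[0]) + ", \"coll\": " + quote(parts[1]) + "}");
        }
        return "{\"$match\": {\"ns\": {\"$in\": [" + String.join(", ", nsDocs) + "]}}}";
    }

    public static void main(String[] args) {
        System.out.println(buildNsMatchStage(List.of("inventory.orders", "inventory.customers")));
    }
}
```

With the MongoDB Java driver on the classpath, the same stage could be written as Aggregates.match(Filters.in("ns", nsDocs)), where each element of nsDocs is new Document("db", db).append("coll", coll); keeping "db" before "coll" matters there too, for the same order-sensitivity reason.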

Thanks,
Lucio

Chris Cranford
May 5, 2022, 4:14:28 PM
to debe...@googlegroups.com, Lucio Catinelli
That sounds excellent, Lucio. Looking forward to the contribution!

Gunnar Morling
May 6, 2022, 6:45:30 AM
to debezium
+1. If we can do the filtering server-side, that'd be awesome.

--Gunnar