Re: [mongodb-user] c# Change streams and filter

1,039 views
Skip to first unread message

Robert Stam

unread,
Jan 3, 2018, 9:59:03 AM1/3/18
to mongod...@googlegroups.com
You are on the right track that the way to filter the change stream documents you receive is to pass a filtering pipeline to the Watch method.

Several things to consider:

1. Your filter is comparing "temp" to 30 (not 22)

2. Watch only returns a change document when something changes. It doesn't consider documents that are already in the collection when Watch is called

3. The documents returned by Watch are ChangeStreamDocuments, and your full document is embedded inside it so your filter query needs to take that into account.


I would use more type safe ways to build the pipeline and the filter, like this:
var pipeline = 
    new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>()
    .Match(Builders<ChangeStreamDocument<BsonDocument>>.Filter.Gte("fullDocument.temp", 22));

Note that the filter is testing the "fullDocument.temp" field (not "temp") because in a change stream document the original document is embedded in the fullDocument field.

You can also consume the results of a Watch more naturally using foreach instead of using the lower level enumerator approach:
var cursor = collection.Watch(pipeline, options);
foreach (var change in cursor.ToEnumerable())
{
    Console.WriteLine(change.ToJson());
}

Let me know if you have any further questions.

Robert


On Wed, Jan 3, 2018 at 8:08 AM, Daniel Moqvist <daniel....@gmail.com> wrote:
I cannot figure out how to set a match criteria on my collections watch method.

For example, in the following I want to get a change stream notification only if temp rises above 22.

            var client = new MongoClient("mongodb://localhost");
            var db = client.GetDatabase("test");
            var collection = db.GetCollection<BsonDocument>("test");

            var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
            var enumerator = collection.Watch(options).ToEnumerable().GetEnumerator();

            //Doesn't work...
            //var pipeline = PipelineDefinition<ChangeStreamDocument<BsonDocument>, ChangeStreamDocument<BsonDocument>>.Create(new[] { @"{$match: { 'temp': { $gte: NumberInt(30) } } }" });
            //var enumerator = collection.Watch(pipeline, options).ToEnumerable().GetEnumerator();
            
            while (enumerator.MoveNext())
            {
                var next = enumerator.Current;
                Console.WriteLine(next.FullDocument?.ToJson());
            }

and Documents in the database looks like this:
            {
                "name" : "room1", 
                "temp" : NumberInt(20)
            }
                       {
                "name" : "room2", 
                "temp" : NumberInt(25)
            }


Is this possible to do with the c# driver?


--
You received this message because you are subscribed to the Google Groups "mongodb-user"
group.
 
For other MongoDB technical support options, see: https://docs.mongodb.com/manual/support/
---
You received this message because you are subscribed to the Google Groups "mongodb-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to mongodb-user+unsubscribe@googlegroups.com.
To post to this group, send email to mongod...@googlegroups.com.
Visit this group at https://groups.google.com/group/mongodb-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/mongodb-user/3b3eab27-51d8-41c2-ac2b-5ec8af991199%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Dan Hanan

unread,
Jan 25, 2018, 2:08:00 AM1/25/18
to mongodb-user
Hi Robert - 

Adding to this thread since I am trying to do something very similar.  Brand new to Change Streams from MongoDB.local SoCal - very cool stuff.  Trying to use a Match in change stream Watch( ) method. 
Used your suggested syntax below and it's compiling and running, but no change events are fired.  

I have test data with 100 records, each with increasing _id values.  Then I have a test app that is updating random documents in place (definitely hits _ids throughout the whole range 1 to 100), just changing other properties on the documents.  When I call Watch( ) without the pipeline parameter, all the change events fire just fine.

Here is my relevant code.  See anything I'm doing wrong? 
Thank you for any help.
-Dan

PipelineDefinition<ChangeStreamDocument<TestData>, ChangeStreamDocument<TestData>> pipeline 
= new EmptyPipelineDefinition<ChangeStreamDocument<TestData>>()
.Match(Builders<ChangeStreamDocument<TestData>>.Filter.Gte("_id", 10));

...

foreach ( ChangeStreamDocument<TestData> change in coll.Watch( pipeline, watchOptions, cancel.Token ).ToEnumerable() )
{
Console.WriteLine( change.OperationType );
Console.WriteLine( change.FullDocument.ToJson() );
}




To unsubscribe from this group and stop receiving emails from it, send an email to mongodb-user...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages