NATS Jetstream subject filtering

390 views
Skip to first unread message

Bensin Baby

unread,
Jun 5, 2024, 7:24:39 AM6/5/24
to nats
Hi,

I am new to NATS and interested in JetStream. I followed along with Jeremy's videos and created a single publisher and consumer, which have been working fine. I copied the same code to create two more consumers to identify subject filtering. Only one consumer is listening to a subject, while the other two consumers are listening to two different subjects. When I publish a message, I receive the same message on all three consumers. Below, I am pasting my code. Can anyone guide me or advise me on what I did wrong? My actual use case involves millions of devices publishing messages to JetStream with a few subjects, and consumer client applications consuming messages according to the subject.

publisher
---------------
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
        Name:        "orders",
        Description: "message from order",
        Subjects: []string{
            "orders.>",
            "payment.>",
        },
        MaxBytes: 1024 * 1024 * 1024,
    })
_, err = js.Publish(ctx, fmt.Sprintf("orders.ben.%d", i), []byte("Hello world"))

consumer
--------------------
stream, err := js.Stream(ctx, "orders")
    if err != nil {
        log.Fatal(err)
    }

    consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Name:          "order_processor",
        Durable:       "order_processor",
        FilterSubject: "orders.ben.*",
        //FilterSubjects: []string{
        // "orders.ben.*",
        },
    })


consumer 2&3
--------------------------
    consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Name:          "order_processor",
        Durable:       "order_processor",
        FilterSubject: "orders.ben.test",
        // FilterSubjects: []string{
        //  "orders.ben.test",
        // },
    })

    consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Name:          "order_processor",
        Durable:       "order_processor",
        FilterSubject: "payment.add.>",
    })

    cctx, err := consumer.Consume(func(msg jetstream.Msg) {
        log.Printf("Message Recevide ben %s ", string(msg.Subject()))
        msg.Ack()
    })

I have checked with both filtersubject and filtersubjects parameter .


Regards,
Bensin   


Scott Fauerbach

unread,
Jun 5, 2024, 11:27:42 AM6/5/24
to nats
This :  FilterSubject: "orders.ben.*"
will get messages from subject like orders.ben.foo and orders.ben.test


This :  FilterSubject: "orders.ben.test"
will get messages from only orders.ben.test

So it is expected that there be overlap where these consumers receive some of the same  messages. 

The next thing is that all the consumers have the same Name and same Durable. So if you create these in order, the 2nd replaces the 1st and the 3rd replaces the 2nd, so there is only one actual consumer which is actually consumer 3, and should only get payment.add.*

Considering that this looks like the simplification api, and having 3 instances of the same consumer, the server should actually round robin the delivery of messages. This could be an issue with the client. I would figure out your names first, then go from there.
Reply all
Reply to author
Forward
0 new messages