How to consume latest messages from Redis topic


Bao Quach

Sep 7, 2020, 8:48:12 AM
to lettuce-redis-client-users
Hi All

I have a use case where multiple visualization tools each need to consume the latest messages from the same Redis stream topic, with a separate consumer group per tool.

Can you please let me know how to consume the latest messages from a Redis topic using syncCommands.xreadgroup(consumer, streamOffset)?

I am getting the following exception:

Caused by: io.lettuce.core.RedisCommandExecutionException: ERR The $ ID is meaningless in the context of XREADGROUP: you want to read the history of this consumer by specifying a proper ID, or use the > ID to get new messages. The $ ID would just return an empty result set.

It happens when I execute the code below.

Here is my code:

XReadArgs.StreamOffset  streamOffset = XReadArgs.StreamOffset.latest(this.streamTopic);

List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(consumer, streamOffset);

syncCommands.xgroupCreate(streamOffset, this.groupTopic, new XGroupCreateArgs());

List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(consumer, streamOffset);

 
Thank you for your help

Regards, Bao

Mark Paluch

Sep 7, 2020, 9:43:22 AM
to lettuce-redis-client-users
This question would probably be better suited for the Redis mailing list.

From what I understand, you want to introspect the stream. You can obtain the first/last message ID via XINFO STREAM and use XRANGE to get hold of the message without actually consuming it.
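
As a rough illustration of peeking at the newest entry without consuming it, here is a minimal sketch. It swaps in XREVRANGE with a count of 1 instead of the XINFO STREAM plus XRANGE combination mentioned above, because that returns the last entry in a single call. The class name `LatestMessagePeek` and the method `peekLatest` are hypothetical names chosen for this example.

```java
import io.lettuce.core.Limit;
import io.lettuce.core.Range;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.api.sync.RedisCommands;

import java.util.List;
import java.util.Optional;

// Hypothetical helper for illustration; not part of Lettuce.
public class LatestMessagePeek {

    /** Returns the newest entry of the stream, if any, without involving a consumer group. */
    public static Optional<StreamMessage<String, String>> peekLatest(
            RedisCommands<String, String> commands, String stream) {
        // XREVRANGE <stream> + - COUNT 1: walk the stream backwards and stop after one entry.
        List<StreamMessage<String, String>> newest =
                commands.xrevrange(stream, Range.create("-", "+"), Limit.from(1));
        return newest.stream().findFirst();
    }
}
```

Because no consumer group is involved, nothing is marked as delivered, so any number of tools can call this repeatedly. The trade-off is that the caller has to poll and will miss entries that arrive between two polls.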

Cheers, 
Mark

Bao Quach

Sep 7, 2020, 10:11:35 AM
to lettuce-redis-client-users
Hi Mark

I have a Java microservice that collects near-real-time vehicle movement data from sensors or detectors and publishes it to the Redis message bus under the topic "vehicles-rt-data". Many Java client applications subscribe to the stream topic "vehicles-rt-data", each with its own unique consumer group, and consume the latest messages to show real-time traffic movement on a graph or in visualization tools.

I am using Lettuce to publish and subscribe to messages on the Redis message bus.

Regards, Bao

Max Grigoriev

Sep 7, 2020, 10:23:59 AM
to lettuce-redis-client-users
You should create the stream first and then create a consumer group. You can do both in one call:
```
// "$" positions the new group at the end of the stream; mkstream(true) creates the stream if it is missing.
reactiveCommands.xgroupCreate(
        XReadArgs.StreamOffset.from("mystream", "$"), "my-group-1", new XGroupCreateArgs().mkstream(true))
    // Treat "group already exists" as success so the call can be repeated on every startup.
    .onErrorResume(
        err -> err instanceof RedisBusyException && "BUSYGROUP Consumer Group name already exists".equals(err.getMessage()),
        err -> Mono.just("group exists"))
    .doOnNext(result -> log.info("Result of {} group creation: {}", zoomConfiguration.getGroupName(), result))
    .block(Duration.ofSeconds(10));
```

and then you can read using `xreadgroup`, but you should have one consumer group per tool so that each tool receives every message.

Redis has a detailed doc about streams.
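
To make the "one group per tool" point concrete, here is a toy in-memory model in plain Java. It is not Redis and not the Lettuce API. It only mimics two behaviours described in this thread: a group created at `$` starts after the current last entry, and reading with `>` returns whatever that group has not been given yet. The class `ToyStream` and the group names are made up for the illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of stream consumer groups; NOT Redis and NOT the Lettuce API. */
public class ToyStream {

    private final List<String> entries = new ArrayList<>();

    // Per group: index of the next entry that has not yet been delivered to that group.
    private final Map<String, Integer> nextIndexByGroup = new HashMap<>();

    /** Roughly XADD: append an entry to the stream. */
    public void add(String payload) {
        entries.add(payload);
    }

    /** Roughly XGROUP CREATE ... $: the group starts after the current last entry. */
    public void createGroupAtEnd(String group) {
        nextIndexByGroup.putIfAbsent(group, entries.size());
    }

    /** Roughly XREADGROUP ... >: return entries never delivered to this group, then advance. */
    public List<String> readNew(String group) {
        Integer next = nextIndexByGroup.get(group);
        if (next == null) {
            throw new IllegalStateException("no such group: " + group);
        }
        List<String> fresh = new ArrayList<>(entries.subList(next, entries.size()));
        nextIndexByGroup.put(group, entries.size());
        return fresh;
    }

    public static void main(String[] args) {
        ToyStream stream = new ToyStream();
        stream.add("old");                    // written before any group exists
        stream.createGroupAtEnd("dashboard");
        stream.createGroupAtEnd("map-view");
        stream.add("v1");                     // written after both groups exist

        System.out.println(stream.readNew("dashboard")); // [v1]
        System.out.println(stream.readNew("map-view"));  // [v1]
        System.out.println(stream.readNew("dashboard")); // []
    }
}
```

Both groups see `v1` because each keeps its own position, while `old` is skipped because the groups were created at the end of the stream. Two tools sharing a single group would instead split the entries between them.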

Bao Quach

Sep 7, 2020, 7:42:08 PM
to lettuce-redis-client-users
Hi Max

I am getting the following exception:

Caused by: io.lettuce.core.RedisCommandExecutionException: ERR The $ ID is meaningless in the context of XREADGROUP: you want to read the history of this consumer by specifying a proper ID, or use the > ID to get new messages. The $ ID would just return an empty result set.

Here is my code:
XReadArgs.StreamOffset streamOffset = XReadArgs.StreamOffset.from(this.streamTopic, "$"); // XReadArgs.StreamOffset.latest(this.streamTopic);
getRedisReactive().xgroupCreate(streamOffset, this.groupTopic, new XGroupCreateArgs().mkstream(true));

Consumer consumer = Consumer.from(groupTopic, consumerName);
Flux<StreamMessage<String, String>>  messages2 =  getRedisReactive().xreadgroup(consumer, streamOffset);
List<StreamMessage<String, String>> first = messages2.collectList().block();

It seems that using an offset of "$" does not work in the context of XREADGROUP?


Regards, Bao

Max Grigoriev

Sep 8, 2020, 6:21:37 AM
to lettuce-redis-client-users
Hi Bao,

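#1 `getRedisReactive().xgroupCreate(streamOffset, this.groupTopic, new XGroupCreateArgs().mkstream(true));` is never executed. Reactive commands are only sent when something subscribes, so you should call `.subscribe()` or `.block(Duration.ofSeconds(10))` on the result.

#2 `xreadgroup` needs its own offset: `$` is only valid when creating the group, while `>` asks for messages that were never delivered to this group. Here's an example.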

val readEventArgs = XReadArgs.Builder.count(1).block(Duration.ofSeconds(5));
val readOffset = XReadArgs.StreamOffset.from(eventStreamName, ">");

readEventsReactiveCommands.xreadgroup(consumer, readEventArgs, readOffset)
  .doOnNext(message -> log.info("Reading a new event {} from {} stream", message, eventStreamName))
  .concatMap(this::processMessage)
  ...
  .subscribe();
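
For the synchronous API used in the original question, points #1 and #2 could be combined roughly as in the sketch below. It assumes a Redis server at `redis://localhost:6379`. The group name `dashboard-group`, the consumer name `dashboard-1`, and the class and method names are hypothetical, picked only for this example. Only the stream name `vehicles-rt-data` comes from the thread.

```java
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

import java.time.Duration;
import java.util.List;

// Hypothetical example class; names other than the stream topic are assumptions.
public class LatestStreamReader {

    /** Creates the group at the end of the stream ("$"); an already existing group is fine. */
    public static void ensureGroup(RedisCommands<String, String> commands, String stream, String group) {
        try {
            commands.xgroupCreate(XReadArgs.StreamOffset.from(stream, "$"), group,
                    new XGroupCreateArgs().mkstream(true));
        } catch (RedisBusyException e) {
            // BUSYGROUP: the group was created on an earlier run, nothing to do.
        }
    }

    /** Reads messages never delivered to this group (">"), waiting up to 5 seconds for some. */
    public static List<StreamMessage<String, String>> readNew(
            RedisCommands<String, String> commands, String stream, String group, String consumerName) {
        return commands.xreadgroup(
                Consumer.from(group, consumerName),
                XReadArgs.Builder.count(10).block(Duration.ofSeconds(5)),
                XReadArgs.StreamOffset.from(stream, ">"));
    }

    public static void main(String[] args) {
        String stream = "vehicles-rt-data";
        String group = "dashboard-group"; // assumed: one group per visualization tool
        RedisClient client = RedisClient.create("redis://localhost:6379"); // assumed local server
        try (StatefulRedisConnection<String, String> connection = client.connect()) {
            RedisCommands<String, String> commands = connection.sync();
            ensureGroup(commands, stream, group);
            for (StreamMessage<String, String> message : readNew(commands, stream, group, "dashboard-1")) {
                System.out.println(message.getId() + " -> " + message.getBody());
                // Acknowledge so the entry leaves this group's pending list.
                commands.xack(stream, group, message.getId());
            }
        } finally {
            client.shutdown();
        }
    }
}
```

Each visualization tool would run this with its own group name, so every tool gets every new message. A real consumer would call `readNew` in a loop rather than once.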
