Redis streams partitioning

ey...@zebra-med.com

Aug 8, 2018, 5:42:25 AM
to Redis DB
Hey,

Is there any solution that supports partitioning of Redis streams (same as in Kafka Streams)?
I'm implementing a streaming solution that replicates real-time data from MongoDB to Postgres, and I'd like to use Redis streams for it.
I'm worried about out-of-order message processing resulting from different consumers handling messages with the same key.

Is there any solution available for this scenario?

Thanks

Dvir Volk

Aug 8, 2018, 6:10:39 AM
to redi...@googlegroups.com
If I'm understanding you right, what you need is consumer groups.




ey...@zebra-med.com

Aug 8, 2018, 6:18:28 AM
to Redis DB
I'm going to use consumer groups, but as I see it, they don't solve my problem.
Let's say I have N consumers in my group, each of them reading a different chunk of the data.
If I have two messages that represent two updates of the same DB record, I want to process them in the same order.
If two different consumers (of the same group) process these two messages (which relate to the same record id), I'm going to have a race condition.
In some cases, messages will be processed out of order (not the order in which they came into the stream).
If I can partition the stream (by the record id), two consumers won't handle the same id and I won't have this kind of issue.

Sripathi Krishnan

Aug 8, 2018, 8:25:37 AM
to redi...@googlegroups.com
You could just create n streams, and have 1 consumer per stream only. The publisher picks up the stream to publish using some logic on the record, so that all related records end up in the same stream. Then your consumer will process the messages sequentially. This way, you achieve parallelism by creating more streams. 
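Sketched in Python with a hypothetical `changelog:` key prefix (the names and hash choice here are illustrative assumptions, not from the thread), the producer-side routing could look like this; the only requirement is that the hash is deterministic, so every update for a given record id lands in the same stream:

```python
import zlib

N_STREAMS = 4  # one dedicated consumer per stream

def stream_for(record_id: str) -> str:
    """Deterministically map a record id to one of N stream keys."""
    # crc32 is stable across processes and runs, unlike Python's hash()
    bucket = zlib.crc32(record_id.encode()) % N_STREAMS
    return f"changelog:{bucket}"

# The publisher would then write with e.g. redis-py:
#   r.xadd(stream_for(str(doc["_id"])), {"payload": json.dumps(doc)})

# All updates to the same record route to the same stream:
assert stream_for("order-42") == stream_for("order-42")
```

Because each stream has exactly one consumer, reading that stream with XREAD yields its messages in insertion order.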

If a consumer crashes, that particular stream would be locked. You can then launch a new consumer and resume processing the messages. 

Would that work?



--Sri

ey...@zebra-med.com

Aug 8, 2018, 10:06:05 AM
to Redis DB
Thank you for your reply. Yes, it would work.
But it seems to me that managing the partitioning myself would require a lot of overhead.
What would happen if I want to scale up the number of consumers?
- Add an extra stream
- Recalculate the hashing formula

Pedro Melo

Aug 10, 2018, 3:20:07 AM
to redi...@googlegroups.com
Hi,

I have the same use case, supporting changelogs between different systems, where messages must be processed in strict order per partition key.

And no, you cannot do that with the current consumer groups implementation.

I would like to see that in the future, and I hope it shows up as a new X command. To have a decent system with logical partitioning, you have to have re-partitioning built in for when a new consumer joins the consumer group, and this introduces stop-the-world sync points. Not a deal breaker, but needed.

For now, the current consumer groups are practical only in situations where the destination system can detect out-of-order messages, although recovering from those situations will always be tricky...

Bye



Salvatore Sanfilippo

Aug 10, 2018, 5:57:04 AM
to redi...@googlegroups.com
Hello Pedro,

I'm not sure I really understand the underlying problem here, even if
I'm sure it is clear to the people implementing this pattern. So if you
are willing to show me, in general terms and with a real-world example,
what happens in this use case, I'm very interested in analyzing the
problem to see how Redis could solve it. Thanks!
Salvatore 'antirez' Sanfilippo
open source developer - Redis Labs https://redislabs.com

"If a system is to have conceptual integrity, someone must control the
concepts."
— Fred Brooks, "The Mythical Man-Month", 1975.

ey...@zebra-med.com

Aug 10, 2018, 8:33:18 AM
to Redis DB
Hey Salvatore,

In my case, the use case is as follows:
1. Replicate the source DB to the target DB in near real time
2. Ensure the target DB is updated in the same order as the source DB

If I'm using consumer groups, how can I ensure that messages will be processed in the same order in which they came into the stream, given N consumers in my group?

Thanks

Nick Farrell

Aug 11, 2018, 2:13:05 AM
to Redis DB
Store the message ID in your database along with the payload, and use a SQL rule/trigger to atomically drop upserts when a record with the same key exists and has a newer message ID. No need to sync your consumer groups.
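A minimal sketch of this guard, using SQLite's upsert syntax for brevity (Postgres accepts the same `ON CONFLICT ... DO UPDATE ... WHERE` form); the table and column names are invented for illustration:

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE records (key TEXT PRIMARY KEY, payload TEXT, msg_id TEXT)")

def upsert(key, payload, msg_id):
    # The WHERE clause turns the write into a no-op when the stored row
    # already carries a newer message ID, dropping late arrivals atomically.
    # Note: stream IDs compare correctly as strings only if zero-padded;
    # a real system should compare the (ms, seq) parts numerically.
    db.execute(
        """INSERT INTO records (key, payload, msg_id) VALUES (?, ?, ?)
           ON CONFLICT(key) DO UPDATE SET
             payload = excluded.payload, msg_id = excluded.msg_id
           WHERE excluded.msg_id > records.msg_id""",
        (key, payload, msg_id),
    )

upsert("order-1", "paid",    "0000000002-0")
upsert("order-1", "created", "0000000001-0")  # arrives late: silently dropped
assert db.execute("SELECT payload FROM records").fetchone()[0] == "paid"
```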

Pedro Melo

Aug 13, 2018, 8:46:30 AM
to redi...@googlegroups.com
Hi Salvatore,

I’ll try to make the case as simple as possible. One of our use cases deals with orders (as in eCommerce orders from clients), and updates to them. Another use case is with products and stock updates to them. In the latter we have updates saying “stock is now X for product Y”. In the former we have updates “order Y was paid”, or “order Y was shipped”.

In these situations (we use Kinesis, but it could be Kafka; same concepts), we pick a partition key (Order ID or SKU both work), and all messages for each of them will be placed in the same partition. We then have a single consumer per partition.

This makes sure that all the messages for the same “entity” will be processed in the correct order by the same consumer. Scalability is achieved by adding more partitions (and therefore more consumers). Basically, the messages are partitioned by the producer by stating which attribute of the message is to be used as partition key.

With Redis streams consumer groups, messages are distributed round-robin between all the consumers, with no respect for order inside the stream. This is the exact same behaviour as NSQ (see https://nsq.io/overview/design.html#simplifying-configuration-and-administration — "topics" in NSQ are the Redis keys, and "channels" are the "consumer groups") and it works **great**; we've been using NSQ for more than 5 years now (and we will consider replacing NSQ with Redis Streams, because for the round-robin delivery use case, Redis streams offer much more control over memory usage than NSQ).

But this is not enough if you need ordering of messages per entity per stream, as we have on Kafka or Kinesis. So it is not a replacement for those in this particular use case.

Let me try to show the problem more visually. In the diagrams below you see messages like "<entity_id:version>". The "entity_id" is what we use as the partition key (Order ID or SKU), and "version" is something that helps us detect out-of-order messages; it will be a monotonically increasing counter per entity ID (the best solution) or a simple microsecond timestamp.

If you have 2 partitions with one consumer each, the following stream:

<1:1> <2:1> <3:1> <3:2> <2:2> <4:1> <1:2> <1:3>

Should be delivered as such:

Partition 1: <1:1> <3:1> <3:2> <1:2> <1:3>
Partition 2: <2:1> <2:2> <4:1>

With the current implementation of Redis stream consumer groups, it would instead be delivered as:

Consumer 1: <1:1> <3:1> <2:2> <1:2>
Consumer 2: <2:1> <3:2> <4:1> <1:3>

Notice that there are three race conditions: between <3:1> and <3:2>, between <2:1> and <2:2>, and between <1:2> and <1:3>. It all depends on how fast each consumer processes each message.
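The partitioned split above can be reproduced with any deterministic function of the partition key; here is a small simulation (the odd/even rule is just whatever reproduces this example, not a recommendation):

```python
# The stream of <entity_id:version> pairs from the diagram above
stream = [(1, 1), (2, 1), (3, 1), (3, 2), (2, 2), (4, 1), (1, 2), (1, 3)]

def partition(entity_id, n=2):
    # Any deterministic hash of the partition key works; modulo suffices here.
    return entity_id % n

parts = {0: [], 1: []}
for entity_id, version in stream:
    parts[partition(entity_id)].append((entity_id, version))

# Partitioning preserves per-entity order: versions only ever increase
# within a partition, so the race conditions described above cannot occur.
assert parts[1] == [(1, 1), (3, 1), (3, 2), (1, 2), (1, 3)]  # "Partition 1"
assert parts[0] == [(2, 1), (2, 2), (4, 1)]                  # "Partition 2"
```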

Further points that we are aware of:

* Yes, our system sometimes leaves the partitions unbalanced, with one of them holding more messages than the others: we know, but with a sufficient number of partitions this is not really a problem; the distribution is fairly even. We can monitor for this, and if we want to "fix" it, we can always re-partition a topic. That is no guarantee it will be fixed, but usually this is not a problem.
* Yes, round-robin *usually* works better as a load balancer, but for us, keeping each shard/partition processed in a timely fashion is an operational issue that we can easily monitor and adjust; it is a capacity-planning issue, and easily solved.
* Yes, if all messages were idempotent, we could process them out of order: we do not control some of our message production, so we cannot guarantee that.
* Yes, we can detect out-of-order messages with "version" and use the destination database to order them. We do that in some cases, but you end up using the destination SQL (or other) DB as a "dead-letter" or "mini-ordering-queue". Depending on the message traffic (we see upwards of 10k messages per second on some flows) this is not viable, and it is a workaround for the capabilities of the queueing system we are using. I'd rather make sure the queueing system is good enough to cope with this.
* Yes, we can detect out-of-order messages with "version" and "push them back into the queue to reprocess", but this causes more problems than it solves: you might be pushing version 2 to the end after versions 3 and 4 are already there, so messages 3 and 4 will also be pushed back. To illustrate:

You have messages 1, 2, 3, 4 in the stream, with other messages in between (not shown). If for some reason the stream ends up being consumed as 2, 1, 3, 4, and we push message 2 to the end after we get it and notice that we haven't seen message 1, you'll end up with 1, 3, 4, 2, which will most likely trigger the same issue with 3 and 4. Yes, it will eventually work out, but with a lot of re-queueing.


I hope this makes our use case clear. Here are some links regarding the same concepts on both Kinesis and Kafka:

* https://simplydistributed.wordpress.com/2016/12/13/kafka-partitioning/ — the key takeaway: "Kafka ensures strict ordering within a partition i.e. consumers will receive it in the order which a producer published the data to begin with";
* https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key — concepts, in particular the partition-key concept;
* I never used NATS Streaming, but there might be good stuff in comparing with it: https://www.nats.io/documentation/streaming/nats-streaming-intro/

One final note: we do have use cases where the current behaviour is more than enough and already an improvement over other solutions (like NSQ topics and channels), but not where we need strict ordering. There, Kinesis and Kafka are still better for these particular use cases.

Salvatore, I hope this is enough to explain it; please tell me if you need further clarification. And do enjoy your vacation :)

Bye,

Sripathi Krishnan

Aug 13, 2018, 11:41:31 AM
to redi...@googlegroups.com
You are equating 1 Kafka stream to 1 Redis stream. Instead, compare 1 Kafka partition to 1 Redis stream.
  1. In Kafka, the producer is responsible for partitioning data. You can do the same in Redis: the producer decides which Redis stream to write data to.
  2. In Kafka, there is 1 consumer per partition. If you have 1 consumer per stream in Redis, you get the exact same behaviour.
  3. In Kafka, to scale, you add more partitions and corresponding consumers. You can do the exact same thing in Redis.
Earlier, eyalh raised drawbacks of this approach, and I wanted to address them.

- What would happen if I want to scale up the number of consumers?
- Adding an extra stream
- Recalculate the hashing formula 

It is possible to add new consumers without much disruption. Start off with, say, 48 streams but only 4 consumers. Producers use a hashing formula to distribute across the 48 streams. Initially you have 4 consumers, and each of them calls the XREAD command with 12 keys. Redis guarantees (?) that messages from each of the 12 streams will arrive in the order they were inserted, so you will still process orders or transactions in the right application-specific order.

Using the above approach, when you want to add more consumers, simply change the XREAD command to query fewer keys. The producers do not need to change at all.

I am not sure of the internals of streams; perhaps it's okay to have a consumer blocking on more than 12 keys without degraded performance.
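The key assignment in this pre-sharding scheme can be sketched as follows (the stream names and the modulo assignment are illustrative assumptions, not from the thread):

```python
N_STREAMS = 48  # fixed up front; producers always hash into these

def keys_for_consumer(consumer_index, n_consumers):
    """Deal the fixed set of streams out to the currently running consumers."""
    assert N_STREAMS % n_consumers == 0, "keep consumer count a divisor of 48"
    return [f"stream:{i}" for i in range(N_STREAMS)
            if i % n_consumers == consumer_index]

# 4 consumers -> 12 streams each; 12 consumers -> 4 streams each.
assert len(keys_for_consumer(0, 4)) == 12
assert len(keys_for_consumer(0, 12)) == 4

# Every stream is owned by exactly one consumer, so per-stream order holds:
owned = sorted(k for c in range(4) for k in keys_for_consumer(c, 4))
assert owned == sorted(f"stream:{i}" for i in range(48))

# Each consumer then blocks on its subset, e.g. with redis-py:
#   r.xread({k: last_seen_id[k] for k in keys_for_consumer(me, n)}, block=0)
```

Scaling up is then a consumer-side redeployment with a new `n_consumers`; the producers and the 48 stream keys never change.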

My general understanding of streams is (I could be wrong):
  1. Use 1 stream -> 1 consumer when you want to process messages in order.
  2. Use N streams -> N consumers when you want to scale; the producers decide on the sharding logic.
  3. Use 1 stream -> N consumers in the same consumer group when you want to load balance and are okay with processing messages out of order.

--Sri

Pedro Melo

Aug 14, 2018, 3:14:21 AM
to redi...@googlegroups.com
Thank you Sri, your logic is sound and I think it will solve this nicely.

I'll think more about it to be sure, but yes, I think your design is the correct way to go for us.

Best regards,


Salvatore Sanfilippo

Sep 14, 2018, 10:35:40 AM
to redi...@googlegroups.com
Hello Pedro, Sripathi,

thank you both for explaining use cases and patterns. As a community we need to develop a "culture" around the new tool we have, Streams in this case, so this process is very useful.

Pedro is perfectly correct that in his use case the messages would be processed out of order using a single key, because indeed a single stream with an associated consumer group only cares about load balancing efficiently.

It's useless to repeat it, but I just want to stress how superior this approach is (more on the ordering solutions later): not only can single consumers fail and the system still recover (this is a huge advantage IMHO), but this is also a special case of the general case of consumers processing at a different pace.

I also completely agree with Sripathi that partitions in Kafka are a lot more like different keys in Redis Streams, even if, to be honest, in that case the user is left handling things a bit more manually. Consumer groups as a distribution mechanism are *very* server-side, while orchestrating the partitioning manually requires the logic to live at least partially on the client.

However, note how this design is somewhat imposed by the nature of Redis: at some point you want to partition to N keys anyway, because otherwise you are always hitting the same server, which is definitely a problem when it comes to scalability.

However, Sripathi here introduces a concept that I used in the past but never figured could be useful in this context, which is "pre-sharding". This looks like an interesting idea for real-world designs and operations. Btw, listening to a few tens of keys per client is perfectly supported and is not going to be a CPU issue. The work Redis needs to do when serving a consumer is not *directly* related to the number of streams a given consumer is monitoring. However, an O(N) cost (very small in constant terms) will be paid every time we re-block for N keys and unblock again; but while we are blocked, the cost is the same as blocking for a single key from the POV of the producer. In general, it is possible to block on 30, 40 or 100 keys without issues, unless there are 100k consumers connected to the same instance...

About the question asked by Sripathi: yes, within the same stream there is a guarantee of ordering. In case of reboots or failover, however, while Redis propagates everything to replicas and the AOF as it happens, all the guarantees are limited to the ones Redis can provide from the POV of data consistency. So, for instance, if we are using just RDB files and we reboot the instance, the consumer group state may be stale.

I have the feeling that this discussion should be reworked and inserted into the documentation, specifically into the Streams tutorial. I'll try to do that.

Thank you for this thread,
Salvatore

Salvatore Sanfilippo

Sep 14, 2018, 10:43:28 AM
to redi...@googlegroups.com
I tried to incorporate certain ideas of this post in the Stream doc:

+We could say that schematically the following is true:
+
+* If you use 1 stream -> 1 consumer, you are processing messages in order.
+* If you use N streams with N consumers, so only a given consumer hits a subset of the N streams, you can scale the above model of 1 stream -> 1 consumer.
+* If you use 1 stream -> N consumers, you are load balancing to N consumers; however, in that case, messages about the same logical item may be consumed out of order, because a given consumer may process message 3 faster than another consumer is processing message 4.
+
+So basically Kafka partitions are more similar to using N different Redis keys, while Redis consumer groups are a server-side load balancing system of messages from a given stream to N different consumers.

Sripathi Krishnan

Sep 15, 2018, 5:44:53 AM
to redi...@googlegroups.com
Most distributed queue-like systems have a concept of sharding, and the decision on how to shard is left to the producer. 

To take a few examples - 
  1. AWS SQS FIFO queues have a concept called the message group ID. Producers are required to set this field explicitly on every message. The message group ID categorizes messages into groups, and SQS FIFO queues can only guarantee ordering within a particular message group.
    From the documentation:
    >> The tag that specifies that a message belongs to a specific message group. Messages that belong to the same message group are always processed one by one, in a strict order relative to the message group (however, messages that belong to different message groups might be processed out of order).
  2. AWS Kinesis Data Streams has a concept called the "partition key". The partition key decides the shard that a message ends up in, and a shard is the unit of throughput. Messages within a shard are processed in order.
Summary: in all popular systems, the producer has to tell the system the basis for sharding/partitioning the data.

The only difference between Redis streams and other systems is how you specify the partition key. In Redis, you just directly choose the Redis key for the stream. In other systems, an explicit field in the message is hashed, and then the system picks the shard/partition.

IMO, just building a library on top of Redis streams would eliminate this minor annoyance for producers and consumers.
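As a rough illustration of such a library (the class and key names are invented; the client only needs an `xadd` method, which redis-py provides), the producer-facing API could accept an explicit partition key, Kafka-style, and hide the key hashing:

```python
import zlib

class PartitionedStream:
    """Kafka-style producer API on top of N Redis stream keys."""
    def __init__(self, client, base, partitions):
        self.client, self.base, self.partitions = client, base, partitions

    def key_for(self, partition_key):
        # Same partition key -> same stream key -> per-entity ordering.
        return f"{self.base}:{zlib.crc32(partition_key.encode()) % self.partitions}"

    def add(self, partition_key, fields):
        return self.client.xadd(self.key_for(partition_key), fields)

class FakeClient:
    """Stand-in so the sketch runs without a Redis server."""
    def __init__(self):
        self.calls = []
    def xadd(self, key, fields):
        self.calls.append((key, fields))

c = FakeClient()
s = PartitionedStream(c, "orders", 8)
s.add("order-7", {"status": "paid"})
s.add("order-7", {"status": "shipped"})
assert c.calls[0][0] == c.calls[1][0]  # both updates hit the same stream
```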

--Sri

Pedro Melo

Sep 16, 2018, 4:35:12 AM
to redi...@googlegroups.com
Exactly... we use Kinesis and chose the partition key carefully to make sure that consumers will not have issues processing messages from multiple partitions out of order relative to each other.

For example, SKU is a good partition key because each SKU is independent from the point of view of the supply-chain software. Order ID is also a good key for most consumers.

This "pre-sharding" is a good trade-off, and it works well.

Salvatore, the updates to the documentation look good.

Thanks 


Krushita Shah

Oct 4, 2018, 2:45:32 PM
to Redis DB

Reading https://groups.google.com/forum/#!topic/redis-db/td-aPJKycH0, I am still confused about what to conclude.

  • If you use N streams with N consumers, so only a given consumer hits a subset of the N streams, you can scale the above model of 1 stream -> 1 consumer.

So, if the producer is creating and posting to N different streams, and the consumers (say, fewer than N) within the consumer group are guaranteed to receive messages from non-pending streams only, it looks like ordered processing per stream is possible. Can you clarify?
