An attempt to isolate state mutation in distributed systems via node affinity


Alex Robson

Jan 27, 2014, 11:05:52 PM
to distsys...@googlegroups.com
Hello all,

## Scenario
I am working on a very gradual migration of an n-tier system to something decoupled and distributed. There are many reasons why I have been tasked to do so, and scale/perf isn't the only consideration. There are some constraints and limitations but also some freedom to gradually introduce new things where warranted.

That said, here are the pieces of the system as it sits today. Sorry this isn't more concise, you can probably skip down to the problem section.

### Language
A language runtime with shared state mutability and parallel execution via threading.

### Use of messaging - RabbitMQ
We're using RabbitMQ. Our queues are bounded, we apply back pressure to message producers, and we monitor ingress and egress both in the cluster and at each node.

### General Approach - Event Sourcing
In cases where dimensions are changing frequently, we're using an event sourcing model. Events are stored separately from the last snapshot/baseline and replayed/applied against it to arrive at the best approximation of the current state. After X events, a new snapshot is created to place an upper bound on the number of events that would need to be read and replayed for any operation.
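
A minimal sketch of the snapshot-plus-replay idea described above, in Python. The names (`Stream`, `apply_event`, `SNAPSHOT_EVERY`) and the dict-merge reducer are assumptions for illustration only, not the code in the system being discussed.

```python
from dataclasses import dataclass, field
from typing import List

SNAPSHOT_EVERY = 3  # hypothetical value for "X" in the text


@dataclass
class Snapshot:
    state: dict
    version: int = 0  # number of events already folded into this snapshot


@dataclass
class Stream:
    snapshot: Snapshot
    events: List[dict] = field(default_factory=list)  # events since the snapshot


def apply_event(state: dict, event: dict) -> dict:
    """Hypothetical reducer: an event overwrites the fields it carries."""
    new_state = dict(state)
    new_state.update(event)
    return new_state


def current_state(stream: Stream) -> dict:
    """Replay the stored events against the last snapshot."""
    state = stream.snapshot.state
    for event in stream.events:
        state = apply_event(state, event)
    return state


def append(stream: Stream, event: dict) -> None:
    """Store an event; roll a new snapshot once SNAPSHOT_EVERY events pile up."""
    stream.events.append(event)
    if len(stream.events) >= SNAPSHOT_EVERY:
        stream.snapshot = Snapshot(
            state=current_state(stream),
            version=stream.snapshot.version + len(stream.events),
        )
        stream.events = []
```

Because a new snapshot is rolled every `SNAPSHOT_EVERY` appends, a read never has to replay more than `SNAPSHOT_EVERY - 1` events.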

### Ids - Flake
We're using Flake-style ids for our events and our snapshots so that we have relative ordering and some protection against NTP shenanigans.
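
For readers unfamiliar with the idea, here is a toy Flake-style generator in Python. The 64/48/16-bit layout (millisecond timestamp, worker id, sequence) and the choice to refuse to issue ids when the clock runs backwards are assumptions made for this sketch; the post does not say which implementation or layout is used.

```python
import threading
import time


class FlakeGenerator:
    """Toy id layout: 64-bit ms timestamp | 48-bit worker id | 16-bit sequence."""

    def __init__(self, worker_id: int, clock=lambda: int(time.time() * 1000)):
        self.worker_id = worker_id & ((1 << 48) - 1)
        self._clock = clock  # injectable so the behaviour can be tested
        self._last_ms = -1
        self._seq = 0
        self._lock = threading.Lock()

    def next_id(self) -> int:
        with self._lock:
            now = self._clock()
            if now < self._last_ms:
                # Clock stepped backwards (e.g. an NTP adjustment): refuse to
                # hand out ids that would sort before ones already issued.
                raise RuntimeError("clock moved backwards")
            if now == self._last_ms:
                self._seq += 1
                if self._seq >= 1 << 16:
                    raise RuntimeError("sequence exhausted for this millisecond")
            else:
                self._last_ms = now
                self._seq = 0
            return (now << 64) | (self.worker_id << 16) | self._seq
```

Ids from one generator sort in issue order; ids from different workers sort roughly by time, which matches the "relative ordering" mentioned above.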

### Vector clocks
We use vector clocks on the snapshots. In the event that a divergent replica is created, we can detect this on read and repair it.
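
A small Python sketch of how a vector clock comparison can flag divergent replicas on read. Clocks are modeled as plain dicts from node name to counter; the function names are hypothetical and the repair step is reduced to merging the clocks.

```python
from typing import Dict

VectorClock = Dict[str, int]


def increment(clock: VectorClock, node: str) -> VectorClock:
    """Return a copy of the clock with this node's counter advanced."""
    new_clock = dict(clock)
    new_clock[node] = new_clock.get(node, 0) + 1
    return new_clock


def descends(a: VectorClock, b: VectorClock) -> bool:
    """True if a has seen every update that b has seen."""
    return all(a.get(node, 0) >= count for node, count in b.items())


def compare(a: VectorClock, b: VectorClock) -> str:
    """Classify a relative to b: equal, after, before, or concurrent."""
    a_ge_b, b_ge_a = descends(a, b), descends(b, a)
    if a_ge_b and b_ge_a:
        return "equal"
    if a_ge_b:
        return "after"
    if b_ge_a:
        return "before"
    return "concurrent"  # neither descends from the other: divergent replicas


def merge(a: VectorClock, b: VectorClock) -> VectorClock:
    """Clock for a repaired snapshot: element-wise maximum of both clocks."""
    return {node: max(a.get(node, 0), b.get(node, 0)) for node in a.keys() | b.keys()}
```

A `"concurrent"` result is the case that calls for read repair: two snapshots were written from the same ancestor without seeing each other.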

### Problem
Without a coordinating mechanism, distributed systems that follow this pattern seem to allow any node to process an event for any snapshot. This at least makes it possible that two separate nodes could be processing two separate events for the same snapshot at the same time.

Our system does not require a precise total ordering of events; only that events must happen in *some* serial order to produce a valid outcome. In other words, we can allow events to play in the order they hit the queues without obsessing over time-based-ordering-exactitude.

Allowing a system to evaluate events against one snapshot in parallel would seem to significantly increase the number of divergent replicas in the system even in the absence of network partitions. Put another way, parallel mutation of state makes the system far more difficult to reason about and increases the overhead required to resolve divergent replicas. (hi, I'm Captain Obvious?)

### Local Isolation via Affinity
Currently, we prevent this parallel mutation at the local level by assigning items to work queues based on a consistent hash of the snapshot's id. Each queue has a single worker thread so there is never any chance that events meant for a snapshot can end up being processed simultaneously. This allows us to satisfy the requirement (locally) that events must play in some serial order while preventing lots and lots of read-repairing due to incompatible parallel mutations.
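
The local scheme can be sketched in Python roughly as follows. This is an illustration under assumptions: `AffinityDispatcher` and `handler` are hypothetical names, and a stable hash taken modulo the queue count stands in for the consistent hash, which is equivalent as long as the number of queues is fixed.

```python
import hashlib
import queue
import threading


class AffinityDispatcher:
    """Route each snapshot id to exactly one single-threaded work queue."""

    def __init__(self, worker_count: int, handler):
        self._handler = handler
        self._queues = [queue.Queue() for _ in range(worker_count)]
        # One worker thread per queue, so a queue is always drained serially.
        for q in self._queues:
            threading.Thread(target=self._run, args=(q,), daemon=True).start()

    def _index_for(self, snapshot_id: str) -> int:
        # hashlib is used because the built-in hash() of a str is salted per process.
        digest = hashlib.sha1(snapshot_id.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % len(self._queues)

    def submit(self, snapshot_id: str, event) -> None:
        self._queues[self._index_for(snapshot_id)].put((snapshot_id, event))

    def _run(self, q: queue.Queue) -> None:
        while True:
            snapshot_id, event = q.get()
            try:
                self._handler(snapshot_id, event)
            finally:
                q.task_done()

    def join(self) -> None:
        """Block until every submitted event has been handled."""
        for q in self._queues:
            q.join()
```

Events for different snapshots may still run in parallel on different queues; only events sharing a snapshot id are forced into a single serial order.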

### Distributed Isolation via Affinity
I would like to find some way to apply isolation at the system level. We have experimented with message routing by consistent hashing the snapshot id so that events for the same snapshot will end up on the same queue. The problem is that we cannot use competing consumers on these queues and if a consumer kicks the bucket/loses connectivity, we end up with orphaned messages in that queue. While there are some options we could pursue, they seem a bit more hackish than I am comfortable with and off the beaten path. I didn't come up with/invent any of these other aspects on my own; they seem to be widely used and vetted. I find that it's best if I don't originate distributed architecture unsupervised; I'd much rather discover an established approach to this.
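
To make the routing experiment concrete, here is a minimal consistent hash ring in Python. It is a sketch only: `HashRing`, the queue names, and the use of 64 virtual nodes per queue are assumptions, and nothing here talks to RabbitMQ. It shows the routing decision, not a fix for the orphaned-message problem.

```python
import bisect
import hashlib


def _hash(key: str) -> int:
    return int.from_bytes(hashlib.sha1(key.encode("utf-8")).digest()[:8], "big")


class HashRing:
    """Map snapshot ids to queue names via points on a hash ring."""

    def __init__(self, queues, vnodes: int = 64):
        self._vnodes = vnodes
        self._ring = []  # sorted list of (point, queue_name)
        for name in queues:
            self.add(name)

    def add(self, name: str) -> None:
        for i in range(self._vnodes):
            bisect.insort(self._ring, (_hash(f"{name}#{i}"), name))

    def remove(self, name: str) -> None:
        self._ring = [(point, n) for point, n in self._ring if n != name]

    def route(self, snapshot_id: str) -> str:
        if not self._ring:
            raise LookupError("no queues available")
        # First ring point at or after the key's hash, wrapping around at the end.
        idx = bisect.bisect_left(self._ring, (_hash(snapshot_id),))
        return self._ring[idx % len(self._ring)][1]
```

Removing a queue from the ring only remaps the snapshot ids that were routed to it; every other id keeps its queue. Messages already sitting in the removed queue are not touched by this, which is exactly the orphaning issue described above.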

What I am hoping for is some guidance that goes something like, "Duh, you just need to read book/paper X" or "Isolation via affinity is not a thing, there is a different concept called Y, go read about that". Hopefully I'm not so far off in "doing it wrong" territory that everything must be scrapped because I've missed a fundamental principle, but even then, I'd rather know it sooner rather than later.

Thanks all,
Alex

Clemens Vasters

Jan 28, 2014, 3:03:28 AM
to distsys...@googlegroups.com

Hi Alex,

 

that set of requirements is very similar to what we in our team (Windows Azure/Server Service Bus) got when the Workflow team for SharePoint came to us asking for a messaging foundation. I’m not going to try to sell you anything, but rather capture some of the issues you have below and explain how we approached those. You can solve some of this with extra scaffolding around the messaging system, but having the features integrated is more comfy.

 

- Your model of having one thread per queue and relying on that for ordered processing couples your consumers to decisions made by the senders in terms of how far you can scale out. That’s not particularly healthy.

- That model also gets you into hot water once your receiver crashes for any reason, assuming you use a peek/lock (receive/ack) model. In that case, the next incarnation of your receiver (which is a competing consumer from the queue’s perspective) will skip that message while it’s locked.

- You also have issues with associating processors with jobs that span messages (isolation as you call it) and keeping track of processing state in case of failures as you speak of potential for orphaned messages.

 

We’re addressing that hairball of problems with two related features in our broker:

 

- We have the notion of sessions. A session is effectively a job-id that you choose and set on a message submitted into the broker. Those sessions are unbounded in the broker. Sessions are a multiplexing mechanism that enforces order. Instead of calling Receive, you call (nod to Berkeley Sockets) ‘AcceptMessageSession’ which gives you a receiver object that will exclusively deliver messages with the same session-id. You can also ask for a receiver for a specific session-id. The system provides an assurance that nobody else can get at messages with the same session-id except that receiver, but you can have competing consumers for sessions. Furthermore, the locks on sessions and messages are synchronized, so that if you drop the ball, you won’t regain control of the session before the lock on the last fumbled message expires, which provides in-order delivery assurance on that session.

- Sessions can have session state. We have a utility function on the session receiver that allows you to make a message-sized binary annotation on the session. That can be done in a transaction with completing/acknowledging messages. This means that you can track progress of work on a state machine (workflow) as you process messages and the next receiver that picks up after failover or that picks up after the processor goes to sleep (some workflows last for months) will know what the state is. Turns out that SharePoint Online’s many millions of concurrent workflow instances track their core progress state that way. The game Halo 4 manages the flow of stats from the Infinity Spartan Ops and Multiplayer games that way.

 

As a result of these two features you get to scale out the backend independent of how the jobs are submitted since you get competing consumers across multiplexed sessions. You also get in-order assurance and job isolation.
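
The session idea can be modeled with a toy in-memory broker in Python. This is emphatically not the Service Bus API: `ToySessionBroker`, `SessionReceiver`, and their methods are hypothetical names, and the sketch leaves out message locks, lock expiry, and transactions. It only shows exclusive ownership of a session, competing consumers across sessions, and state that survives a hand-off.

```python
import threading
from collections import OrderedDict, deque


class ToySessionBroker:
    def __init__(self):
        self._lock = threading.Lock()
        self._pending = OrderedDict()  # session_id -> deque of message bodies
        self._state = {}               # session_id -> bytes annotation
        self._owned = set()            # sessions currently held by a receiver

    def send(self, session_id: str, body) -> None:
        with self._lock:
            self._pending.setdefault(session_id, deque()).append(body)

    def accept_session(self):
        """Hand out any unowned session with pending messages, or None."""
        with self._lock:
            for session_id, messages in self._pending.items():
                if messages and session_id not in self._owned:
                    self._owned.add(session_id)
                    return SessionReceiver(self, session_id)
        return None


class SessionReceiver:
    def __init__(self, broker: ToySessionBroker, session_id: str):
        self._broker = broker
        self.session_id = session_id

    def receive(self):
        """Next message of this session in send order, or None if drained."""
        with self._broker._lock:
            messages = self._broker._pending[self.session_id]
            return messages.popleft() if messages else None

    def get_state(self):
        with self._broker._lock:
            return self._broker._state.get(self.session_id)

    def set_state(self, state: bytes) -> None:
        with self._broker._lock:
            self._broker._state[self.session_id] = state

    def close(self) -> None:
        """Release the session so another consumer can accept it."""
        with self._broker._lock:
            self._broker._owned.discard(self.session_id)
```

Any number of consumers can call `accept_session()` concurrently; each gets a different session, and no two of them ever hold the same one at once.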

 

I don’t know whether there are other brokers doing this. I don’t think Rabbit does. Our caveat for the time being is that sessions don’t yet work over AMQP (which is a scheduling issue more than anything else, I expect that this year), but only over our SBMP protocol that is tied to the CLR (hence the AMQP push).

 

Cheers

Clemens

--
You received this message because you are subscribed to the Google Groups "Distributed Systems" group.
To unsubscribe from this group and stop receiving emails from it, send an email to distsys-discu...@googlegroups.com.
To post to this group, send email to distsys...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/distsys-discuss/f08eea1d-c81f-4faf-a9e9-4e8065c69a34%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Alex Robson

Jan 28, 2014, 10:00:42 AM
to distsys...@googlegroups.com
Hi Clemens,
Thanks for your response. I failed to clarify the difference between the concept of a local work queue and the message queues in our broker. Our local work queues don't directly correlate to the broker's queues at all. We have multiple worker threads at the node level; we just don't allow multiple threads to process messages for the same "session id", to use terminology from your example. I think you must be doing the same thing, otherwise having a selective consumer per session wouldn't give you serial/linear message processing.

I think it's very cool that you built the capability to selectively consume messages based on some larger context. While I believe the AMQP 0-9 or 0-9-1 protocol has the idea of selective consuming on a queue using a filter argument when establishing a consumer, I do not think RabbitMQ actually supports this. We use correlation id on the messages somewhat like you're using session id, except we don't have the means to guarantee exclusivity based on that alone.
Your response is appreciated and I totally agree that having the features in the broker makes things more comfortable.

Thanks,
Alex 



