Martin's tweet on actor implementation


Rajiv Kurian

Dec 31, 2013, 5:49:52 AM
to mechanica...@googlegroups.com
Saw this tweet from Martin on implementation of Actor systems:
"I don't think the Disruptor is the best choice :-) . A mesh of point-to-point memory mapped files - Longer story."

Any more details Martin?

Martin Thompson

Dec 31, 2013, 6:16:18 AM
to mechanica...@googlegroups.com
The real value of the Disruptor is in supporting a mostly static graph of dependencies between event processors, or multicasting an event from a single producer to multiple consumers. It can be used as a replacement for a single queue but this is not the sweet spot for its usage. When I developed the performance tests for the Disruptor I deliberately highlighted some of these configurations.

An actor-based system will have actors come and go dynamically, and dispatch needs to be the most efficient mechanism possible. I prefer not to use a mailbox-per-actor model and instead have a dispatcher per core with a mesh of 1P1C queues backed by memory-mapped files. By memory mapping the queues, they reside outside the Java heap, which removes the GC pressure from Java and allows multi-language interop.
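
For illustration, a minimal sketch of the kind of 1P1C queue backed by a memory-mapped file described above. The layout and names are assumptions rather than Martin's implementation; a production version would need ordered (release/acquire) cursor accesses and cache-line padding instead of plain puts.

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

// Sketch only: fixed-size records in a memory-mapped file, with producer and consumer
// cursors stored at the front of the file, so the queue lives outside the Java heap
// and can be shared with another process or language.
final class MappedSpscQueue {
    private static final int HEAD = 0, TAIL = 8, BASE = 64; // cursor offsets and data base offset
    private final MappedByteBuffer map;
    private final int recordSize;
    private final int capacity;

    MappedSpscQueue(String path, int recordSize, int capacity) throws Exception {
        this.recordSize = recordSize;
        this.capacity = capacity;
        try (RandomAccessFile file = new RandomAccessFile(path, "rw")) {
            map = file.getChannel().map(FileChannel.MapMode.READ_WRITE, 0,
                                        BASE + (long) recordSize * capacity);
        }
    }

    // Producer side: claim the next slot, copy the record in, then publish the tail cursor.
    boolean offer(byte[] record) {
        long head = map.getLong(HEAD), tail = map.getLong(TAIL);
        if (tail - head == capacity) return false;                // ring full
        map.position(BASE + (int) (tail % capacity) * recordSize);
        map.put(record, 0, Math.min(record.length, recordSize));
        map.putLong(TAIL, tail + 1);                              // should be a release store in real code
        return true;
    }

    // Consumer side: read the record at head, then advance the head cursor.
    byte[] poll() {
        long head = map.getLong(HEAD), tail = map.getLong(TAIL);
        if (head == tail) return null;                            // ring empty
        byte[] record = new byte[recordSize];
        map.position(BASE + (int) (head % capacity) * recordSize);
        map.get(record);
        map.putLong(HEAD, head + 1);
        return record;
    }
}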

I believe we are long overdue an actor platform developed with mechanical sympathy for modern processors and memory subsystems. IMHO any actor platform that allows blocking calls or futures has made the most fundamental mistake :-) 

Darach Ennis

Dec 31, 2013, 6:25:25 AM
to mechanica...@googlegroups.com
Erjang (Erlang on the JVM) possibly offers a nice environment to test out mechanically sympathetic actor models.
You should mention the 1P1C mesh idea to Kresten. I think Nitsan has had similar notions of 1P1C memory-mapped-file-based queues too, but not actor-model related, as far as I recall.

There's a crew in NYC considering building an actor model implementation in Go which would also be interesting...



Martin Thompson

Dec 31, 2013, 6:51:42 AM
to mechanica...@googlegroups.com
A few decades back I wrote quite a bit of Prolog and enjoyed it but found many others struggled with it. I love the principles behind Erlang but cannot see it becoming mainstream as a language. B-) We need good actor support in more common languages for mass adoption.

Yeah Nitsan and I have had similar discussions. I've built such systems and know they perform very well. If a good off the shelf product existed then I'm sure many would be saved from rolling their own.

Darach Ennis

Dec 31, 2013, 6:54:40 AM
to mechanica...@googlegroups.com
It's a great idea even if it is applied to only the more common languages! Ha ha! :)



Peter Lawrey

Dec 31, 2013, 6:58:07 AM
to mechanica...@googlegroups.com
I am working on an nPnC memory-mapped topic/queue; however, you can lock individual memory-mapped records between processes on the same machine (by using off-heap locks).


Here is an example of how you can have only one consumer for each message.
ExcerptTailer tailer = chronicle.createTailer();
int threadId = AffinitySupport.getThreadId();

// In a busy loop, check there is an excerpt and that this thread is the only consumer
// (the first 4 bytes of the excerpt act as the off-heap lock word).
if (tailer.hasNext() && tailer.compareAndSwapInt(0L, 0, threadId)) {
    tailer.position(4); // skip the lock word.
    // binary format
    long v = tailer.readLong(); // or readInt()/readDouble() etc. as appropriate
    String text = tailer.readEnum(String.class); // read a UTF-8 String using a string pool.
    // text format
    long x = tailer.parseLong();
    double y = tailer.parseDouble();
    tailer.finish();
}

I am working on more tests to show how this works across processes producing and consuming (not just threads).

Peter.

Rüdiger Möller

Dec 31, 2013, 10:38:32 AM
to mechanica...@googlegroups.com
We use a somewhat similar approach, but address a bigger picture (incl. de/encoding + msg dispatch + LVC). I had the best real-world results when using one thread per sender (plus, depending on the case, one thread for processing, which requires another Q). Though it might not be the absolute optimum in theory, it provides a nice and manageable abstraction (1:N RPC instead of a "Messaging" metaphor). Localhost p2p traffic is done by replacing a socket with an off-heap queue via memory-mapped files (which saves complexity, as application code does not have to care whether it's doing network or local messaging), which is suboptimal of course.

Rajiv Kurian

Dec 31, 2013, 2:28:47 PM
to mechanica...@googlegroups.com
I actually raised this idea on the Akka dev list once (not the memory-mapped queues part). The complication with having a mesh of 1P1C queues used to communicate between a lot of actors is work stealing. Anecdotally, I have found that static scheduling (assuming that's what you meant) doesn't work with general frameworks where some actors take more time to process messages than others (an image-processing actor vs. one looking up some values from a map). The work across cores will inevitably get imbalanced if we just statically schedule actors to cores. For example, if core 1 is busy and core 2 is out of work we will need to:
i) Design an efficient way to migrate some of the work from core 1's incoming 1P1C queues (one per other core) to core 2.
ii) Remember which actors' messages have been handed out to other idle cores, so when we encounter messages for them later (on core 1) we don't process them until the stealing core is done with its stolen batch of messages and the results have been made visible to core 1. This is to maintain sequential consistency.
iii) Revisit the skipped-over messages later to deal with them.

This is just one possibility. Maybe there are better ways. The one-mailbox-per-actor model just makes it easy (not efficient) to handle work stealing: an idle core just steals a mailbox. The price to pay is MPSC queues and a lack of locality, among other things.

Martin, do you have any ideas for work stealing in a mesh-of-1P1C-queues setup, or some way of avoiding it altogether? That would make this design really attractive and surely be a boost to performance for actor systems without any loss of semantics.

Thanks!

Rajiv Kurian

Dec 31, 2013, 2:33:44 PM
to mechanica...@googlegroups.com


On Tuesday, December 31, 2013 3:25:25 AM UTC-8, Darach Ennis wrote:
Erjang (Erlang on the JVM) possibly offers a nice environment to test out mechanically sympathetic actor models.
You should mention the 1P1C mesh idea to Kresten. I think Nitsan has had similar structure of 1P1C memory mapped
file based queue notions too, but not actor model related AFAIKR.

There's a crew in NYC considering building an actor model implementation in Go which would also be interesting...
Not sure if Go is a great way to build such systems as it currently stands. There isn't any way to opt out of the goroutine runtime. Using channels as they stand right now wouldn't be a great idea either, since they are designed for MPMC scenarios and the implementation is chock full of mutexes - there was a patch by Dmitry Vyukov to change this, not sure if it got merged. Also, last time I checked, the support for C++11-style atomics (memory_order_acquire and memory_order_release) required to build efficient 1P1C queues was lacking in the standard library.




Michael Barker

Dec 31, 2013, 6:04:50 PM
to mechanica...@googlegroups.com
On Tuesday, 31 December 2013, Rajiv Kurian wrote:
Not sure if Go is a great way to build such systems as it currently stands. There isn't any way to opt out of the go-routine runtime. Using channels as they stand right now wouldn't be a great idea either since they are designed for MPMC scenarios and the implementation is chock full of mutexes - there was a patch by Dmitry Vyukov to change this, not sure if it got merged. Also last time I checked the support for C++11 like atomics (std::acquire and std::release) required to build efficient 1P1C queues was also lacking in the std library.

Go doesn't have a sophisticated memory model à la Java/C++. The constructs available (http://golang.org/ref/mem) for guaranteeing ordering and visibility across threads aren't particularly useful for building custom lock-free concurrent structures. My take is that if Go's concurrency model (channels and goroutines) works for your application then great; if you want a different model and/or implementation for passing data between threads then pick a different tool.

Mike.
 

Martin Thompson

Jan 1, 2014, 6:27:46 AM
to mechanica...@googlegroups.com
On 31 December 2013 19:28, Rajiv Kurian <geet...@gmail.com> wrote:
I actually raised this idea in the Akka dev list once (not the memory mapped queues part).The complication with having a mesh of 1P1C queues which are used to communicate between a lot of actors is work stealing. With anecdotal evidence I have found that static scheduling (assuming that's what you meant) doesn't work with general frameworks where some actors will take more time to process messages than others (image processing actor vs looking up some values from a map). The work across cores will inevitably get imbalanced if we just statically schedule actors to cores. For example if say core 1 is busy and core 2 is out of work we will need to:
i) Design an efficient way to migrate some of the work from core 1's incoming 1P1C queues (one per other core) to core 2.
ii) Remember which actors' messages have been handed out to other idle cores so when we encounter messages for them later (on core 1) we don't process them till the stealing core is done with it's stolen cache of messages and the results have been made visible to core 1. This is to maintain sequential consistency.
iii) Revisit the skipped over messages later to deal with them.

This is just one possibility. Maybe there are better ways. The one mailbox per actor just makes it easy (not efficient) to handle work stealing where an idle core just steals a mailbox. The price to pay is MPSC queues and lack of locality among others.

Martin, do you have any ideas for work stealing in a mesh of 1P1C queues setup or some ways of avoiding it all together? That would make this design really attractive and surely be a boost to performance for actor systems without any loss of semantics.

I find it is best to initially statically allocate with some consideration to special case actors, e.g. large compute jobs or IO.  Rebalancing needs to be based on both queue length and drain rate per dispatcher but also importantly which actors have more frequent communications so they get co-located. This can be achieved by having a central load balancer that can put dispatcher queues into an administration mode to rebalance. This allows the 1P1C semantics to be preserved without the overhead of work stealing.
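
As a rough illustration of the rebalancing trigger described above (a sketch with hypothetical names; the co-location-by-communication-frequency part is omitted):

import java.util.List;

// Sketch: the load balancer samples per-dispatcher stats and picks the dispatcher
// that is furthest behind. "Backlog seconds" = queue length / drain rate, i.e. how
// long the dispatcher would need to catch up if no new messages arrived.
final class DispatcherStats {
    final int dispatcherId;
    final long queueLength;        // messages currently queued
    final double drainRatePerSec;  // messages processed per second (recent average)

    DispatcherStats(int dispatcherId, long queueLength, double drainRatePerSec) {
        this.dispatcherId = dispatcherId;
        this.queueLength = queueLength;
        this.drainRatePerSec = drainRatePerSec;
    }

    double backlogSeconds() {
        return drainRatePerSec > 0 ? queueLength / drainRatePerSec : Double.MAX_VALUE;
    }
}

final class RebalanceCheck {
    // Returns the id of the most backlogged dispatcher, or -1 if none exceeds the
    // threshold. Which actors to move off it (and where, based on communication
    // frequency) is the interesting part and is not shown here.
    static int pickOverloaded(List<DispatcherStats> stats, double thresholdSeconds) {
        int worst = -1;
        double worstBacklog = thresholdSeconds;
        for (DispatcherStats s : stats) {
            if (s.backlogSeconds() > worstBacklog) {
                worstBacklog = s.backlogSeconds();
                worst = s.dispatcherId;
            }
        }
        return worst;
    }
}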

Rüdiger Möller

Jan 1, 2014, 8:35:34 AM
to mechanica...@googlegroups.com
You name it. Once logic gets more complex, and under high load with varying communication patterns, we had serious problems where some actors (= cluster nodes) became the bottleneck, which caused queuing and degraded overall system performance massively. We then initially moved decoding of incoming messages to a per-sender thread while still maintaining single-threaded processing (putting decoded messages into an MPSC queue). However, in most cases the best overall system performance could be reached by doing the processing on a per-sender thread as well.

Of course this completely negates the original idea of "Actors", as processing code then gets multithreaded; however, locality frequently isn't that bad, as there is a kind of "natural" sharding as long as different senders (actors) do not address the same data (which is rare by design in our system).

I can hardly imagine how large meshed actor networks with 'static' thread assignment work for unstable, varying load in complex applications. A concept for dynamic work stealing/balancing is key for a general approach.

If I understand Martin's idea correctly (one dispatcher per core), one would have to move actors from one dispatcher to another for balancing?

Another note: on a single machine it's easier to apply backpressure to senders; once you scale out to multiple machines, backpressure has higher latency, which can lead to strange 'resonance' effects resulting in unstable throughput and unpredictable latency peaks and throughput lows. We get rid of all this with the per-sender-thread tweak.

Rüdiger Möller

Jan 1, 2014, 8:37:41 AM
to mechanica...@googlegroups.com
It's clear that blocking calls aren't applicable at all in a reactive framework, but what problems do you see in 'Futures'?

Martin Thompson

Jan 1, 2014, 9:19:18 AM
to mechanica...@googlegroups.com

If i understand Martins idea correctly (1 dispatcher per core), one would have to move actors from one dispatcher to another for balancing ?

Yes the system needs to be capable of rebalancing when faced with unpredictable workloads.

 
Another note: On a single machine its more easy to put backpressure to senders, once you scale out on multiple machines, backpressure has higher latency which can lead to strange 'resonance' effects resulting in unstable throughput and unpredictable latency peaks and throughput low's. We get rid of all this with the per sender thread tweak.

TCP has the advantage of applying appropriate back pressure between nodes. Unfortunately, with multicast or unicast UDP there is little support available. Late join and recovery scenarios can get very messy. Todd and I are about to start some new work in this area that we should be able to open source later this year. We want to have flow and congestion control support available to event-driven systems that is built into the transport layer but integrated with the application layer.

Martin Thompson

Jan 1, 2014, 9:24:16 AM
to mechanica...@googlegroups.com
 
Its clear that blocking calls aren't applicable at all in a reactive framework, but what problems do you see in 'Futures' ?

I think "Futures" are a poor substitute for being pure event driven and using state machines. Futures make failure cases very ugly very quickly and they are really just methadone for trying to wean programmers off synchronous designs :-) 

Rüdiger Möller

Jan 1, 2014, 10:46:42 AM
to mechanica...@googlegroups.com
We are using reliable UDP even for unicast for various reasons (failover etc.).
I have had the best results with this simple backpressure algorithm:

1) every, say, 10,000th message an ack request is sent on the topic.
2) receivers reply with an ack response (the ack request is queued like a regular incoming msg)
3) allow N unanswered ack requests; reduce the send rate (by blocking for some nanos) depending on the number of unanswered acks

Of course a sender then has to know roughly how many receivers there are. This way it's possible to control queue sizes with very little overhead. By varying the ack interval, the max unanswered acks and the 'pause(unrepliedAcks)' function one can trade latency/queue size against throughput. By setting an upper bound on the nanos paused, one can avoid a complete shutdown in case of failing receivers.
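
A rough sketch of the sender-side pacing described above, simplified to a single receiver, with illustrative names and constants (this is not the fast-cast implementation):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

// Sketch: every ACK_INTERVAL-th message an ack request is published; the send rate
// is throttled in proportion to how many ack requests are still unanswered, with an
// upper bound on the pause so a dead receiver cannot stall the sender forever.
final class AckPacedSender {
    private static final int ACK_INTERVAL = 10_000;
    private static final int MAX_OUTSTANDING = 8;           // unanswered ack requests tolerated at full rate
    private static final long MAX_PAUSE_NANOS = 1_000_000;  // 1 ms upper bound

    interface Transport {                                    // hypothetical transport abstraction
        void publish(byte[] msg);
        void publishAckRequest(long seq);
    }

    private final Transport transport;
    private final AtomicInteger outstandingAcks = new AtomicInteger();
    private long sent;

    AckPacedSender(Transport transport) { this.transport = transport; }

    void send(byte[] msg) {
        transport.publish(msg);
        if (++sent % ACK_INTERVAL == 0) {
            outstandingAcks.incrementAndGet();
            transport.publishAckRequest(sent);               // receivers answer it like any other message
        }
        long pause = pauseFor(outstandingAcks.get());
        if (pause > 0) {
            LockSupport.parkNanos(pause);                     // throttle proportionally to the ack backlog
        }
    }

    void onAckResponse() {                                    // invoked when a receiver's ack response arrives
        outstandingAcks.decrementAndGet();
    }

    private static long pauseFor(int unanswered) {
        if (unanswered <= MAX_OUTSTANDING) return 0;
        return Math.min((unanswered - MAX_OUTSTANDING) * 50_000L, MAX_PAUSE_NANOS);
    }
}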

BTW, applying network algorithms (incl. retransmission where needed) to in-memory queues works pretty well and is also non-blocking. If tuned correctly (i.e. flow control such that no overflows trigger retransmissions) it can be nearly as effective as a dedicated shared-memory IPC queue (I have an implementation of this in my [~beta] fast-cast 2.0 open source lib, currently relying on system file locks, so miserable latency .. haven't had time to apply modern locking to this..).

Rüdiger Möller

Jan 1, 2014, 10:57:37 AM
to mechanica...@googlegroups.com
Ok, so you are asking for more mechanical sympathy for the human brain :-). Thinking of some typical use cases involving futures, I have to admit a lack of imagination on how to solve these without futures in a non-messy way.



Rajiv Kurian

Jan 1, 2014, 12:55:27 PM
to mechanica...@googlegroups.com
Just to be sure: by static allocation do you mean some scheme that would let each individual dispatcher know computationally (without any central lookup table) which dispatcher a particular actor belongs to? For example: hash an actor-id to one of the dispatchers using the same hash function.
Could you please provide some more details on how the central load balancer would work? I understand the notion of co-locating actors that communicate frequently but am trying to wrap my head around how exactly rebalancing would be done (implementation-wise). I am specifically wondering about:
i) How will the central load balancer move actors from one dispatcher to the other? Specifically, how will the other dispatchers know about this change and atomically cut over to the new dispatcher when sending a message to an actor? For example: say actor-1 was moved from dispatcher-1 to dispatcher-2 because of either co-location or rebalancing benefits. Dispatchers 3-n also need to know about this change so that when they address actor-1 in the future they enqueue the messages on the right dispatcher's incoming queue. Also, dispatcher-1, which just lost actor-1, could still have messages for actor-1 left on its incoming queues. These need to be processed before any other messages for actor-1 can be processed by its new dispatcher.

Thanks

Tristan Slominski

Jan 1, 2014, 3:14:20 PM
to mechanica...@googlegroups.com
Would you mind sharing some of the use cases you have in mind? I'd be curious to see where the sticking points are that you came across.

Martin Thompson

Jan 1, 2014, 3:27:32 PM
to mechanica...@googlegroups.com

I find it is best to initially statically allocate with some consideration to special case actors, e.g. large compute jobs or IO.  Rebalancing needs to be based on both queue length and drain rate per dispatcher but also importantly which actors have more frequent communications so they get co-located. This can be achieved by having a central load balancer that can put dispatcher queues into an administration mode to rebalance. This allows the 1P1C semantics to be preserved without the overhead of work stealing.
Just to be sure by static allocation you mean some scheme that would let each individual dispatcher computationally (without any central look up table) know which dispatcher a particular actor belongs to? For example: hash an actor-id to one of the dispatchers using the same hash function.
Could you please provide some more details on how the central load balancer would work? I understand the notion of co-locating actors that communicate frequently but am trying to wrap my head around how rebalancing would exactly be done (implementation wise). I am specifically wondering about:
i) How the central load balancer will move actors from one dispatcher to the other? Specifically how will the other dispatchers know about this change and atomically cut over to the new dispatcher when sending a message to an actor. For example: say actor-1 was moved from dispatcher-1 to dispatcher-2 because of either co-location or rebalancing benefits. Dispatchers 3-n also need to know about this change so that when they address actor-1 in the future they enqueue the messages to the right dispatcher's incoming queue. Also dispatcher-1 which just lost actor-1 could still have messages for actor-1 left on it's incoming queues. These need to be processed before any other messages for actor-1 can be processed by it's new dispatcher.

Each dispatcher keeps track of what actors it hosts so local comms can be optimised away. When references are handed out or migrated they can then be registered with a central directory. The load balancer also manages the directory thus preserving the single writer principle. Directories can be federated to scale and provide local caching across node clusters.

Race conditions can be handled when migrating actors by keeping a forwarding action at the old dispatcher.
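
A sketch, with hypothetical names, of the shape this could take: the load balancer is the only writer of the directory (single writer principle), dispatchers only read it, and the old dispatcher keeps forwarding entries during a migration:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only, not Martin's code.
final class ActorDirectory {
    private final Map<Long, Integer> actorToDispatcher = new ConcurrentHashMap<>();

    // Written only by the load balancer thread (single writer principle).
    void assign(long actorId, int dispatcherId) {
        actorToDispatcher.put(actorId, dispatcherId);
    }

    // Read by any dispatcher when routing a message.
    Integer lookup(long actorId) {
        return actorToDispatcher.get(actorId);
    }
}

final class ForwardingTable {
    // Old dispatcher's view: actors that have migrated away and where they went.
    // Messages that still arrive for them are re-routed instead of being processed,
    // which closes the race window while other dispatchers learn of the move.
    private final Map<Long, Integer> movedTo = new ConcurrentHashMap<>();

    void forwardTo(long actorId, int newDispatcherId) { movedTo.put(actorId, newDispatcherId); }

    Integer redirect(long actorId) { return movedTo.get(actorId); } // null => still hosted here
}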

awei...@voltdb.com

Jan 1, 2014, 6:26:54 PM
to mechanica...@googlegroups.com
Would that make listenable futures the sweet sweet crystal we are all looking for?

I had a more involved response and I submitted it, but it never appeared. Maybe I hit reply to author.

Ariel



Rüdiger Möller

Jan 1, 2014, 7:49:02 PM
to mechanica...@googlegroups.com
Futures are an everyday tool for doing multi-step application logic in a non-blocking fashion, like this (using Java 8 lambda notation for brevity, error handling omitted):

mapAndStream("some query/subscription", (Future f) -> { // step 1
   result = reduce(f); // then step 2
   if (lastResult) {
      f.done(); // 'close future mailbox' = invalidate callback entry
      doTransaction(result, (Future transactionResponse) -> { // then step 3
         ....
      });
   }
});

Of course one could use an actor-based equivalent; however, if there are hundreds of such multi-step business processes, I'd imagine this is hard to manage, maintain and extend.
A future could be seen as a special 'temporary Actor' (which is actually true: in a 1:N communication scheme, a future might receive several results).

Rajiv Kurian

Jan 1, 2014, 8:03:51 PM
to mechanica...@googlegroups.com
Sorry I keep pestering for details - I am actually attempting to build a prototype to do something like this. Is my summary correct?
i) A load balancer thread (or multiple threads, in the case of a federated directory) looks at metrics like queue length, drain rate and communication patterns to rebalance actors across dispatchers.
ii) When a dispatcher needs to send a message to an actor it first checks if the actor is on the same dispatcher. If yes, it can just make a method call instead of any communication. If no, it looks up the right dispatcher in the central directory and places the message on the corresponding queue.
iii) When the load balancer thread determines that actor-1 should be moved away from, say, dispatcher-1 to dispatcher-2, it needs to first communicate with dispatcher-1 so that it can set up a proper forwarding map. The load balancer can then update the central directory. Eventually all dispatchers will send actor-1's messages to their respective outgoing queues to dispatcher-2. And since this only happens eventually, the forwarding map will take care of all the messages sent to dispatcher-1 in the meantime.

The forwarding map though doesn't take care of messages already on dispatcher-1's incoming queues. Consider the following timeline:
1) Actor-2 on some other dispatcher (maybe dispatcher-3) could have sent 50 messages destined for actor-1 to dispatcher-1. These are still waiting to be processed.
2) Now the load balancer thread starts a rebalance event proposing actor-1 be moved from dispatcher-1 to dispatcher-2.
3) In response to (2) the forwarding map is set up on dispatcher-1 and the load balancer also updates the central directory. All future messages from actor-2 will end up going to dispatcher-2, either through the forwarding map (which might happen in the beginning) or through a look up on the central directory.
4) Actor-2 learns of the change in actor-1's dispatcher and sends msg #51 to dispatcher-2. Dispatcher-2 being idle immediately processes msg #51.
5) Now dispatcher-1 gets around to seeing the 50 messages from actor-2 and forwards them to dispatcher-2.
6) Dispatcher-2 now processes them serially. What ends up happening is that msg #51 (sent from actor-2) is processed before msg #1-50 (also from actor-2) which breaks the actor contract of serial processing for a sender-destination pair.

What we need to ensure is that as long as actor-2 (on dispatcher-3) is sending messages to dispatcher-1 (to be forwarded) instead of directly to dispatcher-2, those messages are processed before the ones sent directly to dispatcher-2. A simple forwarding map doesn't guarantee that. I feel like some kind of two-phase commit protocol is needed. Something like:
1) Load balancer thread proposes a rebalance event moving actor-1 from dispatcher-1 to dispatcher-2.
2) All threads once they read this event will enqueue future messages destined for actor-1 in some local queue (instead of sending them to dispatcher-1) and signal the load balancer that they are now aware of the rebalance mode. Dispatcher-1 will also enqueue messages for actor-1 on this local queue instead of invoking method calls.
3) The load-balancer now signals dispatcher-1 that the other dispatchers are aware of this proposal. Now dispatcher-1 is guaranteed that no other dispatcher will be sending messages destined for actor-1 on any of its incoming queues (from step 2).
4) Now dispatcher-1 scans its incoming queues and forwards all messages (thus maintaining order) for actor-1 to dispatcher-2. Only a single scan will do since it is guaranteed that any concurrent incoming messages will not be for actor-1. This ensures that all messages already sent to actor-1 (hence on dispatcher-1's queues) before or during the rebalance event are forwarded to dispatcher-2 before newer messages are enqueued/processed.
5) Dispatcher-1 can then signal the load-balancer thread that it's done forwarding all old messages for actor-1 to dispatcher-2. The load balancer can then signal all the other threads that the forwarding phase is over.
6) Now all the threads forward the messages enqueued in their temporary local queues (from step 2) to dispatcher-2. From then on they also send messages destined for actor-1 to dispatcher-2 (via the central directory).

This should ensure that the actor contract is maintained across rebalancing events.
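
The phases of the protocol proposed above, summarised as a purely illustrative enum (not a full implementation):

// Illustrative summary of the two-phase rebalance proposed above.
enum MigrationPhase {
    PROPOSED,          // 1) balancer announces: actor-1 moves from dispatcher-1 to dispatcher-2
    SENDERS_PARKED,    // 2) every dispatcher parks new actor-1 messages in a local side queue and acks
    BACKLOG_FORWARDED, // 3)+4) dispatcher-1 scans its inbound queues once and forwards actor-1's backlog, in order
    FORWARD_ACKED,     // 5) dispatcher-1 acks; balancer tells the others the backlog now sits on dispatcher-2
    CUT_OVER           // 6) dispatchers flush their side queues to dispatcher-2, then route to it directly
}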

Tristan Slominski

Jan 1, 2014, 8:39:56 PM
to mechanica...@googlegroups.com
Thank you for the example. I am curious about specific examples because I tend to map futures onto actor become + fork/join (actor pattern). So, attempting to transliterate your example (in pseudo JavaScript):

var reduceBeh = function reduceBeh(message) {
  var result = reduce(message); // do reduce work
  if (lastResult) {
    this.behavior = doTransactionBeh; // become next step
    this.self(result); // send result to self
  };
};

var doTransactionBeh = function doTransactionBeh(message) {
  var transactionResponse = doTransaction(message); // do transaction work
  this.behavior = step3Beh; // become next step
  this.self(transactionResponse); // send transaction response to self
};

mapAndStream("some query/subscription", function (message) {
  var reducer = this.sponsor(reduceBeh); // create an actor with "first step" behavior
  reducer(message); // send it work to do
});

I'm not sure I transliterated it right, and I hand-waved the mapAndStream interface for now, but it seems that become + fork/join could handle hundreds of business logic steps. Additionally, each actor is a "saga" (of Udi Dahan fame) that effectively captures the state of the computation. That is, I can instrument it without any special treatment as I could instrument any other actor. (as opposed to having to understand that futures have special semantics in my normal execution environment).

Also, there is opportunity for sugar here. Something along the lines of:

mapAndStream("some query/subscription", function (message) {
  var reducer = this.sponsor(sequence(reduceBeh, transactionBeh, step3Beh)); // syntactic sugar to create an actor with implied sequence of behaviors
  reducer(message);
});

Rüdiger Möller

Jan 1, 2014, 9:18:15 PM
to mechanica...@googlegroups.com
Maybe a simpler approach (no guarantee of correctness ;) ..):

1) let an actor have a tag indicating dispatcher ownership (e.g. the thread instance). This can be checked to decide between queueing and a direct call.
2) the balancer switches the tag atomically from disp 1 => disp 2, plus state: IN_TRANSITION
3) the next message incoming from disp 1 will now get queued => OK
4) the next msg from disp 2 needs to trigger complete dispatch of all msgs in the Q until it is empty, switch the actor state to, say, 'DEF_STATE', then process the next one unqueued.

a) This may create a serious hiccup during transition. The hiccup could be reduced by keeping per-sender queues (or only processing msgs of disp 2 in (4)).

b) Another option would be to just queue messages from all dispatchers until the Q is empty at some point, then switch the actor state from transition to DEF_STATE.

Both variants have drawbacks: a) creates anomalies in processing performance, b) is risky in case the Q never gets empty.

Regarding your two-phase approach, regardless of what you are doing: messages enqueued from disp 2 before the transition have to be processed after rebalancing, before any new incoming msg from disp 2 can be processed unqueued.

This could be quite nasty as rebalancing typically will occur under high load ..

Rüdiger Möller

Jan 1, 2014, 9:15:54 PM
to mechanica...@googlegroups.com
Well, your syntactic sugar is actually a future, isn't it? IMO it's just a different representation of the same underlying processing concept. Futures can be temp actors behind the scenes. As Martin pointed out, it supports thinking in a "serial" process-oriented manner instead of a state machine. As for me, I am happy that my fellow coworkers stopped using blocking patterns (AKA send msg, wait for result). So I am very happy that futures make async code look like "normal code, just more braces" :-).

Tristan Slominski

Jan 1, 2014, 9:32:18 PM
to mechanica...@googlegroups.com
I guess you could call it a future, fair enough for now :)

Martin Thompson

Jan 2, 2014, 6:55:19 AM
to mechanica...@googlegroups.com
I find it is best to initially statically allocate with some consideration to special case actors, e.g. large compute jobs or IO.  Rebalancing needs to be based on both queue length and drain rate per dispatcher but also importantly which actors have more frequent communications so they get co-located. This can be achieved by having a central load balancer that can put dispatcher queues into an administration mode to rebalance. This allows the 1P1C semantics to be preserved without the overhead of work stealing.
Just to be sure by static allocation you mean some scheme that would let each individual dispatcher computationally (without any central look up table) know which dispatcher a particular actor belongs to? For example: hash an actor-id to one of the dispatchers using the same hash function.
Could you please provide some more details on how the central load balancer would work? I understand the notion of co-locating actors that communicate frequently but am trying to wrap my head around how rebalancing would exactly be done (implementation wise). I am specifically wondering about:
i) How the central load balancer will move actors from one dispatcher to the other? Specifically how will the other dispatchers know about this change and atomically cut over to the new dispatcher when sending a message to an actor. For example: say actor-1 was moved from dispatcher-1 to dispatcher-2 because of either co-location or rebalancing benefits. Dispatchers 3-n also need to know about this change so that when they address actor-1 in the future they enqueue the messages to the right dispatcher's incoming queue. Also dispatcher-1 which just lost actor-1 could still have messages for actor-1 left on it's incoming queues. These need to be processed before any other messages for actor-1 can be processed by it's new dispatcher.

Each dispatcher keeps track of what actors it hosts so local comms can be optimised away. When references are handed out or migrated they can then be registered with a central directory. The load balancer also manages the directory thus preserving the single writer principle. Directories can be federated to scale and provide local caching across node clusters.
 Sorry I keep pestering for details - I am attempting to build a prototype to do something like this actually. Is my summary correct?
i) A load balancer thread or multiple threads in case of a federated directory looks at metrics like queue length, drain rate and communication patterns to rebalance actors across dispatchers.
ii) When a dispatcher needs to send a message to an actor it first checks if the actor is on the same dispatcher. If yes then it can just make a method call instead of any communication. If no then it looks it up the right dispatcher from a central directory and places the message on the corresponding queue.
iii) When the load balancer thread determines that actor-1 should be moved away from say dispatcher-1 to dispatcher-2 it needs to first communicate with the dispatcher-1 so that it can set up a proper forwarding map. The load balancer can then update the central directory. Eventually all dispatchers will send actor-1's messages to their respective outgoing queues to dispatcher-2. And since this only happens eventually the forwarding map will take care of all the messages sent to dispatcher-1 in the meantime.

Each dispatcher has an MPSC administration queue that it checks after each event dispatch. Migrations etc. are sent to this queue so each dispatcher can maintain local state. The load balancer will move migrated events to other queues when they are put in admin mode. With multiple queues it is possible to have race conditions on migration, and also just as other actors look up and send, or when migrating across nodes in a cluster, thus forwarding actions are useful.
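
A sketch of the loop shape this implies (hypothetical types; ConcurrentLinkedQueue stands in for the real MPSC/SPSC structures):

import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative only: dispatch one message, then check the admin queue so control
// commands (migrations etc.) interleave with normal event dispatch.
final class DispatcherLoop implements Runnable {
    interface Message { void dispatch(); }
    interface AdminCommand { void apply(DispatcherLoop dispatcher); } // e.g. "migrate actor X away"

    private final ConcurrentLinkedQueue<Message> inbound = new ConcurrentLinkedQueue<>();
    private final ConcurrentLinkedQueue<AdminCommand> admin = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    public void run() {
        while (running) {
            Message msg = inbound.poll();
            if (msg != null) {
                msg.dispatch();                 // normal event dispatch
            }
            AdminCommand cmd = admin.poll();    // checked after each dispatch
            if (cmd != null) {
                cmd.apply(this);                // update local state: migrations, admin mode, shutdown
            }
            if (msg == null && cmd == null) {
                Thread.yield();                 // placeholder idle strategy
            }
        }
    }

    void stop() { running = false; }
}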

The forwarding map though doesn't take care of messages already on dispatcher-1's incoming queues. Consider the following timeline:
1) Actor-2 on some other dispatcher (maybe dispatcher-3) could have sent 50 messages destined for actor-1 to dispatcher-1. These are still waiting to be processed.
2) Now the load balancer thread starts a rebalance event proposing actor-1 be moved from dispatcher-1 to dispatcher-2.
3) In response to (2) the forwarding map is set up on dispatcher-1 and the load balancer also updates the central directory. All future messages from actor-2 will end up going to dispatcher-2, either through the forwarding map (which might happen in the beginning) or through a look up on the central directory.
4) Actor-2 learns of the change in actor-1's dispatcher and sends msg #51 to dispatcher-2. Dispatcher-2 being idle immediately processes msg #51.
5) Now dispatcher-1 gets around to seeing the 50 messages from actor-2 and forwards them to dispatcher-2.
6) Dispatcher-2 now processes them serially. What ends up happening is that msg #51 (sent from actor-2) is processed before msg #1-50 (also from actor-2) which breaks the actor contract of serial processing for a sender-destination pair.

See comment above.
 
 What we need to ensure is that till such time that actor-2 (on dispatcher-3) is sending messages to dispatcher-1 (eventually forwarded) instead of directly to dispatcher-2, these messages need to be processed before the ones sent directly to dispatcher-2. A simple forwarding map doesn't guarantee that. I feel like some kind of two phase commit protocol is needed. Something like:
1) Load balancer thread proposes a rebalance event moving actor-1 from dispatcher-1 to dispatcher-2.
2) All threads once they read this event will enqueue future messages destined for actor-1 in some local queue (instead of sending them to dispatcher-1) and signal the load balancer that they are now aware of the rebalance mode. Dispatcher-1 will also enqueue messages for actor-1 on this local queue instead of invoking method calls.
3) The load-balancer now signals dispatcher-1 that the other dispatchers are aware of this proposal. Now dispatcher-1 is guaranteed that no other dispatcher will be sending messages destined for actor-1 on any of it's incoming queues (from step 2).
4) Now dispatcher-1 scans it's incoming queues and forwards all messages (thus maintaining order) for actor-1 to dispatcher-2. Only a single scan will do since it is guaranteed that any concurrent incoming messages will not be for actor-1. This ensures that all messages already sent to actor-1 (hence on dispatcher-1's queues) before or during the rebalance event are forwarded to dispatcher-2 before newer messages are enqueued/processed.
5) Dispatcher-1 can then signal the load-balancer thread that it's done forwarding all old messages for actor-1 to dispatcher-2. The load balancer can then signal all the other threads that the forwarding phase is over.
6) Now all the threads forward the messages enqueued in their temporary local queues (from step-2) to dispatcher-2. From then on they also send messages destined for actor-1 to dispatcher-2 (via the central directory). The 

This should ensure that the actor contract is maintained across rebalancing events.

Admin queues are the answer :-) Much simpler.
 

Rüdiger Möller

Jan 2, 2014, 10:12:57 AM
to mechanica...@googlegroups.com
Erm, this throws me completely off ..

Just for clarification,

a) do you support direct dispatch (without a Q) for 'local' messages?
b) do you maintain a Q per dispatcher, per actor, or per actor per sender-to-this-actor?

Martin Thompson

Jan 2, 2014, 12:02:48 PM
to mechanica...@googlegroups.com
Just for clarification,

a) do you support direct dispatch (without Q) for 'local' messages ?

Yes and this is why blocking calls and futures hamper so many optimisations. :-)
 
b) do you maintain a Q per dispatcher, per actor, or per actor per sender-to-this-actor ?

Either an MPSC queue per dispatcher, or SPSC queues in a mesh between dispatchers, for message flow, plus an additional administration queue per dispatcher.
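
For completeness, a minimal in-process SPSC queue in the single-writer style being discussed here, along Disruptor/JCTools lines (a sketch, without the cache-line padding and stricter memory ordering a production version would use):

import java.util.concurrent.atomic.AtomicLong;

// Sketch: bounded single-producer/single-consumer ring. Only the producer writes
// tail, only the consumer writes head; lazySet gives an ordered (StoreStore) publish.
final class SpscRing<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next slot to read  (consumer-owned)
    private final AtomicLong tail = new AtomicLong(); // next slot to write (producer-owned)

    SpscRing(int capacityPowerOfTwo) {
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    boolean offer(E e) {                        // producer thread only
        long t = tail.get();
        if (t - head.get() == buffer.length) return false; // full
        buffer[(int) (t & mask)] = e;
        tail.lazySet(t + 1);                    // publish after the element is written
        return true;
    }

    @SuppressWarnings("unchecked")
    E poll() {                                  // consumer thread only
        long h = head.get();
        if (h == tail.get()) return null;       // empty
        int idx = (int) (h & mask);
        E e = (E) buffer[idx];
        buffer[idx] = null;                     // allow GC of the slot
        head.lazySet(h + 1);
        return e;
    }
}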

What "throws you completely off"?

Rajiv Kurian

Jan 2, 2014, 3:14:14 PM
to mechanica...@googlegroups.com
I don't yet understand how migrations would work without invalidating the source-destination serial-processing contract. Martin, would it be possible to provide an example of a high-level workflow/algorithm for a typical run, including a migration of an actor from one dispatcher to another?

Martin Thompson

Jan 3, 2014, 6:04:59 AM
to mechanica...@googlegroups.com
Some of the queue implementations I've developed have an admin mode which can be set from the load balancer thread. When in this mode, the producer and consumer do not use the queue, and thus it can be modified to support migrations.

The full details of how to do this I'll leave as an exercise for the reader; otherwise how would I make a living if I gave away the full details without getting paid? :-)



Rajiv Kurian

Jan 3, 2014, 12:40:18 PM
to mechanica...@googlegroups.com
Haha. Fair enough.

Rüdiger Möller

Jan 3, 2014, 2:13:58 PM
to mechanica...@googlegroups.com
As you might imagine, I have a playground implementation of actors based on bytecode weaving (ripped off my fast-cast 1:N remoting). An actor references other actors using runtime-generated proxies, so there is no such thing as a central actor lookup and the like. A dispatcher consists of a thread and an MPSC queue, a Q entry is an actor ref + method call object, and that's about it, so I had trouble incorporating SPSC + admin Qs into my mental model of an actor system :-). I am currently primarily investigating ways to provide a simple "user interface" to programmers.

Rüdiger Möller

Jan 3, 2014, 2:32:52 PM