Martin's tweet on actor implementation

Rajiv Kurian

Dec 31, 2013, 5:49:52 AM
to mechanica...@googlegroups.com
Saw this tweet from Martin on implementation of Actor systems:
"I don't think the Disruptor is the best choice :-) . A mesh of point-to-point memory mapped files - Longer story."

Any more details Martin?

Martin Thompson

Dec 31, 2013, 6:16:18 AM
to mechanica...@googlegroups.com
The real value of the Disruptor is in supporting a mostly static graph of dependencies between event processors, or multicasting an event from a single producer to multiple consumers. It can be used as a replacement for a single queue but this is not the sweet spot for its usage. When I developed the performance tests for the Disruptor I deliberately highlighted some of these configurations.

An actor based system will have actors come and go dynamically, and dispatch needs to be the most efficient mechanism possible. I prefer not to use a mailbox-per-actor model and instead have a dispatcher per core with a mesh of 1P1C queues backed by memory-mapped files. By memory-mapping the queues, they reside outside the Java heap, which removes GC pressure from Java and allows interop between multiple languages.
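To make the 1P1C idea concrete, here is a minimal sketch of a bounded single-producer/single-consumer ring buffer in Java. It is an illustration under assumptions, not Martin's implementation: it keeps the buffer on the Java heap (a memory-mapped version would place the slots and counters in a mapped file), and the class name SpscQueue is hypothetical. In a dispatcher-per-core mesh, each ordered pair of dispatchers would own one such queue.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Minimal bounded 1P1C queue (hypothetical name). Assumption: on-heap array
 * for simplicity; the design in the thread keeps the buffer off-heap in a
 * memory-mapped file instead.
 */
final class SpscQueue<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong tail = new AtomicLong(); // written only by the producer
    private final AtomicLong head = new AtomicLong(); // written only by the consumer

    SpscQueue(int capacityPowerOfTwo) {
        if (Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    /** Producer thread only. Returns false when the queue is full. */
    boolean offer(E e) {
        long t = tail.get();
        if (t - head.get() == buffer.length) {
            return false; // full: consumer has not freed a slot yet
        }
        buffer[(int) (t & mask)] = e;
        tail.lazySet(t + 1); // release-style store publishes the element to the consumer
        return true;
    }

    /** Consumer thread only. Returns null when the queue is empty. */
    @SuppressWarnings("unchecked")
    E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null; // empty
        }
        int index = (int) (h & mask);
        E e = (E) buffer[index];
        buffer[index] = null; // drop the reference before handing the slot back
        head.lazySet(h + 1);  // release-style store returns the slot to the producer
        return e;
    }
}
```

Because each counter has exactly one writer, no CAS is needed on either side; an ordered store is enough to publish progress, which is a large part of why 1P1C queues are cheaper than MPSC mailboxes.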

I believe we are long overdue an actor platform developed with mechanical sympathy for modern processors and memory subsystems. IMHO any actor platform that allows blocking calls or futures has made the most fundamental mistake :-) 

Darach Ennis

Dec 31, 2013, 6:25:25 AM
to mechanica...@googlegroups.com
Erjang (Erlang on the JVM) possibly offers a nice environment to test out mechanically sympathetic actor models.
You should mention the 1P1C mesh idea to Kresten. I think Nitsan has had similar structure of 1P1C memory mapped
file based queue notions too, but not actor model related AFAIKR.

There's a crew in NYC considering building an actor model implementation in Go which would also be interesting...


--
You received this message because you are subscribed to the Google Groups "mechanical-sympathy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to mechanical-symp...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

Martin Thompson

Dec 31, 2013, 6:51:42 AM
to mechanica...@googlegroups.com
A few decades back I wrote quite a bit of Prolog and enjoyed it but found many others struggled with it. I love the principles behind Erlang but cannot see it becoming mainstream as a language. B-) We need good actor support in more common languages for mass adoption.

Yeah Nitsan and I have had similar discussions. I've built such systems and know they perform very well. If a good off the shelf product existed then I'm sure many would be saved from rolling their own.

Darach Ennis

Dec 31, 2013, 6:54:40 AM
to mechanica...@googlegroups.com
It's a great idea even if it is applied to only the more common languages! Ha ha! :)



Peter Lawrey

Dec 31, 2013, 6:58:07 AM
to mechanica...@googlegroups.com
I am working on an nPnC memory-mapped topic or queue; however, you can lock individual memory-mapped records between processes on the same machine (by using off-heap locks).


Here is an example of how you can have only one consumer for each message.
ExcerptTailer tailer = chronicle.createTailer();
int threadId = AffinitySupport.getThreadId();

// in a busy loop, check there is an excerpt and this is the only consumer.
if (tailer.hasNext() && tailer.compareAndSwapInt(0L, 0, threadId)) {
    tailer.position(4); // skip the lock.
    // binary format
    long v = tailer.readXxxx(); // readXxxx() is a placeholder for whichever typed read is needed
    String text = tailer.readEnum(String.class); // read a UTF-8 String using a string pool.
    // text format
    long x = tailer.parseLong();
    double y = tailer.parseDouble();
    tailer.finish();
}
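The single-consumer guarantee in Peter's snippet comes from a compare-and-swap on a lock word at the start of each excerpt (compareAndSwapInt(0L, 0, threadId), then position(4) to skip it). The sketch below shows the same claim-by-CAS idea in plain Java. It is an assumption-based illustration, not Chronicle's API: the lock words live in an on-heap AtomicIntegerArray rather than in memory-mapped excerpts, so it only coordinates threads within one process, and the class name is hypothetical.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;

/**
 * Claim-by-CAS over per-record lock words (hypothetical class). Assumption:
 * lock words are on-heap here; in Chronicle they are the first four bytes of
 * each memory-mapped excerpt.
 */
final class RecordClaims {
    private final AtomicIntegerArray lockWords;

    RecordClaims(int records) {
        lockWords = new AtomicIntegerArray(records); // 0 means "unclaimed"
    }

    /** Returns true only for the one consumer whose CAS from 0 succeeds. */
    boolean tryClaim(int record, int consumerId) {
        if (consumerId == 0) {
            throw new IllegalArgumentException("0 is reserved for 'unclaimed'");
        }
        return lockWords.compareAndSet(record, 0, consumerId);
    }

    /** The id of the consumer that claimed the record, or 0 if none has. */
    int owner(int record) {
        return lockWords.get(record);
    }
}
```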

I am working on more tests to show how this works across processes producing and consuming (not just threads)

Peter.

Rüdiger Möller

Dec 31, 2013, 10:38:32 AM
to mechanica...@googlegroups.com
We use a somewhat similar approach, but address a bigger picture (incl. de/encoding + msg dispatch + LVC). I had the best real-world results using 1 thread per sender (plus, depending on the case, one thread for processing, which requires another queue). Though it might not be the absolute optimum in theory, it provides a nice and manageable abstraction (1:N RPC instead of a "Messaging" metaphor). Localhost p2p traffic is handled by replacing a socket with an off-heap queue via mmapped files (which saves complexity, as application code does not have to care whether it's doing network or local messaging), which is suboptimal ofc.

Rajiv Kurian

Dec 31, 2013, 2:28:47 PM
to mechanica...@googlegroups.com
I actually raised this idea on the Akka dev list once (not the memory-mapped queues part). The complication with having a mesh of 1P1C queues which are used to communicate between a lot of actors is work stealing. From anecdotal evidence I have found that static scheduling (assuming that's what you meant) doesn't work with general frameworks where some actors will take more time to process messages than others (an image processing actor vs. looking up some values from a map). The work across cores will inevitably get imbalanced if we just statically schedule actors to cores. For example, if say core 1 is busy and core 2 is out of work, we will need to:
i) Design an efficient way to migrate some of the work from core 1's incoming 1P1C queues (one per other core) to core 2.
ii) Remember which actors' messages have been handed out to other idle cores so when we encounter messages for them later (on core 1) we don't process them till the stealing core is done with its stolen cache of messages and the results have been made visible to core 1. This is to maintain sequential consistency.
iii) Revisit the skipped over messages later to deal with them.

This is just one possibility. Maybe there are better ways. The one mailbox per actor just makes it easy (not efficient) to handle work stealing where an idle core just steals a mailbox. The price to pay is MPSC queues and lack of locality among others.

Martin, do you have any ideas for work stealing in a mesh of 1P1C queues setup or some ways of avoiding it altogether? That would make this design really attractive and surely be a boost to performance for actor systems without any loss of semantics.

Thanks!

Rajiv Kurian

Dec 31, 2013, 2:33:44 PM
to mechanica...@googlegroups.com


On Tuesday, December 31, 2013 3:25:25 AM UTC-8, Darach Ennis wrote:
There's a crew in NYC considering building an actor model implementation in Go which would also be interesting...
Not sure if Go is a great way to build such systems as it currently stands. There isn't any way to opt out of the goroutine runtime. Using channels as they stand right now wouldn't be a great idea either, since they are designed for MPMC scenarios and the implementation is chock full of mutexes; there was a patch by Dmitry Vyukov to change this, not sure if it got merged. Also, last time I checked, the support for C++11-like atomics (std::memory_order_acquire and std::memory_order_release) required to build efficient 1P1C queues was also lacking in the std library.



Michael Barker

Dec 31, 2013, 6:04:50 PM
to mechanica...@googlegroups.com
Go doesn't have a sophisticated memory model à la Java/C++. The constructs available (http://golang.org/ref/mem) for guaranteeing ordering and visibility across threads aren't particularly useful for building custom lock-free concurrent structures. My take is that if Go's concurrency model (channels and goroutines) works for your application then great; if you want a different model and/or implementation for passing data between threads, then pick a different tool.

Mike.
 

Martin Thompson

Jan 1, 2014, 6:27:46 AM
to mechanica...@googlegroups.com
I find it is best to initially statically allocate with some consideration to special case actors, e.g. large compute jobs or IO.  Rebalancing needs to be based on both queue length and drain rate per dispatcher but also importantly which actors have more frequent communications so they get co-located. This can be achieved by having a central load balancer that can put dispatcher queues into an administration mode to rebalance. This allows the 1P1C semantics to be preserved without the overhead of work stealing.
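Martin's rebalancing inputs (queue length, drain rate, and which actors communicate most) could feed a planning step roughly like the sketch below. It is an illustration under assumptions: the class names, the "seconds to drain" metric (queue length divided by drain rate), and the rule of moving the source actor that talks most to actors on the target are choices made here, not details from the thread.

```java
import java.util.Map;

/** Hypothetical per-dispatcher load snapshot collected by the load balancer. */
final class DispatcherStats {
    final int id;
    final long queueLength;      // messages waiting in its incoming queues
    final double drainPerSecond; // recently observed processing rate

    DispatcherStats(int id, long queueLength, double drainPerSecond) {
        this.id = id;
        this.queueLength = queueLength;
        this.drainPerSecond = drainPerSecond;
    }

    /** Estimated seconds to empty the backlog; 0 when idle, infinite if stalled. */
    double secondsToDrain() {
        if (queueLength == 0) return 0.0;
        return drainPerSecond <= 0 ? Double.POSITIVE_INFINITY : queueLength / drainPerSecond;
    }
}

final class RebalancePlanner {
    /** Returns {sourceId, targetId}: the most and least backlogged dispatchers. */
    static int[] pickSourceAndTarget(DispatcherStats[] stats) {
        DispatcherStats busiest = stats[0];
        DispatcherStats idlest = stats[0];
        for (DispatcherStats s : stats) {
            if (s.secondsToDrain() > busiest.secondsToDrain()) busiest = s;
            if (s.secondsToDrain() < idlest.secondsToDrain()) idlest = s;
        }
        return new int[] {busiest.id, idlest.id};
    }

    /**
     * Among actors on the source, pick the one exchanging the most messages
     * with actors already on the target, so the move also improves co-location.
     * msgsToTarget maps actorId to messages sent to target-hosted actors.
     */
    static String pickActorToMove(Map<String, Long> msgsToTarget) {
        String best = null;
        long bestCount = -1;
        for (Map.Entry<String, Long> e : msgsToTarget.entrySet()) {
            if (e.getValue() > bestCount) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }
}
```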

Rüdiger Möller

Jan 1, 2014, 8:35:34 AM
to mechanica...@googlegroups.com
You name it. Once logic gets more complex, and under high load with varying communication patterns, we had serious problems where some actors (= cluster nodes) became the bottleneck, which caused queuing and massively degraded overall system performance. We then started moving decoding of incoming messages to a per-sender thread while still maintaining single-threaded processing (putting decoded messages into an MPSC queue). However, in most cases the best overall system performance was reached by doing the processing on a per-sender thread as well.

Ofc this completely negates the original idea of "Actors", as processing code then becomes multithreaded; however, locality frequently isn't that bad, as there is a kind of "natural" sharding as long as different senders (actors) do not address the same data (which is rare by design in our system).

I can hardly imagine how large actor meshed networks with 'static' thread assignment work for unstable, varying load in complex applications. A concept for dynamic work stealing/balancing is key for a general approach. 

If I understand Martin's idea correctly (1 dispatcher per core), one would have to move actors from one dispatcher to another for balancing?

Another note: on a single machine it's easier to apply backpressure to senders; once you scale out to multiple machines, backpressure has higher latency, which can lead to strange 'resonance' effects resulting in unstable throughput, unpredictable latency peaks and throughput lows. We get rid of all this with the per-sender thread tweak.

Rüdiger Möller

Jan 1, 2014, 8:37:41 AM
to mechanica...@googlegroups.com
It's clear that blocking calls aren't applicable at all in a reactive framework, but what problems do you see in 'Futures'?

Martin Thompson

Jan 1, 2014, 9:19:18 AM
to mechanica...@googlegroups.com

If I understand Martin's idea correctly (1 dispatcher per core), one would have to move actors from one dispatcher to another for balancing?

Yes the system needs to be capable of rebalancing when faced with unpredictable workloads.

 
Another note: on a single machine it's easier to apply backpressure to senders; once you scale out to multiple machines, backpressure has higher latency, which can lead to strange 'resonance' effects resulting in unstable throughput, unpredictable latency peaks and throughput lows. We get rid of all this with the per-sender thread tweak.

TCP has the advantage of applying appropriate back pressure between nodes. Unfortunately, with multicast or unicast UDP there is little support available. Late join and recovery scenarios can get very messy. Todd and I are about to start some new work in this area that we should be able to open source later this year. We want to have flow and congestion control support available to event driven systems that is built into the transport layer but integrated with the application layer.

Martin Thompson

Jan 1, 2014, 9:24:16 AM
to
 
It's clear that blocking calls aren't applicable at all in a reactive framework, but what problems do you see in 'Futures'?

I think "Futures" are a poor substitute for being pure event driven and using state machines. Futures make failure cases very ugly very quickly and they are really just methadone for trying to wean programmers off synchronous designs :-) 
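As a rough illustration of the "pure event driven with state machines" style, here is a hypothetical two-step process (request a quote, then place an order) written as an explicit state machine in Java. The states, events and class names are invented for illustration and are not from the thread; the point is that failures and timeouts are just more events handled in the same place, rather than error paths threaded through chained future callbacks.

```java
/** Events the hypothetical process reacts to. */
enum Event { QUOTE_RECEIVED, QUOTE_FAILED, ORDER_ACKED, ORDER_REJECTED, TIMEOUT }

/** Explicit states replace a chain of future callbacks. */
enum State { AWAITING_QUOTE, AWAITING_ACK, DONE, FAILED }

final class OrderProcess {
    private State state = State.AWAITING_QUOTE;

    State state() { return state; }

    /** Called by the dispatcher for every event addressed to this process. */
    void onEvent(Event event) {
        switch (state) {
            case AWAITING_QUOTE:
                if (event == Event.QUOTE_RECEIVED) {
                    // a real process would send the order message here and return
                    state = State.AWAITING_ACK;
                } else if (event == Event.QUOTE_FAILED || event == Event.TIMEOUT) {
                    state = State.FAILED;
                }
                break;
            case AWAITING_ACK:
                if (event == Event.ORDER_ACKED) {
                    state = State.DONE;
                } else if (event == Event.ORDER_REJECTED || event == Event.TIMEOUT) {
                    state = State.FAILED;
                }
                break;
            default:
                break; // terminal states ignore late events
        }
    }
}
```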

Rüdiger Möller

Jan 1, 2014, 10:46:42 AM
to mechanica...@googlegroups.com
We are using reliable UDP even for unicast, for various reasons (failover etc.).
I have had the best results with this simple backpressure algorithm:

1) Every, say, 10,000th message, an ack request is sent on a topic.
2) Receivers reply with an ack response (the ack request is queued like a regular incoming msg).
3) Allow N unanswered ack requests; reduce the send rate (by blocking some nanos) depending on the number of unreplied acks.

Ofc a sender then has to know roughly how many receivers there are. This way it's possible to control queue sizes with very little overhead. By varying the ack interval, the max unanswered acks and the 'pause(unrepliedAcks)' function, one can trade latency/queue size against throughput. By setting an upper bound on the nanos paused, one can avoid a complete shutdown in case of failing receivers.
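A sender-side sketch of this ack-request scheme in Java follows. It is hypothetical: the class and parameter names, the bookkeeping of expected replies per ack request, and the linear, capped pause function are assumptions chosen to illustrate the three steps, not Rüdiger's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;

/**
 * Ack-request pacing for a single sending thread (not thread-safe).
 * All names and the linear pause function are illustrative assumptions.
 */
final class AckPacer {
    private final int ackInterval;       // send an ack request every ackInterval messages
    private final int allowedUnanswered; // unanswered requests tolerated without pausing
    private final long nanosPerExtra;    // pause added per unanswered request beyond the allowance
    private final long maxPauseNanos;    // cap, so failing receivers cannot stop the sender
    private final int expectedReceivers; // the sender must roughly know the receiver count

    private long sent;
    private long nextRequestId;
    private final Map<Long, Integer> missingReplies = new HashMap<>(); // requestId -> replies still due

    AckPacer(int ackInterval, int allowedUnanswered, long nanosPerExtra,
             long maxPauseNanos, int expectedReceivers) {
        this.ackInterval = ackInterval;
        this.allowedUnanswered = allowedUnanswered;
        this.nanosPerExtra = nanosPerExtra;
        this.maxPauseNanos = maxPauseNanos;
        this.expectedReceivers = expectedReceivers;
    }

    /** Step 1: call per outgoing message; returns an ack-request id to attach, or -1. */
    long onSend() {
        sent++;
        if (sent % ackInterval != 0) return -1;
        long id = nextRequestId++;
        missingReplies.put(id, expectedReceivers);
        return id;
    }

    /** Step 2: call when one receiver's ack response for requestId arrives. */
    void onAckResponse(long requestId) {
        Integer missing = missingReplies.get(requestId);
        if (missing == null) return; // unknown or already fully answered
        if (missing <= 1) missingReplies.remove(requestId);
        else missingReplies.put(requestId, missing - 1);
    }

    int unansweredRequests() { return missingReplies.size(); }

    /** Step 3: pause grows with unanswered requests beyond the allowance, capped. */
    long pauseNanos() {
        int extra = unansweredRequests() - allowedUnanswered;
        if (extra <= 0) return 0;
        return Math.min(maxPauseNanos, extra * nanosPerExtra);
    }

    /** Briefly blocks the sending thread when receivers fall behind. */
    void throttle() {
        long pause = pauseNanos();
        if (pause > 0) LockSupport.parkNanos(pause);
    }
}
```

Raising the allowance or the ack interval favours throughput; lowering them bounds queue sizes more tightly, which is the trade-off described above.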

BTW, applying network algorithms (incl. retransmission if needed) to in-memory queues works pretty well and is also non-blocking. If tuned correctly (AKA flow control such that no overflows trigger retransmissions) it can be nearly as effective as a dedicated shared-memory IPC queue (I have an implementation of this in my [~beta] fast-cast 2.0 open source lib, currently relying on system file locks, so miserable latency... haven't had time to apply modern locking to this).

Rüdiger Möller

Jan 1, 2014, 10:57:37 AM
to mechanica...@googlegroups.com
Ok, so you are asking for more mechanical sympathy of the human brain :-). Thinking of some typical use cases involving futures, I have to admit a lack of imagination on how to solve these without futures in an unmessy way.


On Wednesday, January 1, 2014 15:23:43 UTC+1, Martin Thompson wrote:
 
It's clear that blocking calls aren't applicable at all in a reactive framework, but what problems do you see in 'Futures'?

Rajiv Kurian

Jan 1, 2014, 12:55:27 PM
to mechanica...@googlegroups.com
Just to be sure by static allocation you mean some scheme that would let each individual dispatcher computationally (without any central look up table) know which dispatcher a particular actor belongs to? For example: hash an actor-id to one of the dispatchers using the same hash function.
Could you please provide some more details on how the central load balancer would work? I understand the notion of co-locating actors that communicate frequently but am trying to wrap my head around how rebalancing would exactly be done (implementation wise). I am specifically wondering about:
i) How the central load balancer will move actors from one dispatcher to the other? Specifically, how will the other dispatchers know about this change and atomically cut over to the new dispatcher when sending a message to an actor? For example: say actor-1 was moved from dispatcher-1 to dispatcher-2 because of either co-location or rebalancing benefits. Dispatchers 3-n also need to know about this change so that when they address actor-1 in the future they enqueue the messages to the right dispatcher's incoming queue. Also dispatcher-1, which just lost actor-1, could still have messages for actor-1 left on its incoming queues. These need to be processed before any other messages for actor-1 can be processed by its new dispatcher.
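Rajiv's first question describes static placement as a pure function of the actor id that every dispatcher evaluates locally, with no lookup table. A minimal Java sketch under that reading follows; the multiplicative (Fibonacci-style) hash constant and the class name are illustrative assumptions. Because the answer is computed rather than looked up, it cannot reflect migrations by itself, which is where the central directory in Martin's reply comes in.

```java
/** Hypothetical static placement: every dispatcher computes the same answer locally. */
final class StaticPlacement {
    static int dispatcherFor(long actorId, int dispatcherCount) {
        // Multiply by 2^64 / golden ratio so sequential ids spread across the high bits.
        long h = actorId * 0x9E3779B97F4A7C15L;
        // The upper 32 bits are non-negative after >>>, so % yields [0, dispatcherCount).
        return (int) ((h >>> 32) % dispatcherCount);
    }
}
```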

Thanks

Tristan Slominski

Jan 1, 2014, 3:14:20 PM
to mechanica...@googlegroups.com
Would you mind sharing some of the use cases you have in mind? I'd be curious to see where the sticking points are that you came across.

Martin Thompson

Jan 1, 2014, 3:27:32 PM
to mechanica...@googlegroups.com

Each dispatcher keeps track of what actors it hosts so local comms can be optimised away. When references are handed out or migrated they can then be registered with a central directory. The load balancer also manages the directory thus preserving the single writer principle. Directories can be federated to scale and provide local caching across node clusters.

Race conditions can be handled when migrating actors by keeping a forwarding action at the old dispatcher.
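A minimal Java sketch of the routing Martin describes: each dispatcher knows its hosted actors, a directory written only by the load balancer (single writer) answers everything else, and a migrated actor leaves a forwarding entry at its old dispatcher. The class names are hypothetical, and publishing immutable map snapshots through a volatile field is just one way to keep a single writer. The sketch does not address the ordering race raised later in the thread.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** actor -> dispatcher. Only the load balancer calls move(): a single writer. */
final class Directory {
    private volatile Map<String, Integer> snapshot = Collections.emptyMap();

    int lookup(String actorId) {
        Integer d = snapshot.get(actorId);
        if (d == null) throw new IllegalStateException("unknown actor " + actorId);
        return d;
    }

    /** Load balancer thread only: copy, update, then publish a new immutable snapshot. */
    void move(String actorId, int dispatcherId) {
        Map<String, Integer> next = new HashMap<>(snapshot);
        next.put(actorId, dispatcherId);
        snapshot = Collections.unmodifiableMap(next);
    }
}

/** Per-dispatcher routing state, touched only by that dispatcher's thread. */
final class DispatcherRouting {
    final int id;
    private final Set<String> hosted = new HashSet<>();
    private final Map<String, Integer> forwardTo = new HashMap<>();
    private final Directory directory;

    DispatcherRouting(int id, Directory directory) {
        this.id = id;
        this.directory = directory;
    }

    void host(String actorId) {
        hosted.add(actorId);
        forwardTo.remove(actorId);
    }

    /** The actor migrated away: leave a forwarding action behind. */
    void release(String actorId, int newDispatcher) {
        hosted.remove(actorId);
        forwardTo.put(actorId, newDispatcher);
    }

    /** Target dispatcher for a message; returning id means "call the actor locally". */
    int route(String actorId) {
        if (hosted.contains(actorId)) return id;
        Integer fwd = forwardTo.get(actorId);
        if (fwd != null) return fwd;
        return directory.lookup(actorId);
    }
}
```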

awei...@voltdb.com

Jan 1, 2014, 6:26:54 PM
to mechanica...@googlegroups.com
Would that make listenable futures the sweet sweet crystal we are all looking for?

I had a more involved response and I submitted it, but it never appeared. Maybe I hit reply to author.

Ariel


On Wednesday, January 1, 2014 9:23:43 AM UTC-5, Martin Thompson wrote:
 
It's clear that blocking calls aren't applicable at all in a reactive framework, but what problems do you see in 'Futures'?

Rüdiger Möller

Jan 1, 2014, 7:49:02 PM
to mechanica...@googlegroups.com
Futures are an everyday tool for doing multi-step application logic in a non-blocking fashion, like this (using Java 8 lambda notation for brevity, error handling omitted):

mapAndStream("some query/subscription", (Future f) -> { // step 1
    result = reduce(f); // then step 2
    if (lastResult) {
        f.done(); // 'close future mailbox' = invalidate callback entry
        doTransaction(result, (Future transactionResponse) -> { // then step 3
            ....
        });
    }
});

Ofc one could use an actor-based equivalent; however, if there are hundreds of such multi-step business processes, I'd imagine this is hard to manage, maintain and extend.
A future could be seen as a special 'temporary actor' (which is actually true in a 1:N communication scheme, where a future might receive several results).

Rajiv Kurian

Jan 1, 2014, 8:03:51 PM
to mechanica...@googlegroups.com
 Sorry I keep pestering for details - I am attempting to build a prototype to do something like this actually. Is my summary correct?
i) A load balancer thread or multiple threads in case of a federated directory looks at metrics like queue length, drain rate and communication patterns to rebalance actors across dispatchers.
ii) When a dispatcher needs to send a message to an actor, it first checks if the actor is on the same dispatcher. If yes, then it can just make a method call instead of any communication. If no, then it looks up the right dispatcher in a central directory and places the message on the corresponding queue.
iii) When the load balancer thread determines that actor-1 should be moved away from say dispatcher-1 to dispatcher-2 it needs to first communicate with the dispatcher-1 so that it can set up a proper forwarding map. The load balancer can then update the central directory. Eventually all dispatchers will send actor-1's messages to their respective outgoing queues to dispatcher-2. And since this only happens eventually the forwarding map will take care of all the messages sent to dispatcher-1 in the meantime.

The forwarding map though doesn't take care of messages already on dispatcher-1's incoming queues. Consider the following timeline:
1) Actor-2 on some other dispatcher (maybe dispatcher-3) could have sent 50 messages destined for actor-1 to dispatcher-1. These are still waiting to be processed.
2) Now the load balancer thread starts a rebalance event proposing actor-1 be moved from dispatcher-1 to dispatcher-2.
3) In response to (2) the forwarding map is set up on dispatcher-1 and the load balancer also updates the central directory. All future messages from actor-2 will end up going to dispatcher-2, either through the forwarding map (which might happen in the beginning) or through a look up on the central directory.
4) Actor-2 learns of the change in actor-1's dispatcher and sends msg #51 to dispatcher-2. Dispatcher-2 being idle immediately processes msg #51.
5) Now dispatcher-1 gets around to seeing the 50 messages from actor-2 and forwards them to dispatcher-2.
6) Dispatcher-2 now processes them serially. What ends up happening is that msg #51 (sent from actor-2) is processed before msg #1-50 (also from actor-2) which breaks the actor contract of serial processing for a sender-destination pair.
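The reordering in steps 1-6 can be replayed deterministically. The Java sketch below is a single-threaded model with hypothetical names, where plain ArrayDeques stand in for the 1P1C queues; it shows dispatcher-2 handling msg #51 before #1-50.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Single-threaded replay of the naive forwarding timeline (hypothetical names). */
final class NaiveForwardingReplay {
    static List<Integer> run() {
        ArrayDeque<Integer> dispatcher1Incoming = new ArrayDeque<>();
        ArrayDeque<Integer> dispatcher2Incoming = new ArrayDeque<>();
        List<Integer> processedByDispatcher2 = new ArrayList<>();

        // 1) actor-2 has sent msgs #1..#50 for actor-1 to dispatcher-1; still queued.
        for (int m = 1; m <= 50; m++) dispatcher1Incoming.add(m);
        // 2-3) actor-1 moves to dispatcher-2; directory updated, forwarding map set up.
        // 4) actor-2 sends #51 straight to dispatcher-2, which is idle and runs it at once.
        dispatcher2Incoming.add(51);
        while (!dispatcher2Incoming.isEmpty()) processedByDispatcher2.add(dispatcher2Incoming.poll());
        // 5) dispatcher-1 finally reaches #1..#50 and forwards them.
        while (!dispatcher1Incoming.isEmpty()) dispatcher2Incoming.add(dispatcher1Incoming.poll());
        // 6) dispatcher-2 processes the forwarded messages after #51.
        while (!dispatcher2Incoming.isEmpty()) processedByDispatcher2.add(dispatcher2Incoming.poll());
        return processedByDispatcher2;
    }
}
```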

 What we need to ensure is that till such time that actor-2 (on dispatcher-3) is sending messages to dispatcher-1 (eventually forwarded) instead of directly to dispatcher-2, these messages need to be processed before the ones sent directly to dispatcher-2. A simple forwarding map doesn't guarantee that. I feel like some kind of two phase commit protocol is needed. Something like:
1) Load balancer thread proposes a rebalance event moving actor-1 from dispatcher-1 to dispatcher-2.
2) All threads once they read this event will enqueue future messages destined for actor-1 in some local queue (instead of sending them to dispatcher-1) and signal the load balancer that they are now aware of the rebalance mode. Dispatcher-1 will also enqueue messages for actor-1 on this local queue instead of invoking method calls.
3) The load balancer now signals dispatcher-1 that the other dispatchers are aware of this proposal. Now dispatcher-1 is guaranteed that no other dispatcher will be sending messages destined for actor-1 on any of its incoming queues (from step 2).
4) Now dispatcher-1 scans its incoming queues and forwards all messages for actor-1 to dispatcher-2 (thus maintaining order). A single scan suffices since it is guaranteed that any concurrent incoming messages will not be for actor-1. This ensures that all messages already sent to actor-1 (hence on dispatcher-1's queues) before or during the rebalance event are forwarded to dispatcher-2 before newer messages are enqueued/processed.
5) Dispatcher-1 can then signal the load-balancer thread that it's done forwarding all old messages for actor-1 to dispatcher-2. The load balancer can then signal all the other threads that the forwarding phase is over.
6) Now all the threads forward the messages enqueued in their temporary local queues (from step-2) to dispatcher-2. From then on they also send messages destined for actor-1 to dispatcher-2 (via the central directory). The 

This should ensure that the actor contract is maintained across rebalancing events.
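The two-phase protocol above can be walked through with a single-threaded Java model, seen from one sender (actor-2 on dispatcher-3). This is a simplified sketch with hypothetical names: signals to and from the load balancer are modelled as the order of method calls, and dispatcher-2's separate incoming 1P1C queues are merged into one, so it does not model how dispatcher-2 interleaves reads across several queues.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** A message for a target actor, tagged with actor-2's send sequence number. */
final class Msg {
    final String target;
    final int seq;
    Msg(String target, int seq) { this.target = target; this.seq = seq; }
}

/** Two-phase migration of actor-1 from dispatcher-1 to dispatcher-2 (model only). */
final class TwoPhaseMigration {
    final ArrayDeque<Msg> d1Incoming = new ArrayDeque<>();       // into dispatcher-1
    final ArrayDeque<Msg> d2Incoming = new ArrayDeque<>();       // into dispatcher-2 (merged)
    private final ArrayDeque<Msg> d3Parked = new ArrayDeque<>(); // step 2 temporary local queue
    private int actor1Owner = 1;   // central directory entry for actor-1
    private boolean d3SawProposal; // dispatcher-3 is in rebalance mode

    /** actor-2 on dispatcher-3 sends message seq to actor-1. */
    void sendFromD3(int seq) {
        Msg m = new Msg("actor-1", seq);
        if (d3SawProposal) d3Parked.add(m); // step 2: park instead of sending
        else if (actor1Owner == 1) d1Incoming.add(m);
        else d2Incoming.add(m);
    }

    /** Steps 1-2: the proposal reaches dispatcher-3, which acknowledges it. */
    void propose() { d3SawProposal = true; }

    /** Steps 3-5: dispatcher-1 forwards old actor-1 messages, in order, in one scan. */
    void forwardOld() {
        Iterator<Msg> it = d1Incoming.iterator();
        while (it.hasNext()) {
            Msg m = it.next();
            if (m.target.equals("actor-1")) {
                d2Incoming.add(m);
                it.remove();
            }
        }
        actor1Owner = 2; // directory now points at dispatcher-2
    }

    /** Step 6: dispatcher-3 flushes parked messages, then sends directly again. */
    void complete() {
        while (!d3Parked.isEmpty()) d2Incoming.add(d3Parked.poll());
        d3SawProposal = false;
    }

    /** Dispatcher-2 processes its queue; returns sequence numbers in processing order. */
    List<Integer> drainD2() {
        List<Integer> seqs = new ArrayList<>();
        while (!d2Incoming.isEmpty()) seqs.add(d2Incoming.poll().seq);
        return seqs;
    }
}
```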

Tristan Slominski

Jan 1, 2014, 8:39:56 PM
to mechanica...@googlegroups.com
Thank you for the example. I am curious about specific examples because I tend to map futures onto actor become + fork/join (actor pattern). So, attempting to transliterate your example (in pseudo JavaScript):

var reduceBeh = function reduceBeh(message) {
  var result = reduce(message); // do reduce work
  if (lastResult) {
    this.behavior = doTransactionBeh; // become next step
    this.self(result); // send result to self
  };
};

var doTransactionBeh = function doTransactionBeh(message) {
  var transactionResponse = doTransaction(message); // do transaction work
  this.behavior = step3Beh; // become next step
  this.self(transactionResponse); // send transaction response to self
};

mapAndStream("some query/subscription", function (message) {
  var reducer = this.sponsor(reduceBeh); // create an actor with "first step" behavior
  reducer(message); // send it work to do
});

I'm not sure I transliterated it right, and I hand-waved the mapAndStream interface for now, but it seems that become + fork/join could handle hundreds of business logic steps. Additionally, each actor is a "saga" (of Udi Dahan fame) that effectively captures the state of the computation. That is, I can instrument it without any special treatment as I could instrument any other actor. (as opposed to having to understand that futures have special semantics in my normal execution environment).

Also, there is opportunity for sugar here. Something along the lines of:

mapAndStream("some query/subscription", function (message) {
  var reducer = this.sponsor(sequence(reduceBeh, doTransactionBeh, step3Beh)); // syntactic sugar to create an actor with implied sequence of behaviors
  reducer(message);
});

Rüdiger Möller

Jan 1, 2014, 9:18:15 PM
to
Maybe a simpler approach (no guarantee of correctness ;) ..):

1 let an actor have a tag indicating dispatcher ownership (e.g. thread instance). This can be checked to decide between queueing and a direct call.
2 balancer switches tag atomically from disp 1 => disp 2 + state: IN_TRANSITION
3 next message incoming from disp 1 will now get queued => OK
4 next msg from disp 2 needs to trigger complete dispatch of all msgs in the Q until empty, switch the actor state to, say, 'DEF_STATE', then process the next one unqueued.

a) This may create a serious hiccup during transition. The hiccup could be reduced by keeping per-sender queues (or only processing msgs of disp 2 in (4)).

b) Another option would be to just queue messages from all dispatchers until the Q is empty at some point, then switch the actor state from transition to DEF_STATE.

Both variants have drawbacks: a) creates anomalies in processing performance, b) is risky in case the queue never gets empty.

Regarding your two-phase approach: regardless of what you are doing, messages enqueued from disp 2 before the transition have to be processed after rebalancing, before any new incoming msg from disp 2 can be processed unqueued.

This could be quite nasty as rebalancing typically will occur under high load ..
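A simplified Java sketch of the tag-based hand-over in steps 1-4 (the basic variant, without per-sender queues). Assumptions: a dispatcher id stands in for the "thread instance" tag, and synchronized methods stand in for the atomic tag switch, so this illustrates the ordering rules rather than a lock-free implementation. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Tag-based hand-over of one actor between dispatchers (hypothetical names). */
final class TaggedActor {
    enum Mode { DEFAULT, IN_TRANSITION }

    private int owner;                                        // dispatcher id used as the ownership tag
    private Mode mode = Mode.DEFAULT;
    private final ArrayDeque<String> queued = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>(); // order in which the actor ran messages

    TaggedActor(int owner) { this.owner = owner; }

    /** Step 2: the balancer switches the tag and enters IN_TRANSITION together. */
    synchronized boolean beginMove(int from, int to) {
        if (owner != from || mode != Mode.DEFAULT) return false;
        owner = to;
        mode = Mode.IN_TRANSITION;
        return true;
    }

    /** Step 1: the tag decides between queueing and a direct call. */
    synchronized void deliver(int dispatcherId, String msg) {
        if (dispatcherId != owner) {
            queued.add(msg); // step 3: non-owners, including the old owner, queue
            return;
        }
        while (!queued.isEmpty()) processed.add(queued.poll()); // step 4: drain in arrival order
        mode = Mode.DEFAULT; // leave IN_TRANSITION (no-op if already DEFAULT)
        processed.add(msg);  // then run the owner's own message directly
    }

    synchronized Mode mode() { return mode; }

    synchronized List<String> processed() { return new ArrayList<>(processed); }
}
```

In a real dispatcher the owner would also drain queued work from its own polling loop, not only when it happens to deliver a new message.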

Rüdiger Möller

Jan 1, 2014, 9:15:54 PM
to mechanica...@googlegroups.com
Well, your syntactic sugar is actually a future, isn't it? IMO it's just a different representation of the same underlying processing concept. Futures can be temp actors behind the scenes. As Martin pointed out, it supports thinking in a "serial" process-oriented manner instead of a state machine. As for me, I am happy that my fellow coworkers stopped using blocking patterns (AKA send msg, wait for result). So I am very happy that futures make async code look like "normal code, just more braces" :-).

Tristan Slominski

Jan 1, 2014, 9:32:18 PM
to mechanica...@googlegroups.com
I guess you could call it a future, fair enough for now :)

Martin Thompson

Jan 2, 2014, 6:55:19 AM
to mechanica...@googlegroups.com
I find it is best to initially statically allocate with some consideration to special case actors, e.g. large compute jobs or IO.  Rebalancing needs to be based on both queue length and drain rate per dispatcher but also importantly which actors have more frequent communications so they get co-located. This can be achieved by having a central load balancer that can put dispatcher queues into an administration mode to rebalance. This allows the 1P1C semantics to be preserved without the overhead of work stealing.
Just to be sure: by static allocation, do you mean some scheme that lets each individual dispatcher compute (without any central lookup table) which dispatcher a particular actor belongs to? For example, hashing an actor ID to one of the dispatchers using the same hash function everywhere.
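The hashing scheme in the question can be sketched as a pure function that every dispatcher evaluates identically, so no lookup table is needed. This assumes 64-bit actor IDs; the class name and the use of the MurmurHash3 fmix64 finalizer as a bit mixer are illustrative choices, not something proposed in the thread.

```java
// Static allocation by hashing: the same pure function on every dispatcher,
// so all of them agree on an actor's home without any central table.
final class StaticAllocation {
    private StaticAllocation() {}

    static int dispatcherFor(long actorId, int dispatcherCount) {
        if (dispatcherCount <= 0) {
            throw new IllegalArgumentException("dispatcherCount must be positive");
        }
        // MurmurHash3 fmix64 finalizer: spreads sequential ids across all bits.
        long h = actorId;
        h ^= (h >>> 33);
        h *= 0xff51afd7ed558ccdL;
        h ^= (h >>> 33);
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= (h >>> 33);
        // floorMod keeps the result in [0, dispatcherCount) even for negative h.
        return (int) Math.floorMod(h, (long) dispatcherCount);
    }
}
```

The catch, which the rest of the thread wrestles with, is that a pure function cannot express later migrations; any rebalanced actor needs an override somewhere.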
Could you please provide some more details on how the central load balancer would work? I understand the notion of co-locating actors that communicate frequently but am trying to wrap my head around how rebalancing would exactly be done (implementation wise). I am specifically wondering about:
i) How will the central load balancer move actors from one dispatcher to another? Specifically, how will the other dispatchers learn about the change and atomically cut over to the new dispatcher when sending a message to an actor? For example, say actor-1 was moved from dispatcher-1 to dispatcher-2 for co-location or rebalancing benefits. Dispatchers 3-n also need to know about this change so that when they address actor-1 in the future they enqueue the messages on the right dispatcher's incoming queue. Also, dispatcher-1, which just lost actor-1, could still have messages for actor-1 left on its incoming queues. These need to be processed before any other messages for actor-1 can be processed by its new dispatcher.

Each dispatcher keeps track of what actors it hosts so local comms can be optimised away. When references are handed out or migrated they can then be registered with a central directory. The load balancer also manages the directory thus preserving the single writer principle. Directories can be federated to scale and provide local caching across node clusters.
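One way to get a directory with a single writer is sketched below, assuming long actor IDs and numbered dispatchers: the load balancer thread publishes an immutable copy-on-write snapshot through a volatile field, dispatchers read it without locks, and each dispatcher checks its own hosted set first so local sends never touch the directory. ActorDirectory and DispatcherRouting are hypothetical names, not part of any library.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Central directory: written only by the load balancer thread (single writer),
// read lock-free by any dispatcher via an immutable snapshot.
final class ActorDirectory {
    private volatile Map<Long, Integer> snapshot = Map.of();

    // Load balancer thread only.
    void register(long actorId, int dispatcherId) {
        Map<Long, Integer> next = new HashMap<>(snapshot);
        next.put(actorId, dispatcherId);
        snapshot = Map.copyOf(next); // volatile write publishes the new snapshot
    }

    // Any dispatcher; returns -1 for an unknown actor.
    int lookup(long actorId) {
        return snapshot.getOrDefault(actorId, -1);
    }
}

// Per-dispatcher routing: hosted actors are handled locally, others via the directory.
final class DispatcherRouting {
    static final int LOCAL = -2;
    private final Set<Long> hosted = new HashSet<>(); // owning dispatcher thread only
    private final ActorDirectory directory;

    DispatcherRouting(ActorDirectory directory) {
        this.directory = directory;
    }

    void host(long actorId) {
        hosted.add(actorId);
    }

    int route(long actorId) {
        return hosted.contains(actorId) ? LOCAL : directory.lookup(actorId);
    }
}
```

Copy-on-write makes every registration cost a map copy, which is a reasonable trade for a directory that is read far more often than it changes.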
Sorry to keep pestering for details; I am actually attempting to build a prototype that does something like this. Is my summary correct?
i) A load balancer thread or multiple threads in case of a federated directory looks at metrics like queue length, drain rate and communication patterns to rebalance actors across dispatchers.
ii) When a dispatcher needs to send a message to an actor, it first checks whether the actor is on the same dispatcher. If yes, it can just make a method call instead of any communication. If no, it looks up the right dispatcher in a central directory and places the message on the corresponding queue.
iii) When the load balancer thread determines that actor-1 should be moved from, say, dispatcher-1 to dispatcher-2, it first needs to communicate with dispatcher-1 so that a proper forwarding map can be set up. The load balancer can then update the central directory. Eventually all dispatchers will send actor-1's messages to dispatcher-2 via their respective outgoing queues. Since this only happens eventually, the forwarding map takes care of all the messages sent to dispatcher-1 in the meantime.
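Step iii can be sketched as a forwarding map owned by the old dispatcher, assuming it is only touched by that dispatcher's thread. Envelope, ForwardingDispatcher and the forwarded lists (standing in for the outgoing queues) are hypothetical. As the following reply points out, forwarding alone does not preserve per-sender ordering.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class Envelope {
    final long targetActor;
    final String payload;

    Envelope(long targetActor, String payload) {
        this.targetActor = targetActor;
        this.payload = payload;
    }
}

// Old home of a migrated actor: re-sends late messages to the new dispatcher.
final class ForwardingDispatcher {
    private final Map<Long, Integer> forwards = new HashMap<>(); // actorId -> new dispatcher
    final List<Envelope> processedHere = new ArrayList<>();
    final Map<Integer, List<Envelope>> forwarded = new HashMap<>();

    void actorMigrated(long actorId, int newDispatcherId) {
        forwards.put(actorId, newDispatcherId);
    }

    void onMessage(Envelope m) {
        Integer target = forwards.get(m.targetActor);
        if (target != null) {
            // Stand-in for offering onto the outgoing queue towards the new dispatcher.
            forwarded.computeIfAbsent(target, k -> new ArrayList<>()).add(m);
        } else {
            processedHere.add(m); // stand-in for invoking the actor
        }
    }
}
```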

Each dispatcher has an MPSC administration queue that it checks after each event dispatch. Migrations etc. are sent to this queue so each dispatcher can maintain its local state. The load balancer will move migrated events to other queues when they are put in admin mode. With multiple queues, race conditions are possible during migration, e.g. just as other actors look up and send, or when migrating across nodes in a cluster; hence forwarding actions are useful.
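The admin-queue idea might look roughly like the following Java sketch. The thread does not give Martin's implementation, so this is only a guess at the shape: it assumes ConcurrentLinkedQueue as a stand-in for a purpose-built MPSC queue and models admin commands as Consumer callbacks applied by the dispatcher thread itself; all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

final class AdminAwareDispatcher {
    // Data messages, reduced here to the target actor id.
    final Queue<Long> dataQueue = new ConcurrentLinkedQueue<>();
    // Any thread (e.g. the load balancer) may post commands: multi-producer, single consumer.
    final Queue<Consumer<AdminAwareDispatcher>> adminQueue = new ConcurrentLinkedQueue<>();
    // Local state, only ever touched by the dispatcher thread.
    final Set<Long> hostedActors = new HashSet<>();
    final List<Long> dispatched = new ArrayList<>();
    final List<Long> notHostedHere = new ArrayList<>(); // would be forwarded in a real system

    // One loop iteration: dispatch at most one event, then drain the admin queue.
    boolean runOnce() {
        Long actorId = dataQueue.poll();
        if (actorId != null) {
            if (hostedActors.contains(actorId)) {
                dispatched.add(actorId); // stand-in for invoking the actor
            } else {
                notHostedHere.add(actorId);
            }
        }
        Consumer<AdminAwareDispatcher> command;
        while ((command = adminQueue.poll()) != null) {
            command.accept(this); // e.g. a migration adding or removing a hosted actor
        }
        return actorId != null;
    }
}
```

Because commands are applied on the dispatcher's own thread between events, the hosted set never needs to be a concurrent structure.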

The forwarding map though doesn't take care of messages already on dispatcher-1's incoming queues. Consider the following timeline:
1) Actor-2 on some other dispatcher (maybe dispatcher-3) could have sent 50 messages destined for actor-1 to dispatcher-1. These are still waiting to be processed.
2) Now the load balancer thread starts a rebalance event proposing actor-1 be moved from dispatcher-1 to dispatcher-2.
3) In response to (2) the forwarding map is set up on dispatcher-1 and the load balancer also updates the central directory. All future messages from actor-2 will end up going to dispatcher-2, either through the forwarding map (which might happen in the beginning) or through a look up on the central directory.
4) Actor-2 learns of the change in actor-1's dispatcher and sends msg #51 to dispatcher-2. Dispatcher-2 being idle immediately processes msg #51.
5) Now dispatcher-1 gets around to seeing the 50 messages from actor-2 and forwards them to dispatcher-2.
6) Dispatcher-2 now processes them serially. What ends up happening is that msg #51 (sent from actor-2) is processed before msgs #1-50 (also from actor-2), which breaks the actor contract of serial processing for a sender-destination pair.

See comment above.
 
What we need to ensure is that, for as long as actor-2 (on dispatcher-3) is still sending messages to dispatcher-1 (to be forwarded) instead of directly to dispatcher-2, those messages are processed before the ones sent directly to dispatcher-2. A simple forwarding map doesn't guarantee that. I feel some kind of two-phase commit protocol is needed. Something like:
1) Load balancer thread proposes a rebalance event moving actor-1 from dispatcher-1 to dispatcher-2.
2) All threads once they read this event will enqueue future messages destined for actor-1 in some local queue (instead of sending them to dispatcher-1) and signal the load balancer that they are now aware of the rebalance mode. Dispatcher-1 will also enqueue messages for actor-1 on this local queue instead of invoking method calls.
3) The load balancer now signals dispatcher-1 that the other dispatchers are aware of this proposal. Dispatcher-1 is now guaranteed that no other dispatcher will send messages destined for actor-1 to any of its incoming queues (from step 2).
4) Dispatcher-1 now scans its incoming queues and forwards all messages for actor-1 to dispatcher-2 (thus maintaining order). A single scan will do, since it is guaranteed that any concurrent incoming messages will not be for actor-1. This ensures that all messages already sent to actor-1 (hence on dispatcher-1's queues) before or during the rebalance event are forwarded to dispatcher-2 before newer messages are enqueued/processed.
5) Dispatcher-1 can then signal the load-balancer thread that it's done forwarding all old messages for actor-1 to dispatcher-2. The load balancer can then signal all the other threads that the forwarding phase is over.
6) Now all the threads forward the messages enqueued in their temporary local queues (from step-2) to dispatcher-2. From then on they also send messages destined for actor-1 to dispatcher-2 (via the central directory). The 

This should ensure that the actor contract is maintained across rebalancing events.
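Both the race in the timeline and the ordering guarantee of the proposed protocol can be checked with a single-threaded replay. This is a model, not a concurrent implementation: ArrayDeque stands in for each dispatcher's inbox and for dispatcher-3's local hold queue, and MigrationOrdering is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class MigrationOrdering {
    // Timeline steps 1-6: a plain forwarding map lets msg 51 overtake msgs 1..50.
    static List<Integer> forwardingMapOnly() {
        ArrayDeque<Integer> dispatcher1Inbox = new ArrayDeque<>();
        ArrayDeque<Integer> dispatcher2Inbox = new ArrayDeque<>();
        List<Integer> processed = new ArrayList<>();
        for (int i = 1; i <= 50; i++) dispatcher1Inbox.add(i); // 1) still queued on dispatcher-1
        // 2-3) actor-1 moves to dispatcher-2 and the directory is updated.
        dispatcher2Inbox.add(51);                      // 4) sent straight to dispatcher-2...
        drainInto(dispatcher2Inbox, processed);        //    ...which is idle and runs it at once
        drainInto(dispatcher1Inbox, dispatcher2Inbox); // 5) dispatcher-1 forwards 1..50
        drainInto(dispatcher2Inbox, processed);        // 6) processed after 51
        return processed;
    }

    // Two-phase variant: senders hold new messages until old ones are forwarded.
    static List<Integer> twoPhase() {
        ArrayDeque<Integer> dispatcher1Inbox = new ArrayDeque<>();
        ArrayDeque<Integer> dispatcher2Inbox = new ArrayDeque<>();
        ArrayDeque<Integer> senderHold = new ArrayDeque<>(); // dispatcher-3's local queue
        List<Integer> processed = new ArrayList<>();
        for (int i = 1; i <= 50; i++) dispatcher1Inbox.add(i);
        senderHold.add(51);                            // steps 1-2: proposal acknowledged, hold
        drainInto(dispatcher1Inbox, dispatcher2Inbox); // steps 3-4: one scan forwards old msgs
        drainInto(senderHold, dispatcher2Inbox);       // steps 5-6: flush held msgs
        dispatcher2Inbox.add(52);                      // later sends go directly via the directory
        drainInto(dispatcher2Inbox, processed);
        return processed;
    }

    private static void drainInto(ArrayDeque<Integer> from, Collection<Integer> to) {
        while (!from.isEmpty()) to.add(from.poll());
    }
}
```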

Admin queues are the answer :-) Much simpler.
 

Rüdiger Möller

Jan 2, 2014, 10:12:57 AM
to mechanica...@googlegroups.com
Erm, this throws me completely off...

Just for clarification,

a) do you support direct dispatch (without Q) for 'local' messages ?
b) do you maintain a Q per dispatcher, per actor, or per actor per sender-to-this-actor ?

Martin Thompson

Jan 2, 2014, 12:02:48 PM
to mechanica...@googlegroups.com
Just for clarification,

a) do you support direct dispatch (without Q) for 'local' messages ?

Yes and this is why blocking calls and futures hamper so many optimisations. :-)
 
b) do you maintain a Q per dispatcher, per actor, or per actor per sender-to-this-actor ?

Either an MPSC queue per dispatcher or SPSC queues in a mesh between dispatchers for message flow. Plus an additional administration queue per dispatcher.
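A sketch of the SPSC mesh: one bounded single-producer/single-consumer ring buffer per ordered pair of dispatchers, so every queue has exactly one writer and one reader. This is a generic Lamport-style SPSC queue using AtomicLong.lazySet for ordered publication, not Martin's code, and it lives on the Java heap rather than in memory-mapped files; SpscQueue and DispatcherMesh are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

// Bounded SPSC ring buffer: exactly one thread may call offer, one thread poll.
final class SpscQueue<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next index to read, written by consumer
    private final AtomicLong tail = new AtomicLong(); // next index to write, written by producer

    SpscQueue(int capacityPowerOfTwo) {
        if (capacityPowerOfTwo <= 0 || Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of two");
        }
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    // Non-blocking: returns false rather than waiting when the queue is full.
    boolean offer(E e) {
        long t = tail.get();
        if (t - head.get() == buffer.length) {
            return false;
        }
        buffer[(int) (t & mask)] = e;
        tail.lazySet(t + 1); // ordered store: element is visible before the new tail
        return true;
    }

    @SuppressWarnings("unchecked")
    E poll() {
        long h = head.get();
        if (h == tail.get()) {
            return null;
        }
        int index = (int) (h & mask);
        E e = (E) buffer[index];
        buffer[index] = null; // drop the reference so it can be collected
        head.lazySet(h + 1);  // hand the slot back to the producer
        return e;
    }
}

// queues[from][to] is the only channel from dispatcher `from` to dispatcher `to`.
final class DispatcherMesh<E> {
    private final SpscQueue<E>[][] queues;

    @SuppressWarnings("unchecked")
    DispatcherMesh(int dispatchers, int capacity) {
        queues = new SpscQueue[dispatchers][dispatchers];
        for (int from = 0; from < dispatchers; from++) {
            for (int to = 0; to < dispatchers; to++) {
                if (from != to) {
                    queues[from][to] = new SpscQueue<>(capacity);
                }
            }
        }
    }

    // Called only from dispatcher `from`'s thread; local sends never use the mesh.
    boolean send(int from, int to, E msg) {
        if (from == to) throw new IllegalArgumentException("local sends bypass the mesh");
        return queues[from][to].offer(msg);
    }

    // Called only from dispatcher `to`'s thread: poll each inbound queue in turn.
    E receive(int to) {
        for (int from = 0; from < queues.length; from++) {
            if (from == to) continue;
            E e = queues[from][to].poll();
            if (e != null) return e;
        }
        return null;
    }
}
```

A real dispatcher would probably rotate the starting index in receive() so one busy sender cannot starve the others.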

What "throws you completely off"?

Rajiv Kurian

Jan 2, 2014, 3:14:14 PM
to mechanica...@googlegroups.com
I don't yet understand how migrations would work without violating the source-destination serial processing contract. Martin, would it be possible to provide an example of a high-level workflow/algorithm for a typical run, including a migration of an actor from one dispatcher to another?

Martin Thompson

Jan 3, 2014, 6:04:59 AM
to mechanica...@googlegroups.com
Some of the queue implementations I've developed have an admin mode which can be set from the load balancer thread. When in this mode, the producer and consumer do not use the queue, and thus it can be modified to support migrations.

I'll leave the full details of how to do this as an exercise for the reader; otherwise, how would I make a living if I gave away all the details without getting paid? :-)


--
You received this message because you are subscribed to the Google Groups "mechanical-sympathy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to mechanical-symp...@googlegroups.com.

Rajiv Kurian

Jan 3, 2014, 12:40:18 PM
to mechanica...@googlegroups.com
Haha. Fair enough.

Rüdiger Möller

Jan 3, 2014, 2:13:58 PM
to mechanica...@googlegroups.com
As you might imagine, I have a playground implementation of actors based on bytecode weaving (ripped off from my fast-cast 1:N remoting). An actor references other actors using runtime-generated proxies, so there is no such thing as a central actor lookup. A dispatcher consists of a thread and an MPSC queue; a queue entry is an actor ref + method call object, and that's about it, so I had trouble fitting SPSC + admin queues into my mental model of an actor system :-). Currently I am primarily investigating ways to provide a simple "user interface" to programmers.
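The dispatcher described above can be sketched in a few lines, assuming the "method call object" is modelled as a Consumer applied to the actor; QueueEntry and SimpleDispatcher are hypothetical names and this is not fast-cast's code. ConcurrentLinkedQueue is safe for many producers and a single consumer.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// A queue entry: the target actor plus the call to perform on it.
final class QueueEntry<A> {
    final A actor;
    final Consumer<A> call;

    QueueEntry(A actor, Consumer<A> call) {
        this.actor = actor;
        this.call = call;
    }
}

// One thread plus one MPSC queue.
final class SimpleDispatcher {
    private final ConcurrentLinkedQueue<QueueEntry<?>> queue = new ConcurrentLinkedQueue<>();

    // May be called from any thread (multi-producer).
    <A> void enqueue(A actor, Consumer<A> call) {
        QueueEntry<A> entry = new QueueEntry<>(actor, call);
        queue.add(entry);
    }

    // Called only by the dispatcher's own thread (single consumer).
    int drain() {
        int n = 0;
        QueueEntry<?> e;
        while ((e = queue.poll()) != null) {
            invoke(e);
            n++;
        }
        return n;
    }

    // Helper that captures the wildcard so the call and the actor types line up.
    private static <A> void invoke(QueueEntry<A> e) {
        e.call.accept(e.actor);
    }
}
```

In a running system drain() would loop on the dispatcher's own thread, with some idle strategy when the queue is empty.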

Rüdiger Möller

Jan 3, 2014, 2:32:52 PM
to mechanica...@googlegroups.com
:)) . 
Since the open source hype created the illusion that "software is free" among non-technical management, a lot of business-driven companies think any problem can be solved using JBoss, Hibernate, JavaScript and hordes of 'cheap' junior-level consultants... one just has to google & download.

Rajiv Kurian

Jan 8, 2014, 3:05:05 AM
to mechanica...@googlegroups.com


On Thursday, January 2, 2014 9:02:48 AM UTC-8, Martin Thompson wrote:

Just for clarification,

a) do you support direct dispatch (without Q) for 'local' messages ?

Yes and this is why blocking calls and futures hamper so many optimisations. :-)
Direct dispatch without a thread local queue is dangerous. If an actor for some reason decides to msg itself, its processing function will be invoked from within itself, violating the actor contract. Even messages to other actors are dangerous. Say Actor A msgs Actor B and they are on the same dispatcher, so in the middle of Actor A's processing function we call Actor B's processing function. This is all fine unless Actor B decides to msg Actor A in response. That will cause Actor A's processing function to be entered a second time before the first invocation has finished.
The function invocation sequence will look something like this:

actorA'sProcessingFunction() {
  some stuff...
  ActorB'sProcessFunction() {
     some more stuff..
     ActorA'sProcessingFunction() {
       // Bad things happen now.
     }
   }
}

Martin Thompson

Jan 8, 2014, 3:52:14 AM
to mechanica...@googlegroups.com
The fun of working things out for yourself. :-)

The dispatcher makes the decision if the call can be optimised. Send must always invoke the dispatcher to make the routing decision. This is needed for the remote calls anyway.



Rajiv Kurian

Jan 8, 2014, 4:15:03 AM
to mechanica...@googlegroups.com


On Wednesday, January 8, 2014 12:52:14 AM UTC-8, Martin Thompson wrote:
The fun of working things out for yourself. :-)

The dispatcher makes the decision if the call can be optimised. Send must always invoke the dispatcher to make the routing decision. This is needed for the remote calls anyway.
I get that, but I don't understand how the dispatcher could know anything more than whether the destination actor is scheduled on the same thread or not. It cannot predict what the directly dispatched destination actor will do. My example shows the problem with direct dispatch if two actors (on the same thread) end up sending msgs to each other and both get optimized to direct dispatch calls. Maybe this falls into your secret sauce category :)

Consider the example where Actor A and Actor B are on the same thread:
1) Actor A receives a msg SayHelloToB and does something like this:
  if (msg == SayHelloToB) {
    Change the size variable of the internal hash map's backing array. At this moment the change is partial and assumed not to be visible to future messages to this actor.
    Send msg HelloFromA to Actor B
    Allocate a new backing array for the internal hash map. Now the change is complete.
   } else if (msg == ThankYouFromB) {
     Use internal hash map to make life altering decision.
   }
2) Actor B receives the HelloFromA msg and decides to msg Actor A back with ThankYouFromB
  if (msg == HelloFromA) {
    Send msg ThankYouFromB to Actor A
  }

Since Actor A and Actor B are on the same dispatcher, these send actions are optimized to direct calls. What ends up happening on Actor A is this:
  if (msg == SayHelloToB) {
    Change the size variable of the internal hash map's backing array. At this moment the change is partial and assumed not to be visible to future messages to this actor.
    // Expanded from processing logic for msg ThankYouFromB.
    Use internal hash map for life changing decision - BOOM.
     ...
    Allocate a new backing array for the internal hash map. Too late!
   }
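The scenario above can be reproduced in plain Java, assuming "send" is compiled into a direct method call because both actors share a dispatcher. The boolean resizing stands in for the half-updated hash map; ReentrancyDemo and its fields are hypothetical.

```java
// Direct dispatch between two co-located actors re-enters A mid-update.
final class ReentrancyDemo {
    static final class ActorA {
        ActorB b;
        boolean resizing;        // true while the "hash map" is half updated
        boolean sawPartialState; // set if a handler observed the half-done change

        void receive(String msg) {
            if (msg.equals("SayHelloToB")) {
                resizing = true;               // change begins
                b.receive("HelloFromA", this); // "send" optimised into a direct call
                resizing = false;              // change completes: too late
            } else if (msg.equals("ThankYouFromB")) {
                if (resizing) {
                    sawPartialState = true;    // the BOOM from the example
                }
            }
        }
    }

    static final class ActorB {
        void receive(String msg, ActorA sender) {
            if (msg.equals("HelloFromA")) {
                sender.receive("ThankYouFromB"); // direct call re-enters A
            }
        }
    }

    static boolean run() {
        ActorA a = new ActorA();
        a.b = new ActorB();
        a.receive("SayHelloToB");
        return a.sawPartialState;
    }
}
```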





Martin Thompson

Jan 8, 2014, 4:21:35 AM
to mechanica...@googlegroups.com
I don't mind helping a bit when you are getting really close to how to do it :-)

An option is to keep a local non-concurrent queue to the dispatcher. Local sends get enqueued and processed just like normal messages. This keeps the code clean and similar.
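A minimal sketch of that suggestion, assuming a single dispatcher thread: local sends go into a plain ArrayDeque and are processed after the current message finishes, so an A/B exchange like the one in the earlier example runs one handler at a time. LocalQueueDispatcher and its trace list are hypothetical names used for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Local sends are queued on a non-concurrent deque, never invoked on the stack.
final class LocalQueueDispatcher {
    interface Actor {
        void receive(String msg, LocalQueueDispatcher dispatcher);
    }

    private static final class Pending {
        final Actor target;
        final String msg;

        Pending(Actor target, String msg) {
            this.target = target;
            this.msg = msg;
        }
    }

    private final ArrayDeque<Pending> local = new ArrayDeque<>(); // dispatcher thread only
    final List<String> trace = new ArrayList<>();

    void send(Actor target, String msg) {
        local.add(new Pending(target, msg));
    }

    // Each handler runs to completion before the next message starts.
    void run() {
        Pending p;
        while ((p = local.poll()) != null) {
            trace.add("start " + p.msg);
            p.target.receive(p.msg, this);
            trace.add("end " + p.msg);
        }
    }
}
```

The handlers look exactly like the ones processing remote messages, which is the "clean and similar" property mentioned above.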

Rajiv Kurian

Jan 8, 2014, 4:24:41 AM
to mechanica...@googlegroups.com


IMO actor messages can be optimized to direct dispatch only if every actor in the entire call chain (cycles and all) sends its message as the last statement of its processing block, a bit like tail call optimization in certain languages. Either this needs to be enforced at compile time, or made an unofficial rule for the application programmer to follow.

Rajiv Kurian

Jan 8, 2014, 4:29:25 AM
to mechanica...@googlegroups.com
:-) That's what I said though! Quoting myself - "Direct dispatch without a thread local queue is dangerous."

Martin Thompson

Jan 8, 2014, 4:36:33 AM
to mechanica...@googlegroups.com
We should probably tighten up terminology to avoid confusion. 

Let's avoid the use of the term "direct dispatch" as it can imply calling the receive/process method directly on the stack. We need to preserve the thread safety of the actor so no new thread can enter until the last message is processed.

We need to look at where the costs are greatest by profiling. In my experience that is the concurrent exchange of messages. So if a message can be "handled" locally on the dispatcher thus avoiding the concurrent exchange then this can be a performance boost. Dispatcher local optimisations must preserve the thread safety of the actor. Your "tail call" optimisation example is interesting but could be a bit restrictive for general purpose.

Martin Thompson

Jan 8, 2014, 4:39:04 AM
to mechanica...@googlegroups.com

On Wednesday, 8 January 2014 09:29:25 UTC, Rajiv Kurian wrote:
:-) That's what I said though! Quoting myself - "Direct dispatch without a thread local queue is dangerous."

Sorry I missed that! :-) 

William Louth

Jan 8, 2014, 7:18:36 AM
to mechanica...@googlegroups.com
Could you elaborate on what you deem "blocking" and "non-blocking" and whether the definition assumes the computational universe to be resource limited or not?

On Tuesday, December 31, 2013 12:16:18 PM UTC+1, Martin Thompson wrote:
I believe we are long overdue an actor platform developed with mechanical sympathy for modern processors and memory subsystems. IMHO any actor platform that allows blocking calls or futures has made the most fundamental mistake :-) 

Martin Thompson

Jan 8, 2014, 7:22:30 AM
to mechanica...@googlegroups.com
By blocking I mean waiting on the action completing. Completion of an action can come back as another event asynchronously.

Martin Thompson

Jan 8, 2014, 7:24:42 AM
to mechanica...@googlegroups.com

 "whether the definition assumes the computational universe to be resource limited or not?"

Can you expand on what you mean by this? Maybe give an example. 

Rüdiger Möller

Jan 8, 2014, 7:30:42 AM
to
If this is your actor contract (correct me if you see something missing):

(1) If Actor A sends messages to Actor B, it is guaranteed that all messages are received and processed in the order sent.
If two Actors A and B send messages concurrently to Actor C, sequential consistency is only guaranteed per pair, A=>C and B=>C; it is not guaranteed that a message sent by Actor A to C "before" B sends a message to C is received by C first.

(2) During processing of a message inside an actor, no other message can be processed by this actor. (from your sample)

Without (2) one can do:

            public boolean doDirectCall(String methodName, ActorProxy proxy) {
                return proxy.getDispatcher().getWorker() == Thread.currentThread();
            }


If you want to guarantee (2), you'll need to check the call stack of the current thread. If there is already a method call of "this" on the current thread, you need to queue the call; otherwise you can do direct dispatch.

The easiest way to achieve this is to keep a counter on the actor instance: when processing of a message starts, do counter++; when it finishes, do counter--.
            public boolean doDirectCall(String methodName, ActorProxy proxy) {
                return proxy.getDispatcher().getWorker() == Thread.currentThread() && proxy.getActor().getCurrentInvocationCount() == 0;
            }

If an actor calls a message on itself, the counter must not be incremented.
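A runnable version of the counter idea, with hypothetical stand-ins for the ActorProxy and dispatcher types in the snippets above. It increments before the handler runs and decrements in a finally block, so an exception cannot leave the actor marked busy. One deliberate difference from the description: a message an actor sends to itself while running is simply deferred like any other re-entrant call. Cross-thread sends are out of scope here and would go through the dispatcher's concurrent queue.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;

// Re-entrancy guard: run directly when idle, defer when already on the stack.
final class GuardedActor {
    final Thread worker;           // thread of the dispatcher serving this actor
    private int invocationCount;   // only touched by the worker thread
    int maxObservedDepth;          // for illustration: never exceeds 1
    private final ArrayDeque<Consumer<GuardedActor>> deferred = new ArrayDeque<>();

    GuardedActor(Thread worker) {
        this.worker = worker;
    }

    void call(Consumer<GuardedActor> message) {
        if (worker != Thread.currentThread()) {
            throw new IllegalStateException("cross-thread sends go through the dispatcher queue (not shown)");
        }
        deferred.add(message);
        if (invocationCount > 0) {
            return; // already inside a handler: do not re-enter, run it afterwards
        }
        Consumer<GuardedActor> next;
        while ((next = deferred.poll()) != null) {
            invocationCount++;
            maxObservedDepth = Math.max(maxObservedDepth, invocationCount);
            try {
                next.accept(this);
            } finally {
                invocationCount--; // restored even if the handler throws
            }
        }
    }
}
```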

William Louth

Jan 8, 2014, 7:35:49 AM
to mechanica...@googlegroups.com
A delay in the posting of an event/signal/action/... because of a control mechanism, such as adaptive control valves or QoS, could be viewed as "blocking", but it is most definitely necessary in the context of limited resources.

William Louth

Jan 8, 2014, 7:44:34 AM
to mechanica...@googlegroups.com
The reason I care little for the current crop of actor programming models & implementations is more fundamental, and it is sorely missed in all the discussions here: the actors, the systems and their channels/links/queues are not adaptive, and hence are designed and/or configured with some particular model of how the software might act, not how it does act. We should put down these toys and first try to figure out how software can become (evolve) intelligent at runtime beyond the limited view of the coder or deployer. An adaptive mechanism builds on lower adaptive mechanisms... turtles all the way down. In practice this means that any values we use to drive the adaptive mechanism at one level should themselves be adaptive at another. This is not the case for Akka or anything of the sort.

Martin Thompson

Jan 8, 2014, 7:46:33 AM
to mechanica...@googlegroups.com
If these are sent as queued messages then they are non-blocking and not delayed in any way.

I don't have a clear view of how sending events has a relationship to the "computational universe" as you put it. Are you referring to how your product stalls a thread as a means of doing resource control? If so, then I think this could be considered a band aid for a poor scheduler implementation in the actor dispatcher :-) 

I like to manage limited resources; such as storage, network, or databases; with specific actors that employ smart batching to get best utilisation out of the resource.
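A sketch of a resource-owning actor with smart batching, assuming its inbox is a bounded ArrayBlockingQueue: the actor takes whatever has accumulated (up to maxBatch) and hands it to the resource in one call, e.g. one write and flush instead of one per message. BatchingResourceActor and ResourceWriter are hypothetical; offer, poll and drainTo are standard BlockingQueue methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BatchingResourceActor {
    // Hypothetical resource interface: one call per batch (e.g. write + flush).
    interface ResourceWriter {
        void writeBatch(List<String> batch);
    }

    private final BlockingQueue<String> inbox;
    private final ResourceWriter writer;
    private final int maxBatch;

    BatchingResourceActor(int capacity, int maxBatch, ResourceWriter writer) {
        if (maxBatch < 1) throw new IllegalArgumentException("maxBatch must be >= 1");
        this.inbox = new ArrayBlockingQueue<>(capacity);
        this.maxBatch = maxBatch;
        this.writer = writer;
    }

    // Non-blocking for senders: false when the inbox is full.
    boolean offer(String msg) {
        return inbox.offer(msg);
    }

    // One iteration of the actor's loop; the batch grows with the backlog.
    int processOnce() {
        String first = inbox.poll();
        if (first == null) return 0;
        List<String> batch = new ArrayList<>(maxBatch);
        batch.add(first);
        inbox.drainTo(batch, maxBatch - 1); // whatever else is already waiting
        writer.writeBatch(batch);
        return batch.size();
    }
}
```

Under light load each batch holds one message, so latency stays low; under heavy load batches grow and the cost of each resource access is amortised.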

William Louth

Jan 8, 2014, 8:22:14 AM
to mechanica...@googlegroups.com
If there is a queue then there is always a potential delay (blocking) or rejection unless resources are unlimited...this applies to both the physical & digital world (both computational). A call is always potentially "blocking"...async or not. I think we need better naming classifications for (coordinated) interactions across boundaries...code or flow.

William Louth

Jan 8, 2014, 8:28:49 AM
to mechanica...@googlegroups.com
QoS is a band-aid?

EOT.

Rüdiger Möller

Jan 8, 2014, 8:34:57 AM
to mechanica...@googlegroups.com
I agree dynamic adaptation needs to be addressed (see Martin's statements). I don't know of an existing actor lib doing this. However, in order to make full use of multiprocessor hardware, there is IMO no alternative to actor-like approaches: they are easier to program and create less contention than traditional shared-memory MT. It should be possible to create better adaptive actor implementations. Think of a system consisting of a cloud of actors. With intelligent dispatchers it could be possible to dynamically move actors from one dispatcher (~core) to another at runtime.

For this, the following problems must get solved:

- fast dispatch for actors served by the same thread
- a performance-neutral capture of interaction patterns at runtime. I am currently experimenting with "sentinel" messages injected into the system (e.g. every 10,000th message)
- efficient migration of an actor from one to another dispatcher.

A system like this could deliver breakthrough performance on multicore (>8 cores) hardware with a manageable programming interface.

Martin Thompson

Jan 8, 2014, 9:03:21 AM
to mechanica...@googlegroups.com

On Wednesday, 8 January 2014 13:22:14 UTC, William Louth wrote:
If there is a queue then there is always a potential delay (blocking) or rejection unless resources are unlimited...this applies to both the physical & digital world (both computational). A call is always potentially "blocking"...async or not. I think we need better naming classifications for (coordinated) interactions across boundaries...code or flow.

I think you are trying to redefine the well-established meaning of non-blocking from concurrent algorithm theory.

A queue can be non-blocking. For example, offer can return false when full thus not blocking the calling thread. This thread is now free to do other work, i.e. it is not blocked.

All resources are limited, and thus the best way to manage them is to make queues bounded and apply appropriate back pressure. Little's Law can be employed to establish the required bounds on queue length to provide QoS, or more simply put, response time guarantees. If a system cannot honour the response time guarantees, then it simply rejects new input in a friendly manner. I have blogged about this.
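As a worked example of Little's Law (L = λ × W), assume a consumer that drains 50,000 messages per second and a target of at most 2 ms waiting in the queue: the queue should hold no more than 50,000 × 0.002 = 100 messages. The sketch below, with hypothetical names and made-up rates, sizes an ArrayBlockingQueue that way and relies on its non-blocking offer, which returns false when full, so the caller can reject input politely.

```java
import java.util.concurrent.ArrayBlockingQueue;

final class BoundedIngress {
    // Little's Law: items waiting = drain rate x allowed wait. Integer maths in
    // milliseconds avoids floating-point rounding in the bound.
    static int queueBound(long drainRatePerSecond, long targetWaitMillis) {
        long bound = drainRatePerSecond * targetWaitMillis / 1000;
        return (int) Math.max(1, Math.min(bound, Integer.MAX_VALUE));
    }

    private final ArrayBlockingQueue<String> queue;

    BoundedIngress(int bound) {
        queue = new ArrayBlockingQueue<>(bound);
    }

    // Non-blocking: false means "busy, try later" rather than a stalled caller.
    boolean submit(String request) {
        return queue.offer(request);
    }
}
```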

Martin Thompson

Jan 8, 2014, 9:36:17 AM
to mechanica...@googlegroups.com
Interesting. I've not tried tricks like this.

I've had actor systems that need to be transactional in face of exceptions. To address this I queue messages to be sent until the end of processing then let them go. The same technique can be employed to ensure the actor is not reentrant.
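A minimal sketch of holding outbound messages until the handler finishes, assuming sends are plain strings and the transport is a callback; OutboxActorContext is a hypothetical name. Sends made during processing are buffered, released only if the handler completes, and discarded if it throws, which also means the actor cannot be re-entered through its own sends.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class OutboxActorContext {
    private final List<String> outbox = new ArrayList<>();
    private final Consumer<String> transport; // stand-in for the real dispatcher send

    OutboxActorContext(Consumer<String> transport) {
        this.transport = transport;
    }

    // Called by the handler: buffered, not delivered yet.
    void send(String msg) {
        outbox.add(msg);
    }

    // Runs one handler; returns true if its sends were released.
    boolean process(Consumer<OutboxActorContext> handler) {
        outbox.clear();
        try {
            handler.accept(this);
        } catch (RuntimeException e) {
            outbox.clear(); // roll back: none of the sends escape
            return false;
        }
        for (String msg : outbox) {
            transport.accept(msg);
        }
        outbox.clear();
        return true;
    }
}
```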

On Wednesday, 8 January 2014 12:29:20 UTC, Rüdiger Möller wrote:
If this is your actor contract (correct me if you see something missing):


Martin Thompson

Jan 8, 2014, 9:55:26 AM
to mechanica...@googlegroups.com
Some very good points here.

I've employed some rebalancing techniques based on queue lengths and drain rates, but it feels like child's play compared to what should be possible.

A big part of actor-based systems that I feel is missing is the encapsulation of expensive resources, for example logging to disk, external comms, large computations, etc. It is critical that these are not contended and are used effectively. For this I normally dedicate an actor, or pool of actors, to each resource. I've not been able to find much research in this area other than taking inspiration from driver and kernel design patterns.

Rüdiger Möller

Jan 8, 2014, 11:16:42 AM
to mechanica...@googlegroups.com


On Wednesday, 8 January 2014 15:55:26 UTC+1, Martin Thompson wrote:
Some very good points here.

I've employed some rebalancing techniques based on queue lengths and drain rates but it feels like childs play compared to what should be possible.

I don't have a concrete working implementation, but I have thought about introducing backpressure (delay, not blocking) into actor systems to contain queue sizes, as migrating an actor gets more problematic the longer its queue of pending messages is (even with your idea of forwarding, you still get extra load when the system is already under pressure). Observed backpressure could then be the trigger for a rebalance. What do you think?

Regarding rebalancing: instead of "moving actors", one could consider a "dispatcher split" to rebalance, splitting one dispatcher into two new ones. I am not sure this gives an advantage over moving single actors; it's just a gut-feel idea.
 

A big part of actor based systems I feel is missing is the encapsulation of expensive resources. For example, logging to disk, external comms, large computations etc. It is critical these do not get contended and used effectively.  For this I normally dedicate an actor, or pool of actors, to each resource. I've not been able to find much research on this area other than taking inspiration from driver and kernel design patterns.


I have no idea about that except "slow things are slow" :-). I fall back to threads (or thread pools) to isolate potentially blocking operations, using an "Anonymous Actor" dispatched in the invoking actor's thread when I need to transfer results back. But that's trivial.

Btw, if you ever decide to give up on secretiveness and create an open source project, I'd happily contribute. I am not too deep into the details of lock-free programming, and besides time issues, providing the second-fastest actor library is not that tempting :-).

I also think finding an intuitive model to programmers, which doesn't require house-keeping (e.g. register/unregister actors) is very, very important and has to be considered right from the start.

 

Darach Ennis

Jan 8, 2014, 12:06:03 PM1/8/14
to mechanica...@googlegroups.com
"I also think finding an intuitive model to programmers, which doesn't require house-keeping (e.g. register/unregister actors) is very, very important and has to be considered right from the start."

Why hide the registration / unregistration of actors at all? The ability to link, monitor and act upon lifecycle events is an important building block for composing robust systems from more primitive parts. Erlang and Akka supervisors make this an absolute pleasure. Once the overhead, or house-keeping (e.g. OTP or Akka 'boilerplate'), is mastered, it works forevermore in your favor.

Speaking for Akka, although I've never used it in production, it has excellent documentation and examples. Erlang the language, for example, can be learned in a day. The OTP (actor) environment will take a few days to get your head around and a few weeks (using it every day) to use reasonably effectively. A small expenditure in effort; the payback grows with your mastery.

Sometimes that little extra house-keeping is a good thing. A good high-level introduction to Erlang scheduling by Jesper Louis Andersen:


A weightier tome on scalability of the Erlang VM:


Akka's scheduler:


Kresten Krab Thorup has some slides on the reasoning behind his approach to Erlang on the JVM:


In general, though, event-based actors are, at least for me, far more intuitive and hassle-free (given an equivalent to Erlang/OTP or Akka) than thread-based designs. Yes, the model isn't perfect, as synchronous message passing can introduce deadlocks, but it's better than most other models because it's very intuitive. It would certainly be nice to see some mechanical sympathy in the Erlang/OTP runtime, and I'm hoping some of the discussions on this thread (and this list in general) reach the Erlang folk too (mechanical sympathy isn't just for C-like languages and the JVM).

Cheers,

Darach.


--
You received this message because you are subscribed to the Google Groups "mechanical-sympathy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to mechanical-symp...@googlegroups.com.

Rajiv Kurian

Jan 8, 2014, 1:08:14 PM1/8/14
to mechanica...@googlegroups.com
The counter trick seems like it would work at first glance! Have you tried it?

Rüdiger Möller

Jan 8, 2014, 1:35:32 PM1/8/14
to mechanica...@googlegroups.com
I have used it for a similar problem (in a different area); I will add it to my actor prototype once I have time (gotta work, since I spent too much time posting ;-) ). Currently I am still obsessed with computing Pi :-))

Rüdiger Möller

Jan 8, 2014, 1:59:54 PM1/8/14
to mechanica...@googlegroups.com

"I also think finding an intuitive model to programmers, which doesn't require house-keeping (e.g. register/unregister actors) is very, very important and has to be considered right from the start."

Why hide the registration / unregistration of actors at all? The ability to link, monitor and act upon lifecycle events is an important building block for composing robust systems from more primitive parts. Erlang and Akka supervisors make this an absolute pleasure. Once the overhead, or house-keeping (e.g. OTP or Akka 'boilerplate'), is mastered, it works forevermore in your favor.

My ideal (don't know if it is realizable):

  • I want actors to be garbage-collectable (so no lists of actors are kept anywhere). Then I can use them in a very fine-grained fashion (use Actors instead of Objects). I want to avoid the need to register/unregister them (error-prone).
  • Ideally, you create an actor via a bytecode-weaving factory (already implemented), like
    MyActor act = Actors.New(MyActor.class).init(); // init() is already processed as a message in the actor's own thread
    The system then looks for an appropriate DispatchingThread.
  • If an Actor is instantiated from within another Actor, it inherits the Dispatcher unless explicitly specified otherwise, e.g. "Actors.New(SubActor.class)" vs. "Actors.New(SubActor.class, Actors.AnyDispatcher())".
    This way one could use anonymous temporary actor instances for callback/future results, like:
    actor.someMethod(1, 2, 3, Actors.Anonym((Double d) -> { /* processing */ }));
    The actor balancing system only separates actors that have been allocated using "Actors.New(SubActor.class, Actors.AnyDispatcher())", so one can force Actors to share a Dispatcher (thread) by using the appropriate constructor.
The only thing a user has to configure is the maximum number of threads (Dispatchers).
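The inheritance rule in the list above can be sketched with a thread-local "current dispatcher" that a dispatcher thread would set while running an actor's handler. This is an illustration under assumptions, not the prototype's actual code: `Placement`, `defaultDispatcher` and `anyDispatcher` are hypothetical, and round-robin stands in for a real balancing policy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical placement rule: an actor created while another actor's message is
// being processed inherits that actor's dispatcher unless a free choice is requested.
class Placement {
    static final class Dispatcher {
        final int id;
        Dispatcher(int id) { this.id = id; }
    }

    static final int MAX_DISPATCHERS = 4; // the only user-facing setting
    static final List<Dispatcher> DISPATCHERS = new ArrayList<>();
    static {
        for (int i = 0; i < MAX_DISPATCHERS; i++) DISPATCHERS.add(new Dispatcher(i));
    }

    // A dispatcher thread would set this while it runs an actor's message handler.
    static final ThreadLocal<Dispatcher> CURRENT = new ThreadLocal<>();
    private static final AtomicInteger next = new AtomicInteger();

    /** Free placement (the role of Actors.AnyDispatcher()); round-robin stands in for a balancer. */
    static Dispatcher anyDispatcher() {
        return DISPATCHERS.get(Math.floorMod(next.getAndIncrement(), MAX_DISPATCHERS));
    }

    /** Default placement: inherit the creator's dispatcher, or pick one if created outside an actor. */
    static Dispatcher defaultDispatcher() {
        Dispatcher inherited = CURRENT.get();
        return inherited != null ? inherited : anyDispatcher();
    }
}
```

Only actors placed via `anyDispatcher()` would be candidates for migration, which matches the idea that co-located actors stay together unless the user opts out.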

 

Thanks for the inspiring lecture :-) 

Rüdiger Möller

Jan 8, 2014, 2:17:20 PM1/8/14
to mechanica...@googlegroups.com
That's also nifty :-) . Once the ability to "record" method call sequences is there (e.g. via runtime-generated proxies), one could even create a kind of "Transaction Object" that can be replayed to different actor instances, modified, concatenated, validated ... 

I am getting nerd flashes right now ..
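Recording method call sequences needs nothing beyond the JDK's `java.lang.reflect.Proxy`: a proxy implements the actor's interface, stores each `Method` and its arguments, and the list can later be replayed against any instance. This is a minimal sketch that assumes the recorded methods return `void`; `Account`, `SimpleAccount` and `CallRecorder` are made-up names for the demo.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical actor interface and implementation, used only for the demo.
interface Account {
    void deposit(int amount);
    void withdraw(int amount);
}

class SimpleAccount implements Account {
    int balance;
    public void deposit(int amount) { balance += amount; }
    public void withdraw(int amount) { balance -= amount; }
}

// Records calls made through a dynamic proxy so the sequence can be replayed,
// e.g. against another actor instance. Assumes the recorded methods return void.
class CallRecorder {
    static final class Call {
        final Method method;
        final Object[] args;
        Call(Method method, Object[] args) { this.method = method; this.args = args; }
    }

    final List<Call> calls = new ArrayList<>();

    <T> T recorder(Class<T> iface) {
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface},
                (p, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        return method.invoke(this, args); // equals/hashCode/toString are not recorded
                    }
                    calls.add(new Call(method, args));
                    return null; // fine for void methods only
                });
        return iface.cast(proxy);
    }

    /** Replays the recorded sequence, in order, against any implementation of the interface. */
    void replay(Object target) {
        for (Call c : calls) {
            try {
                c.method.invoke(target, c.args);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("replay failed for " + c.method.getName(), e);
            }
        }
    }
}
```

Since the recording is just a list, it could be filtered, concatenated or validated before replay, which is roughly what the "Transaction Object" idea would need.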

Martin Thompson

Jan 8, 2014, 2:24:41 PM1/8/14
to mechanica...@googlegroups.com
You might want to have a look at Elixir


Actors/processes are used more like objects. However, it runs on BEAM, which is not that performant.

Darach Ennis

Jan 8, 2014, 2:34:25 PM1/8/14
to mechanica...@googlegroups.com, mechanica...@googlegroups.com
Yes, Elixir is not dissimilar to your ideal, so the references to Erlang scheduling are relevant. As Martin says, it's more of a road car than a Formula One car! :)

Cheers,

Darach.
--

Rüdiger Möller

Jan 8, 2014, 2:39:19 PM1/8/14
to mechanica...@googlegroups.com
Never seen that, thanks for the hint.

However, I am pretty confident I can do this in regular Java using runtime bytecode generation (in fact I already have a working prototype that looks pretty much like that, though I have not implemented balancing and I use a plain polled ConcurrentLinkedQueue). I'll dedicate tonight to checking out sentinel messages for communication-pattern capture; I'd also like to check the effect of using the MPSC queue of the mighty Nitsan instead of ConcurrentLinkedQueue :-).
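For reference, a plain polled `ConcurrentLinkedQueue` design of the kind described might look roughly like this: several actors share one dispatcher thread, each with its own unbounded mailbox, and the thread polls them in turn with a per-actor batch limit. The structure and names are assumptions for illustration, not the prototype's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical dispatcher: several actors share one thread; each actor has an
// unbounded ConcurrentLinkedQueue mailbox that the dispatcher thread polls.
class PollingDispatcher {
    static final class Mailbox<T> {
        final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
        final Consumer<T> behaviour; // runs only on the dispatcher thread
        Mailbox(Consumer<T> behaviour) { this.behaviour = behaviour; }
        void send(T msg) { queue.offer(msg); } // callable from any thread
    }

    private final List<Mailbox<?>> mailboxes = new ArrayList<>(); // register before run() starts
    private final int batch; // max messages per actor per pass, so a busy actor can't starve others

    PollingDispatcher(int batch) { this.batch = batch; }

    <T> Mailbox<T> register(Consumer<T> behaviour) {
        Mailbox<T> mb = new Mailbox<>(behaviour);
        mailboxes.add(mb);
        return mb;
    }

    /** One pass over all mailboxes; returns the number of messages processed. */
    int runOnce() {
        int processed = 0;
        for (Mailbox<?> mb : mailboxes) processed += drain(mb);
        return processed;
    }

    private <T> int drain(Mailbox<T> mb) {
        int n = 0;
        T msg;
        while (n < batch && (msg = mb.queue.poll()) != null) {
            mb.behaviour.accept(msg);
            n++;
        }
        return n;
    }

    /** Dispatcher thread body: busy-polls, spinning politely when idle. */
    void run() {
        while (!Thread.currentThread().isInterrupted()) {
            if (runOnce() == 0) Thread.onSpinWait();
        }
    }
}
```

`ConcurrentLinkedQueue` is multi-producer/multi-consumer; since only the dispatcher thread ever polls, an MPSC queue could replace it without changing the loop.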

Rüdiger Möller

Jan 9, 2014, 8:15:51 PM1/9/14
to
Having fiddled with a prototype actor implementation and tried to figure out opportunities for dynamic load balancing (generating randomly interconnected actors with random bursts of output), my excitement about actors has diminished somewhat ;)

- in a dynamic system there are inherently actors building up huge incoming queues due to N:1 situations (bad latency, memory allocation) and/or bursts
- to avoid these,
   a) one would need a generic way to scale (split) actors so they get faster. This is not possible without changing the actor contract.
   b) one would need the possibility to propagate 'non-blocking' backpressure through the actor graph to the root cause of the overload. This would require continuations or having one thread per actor.

The issues are identical to those of messaging-based clusters (as actors are an "in-memory" cluster). One can get it to work using semi-static resource allocation plus (partially) handcrafted backpressure via explicit feedback-channel communication.
The better scaling of actors vs. synchronized threads in high-event-frequency scenarios can be attributed to queues avoiding cross-core contention. However, to avoid contention a queue sized to the number of senders is sufficient. There is no need to have queues with 5 million elements hanging around.
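The point that small bounded queues are enough to avoid contention can be illustrated with a single-producer/single-consumer ring buffer, the building block of the 1P1C mesh Martin described earlier in the thread. This is a minimal Lamport-style sketch, assuming exactly one producer thread and one consumer thread per instance; `SpscQueue` is a hypothetical name, and a full queue is reported to the sender instead of growing.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

// Bounded single-producer/single-consumer ring buffer.
// Only one thread may call offer(), and only one (other) thread may call poll().
final class SpscQueue<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next slot to read; written by the consumer only
    private final AtomicLong tail = new AtomicLong(); // next slot to write; written by the producer only

    SpscQueue(int capacityPowerOfTwo) {
        if (Integer.bitCount(capacityPowerOfTwo) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    /** Returns false when full; the sender decides whether to retry, delay or drop (backpressure). */
    boolean offer(E e) {
        Objects.requireNonNull(e); // null marks an empty slot
        long t = tail.get();
        if (t - head.get() == buffer.length) return false; // full
        buffer[(int) (t & mask)] = e;
        tail.lazySet(t + 1); // ordered store: publishes the element after it has been written
        return true;
    }

    /** Returns null when empty. */
    @SuppressWarnings("unchecked")
    E poll() {
        long h = head.get();
        if (h == tail.get()) return null; // empty
        int idx = (int) (h & mask);
        E e = (E) buffer[idx];
        buffer[idx] = null;
        head.lazySet(h + 1); // frees the slot for the producer
        return e;
    }
}
```

A receiver would hold one such queue per sender, so no two threads ever write the same counter, each queue can stay small, and a `false` from `offer` is a natural backpressure signal.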

Another issue is that when trying to schedule actors with different priorities on a single thread (to avoid huge queues), one rebuilds functionality already implemented (most probably in a more efficient fashion) at the OS level when scheduling threads.

These issues make me think a CSP model might be implementable in a more efficient way. Does anybody have experience with CSP-based systems?


Rajiv Kurian

Jan 9, 2014, 8:48:41 PM1/9/14
to mechanica...@googlegroups.com


On Thursday, January 9, 2014 5:04:40 PM UTC-8, Rüdiger Möller wrote:
Having fiddled with a prototype actor implementation and tried to figure out opportunities for dynamic load balancing (generating randomly interconnected actors with random bursts of output), my excitement about actors has diminished somewhat ;)

- in a dynamic system there are inherently actors building up huge incoming queues due to N:1 situations (bad latency, memory allocation) and/or bursts
- to avoid these,
   a) one would need a generic way to scale (split) actors so they get faster. This is not possible without changing the actor contract.
   b) one would need the possibility to propagate 'non-blocking' backpressure through the actor graph to the root cause of the overload. This would require continuations or having one thread per actor.
Based on what we have been discussing, you should have an entire thread almost exclusively dedicated to one actor, since the incoming queues should be full of messages for said actor, unless more than one busy actor got scheduled to the same core. Is this what happened (maybe rebalancing is not implemented yet)? Or was the load for a single actor so high that a single thread could not handle it? If that is the case, then I think the actor model is no good because of its inherently single-threaded nature, or alternatively you'll have to manually split your actor into n actors, each handling 1/nth of the traffic. Curious to know what the problem was.
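Manually splitting an actor into n actors usually means routing by a key, so that messages for the same key stay on one shard and keep their order. A minimal sketch, assuming the actor's state can be partitioned by such a key (which is exactly the change to the actor contract mentioned in point a); `ShardedActor` and the `key:value` message format are hypothetical, and plain lists stand in for mailboxes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical manual split of one logical actor into n shards. Messages with the
// same key always go to the same shard, so per-key ordering is preserved while each
// shard sees roughly 1/n of the traffic. Only valid if the actor's state partitions by key.
class ShardedActor<K, M> {
    private final List<List<M>> inboxes = new ArrayList<>(); // stand-ins for per-shard mailboxes
    private final Function<M, K> keyOf;

    ShardedActor(int shards, Function<M, K> keyOf) {
        for (int i = 0; i < shards; i++) inboxes.add(new ArrayList<>());
        this.keyOf = keyOf;
    }

    /** Stable key-to-shard mapping; floorMod keeps negative hash codes in range. */
    int shardFor(K key) {
        return Math.floorMod(key.hashCode(), inboxes.size());
    }

    void send(M msg) {
        inboxes.get(shardFor(keyOf.apply(msg))).add(msg);
    }

    List<M> inbox(int shard) {
        return inboxes.get(shard);
    }
}
```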

The issues are identical to those of messaging-based clusters (as actors are an "in-memory" cluster). One can get it to work using semi-static resource allocation plus (partially) handcrafted backpressure via explicit feedback-channel communication.
The better scaling of actors vs. synchronized threads in high-event-frequency scenarios can be attributed to queues avoiding cross-core contention. However, to avoid contention a queue sized to the number of senders is sufficient. There is no need to have queues with 5 million elements hanging around.

Another issue is that when trying to schedule actors with different priorities on a single thread (to avoid huge queues), one rebuilds functionality already implemented (most probably in a more efficient fashion) at the OS level when scheduling threads.

These issues make me think a CSP model might be implementable in a more efficient way. Does anybody have experience with CSP-based systems?
What would make CSP models more efficient? 

Rüdiger Möller

Jan 10, 2014, 4:38:56 AM1/10/14
to mechanica...@googlegroups.com


- in a dynamic system there are inherently actors building up huge incoming queues due to N:1 situations (bad latency, memory allocation) and/or bursts
- to avoid these,
   a) one would need a generic way to scale (split) actors so they get faster. This is not possible without changing the actor contract.
   b) one would need the possibility to propagate 'non-blocking' backpressure through the actor graph to the root cause of the overload. This would require continuations or having one thread per actor.
Based on what we have been discussing, you should have an entire thread almost exclusively dedicated to one actor, since the incoming queues should be full of messages for said actor, unless more than one busy actor got scheduled to the same core. Is this what happened (maybe rebalancing is not implemented yet)? Or was the load for a single actor so high that a single thread could not handle it? If that is the case, then I think the actor model is no good because of its inherently single-threaded nature, or alternatively you'll have to manually split your actor into n actors, each handling 1/nth of the traffic. Curious to know what the problem was.

Balancing implicitly requires running more than one actor on a core (thread). As soon as you run more than one actor on a single thread, you risk imbalance (think of A sending to B and C, where for each incoming message B sends 2 messages to C). Imbalance means queues are building up somewhere. If we had continuations (there is actually one implementation: Java Flow), one could try to prioritize actors on a single thread. But then you are reimplementing the thread scheduler in Java. This might still be useful, as an OS-level context switch is probably more expensive.
Big queues hamper performance and slow down actor migration. To avoid big queues, I need to employ backpressure. If I schedule several actors on a single thread (without a custom scheduler), backpressure on one actor also hits all actors scheduled on the same thread (=> risk of deadlocks, starvation).
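The A to B and C example can be put in numbers with a toy, single-threaded model, assuming B and C share one dispatcher thread that grants each actor one message per tick: C receives three messages for every message A emits (one from A, two from B) but handles only one, so its queue grows by two per tick while B keeps up. The model and its names are purely illustrative.

```java
import java.util.ArrayDeque;

// Toy model of: A sends to B and C; for each message, B sends 2 messages to C.
// B and C share one thread that handles one message per actor per tick (round-robin).
class ImbalanceModel {
    /** Returns {queue length of B, queue length of C} after the given number of ticks. */
    static int[] queuesAfter(int ticks) {
        ArrayDeque<Integer> b = new ArrayDeque<>();
        ArrayDeque<Integer> c = new ArrayDeque<>();
        for (int t = 0; t < ticks; t++) {
            b.add(t);                 // A -> B
            c.add(t);                 // A -> C
            Integer m = b.poll();     // B handles one message ...
            if (m != null) {
                c.add(m);             // ... and sends two to C
                c.add(m);
            }
            c.poll();                 // C handles one message
        }
        return new int[] {b.size(), c.size()};
    }
}
```

With fair round-robin the shared thread cannot fix this; it would have to give C three times B's share, which is the priority scheduling that ends up reimplementing the OS scheduler.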

A solution is (as you said) to use a thread per actor and then pin the threads to cores depending on the message flow observed at runtime. In this scenario one can create "actor-isolated" backpressure (e.g. by slowing down the sending thread). Another solution is a green-thread model using continuations.
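Actor-isolated backpressure with a thread per actor can be sketched with a small `ArrayBlockingQueue` as the mailbox: `put` parks the sending thread when the mailbox is full, so only senders of the overloaded actor slow down. This is a minimal sketch with made-up parameters; the demo measures how long a fast sender takes to push messages into a slow consumer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// One thread per actor with a small bounded mailbox: when the mailbox is full,
// put() parks the *sending* thread, so only the senders of the overloaded actor slow down.
class BoundedMailboxActor {
    /** Sends `messages` to an actor needing `workMillis` per message; returns the sender's elapsed ms. */
    static long senderElapsedMillis(int capacity, int messages, long workMillis) {
        BlockingQueue<Integer> mailbox = new ArrayBlockingQueue<>(capacity);
        Thread actor = new Thread(() -> {
            try {
                for (int i = 0; i < messages; i++) {
                    mailbox.take();
                    Thread.sleep(workMillis); // simulated per-message work
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        actor.setDaemon(true);
        actor.start();
        long start = System.nanoTime();
        try {
            for (int i = 0; i < messages; i++) {
                mailbox.put(i); // blocks while the mailbox is full
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }
}
```

With capacity 4 and 20 messages, the sender cannot finish before the actor has taken at least 16 messages, i.e. after at least 15 work periods. Swapping the mailbox for a `java.util.concurrent.SynchronousQueue`, which has no capacity, turns this into the CSP-style rendezvous discussed below, where the sender waits until the receiver takes the message.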
 
What would make CSP models more efficient? 

From what I understood (I am not a very academic person, so I sometimes use the wrong names):

  • Actors replace backpressure with queueing. 
  • In CSP there is no queueing; instead, a sender is blocked if the receiver is busy.
Think of 2 actors A and B, where A is sending to B but B is twice as slow. A way to solve this generically would be to "automatically" clone B, then re-sequence the output of B and B' to keep FIFO order (making B+B' look like a single B to the application). But "cloning" actors automatically cannot be implemented transparently to the application (I haven't thought about it in depth, but it's obvious).
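Re-sequencing the output of B and B' amounts to a small buffer keyed by sequence number: inputs are numbered before being split between the clones, and results are released only when the next expected number has arrived. A minimal single-threaded sketch, assuming non-null results and that it runs on the downstream actor's thread; `Resequencer` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Restores input order after work has been spread over clones B and B':
// results may arrive out of order but are released strictly by sequence number.
class Resequencer<R> {
    private final Map<Long, R> pending = new HashMap<>(); // grows without bound if a sequence number is lost
    private final List<R> emitted = new ArrayList<>();    // stand-in for "send to the next actor"
    private long nextToEmit = 0;

    /** Called when a clone finishes the message that was numbered `seq`. */
    void onResult(long seq, R result) {
        pending.put(seq, result);
        R next;
        while ((next = pending.remove(nextToEmit)) != null) {
            emitted.add(next);
            nextToEmit++;
        }
    }

    List<R> emitted() {
        return emitted;
    }
}
```

This is the transparent part; the part that cannot be made transparent is cloning B itself, since B's state would have to be split or shared.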

If I used the CSP model, A would get blocked until B is done. So you simply slow down the 'root cause' of the imbalance. The challenge here is to implement the "blocking" without actually eating up CPU. Additionally, without large queues one can migrate "CSP actors" to other cores at low cost.

If you think of a complex mesh of Actors, it is pretty clear that "blocking" is much simpler to handle than queuing.

Adam Zell

Jan 22, 2014, 11:05:39 AM1/22/14
to mechanica...@googlegroups.com
FYI: http://blog.paralleluniverse.co/2013/05/02/quasar-pulsar/

"There have been several attempts of porting actors to the JVM. Quasar and Pulsar’s main contribution — from which many of their advantages stem — is true lightweight threads 1. Lightweight threads provide many of the same benefits “regular”, OS threads do, namely a single, simple control flow and the ability to block and wait for some resource to become available while in the meantime allowing other threads to run on the CPU. Unlike regular threads, lightweight threads are not scheduled by the operating system so their context-switch is often faster, and they require far less system resources. As a result, a single machine can handle millions of them."