Any more details Martin?
--
You received this message because you are subscribed to the Google Groups "mechanical-sympathy" group.
To unsubscribe from this group and stop receiving emails from it, send an email to mechanical-symp...@googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.
ExcerptTailer tailer = chronicle.createTailer();
int threadId = AffinitySupport.getThreadId();
// In a busy loop: check there is an excerpt, and claim it so this is the only consumer.
if (tailer.hasNext() && tailer.compareAndSwapInt(0L, 0, threadId)) {
    tailer.position(4); // skip the lock field.
    // binary format
    long v = tailer.readLong(); // or readInt(), readDouble(), etc., as appropriate
    String text = tailer.readEnum(String.class); // read a UTF-8 String using a string pool.
    // text format
    long x = tailer.parseLong();
    double y = tailer.parseDouble();
    tailer.finish();
}
Erjang (Erlang on the JVM) possibly offers a nice environment to test out mechanically sympathetic actor models. You should mention the 1P1C mesh idea to Kresten. I think Nitsan has had similar notions of a 1P1C memory-mapped-file-based queue too, but not actor-model related AFAIR. There's a crew in NYC considering building an actor model implementation in Go, which would also be interesting...
On Tue, Dec 31, 2013 at 12:16 PM, Martin Thompson <mjp...@gmail.com> wrote:
The real value of the Disruptor is in supporting a mostly static graph of dependencies between event processors, or multicasting an event from a single producer to multiple consumers. It can be used as a replacement for a single queue, but this is not the sweet spot for its usage. When I developed the performance tests for the Disruptor I deliberately highlighted some of these configurations.

An actor-based system will have actors come and go dynamically, and dispatch needs to be the most efficient mechanism possible. I prefer not to use a mailbox-per-actor model and instead have a dispatcher per core with a mesh of 1P1C queues backed by memory-mapped files. By memory mapping the queues, the queues reside outside the Java heap, which removes the GC pressure from Java and allows multi-language interop.
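To make the "mesh of 1P1C queues" concrete, here is a minimal single-producer single-consumer ring buffer sketch. This is an illustration only, not Martin's implementation: a production version would pad the sequence counters against false sharing and could back the slots with a memory-mapped file as described above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Minimal 1P1C (SPSC) ring buffer: one producer thread calls offer(),
// one consumer thread calls poll(). Capacity must be a power of two.
public final class SpscQueue<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong(); // next slot to read
    private final AtomicLong tail = new AtomicLong(); // next slot to write

    public SpscQueue(int capacityPowerOfTwo) {
        buffer = new Object[capacityPowerOfTwo];
        mask = capacityPowerOfTwo - 1;
    }

    public boolean offer(E e) {                            // single producer only
        long t = tail.get();
        if (t - head.get() == buffer.length) return false; // queue full
        buffer[(int) (t & mask)] = e;
        tail.lazySet(t + 1);                               // ordered store publishes the slot
        return true;
    }

    @SuppressWarnings("unchecked")
    public E poll() {                                      // single consumer only
        long h = head.get();
        if (h == tail.get()) return null;                  // queue empty
        E e = (E) buffer[(int) (h & mask)];
        buffer[(int) (h & mask)] = null;                   // allow GC of the element
        head.lazySet(h + 1);
        return e;
    }
}
```

Because each queue has exactly one producer and one consumer, no CAS loop is needed; the `lazySet` ordered writes are enough to publish slots between the two threads.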
I believe we are long overdue an actor platform developed with mechanical sympathy for modern processors and memory subsystems. IMHO any actor platform that allows blocking calls or futures has made the most fundamental mistake :-)
On Tuesday, 31 December 2013 10:49:52 UTC, Rajiv Kurian wrote:
Saw this tweet from Martin on implementation of Actor systems:
"I don't think the Disruptor is the best choice :-) . A mesh of point-to-point memory mapped files - Longer story."
Any more details Martin?
Not sure if Go is a great way to build such systems as it currently stands. There isn't any way to opt out of the goroutine runtime. Using channels as they stand right now wouldn't be a great idea either, since they are designed for MPMC scenarios and the implementation is chock full of mutexes - there was a patch by Dmitry Vyukov to change this; not sure if it got merged. Also, last time I checked, the standard library was lacking the C++11-style atomics (acquire/release memory ordering) needed to build efficient 1P1C queues.
I actually raised this idea on the Akka dev list once (not the memory-mapped queues part). The complication with having a mesh of 1P1C queues used to communicate between a lot of actors is work stealing. Anecdotally, I have found that static scheduling (assuming that's what you meant) doesn't work with general frameworks where some actors take more time to process messages than others (an image-processing actor vs. one looking up values in a map). The work across cores inevitably gets imbalanced if we just statically schedule actors to cores. For example, if core 1 is busy and core 2 is out of work, we will need to:
i) Design an efficient way to migrate some of the work from core 1's incoming 1P1C queues (one per other core) to core 2.
ii) Remember which actors' messages have been handed out to other idle cores, so that when we encounter messages for them later (on core 1) we don't process them until the stealing core is done with its stolen batch of messages and the results have been made visible to core 1. This is to maintain sequential consistency.
iii) Revisit the skipped-over messages later to deal with them.
This is just one possibility; maybe there are better ways. The one-mailbox-per-actor model just makes it easy (not efficient) to handle work stealing: an idle core simply steals a mailbox. The price to pay is MPSC queues and loss of locality, among other things.
Martin, do you have any ideas for work stealing in a mesh-of-1P1C-queues setup, or some way of avoiding it altogether? That would make this design really attractive and would surely be a boost to performance for actor systems without any loss of semantics.
If I understand Martin's idea correctly (one dispatcher per core), one would have to move actors from one dispatcher to another for balancing?
Another note: on a single machine it is easier to apply backpressure to senders. Once you scale out to multiple machines, backpressure has higher latency, which can lead to strange 'resonance' effects resulting in unstable throughput and unpredictable latency peaks and throughput lows. We get rid of all this with the per-sender-thread tweak.
It's clear that blocking calls aren't applicable at all in a reactive framework, but what problems do you see with 'Futures'?
I find it is best to initially statically allocate with some consideration to special case actors, e.g. large compute jobs or IO. Rebalancing needs to be based on both queue length and drain rate per dispatcher but also importantly which actors have more frequent communications so they get co-located. This can be achieved by having a central load balancer that can put dispatcher queues into an administration mode to rebalance. This allows the 1P1C semantics to be preserved without the overhead of work stealing.
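A hypothetical sketch of the rebalancing signal described above: compare per-dispatcher backlog (queue length divided by drain rate) and flag the dispatcher that should shed actors. The class name, threshold, and metrics here are illustrative assumptions, not Martin's implementation.

```java
// Toy rebalancing trigger: a central balancer samples queue length and
// drain rate per dispatcher and flags the most loaded one.
final class Balancer {
    // Estimated seconds needed to clear the backlog at the observed drain rate.
    static double backlogSeconds(long queueLength, long drainPerSec) {
        return drainPerSec == 0 ? Double.POSITIVE_INFINITY
                                : (double) queueLength / drainPerSec;
    }

    // Index of the dispatcher whose backlog exceeds the threshold by the
    // largest margin, or -1 if the system is considered balanced.
    static int overloaded(double[] backlogSecondsPerDispatcher, double thresholdSeconds) {
        int worst = -1;
        double max = thresholdSeconds;
        for (int i = 0; i < backlogSecondsPerDispatcher.length; i++) {
            if (backlogSecondsPerDispatcher[i] > max) {
                max = backlogSecondsPerDispatcher[i];
                worst = i;
            }
        }
        return worst;
    }
}
```

A real balancer would additionally weight communication affinity between actors (to co-locate chatty pairs), as the text notes; that part is omitted here.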
Just to be sure: by static allocation do you mean some scheme that lets each individual dispatcher computationally (without any central lookup table) know which dispatcher a particular actor belongs to? For example: hash an actor-id to one of the dispatchers using the same hash function.

Could you please provide some more details on how the central load balancer would work? I understand the notion of co-locating actors that communicate frequently, but I am trying to wrap my head around how rebalancing would actually be done (implementation-wise). I am specifically wondering:
i) How will the central load balancer move actors from one dispatcher to another? Specifically, how will the other dispatchers learn of this change and atomically cut over to the new dispatcher when sending a message to an actor? For example: say actor-1 was moved from dispatcher-1 to dispatcher-2 for co-location or rebalancing benefits. Dispatchers 3-n also need to know about this change so that when they address actor-1 in the future they enqueue the messages on the right dispatcher's incoming queue. Also, dispatcher-1, which just lost actor-1, could still have messages for actor-1 left on its incoming queues. These need to be processed before any other messages for actor-1 can be processed by its new dispatcher.
Each dispatcher keeps track of which actors it hosts, so local comms can be optimised away. When references are handed out or migrated, they can be registered with a central directory. The load balancer also manages the directory, thus preserving the single-writer principle. Directories can be federated to scale, and provide local caching across node clusters.

Sorry I keep pestering for details - I am actually attempting to build a prototype to do something like this.
Is my summary correct?
i) A load balancer thread (or multiple threads in the case of a federated directory) looks at metrics like queue length, drain rate and communication patterns to rebalance actors across dispatchers.
ii) When a dispatcher needs to send a message to an actor, it first checks whether the actor is on the same dispatcher. If yes, it can just make a method call instead of any communication. If not, it looks up the right dispatcher in the central directory and places the message on the corresponding queue.
iii) When the load balancer thread determines that actor-1 should be moved from, say, dispatcher-1 to dispatcher-2, it first needs to communicate with dispatcher-1 so that it can set up a proper forwarding map. The load balancer can then update the central directory. Eventually all dispatchers will send actor-1's messages to their respective outgoing queues to dispatcher-2, and since this only happens eventually, the forwarding map will take care of all the messages sent to dispatcher-1 in the meantime.
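Step (ii) of this summary can be sketched in a few lines. This is a hypothetical illustration (all names are assumptions): a send always goes through the dispatcher, which short-circuits local delivery and otherwise routes via the balancer-owned directory.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;

interface Actor { void onMessage(Object msg); }

// Illustrative dispatcher: hosts some actors locally and owns one
// outbound 1P1C queue per peer dispatcher.
final class Dispatcher {
    final int id;
    final Map<Long, Actor> localActors = new ConcurrentHashMap<>();          // actors hosted here
    final Map<Integer, Queue<Object>> outbound = new ConcurrentHashMap<>();  // 1P1C queues to peers
    final Map<Long, Integer> directory; // actorId -> dispatcherId; written only by the balancer

    Dispatcher(int id, Map<Long, Integer> directory) {
        this.id = id;
        this.directory = directory;
    }

    void send(long actorId, Object msg) {
        Actor local = localActors.get(actorId);
        if (local != null) {                 // local: direct call (or a local queue, see later posts)
            local.onMessage(msg);
            return;
        }
        int target = directory.get(actorId); // remote: directory lookup (sketch ignores missing entries)
        outbound.get(target).offer(msg);     // enqueue on the 1P1C queue to that dispatcher
    }
}
```

The directory being written only by the load balancer is what preserves the single-writer principle Martin mentions; all dispatchers merely read it.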
The forwarding map though doesn't take care of messages already on dispatcher-1's incoming queues. Consider the following timeline:
1) Actor-2 on some other dispatcher (say dispatcher-3) could have sent 50 messages destined for actor-1 to dispatcher-1. These are still waiting to be processed.
2) Now the load balancer thread starts a rebalance event proposing actor-1 be moved from dispatcher-1 to dispatcher-2.
3) In response to (2), the forwarding map is set up on dispatcher-1 and the load balancer also updates the central directory. All future messages from actor-2 will end up going to dispatcher-2, either through the forwarding map (which might happen in the beginning) or through a lookup on the central directory.
4) Actor-2 learns of the change in actor-1's dispatcher and sends msg #51 to dispatcher-2. Dispatcher-2, being idle, immediately processes msg #51.
5) Now dispatcher-1 gets around to seeing the 50 messages from actor-2 and forwards them to dispatcher-2.
6) Dispatcher-2 now processes them serially. What ends up happening is that msg #51 (sent from actor-2) is processed before msgs #1-50 (also from actor-2), which breaks the actor contract of serial processing for a sender-destination pair.
What we need to ensure is that as long as actor-2 (on dispatcher-3) is sending messages to dispatcher-1 (to be forwarded) rather than directly to dispatcher-2, those messages are processed before the ones sent directly to dispatcher-2. A simple forwarding map doesn't guarantee that. I feel like some kind of two-phase commit protocol is needed. Something like:
1) The load balancer thread proposes a rebalance event moving actor-1 from dispatcher-1 to dispatcher-2.
2) All threads, once they read this event, will enqueue future messages destined for actor-1 on some local queue (instead of sending them to dispatcher-1) and signal the load balancer that they are now aware of the rebalance mode. Dispatcher-1 will also enqueue messages for actor-1 on this local queue instead of invoking method calls.
3) The load balancer now signals dispatcher-1 that the other dispatchers are aware of the proposal. Dispatcher-1 is now guaranteed that no other dispatcher will be sending messages destined for actor-1 on any of its incoming queues (from step 2).
4) Dispatcher-1 now scans its incoming queues and forwards all messages for actor-1 to dispatcher-2 (thus maintaining order). A single scan will do, since it is guaranteed that any concurrently arriving messages will not be for actor-1. This ensures that all messages sent to actor-1 before or during the rebalance event (hence on dispatcher-1's queues) are forwarded to dispatcher-2 before newer messages are enqueued/processed.
5) Dispatcher-1 can then signal the load balancer thread that it is done forwarding all old messages for actor-1 to dispatcher-2. The load balancer can then signal all the other threads that the forwarding phase is over.
6) All the threads now forward the messages enqueued on their temporary local queues (from step 2) to dispatcher-2. From then on they also send messages destined for actor-1 to dispatcher-2 (via the central directory).

This should ensure that the actor contract is maintained across rebalancing events.
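The ordering property this protocol buys can be checked with a tiny single-threaded simulation. This is purely illustrative (real phases run concurrently and the names are made up): the old dispatcher's backlog is forwarded in step 4 before the messages parked by senders in step 2 are released in step 6.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Sequential simulation of the two-phase handover: checks that messages
// already queued on dispatcher-1 reach dispatcher-2 before messages
// parked during the rebalance.
final class RebalanceSim {
    static List<String> run() {
        // Backlog for actor-1 still sitting on dispatcher-1's incoming queues.
        Deque<String> d1Incoming = new ArrayDeque<>(List.of("msg1", "msg2"));
        List<String> parked = new ArrayList<>();      // step 2: senders park new messages locally
        List<String> d2Delivered = new ArrayList<>(); // what actor-1's new dispatcher processes

        parked.add("msg3");                  // a send that arrives during the rebalance
        while (!d1Incoming.isEmpty()) {      // step 4: old dispatcher forwards its backlog first
            d2Delivered.add(d1Incoming.poll());
        }
        d2Delivered.addAll(parked);          // step 6: parked messages follow, order preserved
        return d2Delivered;
    }
}
```

Had the parked message been sent straight to dispatcher-2 (the naive forwarding-map scheme), it would have been processed before the backlog, reproducing the msg #51 problem above.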
Just for clarification,
a) do you support direct dispatch (without Q) for 'local' messages ?
b) do you maintain a Q per dispatcher, per actor, or per actor per sender-to-this-actor ?
Just for clarification,
a) do you support direct dispatch (without Q) for 'local' messages ?
Yes and this is why blocking calls and futures hamper so many optimisations. :-)
The fun of working things out for yourself. :-)
The dispatcher makes the decision whether the call can be optimised. Send must always invoke the dispatcher to make the routing decision. This is needed for remote calls anyway.
On 8 January 2014 08:05, Rajiv Kurian <geet...@gmail.com> wrote:
On Thursday, January 2, 2014 9:02:48 AM UTC-8, Martin Thompson wrote:
Just for clarification,
a) do you support direct dispatch (without Q) for 'local' messages ?
Yes and this is why blocking calls and futures hamper so many optimisations. :-)

Direct dispatch without a thread-local queue is dangerous. If an actor for some reason decides to msg itself, its processing function will be invoked from within itself, invalidating the actor contract. Even messages to other actors are dangerous. Say Actor A msgs Actor B; they are on the same dispatcher, so in the middle of Actor A's processing function we call Actor B's processing function. This is all fine and good unless Actor B decides to msg Actor A in response. This will again cause Actor A's processing function to be entered twice. The function invocation sequence will look something like this:

actorAProcessingFunction() {
    some stuff...
    actorBProcessingFunction() {
        some more stuff...
        actorAProcessingFunction() {
            // Bad things happen now.
        }
    }
}

b) do you maintain a Q per dispatcher, per actor, or per actor per sender-to-this-actor?

Either an MPSC queue per dispatcher, or SPSC queues in a mesh between dispatchers for message flow. Plus an additional administration queue per dispatcher.

What "throws you completely off"?
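The reentrancy hazard described above can be reproduced in a few lines. A hypothetical sketch (names and message strings are made up): two actors on the same dispatcher whose sends are optimised to direct calls, with a depth counter standing in for the actor contract check.

```java
// Two co-located actors whose sends are "optimised" into direct calls.
// A sends to B, B replies to A, and A's handler is re-entered while the
// first invocation is still on the stack.
final class ReentrancyDemo {
    static int depthInA = 0;
    static boolean contractViolated = false;

    static void actorA(String msg) {
        depthInA++;
        if (depthInA > 1) {
            contractViolated = true; // A re-entered mid-message: actor contract broken
        }
        if (msg.equals("start")) {
            actorB("helloFromA");    // direct call on the same thread
        }
        depthInA--;
    }

    static void actorB(String msg) {
        actorA("thankYouFromB");     // B's reply is also a direct call, re-entering A
    }
}
```

Queuing the reply instead of calling directly (a thread-local mailbox drained between messages) is exactly what prevents `depthInA` from ever exceeding 1.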
On Wednesday, January 8, 2014 12:52:14 AM UTC-8, Martin Thompson wrote:
The fun of working things out for yourself. :-) The dispatcher makes the decision whether the call can be optimised. Send must always invoke the dispatcher to make the routing decision. This is needed for remote calls anyway.

I get that, but I don't understand how the dispatcher could say anything more than whether the destination actor is scheduled on the same thread or not. It cannot predict what the direct dispatch of the destination actor will do. My example shows the problem with direct dispatch if two actors (on the same thread) end up sending msgs to each other and both get optimized to direct dispatch calls. Maybe this falls into your secret sauce category :)

Consider the example where Actor A and Actor B are on the same thread:

1) Actor A receives a msg - SayHelloToB - and does something like this:

if (msg == sayHelloToB) {
    Change the size variable of the internal hash map's backing array. At this moment the change is partial and assumed not visible to future messages to this actor.
    Send msg HelloFromA to Actor B.
    Allocate a new backing array for the internal hash map. Now the change is complete.
} else if (msg == ThankYouFromB) {
    Use the internal hash map to make a life-altering decision.
}

2) Actor B receives the HelloFromA msg and decides to msg Actor A back with ThankYouFromB:

if (msg == HelloFromA) {
    Send msg ThankYouFromB to Actor A.
}

Since Actor A and Actor B are on the same dispatcher, these send actions are optimized to direct calls. What ends up happening in Actor A is this:

if (msg == sayHelloToB) {
    Change the size variable of the internal hash map's backing array. At this moment the change is partial and assumed not visible to future messages to this actor.
    // Expanded from the processing logic for msg ThankYouFromB.
    Use the internal hash map for a life-altering decision - BOOM....
    Allocate a new backing array for the internal hash map. Too late!
}
:-) That's what I said though! Quoting myself - "Direct dispatch without a thread local queue is dangerous."
I believe we are long overdue an actor platform developed with mechanical sympathy for modern processors and memory subsystems. IMHO any actor platform that allows blocking calls or futures has made the most fundamental mistake :-)
"whether the definition assumes the computational universe to be resource limited or not?"
EOT.
If there is a queue then there is always a potential delay (blocking) or rejection unless resources are unlimited...this applies to both the physical & digital world (both computational). A call is always potentially "blocking"...async or not. I think we need better naming classifications for (coordinated) interactions across boundaries...code or flow.
If this is your actor contract (correct me if you see something missing):
(1) If Actor A sends messages to Actor B, it is guaranteed that all messages are received and processed in the order sent.
If two actors A and B send messages concurrently to Actor C, sequential consistency is only guaranteed for A=>C and B=>C; it is not guaranteed that a message sent by Actor A to C "before" B sends a message to C is received by C first.
(2) During processing of a message inside an actor, no other message can be processed by this actor. (from your sample)
Without (2) one can do:
public boolean doDirectCall(String methodName, ActorProxy proxy) {
    return proxy.getDispatcher().getWorker() == Thread.currentThread();
}
If you want to guarantee (2), you'll need to check the call stack of the current thread. If there is already a method call on "this" actor on the current thread, you need to queue the call; otherwise you can do direct dispatch.
The easiest way to achieve this is to keep a counter in the actor instance: before a message is processed do counter++, and afterwards do counter--.
public boolean doDirectCall(String methodName, ActorProxy proxy) {
    return proxy.getDispatcher().getWorker() == Thread.currentThread()
        && proxy.getActor().getCurrentInvocationCount() == 0;
}
If an actor calls a message on itself, the counter must not be incremented.
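One hypothetical way to wire the counter idea end to end (all names are illustrative, not Rüdiger's API): guard the direct call with the counter, park reentrant sends on a local queue, and drain that queue after the current message completes. This is single-threaded by construction, since only the owning worker thread touches the actor.

```java
import java.util.ArrayDeque;

// Counter-guarded dispatch for one actor: a handler arriving while the
// actor is already mid-message is queued instead of invoked directly,
// preserving contract (2) above.
final class GuardedActor {
    private int invocationCount = 0; // touched only by the owning worker thread
    private final ArrayDeque<Runnable> pending = new ArrayDeque<>();

    void receive(Runnable handler) {
        if (invocationCount > 0) {   // reentrant send: defer instead of direct dispatch
            pending.add(handler);
            return;
        }
        invocationCount++;
        try {
            handler.run();
        } finally {
            invocationCount--;
            // Drain messages that were deferred while we were mid-message.
            while (invocationCount == 0 && !pending.isEmpty()) {
                Runnable next = pending.poll();
                invocationCount++;
                try {
                    next.run();
                } finally {
                    invocationCount--;
                }
            }
        }
    }
}
```

A self-send made inside a handler thus runs after the current handler returns, in order, rather than re-entering the actor.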
On Wednesday, 8 January 2014 10:15:03 UTC+1, Rajiv Kurian wrote:
Some very good points here. I've employed some rebalancing techniques based on queue lengths and drain rates, but it feels like child's play compared to what should be possible.
A big part of actor-based systems I feel is missing is the encapsulation of expensive resources: for example, logging to disk, external comms, large computations, etc. It is critical that these do not get contended and are used effectively. For this I normally dedicate an actor, or pool of actors, to each resource. I've not been able to find much research in this area other than taking inspiration from driver and kernel design patterns.
"I also think finding an intuitive model to programmers, which doesn't require house-keeping (e.g. register/unregister actors) is very, very important and has to be considered right from the start.""Why hide the registration / unregistration of actors at all? The ability to link, monitor and act upon lifecycle events is an important buildingblock for composing robust systems from more primitive parts. Erlang and Akka supervisors make this an absolute pleasure. Once the overhead orthe house-keeping (eg: OTP or Akka 'boilerplate') is mastered it works forevermore in your favor.
Speaking for Akka, although I've never used it in production, it has excellent documentation and examples. For example, Erlang the language youcan learn in a day. The OTP (actor) environment will take a few days to get your head around and a few weeks (using it every day) to use reasonably effectively. A small expenditure in effort. The payback though grows with your mastery.Sometimes that little extra house-keeping is a good thing. A good high level introduction to Erlang scheduling by Jesper Louis Andersen:A weightier tome on scalability of the Erlang VM:
Akka's scheduler:Kresten Krab Thorup has some slides on the reasoning behind his approach to Erlang on the JVM:
In general though, event-based actors are, at least for me, far more intuitive and hassle free (given an equivalent to Erlang/OTP or Akka)than thread-based designs. Yes, the model isn't perfect as synchronous message passing can introduce deadlocks but its better than mostother models because its very intuitive. It would certainly be nice to see some mechanical sympathy in the Erlang/OTP runtime and I'm hopingsome of the discussions on this thread (& this list in general) reach the Erlang folk (mechanical sympathy isn't just for C-like languages and theJVM) too.Cheers,Darach.
Having fiddled with a prototype actor implementation and tried to figure out opportunities for dynamic load balancing (generated randomly interconnected actors with random bursts of output), my excitement about actors has diminished somewhat ;)
- In a dynamic system there are inherently actors building up huge incoming queues due to N:1 situations (bad latency, memory allocation) and/or bursts.
- To avoid these:
a) one would need a generic way to scale (split) actors so they get faster. Not possible without changing the actor contract.
b) one would need the ability to apply 'non-blocking' backpressure throughout the actor graph to the root cause of the overload. This would require continuations or one thread per actor.

The issues are identical to messaging-based clusters (as actors are an "in-memory" cluster). One can get it to work using semi-static resource allocation plus (partially) handcrafted backpressure using explicit feedback-channel communication.

The better scaling of actors vs. synchronized threads in high-event-frequency scenarios can be attributed to queues avoiding cross-core contention. However, to avoid contention a queue of size [number of senders] is sufficient; no need to have 5-million-element queues hanging around.

Another issue is that when trying to schedule actors with different priorities on a single thread (to avoid huge queues), one rebuilds functionality already implemented (most probably more efficiently) at the OS level when scheduling threads.

These issues make me think a CSP model might be implementable in a more efficient way. Has anybody experience with CSP-based systems?
Based on what we have been discussing, you should have an entire thread almost exclusively dedicated to one actor, since the incoming queues should be full of messages for said actor - unless more than one busy actor got scheduled to the same core. Is this what happened (maybe rebalancing not implemented yet)? Or was the load for a single actor so much that a single thread could not handle it? If that is the case then I think the actor model is no good, because of its inherent single-threaded nature. Alternatively you'd have to manually split your actor into n actors, each handling 1/nth of the traffic. Curious to know what the problem was.
What would make CSP models more efficient?