val numWorkToGenerate = N
for (i <- 1 to numWorkToGenerate) {
  val timestamp = System.currentTimeMillis()
  val work = Work(i, timestamp)
  master ! work
}
master ! NoMoreWork
// ... keep track of the workers and what each one is working on, and keep a
// queue of work that still needs to be assigned ...
def receive = {
  case work: Work =>
    // record the time delta between the current timestamp and the one packaged in `work`
    // find an idle worker and, if found, update the timestamp in `work` with the
    // current timestamp and send `work` to that worker; otherwise enqueue it
  case NoMoreWork =>
    // if the work queue is empty and no worker is busy, write all recorded
    // times to the log and quit gracefully
  case WorkDone =>
    // record the time delta between the current timestamp and the one packaged in the reply
    // if the work queue is non-empty and a worker is idle, assign work as in the
    // `Work` case above; otherwise check whether all work is done and run the
    // NoMoreWork logic
}
// ... create an Executor actor that will be used to execute the work, and keep
// track of what it is working on ...

// Worker: delegates work to its Executor and flips between idle and working
def idle: Receive = {
  case work: Work =>
    executor ! work
    context become working
}

def working: Receive = {
  case ExecuteDone =>
    master ! WorkDone
    context become idle
}

def receive = idle

// Executor: for this benchmark it does no real work and replies immediately
def receive = {
  case work: Work => sender() ! ExecuteDone
}
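For reference, a plausible shape for the messages the sketches above assume (field names and types are illustrative guesses, not the actual code):

import akka.actor.ActorRef

case class Work(id: Int, timestamp: Long)    // Producer -> Coordinator -> Worker
case object NoMoreWork                       // Producer -> Coordinator: nothing further
case object WorkDone                         // Worker -> Coordinator: one unit finished
case object ExecuteDone                      // Executor -> Worker: execution finished
case class RegisterWorker(worker: ActorRef)  // Worker -> Coordinator (mentioned below)
case class WorkFailed(work: Work)            // Worker -> Coordinator (mentioned below)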
Hello,

Apologies in advance for the long post... to frame my problem and questions I have to give a lot of contextual info.

I am running a set of experiments to understand the feasibility and performance characteristics of running Akka on the Blue Waters (batch) supercomputer at NCSA.

The general theme of the experiments is to create a map-reduce-like system comprised of a producer, one (or more) coordinator(s), and a number of workers (the reduce step could be optional). The producer is responsible for generating the "work", which is then sent to the coordinator(s), which in turn assign the work to an idle worker.

At first I am most concerned with the performance of the solution (rather than fault tolerance). The only fault-tolerance aspect I'd like to address at first consists of exceptions thrown as part of executing the "work". When that happens, the "executor" should be restarted and the coordinator informed of the failed job, which would get logged and perhaps retried later (up to n times) depending on the exception. Eventually the final solution would include a checkpointing mechanism that would allow the PBS job to be resumed from the last checkpoint in case it is killed or a malfunction occurs with Blue Waters.

Besides learning about the different options/architectures that are appropriate for an environment such as Blue Waters, the problem I ultimately want to solve is to process approx. 3.2 million gzipped text files (about 3.2 TB of gzipped data). The "processing" could be any number of things: feature extraction, OCR error correction, or other text-processing routines as needed.

About my experience: I am no Scala and Akka expert, but I do have a decent amount of experience with both. I've taken Martin's functional programming in Scala course on Coursera, as well as the reactive programming course by Martin, Erik, and Roland. I've also read through a number of Scala and Akka books, tutorials, examples, etc., but, as with just about everything else in life, there's always more to learn.

My first attempt was to adapt the Distributed Workers with Akka and Scala example, just to see how it performs "out of the box". In the process I also had to find a way to deploy the application to the Blue Waters Torque/PBS batch environment, which wasn't trivial... but I did come up with 2 ways to do that that seem to work quite well (if anyone wants to know how, I could write about that). I did manage to get the example to work (tested on a small subset of the data, with some changes to WorkProducer), but I felt that solution was too concerned with fault tolerance at the expense of (some) performance, so I wanted to create another solution where those concerns would be reversed. (It's still on my TODO list to get some performance measurements of the original example so that I can compare numbers across the different solutions.)

The way I'm planning to evaluate the performance of each solution is to have the work producer generate a known quantity of (empty) work as fast as possible, have that work be coordinated by the master over a known number of worker nodes, and have the executor on each worker do nothing but respond immediately with an (empty) result.
When I run this, each physical compute node will run one instance of the Worker app per CPU core; the Master and Producer will run on the same compute node (other Worker instances will also run on that node, on the 'remaining' CPU cores).

Compared to the original example I started from:
- my first solution tries to have as little communication as possible
- there is no acknowledgment from Master to Producer for each piece of work received; the Producer operates in a fire-and-forget style
- the pull model used in the original example is still there, but has been simplified to be triggered by (not shown above) RegisterWorker, WorkDone, and WorkFailed messages rather than the WorkerRequestsWork / WorkIsReady mechanism
- the result of the work is not given to the Master; the Master is only informed that the work was done (this is because the simplest version of work may not need to deliver a result to anyone: the result could just be output written to a file or DB or whatever; I will explore the use of a pub-sub system to publish the result in a future version of the solution)
- FYI: I did some renaming in the code: Master has been baptized as "Coordinator" and WorkProducer became "Producer"
I'm not concerned with how realistic the above assumptions are right now; I just want to see what the performance of the leanest solution I can think of is, learn about its shortcomings, and use that to keep iterating and improving on it. I'm also curious about the reliability of the communication between the compute nodes, and how the Akka cluster behaves with no code-added fault tolerance as I scale up the number of nodes. I suspect that on Blue Waters the reliability guarantees are higher than if I were creating an Akka cluster over a bunch of commodity computers.

So... finally I can describe the problem I'm having (thanks for reading this far!).
Basically, I ran a test of the code on GitHub on Blue Waters on 3 physical nodes, each node having 32 cores (and 64 GB RAM).

I'm using the "<role>.min-nr-of-members" config setting to ensure that all nodes that join the Akka cluster are transitioned to "Up" only after all workers, the coordinator, and the producer have joined the cluster. I've used the "registerOnMemberUp" callback as the trigger to create the Producer, Coordinator (aka Master), and Worker actors. When the Worker and Producer actors start, they look up the Coordinator ActorRef, and the Producer uses that as the trigger to start producing the work.

I've used the "producer.workCount" config setting to try different quantities of work, and for small values (<100,000) everything works fine. When I tried workCount=1,000,000 the Producer would start getting disassociated from the cluster: it would basically detect everyone else as UNREACHABLE, or I would see many entries like the following in the logs:

[ReliableDeliverySupervisor:71] [WARN] - Association with remote system [akka.tcp://cluster@nid25948:38358] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

It would seem that either the Producer becomes congested trying to send out the messages to the Coordinator, or maybe the Coordinator somehow becomes congested in a way that affects the Producer.
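For reference, the config and trigger look roughly like this (role names and member counts here are illustrative, not my exact values):

akka.cluster.role {
  producer.min-nr-of-members = 1
  coordinator.min-nr-of-members = 1
  worker.min-nr-of-members = 94
}

Cluster(system).registerOnMemberUp {
  // create the Producer / Coordinator / Worker actors appropriate for this node's role
}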
It's definitely the case that this solution isn't too viable, since a single Coordinator will have trouble keeping up with so many messages, especially given that it also needs to exchange messages with the workers (and "in production" there might be thousands of workers, or more).

In theory, though, this should still have worked: as far as I understand it, an actor's mailbox size is limited only by the amount of available memory (and I've been generous with that, and saw no OOM messages anywhere or process crashes).
I wonder whether the Akka cluster extension could be playing a part in the issues I'm seeing. The heartbeats or gossip messages exchanged between the cluster nodes might add unneeded traffic to a potentially already-saturated network. I think the simple solution I presented above could be implemented without the cluster extension, using just remote actors: the Producer only needs to know about the Coordinator (and monitor its lifecycle), and similarly for the Workers. I also wonder what the practical limits are for the maximum size of an Akka cluster, assuming there are no JVM-related limits, when the only limit becomes the communication protocol/medium...
I'll be collecting some stats for values of "workCount" that do not cause errors, for future comparison...

It seems to me that the next thing I should try is to lessen the stress on the Coordinator. I'm thinking a better solution would be to have one Coordinator instance on each physical compute node, and have the workers started on that physical node register with it; then, on the physical node where the Producer is started, also start a router whose routees are the Coordinators created on each of the physical nodes. The Producer would send the work to the router, which could use RoundRobin (or other policies) to route the work to the Coordinators, as in the sketch below.
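A rough sketch of that routing idea (nodeHosts, the port, and the actor path are placeholders):

import akka.routing.RoundRobinGroup

val nodeHosts = Seq("nid00001", "nid00002", "nid00003") // placeholder host names
val coordinatorPaths = nodeHosts.map(host => s"akka.tcp://cluster@$host:2552/user/coordinator")
val router = system.actorOf(RoundRobinGroup(coordinatorPaths).props(), "coordinatorRouter")
// The Producer then sends every Work message to `router` instead of a single Coordinator.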
Finally, as part of the mix of solutions I'll be experimenting with, I'd also like to simulate the same basic setup in an MPI-based application and do some timing comparisons, just for the heck of it. It's very likely that the MPI solution will perform better (especially when using the highly optimized Cray MPI library over the Gemini router ASIC), but it would be educational to see the magnitude of the difference. If the performance difference is significant, I would be interested in exploring the possibility of adding MPI-based communication to Akka as an alternative to TCP.

I'll be posting updates on this thread as I have more results.
I'm looking for comments and suggestions based on what I wrote above... Thank you in advance! Again, sorry for the long post.

-Boris
Patrik Nordwall
Typesafe - Reactive apps on the JVM
Twitter: @patriknw
Be aware that network roundtrips might dominate when you test with empty work, in a way that might not be relevant when the workers do real work.
For those interested, the actual code is here: https://github.com/borice/akka-grid-workers

I noticed one error when you use the onSuccess callback. See http://doc.akka.io/docs/akka/2.3.0/general/jmm.html#Actors_and_shared_mutable_state
Long garbage collection pauses can also trigger failure detection.
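If GC pauses are the culprit, one mitigation is to relax the failure detector; an illustrative setting (values are examples, not recommendations):

# tolerate longer pauses before marking a node unreachable
akka.cluster.failure-detector.acceptable-heartbeat-pause = 10 s
# for plain remoting death-watch, the analogous setting is
# akka.remote.watch-failure-detector.acceptable-heartbeat-pause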
Then you are likely to benchmark the slowness of Java Serialization.
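For reference, swapping out Java serialization for the application messages is a config change plus a custom serializer; a sketch (com.example.WorkSerializer is a hypothetical akka.serialization.Serializer implementation, e.g. protobuf- or kryo-based, not an Akka-provided class):

akka.actor {
  serializers {
    work = "com.example.WorkSerializer"  # hypothetical custom serializer
  }
  serialization-bindings {
    "com.example.Work" = work
  }
}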
16:07:33.976 [INFO] - delta = 475 ms, received = 40,000
16:07:34.585 [INFO] - delta = 610 ms, received = 60,000
16:07:35.065 [INFO] - delta = 479 ms, received = 80,000
17:46:44.280 [INFO] - delta = 14,290 ms, received = 40,000
17:47:01.804 [INFO] - delta = 17,524 ms, received = 60,000
17:47:19.130 [INFO] - delta = 17,325 ms, received = 80,000
17:47:35.865 [INFO] - delta = 16,736 ms, received = 100,000
Neither networked nor multithreaded applications have constant performance. Thread scheduling artifacts, cache effects, timing issues in handoffs, network congestion, retransmissions etc.
My suggestion is to take out the known slow parts first. Then measure and tune.
Hi Boris,

Thanks for packaging it in a reproducible sample. As Viktor points out, the Java serializer is one of the prime suspects for bottlenecks in remote communication.

Here you have found something that I think can be improved in Akka remoting. I can see that the major time is consumed in the EndpointWriter stashing and unstashing messages. That happens when the transport can't write, perhaps because some internal buffer is full. Please create a ticket and we will look into whether we can improve the efficiency of this buffering mechanism.
What you should do is to implement flow control in the application layer to avoid producing faster than what can be consumed.
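A minimal sketch of what that could look like here (the Ack message and window size are illustrative additions, not part of the code above): the Producer sends a bounded window of work and waits for an acknowledgment from the Coordinator before sending the next window.

import akka.actor.{Actor, ActorRef}

case object Ack // illustrative: sent by the Coordinator once it has drained a window

class Producer(coordinator: ActorRef, totalWork: Int, windowSize: Int) extends Actor {
  private var nextId = 1

  override def preStart(): Unit = sendWindow()

  private def sendWindow(): Unit = {
    val upTo = math.min(nextId + windowSize - 1, totalWork)
    for (i <- nextId to upTo) coordinator ! Work(i, System.currentTimeMillis())
    nextId = upTo + 1
  }

  def receive = {
    case Ack if nextId <= totalWork => sendWindow() // previous window drained, send the next
    case Ack => coordinator ! NoMoreWork; context.stop(self) // all work acknowledged
  }
}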
I'll test the settings that Endre pointed out and see what difference they make.
Thank you all, again, for your help!

-Boris
There is another setting that controls the thread pool (dispatcher) for the remoting subsystem (this is independent of the Netty pools). You should probably increase the size of that as well. The setting is under "akka.remote.default-remote-dispatcher"; see http://doc.akka.io/docs/akka/2.3.0/scala/remoting.html#Remote_Configuration for the complete configuration.
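That is, something along these lines (the parallelism values are illustrative):

akka.remote.default-remote-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8  # illustrative; increase from the small default
    parallelism-max = 8
  }
}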
Since you run everything on localhost, fairness issues might dominate: some subsystems get less CPU share because they have fewer threads to run on, and fairness is enforced by the OS per thread.
Here's the fun part... I noticed that if I add a "Thread.sleep(5000)" in the for loop on the sender side after every 100K messages sent (commented out now in the GitHub code), then sending even 1M messages is done with the same timings per 20K as I see when I send just 100K messages (without the Thread.sleep).
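In code, the crude throttle looks roughly like this (batch size and pause as described above):

for (i <- 1 to numWorkToGenerate) {
  master ! Work(i, System.currentTimeMillis())
  if (i % 100000 == 0) Thread.sleep(5000) // crude throttle: pause after every 100K messages
}
master ! NoMoreWork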
-Boris
Is this a hard-to-implement fix?
Anyway, one thing to try is to set "akka.remote.backoff-interval" to a larger value while setting the send-buffer-size to 1024000b. I would try with backoffs 0.5s and 1s. While 1s is not a very good setting, it is a good way to test our hypothesis.
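In config terms:

akka.remote.backoff-interval = 1 s  # also try 0.5 s
akka.remote.netty.tcp.send-buffer-size = 1024000b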
Hello all,

We are working on a course project to simulate a Twitter server and Twitter users, and to test the server for the load it can handle. We have the users on one system (with one client master and around 10,000 user actors) and the server on another machine (one master and 1,000 worker actors) that resolves the queries it gets from the user actors remotely. We are sending an average of 6,000 queries/sec to the server. We have one TCP connection between the 2 systems and we get the following warning:

[TwitterServerSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://TwitterServerSystem...@122.122.122.122:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FTwitterClientSystem%40127.0.0.1%3A56833-0/endpointWriter] [65138] buffered messages in EndpointWriter for [akka.tcp://TwitterClientSystem...@127.0.0.1:56833]. You should probably implement flow control to avoid flooding the remote connection.
# Log warning if the number of messages in the backoff buffer in the endpoint
# writer exceeds this limit. It can be disabled by setting the value to off.
akka.remote.log-buffer-size-exceeding = 50000
Just came across this thread and thought it is relevant to our problem. We know that since we have one TCP connection (can we increase the number of connections?), it might be a bottleneck. Both the server and client machines buffer messages for each other. How do we go about using this new fix?
Thank you Patrik, we are restricting the sender to send fewer queries now.
Could you tell us what exactly from the fix is to be used? We are on Akka 2.3.6.
My first attempt was to adapt the Distributed Workers with Akka and Scala example just to see how that performs "out of the box". In the process I've also had to find a solution to be able to get the application deployed to the Blue Waters Torque/PBS batch environment, which wasn't trivial... but I did come up with 2 ways to do that that seem to work quite well (if anyone wants to know how, I could write about that).
#!/bin/bash
#PBS -l nodes=50:ppn=16:xe
#PBS -l walltime=15:00:00
#PBS -l gres=ccm
#PBS -N akka-job
#PBS -q normal
#PBS -o $PBS_JOBID.out
#PBS -j oe
##PBS -e $PBS_JOBID.err
##PBS -V

[ -r /opt/modules/default/init/bash ] && source /opt/modules/default/init/bash
cd "$PBS_O_WORKDIR"
aprun -B ./scripts/mpi_deploy ./scripts/launchJob_mpi.sh