testing akka cluster on Blue Waters @ NCSA


Boris Capitanu

Mar 22, 2014, 1:45:18 AM3/22/14
to akka...@googlegroups.com
Hello,

Apologies in advance for the long post... to frame my problem and questions I have to give a lot of contextual info.

I am running a set of experiments to understand the feasibility and performance characteristics of running Akka on the Blue Waters (batch) supercomputer at NCSA.

The general theme for the experiments is to create a map-reduce-like system composed of a producer, one (or more) coordinator(s), and a number of workers (the reduce step could be optional).
The producer is responsible for generating the "work", which is then sent to the coordinator(s), which in turn assign the work to an idle worker.
At first I am most concerned with the performance of the solution (rather than fault tolerance). The only fault-tolerance aspect I'd like to address initially is exceptions thrown while executing the "work".  When that happens, the "executor" should be restarted and the coordinator informed of the failed job, which would get logged and perhaps retried later (up to n times) depending on the exception. Eventually the final solution would include a checkpointing mechanism that would allow the PBS job to be resumed from the last checkpoint in case it is killed or a malfunction occurs on Blue Waters.
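To make that first fault-tolerance requirement a bit more concrete, here is a rough sketch of just the supervision/failure-reporting part as I picture it (the message types and class bodies are placeholders for illustration, not the actual code from my project):

import akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

// placeholder message types, only for this sketch
case class Work(id: Int, timestamp: Long)
case class WorkFailed(work: Work, reason: Throwable)
case object ExecuteDone

class Executor extends Actor {
  def receive = {
    case work: Work =>
      // ... run the actual job here; any exception escapes to the supervisor ...
      sender() ! ExecuteDone
  }
}

class Worker(coordinator: ActorRef) extends Actor {
  val executor = context.actorOf(Props[Executor], "executor")
  var currentWork: Option[Work] = None

  // restart the executor on any exception, give up after 3 retries per minute,
  // and tell the coordinator about the failed job so it can be logged and maybe retried
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case e: Exception =>
        currentWork.foreach(w => coordinator ! WorkFailed(w, e))
        Restart
    }

  def receive = {
    case work: Work =>
      currentWork = Some(work)
      executor ! work
  }
}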

Besides learning about the different options/architectures that are appropriate for an environment such as Blue Waters, the problem I ultimately want to solve is to process approx. 3.2 million gzipped text files (about 3.2TB of gzipped data). The "processing" could be any number of things - feature extraction, OCR error correction, or other text-processing routines as needed.

About my experience: I am no Scala or Akka expert, but I do have a decent amount of experience with both. I've taken Martin's Functional Programming in Scala course on Coursera, as well as the Reactive Programming course by Martin, Erik, and Roland. I've also read through a number of Scala and Akka books, tutorials, examples, etc., but, as with just about everything else in life, there's always more to learn.

My first attempt was to adapt the Distributed Workers with Akka and Scala example just to see how it performs "out of the box". In the process I also had to find a solution for getting the application deployed to the Blue Waters Torque/PBS batch environment, which wasn't trivial... but I did come up with 2 ways to do that that seem to work quite well (if anyone wants to know how, I could write about that). I did manage to get the example to work - tested it on a small subset of the data (with some changes to WorkProducer) - but I felt that this solution was overly concerned with fault tolerance at the expense of (some) performance, so I wanted to create another solution where those concerns would be reversed.  (It's still on my TODO list to get some performance measurements of the original example so that I can compare numbers across the different solutions.)

The way I'm planning on evaluating the performance of each solution is by having the work producer generate a known quantity of (empty) work as fast as possible, have that work be coordinated by the master over a known number of worker nodes, and have the executor on each worker do nothing else but respond immediately with an (empty) result. From this setup I want to record the timing of the following operations:
  • overall wall-clock time from the start of work production to when the last result is received by the worker actor from the executor (the master and workers are already up, registered, and ready before the producer starts sending out work; this time is not affected by other jobs running on the Blue Waters system since the work involves no disk I/O - just network - and the compute nodes where the worker actors are deployed are dedicated exclusively to this task and are not running other user jobs)
  • overall stats about the time elapsed between when a Work object is sent to the master by the producer, and the moment the Master receives it
  • overall stats about the time elapsed between the Master sending work to a Worker, and the Worker responding with WorkDone
In (pseudo-scala) code, the solution I have created behaves like this (the relevant parts):

Producer
val numWorkToGenerate = N
for (i <- 1 to numWorkToGenerate) {
   val timestamp = System.currentTimeMillis()
   master ! Work(i, timestamp)
}
master ! NoMoreWork

Master
// keep track of workers and what they're working on, and keep a queue of work that still needs to be assigned

def receive = {
   case work: Work =>
      // record the time delta between the current timestamp and the timestamp carried in 'work'
      // find an idle worker and, if found, update the timestamp in 'work' with the current timestamp and send 'work' to that worker

   case NoMoreWork =>
      // if the work queue is empty and no worker is busy, output all recorded timings to the log and quit gracefully

   case WorkDone =>
      // record the time delta between the current timestamp and the timestamp carried in the reply
      // if the work queue is not empty and there is an idle worker, assign work as in the Work case above;
      // otherwise check whether all work is done and run the NoMoreWork logic
}

Worker
// create an Executor actor that will be used to execute the work, and keep track of what it's working on

def idle: Receive = {
   case work: Work =>
      executor ! work
      context become working
}

def working: Receive = {
   case ExecuteDone =>
      master ! WorkDone
      context become idle
}

Executor
def receive = {
   case work: Work => sender ! ExecuteDone
}

For those interested the actual code is here: https://github.com/borice/akka-grid-workers

When I run this, each physical compute node will run one instance of the Worker app per CPU core;
The Master and Producer will be run on the same compute node.  (other Worker instances will also run on that node, on the 'remaining' CPU cores)

Compared to the original example I started from:
  • my first solution tries to have as little communication as possible
  • there is no acknowledgment from Master to Producer for each 'work' received;  producer operates in a fire-and-forget style
  • the pull-model used in the original example is still there, but has been simplified to be triggered by (not shown above) RegisterWorker, WorkDone, and WorkFailed messages rather than using the WorkerRequestsWork / WorkIsReady mechanism
  • the result of the work is not given to the Master; the Master is only informed that the work was done (this is because the simplest version of work may not need to produce a result for someone --- the result could just be output written to a file or DB or whatever; I will explore the use of a pub-sub system to publish the result in a future version of the solution)
  • FYI: I did some renaming in the code: Master has been baptized as "Coordinator" and WorkProducer became "Producer"
I'm not concerned with how realistic the above assumptions are right now -- I just want to see what the performance of the leanest solution I can think of is, learn about its shortcomings, and use that to keep iterating and improving on it.  I'm also curious about the reliability of the communication between the compute nodes, and how the Akka cluster behaves with no code-added fault tolerance as I scale up the number of nodes. I suspect that on Blue Waters the reliability guarantees are higher than if I were creating an Akka cluster over a bunch of commodity computers.

So... finally I can describe the problem I'm having (thanks for reading this far!)

Basically I ran a test of the code on GitHub on Blue Waters on 3 physical nodes, each node having 32 cores (and 64GB RAM).
I'm using the "<role>.min-nr-of-members" config setting to ensure that all nodes that join the akka cluster are transitioned to "Up" only after all workers, coordinator, and producer join the cluster.
I've used the "registerOnMemberUp" callback as the trigger to create the Producer, Coordinator (aka Master), and Worker actors.  When Workers and Producer actors start, they look up the coordinator ActorRef and the Producer uses that as the trigger to start producing the work.
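(For reference, the relevant bits look roughly like this -- a simplified sketch rather than the exact config/code from the repo; the member counts correspond to the 3-node test described above but are otherwise illustrative:)

akka.cluster.role {
  coordinator.min-nr-of-members = 1
  producer.min-nr-of-members = 1
  worker.min-nr-of-members = 94
}

import akka.actor._
import akka.cluster.Cluster

// one role per JVM is assumed in this sketch
val role = system.settings.config.getStringList("akka.cluster.roles").get(0)

Cluster(system).registerOnMemberUp {
  // runs only after this member has been moved to Up, which (because of the
  // min-nr-of-members settings above) happens only once all roles have joined
  role match {
    case "coordinator" => system.actorOf(Props[Coordinator], "coordinator")
    case "producer"    => system.actorOf(Props[Producer], "producer")
    case "worker"      => system.actorOf(Props[Worker], "worker")
  }
}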
I've used the "producer.workCount" config setting to try different quantities of work and for small values (<100,000), everything works fine. When I tried using workCount=1,000,000 the Producer would start getting disassociated from the cluster -- it would basically detect everyone else as UNREACHABLE, or I would see many entries like the following in the logs:

[ReliableDeliverySupervisor:71] [WARN] - Association with remote system [akka.tcp://cluster@nid25948:38358] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
 
It would seem that either the Producer becomes congested trying to send out the messages to the coordinator, or maybe the coordinator somehow becomes congested in a way that affects the Producer.  It's definitely the case that this solution isn't too viable since a single coordinator will have trouble keeping up with so many messages, especially given that it also needs to be able to exchange messages with the workers (and "in production" there might be thousands of workers, or more).
In theory this should've still worked... as far as I understand it, the actor mailbox size is only limited by the amount of memory available (and I've been generous with that - and saw no OOM messages anywhere or process crashes).
 
I wonder whether the akka cluster extension in this case could be playing a part in the issues I'm seeing.  The heartbeats or gossip messages exchanged between the cluster nodes might add unneeded traffic to a potentially-already-saturated network. I think that the simple solution I presented above could be implemented without using the cluster extension, by just using remote actors. The Producer only needs to know about the Coordinator (and monitor its lifecycle), and similar for the Workers. I wonder what the practical limits are for the maximum size of an akka cluster, assuming there are no JVM-related limits, when the only limit becomes the communication protocol/medium...

I'll be collecting some stats for values of "workCount" that do not cause errors for future comparison...

It seems to me that the next thing I should try is to lessen the stress on the coordinator. I'm thinking a better solution would be to have one Coordinator instance on each physical compute node, and have the workers started on that physical node register with it... then on the physical node where the Producer is started, also start a router whose routees are the Coordinators created on each of the physical nodes.  The Producer would then send the work to the router, which could use RoundRobin (or other policies) to route the work to the coordinators.
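Roughly, I imagine the deployment for that would look something like this (an untested sketch using a cluster-aware group router that looks up the per-node Coordinators; the actor names/paths and instance counts are illustrative):

akka.actor.deployment {
  /producer/coordinatorRouter {
    router = round-robin-group
    nr-of-instances = 100
    routees.paths = ["/user/coordinator"]
    cluster {
      enabled = on
      use-role = coordinator
      allow-local-routees = on
    }
  }
}

import akka.routing.FromConfig

// inside the Producer actor:
val coordinatorRouter = context.actorOf(FromConfig.props(), "coordinatorRouter")
// work is then sent to coordinatorRouter instead of to a single coordinator

(A pool router with remote deployment of the Coordinator routees would be another way to get the same topology.)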

Finally, as part of the mix of solutions I'll be experimenting with, I'd like to also simulate the same basic setup in an MPI-based application and do some timing comparisons just for the heck of it.  It's very likely that the MPI solution will perform better (especially when using the highly optimized Cray MPI library over the Gemini router ASIC), but it would be educational to see the magnitude of the difference.  I would be interested in exploring the possibility of adding MPI-based communication to Akka as an alternative to TCP, if the performance difference is significant.

I'll be posting updates on this thread as I have more results.
I'm looking for comments and suggestions based on what I wrote above...

Thank you in advance!  Again, sorry for the long post.

-Boris


Patrik Nordwall

Mar 22, 2014, 7:17:15 AM3/22/14
to akka...@googlegroups.com
Hi Boris,


On Sat, Mar 22, 2014 at 6:45 AM, Boris Capitanu <bor...@gmail.com> wrote:
[...]

The way I'm planning on evaluating the performance of each solution is by having the work producer generate a known quantity of (empty) work as fast as possible, have that work be coordinated by the master over a known number of worker nodes, and have the executor on each worker do nothing else but respond immediately with an (empty) result.

Be aware that the network round trips might dominate when you test with empty work, in a way that might not be relevant when the workers do real work.
I noticed one error where you use the onSuccess callback. See here: http://doc.akka.io/docs/akka/2.3.0/general/jmm.html#Actors_and_shared_mutable_state 

[...]
 
It would seem that either the Producer becomes congested trying to send out the messages to the coordinator, or maybe the coordinator somehow becomes congested in a way that affects the Producer.

yes, you might need to add some flow control, or throttling.
 
 It's definitely the case that this solution isn't too viable since a single coordinator will have trouble keeping up with so many messages, especially given that it also needs to be able to exchange messages with the workers (and "in production" there might be thousands of workers, or more).
In theory this should've still worked... as far as I understand it, the actor mailbox size is only limited by the amount of memory available (and I've been generous with that - and saw no OOM messages anywhere or process crashes).

long garbage collection pauses can also trigger failure detection
 
 
[...]

Thanks for an interesting post.

Regards,
Patrik

 




--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Boris Capitanu

Mar 22, 2014, 11:49:42 AM3/22/14
to akka...@googlegroups.com
Hi Patrik,

Thanks for your prompt response!

 
Be aware that the network round trips might dominate when you test with empty work, in a way that might not be relevant when the workers do real work.

Right, I realize that, but I just wanted to get some baseline timing information for the absolute minimum amount of time needed to run X (empty) jobs.  I guess I was just trying to measure the overhead of the system.  I could simulate work taking some (fixed) time by inserting a (I know, bad!) Thread.sleep in the executor... I suppose that might make the system flow more smoothly since the network would become less congested.  I'll try that and see what effect it has...


For those interested the actual code is here: https://github.com/borice/akka-grid-workers

I noticed one error where you use the onSuccess callback. See here: http://doc.akka.io/docs/akka/2.3.0/general/jmm.html#Actors_and_shared_mutable_state 

Thank you, fixed.  Given the way messages were exchanged I didn't think that was a problem, but I was just being lazy... :)


long garbage collection pauses can also trigger failure detection

That makes sense... 
 
 
 
I wonder whether the akka cluster extension in this case could be playing a part in the issues I'm seeing.  The heartbeats or gossip messages exchanged between the cluster nodes might add unneeded traffic to a potentially-already-saturated network. I think that the simple solution I presented above could be implemented without using the cluster extension, by just using remote actors. The Producer only needs to know about the Coordinator (and monitor its lifecycle), and similar for the Workers. I wonder what the practical limits are for the maximum size of an akka cluster, assuming there are no JVM-related limits, when the only limit becomes the communication protocol/medium...


Very cool!  I'll try to use some of the same tuning adjustments to the configuration settings that were described in that article. They will likely alleviate some of the issues I'm seeing...
I'll see if I'll be able to run some tests across a > 2400 node akka cluster... if I do, I'll write about it here.

Thanks again!   Scala + Akka is an absolute pleasure to work with... 

-Boris

Patrik Nordwall

Mar 22, 2014, 2:19:42 PM3/22/14
to akka...@googlegroups.com
I'm glad to hear that. Looking forward to seeing your results, or questions.
/Patrik

Boris Capitanu

Mar 23, 2014, 6:39:31 PM3/23/14
to akka...@googlegroups.com
Hi Patrik,

After further testing, I'm running into an issue I can't explain and I'm hoping you could shed some light on it...
The simplest form that showcases the problem is the following setup:

Two actors... a Sender and a Receiver, using RemoteActorRefProvider.  The receiver is started on a known port and waits to receive "case class Message(id: Int)" messages.
When the sender starts (as a different JVM process), it looks up the ActorRef of the receiver, and then does: for i=1 to N { receiver ! Message(i) }
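A stripped-down sketch of that setup (the actual code is in the GitHub repo linked earlier; the system names, host, and port here are just illustrative):

import akka.actor._
import com.typesafe.config.ConfigFactory

case class Message(id: Int)

class Receiver extends Actor {
  var received = 0
  var last = System.currentTimeMillis()
  def receive = {
    case Message(_) =>
      received += 1
      if (received % 20000 == 0) {
        val now = System.currentTimeMillis()
        println(s"delta = ${now - last} ms, received = $received")
        last = now
      }
  }
}

object ReceiverApp extends App {
  // RemoteActorRefProvider listening on a known port (2552 here, as an example)
  val system = ActorSystem("receiverSystem", ConfigFactory.parseString("""
    akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    akka.remote.netty.tcp.hostname = "127.0.0.1"
    akka.remote.netty.tcp.port = 2552
  """))
  system.actorOf(Props[Receiver], "receiver")
}

object SenderApp extends App {
  val N = args.headOption.map(_.toInt).getOrElse(100000)
  val system = ActorSystem("senderSystem", ConfigFactory.parseString("""
    akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    akka.remote.netty.tcp.hostname = "127.0.0.1"
    akka.remote.netty.tcp.port = 0
  """))
  // look up the remote receiver and fire N messages at it, fire-and-forget
  val receiver = system.actorSelection("akka.tcp://receiverSystem@127.0.0.1:2552/user/receiver")
  for (i <- 1 to N) receiver ! Message(i)
}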

What I'm seeing is that if I look at the amount of time it takes to receive 20K messages, that time depends on the value of N (which it shouldn't). 
For example, when N=100,000, it takes about 500ms - 600ms to receive 20K messages.   When N=800,000, it takes 6-7 seconds (a 10x increase) per 20K messages...
The other weird thing is that the later batches of 20K messages are received progressively faster.

If you look at the GitHub link above, the README.md file contains example log output after running the application for different values of N. 
I've run both sender and receiver through a profiler (YourKit) and here's what I'm seeing:

[Profiler screenshot for N = 100,000]

[Profiler screenshot for N = 800,000 (capture stopped after the receiver had received 300,000 messages)]

From the socket send/receive graphs it looks like the sender isn't sending the data out as fast as the network can handle... in other words, I'm inclined to think that something on the sending side is not taking advantage of all the network bandwidth available.  The CPU usage was not being maxed out either... the garbage collector wasn't working overly hard... so I'm not sure I understand why using a different value for N (the number of messages sent) matters to the amount of time each block of messages takes to be received.

Does this make sense to you?

One other usage for Akka I was planning is to retrieve streaming data from a Microsoft Kinect from a service sending it via UDP, then do some calculations on it, and send it back out to someone else via UDP.   So this would be a live continuous stream of data coming at (for now) 30 readings/second... but soon perhaps at 100 readings/second.

Thanks very much for your help!!!

Best,
Boris

√iktor Ҡlang

Mar 23, 2014, 7:14:33 PM3/23/14
to Akka User List
Hi Boris,

what serializer are you using for your messages?





--
Cheers,

———————
Viktor Klang
Chief Architect - Typesafe

Twitter: @viktorklang

Boris Capitanu

Mar 23, 2014, 8:51:06 PM3/23/14
to akka...@googlegroups.com
Hello Viktor,

I am using whatever default serializer Akka comes with.  I have not configured anything special for that.

-Boris

√iktor Ҡlang

Mar 23, 2014, 8:58:22 PM3/23/14
to Akka User List
Hello Boris,

Then you are likely to benchmark the slowness of Java Serialization.

To maximize performance, one needs to choose an appropriate serializer.
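For example, for a message as small as the Message(id: Int) case class used in the test, a tiny hand-written serializer plus its binding would look roughly like this (an illustrative sketch only; the package and class names are made up):

import akka.serialization.Serializer
import java.nio.ByteBuffer

// writes Message(id) as 4 raw bytes instead of a full Java-serialized object graph
class MessageSerializer extends Serializer {
  def identifier: Int = 54321            // must be unique; low ids are reserved by Akka
  def includeManifest: Boolean = false

  def toBinary(obj: AnyRef): Array[Byte] = obj match {
    case Message(id) => ByteBuffer.allocate(4).putInt(id).array()
    case other       => throw new IllegalArgumentException(s"Cannot serialize $other")
  }

  def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
    Message(ByteBuffer.wrap(bytes).getInt)
}

and in the configuration:

akka.actor {
  serializers {
    message = "sample.MessageSerializer"
  }
  serialization-bindings {
    "sample.Message" = message
  }
}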



Boris Capitanu

Mar 23, 2014, 9:19:45 PM3/23/14
to akka...@googlegroups.com

Then you are likely to benchmark the slowness of Java Serialization.


Viktor, thank you for your feedback.  I'm not convinced the serializer is at fault with this.  Regardless of how inefficient the serializer is, it should not take a variable amount of time to serialize identical objects. For example,

when the sender is told to send 100,000 messages to receiver:
16:07:33.976 [INFO] - delta = 475 ms, received = 40,000
16:07:34.585 [INFO] - delta = 610 ms, received = 60,000
16:07:35.065 [INFO] - delta = 479 ms, received = 80,000
a log message is generated once for each 20K messages the receiver receives; the delta value is the time elapsed since the last log message was printed (so it shows how long it took to receive the "next" 20K messages); given that all the messages are of the same size (as in "case class Message(id: Int)"), I would expect the time it takes for the receiver to receive 20K messages to be more or less constant; compare the above with:
17:46:44.280 [INFO] - delta = 14,290 ms, received = 40,000
17:47:01.804 [INFO] - delta = 17,524 ms, received = 60,000
17:47:19.130 [INFO] - delta = 17,325 ms, received = 80,000
17:47:35.865 [INFO] - delta = 16,736 ms, received = 100,000

which is output generated in a different run, when the sender was told to send 1,600,000 messages to the receiver.

In the former case each 20K batch of messages is received in ~600ms... while in the latter case it takes over 16 seconds.
I'm not seeing how the difference can be explained by the serializer used.  Even if a single message takes a long time to serialize, it should take the same (long) amount of time regardless of how many messages the sender is told to send to the receiver.

Am I missing something?

Thank you!

-Boris


Patrik Nordwall

Mar 24, 2014, 3:30:28 AM3/24/14
to akka...@googlegroups.com
Hi Boris,

Thanks for packaging it as a reproducible sample. As Viktor points out, the Java serializer is one of the prime suspects for bottlenecks in remote communication. Here you have found something that I think can be improved in Akka remoting.

I can see that the major time is consumed in the EndpointWriter stashing and unstashing messages. That happens when the transport can't write; perhaps some internal buffer is full. Please create a ticket and we will look into whether we can improve the efficiency of this buffering mechanism.

What you should do is to implement flow control in the application layer to avoid producing faster than what can be consumed.

and also, implement a good serializer for your messages.

/Patrik

√iktor Ҡlang

Mar 24, 2014, 5:01:55 AM3/24/14
to Akka User List

Neither networked nor multithreaded applications have constant performance. Thread scheduling artifacts, cache effects, timing issues in handoffs, network congestion, retransmissions etc.

My suggestion is to take out the known slow parts first. Then measure and tune.

Endre Varga

Mar 24, 2014, 5:19:12 AM3/24/14
to akka...@googlegroups.com
On Mon, Mar 24, 2014 at 8:30 AM, Patrik Nordwall <patrik....@gmail.com> wrote:
Hi Boris,

Thanks for packaging it as a reproducible sample. As Viktor points out, the Java serializer is one of the prime suspects for bottlenecks in remote communication. Here you have found something that I think can be improved in Akka remoting.

I can see that the major time is consumed in the EndpointWriter stashing and unstashing messages. That happens when the transport can't write; perhaps some internal buffer is full. Please create a ticket and we will look into whether we can improve the efficiency of this buffering mechanism.

The TCP write buffers should probably be increased in this case, because the EndpointWriter waits for Netty to pump out data in the background. This setting is "akka.remote.netty.tcp.send-buffer-size"; by default it is 256000b, which is quite small. Try to increase it first and see what happens.

Also, the thread-pool settings for Netty in "akka.remote.netty.tcp.server-socket-worker-pool" and "akka.remote.netty.tcp.client-socket-worker-pool" allocate only two threads by default; increase them and see what happens.
 

What you should do is to implement flow control in the application layer to avoid producing faster than what can be consumed.

That is another important piece of course.

-Endre

Boris Capitanu

Mar 24, 2014, 9:54:28 AM3/24/14
to akka...@googlegroups.com

Neither networked nor multithreaded applications have constant performance. Thread scheduling artifacts, cache effects, timing issues in handoffs, network congestion, retransmissions etc.

Yes, that all makes sense...  except that the network congestion and retransmission part should not have been an issue in my experiment.  The runs were done on the same machine in different JVMs using "localhost", so there shouldn't be a need for TCP to retransmit anything... and according to the profiler the "network" was not anywhere close to being maxed out (looking at the socket read and write graphs).

I'll test the settings that Endre pointed out and see what difference those settings make.

Thank you all, again, for your help!

-Boris


Akka Team

Mar 24, 2014, 10:17:15 AM3/24/14
to Akka User List
Hi Boris,

I'll test the settings that Endre pointed out and see what difference those settings make.

Please report back what you find. Also, I forgot to mention that there is another setting that controls the thread pool (dispatcher) for the remoting subsystem (this is independent of the Netty pools). You should probably increase the size of that as well. The setting is under "akka.remote.default-remote-dispatcher"; see http://doc.akka.io/docs/akka/2.3.0/scala/remoting.html#Remote_Configuration for the complete configuration.
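Something along these lines, for example (the parallelism values are just a starting point to experiment with):

akka.remote.default-remote-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8
    parallelism-max = 8
  }
}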

Since you run everything on localhost, fairness issues might dominate the case: some subsystems get less CPU share since they have fewer threads to run on, and fairness is enforced by the OS per thread. I am interested in the results!

-Endre
 




--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

Boris Capitanu

Mar 24, 2014, 11:02:26 AM3/24/14
to akka...@googlegroups.com
Hi Endre,

My preliminary tests playing with the following settings show no improvement (but in some cases even worse performance):

      send-buffer-size = X
      server-socket-worker-pool = {
        pool-size-min = Y
        pool-size-max = Y
      }
      client-socket-worker-pool = {
        pool-size-min = Z
        pool-size-max = Z
      }

I tried different values for X, Y, Z, such as X = 256000b,  X=512000b, X=1024000b... and Y, Z = 4 and 8
I tried the settings separately, and together... none of the combinations I tried improved the performance of what I've been seeing before.

It looked like whenever I increased the value of X, the performance actually got worse, while changes to Y and Z in isolation made no observable difference in the timings recorded.

there is another setting that controls the thread-pool (dispatcher) for the remoting subsystem (this is independent of the netty pools). You should probably increase the size of that as well. The setting is under: "akka.remote.default-remote-dispatcher", see  http://doc.akka.io/docs/akka/2.3.0/scala/remoting.html#Remote_Configuration for the complete configuration.


I will try that and report back.
 
Since you run everything on localhost fairness issues might dominate the case: some subsystems getting less CPU share since they have less threads to run on, and fairness is enforced by the OS per thread. 

Well... again, the sender and receiver were run in two separate JVM processes, so the subsystems in the two different processes shouldn't be competing with each other based on the limits imposed by the akka configuration used. The other problem might be competition for CPU time on the machine itself... but if you look at the CPU utilization graphs in the profiler, the CPU is also not being used at anywhere near its max capacity. 

I'll try the dispatcher setting and report back...

Oh, what I forgot to mention is that none of the hypotheses we explored so far seem to explain (in my mind at least) why the later sets of 20K messages are received progressively faster (more sample run outputs showing that are available in the README on GitHub).  In a standard producer-consumer scenario where the producer is the sender actor (which produced 1.6M messages) and the consumer is the akka remoting subsystem (which consumes data from that queue to send out over the network), I would expect the consumer to behave the same way while draining messages from the queue.

Regarding flow control --- I don't want to get to that yet... I'm not even sure how I would know how to detect/trigger flow control. In (oversimplified) traditional flow control the receiver detects that it's being overwhelmed and informs the sender to rate-limit its output until the receiver is capable of processing more messages...etc. In my case the receiver actor (the application layer) would gladly consume as much data as it gets.... it's not being overwhelmed to the point where it should/could be sending a message to the sender to tell it to "slow down". 

Here's the fun part... I noticed that if I add a "Thread.sleep(5000)" in the for loop on the sender side after each 100K messages sent (commented out now in the GitHub code) then sending even 1M messages is done at the same timings per 20K as I see when I send just 100K messages (without the thread.sleep).  

-Boris

Patrik Nordwall

Mar 24, 2014, 11:04:40 AM3/24/14
to akka...@googlegroups.com
I have created the ticket that I still think is valid: https://www.assembla.com/spaces/akka/tickets/3960

/Patrik

Boris Capitanu

Mar 24, 2014, 11:19:58 AM3/24/14
to akka...@googlegroups.com
Thank you, Patrik.

I think you hit the nail on the head with that ticket.  I wanted to write earlier that this felt like the behavior you see when you have two nested "for" loops:

for i = N to 1
  for j = 1 to N 
    do_something

The stash-unstash cycle you pointed out effectively behaves like the above example... and the reason later messages get sent out faster is because there is progressively less stashing to do.

Is this a hard-to-implement fix?

-Boris

Endre Varga

Mar 24, 2014, 11:21:12 AM3/24/14
to akka...@googlegroups.com
Hi Boris,

Thank you for trying it out! I think Patrik figured out the reason (described in the ticket). This is very much related to backpressure, since messages pile up in the EndpointWriter, which continuously stashes and unstashes an ever-growing number of messages (eventually reaching O(n^2) work). This behavior will not happen if messages are backpressured (that keeps the unstash work constant).

Anyway, one thing to try is to set "akka.remote.backoff-interval" to a larger value while setting the send-buffer-size to 1024000b. I would try with backoffs 0.5s and 1s. While 1s is not a very good setting, it is a good way to test our hypothesis.

-Endre



Boris Capitanu

Mar 24, 2014, 11:22:19 AM3/24/14
to akka...@googlegroups.com
Oops... I meant the second "for" was supposed to be "for j = 1 to i"   (not to N) :-)  

Patrik Nordwall

Mar 24, 2014, 11:24:16 AM3/24/14
to akka...@googlegroups.com
One simple flow control scheme is to let the producer wait for an ack from the consumer after, let's say, 50,000 messages. A bit more advanced is to allow a few unacknowledged batches, so that you always have messages in flight, but not too many.
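In rough Scala, the simple variant could look like this (only a sketch; to keep it short it assumes the total number of messages is a multiple of the batch size):

import akka.actor._

case object Ack
case class Work(id: Int, timestamp: Long)

// producer side: send one batch, then wait for an Ack before sending the next
class Producer(consumer: ActorRef, total: Int, batchSize: Int = 50000) extends Actor {
  var sent = 0

  override def preStart(): Unit = sendBatch()

  def sendBatch(): Unit = {
    val n = math.min(batchSize, total - sent)
    for (i <- sent + 1 to sent + n) consumer ! Work(i, System.currentTimeMillis())
    sent += n
  }

  def receive = {
    case Ack if sent < total => sendBatch()   // consumer caught up, send the next batch
    case Ack                 => context.stop(self)
  }
}

// consumer side: ack once per completed batch so the producer can continue
class Consumer(batchSize: Int = 50000) extends Actor {
  var received = 0
  def receive = {
    case w: Work =>
      received += 1
      if (received % batchSize == 0) sender() ! Ack
  }
}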
 

Here's the fun part... I noticed that if I add a "Thread.sleep(5000)" in the for loop on the sender side after each 100K messages sent (commented out now in the GitHub code) then sending even 1M messages is done at the same timings per 20K as I see when I send just 100K messages (without the thread.sleep).  

yes, that is also a primitive flow control (throttling)
 


Endre Varga

Mar 24, 2014, 11:24:51 AM3/24/14
to akka...@googlegroups.com
Hi Boris,


Yes, I think this is the reason. Setting the backoff will reduce this somewhat, but it is not a fix -- just a way to quickly check what is happening.
 

Is this a hard-to-implement fix?

Not anymore. It is hard to fix in 2.2.x because we relied on the mailbox's property of keeping messages during an actor restart so as not to drop messages between reconnect attempts. Since Gate is now the only reconnect behavior, and that drops messages, we no longer need Stash.

-Endre

Boris Capitanu

Mar 24, 2014, 11:55:15 AM3/24/14
to akka...@googlegroups.com
Anyway, one thing to try is to set "akka.remote.backoff-interval" to a larger value while setting the send-buffer-size to 1024000b. I would try with backoffs 0.5s and 1s. While 1s is not a very good setting, it is a good way to test our hypothesis.

I've used the backoff-interval = 0.5s and send-buffer-size=1024000b and I do see the timings becoming more consistent (albeit worse).  
The standard deviation of the timings observed is much lower.

Well - I think we narrowed down the issue.  I'll wait for a fix...  I'll be glad to test any nightly builds that include a fix if it would be helpful.

-Boris

√iktor Ҡlang

Mar 24, 2014, 12:00:03 PM3/24/14
to Akka User List
Nice!



Patrik Nordwall

Apr 25, 2014, 9:48:55 AM4/25/14
to akka...@googlegroups.com
Boris, you should try the timestamped snapshot 2.3-20140425-151510 that is published to repo http://repo.akka.io/snapshots/

It is supposed to handle bursts of many messages without (much) degraded throughput or false failure detection. More details here: https://groups.google.com/d/msg/akka-dev/mFvz_d737t4/pZSmbFRLAV8J

Regards,
Patrik

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Nidhi

Nov 23, 2014, 7:32:02 PM11/23/14
to akka...@googlegroups.com
Hello all, we are working on a course project to simulate a Twitter server and Twitter users and test the server for the load it can handle. We have the users on one system (with one client master and around 10,000 user actors) and the server on another machine (one master and 1000 worker actors) that resolves the queries it gets from the user actors remotely. We are sending an average of 6000 queries/sec to the server. We have one TCP connection between the 2 systems and we get the following warning:

[TwitterServerSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://TwitterServerSystem...@122.122.122.122:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FTwitterClientSystem%40127.0.0.1%3A56833-0/endpointWriter] [65138] buffered messages in EndpointWriter for [akka.tcp://TwitterClientSystem...@127.0.0.1:56833]. You should probably implement flow control to avoid flooding the remote connection.

Just came across this thread and thought it is relevant to our problem. We know that since we have one TCP connection (can we increase the number of connections?), it might be a bottleneck. Both the server and client machines buffer messages for each other. 

How do we go about using this new fix ?

Thank you.

Regards,
Nidhi

Patrik Nordwall

Nov 25, 2014, 2:46:44 AM11/25/14
to akka...@googlegroups.com
On Mon, Nov 24, 2014 at 1:32 AM, Nidhi <nidhi.ashwa...@gmail.com> wrote:
Hello all, we are working on a course project to simulate a Twitter server and Twitter users and test the server for the load it can handle. We have the users on one system (with one client master and around 10,000 user actors) and the server on another machine (one master and 1000 worker actors) that resolves the queries it gets from the user actors remotely. We are sending an average of 6000 queries/sec to the server. We have one TCP connection between the 2 systems and we get the following warning:

[TwitterServerSystem-akka.remote.default-remote-dispatcher-6] [akka.tcp://TwitterServerSystem...@122.122.122.122:2552/system/endpointManager/reliableEndpointWriter-akka.tcp%3A%2F%2FTwitterClientSystem%40127.0.0.1%3A56833-0/endpointWriter] [65138] buffered messages in EndpointWriter for [akka.tcp://TwitterClientSystem...@127.0.0.1:56833]. You should probably implement flow control to avoid flooding the remote connection.


This warning indicates that many remote messages have been queued on the sender side. It is configured by:

    # Log warning if the number of messages in the backoff buffer in the endpoint
    # writer exceeds this limit. It can be disabled by setting the value to off.
    akka.remote.log-buffer-size-exceeding = 50000

 
The recommendation about flow control means that you should add some application-level protocol (messages) between sender and receiver that controls how much the sender is allowed to produce before it stops sending more. Without that you will run out of memory if the sender continues to produce faster than what can be consumed.


Just came across this thread and thought it is relevant to our problem. We know that since we have one TCP connection (can we increase the number of connections?), it might be a bottleneck. Both the server and client machines buffer messages for each other. 

How do we go about using this new fix ?

The improvements for sending bursts of remote messages have been included in Akka 2.3.x since 2.3.3.

Regards,
Patrik

Nidhi

Nov 25, 2014, 3:35:07 PM11/25/14
to akka...@googlegroups.com
Thank you Patrik, we are restricting the sender to send fewer queries now. Could you tell us what exactly needs to be used from the fix? We have Akka 2.3.6. 

Patrik Nordwall

Nov 26, 2014, 7:50:42 AM11/26/14
to akka...@googlegroups.com
On Tue, Nov 25, 2014 at 9:35 PM, Nidhi <nidhi.ashwa...@gmail.com> wrote:
Thank you Patrik, we are restricting the sender to send fewer queries now.

Sounds good.
 
Could you tell us what exactly needs to be used from the fix? We have Akka 2.3.6. 

There is nothing you need to enable to use the improvements, apart from using Akka 2.3.3 or later. Latest is 2.3.7 (always recommended to use latest stable). Details can be found in the pull request, if you are interested.

I think I published some benchmark results here: https://groups.google.com/forum/#!msg/akka-dev/mFvz_d737t4/pZSmbFRLAV8J

Note that serialization is often a bottleneck. In case you use (default) Java serialization you should use something faster.

Let me know if I misunderstood your question.

Regards,
Patrik

Michał Knapik

Jul 24, 2015, 4:06:40 AM7/24/15
to Akka User List, bor...@gmail.com
Hi Boris,

My first attempt was to adapt the Distributed Workers with Akka and Scala example just to see how that performs "out of the box". In the process I've also had to find a solution to be able to get the application deployed to the Blue Waters Torque/PBS batch environment, which wasn't trivial... but I did come up with 2 ways to do that that seem to work quite well (if anyone wants to know how, I could write about that). 

I have to deploy an Akka work-stealing system on a PBS-managed mainframe. Could you write more about the deployment process?

Best Regards,
Michał

Boris Capitanu

Jul 29, 2015, 9:58:43 AM7/29/15
to Michał Knapik, Akka User List
Hi Michał,

There are 2 approaches I discovered for running an Akka-based solution on a PBS-managed mainframe.

My Akka-based app was made up of 3 parts: a coordinator, a producer, and a number of workers.
The coordinator is the one that becomes the "master" node to which all other nodes connect to form the Akka cluster.


Option 1: Via MPI
————————

In this mode the trick is to use a custom mpi_deploy helper and the PMI_FORK_RANK environment variable to coordinate what process you need to start on the nodes.
I use rank = 0 to start the “master” process, rank = 1 to start the “producer”, and rank > 1 to start “workers”.

file: akka_mpi.pbs

#!/bin/bash
#PBS -l nodes=50:ppn=16:xe
#PBS -l walltime=15:00:00
#PBS -l gres=ccm
#PBS -N akka-job
#PBS -q normal
#PBS -o $PBS_JOBID.out
#PBS -j oe
##PBS -e $PBS_JOBID.err
##PBS -V

[ -r /opt/modules/default/init/bash ] && source /opt/modules/default/init/bash

cd "$PBS_O_WORKDIR"

aprun -B ./scripts/mpi_deploy ./scripts/launchJob_mpi.sh

note that in the above we requested 50 nodes with 16 processor cores per node for a total of 50*16=800 cores.
In this case, the PMI_FORK_RANK environment variable will take values between 0 and 799, and each node will have 16 processes started.
If you only want to start 1 process per node (let’s assume your process already knows how to take advantage of all cores of a node), then you can alter the ppn=16 to ppn=1 in your PBS request. You still get 50 nodes, but you’ll only get one process started per node and your PMI_FORK_RANK will take values from 0 to 49.
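The launchJob_mpi.sh referenced above essentially just dispatches on that rank; a simplified sketch (the jar name and main class are placeholders for the real application):

#!/bin/bash
# launchJob_mpi.sh (simplified sketch)

ROLE="worker"
case "$PMI_FORK_RANK" in
  0) ROLE="coordinator" ;;   # rank 0 starts the "master"/coordinator
  1) ROLE="producer" ;;      # rank 1 starts the producer
esac

# app.jar / myapp.Main are placeholders for the actual assembly jar and entry point
exec java -Dakka.cluster.roles.0="$ROLE" -cp app.jar myapp.Main "$ROLE"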

file: scripts/mpi_deploy   (compiled from attached mpi_deploy.c)

mpi_deploy.zip