Trying to understand a sudden drop in throughput with Akka IO


Soumya Simanta

Dec 20, 2014, 5:55:19 AM12/20/14
to akka...@googlegroups.com

I'm asking this question here because Rediscala uses Akka IO. So maybe someone here could provide some insight into what's going on. 

I'm trying to run a simple test that puts 1 million key-value pairs into Redis (running on the same machine). For 100,000 keys it is really fast.

However, performance degrades badly when I bump the number of keys to 1 million. The maximum heap size is 12 GB and I'm running this on a MacBook Pro.

As you can see, the network write rate drops significantly after some time. Not sure what's going on here. Any help would be really appreciated.

NOTE: The time measurements are just to get an estimate. I plan to use a micro benchmark library for my final measurements. 

I'm using the following versions:

"com.etaty.rediscala" %% "rediscala" % "1.4.0" 
scalaVersion := "2.11.4"

Here is the code.

package redisbenchmark

import java.util.UUID

import akka.util.ByteString
import redis.RedisClient

import scala.concurrent.Future

object RedisLocalPerf {

  def main(args: Array[String]): Unit = {
    implicit val akkaSystem = akka.actor.ActorSystem()

    var numberRuns = 1000 // defaults to 1000 runs; can be overridden from the command line
    if (args.length == 1)
      numberRuns = Integer.parseInt(args(0))

    val s = """How to explain ZeroMQ? Some of us start by saying all the wonderful things it does. It's sockets on steroids. It's like mailboxes with routing. It's fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It's smaller, simpler, but still looks familiar. Personally, I like to remember why we made ZeroMQ at all, because that's most likely where you, the reader, still are today.How to explain ZeroMQ? Some of us start by saying all the wonderful things it does. It's sockets on steroids. It's like mailboxes with routing. It's fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It's smaller, simpler, but still looks familiar. Personally, I like to remember why we made ZeroMQ at all, because that's most likely where"""

    val msgSize = s.getBytes.size
    val redis = RedisClient()
    implicit val ec = redis.executionContext

    val futurePong = redis.ping()
    println("Ping sent!")
    futurePong.map(pong => println(s"Redis replied with a $pong"))

    val random = UUID.randomUUID().toString
    val start = System.currentTimeMillis()
    val result: Seq[Future[Boolean]] = for (i <- 1 to numberRuns) yield {
      redis.set(random + i.toString, ByteString(s))
    }
    val res: Future[List[Boolean]] = Future.sequence(result.toList)
    val end = System.currentTimeMillis()
    val diff = end - start
    println(s"for msgSize $msgSize and numOfRuns [$numberRuns] time is $diff ms")
    akkaSystem.shutdown()
  }
}




Akka Team

Dec 20, 2014, 11:35:22 AM12/20/14
to Akka User List
Hi,

My personal guess is that since you don't obey any backpressure when you start flooding the Redis client with requests, you end up with a lot of queued messages and probably high GC pressure. You can easily test this by looking at the memory profile of your test.
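For illustration, one crude way to bound the number of in-flight requests without streams is to write in sequential batches. A rough sketch (untested; insertInBatches and batchSize are made-up names, and it assumes the redis, random and s values from your code plus an implicit ExecutionContext):

import akka.util.ByteString
import scala.concurrent.Future

// Issue batchSize writes, wait for all of them to complete, then start the
// next batch. This bounds queue growth and should reduce GC pressure.
def insertInBatches(numberRuns: Int, batchSize: Int): Future[Unit] =
  (1 to numberRuns).grouped(batchSize).foldLeft(Future.successful(())) { (acc, batch) =>
    acc.flatMap { _ =>
      Future.sequence(batch.map(i => redis.set(random + i, ByteString(s)))).map(_ => ())
    }
  }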



On Sat, Dec 20, 2014 at 6:55 AM, Soumya Simanta <soumya....@gmail.com> wrote:
val res: Future[List[Boolean]] = Future.sequence(result.toList)
val end = System.currentTimeMillis()
val diff = end - start
println(s"for msgSize $msgSize and numOfRuns [$numberRuns] time is $diff ms")
akkaSystem.shutdown()
  }
}

What does the above code intend to measure? Didn't you want to actually wait on the "res" future?

-Endre

 








--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

Soumya Simanta

Dec 20, 2014, 2:07:53 PM12/20/14
to akka...@googlegroups.com
Endre, thank you for responding. Here is what the author of Rediscala has to say:

"Yes i noticed it during my tests, at some point the scale is exponential (bad).

I suspected the thread scheduler to be the limitation.
Or the way Future.sequence works.

If you can isolate a test that scale linearly up to 1M of futures, I would be interested to see it. By replacing akka-io with another java.nio library (xnio) I was able to pass the 1M req (at the speed of around 500k req/s)" 

https://github.com/etaty/rediscala/issues/67

If replacing akka-io with plain java.nio resolves this, then either akka-io is not being used correctly in Rediscala, or it is a fundamental limitation of akka-io.


My other responses are inline.


On Saturday, December 20, 2014 6:35:22 AM UTC-5, Akka Team wrote:
Hi,

My personal guess is that since you don't obey any backpressure when you start flooding the Redis client with requests, you end up with a lot of queued messages and probably high GC pressure. You can easily test this by looking at the memory profile of your test.


Yes, the memory pressure is indeed high. The young generation (Eden) space fills up very quickly and then a minor GC kicks in.
Can I use akka-streams to add backpressure here? Any pointers would be greatly appreciated.
 


On Sat, Dec 20, 2014 at 6:55 AM, Soumya Simanta <soumya....@gmail.com> wrote:
val res: Future[List[Boolean]] = Future.sequence(result.toList)
val end = System.currentTimeMillis()
val diff = end - start
println(s"for msgSize $msgSize and numOfRuns [$numberRuns] time is $diff ms")
akkaSystem.shutdown()
}

What does the above code intend to measure? Didn't you want to actually wait on the "res" future?

Yes, you are correct again. I should be waiting on res in order to get an estimate of the overall latency.
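Something like this should do it (an untested sketch; blocking is acceptable here since this is just a test harness):

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val res: Future[List[Boolean]] = Future.sequence(result.toList)
Await.result(res, Duration.Inf) // wait until every set() has actually completed
val end = System.currentTimeMillis()
println(s"for msgSize $msgSize and numOfRuns [$numberRuns] time is ${end - start} ms")
akkaSystem.shutdown()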
 

Soumya Simanta

Dec 21, 2014, 4:55:01 PM12/21/14
to akka...@googlegroups.com
Here is my attempt at a version with back pressure using Reactive Streams. I'm not sure whether it is completely correct; can someone please verify the code below?
Even with this version I don't see any change in throughput, and the network IO graph looks very similar to what I had without reactive streams.

On the other hand, if I use 100 Rediscala client actors the inserts are much faster. I understand that there are now 100 queues (mailboxes) and therefore it's faster. But I still don't understand why the performance is so bad for a single client after a certain threshold, even after adding back pressure (assuming I'm using Akka streams correctly).


Code with Akka streams and one Rediscala client. 

import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import redis.RedisClient

object RedisStreamClient extends App {

  val message = """How to explain ZeroMQ? Some of us start by saying all the wonderful things it does. It's sockets on steroids. It's like mailboxes with routing. It's fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It's smaller, simpler, but still looks familiar. Personally, I like to remember why we made ZeroMQ at all, because that's most likely where you, the reader, still are today.How to explain ZeroMQ? Some of us start by saying all the wonderful things it does. It's sockets on steroids. It's like mailboxes with routing. It's fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It's smaller, simpler, but still looks familiar. Personally, I like to remember why we made ZeroMQ at all, because that's most likely where"""

  implicit val system = ActorSystem("Sys")

  implicit val materializer = FlowMaterializer()

  val msgSize = message.getBytes.size

  val redis = RedisClient()
  implicit val ec = redis.executionContext
  val random = UUID.randomUUID().toString

  val source = Source(() => (1 to 1000000).iterator)

  source.map(x => x + 1)
    .foreach(x => redis.set(random + x.toString, ByteString(message)))
    .onComplete(_ => system.shutdown())

}




Code with 100 Rediscala clients.

import akka.actor.{ActorLogging, Props, Actor}
import akka.util.ByteString
import redisbenchmark.RedisBenchmarkActor.InsertValues

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import redis.RedisClient

import java.util.UUID


object RedisLocalPerfMultipleActors {


  def main(args: Array[String]) : Unit = {
    implicit val akkaSystem = akka.actor.ActorSystem()
    // create 100 RedisClient actors and tell each one to start inserting
    val actors = 1 to 100
    actors.map { x => akkaSystem.actorOf(Props(new RedisBenchmarkActor(10000)), "actor" + x) }
      .foreach { actor => actor ! InsertValues }

    // TODO: shut down the actor system.
    // Not sure how to wait for all Futures to complete before shutting down -- see the sketch after this block.

  }

}
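One idea I haven't tested: ask each actor instead of telling it, have the actor pipe completion of all its writes back to the asker, and wait on the aggregate future before shutting down. The "done" reply below is hypothetical:

import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

// RedisBenchmarkActor.receive would reply once all of its writes finish:
//   case InsertValues =>
//     import context.dispatcher
//     val result = for (i <- 1 to runs) yield redis.set(random + i, ByteString(message))
//     Future.sequence(result).map(_ => "done") pipeTo sender()

implicit val timeout = Timeout(10.minutes)
implicit val ec = akkaSystem.dispatcher
val refs = (1 to 100).map(x => akkaSystem.actorOf(Props(new RedisBenchmarkActor(10000)), "actor" + x))
val done: Future[Seq[Any]] = Future.sequence(refs.map(ref => ref ? InsertValues))
Await.result(done, Duration.Inf) // every actor has reported "done"
akkaSystem.shutdown()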


class RedisBenchmarkActor(runs: Int) extends Actor with ActorLogging {

  val redis = RedisClient()
  //implicit val ec = redis.executionContext
  log.info(s"Actor created with $runs ")

  def receive = {

    case InsertValues => {

      log.info("Inserting values ")

      val random = UUID.randomUUID().toString
      val start = System.currentTimeMillis()
      val result: Seq[Future[Boolean]] = for {i <- 1 to runs} yield {
        //log.info("sending values ....")
        redis.set(random + i.toString, ByteString(RedisBenchmarkActor.message))
      }

    }
  }

}


object RedisBenchmarkActor {

  object InsertValues

  val message = """How to explain ZeroMQ? Some of us start by saying all the wonderful things it does. It's sockets on steroids. It's like mailboxes with routing. It's fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It's smaller, simpler, but still looks familiar. Personally, I like to remember why we made ZeroMQ at all, because that's most likely where you, the reader, still are today.How to explain ZeroMQ? Some of us start by saying all the wonderful things it does. It's sockets on steroids. It's like mailboxes with routing. It's fast! Others try to share their moment of enlightenment, that zap-pow-kaboom satori paradigm-shift moment when it all became obvious. Things just become simpler. Complexity goes away. It opens the mind. Others try to explain by comparison. It's smaller, simpler, but still looks familiar. Personally, I like to remember why we made ZeroMQ at all, because that's most likely where"""

}


Soumya Simanta

Dec 22, 2014, 3:53:20 AM12/22/14
to akka...@googlegroups.com
Looks like my akka-streams code was not applying back pressure, and I wasn't sure how to change it to handle back pressure.

So I changed my code to the following. I borrowed the code from one of the Akka Streams activator examples (WritePrimes). Adding a buffer in between also helped significantly.


  val maxRandomNumberSize = 1000000
  val randomSource = Source(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize)))

  def insertValues: Flow[Int, Boolean] =
    Flow[Int].mapAsyncUnordered(k => redis.set(k + random, message))

  val blackhole = BlackholeSink

  // val stream = randomSource.via(insertValues).runWith(blackhole) // no buffer
  val streamWithRandomSource = randomSource.buffer(20000, OverflowStrategy.backpressure).via(insertValues).runWith(blackhole)

Now the network IO looks much more uniform. It's a pleasure to see back pressure work (visually) :-)



I did see my CPU usage bump up in this version. Any reason why?

Akka Team

Dec 22, 2014, 8:56:30 AM12/22/14
to Akka User List
Hi Soumya

First of all, the performance of Akka IO (the original actor-based one) might be slow or fast, but it does not degrade if writes are properly backpressured. Also, it does not use Futures at all, so I guess what you see is an artifact of how you drive it.

Now, your first reactive-streams approach didn't work because the "map" stage that created an actor already let in the next element as soon as the actor was created. What you want is to let in the next element only after the element has been written. In other words, you just created an ever-growing number of actors without waiting for the writes to complete.

Your second solution is correct, though, because mapAsyncUnordered only lets in the next element when the passed future completes -- which in your case corresponds to a finished write.

As for the CPU usage, without an actual profile it doesn't say too much. For example, if your ForkJoinPool (the default dispatcher) doesn't have much work to do, it will spend a lot of time in its work-stealing cycle (scan()) because there is no work to steal. This is purely an artifact of that pool and has nothing to do with the actual CPU usage of the application. You can try an Executor-based pool if you want to test this.
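To rule that out, a minimal thread-pool-executor based dispatcher configuration would look something like this (the dispatcher name and pool sizes are just placeholders):

my-thread-pool-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    core-pool-size-min = 4
    core-pool-size-max = 4
  }
  throughput = 100
}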

If you really want to play around to see how much throughput is possible, you should try the following approaches step by step:

 - Increase the throughput setting of the dispatcher that executes the stream and redis client. You can try values from 100 to even 1000
 - Increase the default buffer size of the stream materializer (you can pass a MaterializerSettings object). You should try buffer sizes of 32, 64, 128.

Btw, streams are currently not optimized at all, so don't set your expectations too high yet :)

-Endre

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Soumya Simanta

Dec 22, 2014, 11:40:30 PM12/22/14
to akka...@googlegroups.com
Endre, 

Thank you for taking the time to explain everything. It was really helpful, not only for understanding the streams basics but also for creating a better/faster version of what I'm trying to do.
Before I go any further I want to say that I love Akka streams and it is going to be a useful API for a lot of my future work. Thanks to the Akka team. 

I tweaked both the dispatcher settings and the type of executor used by the default dispatcher. The program still ends up taking a good deal of my CPU (NOTE: the screenshot below is for the FJP, not a ThreadPoolExecutor, but I see similar usage with a TPE). The memory footprint is always under control, as expected. I gave 12 GB of heap space to the JVM.
The frequency of young-generation GC depends on the MaterializerSettings buffer sizes. I've not tweaked the GC yet. Do you think that can make a difference?

BTW, does a buffer size of 64 mean that there will be 64 items in each buffer in the pipeline? I bumped it to 512 and saw an increase in throughput.

Here is the configuration and screenshots from one of the better runs I had. I'm not sure whether I'm limited by TCP or by how Rediscala uses Akka IO at this point.
Any further insights will be very useful and appreciated. In the meantime I'll continue to play around with different values.

Thanks again !  

My machine config is 

Darwin 14.0.0 Darwin Kernel Version 14.0.0: Fri Sep 19 00:26:44 PDT 2014; root:xnu-2782.1.97~2/RELEASE_X86_64 x86_64

  Processor Name: Intel Core i7 

  Processor Speed: 2.6 GHz

  Number of Processors: 1

  Total Number of Cores: 4

  L2 Cache (per Core): 256 KB

  L3 Cache: 6 MB

  Memory: 16 GB


application.conf 

rediscala {
  rediscala-client-worker-dispatcher {
    mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
    throughput = 1000
  }
}

actor {
  default-dispatcher {

    type = "Dispatcher"
    executor = "fork-join-executor"
    default-executor {
      fallback = "fork-join-executor"
    }

    # This will be used if you have set "executor = "fork-join-executor""
    fork-join-executor {
      # Min number of threads to cap factor-based parallelism number to
      parallelism-min = 5

      # Max number of threads to cap factor-based parallelism number to
      parallelism-max = 5
    }
    throughput = 1000
  }
}

I'm using the following for the FlowMaterializer

 val settings = MaterializerSettings(system)
 implicit val materializer = FlowMaterializer(settings.copy(maxInputBufferSize = 512, initialInputBufferSize = 512))

Soumya Simanta

Dec 23, 2014, 3:02:09 AM12/23/14
to akka...@googlegroups.com
Another akka-streams back pressure question, in the context of the following piece of code.

def insertValues(rnd: String): Flow[Int, Boolean] =
  Flow[Int].mapAsyncUnordered(k => redis.set(k + rnd, message))

val maxSeq = 5000000
val seqSource = Source(() => (1 to maxSeq).iterator)
val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)

My understanding is that the next request is sent to Redis only after a "single" Future has completed. Is this correct?
Is there a way I can batch a bunch of set requests and wait for them to complete before sending a new batch?

Akka Team

Dec 23, 2014, 9:05:58 AM12/23/14
to Akka User List
Hi,

On Tue, Dec 23, 2014 at 12:40 AM, Soumya Simanta <soumya....@gmail.com> wrote:
Endre, 

Thank you for taking the time to explain everything. It was really helpful not only in understanding the streams basics but also to create a better/faster version of what I'm trying to do. 
Before I go any further I want to say that I love Akka streams and it is going to be a useful API for a lot of my future work. Thanks to the Akka team. 

I tweaked both the dispatcher settings and the type of executor used by the default dispatcher. The program still ends up taking a good deal of my CPU (NOTE: the screenshot below is for the FJP, not a ThreadPoolExecutor, but I see similar usage with a TPE).

I wouldn't worry too much about CPU usage right now; it can be an artifact of various scheduling effects (there is a pinned dispatcher, and the FJP can also distort measurements). You can try using several parallel streams instead of one and see how things scale out horizontally.
 
The memory footprint is always under control, as expected. I gave 12 GB of heap space to the JVM.
The frequency of young-generation GC depends on the MaterializerSettings buffer sizes. I've not tweaked the GC yet. Do you think that can make a difference?

Since higher buffer sizes keep more random elements (boxed integers) in memory for longer, this is expected. In reality you would store real domain objects which are already allocated, so that is less of an issue.
 

BTW, does a buffer size of 64 mean that there will be 64 items in each buffer in the pipeline? I bumped it to 512 and saw an increase in throughput.

I wouldn't go above 128.
 

Here is the configuration and screenshots from one of the better runs I had. I'm not sure whether I'm limited by TCP or by how Rediscala uses Akka IO at this point.
Any further insights will be very useful and appreciated. In the meantime I'll continue to play around with different values.

I believe you have maxed out the streams part, so any remaining bottleneck is very likely in the Rediscala client or below. Your screenshot shows that around 70 MByte/s is achieved, which is around 0.5 Gbit/s. Assuming TCP is used, this is not bad at all.

-Endre

Akka Team

Dec 23, 2014, 9:15:08 AM12/23/14
to Akka User List
On Tue, Dec 23, 2014 at 4:02 AM, Soumya Simanta <soumya....@gmail.com> wrote:
Another akka-streams back pressure question, in the context of the following piece of code.

def insertValues(rnd: String): Flow[Int, Boolean] =
  Flow[Int].mapAsyncUnordered(k => redis.set(k + rnd, message))

val maxSeq = 5000000
val seqSource = Source(() => (1 to maxSeq).iterator)
val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)

My understanding is that the next request is sent to Redis only after a "single" Future has completed. Is this correct?

No, the number of allowed uncompleted Futures is defined by the buffer size. If there were no parallelization between Futures, there would be no need for both an ordered and an unordered version of the same operation.
 
Is there a way I can batch a bunch of set requests and wait for them to complete before sending a new batch?

If there were a version of set that accepted a Seq of writes, say "batchSet", then you could use:

seqSource.grouped(100).mapAsyncUnordered(ks => redis.batchSet(...))

Here grouped forms groups of at most 100 elements from the stream, resulting in a stream of sequences. You need API support for that from the Redis client, though.

-Endre

Soumya Simanta

Dec 23, 2014, 9:26:35 PM12/23/14
to akka...@googlegroups.com
Endre, thank you again.
I think you are correct. It looks like the primary limitation is not being able to batch more operations into one network call (TCP).
I increased the message size (10 times) and I'm able to send more bytes per second. At some point I'll hit the network limit.

The following is for 1 million messages of around 10K each.



Can you explain a little more why you wouldn't recommend going higher than 128 for the buffer size of the FlowMaterializer?

Also, is there a way I can measure the actual latency distribution while using akka-streams?
Something like an HdrHistogram of all the requests.
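Something like this is what I have in mind (an untested sketch; it assumes the redis, message and Flow definitions from my earlier code plus an implicit ExecutionContext, and synchronizes on the histogram because a plain Histogram is not thread-safe):

import org.HdrHistogram.Histogram

val histogram = new Histogram(3600000000000L, 3) // track up to 1 hour in nanoseconds, 3 significant digits

def insertValuesTimed(rnd: String): Flow[Int, Boolean] =
  Flow[Int].mapAsyncUnordered { k =>
    val t0 = System.nanoTime()
    redis.set(k + rnd, message).map { ok =>
      histogram.synchronized { histogram.recordValue(System.nanoTime() - t0) }
      ok
    }
  }

// once the stream completes:
histogram.outputPercentileDistribution(System.out, 1000.0) // report values scaled to microseconds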

Thanks
-Soumya

Soumya Simanta

Dec 23, 2014, 11:27:47 PM12/23/14
to akka...@googlegroups.com



def insertValues(rnd: String): Flow[Int, Boolean] =
  Flow[Int].mapAsyncUnordered(k => redis.set(k + rnd, message))

val maxSeq = 5000000
val seqSource = Source(() => (1 to maxSeq).iterator)
val streamWithSeqSource = seqSource.via(insertValues(random1)).runWith(blackhole)

My understanding is that the next request is sent to Redis only after a "single" Future has completed. Is this correct?

No, the number of allowed uncompleted Futures is defined by the buffer size. If there were no parallelization between Futures, there would be no need for both an ordered and an unordered version of the same operation.
 

Understood. So a map version will always be slower. 


 
Is there a way I can batch a bunch of set requests and wait for them to complete before sending a new batch?

If there were a version of set that accepted a Seq of writes, say "batchSet", then you could use:

seqSource.grouped(100).mapAsyncUnordered(ks => redis.batchSet(...))

Here grouped forms groups of at most 100 elements from the stream, resulting in a stream of sequences. You need API support for that from the Redis client, though.

Yeah, I tried that. Here is the code and the network IO. Throughput is better, of course at the cost of latency. I haven't figured out a reliable way to measure latency yet; once I have one, I can figure out what the difference in latency is.

  seqSource.grouped(100).mapAsyncUnordered { grp =>
    val tran = redis.transaction()
    for (i <- grp) yield {
      tran.set(i + random2, message)
    }
    tran.exec()
  }.runWith(blackhole)


Akka Team

Jan 1, 2015, 8:25:58 AM1/1/15
to Akka User List
Hi,

One thing to remember when using grouped is that it will _always_ wait for 100 elements (or whatever number is configured), except on stream close. So if your input is an interactive source, some batches might be delayed for a long time (until the 100 elements arrive).

I added a ticket to write a simple batcher element recipe in the new cookbook: https://github.com/akka/akka/issues/16610
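If your akka-streams version already has a groupedWithin-style combinator, a time-bounded batch would look something like this (a sketch, reusing the hypothetical batchSet from above):

import scala.concurrent.duration._

seqSource.groupedWithin(100, 10.milliseconds).mapAsyncUnordered(grp => redis.batchSet(...))

This emits a batch as soon as either 100 elements have arrived or 10 ms have passed, whichever comes first, so an interactive source never stalls a batch for long.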

-Endre


