scala.concurrent.forkjoin.ForkJoinPool.scan() taking up lots of CPU cycles


Evan Chan

Apr 15, 2013, 6:40:39 PM
to akka...@googlegroups.com
Hi guys,

We have an Akka-based app that takes data from Kafka, does some transformations, and writes them to Cassandra.   Each node does 1-2k messages per sec.   All actors, no futures.  The actors hand data to each other in a straight linear pipeline and we just use the default dispatcher (FJ).

We are seeing some weird behavior where all the threads do work for a few seconds, then sit idle for a few seconds, and this repeats; we don't quite understand why. We turned on remote VisualVM and found that the ForkJoinPool.scan() method was taking up to 40% of the CPU time (consistently at least 25%). Does anyone have insight into why this might be occurring?
- Are the threads blocked on something?   From other metrics, it doesn't seem like either Kafka or Cassandra is blocking for a couple seconds, and it is strange that all the threads block at the same time (see included VisualVM charts).
- Could a deadlock be occurring?    How would we debug a deadlock?   We are using limited mailbox capacity (at 5000) with push timeout of -1.
- Should we try separating out the IO actors (reading from Kafka and writing to Cassandra) to separate thread pools?
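On the "how would we debug a deadlock?" question: besides reading a jstack or VisualVM thread dump by hand, the JVM can check for lock cycles itself through the standard java.lang.management API (`ThreadMXBean.findDeadlockedThreads`). The sketch below is illustrative only; `DeadlockCheck` is a hypothetical name. It catches threads stuck in a cycle on monitors or java.util.concurrent locks, but a pipeline that stalls because stages are waiting on each other's full bounded mailboxes may not form such a cycle, so an ordinary thread dump is still worth reading.

```scala
import java.lang.management.{ ManagementFactory, ThreadInfo }

object DeadlockCheck {
  /** Describes each thread the JVM reports as deadlocked; empty if there are none. */
  def deadlockedThreads(): List[String] = {
    val mx = ManagementFactory.getThreadMXBean
    // findDeadlockedThreads returns null (not an empty array) when nothing is deadlocked
    val ids: Array[Long] = Option(mx.findDeadlockedThreads()).getOrElse(Array.empty[Long])
    if (ids.isEmpty) Nil
    else {
      // lockedMonitors = true, lockedSynchronizers = true
      val infos: Array[ThreadInfo] = mx.getThreadInfo(ids, true, true)
      infos.toList.filter(_ != null).map { ti =>
        s"${ti.getThreadName} (${ti.getThreadState}) waits on ${ti.getLockName} held by ${ti.getLockOwnerName}"
      }
    }
  }
}
```

Calling `DeadlockCheck.deadlockedThreads()` periodically (or from a debug endpoint) and logging any non-empty result is one way to catch a lock-level deadlock while it is happening.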

We're pretty sure that GC is not the problem, it was taking up only about 3% of CPU max, and our process is not close to using up its allocated heap.

thanks,
Evan
Screen Shot 2013-04-15 at 3.19.37 PM.png
Screen Shot 2013-04-15 at 3.20.15 PM.png

√iktor Ҡlang

Apr 15, 2013, 7:03:24 PM
to Akka User List
What version of Akka and Scala?


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user?hl=en.
For more options, visit https://groups.google.com/groups/opt_out.
 
 



--
Viktor Klang
Director of Engineering

Twitter: @viktorklang

Evan Chan

Apr 15, 2013, 7:04:10 PM
to akka...@googlegroups.com
Sorry, Akka 2.1.0 and Scala 2.10.0.

--
Evan Chan
Senior Software Engineer | 
e...@ooyala.com | (650) 996-4600
www.ooyala.com | blog | @ooyala

√iktor Ҡlang

Apr 15, 2013, 7:11:57 PM
to Akka User List
Alright.

First of all, doing infinite timeouts is almost always a recipe for deadlocks. Check VisualVM or get a live thread dump to see if that's happening.

If you suspect that this is an FJP bug, you can try the following: grab the latest jsr166e (http://g.oswego.edu/dl/concurrency-interest/), write an ExecutorServiceConfigurator that creates that FJP, and configure your actors' dispatcher to use it (by its fully qualified class name) instead of the fork-join-executor:

// THIS IS THE AKKA FJP CONFIGURATOR; YOU CAN BASE YOUR VERSION ON THIS
// (as found in Akka 2.1, package akka.dispatch, next to its companion object
// ForkJoinExecutorConfigurator, which defines AkkaForkJoinPool)
package akka.dispatch

import java.util.concurrent.{ ExecutorService, ThreadFactory }
import com.typesafe.config.Config
import scala.concurrent.forkjoin.ForkJoinPool

class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
  import ForkJoinExecutorConfigurator._

  // The dispatcher's thread factory must create ForkJoinWorkerThreads
  def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t match {
    case correct: ForkJoinPool.ForkJoinWorkerThreadFactory ⇒ correct
    case x ⇒ throw new IllegalStateException("The prerequisites for the ForkJoinExecutorConfigurator is a ForkJoinPool.ForkJoinWorkerThreadFactory!")
  }

  class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
                                       val parallelism: Int) extends ExecutorServiceFactory {
    // A custom version would create the jsr166e pool here (its thread factory
    // type differs, so validate() would likely need adapting too)
    def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing)
  }
  final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
    val tf = threadFactory match {
      case m: MonitorableThreadFactory ⇒
        // add the dispatcher id to the thread names
        m.withName(m.name + "-" + id)
      case other ⇒ other
    }
    new ForkJoinExecutorServiceFactory(
      validate(tf),
      // parallelism scales with the core count, clamped to [parallelism-min, parallelism-max]
      ThreadPoolConfig.scaledPoolSize(
        config.getInt("parallelism-min"),
        config.getDouble("parallelism-factor"),
        config.getInt("parallelism-max")))
  }
}


Also, if this turns out to be an FJP bug which isn't fixed in jsr166e, we'll need to report that to concurrency-interest, and if it _is_ fixed, I'll try to make sure the new version is lifted into a future Scala version.

Does that help?

Cheers,
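To illustrate the point about infinite timeouts, here is a minimal sketch that uses a java.util.concurrent `ArrayBlockingQueue` as a stand-in for a bounded mailbox (it is not Akka's mailbox code). As we understand the mailbox push timeout, a negative value makes the sender wait indefinitely for space, much like `put`; a finite timeout, like `offer(item, timeout, unit)`, gives up and lets the sender log, drop, or back off. `BoundedEnqueue` is a hypothetical name.

```scala
import java.util.concurrent.{ ArrayBlockingQueue, TimeUnit }

object BoundedEnqueue {
  /** Enqueue with a finite timeout: returns false instead of waiting forever. */
  def tryEnqueue[T](q: ArrayBlockingQueue[T], item: T, timeoutMillis: Long): Boolean =
    q.offer(item, timeoutMillis, TimeUnit.MILLISECONDS)

  /** Fills a capacity-1 queue that nobody drains, then attempts a second enqueue. */
  def demo(): (Boolean, Int) = {
    val mailbox = new ArrayBlockingQueue[String](1)
    mailbox.put("first") // the queue is now full
    // mailbox.put("second") would park this thread until space appears,
    // i.e. forever here, which is what an infinite push timeout amounts to
    val accepted = tryEnqueue(mailbox, "second", 50L)
    (accepted, mailbox.size)
  }
}
```

`BoundedEnqueue.demo()` returns `(false, 1)`: the second message is rejected after about 50 ms instead of hanging the sender, which turns a silent stall into something that can be observed.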

Evan Chan

Apr 16, 2013, 3:27:25 AM
to akka...@googlegroups.com
Viktor,

Thanks for the quick reply.

On Mon, Apr 15, 2013 at 3:11 PM, √iktor Ҡlang <viktor...@gmail.com> wrote:
> Alright.
>
> First of all, doing infinite timeouts is almost always a recipe for deadlocks. Check VisualVM or get a live thread dump to see if that's happening.

Since all the Akka threads seem to "unblock" themselves after a few seconds, is that really a deadlock? Anyway, here is part of the thread dump. It's hard to tell what is happening:

"EventStoreIngestor-akka.actor.default-dispatcher-15" - Thread t@53
   java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <1b614425> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:1594)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

   Locked ownable synchronizers:
- None

"EventStoreIngestor-akka.actor.default-dispatcher-14" - Thread t@52
   java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <1b614425> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:1594)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

   Locked ownable synchronizers:
- None

"EventStoreIngestor-akka.actor.default-dispatcher-13" - Thread t@51
   java.lang.Thread.State: WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <1b614425> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:1594)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

   Locked ownable synchronizers:
- None


 

> If you are suspecting that this is a FJP bug, you can do the following:
>
> So what you can try is to grab the latest jsr166e (http://g.oswego.edu/dl/concurrency-interest/) and make an ExecutorServiceProvider with that FJP,
> and configure your actors' dispatcher to use that ESP instead of fork-join-pool :

Before I try that, I'm going to try just using a plain ExecutorService...

Roland Kuhn

Apr 16, 2013, 3:51:22 AM
to akka...@googlegroups.com
Hi Evan,

are all threads exactly like that, or is there maybe one which is blocking on something else? Trying a completely different executor is a good way of narrowing it down, please tell us what you find.

Regards,

Roland


Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Empowering professional developers to build amazing apps.
twitter: @rolandkuhn
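Roland's question can also be answered mechanically: take an in-process thread dump and list the dispatcher threads whose top frames do not contain an idle marker such as `ForkJoinPool.scan`. A minimal sketch using `ThreadMXBean.dumpAllThreads`; the thread-name prefix and idle markers are assumptions to adapt, and `OddThreadOut` is a hypothetical helper. It inspects only the top few frames, since a busy worker can still have pool frames further down its stack.

```scala
import java.lang.management.ManagementFactory

object OddThreadOut {
  /** Threads named `namePrefix*` whose top `depth` frames match none of `idleMarkers`. */
  def busyThreads(namePrefix: String, idleMarkers: Set[String], depth: Int = 3): List[String] = {
    val mx = ManagementFactory.getThreadMXBean
    // dumpAllThreads(lockedMonitors, lockedSynchronizers): stacks of all live threads
    val infos = mx.dumpAllThreads(false, false).toList
    infos
      .filter(ti => ti.getThreadName.startsWith(namePrefix))
      .filterNot { ti =>
        ti.getStackTrace.take(depth).exists(f => idleMarkers.exists(m => f.toString.contains(m)))
      }
      .map { ti =>
        val top = ti.getStackTrace.headOption.map(_.toString).getOrElse("<no frames>")
        s"${ti.getThreadName} ${ti.getThreadState} at $top"
      }
  }
}
```

For dumps like the ones above, `OddThreadOut.busyThreads("EventStoreIngestor-akka.actor.default-dispatcher", Set("ForkJoinPool.scan"))` should skip the threads parked in `scan` and keep a thread parked under something else, such as a blocking queue poll.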

Endre Sándor Varga

Apr 16, 2013, 4:09:56 AM
to akka...@googlegroups.com
On 2013-04-16 09:51:22, Roland Kuhn <goo...@rkuhn.info> wrote:

> Hi Evan,
>
> are all threads exactly like that, or is there maybe one which is
> blocking on something else? Trying a completely different executor is a
> good way of narrowing it down, please tell us what you find.

What does "org.apache.thrift.transport.TIOStreamTransport.read" do? It has
the second largest self time.

Evan Chan

Apr 16, 2013, 12:03:24 PM
to akka...@googlegroups.com
I believe it is when the Cassandra Thrift client is waiting for data back from Cassandra.
 




Endre Sándor Varga

Apr 16, 2013, 12:17:42 PM
to akka...@googlegroups.com

> I believe it is when the Cassandra Thrift client is waiting for data back
> from Cassandra.

Seems to be a suspiciously synchronous API to me. Can this cause problems?

Evan Chan

Apr 16, 2013, 12:50:24 PM
to akka...@googlegroups.com
On Mon, Apr 15, 2013 at 11:51 PM, Roland Kuhn <goo...@rkuhn.info> wrote:
> Hi Evan,
>
> are all threads exactly like that, or is there maybe one which is blocking on something else? Trying a completely different executor is a good way of narrowing it down, please tell us what you find.

Roland,

There is one thread which is blocked on Kafka:

"EventStoreIngestor-akka.actor.default-dispatcher-4" - Thread t@20
   java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <5cc65c7a> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:62)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:36)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:43)


So I suppose that one theory is that things are just waiting for Kafka.

It sure would be nice if we could have an Akka plugin for VisualVM, an overlay on top of the threads visualizer which lets you see which actors are blocking.


By the way, I did try switching the default-dispatcher to a thread-pool-based ExecutorService (ThreadPoolExecutor) by inserting this in my config:

executor = "thread-pool-executor"
 
What didn't change:
- I still see CPU cycling between idle and 80% every few seconds
- throughput is about the same

What changed:
- no longer see ForkJoinPool.scan() in VisualVM CPU sampling
- Thread dump stack traces look slightly different

Bottom line is, I'll have to try something else.  I will also try disabling the bounded mailbox (by commenting out mailbox-capacity).

thanks,
Evan

Evan Chan

Apr 16, 2013, 12:55:13 PM
to akka...@googlegroups.com
By the way, I just tried disabling the mailbox capacity and it didn't affect the performance.

Evan Chan

Apr 16, 2013, 12:59:05 PM
to akka...@googlegroups.com
In theory one can use nonblocking Thrift APIs, but no major Scala/Java Cassandra client I know of actually implements this.  So unfortunately we are stuck using thread pools.  :(
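Since the Cassandra client has to block, a common mitigation (and one answer to the "separate thread pools?" question in the first post) is to give the blocking calls their own fixed-size pool, so they cannot occupy every thread the actors run on. A minimal sketch with plain scala.concurrent; `blockingWrite` is a hypothetical stand-in for the synchronous Thrift call, and the pool size of 4 is an arbitrary assumption.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{ ExecutionContext, Future }

object BlockingIo {
  // A fixed pool reserved for blocking client calls; 4 threads is an arbitrary choice here
  private val ioPool = Executors.newFixedThreadPool(4)
  val ioContext: ExecutionContext = ExecutionContext.fromExecutorService(ioPool)

  // Hypothetical stand-in for a synchronous Thrift/Cassandra write
  def blockingWrite(row: String): Int = {
    Thread.sleep(20) // simulated network round trip
    row.length
  }

  /** Runs the blocking call on the IO pool; the caller's thread is not held. */
  def writeAsync(row: String): Future[Int] = Future(blockingWrite(row))(ioContext)

  /** Releases the pool's (non-daemon) threads. */
  def shutdown(): Unit = ioPool.shutdown()
}
```

Inside an actor, the resulting Future can be sent back to the actor with `akka.pattern.pipe` (`writeAsync(row) pipeTo self`) rather than awaited. The Akka-native alternative is to run the IO actors on their own dispatcher via `Props(...).withDispatcher(...)`.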



Roland Kuhn

Apr 16, 2013, 2:47:53 PM
to akka...@googlegroups.com
This means that your problem is caused by the blocking Kafka API. FJP.scan() is just something which idle threads in that pool do periodically to pick up new work. Since it is the reader side I’d surmise that your Akka system is just a little too fast for the producer to keep up :-)

Regards,

Roland

Evan Chan

Apr 17, 2013, 2:14:06 PM
to akka...@googlegroups.com
Roland,

Thanks.  I've confirmed that was the problem.

√iktor Ҡlang

Apr 17, 2013, 2:21:48 PM
to Akka User List
Glad Akka wasn't at fault :-) (Go Akka go!)

Roland Kuhn

Apr 17, 2013, 2:24:27 PM
to akka...@googlegroups.com
Glad we could help figure it out!

Happy hAkking,

Roland

Rob Withers

Jul 20, 2013, 5:13:23 PM
to akka...@googlegroups.com
What is the best solution for this issue, and is it still needed? If so, is there some code I could use?

thanks,
rob

Akka Team

Jul 22, 2013, 6:21:14 AM
to Akka User List
Hi Rob!


> What is the best solution for this issue, and if it is still needed?  If so, is there some code I could use?

Which issue exactly are you referring to: using Kafka, using Thrift, or using blocking APIs in general?

-Endre
 

--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

Rob Withers

Jul 22, 2013, 9:31:08 AM
to akka...@googlegroups.com
Hi Endre,

Yes, I am referring to using Kafka and experiencing high CPU utilization. I never saw a description of what the best solution for this issue was. I am trying to set up a running example of Akka and Kafka, so I have not yet experienced this issue, but I am trying to prepare for it.

thanks,
rob

√iktor Ҡlang

Jul 22, 2013, 10:05:26 AM
to Akka User List
Are you using managed blocking or not?

Cheers,
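"Managed blocking" here means wrapping a blocking call in `scala.concurrent.blocking { ... }`, which tells an executor that supports it (for example Scala's global, ForkJoinPool-based ExecutionContext) that the current thread is about to block, so the pool may add a compensating thread instead of losing parallelism. A minimal sketch; `pollSource` is a hypothetical stand-in for something like `ConsumerIterator.next()`. Whether a given Akka dispatcher honors `blocking` depends on the executor and Akka version, so check before relying on it; on a thread without such support, `blocking` simply runs its body.

```scala
import scala.concurrent.{ Await, Future, blocking }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ManagedBlocking {
  // Hypothetical stand-in for a blocking read such as ConsumerIterator.next()
  def pollSource(i: Int): Int = blocking {
    Thread.sleep(100) // simulated wait for the next message
    i
  }

  /** Starts n blocking reads on the global pool and waits for all of them. */
  def readAll(n: Int): List[Int] = {
    val reads = (1 to n).toList.map(i => Future(pollSource(i)))
    reads.map(f => Await.result(f, 10.seconds))
  }
}
```

With `blocking`, the global pool can run more concurrent sleepers than it has cores; without it, the same reads would queue behind one another once every pool thread is parked.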

Akka Team

Jul 22, 2013, 10:06:29 AM
to Akka User List
Hi Rob,

The issue was with the blocking code, to quote Roland's answer: "This means that your problem is caused by the blocking Kafka API. FJP.scan() is just something which idle threads in that pool do periodically to pick up new work. Since it is the reader side I’d surmise that your Akka system is just a little too fast for the producer to keep up :-)"

High CPU utilization is not always a sign of load; it can also happen while idle if some kind of busy-waiting is involved.

-Endre
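The busy-waiting remark can be checked directly: two threads that each wait about 200 ms for work that never arrives can consume very different amounts of CPU. The sketch compares a spin loop with a timed `LinkedBlockingQueue.poll`, measuring per-thread CPU time with `ThreadMXBean.getCurrentThreadCpuTime`; it assumes the JVM supports and has enabled thread CPU time measurement (typical on HotSpot). `IdleCpu` is a hypothetical name.

```scala
import java.lang.management.ManagementFactory
import java.util.concurrent.{ LinkedBlockingQueue, TimeUnit }
import java.util.concurrent.atomic.AtomicLong

object IdleCpu {
  private val mx = ManagementFactory.getThreadMXBean

  /** Runs `waitLoop` on a fresh thread and returns the CPU time (ns) that thread used. */
  private def cpuNanosOf(waitLoop: () => Unit): Long = {
    val cpu = new AtomicLong(-1L)
    val t = new Thread(new Runnable {
      def run(): Unit = {
        waitLoop()
        cpu.set(mx.getCurrentThreadCpuTime) // -1 if CPU time measurement is disabled
      }
    })
    t.start()
    t.join()
    cpu.get
  }

  /** "Idle" by spinning: keeps checking the clock without ever yielding the CPU. */
  def spinning(millis: Long): Long = cpuNanosOf { () =>
    val deadline = System.nanoTime + millis * 1000000L
    while (System.nanoTime < deadline) {}
  }

  /** "Idle" by parking: a timed poll on an empty queue, like Kafka's ConsumerIterator. */
  def parked(millis: Long): Long = cpuNanosOf { () =>
    val q = new LinkedBlockingQueue[String]()
    q.poll(millis, TimeUnit.MILLISECONDS)
    ()
  }
}
```

On a typical JVM, `IdleCpu.spinning(200)` reports roughly the whole wait as CPU time, while `IdleCpu.parked(200)` reports very little, even though both threads did no useful work.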



Rob Withers

Jul 22, 2013, 9:31:33 PM
to akka...@googlegroups.com
Hi y'all,

Yes, the Kafka API is blocking on consumerIterator.next(). Let me get the Mailbox working, then I can evaluate what is actually happening here. I'll flag this thread for later.

thanks,
rob