Akka Persistent Actor stops processing messages


Richard Ney

Jan 13, 2017, 3:51:10 AM
to Akka User List
First, a thank you to the people answering questions.

Now, the environment:

Akka 2.4.11
Scala 2.11.8
Cassandra 3.9

I've been chasing an issue for days where one of my persistent actors appears to stop processing messages, to the point where JVM memory grows to 8GB before the process crashes with an out-of-memory error. Analyzing the heap dump, I'm finding the internal stash is overflowing, with billions of messages queued. I've run several 'jcmd <pid> Thread.print' commands to see if the threads are blocking in my code. The dumps reveal nothing, and I can't find a single thread actually executing my code. All the akka.actor.default-dispatcher threads appear to be in a waiting state:

"manhattan-akka.actor.default-dispatcher-43" #89 prio=5 os_prio=0 tid=0x00007fa4f0007800 nid=0x97 waiting on condition [0x00007fa5a435b000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000005c0fbdf80> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
    at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

And the Cassandra plug-in threads are also waiting:

"manhattan-cassandra-plugin-default-dispatcher-49" #95 prio=5 os_prio=0 tid=0x00007fa504049000 nid=0x9d waiting on condition [0x00007fa52f5fa000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000005c1b65640> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
    at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Trying to figure out if this is a bug where a journal write never returns, which is all I can think of at this point. I've now set a limit on the internal stash, but that will only prevent the OOM crash; my actor will still stop processing data.
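For anyone following along, capping the internal stash is a configuration change. This is a sketch assuming Akka 2.4.x configuration keys (the configurator class names and `stash-capacity` key are from my reading of the reference.conf; verify against your version):

```hocon
akka {
  persistence {
    # Strategy applied when the persistent actor's internal stash overflows.
    # ThrowExceptionConfigurator fails fast instead of growing the heap;
    # DiscardConfigurator drops overflowing messages to dead letters.
    internal-stash-overflow-strategy = "akka.persistence.ThrowExceptionConfigurator"
  }
  actor.default-mailbox {
    # Bounds the stash; the stash capacity is taken from the mailbox config.
    stash-capacity = 10000
  }
}
```

Note this only bounds memory; as said above, the actor still stops making progress if the journal never replies.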

Any pearls of wisdom are appreciated.

-Richard

Richard Ney

Jan 15, 2017, 9:56:42 PM
to Akka User List
Adding to this based on my latest test runs. After several code changes, the most recent being the switch from 'persist' to 'persistAsync', I've now observed my messages backing up in three different queues:

mailbox --> internalStash --> journalBatch

The attached screenshot shows the chain. Is this a possible bug with the Cassandra journal plug-in, or maybe an issue with my custom serializers? Any tips on how to chase this issue?

-Richard
Screenshot 2017-01-15 18.14.48.png

Richard Ney

Jan 16, 2017, 3:24:48 AM
to Akka User List
I should clarify my post. My messages aren't backing up in all three queues at the same time; the problem has moved each time I've changed the actor's queuing behavior. The first time I examined a heap dump, all the objects were in the actor's mailbox, so I changed the mailbox to use a bounded deque. The issue occurred again with memory growing, and the next dump showed all the messages queuing up in the internalStash. So I changed the actor again to use 'persistAsync' instead of 'persist', and this time the dump shows all the messages queuing in the journalBatch.
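For context, that queue migration matches the semantics of the two calls. A minimal sketch against the Akka 2.4 PersistentActor API (Cmd/Evt and the persistenceId are hypothetical, for illustration only):

```scala
import akka.persistence.PersistentActor

// Hypothetical command/event types for illustration.
case class Cmd(data: String)
case class Evt(data: String)

class ExampleActor extends PersistentActor {
  override def persistenceId = "example-id"
  var state = List.empty[String]

  override def receiveRecover: Receive = {
    case Evt(d) => state = d :: state
  }

  override def receiveCommand: Receive = {
    case Cmd(d) =>
      // persist: incoming commands are stashed in the internal stash
      // until the journal confirms the write, so a stuck journal
      // backs messages up in the stash.
      persist(Evt(d)) { e => state = e.data :: state }

      // persistAsync (alternative): no stashing; new commands keep
      // being processed, so under load unconfirmed writes accumulate
      // in the journal's batch queue instead.
      // persistAsync(Evt(d)) { e => state = e.data :: state }
  }
}
```

Either way the events end up waiting on the journal; the two calls just decide which queue absorbs the backlog.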

So maybe my question should be: what is the message flow from the journalBatch into the Cassandra messages table? Is the same dispatcher thread that processes messages also used to move them to the Cassandra plug-in, so that if the 'persistAsync' or receiveCommand handler takes too long processing a message, we end up backing messages up in the queue?

As my other message said, I'm just trying to figure out where to look next in the chain, since this only happens under load.

Regards,

Richard

Patrik Nordwall

Jan 17, 2017, 6:06:34 AM
to akka...@googlegroups.com
I think the bottleneck is the serialization or the writes to Cassandra, both happening in the Cassandra journal (akka-persistence-cassandra). Are you using the latest version of akka-persistence-cassandra? Such things should run on the cassandra-plugin-default-dispatcher or cassandra-plugin-blocking-dispatcher.

The other queues are probably just symptoms of that.
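Those dispatchers come from the plugin's reference configuration and can be tuned or overridden in application.conf. A sketch only, with key names and pool sizes assumed from typical akka-persistence-cassandra defaults of that era; check your version's reference.conf before relying on them:

```hocon
cassandra-plugin-default-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 6
    parallelism-max = 6
  }
}

cassandra-plugin-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
}
```

If these pools are saturated in the thread dumps, the bottleneck is serialization/writes; if they are idle while the queues grow, the work is never reaching them.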

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--

Patrik Nordwall
Akka Tech Lead
Lightbend - Reactive apps on the JVM
Twitter: @patriknw

Richard Ney

Jan 18, 2017, 3:31:29 AM
to Akka User List
I've done several thread dumps, and the two dispatcher pools dealing with Cassandra have all their threads in WAITING or TIMED_WAITING. Based on their call stacks, it appears they aren't doing any work, such as waiting for Cassandra to return, unless I'm reading them wrong.

"manhattan-cassandra-plugin-default-dispatcher-42359" #47941 prio=5 os_prio=0 tid=0x00007f33e805c800 nid=0x3d35 waiting on condition [0x00007f336eec6000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000005c19a8e28> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
    at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


"manhattan-cassandra-plugin-blocking-dispatcher-42441" #48090 prio=5 os_prio=0 tid=0x00007f33f016b000 nid=0x3e2e waiting on condition [0x00007f33805f5000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006c37a33f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

When I examine the memory dump and dive into Eventsourced.scala, I see that the journalBatch queue is full and the writeInProgress variable is set to true, so I know the actor thinks a write is in progress. Correct me if I'm wrong, but shouldn't a Cassandra write that takes more than 30 seconds trip the CircuitBreaker in writeMessages inside AsyncWriteJournal, based on this code?

try breaker.withCircuitBreaker(asyncWriteMessages(prep))
catch { case NonFatal(e) ⇒ Future.failed(e) }

That seems like one of the logical protections to put in place for a fail-fast approach to interacting with an external repository that doesn't return.
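For reference, the journal circuit breaker is configurable; the 30-second intuition maps onto settings like these (values shown are my understanding of the Akka 2.4 defaults under journal-plugin-fallback; verify against akka-persistence's reference.conf):

```hocon
akka.persistence.journal-plugin-fallback.circuit-breaker {
  max-failures = 10   # consecutive failures before the breaker opens
  call-timeout = 10s  # a single journal call slower than this counts as a failure
  reset-timeout = 30s # how long the breaker stays open before a half-open retry
}
```

If a write genuinely hangs past call-timeout without the breaker tripping, that would point to the Future never being completed rather than completing slowly.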

Again, thanks for any help. I'm currently building a local version of Akka that I can instrument to try to track this issue further.

-Richard

Patrik Nordwall

Jan 18, 2017, 4:36:05 AM
to akka...@googlegroups.com
On Wed, Jan 18, 2017 at 9:31 AM, Richard Ney <kamisa...@gmail.com> wrote:
Correct me if I'm wrong, but shouldn't a Cassandra write that takes more than 30 seconds trip the CircuitBreaker in writeMessages inside AsyncWriteJournal?

Yes, it should.

Do you have a minimized reproducer that we can use to debug the issue?

Can you see if you can trigger the problem earlier, with less load, by slowing down the serializer or the writes (adding a sleep there, or patching akka-persistence-cassandra with sleeps and extra logging)?
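One way to slow the serializer down for this experiment is a wrapper (a hedged sketch against Akka's Serializer SPI; MyEventSerializer, the sleep duration, and the identifier are all illustrative):

```scala
import akka.serialization.Serializer

// Illustrative diagnostic wrapper: delegates to an existing serializer
// and injects latency so the bottleneck reproduces at lower load.
// MyEventSerializer is a hypothetical stand-in for your custom serializer.
class SlowedSerializer extends Serializer {
  private val underlying = new MyEventSerializer

  // Must be unique across serializers in the system; value is arbitrary here.
  override def identifier: Int = 99999
  override def includeManifest: Boolean = underlying.includeManifest

  override def toBinary(o: AnyRef): Array[Byte] = {
    Thread.sleep(50) // simulate a slow serialization path
    underlying.toBinary(o)
  }

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
    underlying.fromBinary(bytes, manifest)
}
```

Bind it in place of the real serializer in akka.actor.serializers for the test run, and see whether the queues back up the same way at modest load.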
 