Threads getting blocked in akka-remote: Help!

Bruce Mitchener

Jun 1, 2011, 4:40:01 AM
to akka...@googlegroups.com
Hello,

We have encountered what seems like it might be a bit of a bottleneck in akka-remote:

akka:event-driven:dispatcher:global-12 [BLOCKED]
akka.remote.netty.RemoteClient.send(Object, Option, Option, InetSocketAddress, long, boolean, ActorRef, Option, ActorType)
akka.remote.netty.NettyRemoteClientModule$$anonfun$send$1.apply(RemoteClient)
akka.remote.netty.NettyRemoteClientModule$$anonfun$send$1.apply(Object)
akka.remote.netty.NettyRemoteClientModule$class.withClientFor(NettyRemoteClientModule, InetSocketAddress, Option, Function1)
akka.remote.netty.NettyRemoteSupport.withClientFor(InetSocketAddress, Option, Function1)
akka.remote.netty.NettyRemoteClientModule$class.send(NettyRemoteClientModule, Object, Option, Option, InetSocketAddress, long, boolean, ActorRef, Option, ActorType, Option)
akka.remote.netty.NettyRemoteSupport.send(Object, Option, Option, InetSocketAddress, long, boolean, ActorRef, Option, ActorType, Option)
akka.actor.RemoteActorRef.postMessageToMailbox(Object, Option)
akka.actor.ScalaActorRef$class.$bang(ScalaActorRef, Object, Option)
akka.actor.RemoteActorRef.$bang(Object, Option)

With a pretty vanilla configuration, we have some jobs running and they're producing a fair bit of output and sending that over akka-remote to another service which then sends it to our clients:

WorkerService -> FrontEndService -> Clients

The WorkerService routinely has 3-7 threads blocked with the above stack trace, while (typically) 14 of the 16 threads in the global dispatcher are busy processing work.

Now, the code for RemoteClient.send starts out like:

  def send[T](
    message: Any,
    senderOption: Option[ActorRef],
    senderFuture: Option[CompletableFuture[T]],
    remoteAddress: InetSocketAddress,
    timeout: Long,
    isOneWay: Boolean,
    actorRef: ActorRef,
    typedActorInfo: Option[Tuple2[String, String]],
    actorType: AkkaActorType): Option[CompletableFuture[T]] = synchronized { // FIXME: find better strategy to prevent race

So, I'm a bit suspicious of that synchronization there.
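To see why a method-wide `synchronized` shows up as BLOCKED dispatcher threads, here is a minimal sketch. It is not Akka code: `CoarseClient`, `ContentionDemo` and the `Thread.sleep` standing in for the write are hypothetical. It counts how many threads are inside `send` at the same time. Because the whole body runs under the client's monitor, that count never exceeds one, no matter how many threads call in, even though the underlying write could be asynchronous.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a remote client whose entire send() holds the monitor.
class CoarseClient {
  private val inside = new AtomicInteger(0) // threads currently inside send()
  val maxConcurrent = new AtomicInteger(0)  // highest count ever observed

  def send(msg: Any): Unit = synchronized {
    val now = inside.incrementAndGet()
    maxConcurrent.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
    Thread.sleep(1)                         // pretend to serialize and write the message
    inside.decrementAndGet()
    ()
  }
}

object ContentionDemo {
  // Runs `threads` concurrent callers and reports how many were ever inside send() at once.
  def maxInsideSend(threads: Int, sendsPerThread: Int): Int = {
    val client = new CoarseClient
    val pool = Executors.newFixedThreadPool(threads)
    val done = new CountDownLatch(threads)
    for (_ <- 1 to threads) pool.execute(new Runnable {
      def run(): Unit = {
        for (i <- 1 to sendsPerThread) client.send(i)
        done.countDown()
      }
    })
    done.await(30, TimeUnit.SECONDS)
    pool.shutdown()
    client.maxConcurrent.get
  }
}
```

`ContentionDemo.maxInsideSend(16, 20)` returns 1: with 16 dispatcher threads sharing one client, every caller after the first waits on the monitor, which is consistent with the BLOCKED frames above.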

We haven't altered the akka.remote.client.buffering configs, so retry-message-send-on-failure is still on and capacity is still -1.

Netty's writes are asynchronous, so I'm curious:

  • Is this synchronized really necessary?
  • Why?
  • Is it in the right place? Could it be covering less of the execution of the send?
  • What are some good options to get around this to not spend as much time blocking?
  • Have we done something wrong to end up here?
 - Bruce

√iktor Ҡlang

Jun 1, 2011, 10:07:51 AM
to akka...@googlegroups.com
Hey Bruce,


That's fixed in Akka 2.0 because the security layer (secure cookie handshake) has been reworked.

It would be a lot of work to backport that, but what we could do for 1.2 is to only synchronize if secure cookie handshaking is enabled.
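A rough sketch of that 1.2 idea, assuming a client where only the first message carries the cookie: take the lock only when a cookie is configured, so deployments without one never contend. `ConditionalLockClient`, `secureCookie`, `write` and the `ArrayBuffer` "wire" are hypothetical names for illustration, not the actual Akka 1.2 code.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

// Hypothetical client: when a cookie is configured, only the first message carries it.
class ConditionalLockClient(secureCookie: Option[String]) {
  private val isAuthenticated = new AtomicBoolean(false)
  // Stand-in for the channel; it guards itself because ArrayBuffer is not thread-safe.
  val wire = ArrayBuffer.empty[(Any, Option[String])]

  private def write(msg: Any, cookie: Option[String]): Unit =
    wire.synchronized { wire += ((msg, cookie)); () }

  def send(msg: Any): Unit = secureCookie match {
    case Some(_) =>
      // Cookie enabled: pick the "first" message and write it under one lock,
      // so the cookie-bearing message is always first on the wire.
      synchronized {
        val first = isAuthenticated.compareAndSet(false, true)
        write(msg, if (first) secureCookie else None)
      }
    case None =>
      // No cookie: there is nothing to order, so the send path takes no client lock.
      write(msg, None)
  }
}
```

The design choice is simply to pay for ordering only when the feature that needs it is switched on.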

Open a ticket.

Cheers,


 
--
Viktor Klang

Akka Tech Lead
Typesafe - Enterprise-Grade Scala from the Experts

Twitter: @viktorklang

Raymond Roestenburg

Jun 7, 2011, 4:23:27 PM
to akka...@googlegroups.com
Hi Viktor, 

I have just upgraded an in-production system with quite high performance requirements from a version from somewhere before 1.0 to 1.1.2 (we are having some issues with reconnects). That previous version had no synchronized block and no cookies yet, so if the lock is only necessary for the secure cookie feature, I would love to see the fix you describe land as soon as possible. If it is as simple as taking out the synchronized block, I guess I could patch it myself, but I would of course rather have a version update :)
Looking forward to 1.2


--
Raymond Roestenburg

Bruce Mitchener

Jun 12, 2011, 2:42:28 PM
to akka...@googlegroups.com
I must be missing something ... I don't see where that code has much to do with the secure cookie stuff in a way that would matter for race conditions.

I don't think I filed this ticket... I will do so now.

 - Bruce


Bruce Mitchener

Jun 12, 2011, 2:45:30 PM
to akka...@googlegroups.com

√iktor Ҡlang

Jun 12, 2011, 2:52:13 PM
to akka...@googlegroups.com
On Sun, Jun 12, 2011 at 8:42 PM, Bruce Mitchener <bruce.m...@gmail.com> wrote:
> I must be missing something ... I don't see where that code has much to do with the secure cookie stuff in a way that would matter for race conditions.

https://github.com/jboner/akka/blob/release-1.2/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala#L230

Two or more threads can contend for sending the "first" message, and the messages of the threads that lost the race can be received _before_ the one that won. Such a message carries no cookie, so it is unauthenticated and the server will reject the connection.
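The race can be replayed deterministically. In this sketch (a hypothetical model of the behaviour described above, not Akka's code; `CookieRace`, `Msg` and `Server` are illustrative names) two senders share an `AtomicBoolean`. Sender A wins the compare-and-set and attaches the cookie, but nothing ties "won the flag" to "written first", so sender B's cookie-less message can reach a server that authenticates each connection from its first message.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object CookieRace {
  final case class Msg(body: String, cookie: Option[String])

  // Server side: a connection is authenticated by its first message, or rejected.
  final class Server(expected: String) {
    var authenticated = false
    var rejected = false
    def receive(m: Msg): Unit =
      if (!authenticated && !rejected) {
        if (m.cookie.contains(expected)) authenticated = true else rejected = true
      }
  }

  // Replays one interleaving: A wins the flag, then the writes land in the given order.
  def replay(loserWritesFirst: Boolean): Server = {
    val cookie = "secret"
    val isAuthenticated = new AtomicBoolean(false)
    // A runs its compare-and-set first and gets the cookie; B sees the flag already set.
    val a = Msg("from A", if (isAuthenticated.compareAndSet(false, true)) Some(cookie) else None)
    val b = Msg("from B", if (isAuthenticated.compareAndSet(false, true)) Some(cookie) else None)
    val server = new Server(cookie)
    val arrival = if (loserWritesFirst) List(b, a) else List(a, b)
    arrival.foreach(server.receive)
    server
  }
}
```

`CookieRace.replay(loserWritesFirst = true).rejected` is `true`, which is exactly the interleaving the `synchronized` block in `send` rules out.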

This will be fixed in Akka 2.0 by doing the cookie handshake as a separate request.

If you aren't using the secure cookie feature, you can experiment with turning it off in the config, building a version of Akka without the synchronized block, and seeing if it works.
 

> I don't think I filed this ticket... I will do so now.

No need, there's already a ticket for it.

Cheers,

 

√iktor Ҡlang

Jun 12, 2011, 3:05:10 PM
to akka...@googlegroups.com
There's another "fix" we could consider for 1.2: remove the "isAuthenticated" AtomicBoolean and send the cookie (if any) with every message.
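A minimal sketch of that alternative, using a hypothetical model (`CookiePerMessage`, `Client`, `Server` are illustrative names): the client has no shared flag and no lock, and the server checks each message on its own, so arrival order no longer matters. The cost is a cookie carried and compared on every message.

```scala
object CookiePerMessage {
  final case class Msg(body: String, cookie: Option[String])

  // Client: no shared flag and no lock; every message carries the configured cookie (if any).
  final class Client(secureCookie: Option[String]) {
    def envelope(body: String): Msg = Msg(body, secureCookie)
  }

  // Server: each message is authenticated independently of arrival order.
  // With no cookie configured on the server, everything is accepted.
  final class Server(expected: Option[String]) {
    def accepts(m: Msg): Boolean = expected.forall(c => m.cookie.contains(c))
  }
}
```

Because no message depends on another having arrived first, concurrent senders can write in any order.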

Want me to do that and then you can see if that works for you?

Cheers,



√iktor Ҡlang

Jun 12, 2011, 3:24:44 PM
to akka...@googlegroups.com
Hi Raymond,

I've fixed the bottleneck in the "release-1.2" branch by dropping the synchronized block and sending the cookie with every message instead, so for maximum performance in 1.2, omit the cookie from the config.

For 2.0 we've switched to a handshake when the connection is established, which will perform as well as running without the cookie.
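One way to see why a handshake on connect removes the race, as a sketch with hypothetical names (`HandshakeOnConnect`, `Connection`, `Frame`; not the Akka 2.0 implementation): the handshake is a separate frame written by the code that opens the connection, before the connection is handed to any sender, so it is always first. Ordinary messages carry no cookie and need no lock.

```scala
import scala.collection.mutable.ArrayBuffer

object HandshakeOnConnect {
  sealed trait Frame
  final case class Handshake(cookie: String) extends Frame
  final case class Payload(body: String) extends Frame

  // Stand-in for a connected channel; frames are kept in write order.
  final class Connection {
    private val frames = ArrayBuffer.empty[Frame]
    def write(f: Frame): Unit = frames.synchronized { frames += f; () }
    def written: List[Frame] = frames.synchronized { frames.toList }
  }

  // The handshake is written before the connection is returned,
  // so no sender can ever write ahead of it.
  def connect(cookie: Option[String]): Connection = {
    val conn = new Connection
    cookie.foreach(c => conn.write(Handshake(c)))
    conn
  }

  // Server: authenticate once from the first frame; later payloads need no check.
  def serverAccepts(frames: List[Frame], expected: Option[String]): Boolean =
    expected match {
      case None    => true
      case Some(c) => frames.headOption.contains(Handshake(c))
    }
}
```

The authentication cost is paid once per connection rather than once per message.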

Please try the "release-1.2" branch out and see if it solves your issue.

Sorry for the delay, I've been traveling a lot lately.

Cheers,

Bruce Mitchener

Jun 13, 2011, 1:52:19 AM
to akka...@googlegroups.com
Viktor,

I thought about this some while sleeping apparently.

Right now, the Netty pipeline looks like:

    length-prefixed-decoder -> zlib decoder -> protobuf decoder -> execution handler -> remote server

and then:

    remote server -> protobuf encoder -> zlib compressor -> length prefix encoder

Couldn't you put an optional step in the pipeline for the security cookie handling and then after the first message, remove it from the channel pipeline?
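Bruce's idea, sketched with a toy pipeline rather than Netty's API (`DynamicPipelineSketch`, `Stage`, `Pipeline` and `CookieCheck` are hypothetical): a cookie-checking stage sits at the head of the inbound chain, validates the first message, then removes itself so later messages skip the check entirely.

```scala
import scala.collection.mutable.ListBuffer

object DynamicPipelineSketch {
  final case class Msg(body: String, cookie: Option[String])

  // A stage passes on a (possibly transformed) message, or None to drop it.
  trait Stage { def handle(m: Msg, p: Pipeline): Option[Msg] }

  final class Pipeline(initial: List[Stage]) {
    val stages: ListBuffer[Stage] = ListBuffer.empty[Stage] ++= initial
    // Runs over a snapshot of the stages, so a stage may safely remove itself mid-run.
    def fire(m: Msg): Option[Msg] =
      stages.toList.foldLeft(Option(m))((acc, s) => acc.flatMap(x => s.handle(x, this)))
  }

  // Checks the cookie on the first message, then takes itself out of the pipeline.
  final class CookieCheck(expected: String) extends Stage {
    def handle(m: Msg, p: Pipeline): Option[Msg] =
      if (m.cookie.contains(expected)) { p.stages -= this; Some(m.copy(cookie = None)) }
      else None // a real server would close the connection here
  }

  // Stand-in for the rest of the chain (protobuf decoder, remote server, ...).
  object Deliver extends Stage { def handle(m: Msg, p: Pipeline): Option[Msg] = Some(m) }
}
```

After the first authenticated message the pipeline holds only `Deliver`.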

 - Bruce


√iktor Ҡlang

Jun 13, 2011, 3:19:26 AM
to akka...@googlegroups.com
Hi Bruce,

On Mon, Jun 13, 2011 at 7:52 AM, Bruce Mitchener <bruce.m...@gmail.com> wrote:
> Couldn't you put an optional step in the pipeline for the security cookie handling and then after the first message, remove it from the channel pipeline?

Yes and no, we're using the StaticChannelPipeline for performance, so we can't remove stages at runtime, but it will conceptually work the same way.
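With a static pipeline the stage cannot be removed, but it can behave the same way: it stays in place and turns into a pass-through once the connection is authenticated. A sketch under a hypothetical model (`StaticPipelineSketch`, `CookieGate` and friends are illustrative, not Netty's or Akka's API): the stage list is a `Vector` fixed when the per-connection pipeline is built, and after the first accepted message the check shrinks to reading one flag.

```scala
object StaticPipelineSketch {
  final case class Msg(body: String, cookie: Option[String])

  trait Stage { def handle(m: Msg): Option[Msg] }

  // Built once per connection; the stage list never changes afterwards.
  final class Pipeline(val stages: Vector[Stage]) {
    def fire(m: Msg): Option[Msg] =
      stages.foldLeft(Option(m))((acc, s) => acc.flatMap(s.handle))
  }

  // Stays in the pipeline for the connection's lifetime; after the first
  // accepted message it only reads a flag and passes messages through.
  final class CookieGate(expected: String) extends Stage {
    @volatile private var authenticated = false
    def handle(m: Msg): Option[Msg] =
      if (authenticated) Some(m)
      else if (m.cookie.contains(expected)) { authenticated = true; Some(m.copy(cookie = None)) }
      else None // a real server would close the connection here
  }

  object Deliver extends Stage { def handle(m: Msg): Option[Msg] = Some(m) }
}
```

Since the gate holds per-connection state, each connection needs its own `CookieGate` instance.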

Cheers,

 

Raymond Roestenburg

Jun 13, 2011, 5:11:34 AM
to akka...@googlegroups.com
Great, thanks!
I'll try out release 1.2.


√iktor Ҡlang

Jun 13, 2011, 5:14:33 AM
to akka...@googlegroups.com
On Mon, Jun 13, 2011 at 11:11 AM, Raymond Roestenburg <raymond.r...@gmail.com> wrote:
> Great, thanks!
> I'll try out release 1.2.

You're welcome!
 

Raymond Roestenburg

Jun 13, 2011, 1:29:20 PM
to akka...@googlegroups.com
I've published Akka 1.2-SNAPSHOT locally, and everything is fine. After that I tried to publish akka-modules locally, but I get compile errors.
Are akka-modules 1.2-SNAPSHOT and Akka 1.2 in sync and up to date?

I get this:

[error] /home/rroestenburg/akka-modules/akka-camel/src/main/scala/akka/camel/Producer.scala:118: too many arguments for method postMessageToMailboxAndCreateFutureResultWithTimeout: (message: Any, timeout: Long, channel: akka.actor.UntypedChannel)akka.dispatch.ActorCompletableFuture
[error]           producer.postMessageToMailboxAndCreateFutureResultWithTimeout(result, producer.timeout, sender, senderFuture)

and this:
[error] /home/rroestenburg/akka-modules/akka-dispatcher-extras/src/main/scala/akka/dispatch/HawtDispatcher.scala:141: class HawtDispatcher needs to be abstract, since method mailboxIsEmpty in trait MessageDispatcher of type (actorRef: akka.actor.ActorRef)Boolean is not defined
[error] class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = globalQueue) extends MessageDispatcher  {
[error]       ^
[error] one error found

akka modules: (release-1.2) git commit 63eb29c0008e232801d5b16497fb546a0039fb57
akka : (release-1.2) git commit 654de23f1e3af0c749a9153432a23cc45edb93d8

I have cleaned out my ivy cache a couple of times and run sbt clean clean-lib update etc., so hopefully it's not one of those issues. Has anybody else had these problems?


Roland Kuhn

Jun 13, 2011, 1:58:41 PM
to akka...@googlegroups.com
akka-modules 1.2 is unfortunately not in sync with akka 1.2 at the moment. The change which introduced the first breakage you detail below was done by me, but I don’t have commit rights on akka-modules …

You can fix the first one by changing "sender, senderFuture" => "channel" on the line with the error.

Jonas Bonér

Jun 13, 2011, 2:01:47 PM
to akka...@googlegroups.com
Now you have commit rights.

--
Jonas Bonér
CTO Typesafe - Enterprise-Grade Scala from the Experts
Phone: +46 733 777 123
Twitter: @jboner

Roland Kuhn

Jun 13, 2011, 2:38:40 PM
to akka...@googlegroups.com
Okay, I fixed both compilation errors. Someone with better knowledge of HawtDispatch should have a look at my “fix”, though.

Regards,

Roland

Raymond Roestenburg

Jun 13, 2011, 2:44:12 PM
to akka...@googlegroups.com
Thanks Roland!

Derek Williams

Jun 13, 2011, 2:47:20 PM
to akka...@googlegroups.com

It's correct enough. Concurrent queues save some complexity if you don't keep track of their size. I did the same with PromiseQueue.
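Derek's point about concurrent queues, sketched with `java.util.concurrent.ConcurrentLinkedQueue`: an emptiness check such as `mailboxIsEmpty` can use `isEmpty` instead of `size`, which on this queue walks every element and is only a snapshot under concurrent updates. `QueueMailbox` is a hypothetical name; this is not the HawtDispatcher change from the thread.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical mailbox that never tracks its own size.
// A <: AnyRef so that poll()'s null-when-empty maps cleanly to None.
final class QueueMailbox[A <: AnyRef] {
  private val queue = new ConcurrentLinkedQueue[A]()
  def enqueue(a: A): Unit = { queue.offer(a); () }
  def dequeue(): Option[A] = Option(queue.poll()) // poll() returns null when empty
  def isEmpty: Boolean = queue.isEmpty            // no separate counter to keep in sync
}
```

A dispatcher's `mailboxIsEmpty(actorRef)` could then simply delegate to the actor's mailbox `isEmpty`.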

√iktor Ҡlang

Jun 13, 2011, 6:23:56 PM
to akka...@googlegroups.com
Awesome, thanks for staying on top of things!

Raymond Roestenburg

Jun 13, 2011, 6:36:09 PM
to akka...@googlegroups.com
Thanks again, everything works and compiles.
I just noticed something else though, which has happened since 1.1 and has not changed now that I have moved to 1.2.
In some unit tests that we have on the project, I get the following:

akka.actor.Scheduler$SchedulerException: Failed to scheduleOnce a Runnable
        at akka.actor.Scheduler$.scheduleOnce(Scheduler.scala:102) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.dispatch.MessageDispatcher$class.unregister(MessageHandling.scala:140) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.dispatch.ThreadBasedDispatcher.unregister(ThreadBasedDispatcher.scala:40) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.dispatch.ThreadBasedDispatcher.unregister(ThreadBasedDispatcher.scala:17) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.dispatch.MessageDispatcher$$anonfun$detach$1.apply(MessageHandling.scala:83) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.util.ReentrantGuard.withGuard(LockUtil.scala:20) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.dispatch.MessageDispatcher$class.detach(MessageHandling.scala:82) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.dispatch.ExecutorBasedEventDrivenDispatcher.detach(ExecutorBasedEventDrivenDispatcher.scala:66) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.actor.LocalActorRef$$anonfun$stop$1.apply$mcV$sp(ActorRef.scala:705) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.actor.LocalActorRef$$anonfun$stop$1.apply(ActorRef.scala:702) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.actor.LocalActorRef$$anonfun$stop$1.apply(ActorRef.scala:702) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.util.ReentrantGuard.withGuard(LockUtil.scala:20) ~[akka-actor-1.2-SNAPSHOT.jar:na]
        at akka.actor.LocalActorRef.stop(ActorRef.scala:701) ~[akka-actor-1.2-SNAPSHOT.jar:na]
       ...

It happens in the tearDown of the unit test, when I stop all the actors I have started (some of them stop other ActorRefs that they manage in postStop).
Does anyone have an idea why this happens?


√iktor Ҡlang

Jun 13, 2011, 6:38:22 PM
to akka...@googlegroups.com
I think it's related to the introduction of "dispatchFuture" in the EBEDD (ExecutorBasedEventDrivenDispatcher).
Can you open a ticket? And do you have a reproducible test case?

Cheers,

Raymond Roestenburg

Jun 14, 2011, 2:22:10 AM
to Akka User List
Created ticket #924; I will try to add a reproducible test case.


√iktor Ҡlang

Jun 14, 2011, 3:11:42 AM
to akka...@googlegroups.com
Great, thanks




