unstashAll can leave unprocessed messages in actor's mailbox?

383 views
Skip to first unread message

Mark Hatton

unread,
Aug 10, 2013, 8:26:06 AM8/10/13
to akka...@googlegroups.com
Hi,

Just thought I'd share an issue I came across on my first foray into using stash and become.

The back-story is that I wanted a (non-blocking) actor implementation that would call a method returning Future[T].  And I wanted the actor to not dequeue the next message from its mailbox until this future had completed.  I started with this:-

case object Execute

class StashTestActor extends Actor with Stash {
  def receive = {

    case Execute =>

      context.become {
        case Execute => stash()
      }

      def doExec() = Future( println("Execute") ) // assume this is a long-running method that returns a future

      doExec() andThen { case _ =>
        context.become(receive)
        unstashAll()
      }

  }
}


... however this does not work as expected; sending 10 messages to the above actor results in the "Execute" println only being invoked once.  Here's some simple calling code:

object Main {

  def main(args: Array[String]) = {

    val actorSystem = ActorSystem("test")
    val testActor = actorSystem.actorOf(Props(new StashTestActor).withDispatcher("dispatcher-with-deque-mailbox"))

    for (_ <- 1 to 10) testActor ! Execute
  }

}


The issue being that after the call to unstashAll(), the receive() method is never called again and hence the actor's mailbox remains with 9 items queued but unprocessed.

I'd previously been lead to believe that actors always eagerly processed their mailbox, so it took me a while to figure out exactly what was going on here.

There is an obvious workaround, to introduce another message that awakens the actor after calling unstashAll(), thusly:

case object Awaken

class StashTestActor extends Actor with Stash {
  def receive = {

    case Execute =>

      context.become {
        case Execute => stash()
      }

      def doExec() = Future( println("Execute") ) // assume this is a long-running method that returns a future

      doExec() andThen { case _ =>
        context.become(receive)
        unstashAll()
        self ! Awaken
      }

    case Awaken =>  // awakens the actor after unstash operation.

  }
}


But adding this hack seems unnecessary.  Should the actor not commence processing its mailbox after an unstash?

Tested with Akka 2.2.0 and 2.1.4.

Appreciate your thoughts.

Best,

Mark Hatton

√iktor Ҡlang

unread,
Aug 10, 2013, 8:37:06 AM8/10/13
to Akka User List



On Sat, Aug 10, 2013 at 2:26 PM, Mark Hatton <hatto...@gmail.com> wrote:
Hi,

[...] 


      doExec() andThen { case _ =>
        context.become(receive)
        unstashAll()
      }


You are closing over internal actor state (context) and calling it from otuside the actor, this is a big no-no. 

"When using future callbacks, inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This breaks the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time.

Read more about it in the docs for Actors and the JMM"


You want:

doExec() map { _ => Continue } pipeTo self

case Continue =>
  context become receive
  unstashAll()
case Status.Failure(t) => // OMG doExec failed
  throw t // or appropriate

Cheers,


--
Viktor Klang
Director of Engineering

Twitter: @viktorklang

Mark Hatton

unread,
Aug 10, 2013, 8:56:38 AM8/10/13
to akka...@googlegroups.com
When using future callbacks, inside actors you need to carefully avoid closing over the containing actor’s reference

Thanks Victor, I must have missed that part of the documentation ;)  It makes total sense not to share the actor internals.

Using pipeTo worked a charm, final implementation:

class StashTestActor extends Actor with Stash with PipeToSupport {

  def receive = {

    case Execute =>

      context.become {
        case Execute =>
          stash()
        case Continue =>
          context.become(receive)
          unstashAll()

      }

      def doExec() = Future( println("Execute") ) // assume this is a long-running method that returns a future

      doExec() map { _ => Continue } pipeTo self
  }
}


Hope to see you at ScalaExchange in December.

Mark

√iktor Ҡlang

unread,
Aug 10, 2013, 9:03:48 AM8/10/13
to Akka User List


On Aug 10, 2013 3:01 PM, "Mark Hatton" <hatto...@gmail.com> wrote:
>>
>> When using future callbacks, inside actors you need to carefully avoid closing over the containing actor’s reference
>
>
> Thanks Victor, I must have missed that part of the documentation ;)  It makes total sense not to share the actor internals.
>
> Using pipeTo worked a charm, final implementation:
>
> class StashTestActor extends Actor with Stash with PipeToSupport {
>
>   def receive = {
>
>     case Execute =>
>
>       context.become {
>         case Execute =>
>           stash()
>         case Continue =>
>           context.become(receive)
>           unstashAll()
>
>       }
>
>       def doExec() = Future( println("Execute") ) // assume this is a long-running method that returns a future
>
>       doExec() map { _ => Continue } pipeTo self
>   }
> }
>

You will want to handle the failure case as well. If doExec fails.

>
> Hope to see you at ScalaExchange in December.

Absolutely, I'll be there.

Happy hAkking,

Cheers,
V

>
> Mark
>
>
> On Saturday, 10 August 2013 13:37:06 UTC+1, √ wrote:
>>
>>
>>
>>
>> On Sat, Aug 10, 2013 at 2:26 PM, Mark Hatton <hatto...@gmail.com> wrote:
>>>
>>> Hi,
>>
>>
>> [...] 
>>>
>>>
>>>
>>>       doExec() andThen { case _ =>
>>>         context.become(receive)
>>>         unstashAll()
>>>       }
>>
>>
>> You are closing over internal actor state (context) and calling it from otuside the actor, this is a big no-no. 
>>
>> "When using future callbacks, inside actors you need to carefully avoid closing over the containing actor’s reference, i.e. do not call methods or access mutable state on the enclosing actor from within the callback. This breaks the actor encapsulation and may introduce synchronization bugs and race conditions because the callback will be scheduled concurrently to the enclosing actor. Unfortunately there is not yet a way to detect these illegal accesses at compile time.
>>
>> Read more about it in the docs for Actors and the JMM"
>>
>>
>> You want:
>>
>> doExec() map { _ => Continue } pipeTo self
>>
>> case Continue =>
>>   context become receive
>>   unstashAll()
>> case Status.Failure(t) => // OMG doExec failed
>>   throw t // or appropriate
>>
>> Cheers,
>> √
>>
>>
>> --
>> Viktor Klang
>> Director of Engineering
>> Typesafe
>>
>> Twitter: @viktorklang
>

> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://akka.io/faq/
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
> To post to this group, send email to akka...@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/groups/opt_out.
>  
>  

Mark Hatton

unread,
Aug 10, 2013, 9:25:04 AM8/10/13
to akka...@googlegroups.com

You will want to handle the failure case as well. If doExec fails.

Ah yes, I was being unintentionally optimistic there.

For completeness, with error handling:

class StashTestActor extends Actor with Stash with PipeToSupport {
  def receive = {

    case Execute =>

      context.become {
        case Execute =>
          stash()
        case Continue =>
          context.become(receive)
          unstashAll()
        case Status.Failure(t) =>
          Console.err.println(t.toString) // TODO handle error
          self ! Continue

√iktor Ҡlang

unread,
Aug 10, 2013, 11:14:17 AM8/10/13
to Akka User List
The question though is what you gain by having doExec being asynchronous.
If you do the work inside the actor itself then you need not use stash.

Just thinking out loud.

Cheers,

Mark Hatton

unread,
Aug 12, 2013, 7:56:32 AM8/12/13
to akka...@googlegroups.com
Fair question.  The back-story is that "doExec" calls an external system using an asynchronous HTTP client.  The external system has limited capacity so we are using a round-robin router and a pool of actors to limit the concurrency in the caller.

So essentially we're using actors to queue calls to an existing async library.  Can you think of a more elegant solution?

Cheers,

Mark
Reply all
Reply to author
Forward
0 new messages