Misunderstanding about the Resume strategy


Kilic Ali-Firat

Jan 12, 2017, 8:31:56 AM
to Akka User List
Hello everyone,

I'm experimenting with failure strategies in Akka, and I wrote a sample test to check whether I can recover the state of an actor, as shown below:

import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Resume
import scala.concurrent.duration._

case object Get
case object LetItCrash

object ActorConsumer {
  def props = Props(ActorConsumer())
}

case class ActorConsumer() extends Actor with ActorLogging {

  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minutes) {
      case _ : Exception => Resume
    }

  var x : Int = 0

  override def preStart = {
    super.preStart
    log.error(
      """
        | **********************************************************************
        | In PRE START of {}
        | value of x -> {}
        | **********************************************************************
      """.stripMargin, self.path.name, this.x)
  }

  override def preRestart(reason: Throwable, message: Option[Any]) = {
    super.preRestart(reason, message)
    log.error("Before restart x value = " + x)
    log.error(
      """
        | **********************************************************************
        | IN PRE RESTART of {}
        | Reason of restart: {}
        | message option : {}
        | Value of x : {}
        | **********************************************************************
      """.stripMargin, self.path.name, reason, message, this.x)
  }

  override def postRestart(reason: Throwable) = {
    super.postRestart(reason)
    log.error(
      """
        | **********************************************************************
        | IN POST RESTART of {}
        | Reason of restart: {}
        | Value of x : {}
        | **********************************************************************
      """.stripMargin, self.path.name, reason, this.x)
  }

  override def postStop = {
    super.postStop
    log.debug(
      """
        | **********************************************************************
        | In POST STOP of {}
        | **********************************************************************
      """.stripMargin, self.path.name)
    context.stop(self)
  }

  def receive = {
    case y : Int =>
      println("receive = " + y)
      x = x + y
      println("x = " + this.x)
    case LetItCrash => throw new Exception
    case Get => sender() ! x
  }
}

Below is the test:


import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestActorRef, TestKit}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

class TestResumeActor
    extends TestKit(ActorSystem("foo"))
    with ImplicitSender
    with WordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  override def afterAll() = {
    TestKit.shutdownActorSystem(system)
  }

  "An actor consumer" must {
    "get 1 even exceptions are throws" in {
      val actorRef = TestActorRef[ActorConsumer]
      actorRef ! 1
      actorRef ! LetItCrash
      expectMsg(1)
      actorRef ! Get
      expectMsg(1)
    }
  }
}

Here is the test output with the stack trace:

> testOnly test.TestResumeActor
[ERROR] [01/12/2017 14:27:58.848] [pool-36-thread-1-ScalaTest-running-TestResumeActor] [akka://foo/user/$$a]
 **********************************************************************
 In PRE START of $$a
 value of x -> 0
 **********************************************************************

receive = 1
x = 1
[ERROR] [01/12/2017 14:27:58.857] [foo-akka.actor.default-dispatcher-3] [akka://foo/user/$$a] null
java.lang.Exception
    at test.ActorConsumer$$anonfun$receive$1.applyOrElse(Example.scala:97)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at test.ActorConsumer.aroundReceive(Example.scala:31)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.testkit.CallingThreadDispatcher.process$1(CallingThreadDispatcher.scala:250)
    at akka.testkit.CallingThreadDispatcher.runQueue(CallingThreadDispatcher.scala:283)
    at akka.testkit.CallingThreadDispatcher.dispatch(CallingThreadDispatcher.scala:207)
    at akka.actor.dungeon.Dispatch$class.sendMessage(Dispatch.scala:144)
    at akka.actor.ActorCell.sendMessage(ActorCell.scala:374)
    at akka.actor.Cell$class.sendMessage(ActorCell.scala:295)
    at akka.actor.ActorCell.sendMessage(ActorCell.scala:374)
    at akka.actor.LocalActorRef.$bang(ActorRef.scala:406)
    at test.TestResumeActor$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcI$sp(Example.scala:119)
    at test.TestResumeActor$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(Example.scala:116)
    at test.TestResumeActor$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(Example.scala:116)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:1103)
    at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:193)
    at test.TestResumeActor.withFixture(Example.scala:102)
    at org.scalatest.WordSpecLike$class.invokeWithFixture$1(WordSpecLike.scala:1100)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1112)
    at org.scalatest.WordSpecLike$$anonfun$runTest$1.apply(WordSpecLike.scala:1112)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:297)
    at org.scalatest.WordSpecLike$class.runTest(WordSpecLike.scala:1112)
    at test.TestResumeActor.runTest(Example.scala:102)
    at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1171)
    at org.scalatest.WordSpecLike$$anonfun$runTests$1.apply(WordSpecLike.scala:1171)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:404)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:392)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:392)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:381)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:418)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:392)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:392)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:387)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:469)
    at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1171)
    at test.TestResumeActor.runTests(Example.scala:102)
    at org.scalatest.Suite$class.run(Suite.scala:1146)
    at test.TestResumeActor.org$scalatest$WordSpecLike$$super$run(Example.scala:102)
    at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1217)
    at org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1217)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:533)
    at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1217)
    at test.TestResumeActor.org$scalatest$BeforeAndAfterAll$$super$run(Example.scala:102)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
    at test.TestResumeActor.run(Example.scala:102)
    at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:470)
    at sbt.TestRunner.runTest$1(TestFramework.scala:76)
    at sbt.TestRunner.run(TestFramework.scala:85)
    at sbt.TestFramework$$anon$2$$anonfun$$init$$1$$anonfun$apply$8.apply(TestFramework.scala:202)
    at sbt.TestFramework$$anon$2$$anonfun$$init$$1$$anonfun$apply$8.apply(TestFramework.scala:202)
    at sbt.TestFramework$.sbt$TestFramework$$withContextLoader(TestFramework.scala:185)
    at sbt.TestFramework$$anon$2$$anonfun$$init$$1.apply(TestFramework.scala:202)
    at sbt.TestFramework$$anon$2$$anonfun$$init$$1.apply(TestFramework.scala:202)
    at sbt.TestFunction.apply(TestFramework.scala:207)
    at sbt.Tests$.sbt$Tests$$processRunnable$1(Tests.scala:239)
    at sbt.Tests$$anonfun$makeSerial$1.apply(Tests.scala:245)
    at sbt.Tests$$anonfun$makeSerial$1.apply(Tests.scala:245)
    at sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:44)
    at sbt.std.Transform$$anon$3$$anonfun$apply$2.apply(System.scala:44)
    at sbt.std.Transform$$anon$4.work(System.scala:63)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
    at sbt.Execute.work(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


[ERROR] [01/12/2017 14:27:58.858] [foo-akka.actor.default-dispatcher-3] [akka://foo/user/$$a] Before restart x value = 1
[ERROR] [01/12/2017 14:27:58.858] [foo-akka.actor.default-dispatcher-3] [akka://foo/user/$$a]
 **********************************************************************
 IN PRE RESTART of $$a
 Reason of restart: java.lang.Exception
 message option : Some(LetItCrash)
 Value of x : 1
 **********************************************************************

[ERROR] [01/12/2017 14:27:58.860] [foo-akka.actor.default-dispatcher-3] [akka://foo/user/$$a]
 **********************************************************************
 In PRE START of $$a
 value of x -> 0
 **********************************************************************

[ERROR] [01/12/2017 14:27:58.860] [foo-akka.actor.default-dispatcher-3] [akka://foo/user/$$a]
 **********************************************************************
 IN POST RESTART of $$a
 Reason of restart: java.lang.Exception
 Value of x : 0
 **********************************************************************
     
[INFO] [01/12/2017 14:27:58.866] [foo-akka.actor.default-dispatcher-2] [akka://foo/user/$$a] Message [test.Get$] from Actor[akka://foo/system/testActor-1#1221684879] to TestActor[akka://foo/user/$$a] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[info] TestResumeActor:
[info] An actor consumer
[info] - must get 1 even exceptions are throws *** FAILED ***
[info]   java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg while waiting for 1
[info]   at scala.Predef$.assert(Predef.scala:165)
[info]   at akka.testkit.TestKitBase$class.expectMsg_internal(TestKit.scala:387)
[info]   at akka.testkit.TestKitBase$class.expectMsg(TestKit.scala:364)
[info]   at akka.testkit.TestKit.expectMsg(TestKit.scala:814)
[info]   at test.TestResumeActor$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcI$sp(Example.scala:122)
[info]   at test.TestResumeActor$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(Example.scala:116)
[info]   at test.TestResumeActor$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(Example.scala:116)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   ...
[info] Run completed in 3 seconds, 444 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0




Using the Resume strategy, I expected to get an integer equal to 1. There are two things I don't understand:
  1. Should the Resume strategy keep the state of the actor?
  2. In the output, the last message (the one that fails the test) is not delivered, but my actor should be alive when I send it the Get message. So why is this message not delivered?
Thank you very much, guys!




Konrad Malawski

Jan 12, 2017, 8:41:11 AM
to Akka User List

On Thu, Jan 12, 2017 at 2:31 PM, Kilic Ali-Firat <kilic.a...@gmail.com> wrote:

 
  override def supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minutes) {
      case _ : Exception => Resume
    }

The supervision strategy must be defined in the parent of this actor.
You're not setting your "own" strategy; your parent handles failures for you.

What you've done here affects any children of this Actor, not the actor itself.
Read up on spawning child actors.
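
For readers following along, here is a minimal sketch of what defining the strategy in the parent could look like. The names `Consumer` and `ConsumerSupervisor` are hypothetical and not from the original post, and the parent simply forwards every message to its single child; treat it as an illustration under those assumptions rather than the recommended layout.

```scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Resume
import scala.concurrent.duration._

case object Get
case object LetItCrash

// Child (hypothetical name): keeps a running sum and declares no strategy itself.
class Consumer extends Actor {
  var x: Int = 0
  def receive = {
    case y: Int     => x += y
    case LetItCrash => throw new Exception("boom")
    case Get        => sender() ! x
  }
}

// Parent (hypothetical name): its strategy decides what happens when the child fails.
class ConsumerSupervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: Exception => Resume // keep the same child instance and its state
    }

  private val child: ActorRef = context.actorOf(Props[Consumer](), "consumer")

  // Forward everything so the child replies directly to the original sender.
  def receive = {
    case msg => child forward msg
  }
}
```

With this arrangement, sending `1`, then `LetItCrash`, then `Get` to a `ConsumerSupervisor` should produce the reply `1`, because Resume keeps the failed child's state instead of creating a fresh instance.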


--
Cheers,
Konrad 'ktoso' Malawski

Kilic Ali-Firat

Jan 12, 2017, 12:41:23 PM
to Akka User List
OK, after reading the documentation, which says:

The supervisor itself is supervised by the top-level actor provided by the ActorSystem, which has the default policy to restart in case of all Exception cases (with the notable exceptions of ActorInitializationException and ActorKilledException). Since the default directive in case of a restart is to kill all children, we expected our poor child not to survive this failure.
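
A minimal sketch of the default behavior the quoted passage describes, assuming an actor created directly under the actor system with no custom strategy anywhere. `Counter` and `DefaultRestartDemo` are hypothetical names used only for this illustration, and the ask pattern stands in for a test probe.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case object Get
case object LetItCrash

// Hypothetical actor: same shape as the consumer, without any overrides.
class Counter extends Actor {
  var x: Int = 0
  def receive = {
    case y: Int     => x += y
    case LetItCrash => throw new Exception("boom")
    case Get        => sender() ! x
  }
}

object DefaultRestartDemo {
  // Returns the value the actor reports after it has failed once.
  def valueAfterCrash(): Any = {
    val system = ActorSystem("restart-demo")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val counter = system.actorOf(Props[Counter](), "counter")
      counter ! 1          // x becomes 1
      counter ! LetItCrash // default strategy restarts the actor on Exception
      Await.result(counter ? Get, 3.seconds)
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

Under the default strategy, the restart replaces the actor with a fresh instance, so `valueAfterCrash()` should report 0 rather than 1: the mailbox survives the restart, but the field `x` does not.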

So I changed my code so that it no longer overrides the lifecycle hooks or the supervisor strategy, and lets the top-level actor apply the default strategy. Now I have this code:

case class ActorConsumer() extends Actor with ActorLogging {

  var x : Int = 0

  def receive = {
    case y : Int =>
      println("receive = " + y)
      x = x + y
      println("x = " + this.x)
    case LetItCrash =>
      sender ! x
      throw new Exception
    case Get => sender() ! x
  }
}


class TestResumeActor
    extends TestKit(ActorSystem("foo"))
    with ImplicitSender
    with WordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  override def afterAll() = {
    TestKit.shutdownActorSystem(system)
  }

  "An actor consumer" must {
    "get 1 even exceptions are throws" in {
      val actorRef = TestActorRef[ActorConsumer]
      actorRef ! 1
      actorRef ! LetItCrash
      expectMsg(1)
      actorRef ! Get
      expectMsg(1)
    }
  }
}


The test fails again, even though I no longer define a supervisor strategy as I did in my first post. Also, because I'm in a test environment, do I need to do anything special when creating a child of the actor system?