Future not executed when Restarted Actor receives message

Harit Himanshu

May 30, 2015, 10:45:02 PM
to akka...@googlegroups.com
Hey there!

This is what I am trying to do:

- Have a Supervisor monitor an Actor. The Actor should run forever and be restarted if it fails.

object Coach {
  def props(): Props = Props[Coach];
}

class Coach() extends Actor with ActorLogging {

  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 2 seconds) {
    case _: RuntimeException => Restart
  }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! GoForIt
    case "runner restarted" => println("runner restarted, sending message again")
      runner ! GoForIt
  }
}

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {

  import context.dispatcher


  @throws[Exception](classOf[Exception])
  override def postRestart(reason: Throwable): Unit = context.parent ! "runner restarted"

  override def receive: Receive = LoggingReceive {
    case GoForIt => {
      println("running long job")
      for (i <- 1 to 3) {
        Thread.sleep(200)
      }
      throw new RuntimeException("MarathonRunner is tired")
    }
    case Failure(throwable) => throw throwable
  }
}

When I run this, I can see in the logs that the Runner crashes and, as per the SupervisorStrategy, it is restarted once and then stopped after the second failure:

[DEBUG] [05/30/2015 19:23:44.785] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 19:23:44.786] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 19:23:44.789] [race-akka.actor.default-dispatcher-3] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#-892788249]
[DEBUG] [05/30/2015 19:23:44.791] [race-akka.actor.default-dispatcher-2] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@50b0d70d)
[DEBUG] [05/30/2015 19:23:44.793] [race-akka.actor.default-dispatcher-4] [akka://race/user] now supervising Actor[akka://race/user/coach#513818446]
running long job
[DEBUG] [05/30/2015 19:23:44.802] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@3db89173)
[DEBUG] [05/30/2015 19:23:44.802] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@ed6aa7d)
[DEBUG] [05/30/2015 19:23:44.803] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#80737681]
[DEBUG] [05/30/2015 19:23:44.803] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] received handled message GetSetGo
running long job
[DEBUG] [05/30/2015 19:23:44.804] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
[ERROR] [05/30/2015 19:23:45.414] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:26)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.learner.ahka.runforever.Runner.aroundReceive(Runner.scala:12)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[DEBUG] [05/30/2015 19:23:45.414] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarting
runner restarted, sending message again
[DEBUG] [05/30/2015 19:23:45.415] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarted
[DEBUG] [05/30/2015 19:23:45.416] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received handled message runner restarted
running long job
[DEBUG] [05/30/2015 19:23:45.416] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] received handled message GoForIt
[ERROR] [05/30/2015 19:23:46.026] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:26)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.learner.ahka.runforever.Runner.aroundReceive(Runner.scala:12)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[DEBUG] [05/30/2015 19:23:46.032] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] stopped

Side note: I do not know why, in this normal case, the Runner appears to receive the message twice ("running long job" is printed twice) before crashing.

However, as per guidance from Konrad Malawski, I tried to delegate the actual long-running task to a Future. The problem that I see is:
- When the Actor starts the first time, the Future kicks off, crashes as expected, and the failure is handled correctly by the supervisor.
- After the restart, however, the Runner receives the message and logs its statement, but the Future block is not executed. We can confirm that from the code and logs below.

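import akka.actor.{Actor, ActorLogging, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.Status.Failure
import akka.actor.SupervisorStrategy.Restart
import akka.event.LoggingReceive
import akka.pattern.pipe
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.postfixOps

case object GetSetGo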

object Coach {
  def props(): Props = Props[Coach];
}

class Coach() extends Actor with ActorLogging {

  val runner = context.actorOf(Runner.props(new Marathon), "runner")

  override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 2 seconds) {
    case _: RuntimeException => Restart
  }

  override def receive = LoggingReceive {
    case GetSetGo => runner ! "GoForIt"
    case "runner restarted" => println("runner restarted, sending message again")
      runner ! "GoForIt"
  }
}

object Runner {
  def props(race: Race) = Props(classOf[Runner], race)
}

class Runner(race: Race) extends Actor with ActorLogging {

  import context.dispatcher


  @throws[Exception](classOf[Exception])
  override def postRestart(reason: Throwable): Unit = context.parent ! "runner restarted"

  override def receive: Receive = LoggingReceive {
    case "GoForIt" =>
      println("I am starting to run")
      race.start pipeTo self
    case Failure(throwable) => throw throwable
  }
}

trait Race {
  def start: Future[Any]
}

class Marathon extends Race {

  import scala.concurrent.ExecutionContext.Implicits.global

  override def start: Future[Any] = future

  val future = Future {
    println("running long job")
    for (i <- 1 to 3) {
      Thread.sleep(200)
    }
    throw new RuntimeException("MarathonRunner is tired")
  }
}

What I see in the logs now is:

[DEBUG] [05/30/2015 19:25:57.278] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 19:25:57.278] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 19:25:57.286] [race-akka.actor.default-dispatcher-3] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#216015246]
[DEBUG] [05/30/2015 19:25:57.288] [race-akka.actor.default-dispatcher-4] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@6639712)
[DEBUG] [05/30/2015 19:25:57.290] [race-akka.actor.default-dispatcher-2] [akka://race/user] now supervising Actor[akka://race/user/coach#-1729867088]
running long job
[DEBUG] [05/30/2015 19:25:57.303] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@63249c5)
[DEBUG] [05/30/2015 19:25:57.303] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@7a261495)
[DEBUG] [05/30/2015 19:25:57.303] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#-797067975]
[DEBUG] [05/30/2015 19:25:57.304] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received handled message GetSetGo
I am starting to run
[DEBUG] [05/30/2015 19:25:57.305] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] received handled message GoForIt
[DEBUG] [05/30/2015 19:25:57.914] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 19:25:57.919] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[DEBUG] [05/30/2015 19:25:57.919] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] restarting
runner restarted, sending message again
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] restarted
I am starting to run
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] received handled message runner restarted
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 19:25:57.923] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[DEBUG] [05/30/2015 19:25:57.925] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] stopped

As we can see:
- the statement in the Future ("running long job") is printed only once, before the first crash
- the statement in the Runner ("I am starting to run") is printed twice, once before and once after the restart
- the Future is not executed after the Runner is restarted
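
A possible explanation, sketched under the assumption that the same Marathon instance is reused across restarts (it is passed into Props once) and that its Future is stored in a val: a Future held in a val starts when the object is constructed and runs exactly once, so every later call to start hands back the same Future, which by then has already completed. The sketch below uses only the Scala standard library; EagerRace, LazyRace, and FutureDemo are hypothetical names made up for this illustration and are not part of the code above.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for a race whose Future lives in a val.
class EagerRace(runs: AtomicInteger) {
  // The body is scheduled once, as soon as the instance is constructed.
  val future: Future[Unit] = Future {
    runs.incrementAndGet()
    throw new RuntimeException("MarathonRunner is tired")
  }

  // Always returns the same Future, which has already been started.
  def start: Future[Unit] = future
}

// Hypothetical stand-in for a race that builds its Future inside a def.
class LazyRace(runs: AtomicInteger) {
  // A new Future is created, and its body scheduled, on every call.
  def start: Future[Unit] = Future {
    runs.incrementAndGet()
    throw new RuntimeException("MarathonRunner is tired")
  }
}

object FutureDemo {
  // Calls start twice, waits for both failures, and reports how often the body ran.
  private def countRuns(makeStart: AtomicInteger => () => Future[Unit]): Int = {
    val runs = new AtomicInteger(0)
    val start = makeStart(runs)
    Try(Await.result(start(), 5.seconds)) // first "GoForIt"
    Try(Await.result(start(), 5.seconds)) // second "GoForIt", after a restart
    runs.get()
  }

  def eagerRuns: Int = countRuns { c => val r = new EagerRace(c); () => r.start }
  def lazyRuns: Int  = countRuns { c => val r = new LazyRace(c); () => r.start }
}
```

Here FutureDemo.eagerRuns comes out as 1 and FutureDemo.lazyRuns as 2. If this is indeed the cause, declaring the Future in Marathon with def instead of val would start a fresh Future for every "GoForIt" message, and the eager val might also explain why "running long job" shows up in the logs before the Runner has even started.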

Please help me understand what I am doing wrong.

Thank you very much