This is what I am trying to do
- Have a Supervisor monitor an Actor. The Actor should run forever and be restarted if fails
object Coach {
def props(): Props = Props[Coach];
}
class Coach() extends Actor with ActorLogging {
val runner = context.actorOf(Runner.props(new Marathon), "runner")
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 2 seconds) {
case _: RuntimeException => Restart
}
override def receive = LoggingReceive {
case GetSetGo => runner ! GoForIt
case "runner restarted" => println("runner restarted, sending message again")
runner ! GoForIt
}
object Runner {
def props(race: Race) = Props(classOf[Runner], race)
}
class Runner(race: Race) extends Actor with ActorLogging {
import context.dispatcher
@throws[Exception](classOf[Exception])
override def postRestart(reason: Throwable): Unit = context.parent ! "runner restarted"
override def receive: Receive = LoggingReceive {
case GoForIt => {
println("running long job")
for (i <- 1 to 3) {
Thread.sleep(200)
}
throw new RuntimeException("MarathonRunner is tired")
}
case Failure(throwable) => throw throwable
}
}
When I run this, I can see in logs that it crashes as as per SupervisorStrategy it was restarted 1 time and then stopped again
[DEBUG] [05/30/2015 19:23:44.785] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 19:23:44.786] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 19:23:44.789] [race-akka.actor.default-dispatcher-3] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#-892788249]
[DEBUG] [05/30/2015 19:23:44.791] [race-akka.actor.default-dispatcher-2] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@50b0d70d)
[DEBUG] [05/30/2015 19:23:44.793] [race-akka.actor.default-dispatcher-4] [akka://race/user] now supervising Actor[akka://race/user/coach#513818446]
running long job
[DEBUG] [05/30/2015 19:23:44.802] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@3db89173)
[DEBUG] [05/30/2015 19:23:44.802] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@ed6aa7d)
[DEBUG] [05/30/2015 19:23:44.803] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#80737681]
[DEBUG] [05/30/2015 19:23:44.803] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach] received handled message GetSetGo
running long job
[DEBUG] [05/30/2015 19:23:44.804] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
[ERROR] [05/30/2015 19:23:45.414] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:26)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.learner.ahka.runforever.Runner.aroundReceive(Runner.scala:12)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 19:23:45.414] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarting
runner restarted, sending message again
[DEBUG] [05/30/2015 19:23:45.415] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] restarted
[DEBUG] [05/30/2015 19:23:45.416] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received handled message runner restarted
running long job
[DEBUG] [05/30/2015 19:23:45.416] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] received handled message GoForIt
[ERROR] [05/30/2015 19:23:46.026] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Runner$$anonfun$receive$1.applyOrElse(Runner.scala:26)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:62)
at akka.event.LoggingReceive.apply(LoggingReceive.scala:50)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.event.LoggingReceive.applyOrElse(LoggingReceive.scala:50)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at com.learner.ahka.runforever.Runner.aroundReceive(Runner.scala:12)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 19:23:46.032] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] stopped
Side Note: I do not know why in normal case Runner received message 2 times(running long job) before crashing
However, as per
guidance from Konrad Malawski, I tried to delegate the actual long running task to Future. The problem that I see is
- When Actor starts the first time, Future kicks off and crashes as per expectation and handled correctly by supervisor
- After restart however, the Runner receives the message, logs statement, but Future block in not executed. we can assert that from below
case object GetSetGo
object Coach {
def props(): Props = Props[Coach];
}
class Coach() extends Actor with ActorLogging {
val runner = context.actorOf(Runner.props(new Marathon), "runner")
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 1, withinTimeRange = 2 seconds) {
case _: RuntimeException => Restart
}
override def receive = LoggingReceive {
case GetSetGo => runner ! "GoForIt"
case "runner restarted" => println("runner restarted, sending message again")
runner ! "GoForIt"
}
}
object Runner {
def props(race: Race) = Props(classOf[Runner], race)
}
class Runner(race: Race) extends Actor with ActorLogging {
import context.dispatcher
@throws[Exception](classOf[Exception])
override def postRestart(reason: Throwable): Unit = context.parent ! "runner restarted"
override def receive: Receive = LoggingReceive {
case "GoForIt" =>
println("I am starting to run")
race.start pipeTo self
case Failure(throwable) => throw throwable
}
}
trait Race {
def start: Future[Any]
}
class Marathon extends Race {
import scala.concurrent.ExecutionContext.Implicits.global
override def start: Future[Any] = future
val future = Future {
println("running long job")
for (i <- 1 to 3) {
Thread.sleep(200)
}
throw new RuntimeException("MarathonRunner is tired")
}
}
What I see in logs now is
[DEBUG] [05/30/2015 19:25:57.278] [main] [EventStream(akka://race)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/30/2015 19:25:57.278] [main] [EventStream(akka://race)] Default Loggers started
[DEBUG] [05/30/2015 19:25:57.286] [race-akka.actor.default-dispatcher-3] [akka://race/system] now supervising Actor[akka://race/system/deadLetterListener#216015246]
[DEBUG] [05/30/2015 19:25:57.288] [race-akka.actor.default-dispatcher-4] [akka://race/system/deadLetterListener] started (akka.event.DeadLetterListener@6639712)
[DEBUG] [05/30/2015 19:25:57.290] [race-akka.actor.default-dispatcher-2] [akka://race/user] now supervising Actor[akka://race/user/coach#-1729867088]
running long job
[DEBUG] [05/30/2015 19:25:57.303] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] started (com.learner.ahka.runforever.Coach@63249c5)
[DEBUG] [05/30/2015 19:25:57.303] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] started (com.learner.ahka.runforever.Runner@7a261495)
[DEBUG] [05/30/2015 19:25:57.303] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] now supervising Actor[akka://race/user/coach/runner#-797067975]
[DEBUG] [05/30/2015 19:25:57.304] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach] received handled message GetSetGo
I am starting to run
[DEBUG] [05/30/2015 19:25:57.305] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] received handled message GoForIt
[DEBUG] [05/30/2015 19:25:57.914] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 19:25:57.919] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 19:25:57.919] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] restarting
runner restarted, sending message again
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] restarted
I am starting to run
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach] received handled message runner restarted
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message GoForIt
[DEBUG] [05/30/2015 19:25:57.922] [race-akka.actor.default-dispatcher-4] [akka://race/user/coach/runner] received handled message Failure(java.lang.RuntimeException: MarathonRunner is tired)
[ERROR] [05/30/2015 19:25:57.923] [race-akka.actor.default-dispatcher-2] [akka://race/user/coach/runner] MarathonRunner is tired
java.lang.RuntimeException: MarathonRunner is tired
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:22)
at com.learner.ahka.runforever.Marathon$$anonfun$1.apply(Race.scala:17)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[DEBUG] [05/30/2015 19:25:57.925] [race-akka.actor.default-dispatcher-3] [akka://race/user/coach/runner] stopped
As we could see
- the statement in Future (running long job) is printed once, before crashing
- the statement in Runner(I am starting to run)