Actors message retry and Fault handling


Rune Barikmo

May 19, 2012, 11:21:27 AM
to Akka User List
Hi,
I'm new to Akka and I have some questions regarding supervisors and
fault tolerance.
I'm trying to test out some retry functionality on a pool of workers
using a round robin router, but I'm not sure how I can achieve the
behaviour I want.
The behaviour I want is the following:
1) If a worker request fails, resend the message to the failed worker
until maxNrOfRetries is reached as specified by supervisorStrategy
2) If the worker has failed "maxNrOfRetries" times (worker terminated)
I want to stop all the other routees in the router as I would need the
results from all the workers to compute the final answer.

I have attached some test code below where I have a supervisor that
creates a router with some worker actors. I override the router
supervisor strategy and set maxNrOfRetries to 2. I handle the
retry in the preRestart hook of the worker. Is this the right way to
handle retries? It seems to work at least.

The part that I'm missing is a way to terminate all the routees when one
(or more) of the workers has been terminated. I have a timeout that waits for
the futures to complete, but in case of failure I want to throw an
exception without waiting for the timeout if just one of the workers
has terminated. I know I can use: sender !
akka.actor.Status.Failure(ex) to complete the future with an
exception. But I only really want to do this after I have retried for
the last time.

I guess I could implement this using a retry counter inside the Worker
actor and just send: sender ! akka.actor.Status.Failure(ex) when max
retries has been reached. This would throw an exception in the
supervisor thread, would it not?

What's the best way of implementing this behaviour in Akka?



// Supervisor actor
override def supervisorStrategy() = OneForOneStrategy(maxNrOfRetries = 2) {
  case _: IOException => Restart
}

try {
  val router = context.actorOf(Props(new Worker).withRouter(
    RoundRobinRouter(4, supervisorStrategy = supervisorStrategy())))
  val futures = for (i <- 1 to 10) yield router.ask(22)(2 seconds).mapTo[Int]
  // should not wait for all the results if one of the workers has terminated
  val result = Future.fold(futures)(0)(_ + _)
  // should not wait for all the results if one of the workers has terminated
  Await.result(result, 10 seconds)
} catch {
  case e: Exception => println("failure: " + e.getMessage)
}

// Worker actor
class Worker extends Actor {
  def receive = {
    case s: String => println("ok request: " + s); sender ! s.toInt
    case i: Int    => throw new IOException("failed")
  }

  override def preRestart(reason: Throwable, message: Option[Any]) {
    println("in preRestart hook")
    message match {
      case Some(msg) => self ! msg
      case None      =>
    }
  }
}

Any advice is appreciated.

Cheers
Rune

Rune Barikmo

May 21, 2012, 2:47:49 PM
to Akka User List
Hi again,

I solved the problem by handling everything in the worker and omitting
the supervisor functionality completely. In this test sample code you
can see that I catch all the exceptions inside the worker actor
itself.
When I hit the maximum number of retries I send akka.actor.Status.Failure(exception)
back to the parent actor, which results in an IOException at the parent and
not a TimeoutException("Futures timed out after [10000] milliseconds").

 val maxNumRetries = 4

 case class Retry(count: Int, message: Int, reason: Throwable, parent: ActorRef)

 class Worker extends Actor {

   def receive = {
     case s: String => println("ok request: " + s)
     case i: Int => {
       try {
         handleMessage(i)
       } catch {
         case e: Exception => self ! Retry(1, i, e, sender)
       }
     }
     case r: Retry => {
       println("Retrying count: " + r.count)
       if (r.count < maxNumRetries) {
         try {
           handleMessage(r.message)
         } catch {
           case e: Exception => self ! Retry(r.count + 1, r.message, e, r.parent)
         }
       } else {
         println("Giving up")
         r.parent ! akka.actor.Status.Failure(r.reason)
       }
     }
   }

   private def handleMessage(i: Int) {
     throw new IOException("failed")
   }
 }
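
The effect on the asking side can be shown without Akka. The sketch below uses a plain Promise from the Scala standard library as a stand-in for the future returned by ask (an assumption made for illustration; FailFastDemo and awaitReply are made-up names). Failing the promise plays the role of the worker replying with akka.actor.Status.Failure: Await.result rethrows the IOException at once instead of waiting out the 10 seconds.

```scala
import java.io.IOException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object FailFastDemo {
  // Stand-in for awaiting an ask future; wraps the outcome in a Try
  // so the caller can inspect which exception came back.
  def awaitReply(reply: Promise[Int]): Try[Int] =
    Try(Await.result(reply.future, 10.seconds))

  def main(args: Array[String]): Unit = {
    val reply = Promise[Int]()
    // Comparable to the worker sending Status.Failure(e) to the asker.
    reply.failure(new IOException("failed"))
    // Returns immediately with the IOException, not a TimeoutException.
    println(awaitReply(reply))
  }
}
```

If the promise were never completed, the same call would block for the full 10 seconds and then fail with a TimeoutException.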


This works, but I would like to use the supervisor functionality in
Akka to handle this and not implement the retryCount and "last resort"
functionality inside the Worker actor. As I showed in the last post I
can easily perform retries using the preRestart hook, but the problem
is to complete the parent actor with an exception (thrown from the
child) after reaching max retries count.

Any ideas on how to solve this in a better way?

Cheers
Rune

Patrik Nordwall

May 21, 2012, 3:15:12 PM
to akka...@googlegroups.com
Hi Rune

On Sat, May 19, 2012 at 5:21 PM, Rune Barikmo <rune.b...@gmail.com> wrote:
> I handle the retry in the preRestart hook of the worker. Is this the
> right way to handle retries? Seems to work at least.

You need to use forward so that replies go to the original sender.
Those messages will be placed last in the mailbox, so I assume that the ordering of the messages is not important for you.
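
A minimal sketch of such a retry, assuming a later Akka classic release (where sender() is called with parentheses) rather than the 2.0 API used in this thread. FlakyWorker, its shared attempts counter and the failures parameter are made up for illustration; the counter lives outside the actor only because a restart creates a fresh instance.

```scala
import java.io.IOException
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Actor

// Hypothetical worker: the first `failures` attempts throw, later ones succeed.
class FlakyWorker(attempts: AtomicInteger, failures: Int) extends Actor {
  def receive: Receive = {
    case i: Int =>
      if (attempts.incrementAndGet() <= failures) throw new IOException("failed")
      sender() ! i * 2
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    // forward re-queues the failed message and keeps its original sender,
    // so the eventual reply still reaches whoever asked.
    message.foreach(msg => self.forward(msg))
    super.preRestart(reason, message)
  }
}
```

With self ! msg instead of forward, the retried message would carry the worker itself as sender, so the reply would never reach the asker and an ask would time out.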
 

> What's the best way of implementing this behaviour in Akka?

Here is one way of doing it: https://gist.github.com/2764019
Please ask if something needs clarification.

--

Patrik Nordwall
Typesafe The software stack for applications that scale
Twitter: @patriknw


Rune Barikmo

May 21, 2012, 4:12:39 PM
to Akka User List
Hi Patrik,

Your example implements the exact behavior that I was after.
The main bits that I was missing were the monitoring of the routees and
using "forward" instead of "!".
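
Those two ingredients could be combined roughly as follows. This is only a sketch and not the code from the gist: it assumes a later Akka classic release, skips the router and lets a hypothetical Master distribute work round-robin itself. Master, workerProps and client are made-up names, and the workers are assumed to forward the failed message to themselves in preRestart.

```scala
import java.io.IOException
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, Status, SupervisorStrategy, Terminated}

// Hypothetical parent that creates, supervises and watches its own workers.
class Master(workerProps: Props, nrOfWorkers: Int, client: ActorRef) extends Actor {

  // Restart a failing worker at most twice; the next failure stops it.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 2) {
      case _: IOException => SupervisorStrategy.Restart
    }

  // Watching each worker makes a Terminated message arrive when it is stopped.
  private val workers: Vector[ActorRef] =
    Vector.fill(nrOfWorkers)(context.watch(context.actorOf(workerProps)))
  private var next = 0

  def receive: Receive = {
    case Terminated(worker) =>
      // A worker ran out of retries: report the failure now instead of timing out.
      client ! Status.Failure(new IOException(s"${worker.path.name} gave up"))
      context.stop(self) // stopping the parent also stops the remaining workers
    case job =>
      // Plain round-robin; replies go straight back to the original sender.
      workers(next % workers.size).forward(job)
      next += 1
  }
}
```

Doing the retry through supervision keeps the counting out of the worker; the parent only reacts to the final Terminated.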

Thanks a lot for your help.

Cheers
Rune


Jonas Boner

May 21, 2012, 4:20:59 PM
to akka...@googlegroups.com

Nice example Patrik.
Why not add to the docs?

Patrik Nordwall

May 22, 2012, 1:32:23 AM
to akka...@googlegroups.com
Glad that it helped you.

I don't think it's worth a place in the docs; it doesn't add anything that we haven't already described. I can post it at "let it crash".

/Patrik

Patrik Nordwall

May 22, 2012, 2:38:57 AM
to akka...@googlegroups.com

Roland Kuhn

May 22, 2012, 3:11:31 AM
to akka...@googlegroups.com, akka...@googlegroups.com
nice one!

Jonas Bonér

May 22, 2012, 8:54:48 AM
to akka...@googlegroups.com
Great blog post. Thanks. 
Jonas Bonér
CTO

Typesafe - The software stack for applications that scale
Phone: +46 733 777 123
Twitter: @jboner