Cluster Sharding Supervision Strategy


Chris Carter

Sep 16, 2014, 2:37:04 PM
to akka...@googlegroups.com
Hi there,

We have an Akka Cluster which is using Cluster Sharding and Persistence to create a series of actors which are sharded across the cluster and restored via the usual Persistence mechanism.  In certain cases we might see a command come in which is not validated properly and persists an Event which throws an exception upon recovery, which then causes the Sharded actor to restart and attempt recovery again, which triggers the exception once more, leading us into a spiraling loop of death.  Ideally we'll individually handle all possible exceptions during Event recovery and properly validate Commands, but while we're in development we're still working on that, so I'd like to create a supervisor strategy for that particular Shard type which will bail on recovery after a few tries.

How do I go about this with Cluster Sharding?  Right now it's being instantiated from the ActorSystem itself; do I have to create a specific guardian actor for particular Shard types, or do I just have to set the default supervisor strategy for the whole system and rely on the Shard actors picking that up?

Thanks for any hints you guys can provide!

Chris

Patrik Nordwall

Sep 17, 2014, 3:06:55 AM
to akka...@googlegroups.com
Hi Chris,

Currently it is not possible to define the supervision strategy for the shard entries.
Your request is valid for consideration and I have created an issue: https://github.com/akka/akka/issues/15901
It is not completely trivial, because what does it mean to stop an entry actor when using cluster sharding? It will be started up again when the next message arrives.
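To be clear, the normal way for an entry to stop itself is passivation, along these lines (just a sketch; the entry class and the "passivate" trigger message are hypothetical, and ShardRegion here is the one from akka-contrib), and even a passivated entry is started again by the next message for its id:

import akka.actor.{Actor, PoisonPill}
import akka.contrib.pattern.ShardRegion

// Hypothetical entry actor that asks its parent ShardRegion to stop it
// gracefully ("passivation"); it will still be re-created when the next
// message for its id arrives.
class SomeEntry extends Actor {
  def receive: Receive = {
    case "passivate" =>
      context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)
    case _ =>
      // handle normal messages here
  }
}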

Exceptions during recovery mean that the data is corrupt, and that is supposed to stop the actor. Are you sure that it is restarted (in actor lifecycle terminology)? Are you using the latest version, 2.3.6? In any case, it will be started up again when the next message arrives, and then you will hit the same recovery failure.

What you can do right now is to split your entry actor into two actors: a parent supervisor actor with an appropriate supervision strategy, and the real entry actor as its child. The parent actor would forward all messages to the child.
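For example, something like this sketch (the names are placeholders, and you would tune the strategy and exception matching to your own failure scenarios):

import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy, Terminated}
import scala.concurrent.duration._

// Hypothetical parent that is registered with Cluster Sharding; it creates
// the real persistent entry actor as a child and forwards everything to it.
class EntrySupervisor(childProps: Props) extends Actor {

  // Restart the child a few times, then stop it instead of looping forever.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10.seconds) {
      case _: Exception => SupervisorStrategy.Restart
    }

  val child: ActorRef = context.actorOf(childProps, "entry")
  context.watch(child)

  def receive: Receive = {
    case Terminated(`child`) => context.stop(self)
    case msg                 => child forward msg
  }
}

When the child exceeds the retry limit it is stopped, the parent sees Terminated and stops itself, and (as noted above) the entry only comes back when the next message for it arrives.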

Regards,
Patrik

--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Chris Carter

Sep 17, 2014, 11:59:43 AM
to akka...@googlegroups.com
Hi Patrik,

Thanks for the reply.  Yes, these are considerations we're working through and absolutely a major point of design that we need to work out.  My primary reason for asking was to find out where we can put the logic for detecting and dealing with the issue when it arises - first in development as we're ferreting out possible failure scenarios, and then in production in the event we run into corrupt data unexpectedly.  Ideally that won't happen, but sometimes things get missed, in which case I'd rather the failure scenario be stop-and-notify than infinite recovery attempts in a death loop.  For now, I did end up going ahead with the architecture you describe - a sharded supervisor actor and a persistent child actor.

I just checked our dependencies, and it looks like we're on 2.3.5.  I'll try updating to 2.3.6 and see if that helps at all.  I was assuming it was restarted based on what I saw in the logs and what I understand of the actor lifecycle - we hadn't sent any new messages to the actor, but it was stuck in a recovery loop where it was perpetually trying to recover the same message over and over again (i.e., the same failure was being logged thousands of times within a few seconds).  Does that make sense?

Either way - it sounds like the approach we settled on for now (the supervisor + child) is the way to go, so we'll stick with that.  Thanks a lot for the reply and advice!

Chris

Patrik Nordwall

Sep 17, 2014, 1:18:11 PM
to akka...@googlegroups.com
On Wed, Sep 17, 2014 at 5:59 PM, Chris Carter <ch...@carterventures.com> wrote:
Hi Patrik,

Thanks for the reply.  Yes, these are considerations we're working through and absolutely a major point of design that we need to work out.  My primary reason for asking was to find out where we can put the logic for detecting and dealing with the issue when it arises - first in development as we're ferreting out possible failure scenarios, and then in production in the event we run into corrupt data unexpectedly.  Ideally that won't happen, but sometimes things get missed, in which case I'd rather the failure scenario be stop-and-notify than infinite recovery attempts in a death loop.

Yes, an infinite recovery loop is not acceptable.
 
  For now, I did end up going ahead with the architecture you describe - a sharded supervisor actor and a persistent child actor.

I just checked our dependencies, and it looks like we're on 2.3.5.  I'll try updating to 2.3.6 and see if that helps at all.  I was assuming it was restarted based on what I saw in the logs and what I understand of the actor lifecycle - we hadn't sent any new messages to the actor, but it was stuck in a recovery loop where it was perpetually trying to recover the same message over and over again (i.e., the same failure was being logged thousands of times within a few seconds).  Does that make sense?

It makes sense. I will try it tomorrow and report back. You might have found a bug. I expected that it would be stopped if recovery failed.

/Patrik

Chris Carter

Sep 17, 2014, 2:12:59 PM
to akka...@googlegroups.com
If it helps, here's a simple test case recreating the scenario as we're experiencing it.  The environment we're running is:
  • Cassandra persistence provider 0.3.3
  • Scala 2.11
  • Akka 2.3.5
  • Java 8 (have tested on 7 too)

When I run the test below, I see: "Recovered 2933 times".


import java.util.concurrent.atomic.AtomicInteger

import akka.actor.{ActorLogging, ActorSystem, Props}
import akka.persistence.PersistentActor
import com.typesafe.config.ConfigFactory
import org.scalatest.FunSuite

case class MyCommand(value: Option[String])
case class MyEvent(value: Option[String])

object RecoveryMetric {
  val count = new AtomicInteger(0)

  def inc() = count.incrementAndGet()
}

class RecoverMe extends PersistentActor with ActorLogging {

  val receiveCommand: Receive = {
    case cmd: MyCommand => persist(MyEvent(cmd.value)) {
      event => log.info("Received: " + event.value.get)
    }
  }

  val receiveRecover: Receive = {
    case event: MyEvent => {
      RecoveryMetric.inc()
      log.info("Recovered: " + event.value.get)
    }
  }

  override def persistenceId: String = "my.test.id"
}

class RecoveryFailureTest extends FunSuite {

  val cfg =
    s"""
       |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
       |akka.loglevel = "INFO"
       |akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
       |akka.remote.netty.tcp.port=2551
       |akka.remote.netty.tcp.hostname=127.0.0.1
       |akka.persistence.journal.plugin = "cassandra-journal"
       |cassandra-journal.contact-points = [ 127.0.0.1 ]
       |cassandra-journal.port = 9042
     """.stripMargin

  ConfigFactory.parseString(cfg)
    .withFallback(ConfigFactory.load())

  test("Recovery Fails repeatedly") {
    val system = ActorSystem("Test")
    val actor = system.actorOf(Props.create(classOf[RecoverMe]))
    actor ! MyCommand(None)
    Thread.sleep(5000)
    system.shutdown()
    Thread.sleep(2000)
    println(s"Recovered ${RecoveryMetric.count.toString} times")
  }
}

Chris Carter

Sep 18, 2014, 2:24:11 AM
to akka...@googlegroups.com
Correction to the example in my previous email: this version actually uses the test config generated by the code, and also handles the RecoveryFailure message:

import java.util.concurrent.atomic.AtomicInteger

import akka.actor._
import akka.persistence.{RecoveryFailure, PersistentActor}

import com.typesafe.config.ConfigFactory
import org.scalatest.FunSuite

case class MyCommand(value: Option[String])
case class MyEvent(value: Option[String])

object RecoveryMetric {
  val count = new AtomicInteger(0)

  def inc() = count.incrementAndGet()
}

class RecoverMe extends PersistentActor with ActorLogging {

  val receiveCommand: Receive = {
    case cmd: MyCommand => persist(MyEvent(cmd.value)) {
      event => log.info("Received: " + event.value.get)
    }
  }

  val receiveRecover: Receive = {
    case event: MyEvent => {
      RecoveryMetric.inc()
      log.info("Recovered: " + event.value.get)
    }

    case RecoveryFailure => {
      self ! Kill
    }
  }

  override def persistenceId: String = "my.test.id"
}

class RecoveryFailureTest extends FunSuite {

  val cfg =
    s"""
       |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
       |akka.loglevel = "INFO"
       |akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
       |akka.remote.netty.tcp.port=2551
       |akka.remote.netty.tcp.hostname=127.0.0.1
       |akka.persistence.journal.plugin = "cassandra-journal"
       |cassandra-journal.contact-points = [ 127.0.0.1 ]
       |cassandra-journal.port = 9042
     """.stripMargin

  val akkaCfg = ConfigFactory.parseString(cfg)
    .withFallback(ConfigFactory.load())

  test("Recovery Fails repeatedly") {
    val system = ActorSystem("Test", akkaCfg)
    val actor = system.actorOf(Props.create(classOf[RecoverMe]))
    actor ! MyCommand(None)
    Thread.sleep(5000)
    system.shutdown()
    Thread.sleep(2000)
    println(s"Recovered ${RecoveryMetric.count.toString} times")
  }
}

Patrik Nordwall

Sep 18, 2014, 4:11:09 AM
to akka...@googlegroups.com
Thank you, Chris!

Now I understand the problem. 
The exception in receiveRecover is caused by event.value.get, and that triggers a restart and doesn't generate the RecoveryFailure message. I agree that we should handle all kinds of exceptions from receiveRecover in the same way.


The scenario I was thinking about was exceptions from de-serialization, and I have verified that those generate RecoveryFailure; if that is not handled, the actor is stopped by default.

[ERROR] [09/18/2014 10:02:23.450] [PersistentActorSpec-akka.actor.default-dispatcher-2] [akka://PersistentActorSpec/user/$a] Processor killed after recovery failure (persistent id = [my.test.id]). To avoid killing processors on recovery failure, a processor must handle RecoveryFailure messages. RecoveryFailure was caused by: java.io.InvalidClassException: akka.persistence.PersistentActorSpec$MyEvent; local class incompatible: stream classdesc serialVersionUID = 8162434214324358497, local class serialVersionUID = -4420973383950594447 (akka.actor.ActorKilledException)

By the way, you should use
  case _: RecoveryFailure =>
to handle the RecoveryFailure message.
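For example, in your RecoverMe actor the recovery handler could end with something like this (just a sketch; matching on the instance also gives you the cause to log before stopping):

    case RecoveryFailure(cause) =>
      // log why recovery failed and stop the entry instead of Kill-ing it
      log.error(cause, "Recovery failed for [{}]", persistenceId)
      context.stop(self)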

/Patrik

Chris Carter

Sep 18, 2014, 12:52:46 PM
to akka...@googlegroups.com
Thanks Patrik! I've created an issue here: https://github.com/akka/akka/issues/15943

I updated the example to include the RecoveryFailure option; I realized we're not handling that in our production code either.  Thanks for the reminder!

Chris

Patrik Nordwall

Sep 18, 2014, 1:21:05 PM
to akka...@googlegroups.com
Thanks for reporting!
/Patrik