--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
Patrik Nordwall
Typesafe - Reactive apps on the JVM
Twitter: @patriknw
Hi Patrik,
Thanks for the reply. Yes, these are considerations we're working through, and they are absolutely a major design point that we need to settle. My primary reason for asking the question was to identify where we can add logic for detecting and dealing with the issue when it arises: first in development, while we're ferreting out possible failure scenarios, and then in production, in case we run into corrupt data unexpectedly. Ideally that won't happen, but sometimes things get missed. In that case I'd rather the failure behavior be "stop and notify" than an endless loop of recovery attempts.
For now, I did end up going ahead with the architecture you describe - a sharded supervisor actor and a persistent child actor.
I just checked our dependencies, and it looks like we're on 2.3.5. I'll try updating to 2.3.6 and see if that helps at all. I assumed the actor was being restarted, based on what I saw in the logs and what I understand of the actor lifecycle. We hadn't sent any new messages to the actor, but it was stuck in a recovery loop where it was perpetually trying to recover the same message over and over again (i.e., the same failure was being logged thousands of times within a few seconds). Does that make sense?
When I run the test below, I see: "Recovered 2933 times".
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{ActorLogging, ActorSystem, Props}
import akka.persistence.PersistentActor
import com.typesafe.config.ConfigFactory
import org.scalatest.FunSuite
/** Command asking the persistent actor to persist an event carrying `value` (which may be `None`). */
final case class MyCommand(value: Option[String])

/** Event persisted in response to a [[MyCommand]]; carries the command's `value` unchanged. */
final case class MyEvent(value: Option[String])
/**
 * Process-wide counter, bumped once per event handled by the persistent actor's
 * `receiveRecover`. Because it lives in a singleton object, the total survives
 * actor restarts and is shared by every actor instance in the JVM.
 */
object RecoveryMetric {
  /** Running total of recovery invocations. */
  val count: AtomicInteger = new AtomicInteger(0)

  /** Atomically increments [[count]] and returns the updated total. */
  def inc(): Int = count.incrementAndGet()
}
/**
 * Minimal persistent actor used to reproduce a recovery loop.
 *
 * Both handlers call `.get` on the event's `Option`, apparently on purpose: the accompanying
 * test sends `MyCommand(None)`. The persist callback then throws `NoSuchElementException`,
 * and so does every replay of that stored event in `receiveRecover`.
 */
class RecoverMe extends PersistentActor with ActorLogging {
  // Fixed id: every instance (and every test run) reads and writes the same event stream.
  override def persistenceId: String = "my.test.id"

  val receiveCommand: Receive = {
    case MyCommand(value) =>
      persist(MyEvent(value)) { persisted =>
        // Throws when the command carried None.
        log.info("Received: " + persisted.value.get)
      }
  }

  val receiveRecover: Receive = {
    case MyEvent(value) =>
      // Count first so that a replay is recorded even if the `.get` below throws.
      RecoveryMetric.inc()
      log.info("Recovered: " + value.get)
  }
}
/**
 * Reproduces the recovery loop: sends `MyCommand(None)` to a fresh [[RecoverMe]], lets it run
 * for a while, then prints how many times `receiveRecover` handled an event.
 */
class RecoveryFailureTest extends FunSuite {
  // Cluster actor provider on 127.0.0.1:2551 with the Cassandra journal at 127.0.0.1:9042.
  val cfg =
    """
      |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
      |akka.loglevel = "INFO"
      |akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
      |akka.remote.netty.tcp.port=2551
      |akka.remote.netty.tcp.hostname=127.0.0.1
      |akka.persistence.journal.plugin = "cassandra-journal"
      |cassandra-journal.contact-points = [ 127.0.0.1 ]
      |cassandra-journal.port = 9042
      |""".stripMargin

  // Previously this Config was built and then discarded, and the ActorSystem was created
  // without it. The test therefore silently ran on the default configuration instead of the
  // Cassandra journal above.
  val akkaCfg = ConfigFactory.parseString(cfg)
    .withFallback(ConfigFactory.load())

  test("Recovery Fails repeatedly") {
    val system = ActorSystem("Test", akkaCfg)
    val actor = system.actorOf(Props.create(classOf[RecoverMe]))
    actor ! MyCommand(None)
    // Give the actor time to persist the event, fail, and go through recovery repeatedly.
    Thread.sleep(5000)
    system.shutdown()
    // Block until shutdown has actually completed instead of guessing with a fixed sleep.
    system.awaitTermination()
    println(s"Recovered ${RecoveryMetric.count.toString} times")
  }
}
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.persistence.{RecoveryFailure, PersistentActor}
import com.typesafe.config.ConfigFactory
import org.scalatest.FunSuite
/** Command asking the persistent actor to persist an event carrying `value` (which may be `None`). */
final case class MyCommand(value: Option[String])

/** Event persisted in response to a [[MyCommand]]; carries the command's `value` unchanged. */
final case class MyEvent(value: Option[String])
/**
 * Process-wide counter, bumped once per event handled by the persistent actor's
 * `receiveRecover`. Because it lives in a singleton object, the total survives
 * actor restarts and is shared by every actor instance in the JVM.
 */
object RecoveryMetric {
  /** Running total of recovery invocations. */
  val count: AtomicInteger = new AtomicInteger(0)

  /** Atomically increments [[count]] and returns the updated total. */
  def inc(): Int = count.incrementAndGet()
}
/**
 * Persistent actor used to reproduce a recovery loop, with an attempt to stop itself when
 * recovery fails.
 *
 * Both handlers call `.get` on the event's `Option`, so a `MyCommand(None)` makes the persist
 * callback throw, and every replay of that stored event throws from `receiveRecover` too.
 */
class RecoverMe extends PersistentActor with ActorLogging {
  val receiveCommand: Receive = {
    case cmd: MyCommand => persist(MyEvent(cmd.value)) {
      // Throws when the command carried None.
      event => log.info("Received: " + event.value.get)
    }
  }

  val receiveRecover: Receive = {
    case event: MyEvent =>
      // Count first so that a replay is recorded even if the `.get` below throws.
      RecoveryMetric.inc()
      log.info("Recovered: " + event.value.get)
    // RecoveryFailure is a case class carrying the cause, so it must be matched with an
    // extractor. The previous `case RecoveryFailure =>` compared the message with the
    // companion object and could never match, which made this handler dead code.
    // NOTE(review): per the Akka 2.3 docs, RecoveryFailure is sent when the journal fails to
    // replay. An exception thrown by this handler itself (e.g. the `.get` above) presumably
    // propagates to the supervisor instead of arriving here. Confirm for the Akka version in use,
    // and consider a supervisor strategy that stops rather than restarts.
    case RecoveryFailure(cause) =>
      log.error(cause, "Recovery failed for {}; killing actor", persistenceId)
      self ! Kill
  }

  override def persistenceId: String = "my.test.id"
}
class RecoveryFailureTest extends FunSuite {
val cfg =
s"""
|akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
|akka.loglevel = "INFO"
|akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|akka.remote.netty.tcp.port=2551
|akka.remote.netty.tcp.hostname=127.0.0.1
|akka.persistence.journal.plugin = "cassandra-journal"
|cassandra-journal.contact-points = [ 127.0.0.1 ]
|cassandra-journal.port = 9042
""".
stripMargin
val akkaCfg = ConfigFactory.parseString(cfg)
.withFallback(ConfigFactory.load())
test("Recovery Fails repeatedly") {
val system = ActorSystem("Test", akkaCfg)
...