import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.FSM
import akka.actor.FSM.SubscribeTransitionCallBack
import akka.actor.Props

sealed trait State
case object RED extends State
case object GREEN extends State

// A case class needs an explicit parameter list; a payload-free message is a case object.
case class Data()
case object AlternateColor

object FsmActor extends App {
  val system = ActorSystem("akka")
  val fsmActor = system.actorOf(Props[FsmActor], "FSMActor")
  val monitor = system.actorOf(Props(classOf[StateMonitor]), "StateMonitor")
  // The monitor receives CurrentState once, then a Transition for every state change.
  fsmActor ! SubscribeTransitionCallBack(monitor)
  (1 to 10) foreach { i => fsmActor ! AlternateColor }
}

class StateMonitor extends Actor {
  def receive = {
    case x => println("MONITOR RECEIVED: " + x)
  }
}

class FsmActor extends Actor with FSM[State, Data] {
  override def preStart(): Unit = {
    println("hello prestart")
  }

  startWith(RED, Data())

  when(RED) {
    case Event(AlternateColor, _) => goto(GREEN)
  }

  when(GREEN) {
    case Event(AlternateColor, _) => goto(RED)
  }

  onTransition {
    case RED -> GREEN => println("I am RED, going GREEN")
    case GREEN -> RED => println("I am GREEN, going RED")
  }

  initialize()
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.
--
Using a Processor actor implies that there may be some initial state, so its preStart() method sends a Recover message to trigger the recovery before anything else happens. The important part is that until the Recover message is received by the Processor, everything else sent to the Processor actor will be stashed during the invocation of aroundReceive(). So, by overriding preStart() with an effectively empty method, you pretty much broke the contract where the Processor is waiting for Recover and stashing everything else until it gets it.
Since by default the Processor sends an empty Recover (self ! Recover()), you can easily put the same code in your overridden method.
Then, when you see that everything is back to normal, you can start customizing the recovery itself.
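A minimal sketch of that fix, assuming the experimental akka-persistence module of Akka 2.3 (Processor, Recover and Persistent come from akka.persistence). The class name LoggingProcessor and the messages it prints are made up for illustration; only the self ! Recover() line mirrors what the default preStart() does.

```scala
import akka.persistence.{Persistent, Processor, Recover}

// Hypothetical processor, for illustration only.
class LoggingProcessor extends Processor {
  // Custom start-up work is fine as long as recovery is still requested.
  // Without the Recover message every incoming message stays stashed.
  override def preStart(): Unit = {
    println("hello prestart")
    self ! Recover()
  }

  def receive = {
    // Journaled messages arrive wrapped in Persistent, both live and on replay.
    case Persistent(payload, sequenceNr) =>
      println("journaled #" + sequenceNr + ": " + payload)
    // Anything else is delivered as-is and is not journaled.
    case other =>
      println("not journaled: " + other)
  }
}
```

Running it needs a configured journal plugin, which is left out here.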
--
Martin Krasser
blog: http://krasserm.blogspot.com
code: http://github.com/krasserm
twitter: http://twitter.com/mrt1nz
Oleg, Martin, thanks.
I did not realize everything is stashed by the Processor until after the recovery ends, regardless of whether it is customized or not. My idea was to run a controlled recovery after my FSM actors are in a certain "ready" state (which involves coordinating several actors). But since a Processor does not process anything until after the recovery is done, I'll have to rethink the design.
--
Vadim
I was thinking about it some more, so let me share some thoughts.

A Processor-type actor is designed for a recovery-first use case: a message that didn't get a chance to be processed at the time of actor termination (e.g., system crash) needs to be recovered and processed, and only then can all other messages sent to it be processed. So recovery is a prerequisite state in the internal life-cycle of the Processor actor. And I am sure you understand this.

However, what complicates your use case is that you seem to have another prerequisite state, where some other actors must report their states, which have to be aggregated, and based on the aggregated state you can allow or disallow the recovery. This is a very common use case in messaging systems (e.g., make sure the DB is ready, directories exist on the file system, etc.). At least this is what I understand based on your explanation. So, if this is correct, then I think there is a bit of a design flaw where you are coupling two state management mechanisms into one, and all you need to do is simply separate the two.

1. You have an aggregated FsmActor(s) state (you mentioned you may have several). Achieving a certain aggregated state from these actors is a prerequisite to the Processor actor's life-cycle.
2. The Processor actor also has an internal prerequisite state, achieved through the recovery procedure.

So, the way I would structure this is: the FsmActor(s) should not be Processor actors. They should simply report their state via SubscribeTransitionCallBack to the StateMonitor, so it can do what the name suggests: monitor and aggregate the state from all the FsmActors. Once a certain state is achieved, the StateMonitor sends a Recover() message to the Processor actor.

IMHO it's cleaner, since it lets you look at, address, and manage two separate problems separately.
Cheers
Oleg
P.S. It seems to me that the Processor is based on the assumption that it must preserve the sequence of messages it received. While in most cases that is a correct assumption and an appropriate default behavior, it is not always the case. In other words, there are plenty of use cases where the ordering of messages is not important. For those cases it would be nice to have some type of flag that would allow recovery to be decoupled from the lifecycle of the Processor actor, thus allowing it to treat every message equally (no stashing).
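The split Oleg describes (FSM actors report transitions, a monitor aggregates them and only then sends Recover()) could be sketched roughly as follows. This assumes Akka 2.3 with the experimental akka-persistence module. Readiness, ReadinessMonitor and DeferredProcessor are hypothetical names, and the "all watched actors are in one given state" rule is just one possible aggregation, not something stated in the thread.

```scala
import akka.actor.{Actor, ActorRef}
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
import akka.persistence.{Persistent, Processor, Recover}

// Pure check, kept apart from the actor so it can be tested without Akka.
object Readiness {
  def allReady[A, S](expected: Set[A], states: Map[A, S], ready: S): Boolean =
    expected.forall(a => states.get(a) == Some(ready))
}

// Hypothetical monitor: subscribes to every FSM actor and triggers recovery once.
class ReadinessMonitor(fsmActors: Set[ActorRef], readyState: Any, processor: ActorRef)
    extends Actor {
  private var states = Map.empty[ActorRef, Any]
  private var recoverSent = false

  override def preStart(): Unit =
    fsmActors.foreach(_ ! SubscribeTransitionCallBack(self))

  private def record(ref: ActorRef, state: Any): Unit = {
    states += (ref -> state)
    if (!recoverSent && Readiness.allReady(fsmActors, states, readyState)) {
      recoverSent = true
      processor ! Recover() // the processor unstashes once recovery completes
    }
  }

  def receive = {
    case CurrentState(ref, state) => record(ref, state) // sent once on subscription
    case Transition(ref, _, to)   => record(ref, to)    // sent on every state change
  }
}

// Hypothetical processor that does not request recovery by itself.
class DeferredProcessor extends Processor {
  override def preStart(): Unit = () // Recover() comes from the monitor instead
  def receive = {
    case Persistent(payload, _) => println("processing " + payload)
  }
}
```

Until the monitor sends Recover(), everything delivered to DeferredProcessor stays stashed, which is the behavior discussed above; whether that is acceptable depends on how long the FSM actors take to become ready.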
--
On Wed, Jan 29, 2014 at 12:51 PM, Vadim Bobrov <vadim...@gmail.com> wrote:
> Oleg, Martin, thanks
> I did not realize everything is stashed by Processor until after the recovery ends, regardless customized or not. My idea was to run a controlled recovery after my FSM actors are in a certain "ready" state (which involves coordinating several actors). But since a Processor does not process anything until after the recovery is done I'll have to rethink the design
--
> Are you coupling too much responsibility into a single component?
Here is a bird's-eye overview of my system. I pick up messages from ActiveMQ, do some processing, and save to HBase. All principal actors are FSMs; I found it convenient to orchestrate their cooperative behaviour by listening to state changes. The processor in question is the one that picks up messages from ActiveMQ. It is an FSM and basically has two states: Active (continuously picking up messages as they arrive) and StandBy (idle). This design seems pretty intuitive and natural to me, but apparently it doesn't quite work with Persistence. I guess I will probably have to spawn a separate processor whose sole responsibility is picking up messages from ActiveMQ, and kill it upon moving to the StandBy state. One drawback of this is that supervision might become more cumbersome (two actors plus their interaction instead of one).
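A rough sketch of that two-actor arrangement, using only the plain Akka FSM API. Coordinator, Mode, Activate, Deactivate and pullerProps are hypothetical names; pullerProps stands for whatever builds the actor that reads from ActiveMQ (for example a Processor), which is not shown.

```scala
import akka.actor.{Actor, ActorRef, FSM, Props}

sealed trait Mode
case object StandBy extends Mode
case object Active extends Mode

// Hypothetical control messages.
case object Activate
case object Deactivate

// The FSM keeps the Active/StandBy logic; the child it spawns does the pulling.
// State data is the currently running puller, if any.
class Coordinator(pullerProps: Props) extends Actor with FSM[Mode, Option[ActorRef]] {
  startWith(StandBy, None)

  when(StandBy) {
    case Event(Activate, _) =>
      // Unnamed child, so a new one can be created while an old one is still stopping.
      goto(Active) using Some(context.actorOf(pullerProps))
  }

  when(Active) {
    case Event(Deactivate, Some(puller)) =>
      context.stop(puller)
      goto(StandBy) using None
  }

  initialize()
}
```

Because the puller is a child of the coordinator, its failures go to the coordinator's supervisor strategy, which keeps the supervision in one place even though there are now two actors.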
--
That is what I meant, yes. Leaving messages on the JMS queue wouldn't work for me because I have other actors (like retrieving info over REST) whose event stream I also need to preserve.
On 30.01.14 23:22, Vadim Bobrov wrote:
> That is what I meant, yes. Leaving messages on the JMS queue wouldn't work for me because I have other actors (like retrieving info over REST) whose event stream I also need to preserve.
I'm not getting this argument. Why can't you use JMS persistence for the JMS message stream and akka-persistence for the HTTP message stream separately?
Ah, I thought you had JMS persistence in place anyway.