No CurrentState when preStart is overridden

Vadim Bobrov

Jan 28, 2014, 3:38:44 PM
to akka...@googlegroups.com
Hi,

I ran into this problem: when preStart is overridden (empty) for an FSM actor, it stops sending CurrentState in response to SubscribeTransitionCallBack. I tried putting

startWith
initialize

into preStart, but I'm still not getting CurrentState. What is the correct solution in this case? I am disabling preStart so I can customize recovery in the Persistence module.

Thanks
Vadim

Oleg Zhurakousky

Jan 28, 2014, 4:39:21 PM
to akka...@googlegroups.com
Can you share some code? I just tried to reproduce it based on your explanation and it all works for me (see code below), but maybe I am missing something.

Cheers
Oleg
=========

import akka.actor.{Actor, ActorSystem, FSM, Props}
import akka.actor.FSM.SubscribeTransitionCallBack

sealed trait State
case object RED extends State
case object GREEN extends State

// Case classes need a parameter list (Data()); a parameterless message is a case object
case class Data()
case object AlternateColor

object FsmActor extends App {
  val system = ActorSystem("akka")
  val fsmActor = system.actorOf(Props[FsmActor], "FSMActor")
  val monitor = system.actorOf(Props[StateMonitor], "StateMonitor")

  // The monitor receives CurrentState first, then a Transition for every state change
  fsmActor ! SubscribeTransitionCallBack(monitor)
  (1 to 10) foreach { _ => fsmActor ! AlternateColor }
}

class StateMonitor extends Actor {
  def receive: Receive = {
    case msg => println("MONITOR RECEIVED: " + msg)
  }
}

class FsmActor extends Actor with FSM[State, Data] {
  // A custom preStart does not affect a plain FSM actor here:
  // startWith and initialize run in the constructor
  override def preStart(): Unit = {
    println("hello prestart")
  }

  startWith(RED, Data())

  when(RED) {
    case Event(AlternateColor, _) => goto(GREEN)
  }

  when(GREEN) {
    case Event(AlternateColor, _) => goto(RED)
  }

  onTransition {
    case RED -> GREEN => println("I am RED, going GREEN")
    case GREEN -> RED => println("I am GREEN, going RED")
  }

  initialize
}



--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.

Vadim Bobrov

Jan 28, 2014, 4:51:44 PM
to akka...@googlegroups.com
Thanks, Oleg,

If you make FsmActor in your example inherit from Processor (from akka-persistence), you'll see the effect, i.e. class FsmActor extends Processor with FSM[State, Data] { ... }

Vadim

Oleg Zhurakousky

Jan 28, 2014, 9:53:21 PM
to akka...@googlegroups.com
Using a Processor actor implies that there may be some initial state to recover, so its preStart() method sends a Recover message to trigger recovery before anything else happens.
The important part is that until the Recover message is received by the Processor, everything else sent to the Processor actor is stashed during the invocation of aroundReceive().
So, by overriding preStart() with an empty method you broke the contract: the Processor keeps waiting for Recover and stashing everything else until it gets it.

Since by default Processor sends an empty Recover ( self ! Recover() ), you can easily put the same code in your overridden method. Then, once you see that everything is back to normal, you can start customizing recovery itself.
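A minimal plain-Scala model may help make this contract concrete. It is not akka-persistence code: `RecoveryFirstModel`, `ProcessorModel`, `Cmd` and the `overridePreStartEmpty` switch are hypothetical names, and the mailbox and stash are simplified to a queue and a buffer. It shows that a message sent before `Recover` is stashed, that nothing is processed while `Recover` never arrives, and that journaled messages are replayed before the stash is released.

```scala
import scala.collection.mutable

// Hypothetical, simplified model (not akka-persistence code) of the
// "recovery first" contract: every message is stashed until Recover arrives.
object RecoveryFirstModel {
  sealed trait Msg
  case object Recover extends Msg
  final case class Cmd(payload: String) extends Msg

  final class ProcessorModel(journal: List[String], overridePreStartEmpty: Boolean) {
    private val mailbox = mutable.Queue.empty[Msg]
    private val stash = mutable.ListBuffer.empty[Msg]
    private var recovered = false
    val processed = mutable.ListBuffer.empty[String]

    // Default preStart sends Recover to self; an empty override sends nothing.
    def preStart(): Unit = if (!overridePreStartEmpty) tell(Recover)

    def tell(msg: Msg): Unit = mailbox.enqueue(msg)

    // Process everything currently in the mailbox, one message at a time.
    def drain(): Unit = while (mailbox.nonEmpty) handle(mailbox.dequeue())

    private def handle(msg: Msg): Unit = msg match {
      case Recover if !recovered =>
        recovered = true
        journal.foreach(processed += _)   // replay journaled messages first
        val pending = stash.toList
        stash.clear()
        pending.foreach(handle)           // then unstash in arrival order
      case Recover => ()                  // later Recover requests are ignored
      case m if !recovered => stash += m  // not recovered yet: stash
      case Cmd(p) => processed += p
    }
  }
}
```

In this model, calling `tell(Recover)` from outside on an instance with an empty preStart plays the role of the application sending `Recover` itself.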

Hope that clears it up.
Cheers
Oleg


Martin Krasser

Jan 29, 2014, 2:39:24 AM
to akka...@googlegroups.com

On 29.01.14 03:53, Oleg Zhurakousky wrote:
> So the fact that by default Processor sends an empty Recovery ( self ! Recovery() ), you can easily put the same code in your overridden method.

It's ok to leave preStart() empty and let the application send a Recover() request:

fsmActor ! Recover(...)

With this addition, the example works as expected.

> Then when you see that everything is back to normal you can start customizing Recovery itself.

Any Recover() request sent after the initial Recover() request is ignored by a processor. A processor must be restarted to accept another Recover() request (which is sent by default in preRestart()).
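Martin's point about repeated requests can be sketched the same way. This is again a hypothetical model, not the real `Processor`: `Incarnation` stands for one actor instance, and its `toSequenceNr` parameter only loosely mirrors the field of the same name on Akka's `Recover` message. Only the first recovery request of an incarnation is honored; a restart creates a fresh incarnation that recovers again.

```scala
// Hypothetical model: one accepted Recover per incarnation; a restart
// (whose preRestart sends Recover by default) re-arms recovery.
object RecoverOncePerIncarnation {
  final class Incarnation(journal: Vector[String]) {
    private var recoveryRequested = false
    var state: Vector[String] = Vector.empty
    var ignoredRecovers = 0

    // Replays journal entries 1..toSequenceNr, but only on the first request.
    def recover(toSequenceNr: Long = Long.MaxValue): Unit =
      if (recoveryRequested) ignoredRecovers += 1
      else {
        recoveryRequested = true
        state = journal.take(math.min(toSequenceNr, journal.size.toLong).toInt)
      }
  }

  // A restart discards the old instance; the new one is recovered again.
  def restart(journal: Vector[String]): Incarnation = {
    val fresh = new Incarnation(journal)
    fresh.recover()
    fresh
  }
}
```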

Vadim Bobrov

Jan 29, 2014, 12:51:58 PM
to akka...@googlegroups.com, kras...@googlemail.com
Oleg, Martin, thanks

I did not realize that a Processor stashes everything until recovery ends, regardless of whether recovery is customized or not. My idea was to run a controlled recovery after my FSM actors reach a certain "ready" state (which involves coordinating several actors). But since a Processor does not process anything until recovery is done, I'll have to rethink the design.

Oleg Zhurakousky

Jan 30, 2014, 8:10:17 AM
to akka...@googlegroups.com
Vadim

I was thinking about it some more, so let me share some thoughts.
A Processor actor is designed for the recovery-first use case: a message that didn't get a chance to be processed at the time of actor termination (e.g., a system crash) must be recovered and processed before any other messages sent to the actor can be processed. So recovery is a prerequisite state in the internal life-cycle of the Processor actor, and I am sure you understand this.
However, what complicates your use case is that you seem to have another prerequisite state: some other actors must report their states, those states have to be aggregated, and based on the aggregated state you allow or disallow recovery. This is a very common use case in messaging systems (e.g., make sure the DB is ready, directories exist on the file system, etc.). At least this is what I understand from your explanation. If this is correct, then I think there is a bit of a design flaw: you are coupling two state management mechanisms into one, and all you need to do is separate the two.
1. You have an aggregated FsmActor(s) state (you mentioned you may have several). Reaching a certain aggregated state across these actors is a prerequisite for the Processor actor's life-cycle.
2. The Processor actor also has an internal prerequisite state, reached through the recovery procedure.
So, the way I would structure this is:
FsmActor(s) should not be Processor actors. They should simply report their state via SubscribeTransitionCallBack to the StateMonitor, so it can do what the name suggests: monitor and aggregate the state of all the FsmActors. Once a certain state is reached, the StateMonitor sends a Recover() message to the Processor actor.
IMHO it's cleaner, since it lets you look at, address and manage two separate problems separately.
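The structure suggested above could look roughly like the following plain-Scala sketch. The names (`ReadinessGateModel`, `Initializing`, `Ready`, `sendRecover`) are hypothetical; in a real system `onState` would be driven by the `CurrentState`/`Transition` messages an FSM sends to its subscribers, and `sendRecover` would be something like `processor ! Recover()`. The monitor triggers recovery exactly once, when every expected FSM has reported `Ready`.

```scala
// Hypothetical sketch of the suggested split: plain FSM actors report their
// state to a monitor, and only the monitor decides when to trigger recovery.
object ReadinessGateModel {
  sealed trait FsmState
  case object Initializing extends FsmState
  case object Ready extends FsmState

  final class StateMonitor(expected: Set[String], sendRecover: () => Unit) {
    private var latest = Map.empty[String, FsmState]
    private var recoverSent = false

    // Stands in for handling CurrentState/Transition notifications.
    def onState(actor: String, state: FsmState): Unit = {
      latest = latest.updated(actor, state)
      val allReady = expected.forall(a => latest.get(a).contains(Ready))
      if (allReady && !recoverSent) {
        recoverSent = true // a processor ignores repeated Recover requests anyway
        sendRecover()
      }
    }
  }
}
```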

Cheers
Oleg

P.S. It seems to me that the Processor is based on the assumption that it must preserve the sequence of messages it received. While in most cases that is a correct assumption and an appropriate default behavior, it is not always the case: there are plenty of use cases where message ordering is not important. For those cases it would be nice to have some kind of flag that decouples recovery from the lifecycle of the Processor actor, allowing it to treat every message equally (no stashing).
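To illustrate the trade-off behind the proposed flag, here is a hypothetical sketch; no such option is part of akka-persistence as discussed in this thread, and `stashUntilRecovered` is an invented name. With stashing enabled, a live message waits for the replayed ones; with it disabled, the live message is handled first and ordering is lost, which is only acceptable when order does not matter.

```scala
import scala.collection.mutable

// Hypothetical sketch of the proposed flag (not an existing option):
// with stashing disabled, live messages are handled immediately and the
// replay happens whenever Recover arrives, so the original order is not kept.
object OptionalStashModel {
  final class Model(journal: List[String], stashUntilRecovered: Boolean) {
    private var recovered = false
    private val stash = mutable.ListBuffer.empty[String]
    val handled = mutable.ListBuffer.empty[String]

    def live(msg: String): Unit =
      if (stashUntilRecovered && !recovered) stash += msg else handled += msg

    def recover(): Unit = if (!recovered) {
      recovered = true
      handled ++= journal // replayed messages
      handled ++= stash   // stashed live messages (empty when stashing is off)
      stash.clear()
    }
  }
}
```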

Martin Krasser

Jan 30, 2014, 9:54:22 AM
to akka...@googlegroups.com

On 30.01.14 14:10, Oleg Zhurakousky wrote:
> So, the way I would structure this is:
> FsmActor(s) should not be Processor actors. They should simply report the state via SubscribeTransitionCallBack to the StateMonitor, so it can do what the name suggests - monitor and aggregate the state from all the FsmActors, and once certain state is achieved StateMonitor sends Recover() message to the Processor actor.
> IMHO its cleaner since it lets you look at, address and manage two separate problems - separately.

+1

> P.S. It seems to me that the Processor is based on the assumption that it must preserve the sequence of Messages it received. [...] For those cases it would be nice to have some type of flag which would allow recovery to be decoupled from the lifecycle of the Processor actor, thus allowing it to treat every message equal (no stashing).

I think that's a useful addition. Please create a ticket.

Thanks,
Martin

Vadim Bobrov

Jan 30, 2014, 10:44:34 AM
to akka...@googlegroups.com, kras...@googlemail.com
> It seems to me that the Processor is based on the assumption that it must preserve the sequence of Messages it received...

Oleg, thanks. I'll give it some thought. However:

If the Processor is based on the assumption that it basically cannot do any processing until recovery is done, then customizing persistence (selectively saving only some of the messages) and recovery should be disallowed, since they somewhat contradict the idea of non-selective stashing. In my use case I wanted to persist only "data" messages and avoid persisting any commands (such as enable/disable/pause etc.). So I wanted to let the app handle its state and coordination (without persisting) and recovery (of data only). How about this: stashing only starts upon receiving Recover() rather than immediately? In the default case nothing changes, but the user has a chance to override preStart() and do something before recovery (and stashing) starts?
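This proposal can be modeled in the same simplified way. It is a hypothetical sketch, not current Processor behavior: `recoverStarted` and `replayFinished` stand in for receiving `Recover()` and for the end of an asynchronous replay. Messages are handled directly until recovery starts, and are only stashed while the replay is in progress.

```scala
import scala.collection.mutable

// Hypothetical sketch of the proposal (not akka-persistence behavior):
// messages are handled directly until Recover arrives; only between Recover
// and the end of the replay are they stashed.
object StashFromRecoverModel {
  final class Model {
    private var replaying = false
    private val stash = mutable.ListBuffer.empty[String]
    val handled = mutable.ListBuffer.empty[String]

    def receive(msg: String): Unit =
      if (replaying) stash += msg else handled += msg

    // Recover starts an asynchronous replay; stashing begins only now.
    def recoverStarted(): Unit = replaying = true

    // Replay finished: apply journaled messages, then unstash in arrival order.
    def replayFinished(journal: List[String]): Unit = {
      handled ++= journal
      handled ++= stash
      stash.clear()
      replaying = false
    }
  }
}
```

Note that a command such as `"enable"` sent before recovery starts is handled before any journaled message, i.e. by a processor whose state has not been recovered yet; that visibility of partially-recovered state is what Oleg and Martin push back on in their replies.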

Oleg Zhurakousky

Jan 30, 2014, 11:09:38 AM
to akka...@googlegroups.com
While this would work for your use case, it may cause more confusion when you look at it from a more general perspective.
The Processor and recovery mechanism is based on a well-known pattern, MessageStore - http://www.eaipatterns.com/MessageStore.html (I think it's the 4th time I've brought it up in the last week ;) ), which implies that a consumer backed by a persistent message store must rehydrate and process messages from the Message Store before any other messages it receives. That is how the Processor's default behavior is designed.
The flag that I proposed in the P.S. would allow overriding this, but only as an override, not the default behavior, and only for cases where message ordering is simply not important. What you are proposing is selective stashing based on the success of recovery, regardless of when Recover is sent.
There are a few issues with that:
1. What should happen in this case if you send more than one Recover?
2. If you never send a Recover, what is the overall purpose of the Processor component, and what makes it different from any other actor?
3. You can easily do what you are proposing yourself with the flexibility of the Actor model, but since in this case it won't match any well-known pattern or common use case, how would you characterize such a component in the general sense?
4. Back to my original question: are you coupling too much responsibility into a single component? How will it affect the flexibility of the overall design? You have (I believe) two independent conditions (states) in the overall life-cycle of your system. Your life-cycle begins with gathering and aggregating states from the FSM actors, thus creating a state that determines the life-cycle of the Processor actor.
So I hope you see the issue.

Cheers
Oleg


Vadim Bobrov

Jan 30, 2014, 1:34:57 PM
to akka...@googlegroups.com
> Are you coupling too much responsibility into a single component?

Here is a bird's-eye overview of my system. I am picking up messages from ActiveMQ, doing some processing and saving to HBase. All principal actors are FSMs; I found it convenient to orchestrate their cooperative behaviour by listening to state changes. The processor in question is the one that picks up messages from ActiveMQ. It is an FSM and basically has 2 states: Active (continuously picking up messages as they arrive) and StandBy (idle). This design seems pretty intuitive and natural to me, but apparently it doesn't quite work with Persistence. I guess I will probably have to spawn a separate processor whose sole responsibility is picking up messages from ActiveMQ, and kill it upon moving to the StandBy state. One drawback is that supervision might become more cumbersome (2 actors plus their interaction instead of one).

Oleg Zhurakousky

Jan 30, 2014, 2:56:32 PM
to akka...@googlegroups.com
I see...
So, your FSM actor in question is a JMS listener.
Side question: if you are using JMS, why not just rely on its own persistence/transaction mechanism? This way unprocessed (uncommitted) messages will remain on its queue, so you can avoid Persistence altogether.
Having said that, I am going to assume you have your reasons, so...
What you are trying to achieve is to have your FSM actor start a JMS listener when it's in the Active state. So what if you have JmsManager (FSM) and JmsListener (Processor) actors, where JmsListener is a child of JmsManager? While you still end up with two actors, you also create a parent/child relationship, which lets you supervise only one actor, JmsManager, since its lifecycle affects all its children.
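The parent/child split can be sketched as a tiny state machine. This is a hypothetical plain-Scala model, not Akka code: `JmsManager`, `JmsListener`, `Active` and `StandBy` follow the names used in the thread, and the comments note where `context.actorOf` and `context.stop` would be used in a real actor. The listener exists only while the manager is `Active`, and stopping the manager also stops its child.

```scala
// Hypothetical sketch of the JmsManager/JmsListener split: the manager owns
// the FSM state, and a listener child exists only while the manager is Active.
object JmsManagerModel {
  sealed trait MgrState
  case object StandBy extends MgrState
  case object Active extends MgrState

  final class JmsListener(val id: Int) { var running = true }

  final class JmsManager {
    private var nextId = 0
    var state: MgrState = StandBy
    var child: Option[JmsListener] = None

    def goto(next: MgrState): Unit = (state, next) match {
      case (StandBy, Active) =>
        nextId += 1
        child = Some(new JmsListener(nextId)) // in Akka: context.actorOf(...)
        state = Active
      case (Active, StandBy) =>
        child.foreach(c => c.running = false) // in Akka: context.stop(child)
        child = None
        state = StandBy
      case _ => () // same-state transitions are no-ops
    }

    // Stopping the parent stops its child too, so only the manager needs supervising.
    def stop(): Unit = goto(StandBy)
  }
}
```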

Oleg

Vadim Bobrov

Jan 30, 2014, 5:22:21 PM
to akka...@googlegroups.com
That is what I meant, yes. Leaving messages on the JMS queue wouldn't work for me because I have other actors (like retrieving info over REST) whose event stream I also need to preserve. Thanks!

Martin Krasser

Jan 31, 2014, 1:59:43 AM
to akka...@googlegroups.com

On 30.01.14 16:44, Vadim Bobrov wrote:
> > It seems to me that the Processor is based on the assumption that it
> must preserve the sequence of Messages it received...
>
> Oleg, thanks. I'll give it a thought. However:
>
> if the Processor is based on the assumption that it basically cannot
> do any processing until after the recovery is done, then customizing
> persistence (selectively saving only some of the messages) and
> recovery should be disallowed, since they kinda contradict to the idea
> of non-selective stashing.

Well, not really. The order across persistent and non-persistent
messages is preserved by a processor, and hence should also be preserved
during recovery (i.e. non-selective stashing is the right choice).

> In my use case I wanted to only persist "data" messages and avoid
> persisting any commands (such as enable/disable/pause etc.) So I
> wanted to let the app handle its state and coordination (without
> persisting) and recovery (of data only). How about this - stashing
> only starts upon receiving Recover() rather than immediately? in the
> default case nothing changes, but the user has a chance to override
> preStart() and do something before the recovery (and stashing) starts?

As Oleg already mentioned, this is best done by a separate actor that
collaborates with a processor. Alternatively, let your app create
whatever initialization data the processor needs and pass that data to
the processor's constructor. Not allowing applications to see
partially-recovered processors on both start and restart is critical
for most use cases, especially for consistent recovery of a network of
processors.

Martin Krasser

Jan 31, 2014, 2:02:34 AM
to akka...@googlegroups.com

On 30.01.14 23:22, Vadim Bobrov wrote:
> That is what I meant, yes. Leaving messages on the JMS queue wouldn't work for me because I have other actors (like retrieving info over REST) whose event stream I also need to preserve.

I'm not getting this argument. Why can't you use JMS persistence for the JMS message stream and akka-persistence for the HTTP message stream separately?

Vadim Bobrov

Jan 31, 2014, 9:31:02 AM
to akka...@googlegroups.com, kras...@googlemail.com

On Friday, January 31, 2014 2:02:34 AM UTC-5, Martin Krasser wrote:
> I'm not getting this argument. Why can't you use JMS persistence for the JMS message stream and akka-persistence for the HTTP message stream separately?

Well, 2 persistence methods instead of 1. More complexity, less reliability. 2 branches of code to be tested. 2 disk stores to take care of.  

Martin Krasser

Jan 31, 2014, 9:49:07 AM
to akka...@googlegroups.com

Ah, I thought you have JMS persistence in place anyway.