Need of `onRecoveryCompleted` in akka typed for persistence


Yaroslav Klymko

Oct 18, 2017, 3:54:14 PM
to Akka User List
Hi guys,
Not sure this is the right place to ask:
I'm looking into akka typed for persistence and wondering whether we really need onRecoveryCompleted at https://github.com/akka/akka/blob/master/akka-typed/src/main/scala/akka/typed/persistence/scaladsl/PersistentActor.scala#L174
Why not just have something like actions: State => PersistentActor.Actions[Command, Event, State] ?

Patrik Nordwall

Oct 18, 2017, 4:22:12 PM
to akka...@googlegroups.com
Do you suggest something like this?

  def immutable[Command, Event, State](
    persistenceId: String,
    initialState:  State,
    actions:       State => Actions[Command, Event, State],
    applyEvent:    (Event, State) ⇒ State): PersistentBehavior[Command, Event, State]

I think it would be rather difficult to understand which state that is, especially since there is also Actions.byState. A typical use of onRecoveryCompleted is to perform side effects, such as resending to an external service, and it's better not to mix that concern with the construction of the command handlers.
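To illustrate the separation, here is a minimal sketch in plain Scala with no Akka dependency. All names in it (`RecoverySketch`, `recover`, `Requested`, `Confirmed`) are hypothetical and are not the akka-typed API. It only shows the idea: replay rebuilds the state through a pure `applyEvent`, and the side-effecting `onRecoveryCompleted` callback runs once afterwards, independently of any command handling.

```scala
// Toy model only: none of these names are part of akka-typed.
object RecoverySketch {
  sealed trait Event
  final case class Requested(id: Int) extends Event
  final case class Confirmed(id: Int) extends Event

  // State: ids that were requested from an external service but not yet confirmed.
  final case class State(pending: Set[Int])

  // Pure state transition, same argument order as applyEvent in the signature above.
  def applyEvent(event: Event, state: State): State = event match {
    case Requested(id) => state.copy(pending = state.pending + id)
    case Confirmed(id) => state.copy(pending = state.pending - id)
  }

  // Replays the journal, then invokes the side-effecting callback exactly once.
  def recover(journal: List[Event], initial: State)(onRecoveryCompleted: State => Unit): State = {
    val recovered = journal.foldLeft(initial)((s, e) => applyEvent(e, s))
    onRecoveryCompleted(recovered)
    recovered
  }
}
```

A caller would pass something like `state => state.pending.foreach(resend)` as the callback, where `resend` is whatever talks to the external service; the command handlers never need to know about it.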

/Patrik


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--

Patrik Nordwall
Akka Tech Lead
Lightbend -  Reactive apps on the JVM
Twitter: @patriknw

Yaroslav Klymko

Oct 18, 2017, 4:28:43 PM
to Akka User List
I think renaming
actions:       State => Actions[Command, Event, State]
to
onRecoveryCompleted:       State => Actions[Command, Event, State]

makes it really clear. You could also simply decide to stop the actor if the state is wrong, etc.


Yaroslav Klymko

Oct 19, 2017, 6:48:39 AM
to Akka User List
I also have another concern, regarding this code:

case PersistAll(events) ⇒
  // apply the event before persist so that validation exception is handled before persisting
  // the invalid event, in case such validation is implemented in the event handler.
  state = events.foldLeft(state)(applyEvent)
  persistAll(scala.collection.immutable.Seq(events)) { _ ⇒
    sideEffects.foreach(applySideEffect)
  }


We have a local practice of applying events before persisting to make sure they are valid, and then reusing the resulting state.
We also have explicit logic for what to do with an event that is not valid: Keep | DropEvent | DropCmd

But this logic forces users to apply events after persistence and rely on exceptions.


Please let me know in case I'm wrong, I'd really like to understand your logic :)

Patrik Nordwall

Oct 19, 2017, 3:33:59 PM
to akka...@googlegroups.com
Invalid events must not be stored, because then the actor will not be able to recover after a restart (later startup). Validation should typically be done by validating the incoming command before persisting the event. Validation should not be placed in applyEvent, but as an additional protection in case applyEvent throws exception (for validation or other reasons) we have chosen to call applyEvent before calling persistAll.

I'm not sure I understand what you are missing or suggest should be done differently, so please clarify. Isn't the problem that you mix the concerns of validating cmd/event with applying the event? If you have a separate step for validation you can do that before persisting, and the applyEvent is intended purely for changing the state and cannot fail.


Yaroslav Klymko

Oct 19, 2017, 3:52:09 PM
to Akka User List



On Thursday, October 19, 2017 at 9:33:59 PM UTC+2, Patrik Nordwall wrote:
Invalid events must not be stored, because then the actor will not be able to recover after a restart (later startup). Validation should typically be done by validating the incoming command before persisting the event.

That depends on what you mean by invalid event:
 * completely invalid event within state
 * valid event that does not change state
 * given batch of events, and some of them are invalid, what to do with others?
 
Validation should not be placed in applyEvent, but as an additional protection in case applyEvent throws exception (for validation or other reasons) we have chosen to call applyEvent before calling persistAll.

For instance, we would like to avoid exceptions, so we have `state: Either[Error, State]`, and the validation result is also of type `Either[Error, State]`.
 

I'm not sure I understand what you are missing or suggest should be done differently, so please clarify. Isn't the problem that you mix the concerns of validating cmd/event with applying the event? If you have a separate step for validation you can do that before persisting, and the applyEvent is intended purely for changing the state and cannot fail.

Suppose we want to avoid exceptions: we have verified that the event can be applied to the state, and then we either return PersistentActor.persist or proceed further without persistence.
However, we then cannot avoid the call `state = events.foldLeft(state)(applyEvent)`, which is useless in our case because we already had to do that ourselves.
 
So I propose removing the concept of `state` from the command handling phase and using an approach similar to typed actors with Behavior.

The transition might then look like `onRecoveryCompleted: State => PersistentBehavior[Command, Event, State]`


What do you think?

Patrik Nordwall

Oct 20, 2017, 10:44:35 AM
to akka...@googlegroups.com

Ok, I still think you can do all that with a separate validation/decision step (without exceptions) before persisting and keep applyEvent for the events that have already been persisted.

I tried to explain that in the blog post as:

After processing a message an ordinary, non-persistent, typed actor returns the Behavior that is used for next message. As you can see in the above examples that is not supported by typed persistent actors. Instead, the state is returned by applyEvent. The reason a new behavior can’t be returned is that behavior is part of the actor’s state and must also carefully be reconstructed during recovery. If it would have been supported it would mean that the behavior must be restored when replaying events and also encoded in the state anyway when snapshots are used. That would be very prone to mistakes.
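The point of the quoted paragraph can be sketched in plain Scala, without Akka. The names below (`ByStateSketch`, `handlerFor`, the `Open`/`Close` commands) are hypothetical and only illustrate the idea: the command handling is a function of the state, so recovery only has to rebuild the state by replaying events, and the matching behavior follows from it.

```scala
// Toy model only: not the akka-typed API.
object ByStateSketch {
  sealed trait Event
  case object Opened extends Event
  case object Closed extends Event

  sealed trait Command
  case object Open extends Command
  case object Close extends Command

  final case class State(isOpen: Boolean)

  // Pure state transition used for replay and for newly persisted events.
  def applyEvent(event: Event, state: State): State = event match {
    case Opened => State(isOpen = true)
    case Closed => State(isOpen = false)
  }

  // The "behavior" is selected from the current state; it is never stored or replayed.
  // None means the command is ignored in this state.
  def handlerFor(state: State): Command => Option[Event] = { cmd =>
    (state.isOpen, cmd) match {
      case (true, Close) => Some(Closed)
      case (false, Open) => Some(Opened)
      case _             => None
    }
  }
}
```

After replaying `Opened, Closed, Opened` the state is open again, and `handlerFor` yields the handler for the open state without any behavior having been encoded in events or snapshots.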
 

Yaroslav Klymko

Oct 20, 2017, 11:29:18 AM
to Akka User List

On Friday, October 20, 2017 at 4:44:35 PM UTC+2, Patrik Nordwall wrote:

Ok, I still think you can do all that with a separate validation/decision step (without exceptions) before persisting and keep applyEvent for the events that have already been persisted.

It might be a heavy operation (it is in my case) and I'd rather avoid unnecessary calls; the extra cost would outweigh many optimisations like the one you have for Options :)
 

I tried to explain that in the blog post as:

After processing a message an ordinary, non-persistent, typed actor returns the Behavior that is used for next message. As you can see in the above examples that is not supported by typed persistent actors. Instead, the state is returned by applyEvent. The reason a new behavior can’t be returned is that behavior is part of the actor’s state and must also carefully be reconstructed during recovery. If it would have been supported it would mean that the behavior must be restored when replaying events and also encoded in the state anyway when snapshots are used. That would be very prone to mistakes.

The behaviour could simply be derived from the state, like `onRecoveryCompleted: State => PersistentBehavior[Command, Event, State]`, so you don't need to encode it into the state.

Patrik Nordwall

Oct 27, 2017, 5:24:10 AM
to akka...@googlegroups.com
On Fri, Oct 20, 2017 at 5:29 PM, Yaroslav Klymko <t3h...@gmail.com> wrote:

It might be heavy operation (it is in my case) and I'd rather avoid unnecessary calls, this will over compensate many optimisations like the one you have for Options :)

I don't understand what you mean. I'm not suggesting that you perform the validation twice. I suggest that you separate the validation from the actual update. In that way it will actually be more efficient, because you don't have to validate when replaying events during recovery.
Perhaps you can share a code example illustrating your point, if you still think I'm wrong?
 

Yaroslav Klymko

Oct 27, 2017, 5:41:32 AM
to Akka User List
Hi Patrik,
here is a very simplified example:


object Example {

  case class Entry(name: String)

  case class State(entries: Map[String, Entry]) {

    def applyEvent(event: Event): Either[String, State] = {
      event match {
        case Event.EntryAdded(id, name) =>
          entries.get(id) match {
            case None    => Right(copy(entries + (id -> Entry(name))))
            case Some(_) => Left("entry already exists")
          }

        case Event.EntryRenamed(id, newName) =>
          entries.get(id) match {
            case None => Left("entry is missing")
            case Some(entry) =>
              if (entry.name == newName) Left("old and new names are equal")
              else Right(copy(entries + (id -> Entry(newName))))
          }
      }
    }


    def validateCmd(cmd: Cmd): Either[String, State] = {
      cmd match {
        case Cmd.AddEntry(id, name)       => applyEvent(Event.EntryAdded(id, name))
        case Cmd.RenameEntry(id, newName) => applyEvent(Event.EntryRenamed(id, newName))
      }
    }
  }

  sealed trait Cmd
  object Cmd {
    case class AddEntry(id: String, name: String) extends Cmd
    case class RenameEntry(id: String, newName: String) extends Cmd
  }

  sealed trait Event
  object Event {
    case class EntryAdded(id: String, name: String) extends Event
    case class EntryRenamed(id: String, newName: String) extends Event
  }
}


So, for instance, after validating a command we already have the target state, and since we use the same code for applying the event and validating the command, we don't need to run it twice; there is no need for a second sanity check after the event has been persisted.
Does this help you understand my point?

Patrik Nordwall

Oct 27, 2017, 6:09:40 AM
to akka...@googlegroups.com
You can rewrite that to:

  case class State(entries: Map[String, Entry]) {

    def applyEvent(event: Event): State = {
      event match {
        case Event.EntryAdded(id, name) =>
          copy(entries + (id -> Entry(name)))
        case Event.EntryRenamed(id, newName) =>
          copy(entries + (id -> Entry(newName)))
      }
    }

    def validateCmd(cmd: Cmd): Either[String, Done.type] = {
      cmd match {
        case Cmd.AddEntry(id, name) =>
          entries.get(id) match {
            case None    => Right(Done)
            case Some(_) => Left("entry already exists")
          }
        case Cmd.RenameEntry(id, newName) =>
          entries.get(id) match {
            case None => Left("entry is missing")
            case Some(entry) =>
              if (entry.name == newName) Left("old and new names are equal")
              else Right(Done)
          }

      }
    }
  }




Yaroslav Klymko

Oct 27, 2017, 6:19:23 AM
to Akka User List
Sure,

But then I won't see potential problems during recovery, for instance that we are renaming a missing entry.
With ongoing development and more complicated data structures, possibly nested tens of levels deep, it becomes crucial to validate recovery against previous versions and to understand what went wrong.
Having a similar flow for command validation and event validation helps with that as well.

So I just wanted to point out that your new approach is becoming less flexible, though more type safe. But I believe you can preserve flexibility and still make it type safe :)

Patrik Nordwall

Oct 27, 2017, 7:25:41 AM
to akka...@googlegroups.com
On Fri, Oct 27, 2017 at 12:19 PM, Yaroslav Klymko <t3h...@gmail.com> wrote:
Sure,

But then I won't see potential problems during recovery, for instance that we are renaming a missing entry.
With ongoing development and more complicated data structures, possibly nested tens of levels deep, it becomes crucial to validate recovery against previous versions and to understand what went wrong.
Having a similar flow for command validation and event validation helps with that as well.

That's for catching bugs when evolving the model. I think that should be written as precondition and invariant checks, typically throwing exceptions.
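As a rough sketch of what such precondition checks could look like, here is a variant of the `State` from the example above that uses Scala's standard `require`, which throws `IllegalArgumentException` when the condition is false. The object name `InvariantSketch` and the error messages are only illustrative, and the event types are flattened for brevity. The checks guard against bugs in model evolution; they are not meant as command validation.

```scala
// Illustrative only: invariant checks in the event handler, failing fast with exceptions.
object InvariantSketch {
  final case class Entry(name: String)

  sealed trait Event
  final case class EntryAdded(id: String, name: String) extends Event
  final case class EntryRenamed(id: String, newName: String) extends Event

  final case class State(entries: Map[String, Entry]) {
    def applyEvent(event: Event): State = event match {
      case EntryAdded(id, name) =>
        // Precondition: an added entry must not exist yet.
        require(!entries.contains(id), s"entry $id already exists")
        copy(entries + (id -> Entry(name)))
      case EntryRenamed(id, newName) =>
        // Precondition: a renamed entry must exist.
        require(entries.contains(id), s"entry $id is missing")
        copy(entries + (id -> Entry(newName)))
    }
  }
}
```

With this shape, replaying a journal that renames a missing entry fails loudly at the offending event instead of silently producing a wrong state.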
 

So I just wanted to point out that your new approach is becoming less flexible, though more type safe. But I believe you can preserve flexibility and still make it type safe :)

Thanks for the feedback.

PersistentActor is extremely flexible, but also very easy to get wrong.
 
Btw, you could also change validateCmd to be a factory for the event if you like:

    def createEvent(cmd: Cmd): Either[String, Event] = {
      cmd match {
        case Cmd.AddEntry(id, name) =>
          entries.get(id) match {
            case None    => Right(Event.EntryAdded(id, name))
            case Some(_) => Left("entry already exists")
          }
        case Cmd.RenameEntry(id, newName) =>
          entries.get(id) match {
            case None => Left("entry is missing")
            case Some(entry) =>
              if (entry.name == newName) Left("old and new names are equal")
              else Right(Event.EntryRenamed(id, newName))
          }
      }
    }
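For illustration, a self-contained sketch of how such an event factory might be combined with a non-failing `applyEvent`. It is plain Scala without Akka; `CommandFlowSketch`, `handle`, and the in-memory `Vector[Event]` standing in for the journal are assumptions made for this example, and only the add-entry case is modelled to keep it short.

```scala
// Toy model only: the Vector stands in for the journal, nothing here is Akka API.
object CommandFlowSketch {
  final case class Entry(name: String)

  sealed trait Cmd
  final case class AddEntry(id: String, name: String) extends Cmd

  sealed trait Event
  final case class EntryAdded(id: String, name: String) extends Event

  final case class State(entries: Map[String, Entry]) {
    // Decision step: validate the command and produce the event, without exceptions.
    def createEvent(cmd: Cmd): Either[String, Event] = cmd match {
      case AddEntry(id, name) =>
        if (entries.contains(id)) Left("entry already exists")
        else Right(EntryAdded(id, name))
    }

    // Update step: no validation here, so replay during recovery stays cheap.
    def applyEvent(event: Event): State = event match {
      case EntryAdded(id, name) => copy(entries + (id -> Entry(name)))
    }
  }

  // Validate first; only on success "persist" (append) the event and apply it once.
  def handle(state: State, journal: Vector[Event], cmd: Cmd): Either[String, (State, Vector[Event])] =
    state.createEvent(cmd).map { event =>
      (state.applyEvent(event), journal :+ event)
    }
}
```

Replaying the resulting journal with `applyEvent` alone gives back the same state, which is the property recovery relies on.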

 

Alexandr Sova

Nov 6, 2017, 10:17:56 AM
to Akka User List
Maybe in that case you still shouldn't do any "validation" or anything else in applyEvent. Look at event adapters: they should solve your "renaming missing entry" case and similar cases.
Events are much like accounting entries or git commits.
Events have a temporal nature: they mark that something did happen in the past. If you didn't have an entry in the past, the event shouldn't have been emitted; but if you did have that entry, the event should be applied as-is during recovery, because at that point in recovery you are reconstructing the application state (or a projection of it) as of that point in time. If you are trying to apply events on top of the latest state, please don't. Read more about event sourcing in general and watch some Greg Young videos, but don't shoot yourself in the foot.
Events cannot be undone: only new events can undo the consequences of previous events, not the events themselves.
Events cannot be invalid; commands can. An event is just the fact that some command, or part of it, was executed correctly.
I'm using the existing onRecoveryCompleted in my PersistentFSM as () => Unit and find it quite handy right now. In any case, it gets called in PersistentFSM::receiveRecover, and that method is the place where you can take full control of the recovery process, but should you?
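A small sketch of the "events cannot be undone" point, in plain Scala. The `EntryRemoved` event and the `replay` helper are hypothetical additions for this illustration and do not appear earlier in the thread. The journal is append-only: removing an entry is recorded as a new fact, and the earlier `EntryAdded` stays in the history.

```scala
// Illustrative only: a compensating event instead of deleting or editing an old one.
object CompensationSketch {
  sealed trait Event
  final case class EntryAdded(id: String, name: String) extends Event
  final case class EntryRemoved(id: String) extends Event // hypothetical compensating event

  // Events are applied as-is; they are facts and are not validated on replay.
  def applyEvent(state: Map[String, String], event: Event): Map[String, String] = event match {
    case EntryAdded(id, name) => state + (id -> name)
    case EntryRemoved(id)     => state - id
  }

  // Rebuilds the state as of the end of the given journal prefix.
  def replay(journal: Seq[Event]): Map[String, String] =
    journal.foldLeft(Map.empty[String, String])(applyEvent)
}
```

Replaying only the first event yields the state as it was at that point in time, while replaying the whole journal yields the state after the compensation.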