akka.persistence, snapshots and deleting messages from journal


dpen...@good-cloud.com

Feb 18, 2014, 3:39:28 PM
to akka...@googlegroups.com
I would assume that a common scenario is to periodically snapshot state and keep 1 or more snapshots.  In this case, I assume that as you delete older snapshots, you would also delete messages older than the snapshot from the journal in order to keep the journal from growing indefinitely.

Is this typical usage, or am I missing something? I don't see anything in the code that does this automatically (and I didn't really expect to, given the general flexibility theme of the framework). However, I don't see anything in the documentation that mentions any use of deleteMessage other than dealing with exceptions.

-david

Patrik Nordwall

Feb 19, 2014, 2:45:17 AM
to akka...@googlegroups.com
You can use deleteMessages to delete all messages up to a specified sequence number. I would not use it too eagerly, since the information in the history can be valuable in itself, but that of course depends on the application.

/Patrik
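To make "delete all messages up to a sequence number" concrete, here is a minimal in-memory model in plain Scala (no Akka). JournalModel, Snapshot, and recover are hypothetical names for illustration, not Akka's API. The sketch assumes deletion is inclusive of toSequenceNr and that recovery starts from the latest snapshot and replays only events with a higher sequence number.

```scala
import scala.collection.immutable.SortedMap

// Hypothetical snapshot: the state plus the seqNr of the last event it covers.
final case class Snapshot[S](sequenceNr: Long, state: S)

// Hypothetical in-memory journal for a single persistenceId (not Akka's API).
final class JournalModel[E] {
  private var events = SortedMap.empty[Long, E]
  private var nextSeqNr = 1L // sequence numbers start at 1

  // Appends an event and returns the sequence number it was stored under.
  def persist(event: E): Long = {
    val seqNr = nextSeqNr
    events = events + (seqNr -> event)
    nextSeqNr += 1
    seqNr
  }

  def lastSequenceNr: Long = nextSeqNr - 1

  // Removes every event with seqNr <= toSequenceNr (inclusive upper bound).
  def deleteMessages(toSequenceNr: Long): Unit =
    events = events.filter { case (seqNr, _) => seqNr > toSequenceNr }

  def remainingSeqNrs: List[Long] = events.keys.toList

  // Recovery: start from the snapshot (if any) and replay only newer events.
  def recover[S](snapshot: Option[Snapshot[S]], empty: S)(update: (S, E) => S): S = {
    val (start, from) = snapshot match {
      case Some(s) => (s.state, s.sequenceNr)
      case None    => (empty, 0L)
    }
    events.iterator
      .filter { case (seqNr, _) => seqNr > from }
      .foldLeft(start) { case (acc, (_, e)) => update(acc, e) }
  }
}
```

With a snapshot taken after event 5 and messages deleted up to 5, recovering from the snapshot still yields all seven events, while recovering without the snapshot only sees events 6 and 7. That lost prefix is exactly the history Patrik suggests you may want to keep.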


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.



--

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

dpen...@good-cloud.com

Feb 19, 2014, 9:08:37 AM
to akka...@googlegroups.com
Agreed - in our legacy system, we have a storage manager that removes old journals and snapshots as space is needed.  The old journals are sometimes very useful for postmortem analysis.

-david

Brian McGee

Sep 18, 2014, 7:06:55 AM
to akka...@googlegroups.com
Hey,

I'm implementing a PersistentActor that batches writes to a db. After successfully writing a batch, I can't figure out how to determine the latest sequence number to pass to deleteMessages so that the events I just wrote to the db don't show up again during recovery. Any ideas?

Brian
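One possible approach, sketched under assumptions: remember the journal sequence number of every buffered event, and when a batch is handed to the db, capture the highest sequence number in that batch as the toSequenceNr for deleteMessages. BatchTracker and takeBatch are hypothetical names. Inside an Akka PersistentActor the sequence number would presumably come from lastSequenceNr read in the persist handler; verify that against your Akka version.

```scala
// Hypothetical helper for a PersistentActor that batches events into a db.
final class BatchTracker[E] {
  // Buffered events paired with the journal sequence number they were persisted under.
  private var buffer = Vector.empty[(Long, E)]

  def add(seqNr: Long, event: E): Unit = buffer = buffer :+ (seqNr -> event)

  // Takes the current batch for writing and starts a fresh buffer.
  // Returns the events plus the highest seqNr they cover, i.e. the value to pass
  // to deleteMessages once the db write is confirmed; None if nothing is buffered.
  def takeBatch(): Option[(Vector[E], Long)] =
    if (buffer.isEmpty) None
    else {
      val batch = buffer
      buffer = Vector.empty
      Some((batch.map(_._2), batch.map(_._1).max))
    }
}
```

Taking the batch (rather than clearing after the write) keeps events that arrive while the db write is in flight out of the deleted range; they land in the next batch instead.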

danny.b...@protectwise.com

Sep 18, 2014, 7:34:36 AM
to akka...@googlegroups.com
Make sure you're receiving the snapshot success message.

case SaveSnapshotSuccess(metadata) => {
  deleteMessages(metadata.sequenceNr, true)
}

You can also save that metadata in case you want to clean up snapshots. Per Patrik's comment, you may want to verify that some interval has passed before you call deleteMessages. If you want, you can also track the last snapshot for easy deletion.

lastSnapshot.foreach( m => deleteSnapshot(m.sequenceNr, m.timestamp))
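The suggestions above (delete messages up to the new snapshot, keep the previous snapshot's metadata so it can be deleted, and only delete messages once some interval has passed) can be folded into a small decision helper. This is a hypothetical plain-Scala sketch: SnapshotMeta, Cleanup, SnapshotHousekeeping, and the time-based minIntervalMillis are illustrative assumptions, not Akka types. In the actor you would call it from the SaveSnapshotSuccess case and translate the returned steps into deleteMessages and deleteSnapshot calls.

```scala
// Mirrors the two metadata fields used in the snippets above.
final case class SnapshotMeta(sequenceNr: Long, timestamp: Long)

// Cleanup steps the actor should issue (hypothetical types).
sealed trait Cleanup extends Product with Serializable
final case class DeleteMessagesUpTo(toSequenceNr: Long) extends Cleanup
final case class DeleteOldSnapshot(meta: SnapshotMeta) extends Cleanup

final class SnapshotHousekeeping(minIntervalMillis: Long) {
  private var lastSnapshot: Option[SnapshotMeta] = None
  private var lastDeletionAt: Option[Long] = None

  // Call on SaveSnapshotSuccess with the new metadata and the current time.
  def onSnapshotSaved(meta: SnapshotMeta, nowMillis: Long): List[Cleanup] = {
    // The previous snapshot is superseded by the new one, so it can go.
    val dropOld = lastSnapshot.map(DeleteOldSnapshot(_)).toList
    lastSnapshot = Some(meta)
    // Delete journal messages at most once per minIntervalMillis.
    val intervalPassed = lastDeletionAt.forall(t => nowMillis - t >= minIntervalMillis)
    if (intervalPassed) {
      lastDeletionAt = Some(nowMillis)
      DeleteMessagesUpTo(meta.sequenceNr) :: dropOld
    } else dropOld
  }
}
```

The snippet above uses the two-argument deleteSnapshot(sequenceNr, timestamp); check the Snapshotter API of your Akka version, since that signature may differ.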

Harit Himanshu

Nov 27, 2015, 7:41:23 PM
to Akka User List
Is this a correct way to create snapshots and delete messages? I have added an offset to retain 10 older entries (assuming that these jobs may have been worked on).

import akka.actor.{Props, ActorSystem, ActorLogging}
import akka.event.LoggingReceive
import akka.persistence._

case class Command(data: String)

case class Event(data: String)

case object InternalState

case object TakeSnapshot

case object ShutDown

case object Fail

case class State(queue: List[String] = Nil) {
  def updated(event: Event): State = copy(event.data :: queue)

  def size: Int = queue.length

  override def toString: String = queue.reverse.toString
}

class PersistentSnapshotActor extends PersistentActor with ActorLogging {
  override def persistenceId = "snapshot-persistence-id"

  val eventDeleteOffset = 10 // this is to make sure we have some past events
  var state = State()

  def updateState(event: Event) = state = state.updated(event)

  def numberOfEvents = state.size

  def receiveRecover = LoggingReceive {
    case event: Event => updateState(event)
    case SnapshotOffer(_, snapshot: State) =>
      println(s"offered state: $snapshot")
      state = snapshot
  }

  def receiveCommand = LoggingReceive {
    case Command(data) => persist(Event(data))(updateState)
    case Fail => throw new Exception("killing persistent actor.")
    case ShutDown => context.stop(self)
    case InternalState => println(state)
    case TakeSnapshot => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      println(s"snapshot saved. seqNum:${metadata.sequenceNr}, timeStamp:${metadata.timestamp}")
      deleteMessages(metadata.sequenceNr - eventDeleteOffset)

    case SaveSnapshotFailure(_, reason) => println(s"failed to save snapshot: $reason")
    case DeleteMessagesSuccess(toSequenceNr) => println(s"message deleted till sequenceNumber: $toSequenceNr")
    case DeleteMessagesFailure(reason, toSequenceNr) => println(s"Error in deleting message till sequenceNr $toSequenceNr: $reason")
  }
}

object PersistentSnapshotActorApp extends App {
  val system = ActorSystem("snapshotSystem")
  val persistentActor = system.actorOf(Props[PersistentSnapshotActor], "persistentSnapshotActor")

  for (i <- 1 to 100) {
    persistentActor ! Command(i.toString)
  }
  persistentActor ! InternalState
  persistentActor ! TakeSnapshot
  for (i <- 101 to 150) {
    persistentActor ! Command(i.toString)
  }
  persistentActor ! TakeSnapshot
  persistentActor ! Fail
  persistentActor ! InternalState
  persistentActor ! Fail
  Thread.sleep(1000)
  system.terminate()
}

When I run it, I get:

List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
[ERROR] [11/27/2015 16:38:40.684] [snapshotSystem-akka.actor.default-dispatcher-11] [akka://snapshotSystem/user/persistentSnapshotActor] killing persistent actor.
java.lang.Exception: killing persistent actor.
at PersistentSnapshotActor$$anonfun$receiveCommand$1.applyOrElse(PersistentSnapshotActor.scala:44)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at PersistentSnapshotActor.akka$persistence$Eventsourced$$super$aroundReceive(PersistentSnapshotActor.scala:25)
at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:599)
at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:158)
at PersistentSnapshotActor.aroundReceive(PersistentSnapshotActor.scala:25)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525)
at akka.actor.ActorCell.invoke(ActorCell.scala:494)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

offered state: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150)
List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150)
[ERROR] [11/27/2015 16:38:40.698] [snapshotSystem-akka.actor.default-dispatcher-8] [akka://snapshotSystem/user/persistentSnapshotActor] killing persistent actor.
java.lang.Exception: killing persistent actor.
at PersistentSnapshotActor$$anonfun$receiveCommand$1.applyOrElse(PersistentSnapshotActor.scala:44)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at PersistentSnapshotActor.akka$persistence$Eventsourced$$super$aroundReceive(PersistentSnapshotActor.scala:25)
at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:599)
at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:158)
at PersistentSnapshotActor.aroundReceive(PersistentSnapshotActor.scala:25)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525)
at akka.actor.ActorCell.invoke(ActorCell.scala:494)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

offered state: List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150)
snapshot saved. seqNum:100, timeStamp:1448671120594
snapshot saved. seqNum:150, timeStamp:1448671120681
message deleted till sequenceNumber: 90
message deleted till sequenceNumber: 140

The state still shows 150 events, because I am not updating the state when messages are deleted.

But I wanted to check whether this is the recommended way to do it.

Thanks
+ Harit Himanshu
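The deletion targets in the log (90 and 140) follow from the offset arithmetic in the SaveSnapshotSuccess handler: the snapshot's sequenceNr is the journal sequence number of the last event it covers (100 and 150 here), so keeping eventDeleteOffset events means deleting up to sequenceNr - offset. A minimal sketch; Retention and deleteUpTo are hypothetical names, and skipping deletion when the result would be below 1 is an added guard that is not in the original code.

```scala
object Retention {
  // Highest sequence number to delete so that `offset` events before the
  // snapshot stay in the journal; None when there is nothing to delete yet.
  def deleteUpTo(snapshotSeqNr: Long, offset: Long): Option[Long] = {
    val target = snapshotSeqNr - offset
    if (target >= 1) Some(target) else None // hypothetical guard for early snapshots
  }
}
```

In the actor this would read `Retention.deleteUpTo(metadata.sequenceNr, eventDeleteOffset).foreach(deleteMessages)`, so a snapshot taken after only a few events never produces a zero or negative argument.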

Brice Figureau

Dec 2, 2015, 8:23:14 AM
to akka...@googlegroups.com
I thought it would be a good idea to do so, and I used to do this
myself.

But if you're using the Cassandra journal as I do, be aware that
deleting past events produces tombstones that are removed only after
gc_grace_seconds has elapsed and a major compaction happens (or a minor
one that removes tombstones). Until then, those tombstones can hurt your
read performance quite badly, from what I've found (and it's almost
certainly related to the static column). This was the issue for me: I had
several PersistentViews that were constantly replaying some of the
more prolific PersistentActors. Eventually the replaying Cassandra query
took longer than the view refresh interval, so the Cassandra journal
sent more and more read requests, until the Cassandra cluster couldn't
keep up anymore, was using more than 100% CPU, and was dropping writes.

I fixed the problem by not doing any deletion, at the expense of using
more and more disk space. I will probably do the deletion in a
controlled way (making sure we can run a user compaction right after).

HTH,
--
Brice Figureau
My Blog: http://www.masterzen.fr/

Patrik Nordwall

Dec 14, 2015, 1:25:25 PM
to akka...@googlegroups.com
Thanks for sharing, Brice. That is an important finding.


Michael Zhong

Dec 19, 2017, 1:52:37 AM
to Akka User List
    case SaveSnapshotSuccess(metadata) =>
      println(s"snapshot saved. seqNum:${metadata.sequenceNr}, timeStamp:${metadata.timestamp}")
      deleteMessages(metadata.sequenceNr - eventDeleteOffset)

Isn't this wrong? The SaveSnapshotSuccess event is about the snapshot, while deleteMessages(..) is about the journal; they are essentially different stores. Or do their sequence numbers belong to the same series?


On Tuesday, December 15, 2015 at 2:25:25 AM UTC+8, Patrik Nordwall wrote: