Hello there
This is interesting as I was learning about snapshotting and deleting messages in Akka-Persistence. I ran the same class with and without the line
deleteMessages(metadata.sequenceNr - eventDeleteOffset)
in the following code. I am running this with leveldb. The following are the results - first without deleting messages and then after deleting messages
Harits-MacBook-Pro-2:akka101 harit$ sbt clean
[info] Loading project definition from /Users/harit/IdeaProjects/akka101/project
[info] Set current project to akka101 (in build file:/Users/harit/IdeaProjects/akka101/)
[success] Total time: 1 s, completed Nov 29, 2015 7:00:30 AM
Harits-MacBook-Pro-2:akka101 harit$ ls -ltr target/example/snapshots/; ls -ltr target/example/journal/
total 1544
-rw-r--r-- 1 harit staff 789102 Nov 29 07:01 snapshot-snapshot-persistence-id-100000-1448809284249
total 39808
-rw-r--r-- 1 harit staff 0 Nov 29 07:00 LOCK
-rw-r--r-- 1 harit staff 16 Nov 29 07:00 CURRENT
-rw-r--r-- 1 harit staff 3780054 Nov 29 07:01 000005.sst
-rw-r--r-- 1 harit staff 3782685 Nov 29 07:01 000007.sst
-rw-r--r-- 1 harit staff 3782685 Nov 29 07:01 000009.sst
-rw-r--r-- 1 harit staff 3782686 Nov 29 07:01 000011.sst
-rw-r--r-- 1 harit staff 1048576 Nov 29 07:01 MANIFEST-000002
-rw-r--r-- 1 harit staff 4194304 Nov 29 07:01 000010.log
Harits-MacBook-Pro-2:akka101 harit$ sbt clean
[info] Loading project definition from /Users/harit/IdeaProjects/akka101/project
[info] Set current project to akka101 (in build file:/Users/harit/IdeaProjects/akka101/)
[success] Total time: 0 s, completed Nov 29, 2015 7:02:03 AM
Harits-MacBook-Pro-2:akka101 harit$ ls -ltr target/example/snapshots/; ls -ltr target/example/journal/
total 1544
-rw-r--r-- 1 harit staff 789102 Nov 29 07:02 snapshot-snapshot-persistence-id-100000-1448809373502
total 40224
-rw-r--r-- 1 harit staff 0 Nov 29 07:02 LOCK
-rw-r--r-- 1 harit staff 16 Nov 29 07:02 CURRENT
-rw-r--r-- 1 harit staff 3780054 Nov 29 07:02 000005.sst
-rw-r--r-- 1 harit staff 3782685 Nov 29 07:02 000007.sst
-rw-r--r-- 1 harit staff 3782685 Nov 29 07:02 000009.sst
-rw-r--r-- 1 harit staff 3782686 Nov 29 07:02 000011.sst
-rw-r--r-- 1 harit staff 378 Nov 29 07:03 MANIFEST-000002
-rw-r--r-- 1 harit staff 5451539 Nov 29 07:03 000010.log
Harits-MacBook-Pro-2:akka101 harit$
To my surprise, I found out that the file sizes are almost similar, even after deleting the most of the data
snapshot saved. seqNum:100000, timeStamp:1448809373502
message deleted till sequenceNumber: 99990
I may be making mistake since I am new to this. I wanted to learn about
- What is that I am doing wrong?
- What is the best way to test if deleteMessages is actually working rather than checking file sizes manually
Thanks everyone.
My complete code is
import akka.actor.{Props, ActorSystem, ActorLogging}
import akka.event.LoggingReceive
import akka.persistence._
case class Command(data: String)
case class Event(data: String)
case object InternalState
case object TakeSnapshot
case object ShutDown
case object Fail
case class State(queue: List[String] = Nil) {
def updated(event: Event): State = copy(event.data :: queue)
def size: Int = queue.length
override def toString: String = queue.reverse.toString
}
class PersistentSnapshotActor extends PersistentActor with ActorLogging {
override def persistenceId = "snapshot-persistence-id"
val eventDeleteOffset = 10 // this is to make sure we have some past events
var state = State()
def updateState(event: Event) = state = state.updated(event)
def numberOfEvents = state.size
def receiveRecover = LoggingReceive {
case event: Event => updateState(event)
case SnapshotOffer(_, snapshot: State) =>
println(s"offered state: $snapshot")
state = snapshot
}
def receiveCommand = LoggingReceive {
case Command(data) => persist(Event(data))(updateState)
case Fail => throw new Exception("killing persistent actor.")
case ShutDown => context.stop(self)
case InternalState => println(state)
case TakeSnapshot => saveSnapshot(state)
case SaveSnapshotSuccess(metadata) =>
println(s"snapshot saved. seqNum:${metadata.sequenceNr}, timeStamp:${metadata.timestamp}")
deleteMessages(metadata.sequenceNr - eventDeleteOffset)
case SaveSnapshotFailure(_, reason) => println(s"failed to save snapshot: $reason")
case DeleteMessagesSuccess(toSequenceNr) => println(s"message deleted till sequenceNumber: $toSequenceNr")
case DeleteMessagesFailure(reason, toSequenceNr) => println(s"Error in deleting message till sequenceNr $toSequenceNr: $reason")
}
}
object PersistentSnapshotActorApp extends App {
val system = ActorSystem("snapshotSystem")
val persistentActor = system.actorOf(Props[PersistentSnapshotActor], "persistentSnapshotActor")
for (i <- 1 to 100000) {
persistentActor ! Command(i.toString)
}
persistentActor ! TakeSnapshot
persistentActor ! InternalState
Thread.sleep(60000)
system.terminate()
}
My configuration looks like
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
# DO NOT USE THIS IN PRODUCTION !!!
akka.persistence.journal.leveldb.native = false