Once the messages are deleted, the journal should ideally stay bounded (otherwise it will grow forever). In this case, the snapshots will be smaller as well.
import akka.actor.{Props, ActorSystem, ActorLogging}
import akka.event.LoggingReceive
import akka.persistence.{SaveSnapshotFailure, SaveSnapshotSuccess, SnapshotOffer, PersistentActor}
// Message protocol of PersistentSnapshotActor.
/** Command carrying a payload that the actor persists as an [[Event]]. */
case class Command(data: String)
/** Event stored in the journal; applied to the actor state when persisted and when replayed on recovery. */
case class Event(data: String)
/** Asks the actor to print its current state to standard output. */
case object InternalState
/** Asks the actor to save a snapshot of its current state. */
case object TakeSnapshot
/** Asks the actor to stop itself. */
case object ShutDown
/** Makes the actor throw an exception from its command handler. */
case object Fail
/**
 * Immutable actor state: the payloads of all applied events.
 *
 * @param queue event payloads stored newest-first (each new payload is prepended)
 */
case class State(queue: List[String] = Nil) {

  /**
   * Builds the successor state containing the payload of `event`.
   *
   * @param event the event whose `data` is prepended to the queue
   * @return a new state; this instance is left untouched
   */
  def updated(event: Event): State = {
    val extended = event.data :: queue
    State(extended)
  }

  /** Number of payloads currently held. */
  def size: Int = queue.size

  /** Renders the payloads in arrival order (oldest first). */
  override def toString: String = {
    val oldestFirst = queue.reverse
    oldestFirst.toString
  }
}
/**
 * Persistent actor that stores every [[Command]] payload as an [[Event]] and
 * supports snapshotting of its [[State]].
 *
 * When a snapshot has been confirmed as saved, all journal entries up to the
 * snapshot's sequence number are deleted so the journal does not grow without
 * bound; recovery then starts from the snapshot plus any later events.
 */
class PersistentSnapshotActor extends PersistentActor with ActorLogging {

  override def persistenceId: String = "snapshot-persistence-id"

  /** Current in-memory state; rebuilt from snapshot and journal on recovery. */
  var state: State = State()

  /** Applies `event` to the in-memory state. */
  def updateState(event: Event): Unit = state = state.updated(event)

  /** Number of event payloads held in the current state. */
  def numberOfEvents: Int = state.size

  /** Recovery handler: restores the offered snapshot and replays journaled events. */
  def receiveRecover: Receive = LoggingReceive {
    case event: Event => updateState(event)
    case SnapshotOffer(_, snapshot: State) =>
      log.debug(s"offered state: $snapshot")
      state = snapshot
  }

  /** Command handler for the actor's message protocol and snapshot replies. */
  def receiveCommand: Receive = LoggingReceive {
    case Command(data) => persist(Event(data))(updateState)
    case Fail => throw new Exception("killing persistent actor.")
    case ShutDown => context.stop(self)
    case InternalState => println(state)
    case TakeSnapshot => saveSnapshot(state)
    // Match the message instance (not the companion object) so the
    // confirmation is actually handled.
    case SaveSnapshotSuccess(metadata) =>
      log.debug("snapshot saved successfully.")
      // Trim the journal only after the snapshot is known to be stored:
      // events up to the snapshot's sequence number are now redundant.
      deleteMessages(metadata.sequenceNr)
    case SaveSnapshotFailure(_, reason) => log.error(s"failed to save snapshot: $reason")
  }
}
/**
 * Demo entry point: starts the actor system, creates the persistent actor and
 * sends it a fixed script of messages (commands, snapshot requests, a state
 * dump, a forced failure and a final state dump).
 */
object PersistentSnapshotActorApp extends App {
  val system = ActorSystem("snapshotSystem")
  val persistentActor = system.actorOf(Props[PersistentSnapshotActor], "persistentSnapshotActor")

  // Commands "1" through "4", in order.
  (1 to 4).foreach { n =>
    persistentActor ! Command(n.toString)
  }

  // Snapshot, dump the state, add one more event, then snapshot again.
  persistentActor ! TakeSnapshot
  persistentActor ! InternalState
  persistentActor ! Command("5")
  persistentActor ! TakeSnapshot

  // Trigger a failure in the actor, then ask for the state again.
  persistentActor ! Fail
  persistentActor ! InternalState
}