Akka-Persistence deleteMessages not deleting messages?


Harit Himanshu

Nov 29, 2015, 10:10:21 AM
to Akka User List
Hello there

I ran into something interesting while learning about snapshotting and deleting messages in Akka Persistence. I ran the same class with and without the line

deleteMessages(metadata.sequenceNr - eventDeleteOffset)

in the code below. I am running this with the LevelDB journal. Here are the results, first without deleting messages and then with deleting messages:

Harits-MacBook-Pro-2:akka101 harit$ sbt clean
[info] Loading project definition from /Users/harit/IdeaProjects/akka101/project
[info] Set current project to akka101 (in build file:/Users/harit/IdeaProjects/akka101/)
[success] Total time: 1 s, completed Nov 29, 2015 7:00:30 AM
Harits-MacBook-Pro-2:akka101 harit$ ls -ltr target/example/snapshots/; ls -ltr target/example/journal/
total 1544
-rw-r--r--  1 harit  staff  789102 Nov 29 07:01 snapshot-snapshot-persistence-id-100000-1448809284249
total 39808
-rw-r--r--  1 harit  staff        0 Nov 29 07:00 LOCK
-rw-r--r--  1 harit  staff       16 Nov 29 07:00 CURRENT
-rw-r--r--  1 harit  staff  3780054 Nov 29 07:01 000005.sst
-rw-r--r--  1 harit  staff  3782685 Nov 29 07:01 000007.sst
-rw-r--r--  1 harit  staff  3782685 Nov 29 07:01 000009.sst
-rw-r--r--  1 harit  staff  3782686 Nov 29 07:01 000011.sst
-rw-r--r--  1 harit  staff  1048576 Nov 29 07:01 MANIFEST-000002
-rw-r--r--  1 harit  staff  4194304 Nov 29 07:01 000010.log

Harits-MacBook-Pro-2:akka101 harit$ sbt clean
[info] Loading project definition from /Users/harit/IdeaProjects/akka101/project
[info] Set current project to akka101 (in build file:/Users/harit/IdeaProjects/akka101/)
[success] Total time: 0 s, completed Nov 29, 2015 7:02:03 AM

Harits-MacBook-Pro-2:akka101 harit$ ls -ltr target/example/snapshots/; ls -ltr target/example/journal/
total 1544
-rw-r--r--  1 harit  staff  789102 Nov 29 07:02 snapshot-snapshot-persistence-id-100000-1448809373502
total 40224
-rw-r--r--  1 harit  staff        0 Nov 29 07:02 LOCK
-rw-r--r--  1 harit  staff       16 Nov 29 07:02 CURRENT
-rw-r--r--  1 harit  staff  3780054 Nov 29 07:02 000005.sst
-rw-r--r--  1 harit  staff  3782685 Nov 29 07:02 000007.sst
-rw-r--r--  1 harit  staff  3782685 Nov 29 07:02 000009.sst
-rw-r--r--  1 harit  staff  3782686 Nov 29 07:02 000011.sst
-rw-r--r--  1 harit  staff      378 Nov 29 07:03 MANIFEST-000002
-rw-r--r--  1 harit  staff  5451539 Nov 29 07:03 000010.log
Harits-MacBook-Pro-2:akka101 harit$ 


To my surprise, the file sizes are almost the same, even after deleting most of the data:

snapshot saved. seqNum:100000, timeStamp:1448809373502
message deleted till sequenceNumber: 99990

I may be making a mistake since I am new to this. I would like to know:
  • What am I doing wrong?
  • What is the best way to test whether deleteMessages is actually working, rather than checking file sizes manually?
Thanks everyone.

My complete code is

import akka.actor.{Props, ActorSystem, ActorLogging}
import akka.event.LoggingReceive
import akka.persistence._

case class Command(data: String)

case class Event(data: String)

case object InternalState

case object TakeSnapshot

case object ShutDown

case object Fail

case class State(queue: List[String] = Nil) {
  def updated(event: Event): State = copy(event.data :: queue)

  def size: Int = queue.length

  override def toString: String = queue.reverse.toString
}

class PersistentSnapshotActor extends PersistentActor with ActorLogging {
  override def persistenceId = "snapshot-persistence-id"

  val eventDeleteOffset = 10 // this is to make sure we have some past events
  var state = State()

  def updateState(event: Event) = state = state.updated(event)

  def numberOfEvents = state.size

  def receiveRecover = LoggingReceive {
    case event: Event => updateState(event)
    case SnapshotOffer(_, snapshot: State) =>
      println(s"offered state: $snapshot")
      state = snapshot
  }

  def receiveCommand = LoggingReceive {
    case Command(data) => persist(Event(data))(updateState)
    case Fail => throw new Exception("killing persistent actor.")
    case ShutDown => context.stop(self)
    case InternalState => println(state)
    case TakeSnapshot => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      println(s"snapshot saved. seqNum:${metadata.sequenceNr}, timeStamp:${metadata.timestamp}")
      deleteMessages(metadata.sequenceNr - eventDeleteOffset)

    case SaveSnapshotFailure(_, reason) => println(s"failed to save snapshot: $reason")
    case DeleteMessagesSuccess(toSequenceNr) => println(s"message deleted till sequenceNumber: $toSequenceNr")
    case DeleteMessagesFailure(reason, toSequenceNr) => println(s"Error in deleting message till sequenceNr $toSequenceNr: $reason")
  }
}

object PersistentSnapshotActorApp extends App {
  val system = ActorSystem("snapshotSystem")
  val persistentActor = system.actorOf(Props[PersistentSnapshotActor], "persistentSnapshotActor")

  for (i <- 1 to 100000) {
    persistentActor ! Command(i.toString)
  }
  persistentActor ! TakeSnapshot
  persistentActor ! InternalState
  Thread.sleep(60000)
  system.terminate()
}

My configuration looks like

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"

# DO NOT USE THIS IN PRODUCTION !!!
akka.persistence.journal.leveldb.native = false

Patrik Nordwall

Dec 1, 2015, 1:42:19 PM
to Akka User List
LevelDB doesn't delete immediately. The deletion is done later by a compaction process, sometimes after a long delay.
/Patrik
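
To illustrate the point above, here is a toy model in plain Scala of how a log-structured store can treat deletes. It is only a sketch and not LevelDB's actual implementation: ToyLogStore, Put, Tombstone and the byte sizes are all made up for this example. A delete appends a marker instead of removing data, so the size on disk grows until a compaction rewrites the log.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model of an append-only, log-structured store.
// NOT LevelDB's real implementation; every name here is hypothetical.
sealed trait Entry { def key: Long; def sizeBytes: Int }

final case class Put(key: Long, value: String) extends Entry {
  def sizeBytes: Int = 8 + value.length // assumed: 8 bytes of key + payload
}

final case class Tombstone(key: Long) extends Entry {
  def sizeBytes: Int = 8 // assumed: a delete marker still costs space
}

final class ToyLogStore {
  private val log = ArrayBuffer.empty[Entry]

  def put(key: Long, value: String): Unit = log += Put(key, value)

  // A delete only appends a marker; the old Put stays in the log.
  def delete(key: Long): Unit = log += Tombstone(key)

  // Bytes occupied by all entries, whether still visible or not.
  def sizeOnDisk: Int = log.iterator.map(_.sizeBytes).sum

  // Keys visible to readers: the last entry written for a key wins.
  def liveKeys: Set[Long] =
    log.foldLeft(Set.empty[Long]) {
      case (live, Put(k, _))    => live + k
      case (live, Tombstone(k)) => live - k
    }

  // Compaction rewrites the log, dropping tombstones and every Put
  // whose key is no longer live.
  def compact(): Unit = {
    val live = liveKeys
    val kept = log.collect { case p @ Put(k, _) if live(k) => p }
    log.clear()
    log ++= kept
  }
}

object ToyLogStoreDemo extends App {
  val store = new ToyLogStore
  (1L to 100L).foreach(n => store.put(n, s"event-$n"))
  val before = store.sizeOnDisk

  (1L to 90L).foreach(k => store.delete(k))
  val afterDelete = store.sizeOnDisk // larger than `before`

  store.compact()
  val afterCompaction = store.sizeOnDisk // smaller than `before`

  println(s"before=$before afterDelete=$afterDelete afterCompaction=$afterCompaction")
}
```

In this model the size after the deletes is larger than before them and only drops once compact() runs, which would be consistent with the journal directory not shrinking right after deleteMessages.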
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Harit Himanshu

Dec 1, 2015, 10:48:37 PM
to akka...@googlegroups.com
Hmm, interesting. I will probably test it again in some time and update the results here. Thanks a lot, Patrik.
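
One possible way to re-test without looking at file sizes is to ask the journal what it still replays. The sketch below assumes Akka 2.4 (the version that has DeleteMessagesSuccess and the Recovery class), that the Event class from the original post is on the classpath, and that the original app is not running at the same time, since the LevelDB journal directory is locked. ReplayCounter and ReplayCounterApp are hypothetical names, not part of the original code.

```scala
import akka.actor.{ActorLogging, ActorSystem, Props}
import akka.persistence.{PersistentActor, Recovery, RecoveryCompleted, SnapshotSelectionCriteria}

// Hypothetical helper actor: reuses the persistenceId from the post and
// only counts the events that the journal replays during recovery.
class ReplayCounter extends PersistentActor with ActorLogging {
  override def persistenceId = "snapshot-persistence-id"

  // Ignore snapshots so recovery replays whatever is left in the journal.
  override def recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None)

  private var replayed = 0L
  private var firstSeqNr = -1L

  def receiveRecover: Receive = {
    case RecoveryCompleted =>
      log.info(s"replayed=$replayed firstSeqNr=$firstSeqNr lastSeqNr=$lastSequenceNr")
    case _ =>
      // lastSequenceNr is the sequence number of the event being replayed.
      if (replayed == 0) firstSeqNr = lastSequenceNr
      replayed += 1
  }

  def receiveCommand: Receive = { case _ => () }
}

object ReplayCounterApp extends App {
  val system = ActorSystem("replayCheck")
  system.actorOf(Props[ReplayCounter], "replayCounter")
  Thread.sleep(10000) // crude wait for recovery, in the style of the original app
  system.terminate()
}
```

If deleteMessages(99990) took effect, this would be expected to report only the events from 99991 onward, regardless of how large the files in target/example/journal still are.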


Patrik Nordwall

Dec 2, 2015, 3:55:15 AM
to akka...@googlegroups.com
Someone investigated it some time ago. You might be interested in this: https://github.com/akka/akka/issues/13962

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Harit Himanshu

Dec 2, 2015, 8:49:40 AM
to akka...@googlegroups.com
Very interesting. I will look into this, Patrik. Thanks a lot for your help.
+ Harit