I wrote a small program to verify it is
all ok, sometimes it runs fully and succeeds but most times it get
stuck cause it doesn't get all items that it "put" into the map.
So what it does is it adds 1 million items into this typed actor
map. It then waits till the map's actual size is 1 million ( to
make sure all messages were delivered to the map actor). It then
gets all items from the map and compares if the key contains the
correct value (this always works correctly). Finally it compares
the number of gets/checks it did, but this is where it fails, many
runs get less items than those were put in the map along with some
deadletter logging:
package actors
import akka.actor.{TypedProps, ActorSystem, TypedActor}
import scala.concurrent.ExecutionContext.Implicits.global
import java.util.concurrent.atomic.AtomicInteger
/**
* @author kkougios
* Date: 2014/04/08 10:36 AM
*/
object Main extends App
{
println("Starting up")
val system = ActorSystem("test")
val typed = TypedActor(system)
val service: MapService[Int, String] =
typed.typedActorOf(TypedProps[MapServiceImpl[Int, String]])
val Iterations = 1000000
try {
// populate the map using many threads
(1 to Iterations).par.foreach {
i =>
service(i) = "x" + i
}
// wait till the map is fully populated
while (service.size.get != Iterations) {
println(service.size.get)
Thread.sleep(1000)
}
// now get all items from the map, again using
many threads
val l = (1 to Iterations).par.map {
i =>
service(i).map {
s => (s, "x" + i)
}
}.toList
// and verify key<->value pairs match
println("looking for issues")
val checked = new AtomicInteger
l.foreach {
f =>
f.foreach {
case (actual, expected) =>
checked.incrementAndGet()
if (actual != expected) println("FOUND
ISSUE")
}
}
// also verify we got/checked "Iterations" number of
items
println("waiting for checked")
while (checked.get != Iterations) {
println(checked.get)
Thread.sleep(1000)
}
} finally {
typed.poisonPill(service)
println("shutting down")
system.shutdown()
}
}
running this produces (usually) something like this:
Starting up
[INFO] [04/08/2014 14:04:16.919]
[test-akka.actor.default-dispatcher-11] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [1] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.919]
[test-akka.actor.default-dispatcher-12] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [2] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920]
[test-akka.actor.default-dispatcher-12] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [3] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920]
[test-akka.actor.default-dispatcher-12] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [4] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920]
[test-akka.actor.default-dispatcher-12] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [5] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920]
[test-akka.actor.default-dispatcher-12] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [6] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920]
[test-akka.actor.default-dispatcher-10] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [7] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920]
[test-akka.actor.default-dispatcher-10] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [8] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920]
[test-akka.actor.default-dispatcher-10] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [9] dead letters
encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
[INFO] [04/08/2014 14:04:16.920]
[test-akka.actor.default-dispatcher-10] [akka://test/deadLetters]
Message [java.lang.String] from
Actor[akka://test/user/$a#1286986617] to
Actor[akka://test/deadLetters] was not delivered. [10] dead
letters encountered, no more dead letters will be logged. This
logging can be turned off or adjusted with configuration settings
'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
looking for issues
waiting for checked
970038
977033 <- this should be 1 million which would allow the
program to exit
977033
977033
977033
977033
... goes on forever
Any ideas what might be wrong? Here is the typed actor code:
package actors
import scala.concurrent.Future
/**
* @author kkougios
* Date: 2014/04/08 10:39 AM
*/