12 11:08:24.613] [run-main] [ActorSystem(DummySystem)] exception while executing timer task
java.lang.IllegalStateException: Promise already completed: akka.dispatch.DefaultPromise@59fdef61 tried to complete with Right(Done)
at akka.dispatch.Promise$class.complete(Future.scala:741)
at akka.dispatch.DefaultPromise.complete(Future.scala:806)
at akka.dispatch.Promise$class.success(Future.scala:747)
at akka.dispatch.DefaultPromise.success(Future.scala:806)
at akka.pattern.AskSupport$PromiseActorRef.$bang(AskSupport.scala:171)
at akka.actor.DefaultScheduler$$anon$5.run(Scheduler.scala:174)
at akka.actor.DefaultScheduler.akka$actor$DefaultScheduler$$execDirectly(Scheduler.scala:200)
at akka.actor.DefaultScheduler$$anonfun$close$1.apply(Scheduler.scala:208)
at akka.actor.DefaultScheduler$$anonfun$close$1.apply(Scheduler.scala:208)
at scala.collection.Iterator$class.foreach(Iterator.scala:660)
at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at scala.collection.JavaConversions$JSetWrapper.foreach(JavaConversions.scala:661)
at akka.actor.DefaultScheduler.close(Scheduler.scala:208)
at dummy.Dummy$.main(Dummy.scala:59)
The example code is the following :
package dummy
import akka.actor._
import akka.util.duration._
import akka.util.Timeout
import akka.dispatch.Await
import akka.pattern.ask
import akka.pattern.gracefulStop
import akka.dispatch.Future
import java.io.Closeable
sealed trait MyMessage
case class DoItMessage(cmd:String) extends MyMessage
case class EndMessage extends MyMessage
object Dummy {
def main(args:Array[String]) {
import com.typesafe.config.ConfigFactory
implicit val system=ActorSystem("DummySystem",ConfigFactory.load.getConfig("dummy"))
val simu = system.actorOf(
Props(new MySimulator(system))
.withDispatcher("simu-dispatcher"),
name="simulator")
import akka.routing.RoundRobinRouter
val processor = system.actorOf(
Props(new MyMessageProcessor(simu))
.withDispatcher("workers-dispatcher")
.withRouter(RoundRobinRouter(10)),
name="default")
for(i <- 1 to 100) {
processor ! DoItMessage("Do the job with ID#%d now".format(i))
}
//print("All jobs sent")
try {
val stoppingProcessor: Future[Boolean] = gracefulStop(processor, 10 seconds)
Await.result(stoppingProcessor, 11 seconds)
system.scheduler match { case x:Closeable => x.close() case _ => }
val stoppingSimulator: Future[Boolean] = gracefulStop(simu, 10 seconds)
Await.result(stoppingSimulator, 11 seconds)
system.shutdown()
println("Finished")
} catch {
case e: ActorTimeoutException => println("the actor wasn't stopped within 10 minutes")
case e: Exception => //e.printStackTrace()
}
}
}
class MyMessageProcessor(simu:ActorRef) extends Actor {
def receive = {
case msg:DoItMessage =>
implicit val timeout = Timeout(5 minutes)
val future = simu ? msg
future.onSuccess {
case msg:String => print("O")
}
print("o")
}
}
class MySimulator(system:ActorSystem) extends Actor {
def receive = {
case _:DoItMessage => system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done") // Fake processing
}
}
The configuration file :
dummy {
akka {
loglevel = WARNING
actor {
default-dispatcher {
}
}
scheduler {
tick-duration = 100ms
ticks-per-wheel = 1000
}
}
simu-dispatcher {
type = Dispatcher
mailbox-capacity = 100000
}
workers-dispatcher {
mailbox-capacity = 10
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 0
parallelism-max = 6000
parallelism-factor = 10.0
}
}
}
The SBT buit file :
name := "AkkaSandbox"
version := "0.1"
scalaVersion := "2.9.1"
libraryDependencies += "org.scalatest" %% "scalatest" % "1.6.1" % "test"
libraryDependencies += "junit" % "junit" % "4.10" % "test"
libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-RC2"
I think that I would need a coordinated shutdown; may be something that shutdown everything but only once that there's nothing more to do within processor actors AND simulator actor, AND also within the scheduler. About java.lang.IllegalStateException: Promise already completed, the close on scheduler should have wait for all scheduled operations to be done, and I don't see why some futures look like already completed... or may be I missed something.