Error when using Future onComplete

Johannes Hollerer

Feb 21, 2012, 6:00:49 AM
to akka...@googlegroups.com
I am running the following code (on the current akka-actor 2.0 snapshot) to pass the result of an asynchronous calculation to another part of my app.

Timeout timeout = new Timeout(Duration.create(5, java.util.concurrent.TimeUnit.MINUTES));
Future<Object> ask = Patterns.ask(master, new Calculate(), timeout);
ask.onComplete(new OnComplete<Object>() {
  @Override
  public void onComplete(Throwable arg0, Object arg1) {
    // arg0 is the failure (null on success), arg1 is the result
    if (null != arg0) {
      System.out.println(arg0);
    } else {
      System.out.println("Asynchron Result:" + arg1.toString());
    }
  }
});


I can access the result and everything seems to work fine, but on the console I get the following error:

[ERROR] [02/21/2012 11:40:49.881] [MySystem-akka.actor.default-dispatcher-7] [TaskInvocation] Promise already completed: akka.dispatch.DefaultPromise@32b8f675 tried to complete with Left(akka.pattern.AskTimeoutException)
java.lang.IllegalStateException: Promise already completed: akka.dispatch.DefaultPromise@32b8f675 tried to complete with Left(akka.pattern.AskTimeoutException)
at akka.dispatch.Promise$class.complete(Future.scala:736)
at akka.dispatch.DefaultPromise.complete(Future.scala:801)
at akka.dispatch.Promise$class.failure(Future.scala:748)
at akka.dispatch.DefaultPromise.failure(Future.scala:801)
at akka.pattern.AskSupport$$anonfun$1.apply$mcV$sp(AskSupport.scala:194)
at akka.actor.DefaultScheduler$$anon$6$$anon$7.run(Scheduler.scala:183)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:82)
at akka.jsr166y.ForkJoinTask$AdaptedRunnable.exec(ForkJoinTask.java:1400)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:268)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:929)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1303)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)


Am I missing something, or have I done something wrong?



√iktor Ҡlang

Feb 21, 2012, 6:17:56 AM
to akka...@googlegroups.com
Your code works; Akka is just being overly cautious here. I've fixed it in the current master.
Thanks for reporting.
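
For readers who want to see the kind of race behind that stack trace, here is a minimal sketch in plain Java. It is an analogy only: it uses the JDK's CompletableFuture as a stand-in for Akka's promise, and the helper names strictFail and tryFail are hypothetical, not Akka API. The reply completes the promise first; a timeout task that then insists on completing it raises an error, while a lenient attempt simply reports that it lost the race.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

final class PromiseRace {
    // Strict variant (hypothetical helper): losing the race is reported as an error.
    static void strictFail(CompletableFuture<?> promise, Throwable cause) {
        if (!promise.completeExceptionally(cause)) {
            throw new IllegalStateException(
                "Promise already completed: tried to complete with " + cause);
        }
    }

    // Lenient variant (hypothetical helper): losing the race just returns false.
    static boolean tryFail(CompletableFuture<?> promise, Throwable cause) {
        return promise.completeExceptionally(cause);
    }

    public static void main(String[] args) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        promise.complete("result");                      // the reply arrives first
        Throwable timeout = new TimeoutException("ask timed out");

        System.out.println(tryFail(promise, timeout));   // did the timeout win the race?
        try {
            strictFail(promise, timeout);                // same attempt, strict style
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(promise.join());              // value held by the promise
    }
}
```

In both variants the value already stored in the promise is left untouched, which matches the observation above that the result is still accessible despite the logged error.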

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To view this discussion on the web visit https://groups.google.com/d/msg/akka-user/-/y5I4UZIurMoJ.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.



--
Viktor Klang

Akka Tech Lead
Typesafe - The software stack for applications that scale

Twitter: @viktorklang

david crosson

Feb 26, 2012, 5:29:39 AM
to akka...@googlegroups.com
I get the same kind of exception, though not systematically, and I'm not sure it is related to the one you got. I'm using Akka 2.0-RC2 with the Scala API. I'm testing (and learning to use) Akka, and right now I'm looking for the best way to stop an application, but only once everything has completed.

12 11:08:24.613] [run-main] [ActorSystem(DummySystem)] exception while executing timer task
java.lang.IllegalStateException: Promise already completed: akka.dispatch.DefaultPromise@59fdef61 tried to complete with Right(Done)
at akka.dispatch.Promise$class.complete(Future.scala:741)
at akka.dispatch.DefaultPromise.complete(Future.scala:806)
at akka.dispatch.Promise$class.success(Future.scala:747)
at akka.dispatch.DefaultPromise.success(Future.scala:806)
at akka.pattern.AskSupport$PromiseActorRef.$bang(AskSupport.scala:171)
at akka.actor.DefaultScheduler$$anon$5.run(Scheduler.scala:174)
at akka.actor.DefaultScheduler.akka$actor$DefaultScheduler$$execDirectly(Scheduler.scala:200)
at akka.actor.DefaultScheduler$$anonfun$close$1.apply(Scheduler.scala:208)
at akka.actor.DefaultScheduler$$anonfun$close$1.apply(Scheduler.scala:208)
at scala.collection.Iterator$class.foreach(Iterator.scala:660)
at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
at scala.collection.JavaConversions$JSetWrapper.foreach(JavaConversions.scala:661)
at akka.actor.DefaultScheduler.close(Scheduler.scala:208)
at dummy.Dummy$.main(Dummy.scala:59)


The example code is the following:

package dummy

import akka.actor._
import akka.util.duration._
import akka.util.Timeout
import akka.dispatch.Await
import akka.pattern.ask
import akka.pattern.gracefulStop
import akka.dispatch.Future
import java.io.Closeable

sealed trait MyMessage
case class DoItMessage(cmd:String) extends MyMessage
case object EndMessage extends MyMessage

object Dummy {
  def main(args:Array[String]) {
    import com.typesafe.config.ConfigFactory
    implicit val system=ActorSystem("DummySystem",ConfigFactory.load.getConfig("dummy"))

    val simu = system.actorOf(
        Props(new MySimulator(system))
        .withDispatcher("simu-dispatcher"),
        name="simulator")

    import akka.routing.RoundRobinRouter
    val processor = system.actorOf(
        Props(new MyMessageProcessor(simu))
        .withDispatcher("workers-dispatcher")
        .withRouter(RoundRobinRouter(10)),
        name="default")

    for(i <- 1 to 100) {
      processor ! DoItMessage("Do the job with ID#%d now".format(i))
    }
    //print("All jobs sent")
    try {
      // Stop the processor, waiting at most 10 seconds for it to terminate
      val stoppingProcessor: Future[Boolean] = gracefulStop(processor, 10 seconds)
      Await.result(stoppingProcessor, 11 seconds)

      // Close the scheduler if it is Closeable
      system.scheduler match { case x:Closeable => x.close() case _ => }

      // Stop the simulator, again with a 10 second limit
      val stoppingSimulator: Future[Boolean] = gracefulStop(simu, 10 seconds)
      Await.result(stoppingSimulator, 11 seconds)

      system.shutdown()
      println("Finished")
    } catch {
      case e: ActorTimeoutException => println("the actor wasn't stopped within 10 seconds")
      case e: Exception => //e.printStackTrace()
    }
  }
}


class MyMessageProcessor(simu:ActorRef) extends Actor {
  def receive = {
    case msg:DoItMessage =>
      implicit val timeout = Timeout(5 minutes)
      val future = simu ? msg
      future.onSuccess { 
        case msg:String => print("O")
      }
      print("o")
  }
}

class MySimulator(system:ActorSystem) extends Actor {
  def receive = {
    case _:DoItMessage => system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done") // Fake processing
  }
}


The configuration file:
dummy {
  akka {
    loglevel = WARNING
    actor {
      default-dispatcher {
      }
    }
    scheduler {
      tick-duration = 100ms
      ticks-per-wheel = 1000
    }
  }
  
  simu-dispatcher {
    type = Dispatcher
    mailbox-capacity = 100000
  }
  
  workers-dispatcher {
    mailbox-capacity = 10
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 0
      parallelism-max = 6000
      parallelism-factor = 10.0      
    }
  }
}

The SBT build file:
name := "AkkaSandbox"

version := "0.1"

scalaVersion := "2.9.1"

libraryDependencies += "org.scalatest" %% "scalatest" % "1.6.1" % "test"

libraryDependencies += "junit" % "junit" % "4.10" % "test"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-RC2"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"


I think I need a coordinated shutdown: something that shuts everything down, but only once there is nothing left to do in the processor actors, in the simulator actor, and in the scheduler. As for java.lang.IllegalStateException: Promise already completed, closing the scheduler should have waited for all scheduled operations to finish, and I don't see why some futures appear to be already completed... or maybe I missed something.

√iktor Ҡlang

Feb 26, 2012, 6:36:47 AM
to akka...@googlegroups.com
Why are you doing this?
system.scheduler match { case x:Closeable => x.close() case _ => }


david crosson

Feb 26, 2012, 9:07:48 AM
to akka...@googlegroups.com
Because of what the Scheduler documentation says (http://akka.io/api/akka/2.0-RC2/#akka.actor.Scheduler):
"This one needs one special behavior: if Closeable, it MUST execute all outstanding tasks upon .close() in order to properly shutdown all dispatchers."

I understood this to mean that if I call close on the scheduler (DefaultScheduler implements Closeable), it will wait for all scheduleOnce tasks to be executed.

If I remove "system.scheduler match { case x:Closeable => x.close() case _ => }", I systematically get "java.lang.IllegalStateException: Promise already completed: akka.dispatch.DefaultPromise@40c11557 tried to complete with Right(Done)"

If I remove "system.shutdown()", everything is processed, but although I stopped both simu and processor, the application doesn't exit naturally. If I keep system.shutdown(), the application exits, but of course not everything is processed, since shutdown means stop.

In fact, I'm trying to find a way to have my actor system shut down properly (a kind of gracefulStop on the ActorSystem), and only once there are no more messages, futures, or "scheduleOnce" tasks pending... I'm going to try calling shutdown through ActorSystem registerOnTermination; maybe this is the solution.

Code is available here : git://github.com/dacr/akka-sandbox.git

√iktor Ҡlang

Feb 26, 2012, 10:18:20 AM
to akka...@googlegroups.com
On Sun, Feb 26, 2012 at 3:07 PM, david crosson <crosso...@gmail.com> wrote:
In fact, I'm trying to find a way to have my actor system shut down properly (a kind of gracefulStop on the ActorSystem), and only once there are no more messages, futures, or "scheduleOnce" tasks pending...

That's a bit of nonsense though, as you don't have 1 timeline but one timeline per thread, which means that "there's no" is some sort of snapshot across all timelines.

 

√iktor Ҡlang

Feb 26, 2012, 10:19:41 AM
to akka...@googlegroups.com


2012/2/26 √iktor Ҡlang <viktor...@gmail.com>




That's a bit of nonsense though, as you don't have 1 timeline but one timeline per thread, which means that "there's no" is some sort of snapshot across all timelines.


So, if you really need this, you need to keep track of "everything" so you know when "everything's done".

What's the use-case btw?

Cheers,

david crosson

Feb 26, 2012, 3:17:24 PM
to akka...@googlegroups.com
OK, thanks for your response. I've added an application lifecycle actor which stops the actor system once everything is done.

class ApplicationManager(system:ActorSystem, howmanyjob:Int) extends Actor {
  var count=0
  def receive = {
    case DoneMessage => 
      count+=1
      if (count%10000==0) println(count)
      if (count == howmanyjob) system.shutdown() 
  }
}

You're right, it is preferable for the application to manage its own lifecycle. Everything is running fine now, and performance is very good (git://github.com/dacr/akka-sandbox.git): 1m37s to process 10 million messages, each message requiring 1s to be processed (asynchronous behavior is simulated). The application exits normally, with no more IllegalStateException.

regards,
David.
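
The same "count the completions, then shut down" idea can be sketched without actors. What follows is a rough Java analogue under stated assumptions, not the code from the repository above: CompletionTracker and its methods are hypothetical names, a plain thread pool stands in for the actor system, and an AtomicInteger replaces the actor's private counter because several threads report completions concurrently.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class CompletionTracker {
    private final int expected;          // how many jobs must finish
    private final Runnable onAllDone;    // shutdown action, run exactly once
    private final AtomicInteger done = new AtomicInteger();

    CompletionTracker(int expected, Runnable onAllDone) {
        this.expected = expected;
        this.onAllDone = onAllDone;
    }

    // Records one finished job; returns true only for the call that finished the last one.
    boolean jobDone() {
        if (done.incrementAndGet() == expected) {
            onAllDone.run();
            return true;
        }
        return false;
    }

    int completed() {
        return done.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int jobs = 100;
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // The pool is shut down by whichever job happens to finish last.
        CompletionTracker tracker = new CompletionTracker(jobs, pool::shutdown);
        for (int i = 0; i < jobs; i++) {
            pool.execute(tracker::jobDone);   // each "job" only reports completion
        }
        boolean terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("completed=" + tracker.completed() + " terminated=" + terminated);
    }
}
```

The shutdown action can only fire after every job has reported in, so it never races with job submission; this is the property the ApplicationManager actor gets from counting DoneMessage.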

√iktor Ҡlang

Feb 26, 2012, 3:59:32 PM
to akka...@googlegroups.com

Excellent!
Happy hAkking

Cheers,
 