Scala: TimeoutException on Await.result

1,326 views
Skip to first unread message

glanne

unread,
May 6, 2012, 7:18:54 AM5/6/12
to akka...@googlegroups.com
I'm trying to adjust my code so that I can run it on a cluster. For that, I need my main thread to wait until my Actors have finished what they are doing. The Actors are supposed to each work on one element in an ArrayBuffer and the altered ArrayBuffer is what my main thread is supposed to use after my Actors have finished. If I get it right, Await.result is what I want to use to achieve that. I've implemented that according to the "Use with Actors" section in the Futures documentation for Akka: http://doc.akka.io/docs/akka/2.0.1/scala/futures.html. However, the Finished message, which my Future waits for, does not seem to get delivered, since my code just throws a TimeoutException.

I do realize that I still might not completely have understood how this all works, but I can't seem to find an example that completely clarifies things for me. For example in the Future documentation there is a line "val future = actor ? msg // enabled by the “ask” import" but msg is not defined anywhere. So I thought I have to replace this with an actual Message (as I did). Is that correct?

This is what my code looks like (the values in Parameter are just Integers, in this case NODES = 4, L = 8):

object Configuration {
       
    sealed trait Message
    case object Broadcast extends Message
    case object Finished extends Message
    case class Work(inds:ArrayBuffer[Individual]) extends Message
    case class Result(result:ArrayBuffer[Individual]) extends Message
    case class NewIndividuals(inds:ArrayBuffer[Individual])
   
    class Worker extends Actor {
       
        def receive = {
            case Work(inds) =>
                sender ! Result(geneticOperation(inds)) // perform the work
        }
       
        private def geneticOperation(inds:ArrayBuffer[Individual]) = {
            for(child <- inds) {
                child.mutate
                child.evaluate
            }
            inds
        }
   
    }
   
    class Master(nrOfWorkers:Int, nrOfMessages:Int, oldInds:ArrayBuffer[Individual], listener:ActorRef)
        extends Actor {
        
        var inds = new ArrayBuffer[Individual]
        var nrOfResults:Int = _
        var origin:ActorRef = _
        val start:Long = System.currentTimeMillis
        
        val workerRouter = context.actorOf(
                Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
        
        def receive = {
            case Broadcast =>
                  origin = sender
                var workOn = new ArrayBuffer[Individual]   
                for(i <- 0 until nrOfMessages)
                    workOn += oldInds(i)
                oldInds.trimStart(nrOfMessages)
                workerRouter ! Work(workOn)
            case Result(newInds) =>
                inds ++= newInds
                if(nrOfResults == nrOfMessages)
                    listener ! NewIndividuals(inds)
            case Finished =>
                println("Listener has finished")
                origin ! Finished
                context.stop(self)
        }
        
    }
   
    class Listener extends Actor {
        def receive = {
            case NewIndividuals(inds) =>
                Configuration.environment(inds)
                sender ! Finished
                context.system.shutdown()               
        }
    }
       
    var d = Parameter.N_DROPLET_TYPE-2
   
      /** Apply genetic operators (recombination, mutation) on the population
       * during the current generation.
       *  @param g Current generation.
       */
    def apply(g:Int) {
        var children = Population.recombinate    // create offspring
        calculate(Parameter.NODES,(Parameter.L / Parameter.NODES),children)
    }
     
    def calculate(nrOfWorkers:Int, nrOfMessages:Int, inds:ArrayBuffer[Individual]) {
        val system = ActorSystem("GeneticOperations")
        val listener = system.actorOf(Props[Listener], name="listener")
        val master = system.actorOf(Props(new Master(
                nrOfWorkers,nrOfMessages,inds,listener)),
                name="master")
        master ! Broadcast
        implicit val timeout = Timeout(500 seconds)
        val future = master ? Finished
        println("Waiting for Finished message")
        val result = Await.result(future, timeout.duration).asInstanceOf[String]
        println("Got Finished message")
    }
   
    private def environment(children:ArrayBuffer[Individual]) {
          if(Parameter.NEUTRAL) {        // if neutral evolution no selection pressure
              Population.neutralEvolution(children)
              writeDiversity
          }
          else Population.select(children)
          writeAllToEvoPath
    }
    case object Broadcast extends Message
    case object Finished extends Message
    case class Work(inds:ArrayBuffer[Individual]) extends Message
    case class Result(result:ArrayBuffer[Individual]) extends Message
    case class NewIndividuals(inds:ArrayBuffer[Individual])
   
    class Worker extends Actor {
       
        def receive = {
            case Work(inds) =>
                sender ! Result(geneticOperation(inds)) // perform the work
        }
       
        private def geneticOperation(inds:ArrayBuffer[Individual]) = {
            for(child <- inds) {
                child.mutate
                child.evaluate
            }
            inds
        }
   
    }
   
    class Master(nrOfWorkers:Int, nrOfMessages:Int, oldInds:ArrayBuffer[Individual], listener:ActorRef)
        extends Actor {
        
        var inds = new ArrayBuffer[Individual]
        var nrOfResults:Int = _
        var origin:ActorRef = _
        val start:Long = System.currentTimeMillis
        
        val workerRouter = context.actorOf(
                Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
        
        def receive = {
            case Broadcast =>
                  origin = sender
                var workOn = new ArrayBuffer[Individual]   
                for(i <- 0 until nrOfMessages)
                    workOn += oldInds(i)
                oldInds.trimStart(nrOfMessages)
                workerRouter ! Work(workOn)
            case Result(newInds) =>
                inds ++= newInds
                if(nrOfResults == nrOfMessages)
                    listener ! NewIndividuals(inds)
            case Finished =>
                println("Listener has finished")
                origin ! Finished
                context.stop(self)
        }
        
    }
    object Configuration {
   
    sealed trait Message
    case object Broadcast extends Message
    case object Finished extends Message
    case class Work(inds:ArrayBuffer[Individual]) extends Message
    case class Result(result:ArrayBuffer[Individual]) extends Message
    case class NewIndividuals(inds:ArrayBuffer[Individual])
   
    class Worker extends Actor {
       
        def receive = {
            case Work(inds) =>
                sender ! Result(geneticOperation(inds)) // perform the work
        }
       
        private def geneticOperation(inds:ArrayBuffer[Individual]) = {
            for(child <- inds) {
                child.mutate
                child.evaluate
            }
            inds
        }
   
    }
   
    class Master(nrOfWorkers:Int, nrOfMessages:Int, oldInds:ArrayBuffer[Individual], listener:ActorRef)
        extends Actor {
        
        var inds = new ArrayBuffer[Individual]
        var nrOfResults:Int = _
        var origin:ActorRef = _
        val start:Long = System.currentTimeMillis
        
        val workerRouter = context.actorOf(
                Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
        
        def receive = {
            case Broadcast =>
                  origin = sender
                var workOn = new ArrayBuffer[Individual]   
                for(i <- 0 until nrOfMessages)
                    workOn += oldInds(i)
                oldInds.trimStart(nrOfMessages)
                workerRouter ! Work(workOn)
            case Result(newInds) =>
                inds ++= newInds
                if(nrOfResults == nrOfMessages)
                    listener ! NewIndividuals(inds)
            case Finished =>
                println("Listener has finished")
                origin ! Finished
                context.stop(self)
        }
        
    }
   
    class Listener extends Actor {
        def receive = {
            case NewIndividuals(inds) =>
                Configuration.environment(inds)
                sender ! Finished
                context.system.shutdown()               
        }
    }
       
    var d = Parameter.N_DROPLET_TYPE-2
   
      /** Apply genetic operators (recombination, mutation) on the population
       * during the current generation.
       *  @param g Current generation.
       */
    def apply(g:Int) {
        var children = Population.recombinate    // create offspring
        calculate(Parameter.NODES,(Parameter.L / Parameter.NODES),children)
    }
     
    def calculate(nrOfWorkers:Int, nrOfMessages:Int, inds:ArrayBuffer[Individual]) {
        val system = ActorSystem("GeneticOperations")
        val listener = system.actorOf(Props[Listener], name="listener")
        val master = system.actorOf(Props(new Master(
                nrOfWorkers,nrOfMessages,inds,listener)),
                name="master")
        master ! Broadcast
        implicit val timeout = Timeout(500 seconds)
        val future = master ? Finished
        println("Waiting for Finished message")
        val result = Await.result(future, timeout.duration).asInstanceOf[String]
        println("Got Finished message")
    }
   
    private def environment(children:ArrayBuffer[Individual]) {
          if(Parameter.NEUTRAL) {        // if neutral evolution no selection pressure
              Population.neutralEvolution(children)
              writeDiversity
          }
          else Population.select(children)
          writeAllToEvoPath
    }
    class Listener extends Actor {
        def receive = {
            case NewIndividuals(inds) =>
                Configuration.environment(inds)
                sender ! Finished
                context.system.shutdown()               
        }
    }
       
    var d = Parameter.N_DROPLET_TYPE-2
   
      /** Apply genetic operators (recombination, mutation) on the population
       * during the current generation.
       *  @param g Current generation.
       */
    def apply(g:Int) {
        var children = Population.recombinate    // create offspring
        calculate(Parameter.NODES,(Parameter.L / Parameter.NODES),children)
    }
     
    def calculate(nrOfWorkers:Int, nrOfMessages:Int, inds:ArrayBuffer[Individual]) {
        val system = ActorSystem("GeneticOperations")
        val listener = system.actorOf(Props[Listener], name="listener")
        val master = system.actorOf(Props(new Master(
                nrOfWorkers,nrOfMessages,inds,listener)),
                name="master")
        master ! Broadcast
        implicit val timeout = Timeout(500 seconds)
        val future = master ? Finished
        println("Waiting for Finished message")
        val result = Await.result(future, timeout.duration).asInstanceOf[String]
        println("Got Finished message")
    }
   
    private def environment(children:ArrayBuffer[Individual]) {
          if(Parameter.NEUTRAL) {        // if neutral evolution no selection pressure
              Population.neutralEvolution(children)
              writeDiversity
          }
          else Population.select(children)
          writeAllToEvoPath
    }

Akka Team

unread,
May 6, 2012, 8:04:40 AM5/6/12
to akka...@googlegroups.com
What "origin" do you expect the Broadcast message to have?

Cheers,



--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To view this discussion on the web visit https://groups.google.com/d/msg/akka-user/-/1dAS5u3OORgJ.
To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.



--
Akka Team
Typesafe - The software stack for applications that scale
Blog: letitcrash.com
Twitter: @akkateam

glanne

unread,
May 6, 2012, 10:03:07 AM5/6/12
to akka...@googlegroups.com
Hi,
this is the point where I'm not sure if I've correctly understood how the Await works (I assume I haven't). I expected that I have to send that Finished message that I'm listening for to that place where I'm listening for it. That is why I try to send it to where the Broadcast came from, since that's the place where I'm waiting for the Finished. I've just tried it with a Future instead, so I put


implicit val timeout = Timeout(500 seconds)
implicit val ec = ExecutionContext.fromExecutorService(// what does need to be here?)
val future = Future { actorsFinished }

println("Waiting for Finished message")
var result = Await.result(future, timeout.duration)
result must be(true)

println("Got Finished message")

actorsFinished is a variable in Configuration which is set to false before setting up the ActorSystem. When the Listener is finished it is set to true. Would that work? How would the ExecutorService look like for me then?

Thanks!
glanne

√iktor Ҡlang

unread,
May 6, 2012, 10:24:04 AM5/6/12
to akka...@googlegroups.com
Hi,

When you send a message to an Actor, it will look for an implicit ActorRef in scope to use that as the sender, if it cannot find one, it will use the Dead Letter ActorRef as the sender.

Since Broadcast is sent in Configuration, there's no implicit ActorRef in scope and as such your "origin" will be the Dead Letter ActorRef, so when you say that you're done you reply to "origin" you're sending that as a DeadLetter.
In this case you might want to send to "sender" as well, that you're done.

Cheers,


--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To view this discussion on the web visit https://groups.google.com/d/msg/akka-user/-/3atoXDInPJMJ.

To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.



--
Viktor Klang

Akka Tech Lead
Typesafe - The software stack for applications that scale

Twitter: @viktorklang

glanne

unread,
May 6, 2012, 11:16:58 AM5/6/12
to akka...@googlegroups.com
Thanks for the explanation!
Unfortunately I now seem to have a different problem. I should have a total of 4 Workers. However, only one Worker sends a Result back to the Master (I've inserted some println to check that) and then the Future just times out because the other 3 Workers do not seem to be doing anything and so the Master never sends a message to the Listener and the Listener never sends the Finished message. Shouldn't the WorkerRouter take care of that?
(Note: In the code I posted there is a "nrOfResults += 1" missing in Master, receive, Result and the if-clause should be if(nrOfResults == nrOfWorkers))


Am Sonntag, 6. Mai 2012 13:18:54 UTC+2 schrieb glanne:

√iktor Ҡlang

unread,
May 6, 2012, 11:24:46 AM5/6/12
to akka...@googlegroups.com
The master won't wait for the worker to be finished, so it'll reply to the Finished message prematurely.

--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To view this discussion on the web visit https://groups.google.com/d/msg/akka-user/-/cG4TxJToUsUJ.

To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.

glanne

unread,
May 6, 2012, 12:08:14 PM5/6/12
to akka...@googlegroups.com
This surprises me, because that is what I included the if-clause for that checks if the Master has as many replies (Results) as there are Workers. Only then is it supposed to send the NewIndividuals message to the Listener and only then should the Listener send a Finish message and the Master will reply to that with a Finished message. Here's the new code for Listener and Master (everything else remains the same):


class Master(nrOfWorkers:Int, nrOfMessages:Int, oldInds:ArrayBuffer[Individual], listener:ActorRef)
        extends Actor {
        
        var inds = new ArrayBuffer[Individual]
        var nrOfResults:Int = _
        var origin:ActorRef = _
        val start:Long = System.currentTimeMillis
        
        val workerRouter = context.actorOf(
                Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
        
        def receive = {
            case Broadcast =>
                  origin = sender
                var workOn = new ArrayBuffer[Individual]   
                for(i <- 0 until nrOfMessages)
                    workOn += oldInds(i)
                oldInds.trimStart(nrOfMessages)
                workerRouter ! Work(workOn)
            case Result(newInds) =>
                inds ++= newInds
                nrOfResults += 1
                println("The master has " + nrOfResults + " results")
                if(nrOfResults == nrOfWorkers)

                    listener ! NewIndividuals(inds)
            case Finished =>
                context.stop(self)
                sender ! Finished

        }
        
    }
   
    class Listener extends Actor {
        def receive = {
            case NewIndividuals(inds) =>
                println("Listener coming up")

                Configuration.environment(inds)
                sender ! Finished
                context.system.shutdown()               
        }
    }

In calculate() I've inserted a println before and after Await.result. Both messages get printed before the Finished messages have been sent ("Listener coming up" has not been printed). I just don't see why the Master apparently sends a Finished message although that should only happen if it has received one from the Listener.

√iktor Ҡlang

unread,
May 6, 2012, 12:43:38 PM5/6/12
to akka...@googlegroups.com
def calculate(nrOfWorkers:Int, nrOfMessages:Int, inds:ArrayBuffer[Individual]) {
        val system = ActorSystem("GeneticOperations")
        val listener = system.actorOf(Props[Listener], name="listener")
        val master = system.actorOf(Props(new Master(
                nrOfWorkers,nrOfMessages,inds,listener)),
                name="master")
        master ! Broadcast
        implicit val timeout = Timeout(500 seconds)
        val future = master ? Finished <----- YOUR PROBLEM LIES HERE

        println("Waiting for Finished message")
        val result = Await.result(future, timeout.duration).asInstanceOf[String]
        println("Got Finished message")
    }


You first send Broadcast, then immediately after, you send Finished.


--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To view this discussion on the web visit https://groups.google.com/d/msg/akka-user/-/aCjPGyMjicgJ.

To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.

glanne

unread,
May 6, 2012, 2:24:49 PM5/6/12
to akka...@googlegroups.com
Thanks for pointing me there, I apparently also had a wrong understanding of what ? does. Thanks for your patience!


Roland Kuhn

unread,
May 7, 2012, 2:41:42 AM5/7/12
to akka...@googlegroups.com
I’m not sure I correctly followed the whole thread, but I think you want “master ? Broadcast” and then await that one, and you most definitely do not want to shutdown() the system from within the Listener: since I take it that you want your main thread to control the overall program flow, let that one do the shutdown() in the end.

Regards,

Roland

On May 6, 2012, at 20:24 , glanne wrote:

Thanks for pointing me there, I apparently also had a wrong understanding of what ? does. Thanks for your patience!



--
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To view this discussion on the web visit https://groups.google.com/d/msg/akka-user/-/TN0Az0K6X14J.

To post to this group, send email to akka...@googlegroups.com.
To unsubscribe from this group, send email to akka-user+...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/akka-user?hl=en.

Roland Kuhn
Typesafe – The software stack for applications that scale.
twitter: @rolandkuhn


alexand...@googlemail.com

unread,
May 7, 2012, 3:48:23 AM5/7/12
to akka...@googlegroups.com
Yes, you're right with your assumptions of what I'm doing.
Since I got it from Viktor's post that ? actually sends a message instead of just listening for one, I've changed my code to do master ? Broadcast.
Thanks for pointing out the correct way to shutdown, from the Getting Started tutorial in the Akka docs I had assumed that you have to do it from within the Actors.


Am schrieb Roland Kuhn <goo...@rkuhn.info>:

√iktor Ҡlang

unread,
May 7, 2012, 4:01:09 AM5/7/12
to akka...@googlegroups.com
I really recommend to read the documentation: http://doc.akka.io/docs/akka/2.0.1/

Cheers,
--
Viktor Klang

Akka Tech Lead
Typesafe - The software stack for applications that scale

Twitter: @viktorklang

Roland Kuhn

unread,
May 7, 2012, 4:58:50 AM5/7/12
to akka...@googlegroups.com
On May 7, 2012, at 09:48 , alexand...@googlemail.com wrote:

Yes, you're right with your assumptions of what I'm doing.
Since I got it from Viktor's post that ? actually sends a message instead of just listening for one, I've changed my code to do master ? Broadcast.
Thanks for pointing out the correct way to shutdown, from the Getting Started tutorial in the Akka docs I had assumed that you have to do it from within the Actors.

No, that is not required. I added a paragraph explaining this to the section you are quoting, because you are right that it was misleading.

Regards,

Roland
Reply all
Reply to author
Forward
0 new messages