Waiting for all threads to finish


Robin Wood

Jan 19, 2015, 5:21:04 PM
to cellulo...@googlegroups.com
I'm writing an app which needs to fire off a large number of threads, have them process something and pass the result back using a callback. I've started it based on the example here:

http://stackoverflow.com/questions/18210487/celluloid-callback

The problem I've got is that I need to be able to wait for all the threads to finish, then do some more work. The example code uses a sleep at the end to make the main script wait long enough for everything to finish before exiting. I can't do this, as I've no idea how long the threads will take, and it isn't the "right" way to do it anyway.

The only example I've found for this is the following but I've not managed to make it fit:

https://gist.github.com/schmurfy/3758436#file_workers_extended2.rb

Is there a better way to do this?

Robin

Tony Arcieri

Jan 19, 2015, 5:30:16 PM
to cellulo...@googlegroups.com
On Mon, Jan 19, 2015 at 2:21 PM, Robin Wood <ro...@digininja.org> wrote:
I'm writing an app which needs to fire off a large number of threads, have them process something and pass the result back using a callback.

One option is to use futures instead of callbacks. You can use them with Celluloid::Pool.
 
The problem I've got is that I need to be able to wait for all the threads to finish then do some more work. The example code uses a sleep at the end to have the main script wait long enough for everything to finish before exiting, I can't do this as I've no idea how long the threads will take and it isn't the "right" way to do it anyway.

If you really want to wait for an actor to exit, call #terminate on it then use Celluloid::Actor.join(actor) 

--
Tony Arcieri

Tony Arcieri

Jan 19, 2015, 5:30:53 PM
to cellulo...@googlegroups.com
On Mon, Jan 19, 2015 at 2:29 PM, Tony Arcieri <bas...@gmail.com> wrote:
If you really want to wait for an actor to exit, call #terminate on it then use Celluloid::Actor.join(actor) 

Also note there's #terminate! to do asynchronous termination

--
Tony Arcieri

Robin Wood

Jan 19, 2015, 5:36:00 PM
to cellulo...@googlegroups.com
On 19 January 2015 at 22:29, Tony Arcieri <bas...@gmail.com> wrote:
> On Mon, Jan 19, 2015 at 2:21 PM, Robin Wood <ro...@digininja.org> wrote:
>>
>> I'm writing an app which needs to fire off a large number of threads, have
>> them process something and pass the result back using a callback.
>
>
> One option is to use futures instead of callbacks. You can use them with
> Celluloid::Pool.

Don't futures block till they've returned? I started with those but
then moved off to this approach.

>>
>> The problem I've got is that I need to be able to wait for all the threads
>> to finish then do some more work. The example code uses a sleep at the end
>> to have the main script wait long enough for everything to finish before
>> exiting, I can't do this as I've no idea how long the threads will take and
>> it isn't the "right" way to do it anyway.
>
>
> If you really want to wait for an actor to exit, call #terminate on it then
> use Celluloid::Actor.join(actor)

I read that #terminate waits till all the current threads finish then
exits just dropping any that haven't been started yet, isn't that the
case?

Robin

>
> --
> Tony Arcieri

Tony Arcieri

Jan 19, 2015, 5:50:49 PM
to cellulo...@googlegroups.com
On Mon, Jan 19, 2015 at 2:35 PM, Robin Wood <ro...@digi.ninja> wrote:
Don't futures block till they've returned? I started with those but
then moved off to this approach.

They block if you call `#value` on them and they aren't ready.
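As a rough analogy in plain Ruby (not Celluloid's future API): `Thread.new` starts the work without waiting, much like creating a future, and `Thread#value` blocks only if the thread hasn't finished yet. `slow_length` and the 0.2-second delay are hypothetical stand-ins for real work.

```ruby
# Plain-Ruby analogy for a future: Thread.new returns immediately,
# and Thread#value blocks only until the result is ready.

# Hypothetical stand-in for a slow task.
def slow_length(word, delay)
  sleep delay
  word.length
end

started = Time.now
pending = Thread.new { slow_length("rocket", 0.2) }
launch_elapsed = Time.now - started   # starting the work did not wait for it

result = pending.value                # blocks here until the thread finishes
wait_elapsed = Time.now - started

puts "launch: #{launch_elapsed.round(3)}s, value: #{wait_elapsed.round(3)}s, result: #{result}"
```

Only the call to `value` waits; if the result is already there, it returns straight away.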
 
I read that #terminate waits till all the current threads finish

Do you mean tasks?
 
then exits just dropping any that haven't been started yet, isn't that the case?

If you're using synchronous calls or futures, they won't be "dropped", they will cause an error in the caller (or consumer of the value). 

--
Tony Arcieri

Robin Wood

Jan 19, 2015, 6:00:14 PM
to cellulo...@googlegroups.com


On 19 Jan 2015 17:50, "Tony Arcieri" <bas...@gmail.com> wrote:
>
> On Mon, Jan 19, 2015 at 2:35 PM, Robin Wood <ro...@digi.ninja> wrote:
>>
>> Don't futures block till they've returned? I started with those but
>> then moved off to this approach.
>
>
> They block if you call `#value` on them and they aren't ready.
>

So use a future, but get the result through the callback rather than calling #value? I hadn't thought of that approach; it sounds good.

>>
>> I read that #terminate waits till all the current threads finish
>
>
> Do you mean tasks?
>  

Possibly. I just remember reading that it let what was running finish but didn't start anything new so I probably do mean tasks.


>>
>> then exits just dropping any that haven't been started yet, isn't that the case?
>
>
> If you're using synchronous calls or futures, they won't be "dropped", they will cause an error in the caller (or consumer of the value). 

Do they all still run and just send back errors that can be ignored or do they die in some way which causes the error?

Tony Arcieri

Jan 20, 2015, 4:23:21 PM
to cellulo...@googlegroups.com
On Mon, Jan 19, 2015 at 3:00 PM, Robin Wood <ro...@digi.ninja> wrote:

Do they all still run and just send back errors that can be ignored or do they die in some way which causes the error?

All running tasks are terminated with Celluloid::Task::TerminatedError

--
Tony Arcieri

Robin Wood

Jan 20, 2015, 4:34:25 PM
to cellulo...@googlegroups.com
That's the problem: I don't want them to terminate. I want to block the
main thread until all tasks have been performed, then continue so that I
can do things with what the tasks generated. This is what I've got so
far. It opens a file, checks the number of characters on each line,
and should then display the list through the dump_data call.

require 'celluloid/autostart'

class LengthChecker
  include Celluloid
  include Celluloid::Notifications

  # Publish the length of each word on the "jobDone" topic.
  def process(word)
    publish "jobDone", "length", word.length
  end

  def rationalise(data)
    puts data.inspect
  end
end

class Observer
  include Celluloid
  include Celluloid::Notifications

  def initialize
    # on_done is called for every "jobDone" notification.
    subscribe "jobDone", :on_done

    @data = {
      "length" => [],
    }
  end

  # args is [topic, key, value], e.g. ["jobDone", "length", 5]
  def on_done(*args)
    puts "Checker finished, returned #{args.inspect}"

    # Append to the list for this key instead of overwriting it.
    @data[args[1]] << args[2]
  end

  def dump_data
    puts @data.inspect
  end
end

y = Observer.new

lengthChecker = LengthChecker.pool

filename = "test_data.txt"

# File.foreach closes the file once every line has been read.
File.foreach(filename) do |line|
  lengthChecker.future :process, line
end

#lengthChecker.terminate

# NEED TO BLOCK HERE BEFORE DUMPING data.

y.dump_data

puts "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"


The tasks in the final system are going to be more complex and some
will take longer than others.

Does that make more sense?
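One way to express "block here until every callback has arrived" is a countdown latch: the main thread knows how many jobs it submitted, each completion callback counts down by one, and the main thread waits until the count reaches zero. Below is a minimal plain-Ruby sketch using `Mutex` and `ConditionVariable`; `CountDownLatch` is a hypothetical helper written for illustration (it is not part of Celluloid or Ruby's standard library), and ordinary threads stand in for the pooled actors and the `jobDone` notification.

```ruby
# Hypothetical helper: wait blocks until count_down has been called `count` times.
class CountDownLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @done  = ConditionVariable.new
  end

  # Called once per finished job (e.g. from the notification callback).
  def count_down
    @mutex.synchronize do
      @count -= 1 if @count > 0
      @done.broadcast if @count.zero?
    end
  end

  # Blocks the caller until the count reaches zero.
  def wait
    @mutex.synchronize do
      @done.wait(@mutex) until @count.zero?
    end
  end
end

words   = %w[alpha be gamma delta epsilon]
lengths = []
lengths_lock = Mutex.new
latch = CountDownLatch.new(words.size)

words.each_with_index do |word, i|
  Thread.new do
    sleep 0.01 * (words.size - i)                         # later jobs finish first
    lengths_lock.synchronize { lengths << word.length }   # the "on_done" step
    latch.count_down
  end
end

latch.wait   # the "NEED TO BLOCK HERE" point: returns once every job has reported in
puts lengths.sort.inspect
```

Applied to the code above, `count_down` would sit where `on_done` records a result and `wait` at the "NEED TO BLOCK HERE" comment; that wiring is an assumption and has not been tried with Celluloid here.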

Robin Wood

Jan 25, 2015, 6:01:19 PM
to cellulo...@googlegroups.com
I've not had any more replies to this, and I asked on IRC but couldn't
get a working solution, so I thought I'd give more details to see if I
can get more help.

The script I'm trying to create will go through a text file of
unknown length, potentially tens of thousands of lines, and start a
new task for each line. These tasks can take an arbitrary amount of
time, so I can't assume the tasks will finish in the order they
started.

Once all tasks have finished I need to go through the results and then
start up a new set of tasks.

People have suggested using futures, storing the tasks and then
iterating through them calling value on each. Here is an example:

https://github.com/httprb/http.rb/blob/master/examples/parallel_requests_with_celluloid.rb

This doesn't work for me, as there is a chance the first task started
could take the longest. While the value loop is waiting for that first
result, the rest of the results are queued up in memory, which could
potentially take a lot of memory.
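A plain-Ruby sketch of the alternative being described: instead of calling `value` on each future in submission order, each job pushes its own result onto a shared, thread-safe `Queue`, and the main thread pops results in whatever order they finish. It still knows when everything is done, because it pops exactly as many results as it submitted jobs. This is not Celluloid code; the delays are hypothetical stand-ins for jobs of varying cost, and a thread per job is used only for brevity (with tens of thousands of lines, a fixed number of workers would be used instead).

```ruby
# Each job pushes [id, result] onto a shared, thread-safe Queue as soon as it finishes.
results = Queue.new

# Job 0 is deliberately the slowest, mimicking "the first task could take the longest".
delays = [0.3, 0.0, 0.1, 0.05]

delays.each_with_index do |delay, id|
  Thread.new do
    sleep delay
    results << [id, delay]
  end
end

completed = []
delays.size.times do
  id, _delay = results.pop   # blocks only until the next job finishes, whichever it is
  completed << id           # handle or store the result here (e.g. write it out)
end

puts "completion order: #{completed.inspect}"
```

Because a slow early job no longer holds up the loop, each result can be handled and released as soon as it arrives.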

I'm currently using notifications for the tasks to call back to a
method that stores the returned value in Redis, so as soon as a task
is done its memory is freed rather than all the results stacking up.
The only problem with this is blocking the second set of tasks until
the first set has finished. All the examples I've seen use a sleep to
freeze the main flow until they're confident that all the tasks have
finished. I can't do this, as I can't know up front how long each task
will take or how many tasks there will be; it is also not the right
way to engineer something like this.

I can't be the first person to have the need to block till all tasks
are finished so there must be a solution out there I just can't find
it.

Can someone help?

Robin

Tony Arcieri

Jan 25, 2015, 6:06:25 PM
to cellulo...@googlegroups.com
Here is the simplest strategy:

- Create your actors
- Start work on them
- Have the main thread loop through them and call Celluloid::Actor.join(...) on them
- Have the actors call terminate on themselves when they're done doing their work
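The four steps above map onto plain Ruby threads in a way that may help show the shape of the pattern: a thread "terminates itself" simply by returning from its block, and `Thread#join` returns once it has. This sketch uses `Thread` rather than Celluloid actors, so `Celluloid::Actor.join` becomes `Thread#join`; the sleep is a hypothetical placeholder for the real work.

```ruby
log = Queue.new   # thread-safe record of what each worker did

# Create the workers and start work on them.
workers = (1..5).map do |from|
  Thread.new(from) do |n|
    sleep 0.01 * n            # placeholder for real work of varying length
    log << "launched #{n}"
    # Returning from the block ends the thread: the equivalent of the
    # actor calling terminate on itself when its work is done.
  end
end

# The main thread joins each worker in turn. Joining a thread that has
# already finished returns immediately, so the join order doesn't matter.
workers.each(&:join)

puts "all #{log.size} workers finished; continue things here"
```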
--
Tony Arcieri

Robin Wood

Jan 25, 2015, 6:14:36 PM
to cellulo...@googlegroups.com
Ok thanks, I'll give that a go in the morning.

Robin

Robin Wood

Feb 5, 2015, 6:08:22 AM
to cellulo...@googlegroups.com
On 25 January 2015 at 23:06, Tony Arcieri <bas...@gmail.com> wrote:
> Here is the simplest strategy:
>
> - Create your actors
> - Start work on them
> - Have the main thread loop through them and call Celluloid::Actor.join(...)
> on them
> - Have the actors call terminate on themselves when they're done doing their
> work
>

Got very delayed on this due to other stuff, but I'm finally back on it.
I've tried a few things and can't get it to work. I think I'm probably
doing something wrong; hopefully you can spot something here:


require "celluloid"

class Rocket
  include Celluloid

  def launch(from)
    # do things
  end
end

actors = []
1.upto(5) do |from|
  r = Rocket.new
  r.async.launch(from)
  actors << r
end

actors.each do |act|
  puts "join"
  Celluloid::Actor.join(act)
end

puts "continue things here"

--------

actors ends up as an array of actors. When I run the loop with the
join in it, I get the first "join" printed and then this error:

D, [2015-02-05T11:04:35.377602 #12954] DEBUG -- : Terminating 5 actors...
/usr/local/rvm/gems/ruby-2.2.0/gems/celluloid-0.16.0/lib/celluloid/thread_handle.rb:37:in `sleep': No live threads left. Deadlock? (fatal)
from /usr/local/rvm/gems/ruby-2.2.0/gems/celluloid-0.16.0/lib/celluloid/thread_handle.rb:37:in `wait'
from /usr/local/rvm/gems/ruby-2.2.0/gems/celluloid-0.16.0/lib/celluloid/thread_handle.rb:37:in `block in join'

I tried wrapping the join in:

if act.alive?

but that didn't help.

This approach also doesn't seem to allow for using a pool: if I create
a pool and then start things through that, I don't get the actors
returned.

What have I done wrong?

Robin

Tony Arcieri

Feb 5, 2015, 12:13:15 PM
to cellulo...@googlegroups.com
On Thu, Feb 5, 2015 at 3:08 AM, Robin Wood <ro...@digi.ninja> wrote:
What have I done wrong?

You're never terminating the actors. It's effectively a livelock:

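class Rocket
  include Celluloid

  def launch(from)
    # do things
    # Terminate this actor once its work is done, so that
    # Celluloid::Actor.join(actor) in the main thread can return.
    terminate
  end
end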
 

--
Tony Arcieri

Robin Wood

Feb 5, 2015, 12:32:55 PM
to cellulo...@googlegroups.com

Thanks, will try that. Not heard of livelock before; what does that mean in terms of the threads?

Robin


Tony Arcieri

Feb 5, 2015, 12:34:07 PM
to cellulo...@googlegroups.com
On Thu, Feb 5, 2015 at 9:32 AM, Robin Wood <ro...@digi.ninja> wrote:

Thanks, will try that. Not heard of livelock before; what does that mean in terms of the threads?

It means threads are unable to make further progress 

--
Tony Arcieri

Robin Wood

Feb 5, 2015, 12:37:31 PM
to cellulo...@googlegroups.com

Is that because the join forces everything into the first actor meaning the rest can't run?

Robin

Tony Arcieri

Feb 5, 2015, 12:51:24 PM
to cellulo...@googlegroups.com
On Thu, Feb 5, 2015 at 9:37 AM, Robin Wood <ro...@digi.ninja> wrote:

Is that because the join forces everything into the first actor meaning the rest can't run?

The rest are sitting around idle while the main thread is trying to join on them. 

The main thread can't join on the others until they terminate.
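The same situation can be reproduced with plain Ruby threads: one worker does its work and exits, while the other does its work and then sits waiting for a message that never comes, like an idle actor that never calls terminate. `Thread#join` accepts a timeout in seconds and returns `nil` if the thread is still alive when it expires, which lets this sketch show the difference without hanging. The `inbox` queue is an illustrative stand-in for an actor's mailbox, not Celluloid's implementation. Without the timeout, and with no other thread left to feed the queue, Ruby would typically abort with the same kind of `No live threads left. Deadlock? (fatal)` error shown earlier.

```ruby
inbox = Queue.new   # illustrative stand-in for an actor's mailbox

# A worker that does its work and then exits.
finishes = Thread.new do
  :done
end

# A worker that does its work, then waits for another message that never
# comes, like an idle actor that never calls terminate.
idles = Thread.new do
  # ... work would happen here ...
  inbox.pop
end

finished_join = finishes.join(1)   # returns the Thread, because it has exited
idle_join     = idles.join(0.2)    # returns nil after 0.2s: alive but idle

puts "finishing worker joined: #{!finished_join.nil?}"
puts "idle worker joined: #{!idle_join.nil?}"

# Letting the idle worker exit (the analogue of terminating the actor)
# unblocks the join.
inbox << :stop
idles.join
```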

--
Tony Arcieri

Robin Wood

Feb 5, 2015, 1:11:04 PM
to cellulo...@googlegroups.com

Ok makes sense, thanks.

If I wanted to use a pool would I just pull a list of actors out of the pool or is there a different way to get them?


Robin Wood

Feb 7, 2015, 6:14:54 PM
to cellulo...@googlegroups.com
I've been trying to get this to work with a pool and it is failing,
which I'd kind of expect, but not in this way. This is the code:

require "celluloid"

class Rocket
  include Celluloid

  def launch(from)
    # do something
    puts "Blast off!"
    terminate
  end
end

pool = Rocket.pool

puts "Launching 9 rockets"

1.upto(9) do |from|
  puts "Launching with #{from}"
  pool.async.launch(from)
end

puts "All launched"

puts "Joining All"

Celluloid::Actors.all do |act|
  puts "Join"
  Celluloid::Actor.join(act)
end

puts "All joined"
puts "continue things here"

And when I run it I get this:


Launching 9 rockets
Launching with 1
Launching with 2
Launching with 3
Launching with 4
1...
3...
Launching with 5
2...4...Launching with 6


Launching with 7
Launching with 8
Launching with 9
All launched
Joining All
D, [2015-02-07T22:46:36.298472 #9071] DEBUG -- : Terminating 5 actors...
Celluloid::TaskFiber backtrace unavailable. Please try `Celluloid.task_class = Celluloid::TaskThread` if you need backtraces here.
W, [2015-02-07T22:46:36.299452 #9071] WARN -- : Terminating task: type=:call, meta={:method_name=>:launch}, status=:sleeping
Celluloid::TaskFiber backtrace unavailable. Please try `Celluloid.task_class = Celluloid::TaskThread` if you need backtraces here.
W, [2015-02-07T22:46:36.299661 #9071] WARN -- : Terminating task: type=:call, meta={:method_name=>:launch}, status=:sleeping
Celluloid::TaskFiber backtrace unavailable. Please try `Celluloid.task_class = Celluloid::TaskThread` if you need backtraces here.
W, [2015-02-07T22:46:36.299911 #9071] WARN -- : Terminating task: type=:call, meta={:method_name=>:launch}, status=:sleeping
Celluloid::TaskFiber backtrace unavailable. Please try `Celluloid.task_class = Celluloid::TaskThread` if you need backtraces here.
D, [2015-02-07T22:46:36.301292 #9071] DEBUG -- : Celluloid::PoolManager: async call `launch` aborted!
Celluloid::Task::TerminatedError: task was terminated

I don't get the "Blast off!" output, which means terminate never gets
called, and I don't get the "Join" output to show that a join has been called.

Is it possible to do this with a pool? I could be starting 10k actors
and don't want them all running at once, so a pool seems like the
correct way to do it. If only 4 actors are created and I call
terminate on them when they are finished, will new ones be respawned by
the pool? I assume that if they are, they wouldn't be joined, as they
would be new and created after I've run the join loop.
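A plain-Ruby sketch may help separate the two ideas in this question. A pool caps how many jobs run at once; it isn't one actor per job, so joining the pool's members doesn't directly express "wait until all 10k jobs are done". One common shape for that in plain Ruby is a fixed number of worker threads draining a shared work queue, with one stop marker queued per worker after the real jobs; each worker exits when it takes a marker, so joining that fixed set of workers waits for every queued job. This is not Celluloid's pool API and says nothing about whether Celluloid respawns terminated pool members; `POOL_SIZE`, `STOP`, and the squaring job are hypothetical placeholders.

```ruby
POOL_SIZE = 4        # hypothetical: at most this many jobs run at once
STOP = Object.new    # hypothetical marker that tells a worker to exit

jobs    = Queue.new
results = Queue.new

# A fixed set of workers; each keeps taking jobs until it receives STOP.
workers = Array.new(POOL_SIZE) do
  Thread.new do
    loop do
      job = jobs.pop
      break if job.equal?(STOP)
      results << job * job   # placeholder for the real per-line work
    end
  end
end

# Queue every job (e.g. one per input line), then one STOP per worker.
# Queue is FIFO, so no worker sees a STOP until all real jobs are taken.
1.upto(1_000) { |n| jobs << n }
POOL_SIZE.times { jobs << STOP }

# Each worker exits only after every real job has been taken, and joining
# all of them also waits for any jobs still in progress.
workers.each(&:join)

puts "processed #{results.size} jobs with #{POOL_SIZE} workers"
```

With a very large input, `SizedQueue.new(n)` can replace `Queue.new` for `jobs`, so the producer blocks instead of buffering every pending line in memory. Separately, in the join loop above the `do ... end` block is attached to `.all` itself rather than to an `.each` over its result; Ruby silently ignores a block passed to a method that doesn't yield, so a loop written that way would not run its body even when the receiver exists.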

Am I missing something really obvious here? Am I really trying to do
something that no one else does with Celluloid?

Robin