
[ANN] forkoff - parallel processing for ruby enumerables


ara howard

Apr 17, 2008, 9:43:05 PM

NAME

forkoff

SYNOPSIS

brain-dead simple parallel processing for ruby

URI

http://rubyforge.org/projects/codeforpeople

INSTALL

gem install forkoff

DESCRIPTION

forkoff works for any enumerable object, iterating a code block to run in a
child process and collecting the results. forkoff can limit the number of
child processes; the limit is 8 by default.
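A minimal sketch of the core idea, assuming a POSIX platform where `Process.fork` is available: each element is handled in its own forked child, which sends its block result back to the parent over a pipe using `Marshal`. `fork_map` is a hypothetical name for illustration, not forkoff's API, and unlike forkoff this version does not cap the number of children.

```ruby
# Hypothetical helper illustrating fork-per-element mapping (not forkoff's API).
# Requires a platform with Process.fork (Linux, macOS, BSD).
def fork_map(enum)
  jobs = enum.map do |item|
    reader, writer = IO.pipe
    pid = Process.fork do
      reader.close
      # Child: run the block and send the marshaled result to the parent.
      Marshal.dump(yield(item), writer)
      writer.close
      exit!(0) # skip at_exit handlers inherited from the parent
    end
    writer.close # the parent keeps only the read end
    [pid, reader]
  end

  # Collect results in the original order. Read before waiting so a large
  # result cannot fill the pipe buffer and leave the child blocked forever.
  jobs.map do |pid, reader|
    result = Marshal.load(reader.read)
    reader.close
    Process.wait(pid)
    result
  end
end

squares = fork_map(0..3) { |i| i * i }
p squares # => [0, 1, 4, 9]
```

All children start at once here; the samples below rely on forkoff to keep only a fixed number of them running.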

SAMPLES

<========< samples/a.rb >========>

~ > cat samples/a.rb

#
# forkoff makes it trivial to do parallel processing with ruby; the following
# prints out each word in a separate process
#

require 'forkoff'

%w( hey you ).forkoff!{|word| puts "#{ word } from #{ Process.pid }"}

~ > ruby samples/a.rb

hey from 3239
you from 3240


<========< samples/b.rb >========>

~ > cat samples/b.rb

#
# for example, this takes only 1 second or so to complete
#

require 'forkoff'

a = Time.now.to_f

results =
  (0..7).forkoff do |i|

    sleep 1

    i ** 2

  end

b = Time.now.to_f

elapsed = b - a

puts "elapsed: #{ elapsed }"
puts "results: #{ results.inspect }"

~ > ruby samples/b.rb

elapsed: 1.07044386863708
results: [0, 1, 4, 9, 16, 25, 36, 49]


<========< samples/c.rb >========>

~ > cat samples/c.rb

#
# forkoff does *NOT* spawn processes in batches, waiting for each batch to
# complete. rather, it keeps a certain number of processes busy until all
# results have been gathered. in other words, the following will ensure that 2
# processes are running at all times, until the list is complete. note that
# the following will take about 2 seconds to run (2 sets of 2 @ 1 second).
#

require 'forkoff'

pid = Process.pid

a = Time.now.to_f

pstrees =
  %w( a b c d ).forkoff! :processes => 2 do |letter|
    sleep 1
    { letter => ` pstree -l 2 #{ pid } ` }
  end


b = Time.now.to_f

puts
puts "pid: #{ pid }"
puts "elapsed: #{ b - a }"
puts

require 'yaml'

pstrees.each do |pstree|
  puts pstree.to_yaml
end

~ > ruby samples/c.rb


pid: 3254
elapsed: 2.12998485565186

---
a: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03255 ahoward ruby -Ilib samples/c.rb
\-+- 03256 ahoward ruby -Ilib samples/c.rb

---
b: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03255 ahoward ruby -Ilib samples/c.rb
\-+- 03256 ahoward ruby -Ilib samples/c.rb

---
c: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03261 ahoward (ruby)
\-+- 03262 ahoward ruby -Ilib samples/c.rb

---
d: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03261 ahoward ruby -Ilib samples/c.rb
\-+- 03262 ahoward ruby -Ilib samples/c.rb
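The difference between a rolling pool and fixed batches only shows up when task durations differ; with four equal one-second tasks and two processes, both strategies finish in about 2 seconds, as in the sample above. The sketch below simulates both strategies without forking anything. The task durations are made-up inputs for illustration, and each function only computes when the last task would finish.

```ruby
# Simulated wall-clock time for running tasks with `workers` slots.
# Durations are hypothetical, in seconds; nothing is actually forked.

# Batch strategy: start `workers` tasks, wait for the whole batch, repeat.
def batch_time(durations, workers)
  durations.each_slice(workers).sum { |batch| batch.max }
end

# Rolling strategy: whenever a slot frees up, start the next task at once.
def rolling_time(durations, workers)
  slots = Array.new(workers, 0) # time at which each slot becomes free
  durations.each do |d|
    i = slots.index(slots.min)  # earliest free slot
    slots[i] += d
  end
  slots.max
end

uniform = [1, 1, 1, 1]
mixed   = [3, 1, 1, 1, 1, 1]

puts batch_time(uniform, 2)   # => 2
puts rolling_time(uniform, 2) # => 2
puts batch_time(mixed, 2)     # => 5
puts rolling_time(mixed, 2)   # => 4
```

With the mixed durations, batching makes the short tasks wait behind the 3-second one, while the rolling pool keeps the second slot busy.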

a @ http://codeforpeople.com/
--
we can deny everything, except that we have the possibility of being
better. simply reflect on that.
h.h. the 14th dalai lama


Roger Pack

Apr 17, 2008, 9:51:47 PM

>
> NAME
>
> forkoff
>

Nice. Great idea.


>
> # forkoff does *NOT* spawn processes in batches, waiting for each batch to
> # complete. rather, it keeps a certain number of processes busy until all
> # results have been gathered. in otherwords the following will ensure that 2
> # processes are running at all times, until the list is complete. note that
> # the following will take about 2 seconds to run (2 sets of 2 @ 1 second).
> #

I assume then that at most 2 processes are forked, and each keeps
working?

ara.t.howard

Apr 17, 2008, 10:38:19 PM

On Apr 17, 2008, at 7:51 PM, Roger Pack wrote:
> I assume then that at most 2 processes are forked, and each keeps
> working?

right now the default is 8 - 2 is more reasonable. at this point this code
is purely proof of concept - i'll take that as a suggestion (one i agree
with)

cheers.

Phillip Gawlowski

Apr 17, 2008, 10:48:18 PM

ara howard wrote:
|
| NAME
|
| forkoff
|
| SYNOPSIS
|
| brain-dead simple parallel processing for ruby
|
| URI
|
| http://rubyforge.org/projects/codeforpeople
|
| INSTALL
|
| gem install forkoff
|
| DESCRIPTION
|
| forkoff works for any enumerable object, iterating a code block to run
| in a child process and collecting the results. forkoff can limit the
| number of child processes which is, by default, 8.

So, the tool that captures runaway processes and terminates them will
be called 'sodoff', I wager? :P

SCNR

--
Phillip Gawlowski
Twitter: twitter.com/cynicalryan

You thought I was taking your woman away from you. You're jealous.
You tried to kill me with your bare hands. Would a Kelvan do that?
Would he have to? You're reacting with the emotions of a human.
You are human.
~ -- Kirk, "By Any Other Name," stardate 4657.5

ara.t.howard

Apr 17, 2008, 10:54:13 PM

On Apr 17, 2008, at 8:48 PM, Phillip Gawlowski wrote:
>
> So, the tool that captures run away processes and terminates them will
> be called 'sodoff', I wager? :P

oh yeah, that's good - taken!

Phillip Gawlowski

Apr 17, 2008, 11:39:51 PM

ara.t.howard wrote:
|
| On Apr 17, 2008, at 8:48 PM, Phillip Gawlowski wrote:
|>
|> So, the tool that captures run away processes and terminates them will
|> be called 'sodoff', I wager? :P
|
| oh yeah, that's good - taken!

I want credit. Dollars aren't worth a dime. :P

--
Phillip Gawlowski
Twitter: twitter.com/cynicalryan

"You speak truth," said Themistocles; "I should never have been famous
if I had been of Seriphus"
~ -- Plutarch (46-120 AD)
~ -- Life of Themistocles



Piyush Ranjan

Apr 18, 2008, 12:58:02 AM

I think this is a great idea!
Kudos

Martin DeMello

Apr 18, 2008, 4:00:10 AM

On Thu, Apr 17, 2008 at 6:43 PM, ara howard <ara.t....@gmail.com> wrote:
> DESCRIPTION
>
> forkoff works for any enumerable object, iterating a code block to run in a
> child process and collecting the results. forkoff can limit the number of
> child processes which is, by default, 8.

Very neat indeed!

martin

fedzor

Apr 18, 2008, 8:23:26 AM

Since it's using Kernel#fork(), does this mean it is using OS threads?

ara.t.howard

Apr 18, 2008, 11:59:44 AM

On Apr 18, 2008, at 6:23 AM, fedzor wrote:
> Since it's using Kernel#fork(), does this mean it is using OS threads?

yes. forkoff has a number of consumer *green* threads used to manage
an array of queues containing the elements destined to be passed to a
forked process/native thread for execution of the block. the code is
very short, give it a read.
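The description above suggests the general shape: a fixed number of green consumer threads pull work from a queue, and each consumer forks one child per element and waits for it before taking the next. A minimal sketch under that assumption follows; it uses a single `Queue` rather than forkoff's array of queues, and `pooled_forkoff` is a hypothetical name, not the gem's code.

```ruby
# Hypothetical sketch of a bounded fork pool (not forkoff's actual source).
# A fixed number of threads consume work; each item runs in a forked child.
def pooled_forkoff(enum, processes = 2, &block)
  work = Queue.new
  enum.each_with_index { |item, index| work << [item, index] }
  results = Array.new(work.size)

  consumers = Array.new(processes) do
    Thread.new do
      loop do
        begin
          item, index = work.pop(true) # non-blocking; raises when empty
        rescue ThreadError
          break # queue drained: this consumer is finished
        end

        reader, writer = IO.pipe
        pid = Process.fork do
          reader.close
          Marshal.dump(block.call(item), writer)
          writer.close
          exit!(0)
        end
        writer.close
        results[index] = Marshal.load(reader.read) # drain before reaping
        reader.close
        Process.wait(pid)
      end
    end
  end

  consumers.each(&:join)
  results
end

started = Time.now
squares = pooled_forkoff(0..3, 2) { |i| sleep 0.5; i * i }
elapsed = Time.now - started
p squares    # => [0, 1, 4, 9]
puts elapsed # roughly 1 second: two children at a time, 0.5 s each
```

Because a consumer only pops new work after its current child finishes, the number of live children never exceeds `processes`, and a fast child frees its slot without waiting for the rest of a batch.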

cheers.

Erik Veenstra

Apr 18, 2008, 2:19:17 PM

I've once implemented Enumerable#fork myself. It doesn't use
queues, or a producer-consumer like pattern. It simply tells a
generic ThreadLimiter to spawn a new thread. Within this
thread, a new process is spawned. The number of concurrent
threads, and thus the number of concurrent processes, is
controlled by ThreadLimiter.

We might learn from both implementations.

regards,
Erik V. - http://www.erikveen.dds.nl/

----------------------------------------------------------------

Here's my code:

----------------------------------------------------------------

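module Enumerable
  def fork(max_number_of_threads=nil, &block)
    thread_limiter = EV::ThreadLimiter.new(max_number_of_threads)

    collect do |x|
      thread_limiter.fork do
        Thread.current.abort_on_exception = true

        r, w = IO.pipe

        if pid = Process.fork
          # parent: drain the pipe before reaping the child; otherwise a
          # result larger than the pipe buffer leaves both sides waiting
          w.close
          data = r.read
          r.close
          Process.wait(pid)
          Marshal.load(data)
        else
          # child: marshal the block's result back to the parent, then exit
          # without running the parent's at_exit handlers
          r.close
          Marshal.dump(block.call(x), w)
          w.close
          exit!
        end
      end
    end.collect do |t|
      t.value
    end
  end
end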
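Pipes have a bounded kernel buffer (64 KiB by default on current Linux), so a parent that calls `Process.wait` before draining the pipe can deadlock once a child's marshaled result outgrows it: the child blocks in `write`, and the parent blocks waiting for the child to exit. A minimal sketch of the safe ordering, with an illustrative 1 MB payload:

```ruby
# Child sends a result much larger than a typical pipe buffer.
reader, writer = IO.pipe
pid = Process.fork do
  reader.close
  Marshal.dump("x" * 1_000_000, writer) # ~1 MB, far above the pipe buffer
  writer.close
  exit!(0)
end
writer.close

payload = Marshal.load(reader.read) # drain the pipe first...
reader.close
Process.wait(pid)                   # ...then reap the child
puts payload.size # => 1000000
```

Swapping the `Process.wait` and `reader.read` lines in this sketch would make it hang.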

----------------------------------------------------------------

module EV
  class ThreadLimiter
    def initialize(max_number_of_threads)
      @number_of_threads = 0
      @max_number_of_threads = max_number_of_threads

      yield(self) if block_given?
    end

    def fork(*args, &block)
      # wait for a free slot; a nil or non-positive maximum means no limit
      Thread.pass while @max_number_of_threads and
                        @max_number_of_threads > 0 and
                        @number_of_threads >= @max_number_of_threads

      # If this method is called from several threads, then
      # @number_of_threads might get bigger than @max_number_of_threads.
      # This usually a) isn't the case and b) doesn't really matter (to me...).
      # I'm willing to accept this "risk", because a) Thread.exclusive is
      # much, much faster than Mutex#synchronize and b) we can't run into
      # deadlocks.

      Thread.exclusive{@number_of_threads += 1}

      Thread.fork do
        begin
          res = block.call(*args)
        ensure
          Thread.exclusive{@number_of_threads -= 1}
        end

        res
      end
    end
  end
end
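`Thread.exclusive` was deprecated in Ruby 2.3 and removed in Ruby 3.0, so on current Rubies the limiter above needs another primitive. One possible substitute, sketched here as an assumption rather than Erik's design, uses a `SizedQueue` as a counting semaphore: the caller must push a token before a thread starts, and the thread pops it when done, so at most `max` blocks run at once and there is no busy-waiting with `Thread.pass`. `SemaphoreLimiter` is a hypothetical name.

```ruby
# Hypothetical replacement for EV::ThreadLimiter built on SizedQueue.
class SemaphoreLimiter
  def initialize(max)
    # A nil or non-positive max means "no limit", as in the original.
    @slots = max && max > 0 ? SizedQueue.new(max) : nil
  end

  def fork(*args, &block)
    @slots&.push(:token) # blocks while `max` threads already hold a token
    Thread.new do
      begin
        block.call(*args)
      ensure
        @slots&.pop # release the slot for the next caller
      end
    end
  end
end

limiter = SemaphoreLimiter.new(2)
mutex = Mutex.new
running = 0
peak = 0

threads = (1..6).map do |i|
  limiter.fork(i) do |n|
    mutex.synchronize { running += 1; peak = [peak, running].max }
    sleep 0.05
    mutex.synchronize { running -= 1 }
    n * 10
  end
end

p threads.map(&:value) # => [10, 20, 30, 40, 50, 60]
puts peak              # never more than 2
```

The queue's blocking `push` does the waiting, so the count can no longer overshoot the limit the way the comment in `ThreadLimiter` allows.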

----------------------------------------------------------------

Here's a benchmark:

require "benchmark"

Benchmark.bm(15) do |bm|
  rc = nil
  r2 = nil
  r4 = nil
  rx = nil

  data = 1..10
  test = lambda{|x| 1_000_000.times{7+8}; [x, Process.pid]}

  bm.report(" collect "){rc = data.collect(&test)}
  bm.report(" 2 processes"){r2 = data.fork(2, &test)}
  bm.report(" 4 processes"){r4 = data.fork(4, &test)}
  bm.report("inf processes"){rx = data.fork(-1, &test)}

  p rc
  p r2
  p r4
  p rx
end

It produces these results on a dual core machine:

                    user     system      total        real
 collect        4.530000   0.000000   4.530000 (  4.527982)
 2 processes    0.030000   0.050000   3.170000 (  1.733209)
 4 processes    0.160000   0.370000   3.610000 (  1.927826)
inf processes   0.000000   0.000000   3.080000 (  1.691932)
[[1, 18732], [2, 18732], [3, 18732], [4, 18732], [5, 18732], [6, 18732], [7, 18732], [8, 18732], [9, 18732], [10, 18732]]
[[1, 18733], [2, 18734], [3, 18735], [4, 18736], [5, 18737], [6, 18738], [7, 18739], [8, 18740], [9, 18741], [10, 18742]]
[[1, 18743], [2, 18744], [3, 18745], [4, 18746], [5, 18747], [6, 18748], [7, 18749], [8, 18750], [9, 18751], [10, 18752]]
[[1, 18753], [2, 18754], [3, 18755], [4, 18756], [5, 18757], [6, 18758], [7, 18759], [8, 18760], [9, 18761], [10, 18762]]

----------------------------------------------------------------

Erik Veenstra

Apr 18, 2008, 2:20:47 PM

Just a word of warning: the construction "Thread.new(i){|i|" is
useless, by definition, just as "i=i" is useless.

If i isn't defined outside the loop, you don't have to pass i to
the thread, so "Thread.new{" will do. However, if i is defined
outside the loop (which it isn't, in your code...),
"Thread.new(i){|i|" won't work (see below): it's better to use
"Thread.new(i1){|i2|" instead.

regards,
Erik V. - http://www.erikveen.dds.nl/

----------------------------------------------------------------

a = (1..10).to_a
a1 = a.map{|i| Thread.new { sleep 0.01 ; i }}.map{|t| t.value}
a2 = a.map{|i| Thread.new(i) {|i| sleep 0.01 ; i }}.map{|t| t.value} # Will do.
i = nil
a3 = a.map{|i| Thread.new(i) {|i| sleep 0.01 ; i }}.map{|t| t.value} # Won't do! (Ruby 1.8)
a4 = a.map{|i1| Thread.new(i1){|i2|sleep 0.01 ; i2}}.map{|t| t.value}

p a1 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a2 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a3 # ==> [10, 10, 10, 10, 10, 10, 10, 10, 10, 10] (under Ruby 1.8)
p a4 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
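The `a3` result above is Ruby 1.8 behavior, where a block parameter named like an existing local variable assigned to that outer variable. Since Ruby 1.9, block parameters are always local to their block, so the shadowing form returns 1 through 10 as well. Passing a value into `Thread.new` still matters when the block would otherwise read a variable that is reassigned after the thread starts, as this sketch shows:

```ruby
# A closure captures the variable itself, not its value at thread creation.
x = 1
late = Thread.new { sleep 0.05; x } # reads x only when the block runs
x = 2
p late.value # => 2

# Passing the value to Thread.new binds it to a block-local parameter.
y = 1
early = Thread.new(y) { |v| sleep 0.05; v }
y = 2
p early.value # => 1

# Ruby 1.9+: the shadowing form from a3 no longer clobbers the outer i.
i = nil
a3 = (1..10).map { |i| Thread.new(i) { |i| sleep 0.01; i } }.map(&:value)
p a3 # => [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p i  # => nil
```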

----------------------------------------------------------------

ara.t.howard

Apr 18, 2008, 2:24:41 PM

On Apr 18, 2008, at 12:19 PM, Erik Veenstra wrote:
> I've once implemented Enumerable#fork myself. It doesn't use
> queues, or a producer-consumer like pattern. It simply tells a
> generic ThreadLimiter to spawn a new thread. Within this
> thread, a new process is spawned. The number of concurrent
> threads, and thus the number of concurrent processes, is
> controlled by ThreadLimiter.
>
> We might learn from both implementations.

this is precisely how forkoff manages the number of processes, only
it's via Queues and a fixed number of threads consuming from those
queues

the latest impl from svn:

http://p.ramaze.net/1141

regards.

ara.t.howard

Apr 18, 2008, 2:26:06 PM

On Apr 18, 2008, at 12:20 PM, Erik Veenstra wrote:
> If i isn't defined outside the loop, you don't have to pas i to
> the thread, so "Thread.new{" will do. However, if i is defined
> outside the loop (which it isn't, in your code...),
> "Thread.new(i){|i|" won't work (see below): It's better to use
> "Thread.new(i1){|i2|" instead.

indeed - left over from a previous iteration.

Fredrik

Apr 20, 2008, 9:16:05 PM

This is really nice! I would prefer to change this syntax

%w( a b c d ).forkoff! :processes => 2 do |letter|

for this syntax

%w( a b c d ).forkoff! 2 do |letter|

though. Then I wouldn't have to remember that 'processes' keyword.
Anyhow, it is a great piece of code.

/Fredrik

ara.t.howard

Apr 20, 2008, 9:42:17 PM


the next release supports either a hash (options) or a numeric argument
- so either will work - may release tonight...
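A sketch of how a method might accept either form from Fredrik's message, `forkoff!(2)` or `forkoff!(:processes => 2)`, falling back to the default of 8 from the announcement. `forkoff_process_count` is a hypothetical helper; the actual release may parse its arguments differently.

```ruby
DEFAULT_PROCESSES = 8 # the default mentioned in the announcement

# Hypothetical helper: turn forkoff-style arguments into a process count.
# Accepts nothing, an Integer, or a Hash with a :processes (or 'processes') key.
def forkoff_process_count(arg = nil)
  count =
    case arg
    when nil     then DEFAULT_PROCESSES
    when Integer then arg
    when Hash    then arg.fetch(:processes) { arg.fetch('processes', DEFAULT_PROCESSES) }
    else raise ArgumentError, "expected an Integer or a Hash, got #{arg.class}"
    end
  count = Integer(count)
  raise ArgumentError, 'process count must be positive' unless count > 0
  count
end

puts forkoff_process_count                  # => 8
puts forkoff_process_count(2)               # => 2
puts forkoff_process_count(:processes => 2) # => 2
puts forkoff_process_count({})              # => 8
```

Dispatching on the argument's class keeps the numeric shorthand while leaving room for other options in the hash form.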

cheers.

ara.t.howard

Apr 20, 2008, 10:00:31 PM

On Apr 20, 2008, at 12:32 PM, Shot (Piotr Szotkowski) wrote:
> My example code:
>
> results = []
> run = 0
> runs = 2 ** fsm_inputs.size
> (0...runs).forkoff do |vector|
> results[vector] = by_input_sets vector
> # snip some run-based stats
> run += 1
> end
>
> This, obviously, doesn’t work (i.e., results is an empty array at the
> end and run is 0 in every iteration). I can get the results by making
> the block return the by_input_sets call’s result, but I still lose the
> run-based stats.
>
> It seems a singleton-based approach would work (I’d create a singleton
> object outside of the loop and have the results array and run counter
> be its properties), but maybe there is an easier way?

to do this you'll want to combine forkoff with my slave lib, which
sets up an object fronted by drb and which can indeed be a
singleton - note that this object is itself running in a child
process, but you can ignore this for the most part. a simple example:

cfp:~ > cat a.rb
require 'rubygems'
require 'slave'
require 'forkoff'

slave = Slave.new(:threadsafe => true){ Hash.new }
process_global = slave.object

( 0 .. 4 ).each do |i|
  process_global[i] = i ** 2
end

process_global.each do |k,v|
  p k => v
end


cfp:~ > ruby a.rb
{0=>0}
{1=>1}
{2=>4}
{3=>9}
{4=>16}


even with these abstractions you have to consider deeply what's
happening with threads/processes etc - but yes, it's definitely
possible with little code.
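Without slave, another option for Shot's case is to keep all state in the parent: each child returns what it computed, and the parent fills in the results array and counters afterwards. The sketch below first shows why the original loop loses its updates (a forked child mutates its own copy of memory), then the return-value approach. `by_input_sets` is stubbed as a hypothetical stand-in for Shot's method, and a small fork-per-element helper stands in for forkoff so the block runs on its own.

```ruby
# Hypothetical stand-in for Shot's by_input_sets: counts the 1 bits.
def by_input_sets(vector)
  vector.to_s(2).count('1')
end

# Minimal fork-per-element map, standing in for forkoff in this sketch.
def fork_map(enum)
  enum.map do |item|
    reader, writer = IO.pipe
    pid = Process.fork do
      reader.close
      Marshal.dump(yield(item), writer)
      writer.close
      exit!(0)
    end
    writer.close
    [pid, reader]
  end.map do |pid, reader|
    value = Marshal.load(reader.read)
    reader.close
    Process.wait(pid)
    value
  end
end

# Each child increments its own copy of run; the parent's copy is untouched.
run = 0
fork_map(0...4) { |_| run += 1 }
p run # => 0

# Instead, return what each child computed and aggregate in the parent.
pairs = fork_map(0...4) { |vector| [vector, by_input_sets(vector)] }
results = []
pairs.each { |vector, value| results[vector] = value }
run = pairs.size
p results # => [0, 1, 1, 2]
p run     # => 4
```

Any per-run statistics can travel the same way, as extra fields in the returned array.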

cheers.

Abdul-rahman Advany

Apr 29, 2008, 5:16:30 PM

Checking out the source, I only see pid = fork (this is a call to
Thread.new, isn't it?). I don't see the real (kernel) fork being used... or
am I wrong?

Abdul-rahman Advany

Apr 29, 2008, 6:15:08 PM

Abdul-rahman Advany wrote:
> Checking out the source I only see pid = fork (this is a call to
> Thread.new isn't it?), I don't see that real fork (kernel) is used... or
> I am wrong?

Sorry, just figured out that calling fork makes the thread a child process...
I just need to find out how to set a timeout on the thread... any
suggestions?
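For reference, `Kernel#fork` creates a separate operating-system process, not a thread: it returns the child's pid in the parent and `nil` in the child, and each process has its own pid and its own copy of memory. A minimal sketch, assuming a POSIX platform, in which the child reports its own pid and its parent's pid back over a pipe:

```ruby
parent_pid = Process.pid
reader, writer = IO.pipe

if (child_pid = fork)
  # Parent branch: fork returned the child's pid.
  writer.close
  reported = reader.read.split.map(&:to_i) # [child's pid, child's ppid]
  reader.close
  Process.wait(child_pid)
  puts reported[0] == child_pid  # => true
  puts reported[1] == parent_pid # => true
else
  # Child branch: fork returned nil; report this process's pid and parent pid.
  reader.close
  writer.write("#{Process.pid} #{Process.ppid}")
  writer.close
  exit!(0)
end
```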

ara.t.howard

Apr 29, 2008, 7:02:36 PM

On Apr 29, 2008, at 4:15 PM, Abdul-rahman Advany wrote:
> Sorry, just figured out calling fork makes the thread a child
> process...
> just need to find out how to set a timeout on the thread... anyone
> suggestions?

require 'timeout'

begin
Timeout.timeout(seconds){ thread.join }
rescue Timeout::Error
thread.kill rescue nil
end

Jeremy Hinegardner

Apr 30, 2008, 11:32:45 AM

On Wed, Apr 30, 2008 at 08:02:36AM +0900, ara.t.howard wrote:
>
> On Apr 29, 2008, at 4:15 PM, Abdul-rahman Advany wrote:
>> Sorry, just figured out calling fork makes the thread a child process...
>> just need to find out how to set a timeout on the thread... anyone
>> suggestions?
>
> require 'timeout'
>
> begin
> Timeout.timeout(seconds){ thread.join }
> rescue Timeout::Error
> thread.kill rescue nil
> end

That'll kill the thread, but not the child process that was forked, at least
that's what I remember.

Try this as a general strategy:

parent_t = Thread.new(cmd) do |exec_me|
  if cpid = fork then
    Thread.current[:cpid] = cpid
    cpid, exit_status = Process.waitpid2(cpid)
    Thread.current[:exit_status] = exit_status
  else
    exec exec_me
  end
end


unless parent_t.join( seconds ) # seconds contains your timeout value
  cpid = parent_t[:cpid]
  %w[ TERM KILL ].each do |sig|
    begin
      Process.kill(sig, cpid)
    rescue Errno::ESRCH
      break # the child already exited on its own
    end
    break if parent_t.join(1)
  end
end

# Do something with parent_t[:exit_status] to report the child
# process exit status.
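The same pattern can wrap a Ruby block instead of an external command. The sketch below is an adaptation under stated assumptions, not code from the thread: `run_with_timeout` is a hypothetical helper, it uses `Process.detach` (which returns a thread whose value is the child's `Process::Status`) in place of the hand-rolled waiter thread, and `grace` is an illustrative pause between TERM and KILL.

```ruby
# Hypothetical helper: run a block in a forked child, escalating from TERM to
# KILL if it does not finish within `seconds`. Returns the Process::Status.
def run_with_timeout(seconds, grace = 1)
  pid = Process.fork do
    yield
    exit!(0)
  end
  waiter = Process.detach(pid) # thread whose value is the child's status

  unless waiter.join(seconds)
    %w[TERM KILL].each do |sig|
      begin
        Process.kill(sig, pid)
      rescue Errno::ESRCH
        break # child already exited
      end
      break if waiter.join(grace)
    end
  end
  waiter.value
end

slow = run_with_timeout(0.5) { sleep 10 }
puts(slow.success? ? 'ok' : 'killed') # => killed

fast = run_with_timeout(5) { :done }
puts(fast.success? ? 'ok' : 'killed') # => ok
```

Killing the process rather than the Ruby thread is the point: the thread is only a waiter, so ending it alone would leave the child running.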

enjoy,

-jeremy


--
========================================================================
Jeremy Hinegardner jer...@hinegardner.org

