NAME
forkoff
SYNOPSIS
brain-dead simple parallel processing for ruby
URI
http://rubyforge.org/projects/codeforpeople
INSTALL
gem install forkoff
DESCRIPTION
forkoff works for any enumerable object, iterating a code block to run in a
child process and collecting the results. forkoff can limit the number of
child processes; by default the limit is 8.
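A minimal sketch of the mechanism the description implies (run a block in a child process and hand its result back to the parent), assuming a pipe plus Marshal for the transfer. It is not forkoff's source; `fork_value` is a hypothetical name, and it needs a platform with `fork` (not Windows).

```ruby
# Hypothetical helper: run the block in a forked child and return its
# result to the parent. The child marshals the value into a pipe; the
# parent reads it, reaps the child and unmarshals it.
def fork_value
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    Marshal.dump(yield, writer)
    writer.close
    exit!(0)            # skip at_exit handlers inherited from the parent
  end
  writer.close
  data = reader.read    # read before waiting so a large result cannot block the child
  reader.close
  Process.wait(pid)
  Marshal.load(data)
end

parent = Process.pid
child, square = fork_value { [Process.pid, 7 ** 2] }
puts square             # prints 49
puts child != parent    # prints true
```

Reading before `Process.wait` matters: a child whose result is larger than the pipe buffer would otherwise block forever on its write.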
SAMPLES
<========< samples/a.rb >========>
~ > cat samples/a.rb
#
# forkoff makes it trivial to do parallel processing with ruby, the following
# prints out each word in a separate process
#
require 'forkoff'
%w( hey you ).forkoff!{|word| puts "#{ word } from #{ Process.pid }"}
~ > ruby samples/a.rb
hey from 3239
you from 3240
<========< samples/b.rb >========>
~ > cat samples/b.rb
#
# for example, this takes only 1 second or so to complete
#
require 'forkoff'
a = Time.now.to_f
results =
(0..7).forkoff do |i|
sleep 1
i ** 2
end
b = Time.now.to_f
elapsed = b - a
puts "elapsed: #{ elapsed }"
puts "results: #{ results.inspect }"
~ > ruby samples/b.rb
elapsed: 1.07044386863708
results: [0, 1, 4, 9, 16, 25, 36, 49]
<========< samples/c.rb >========>
~ > cat samples/c.rb
#
# forkoff does *NOT* spawn processes in batches, waiting for each batch to
# complete. rather, it keeps a certain number of processes busy until all
# results have been gathered. in other words the following will ensure that 2
# processes are running at all times, until the list is complete. note that
# the following will take about 2 seconds to run (2 sets of 2 @ 1 second).
#
require 'forkoff'
pid = Process.pid
a = Time.now.to_f
pstrees =
%w( a b c d ).forkoff! :processes => 2 do |letter|
sleep 1
{ letter => ` pstree -l 2 #{ pid } ` }
end
b = Time.now.to_f
puts
puts "pid: #{ pid }"
puts "elapsed: #{ b - a }"
puts
require 'yaml'
pstrees.each do |pstree|
y pstree
end
~ > ruby samples/c.rb
pid: 3254
elapsed: 2.12998485565186
---
a: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03255 ahoward ruby -Ilib samples/c.rb
\-+- 03256 ahoward ruby -Ilib samples/c.rb
---
b: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03255 ahoward ruby -Ilib samples/c.rb
\-+- 03256 ahoward ruby -Ilib samples/c.rb
---
c: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03261 ahoward (ruby)
\-+- 03262 ahoward ruby -Ilib samples/c.rb
---
d: |
-+- 03254 ahoward ruby -Ilib samples/c.rb
|-+- 03261 ahoward ruby -Ilib samples/c.rb
\-+- 03262 ahoward ruby -Ilib samples/c.rb
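The point of the samples/c.rb comment (keep N processes busy instead of waiting for whole batches) only shows once jobs take different amounts of time. This sketch simulates just the scheduling arithmetic, with no processes forked; `batched_time`, `pooled_time` and the job durations are made up for illustration. With four 1-second jobs on 2 slots both strategies take 2 seconds, matching the elapsed time above; with uneven jobs, as below, the pool finishes sooner.

```ruby
# Hypothetical simulation: total wall-clock time for a list of job
# durations run on `workers` slots.

# Batching: start `workers` jobs, wait for the slowest one, repeat.
def batched_time(durations, workers)
  durations.each_slice(workers).sum { |batch| batch.max }
end

# Pool: whenever a slot frees up, it immediately takes the next job.
def pooled_time(durations, workers)
  free_at = Array.new(workers, 0)       # time at which each slot becomes free
  durations.each do |d|
    slot = free_at.index(free_at.min)   # the earliest-free slot takes the job
    free_at[slot] += d
  end
  free_at.max
end

jobs = [3, 1, 1, 1]
puts batched_time(jobs, 2)  # prints 4  (max(3, 1) + max(1, 1))
puts pooled_time(jobs, 2)   # prints 3  (one slot runs the 3, the other 1 + 1 + 1)
```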
a @ http://codeforpeople.com/
--
we can deny everything, except that we have the possibility of being
better. simply reflect on that.
h.h. the 14th dalai lama
>
> NAME
>
> forkoff
>
Nice. Great idea.
>
> # forkoff does *NOT* spawn processes in batches, waiting for each batch to
> # complete. rather, it keeps a certain number of processes busy until all
> # results have been gathered. in other words the following will ensure that 2
> # processes are running at all times, until the list is complete. note that
> # the following will take about 2 seconds to run (2 sets of 2 @ 1 second).
> #
I assume then that at most 2 processes are forked, and each keeps
working?
right now the default is 8 - 2 is more reasonable. at this point this code is
purely a proof of concept - i'll take that as a suggestion (one that i agree
with).
cheers.
ara howard wrote:
| forkoff works for any enumerable object, iterating a code block to run in a
| child process and collecting the results. forkoff can limit the number of
| child processes which is, by default, 8.
So, the tool that captures runaway processes and terminates them will
be called 'sodoff', I wager? :P
SCNR
--
Phillip Gawlowski
Twitter: twitter.com/cynicalryan
You thought I was taking your woman away from you. You're jealous.
You tried to kill me with your bare hands. Would a Kelvan do that?
Would he have to? You're reacting with the emotions of a human.
You are human.
~ -- Kirk, "By Any Other Name," stardate 4657.5
oh yeah, that's good - taken!
ara.t.howard wrote:
|
| On Apr 17, 2008, at 8:48 PM, Phillip Gawlowski wrote:
|>
|> So, the tool that captures run away processes and terminates them will
|> be called 'sodoff', I wager? :P
|
| oh yeah, that's good - taken!
I want credit. Dollars aren't worth a dime. :P
--
Phillip Gawlowski
Twitter: twitter.com/cynicalryan
"You speak truth," said Themistocles; "I should never have been famous
if I had been of Seriphus"
~ -- Plutarch (46-120 AD)
~ -- Life of Themistocles
I think this is a great idea!
Kudos
Very neat indeed!
martin
yes. forkoff has a number of consumer *green* threads used to manage
an array of queues containing the elements destined to be passed to a
forked process/native thread for execution of the block. the code is
very short, give it a read.
cheers.
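The design described above can be sketched roughly as follows: a fixed number of consumer threads pull work from a queue, and each job runs in its own forked child whose result comes back through a pipe. This is a simplified illustration, not forkoff's source: it uses one shared `Queue` of `[element, index]` pairs instead of an array of queues, and `pool_map` with its `processes:` keyword is a hypothetical name (the default of 8 mirrors the DESCRIPTION). It needs `fork`, and Ruby 2.3+ for `Queue#close`.

```ruby
# Hypothetical pool_map: `processes` consumer threads pull jobs from one
# shared queue; each job runs in a forked child, so at most `processes`
# children exist at a time. Results keep the input order.
def pool_map(enum, processes: 8, &block)
  items   = enum.to_a
  results = Array.new(items.size)
  jobs    = Queue.new
  items.each_with_index { |item, i| jobs << [item, i] }
  jobs.close                       # pop returns nil once the queue is drained
  fork_lock = Mutex.new            # pipe+fork one at a time, so no child inherits
                                   # another job's still-open write end
  consumers = Array.new([processes, items.size].min) do
    Thread.new do
      while (job = jobs.pop)
        item, i = job
        reader, pid = fork_lock.synchronize do
          r, w = IO.pipe
          child = fork do
            r.close
            Marshal.dump(block.call(item), w)
            w.close
            exit!(0)
          end
          w.close
          [r, child]
        end
        data = reader.read         # read before reaping so the child never blocks
        reader.close
        Process.wait(pid)
        results[i] = Marshal.load(data)
      end
    end
  end
  consumers.each(&:join)
  results
end

p pool_map(0..7, processes: 2) { |i| i ** 2 }  # prints [0, 1, 4, 9, 16, 25, 36, 49]
```

A single queue keeps the sketch short; because every consumer pops from it as soon as it is idle, it also gives the "keep N busy" behaviour rather than batching.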
We might learn from both implementations.
regards,
Erik V. - http://www.erikveen.dds.nl/
----------------------------------------------------------------
Here's my code:
----------------------------------------------------------------
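module Enumerable
  # Runs the block for each element in a forked child process and collects
  # the results in order; EV::ThreadLimiter caps how many run at once
  # (nil or a value <= 0 means no limit).
  def fork(max_number_of_threads=nil, &block)
    thread_limiter = EV::ThreadLimiter.new(max_number_of_threads)
    collect do |x|
      thread_limiter.fork do
        Thread.current.abort_on_exception = true
        r, w = IO.pipe
        if pid = Process.fork
          # Parent: read the result *before* waiting; waiting first
          # deadlocks once the child's result fills the pipe buffer.
          w.close
          data = r.read
          r.close
          Process.wait(pid)
          Marshal.load(data)
        else
          # Child: send the result through the pipe, then leave without
          # running the at_exit handlers inherited from the parent.
          r.close
          Marshal.dump(block.call(x), w)
          w.close
          exit!
        end
      end
    end.collect do |t|
      t.value
    end
  end
end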
----------------------------------------------------------------
module EV
  # Caps the number of threads started through #fork that run at once.
  class ThreadLimiter
    def initialize(max_number_of_threads)
      @number_of_threads = 0
      @max_number_of_threads = max_number_of_threads
      yield(self) if block_given?
    end
    def fork(*args, &block)
      # Wait until fewer than the maximum number of threads are running;
      # nil or a value <= 0 means no limit.
      Thread.pass while @max_number_of_threads and
                        @max_number_of_threads > 0 and
                        @number_of_threads >= @max_number_of_threads
      # If this method is called from several threads, then
      # @number_of_threads might get bigger than @max_number_of_threads.
      # This a) usually isn't the case and b) doesn't really matter (to me...).
      # I'm willing to accept this "risk", because a) Thread.exclusive is
      # much, much faster than Mutex#synchronize and b) we can't run into
      # deadlocks.
      Thread.exclusive{@number_of_threads += 1}
      Thread.fork do
        begin
          res = block.call(*args)
        ensure
          Thread.exclusive{@number_of_threads -= 1}
        end
        res
      end
    end
  end
end
----------------------------------------------------------------
Here's a benchmark:
require "benchmark"
Benchmark.bm(15) do |bm|
rc = nil
r2 = nil
r4 = nil
rx = nil
data = 1..10
test = lambda{|x| 1_000_000.times{7+8}; [x, Process.pid]}
bm.report(" collect "){rc = data.collect(&test)}
bm.report(" 2 processes"){r2 = data.fork(2, &test)}
bm.report(" 4 processes"){r4 = data.fork(4, &test)}
bm.report("inf processes"){rx = data.fork(-1, &test)}
p rc
p r2
p r4
p rx
end
It produces these results on a dual core machine:
                     user     system      total        real
 collect         4.530000   0.000000   4.530000 (  4.527982)
 2 processes     0.030000   0.050000   3.170000 (  1.733209)
 4 processes     0.160000   0.370000   3.610000 (  1.927826)
inf processes    0.000000   0.000000   3.080000 (  1.691932)
[[1, 18732], [2, 18732], [3, 18732], [4, 18732], [5, 18732], [6, 18732], [7, 18732], [8, 18732], [9, 18732], [10, 18732]]
[[1, 18733], [2, 18734], [3, 18735], [4, 18736], [5, 18737], [6, 18738], [7, 18739], [8, 18740], [9, 18741], [10, 18742]]
[[1, 18743], [2, 18744], [3, 18745], [4, 18746], [5, 18747], [6, 18748], [7, 18749], [8, 18750], [9, 18751], [10, 18752]]
[[1, 18753], [2, 18754], [3, 18755], [4, 18756], [5, 18757], [6, 18758], [7, 18759], [8, 18760], [9, 18761], [10, 18762]]
----------------------------------------------------------------
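Erik's `ThreadLimiter` depends on `Thread.exclusive`, which is not available in Ruby 3.x, and it busy-waits with `Thread.pass`. One swapped-in alternative, shown here as a sketch rather than Erik's code, is a `SizedQueue` used as a counting semaphore: pushing a token blocks while all slots are taken, and popping it frees a slot. `SlotLimiter` is a hypothetical name.

```ruby
# Hypothetical SlotLimiter: at most `max` threads started via #fork run at
# once. A SizedQueue holds one token per running thread, so #push blocks
# (instead of spinning) while every slot is taken.
class SlotLimiter
  def initialize(max)
    @slots = max && max > 0 ? SizedQueue.new(max) : nil  # nil: no limit
  end

  def fork(*args, &block)
    @slots&.push(true)              # wait for a free slot
    Thread.new(*args) do |*thread_args|
      begin
        block.call(*thread_args)
      ensure
        @slots&.pop                 # release the slot, even on error
      end
    end
  end
end

limiter = SlotLimiter.new(2)
running = 0
peak    = 0
lock    = Mutex.new
threads = (1..6).map do |i|
  limiter.fork(i) do |n|
    lock.synchronize { running += 1; peak = [peak, running].max }
    sleep 0.05
    lock.synchronize { running -= 1 }
    n * 10
  end
end
p threads.map(&:value)  # prints [10, 20, 30, 40, 50, 60]
p peak <= 2             # prints true
```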
If i isn't defined outside the loop, you don't have to pass i to
the thread, so "Thread.new{" will do. However, if i is defined
outside the loop (which it isn't, in your code...),
"Thread.new(i){|i|" won't work (see below): it's better to use
"Thread.new(i1){|i2|" instead. (This is Ruby 1.8 behaviour; since
Ruby 1.9, block parameters are always local to the block.)
regards,
Erik V. - http://www.erikveen.dds.nl/
----------------------------------------------------------------
# Ruby 1.8 semantics: a block parameter named like an existing local
# variable assigns to that variable instead of creating a new one.
a  = (1..10).to_a
a1 = a.map{|i| Thread.new { sleep 0.01 ; i }}.map{|t| t.value}
a2 = a.map{|i| Thread.new(i) {|i| sleep 0.01 ; i }}.map{|t| t.value} # Will do.
i  = nil
a3 = a.map{|i| Thread.new(i) {|i| sleep 0.01 ; i }}.map{|t| t.value} # Won't do! (Ruby 1.8)
a4 = a.map{|i1| Thread.new(i1){|i2| sleep 0.01 ; i2}}.map{|t| t.value}
p a1 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a2 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
p a3 # ==> [10, 10, 10, 10, 10, 10, 10, 10, 10, 10] (Ruby 1.8; 1.9+ gives 1 to 10)
p a4 # ==> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
----------------------------------------------------------------
this is precisely how forkoff manages the number of processes, only
it's via Queues and a fixed number of threads consuming from those
queues
the latest impl from svn:
regards.
indeed - left over from a previous iteration.
> %w( a b c d ).forkoff! :processes => 2 do |letter|
I'd go for this syntax
%w( a b c d ).forkoff! 2 do |letter|
though. Then I wouldn't have to remember that 'processes' keyword.
Anyhow, it is a great piece of code.
/Fredrik
the next release supports either a hash (options) or a numeric argument
- so either will work - may release tonight...
cheers.
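Normalizing the argument up front is enough to accept either form. A minimal sketch, assuming a `:processes` option key and the default of 8 from the DESCRIPTION; `processes_from` is a hypothetical helper, not forkoff's API.

```ruby
# Hypothetical helper: accept `2`, `:processes => 2`, `'processes' => 2`
# or nothing at all, and return the process limit as an Integer.
DEFAULT_PROCESSES = 8

def processes_from(arg = nil)
  case arg
  when nil     then DEFAULT_PROCESSES
  when Integer then arg
  when Hash
    Integer(arg.fetch(:processes) { arg.fetch('processes', DEFAULT_PROCESSES) })
  else
    raise ArgumentError, "expected a number or an options hash, got #{arg.inspect}"
  end
end

p processes_from                      # prints 8
p processes_from(2)                   # prints 2
p processes_from(:processes => 2)     # prints 2
p processes_from('processes' => '4')  # prints 4
```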
to do this you'll want to combine forkoff with my slave lib, which
sets up an object which is fronted by drb, and which can indeed be a
singleton - note that this object is, itself, running in a child
process, but you can ignore this for the most part. a simple example:
cfp:~ > cat a.rb
require 'rubygems'
require 'slave'
require 'forkoff'
slave = Slave.new(:threadsafe => true){ Hash.new }
process_global = slave.object
( 0 .. 4 ).each do |i|
process_global[i] = i ** 2
end
process_global.each do |k,v|
p k => v
end
cfp:~ > ruby a.rb
{0=>0}
{1=>1}
{2=>4}
{3=>9}
{4=>16}
even with these abstractions you have to consider deeply what's
happening with threads/processes etc - but yes, it's definitely
possible with little code.
cheers.
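The reason something like slave is needed at all: a forked child works on its own copy of the parent's memory, so writes it makes to an ordinary Hash never reach the parent. This sketch shows that with plain `fork` (neither slave nor forkoff); `squares_from_children` is an illustrative name.

```ruby
# Each child squares one number and stores it in `shared`, then exits.
# Returns the parent's view of `shared` once every child has finished.
def squares_from_children(range)
  shared = {}
  pids = range.map do |i|
    fork do
      shared[i] = i ** 2   # changes only this child's private copy
      exit!(0)
    end
  end
  pids.each { |pid| Process.wait(pid) }
  shared
end

p squares_from_children(0..4)  # prints {} (no child's write reached the parent)
```

A drb-fronted object, as in slave, avoids this by keeping the Hash in one process and letting the others talk to it over a socket.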
Sorry, just figured out calling fork makes the thread a child process...
just need to find out how to set a timeout on the thread... any
suggestions?
require 'timeout'
begin
Timeout.timeout(seconds){ thread.join }
rescue Timeout::Error
thread.kill rescue nil
end
That'll kill the thread, but not the child process that was forked, at least
that's what I remember.
Try this as a general strategy:
parent_t = Thread.new(cmd) do |exec_me|
  if cpid = fork then
    Thread.current[:cpid] = cpid
    cpid, exit_status = Process.waitpid2(cpid)
    Thread.current[:exit_status] = exit_status
  else
    exec exec_me
  end
end
unless parent_t.join( seconds ) # seconds contains your timeout value
  cpid = parent_t[:cpid]
  %w[ TERM KILL ].each do |sig|
    begin
      Process.kill(sig, cpid)
    rescue Errno::ESRCH
      break # the child has already exited and been reaped
    end
    break if parent_t.join(1)
  end
end
# Do something with parent_t[:exit_status] to report the child
# process exit status.
enjoy,
-jeremy
--
========================================================================
Jeremy Hinegardner jer...@hinegardner.org
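The same TERM-then-KILL strategy also works when the child runs a Ruby block rather than exec'ing a command. A sketch under that assumption: the parent polls the child with a non-blocking `Process.waitpid2`, and on timeout sends TERM, then KILL if the child is still alive a second later. `fork_with_timeout`, `wait_child` and the 0.05-second poll interval are hypothetical choices.

```ruby
# Poll for the child's exit for up to `seconds`; nil if it is still running.
def wait_child(pid, seconds)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + seconds
  loop do
    _, status = Process.waitpid2(pid, Process::WNOHANG)  # nil while running
    return status if status
    return nil if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep 0.05
  end
end

# Hypothetical fork_with_timeout: run the block in a child process; if it
# has not finished after `seconds`, send TERM, then KILL. Returns the
# child's Process::Status.
def fork_with_timeout(seconds, &block)
  pid = fork do
    block.call
    exit!(0)
  end
  status = wait_child(pid, seconds)
  return status if status
  # The child is not reaped yet, so its pid stays valid for Process.kill.
  %w[TERM KILL].each do |sig|
    Process.kill(sig, pid)
    status = wait_child(pid, 1)
    return status if status
  end
  Process.waitpid2(pid).last   # KILL cannot be caught; reap the child
end

fast = fork_with_timeout(2) { sleep 0.1 }
p fast.success?                # prints true
slow = fork_with_timeout(0.5) { sleep 10 }
puts "slow child stopped: #{slow.inspect}"
```

Unlike killing the thread, this acts on the child process itself, so nothing is left running once the timeout fires.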