With ruby 1.8.6 (2007-03-13 patchlevel 0) [i386-cygwin] I get a
deadlock error. ruby 1.8.5 (2006-08-25) [i386-mswin32] is not giving
me that error. I think that my code is written correctly and that
this may be a bug in the version/build of ruby I am running on. Does
anybody see a problem with this code? Thanks.
require 'thread'
class ThreadPool
def initialize(thread_size=10, queue_size=100)
@mutex = Mutex.new
@cv = ConditionVariable.new
@queue = []
@max_queue_size = queue_size
@threads = []
thread_size.times { @threads << Thread.new { start_worker } }
end
def add_work(*args, &callback)
push_task(Task.new(*args, &callback))
end
def push_task(task)
puts "#{Thread.current}|push_task|sync on @mutex"
@mutex.synchronize do
while @max_queue_size > 0 && @queue.size >= @max_queue_size do
puts "#{Thread.current}|push_task|wait on @mutex"
@cv.wait(@mutex)
end
@queue.push(task)
puts "#{Thread.current}|execute|broadcast on @mutex"
@cv.broadcast
end
puts "#{Thread.current}|push_task|done with sync on @mutex"
task
end
def pop_task
task = nil
puts "#{Thread.current}|pop_task|sync on @mutex"
@mutex.synchronize do
while @queue.size == 0 do
puts "#{Thread.current}|pop_task|wait on @mutex"
@cv.wait(@mutex)
end
task = @queue.shift
puts "#{Thread.current}|execute|broadcast on @mutex"
@cv.broadcast
end
puts "#{Thread.current}|pop_task|done with sync on @mutex"
task
end
def start_worker
puts "#{Thread.current} running worker"
while true
task = pop_task
return if task == :stop
task.execute
end
end
class Task
attr_reader :result, :exception
def initialize(*args, &callback)
@args = args
@callback = callback
@done = false
@result = nil
@exception = nil
@mutex = Mutex.new
@cv = ConditionVariable.new
end
def execute
begin
@result = @callback.call(*@args)
rescue Exception => e
@exception = e
STDERR.puts "Error in thread #{Thread.current} - #{e}"
e.backtrace.each { |element| STDERR.puts(element) }
end
puts "#{Thread.current}|execute|sync on @mutex"
@mutex.synchronize do
@done = true
puts "#{Thread.current}|execute|broadcast on @mutex"
@cv.broadcast
end
puts "#{Thread.current}|execute|done with sync on @mutex"
end
def join
puts "#{Thread.current}|join|sync on @mutex"
@mutex.synchronize do
while !@done
puts "#{Thread.current}|join|wait on @mutex"
@cv.wait(@mutex)
end
end
puts "#{Thread.current}|join|done with sync on @mutex"
end
end
end
tasks = []
tp = ThreadPool.new(3, 10)
sleep(1)
100.times do |id|
STDERR.puts "adding work"
tasks << tp.add_work do
puts "Running #{id} #{Thread.current}"
sleep 2
puts "Ending #{id} #{Thread.current}"
Time.now
end
end
puts "Waiting for tasks to complete"
tasks.each do |task|
task.join
if !task.exception.nil?
puts "Failed task - #{task.exception}"
else
puts "Result - #{task.result}"
end
end
output:
--------------------------
$ ruby -v
ruby 1.8.6 (2007-03-13 patchlevel 0) [i386-cygwin]
[/users/bcastill/tpool]
$ ruby thread_pool.rb
#<Thread:0x1002b63c> running worker
#<Thread:0x1002b63c>|pop_task|sync on @mutex
#<Thread:0x1002b63c>|pop_task|wait on @mutex
#<Thread:0x1002b4fc> running worker
#<Thread:0x1002b4fc>|pop_task|sync on @mutex
#<Thread:0x1002b4fc>|pop_task|wait on @mutex
#<Thread:0x1002b3d0> running worker
#<Thread:0x1002b3d0>|pop_task|sync on @mutex
#<Thread:0x1002b3d0>|pop_task|wait on @mutex
adding work
#<Thread:0x1003c964>|push_task|sync on @mutex
#<Thread:0x1003c964>|execute|broadcast on @mutex
#<Thread:0x1002b3d0>|execute|broadcast on @mutex
#<Thread:0x1003c964>|push_task|done with sync on @mutex
adding work
#<Thread:0x1003c964>|push_task|sync on @mutex
#<Thread:0x1002b4fc>|pop_task|wait on @mutex
#<Thread:0x1002b3d0>|pop_task|done with sync on @mutex
Running 0 #<Thread:0x1002b3d0>
Ending 0 #<Thread:0x1002b3d0>
#<Thread:0x1002b3d0>|execute|sync on @mutex
#<Thread:0x1002b3d0>|execute|broadcast on @mutex
#<Thread:0x1002b3d0>|execute|done with sync on @mutex
#<Thread:0x1002b3d0>|pop_task|sync on @mutex
#<Thread:0x1002b3d0>|pop_task|wait on @mutex
deadlock 0x1002b4fc: sleep:- - thread_pool.rb:39
deadlock 0x1002b63c: sleep:- - thread_pool.rb:39
deadlock 0x1003c964: sleep:- (main) - thread_pool.rb:20
deadlock 0x1002b3d0: sleep:- - thread_pool.rb:39
thread_pool.rb:39:in `push_task': Thread(0x1002b3d0): deadlock (fatal)
from thread_pool.rb:15:in `add_work'
from thread_pool.rb:109
from thread_pool.rb:107:in `times'
from thread_pool.rb:107
I've tried using some different version of ruby to see if the code
works or not:
Failed versions:
------------------------
ruby 1.8.6 (2007-03-13 patchlevel 0) [i686-linux]
ruby 1.8.6 (2007-03-13 patchlevel 0) [i386-cygwin]
ruby 1.8.6 (2007-03-13 patchlevel 0) [i386-mswin32]
Working versions:
------------------------
ruby 1.8.5 (2006-08-25) [i386-mswin32]
ruby 1.8.5 (2007-08-27 rev 4201) [x86-jruby1.0.1]
ruby 1.8.3 (2005-09-21) [i686-linux]
ruby 1.9.0 (2007-09-15 patchlevel 0) [i386-cygwin]
It seems to me like there is something wrong with the latest stable
version of ruby.
1.8.6-p0 is broken, but it is not the most recent stable release.
The latest stable release is 1.8.6-p36.
-mental
I received the error with the stable recommended version from
http://www.ruby-lang.org/en/downloads/.
Should I be using the stable snapshot?
Hmm. It looks like that is different from p36 (the file size is different):
ftp://ftp.ruby-lang.org/pub/ruby/1.8/
Try the -p36 tarball and see if it makes a difference (it should, based
on others' experience).
Perhaps someone running the site needs to be more diligent about what is
offered for download.
-mental
Yes, p36 works fine. I should have done a search for deadlocks on this
group before posting.
> Perhaps someone running the site needs to be more diligent about what is
> offered for download.
That does concern me. The windows binary on http://www.ruby-lang.org/en/downloads/
points to p0 as well.
Is it a problem with updating the web page or should those links be
soft links to the latest 1.8.6 version?
Here is the directory listing from the ftp server in /pub/ruby
lrwxrwxrwx 1 1014 100 26 Aug 02 12:16 ruby-1.8.6-
p36.tar.bz2 -> 1.8/ruby-1.8.6-p36.tar.bz2
lrwxrwxrwx 1 1014 100 25 Aug 02 12:16 ruby-1.8.6-
p36.tar.gz -> 1.8/ruby-1.8.6-p36.tar.gz
lrwxrwxrwx 1 1014 100 22 Aug 02 12:16 ruby-1.8.6-
p36.zip -> 1.8/ruby-1.8.6-p36.zip
lrwxrwxrwx 1 1000 100 22 Mar 12 2007
ruby-1.8.6.tar.bz2 -> 1.8/ruby-1.8.6.tar.bz2
lrwxrwxrwx 1 1000 100 21 Mar 12 2007
ruby-1.8.6.tar.gz -> 1.8/ruby-1.8.6.tar.gz
lrwxrwxrwx 1 1000 100 18 Mar 12 2007 ruby-1.8.6.zip
-> 1.8/ruby-1.8.6.zip
> -mental
Even though 1.8.6-p0 was not working with Mutex/ConditionVariable, I
changed the code to use the Monitor class. I did not receive the
error even in 1.8.6-p0.
I found this post about the differences between Mutex and Monitor.
I think I will stick to using Monitor for future code.
Here is the code that works on 1.8.6-p0:
require 'thread'
require 'monitor'
class ThreadPool
class PoolStopped < Exception; end
def initialize(thread_size=10, queue_size=100)
@mutex = Monitor.new
@cv = @mutex.new_cond
@queue = []
@max_queue_size = queue_size
@threads = []
@stopped = false
thread_size.times { @threads << Thread.new { start_worker } }
end
def add_work(*args, &callback)
push_task(Task.new(*args, &callback))
end
def push_task(task)
@mutex.synchronize do
raise PoolStopped.new if @stopped
@cv.wait_while { @max_queue_size > 0 && @queue.size >=
@max_queue_size }
@queue.push(task)
@cv.broadcast
end
task
end
def pop_task
task = nil
@mutex.synchronize do
@cv.wait_while { @queue.size == 0 }
task = @queue.shift
@cv.broadcast
end
task
end
def shutdown
@mutex.synchronize do
@stopped = true
@threads.each { @queue.push(:stop) }
@cv.broadcast
end
@threads.each { |thread| thread.join }
end
def start_worker
while true
task = pop_task
return if task == :stop
task.execute
end
end
# wait for current work to complete
def sync
tasks = @mutex.synchronize { @queue.dup }
tasks.each { |task| task.join }
end
class Task
attr_reader :result, :exception
def initialize(*args, &callback)
@args = args
@callback = callback
@done = false
@result = nil
@exception = nil
@mutex = Monitor.new
@cv = @mutex.new_cond
end
def execute
begin
@result = @callback.call(*@args)
rescue Exception => e
@exception = e
STDERR.puts "Error in thread #{Thread.current} - #{e}"
e.backtrace.each { |element| STDERR.puts(element) }
end
@mutex.synchronize do
@done = true
@cv.broadcast
end
end
def join
@mutex.synchronize { @cv.wait_until { @done } }
end
end
end
tasks = []
tp = ThreadPool.new(10, 1000)
sleep(1)
100.times do |id|
STDERR.puts "adding work"
tasks << tp.add_work do
puts "Running #{id} #{Thread.current}"
sleep 5
puts "Ending #{id} #{Thread.current}"
end
end
puts "Waiting for shutdown"
tp.shutdown
puts "done"
I'd strongly recommend against using Monitor -- it is slow and has some
subtle bugs. Have you considered using fastthread?
-mental