Google Groups no longer supports new Usenet posts or subscriptions. Historical content remains viewable.
Dismiss

Multiprocessing problem

1 view
Skip to first unread message

Matt Chaput

unread,
Mar 2, 2010, 3:59:15 PM3/2/10
to pytho...@python.org
Hi,

I'm having a problem with the multiprocessing package.

I'm trying to use a simple pattern where a supervisor object starts a
bunch of worker processes, instantiating them with two queues (a job
queue for tasks to complete and an results queue for the results). The
supervisor puts all the jobs in the "job" queue, then join()s the
workers, and then pulls all the completed results off the "results" queue.

(I don't think I can just use something like Pool.imap_unordered for
this because the workers need to be objects with state.)

Here's a simplified example:

http://pastie.org/850512

The problem is that seemingly randomly, but almost always, the worker
processes will deadlock at some point and stop working before they
complete. This will leave the whole program stalled forever. This seems
more likely the more work each worker does (to the point where adding
the time.sleep(0.01) as seen in the example code above guarantees it).
The problem seems to occur on both Windows and Mac OS X.

I've tried many random variations of the code (e.g. using JoinableQueue,
calling cancel_join_thread() on one or both queues even though I have no
idea what it does, etc.) but keep having the problem.

Am I just using multiprocessing wrong? Is this a bug? Any advice?

Thanks,

Matt

Matt Chaput

unread,
Mar 2, 2010, 6:04:09 PM3/2/10
to pytho...@python.org
On 3/2/2010 3:59 PM, Matt Chaput wrote:
> I'm trying to use a simple pattern where a supervisor object starts a
> bunch of worker processes, instantiating them with two queues (a job
> queue for tasks to complete and an results queue for the results). The
> supervisor puts all the jobs in the "job" queue, then join()s the
> workers, and then pulls all the completed results off the "results"
queue.

> Here's a simplified example:
>
> http://pastie.org/850512

I should mention that if I change my code so the workers just pull
things off the job queue but don't put any results on the result queue
until after they see the None sentinel in the job queue and break out of
the loop, I don't get the deadlock. So it's something about getting from
one queue and putting to another queue in close proximity.

Hopefully I'm making a simple mistake with how I'm using the library and
it'll be easy to fix...

Thanks,

Matt

MRAB

unread,
Mar 2, 2010, 7:53:22 PM3/2/10
to pytho...@python.org
Matt Chaput wrote:
> Hi,
>
> I'm having a problem with the multiprocessing package.
>
> I'm trying to use a simple pattern where a supervisor object starts a
> bunch of worker processes, instantiating them with two queues (a job
> queue for tasks to complete and an results queue for the results). The
> supervisor puts all the jobs in the "job" queue, then join()s the
> workers, and then pulls all the completed results off the "results" queue.
>
> (I don't think I can just use something like Pool.imap_unordered for
> this because the workers need to be objects with state.)
>
> Here's a simplified example:
>
> http://pastie.org/850512
>
> The problem is that seemingly randomly, but almost always, the worker
> processes will deadlock at some point and stop working before they
> complete. This will leave the whole program stalled forever. This seems
> more likely the more work each worker does (to the point where adding
> the time.sleep(0.01) as seen in the example code above guarantees it).
> The problem seems to occur on both Windows and Mac OS X.
>
> I've tried many random variations of the code (e.g. using JoinableQueue,
> calling cancel_join_thread() on one or both queues even though I have no
> idea what it does, etc.) but keep having the problem.
>
> Am I just using multiprocessing wrong? Is this a bug? Any advice?
>
There's a difference between multithreading and multiprocessing.

In multithreading the threads share the same address space, so objects
can be passed between the threads simply by passing references to those
objects.

In multiprocessing, however, the process don't share an address space,
so the objects themselves need to be transferred between the processes
via pipes, but the pipes have a limited capacity.

If the main process doesn't get the results from the queue until the
worker processes terminate, and the worker processes don't terminate
until they've put their results in the queue, and the pipe consequently
fills up, then deadlock can result.

Matt Chaput

unread,
Mar 2, 2010, 8:07:49 PM3/2/10
to pytho...@python.org
> If the main process doesn't get the results from the queue until the
> worker processes terminate, and the worker processes don't terminate
> until they've put their results in the queue, and the pipe consequently
> fills up, then deadlock can result.

The queue never fills up... on platforms with qsize() I can see this. I
remove items from the results queue as I add to the job queue, and if I
add timeouts everywhere the workers never raise Empty and the supervisor
never raises Full. They just deadlock.

I've rewritten the code so the worker threads don't push information
back while they run, they just write to a temporary file which the
supervisor can read, which avoids the issue. But if anyone can tell me
what I was doing wrong for future reference, I'd greatly appreciate it.

Thanks,

Matt

larudwer

unread,
Mar 3, 2010, 12:49:36 PM3/3/10
to
Hello Matt

I think the problem is here:

for n in xrange(100000):
outqueue.put(str(n)) <-- fill the queue with 100000
elements
try:
r = inqueue.get_nowait() <-- queue is still empty because
processes need some time to start
results.append(r)
except Empty:
pass <-- causing 100000 passes

....

print "-"
for task in tasks:
outqueue.put(None) <-- put even more data in the queue
...
# in the meantime the processes start to run and are trying to put data
# in to the output queue. However this queue might fill up, and lock
# all processes that try to write data in the already filled up queue

print "joining"
for task in tasks:
task.join() <-- can never succeed because processes
are waiting for someone reading the result queue
print "joined"

This example works:

from Queue import Empty, Full
from multiprocessing import Queue, Process
from base64 import b64encode
import time, random

class Worker(Process):
def __init__(self, inqueue, outqueue):
Process.__init__(self)
self.inqueue = inqueue
self.outqueue = outqueue

def run(self):
inqueue = self.inqueue
outqueue = self.outqueue
c = 0
while True:
arg = inqueue.get()
if arg is None: break
c += 1
b = b64encode(arg)
outqueue.put(b)

# Clean-up code goes here
outqueue.put(c)

class Supervisor(object):
def __init__(self):
pass

def go(self):
outqueue = Queue()
inqueue = Queue()
tasks = [Worker(outqueue, inqueue) for _ in xrange(4)]
for task in tasks:
task.start()

results = []
print "*"
for n in xrange(100000):
outqueue.put(str(n))

print "-"
for task in tasks:
outqueue.put(None)

print "emptying queue"
try:
while True:
r = inqueue.get_nowait()
results.append(r)
except Empty:
pass
print "done"
print len(results)

print "joining"
for task in tasks:
task.join()
print "joined"

if __name__ == "__main__":
s = Supervisor()
s.go()

0 new messages