
Re: multiprocessing, what am I doing wrong?


MRAB

Feb 23, 2012, 3:42:05 PM
to pytho...@python.org
On 23/02/2012 17:59, Eric Frederich wrote:
> Below is some pretty simple code and the resulting output.
> Sometimes the code runs through but sometimes it just freezes for no
> apparent reason.
> The output pasted is where it just got frozen on me.
> It called start() on the 2nd worker but the 2nd worker never seemed to
> enter the run method.
>
[snip]

The 2nd worker did enter the run method; there are 2 lines of "2".

Maybe there's an uncaught exception in the run method for some reason.
Try doing something like this:

try:
    args = self.inbox.get_nowait()
except Queue.Empty:
    break
except:
    # Log the full traceback to stderr, then re-raise so the
    # worker still dies visibly.
    import traceback
    print >> sys.stderr, "*** Exception in worker"
    traceback.print_exc(file=sys.stderr)
    print >> sys.stderr, "***"
    sys.stderr.flush()
    raise

MRAB

Feb 24, 2012, 1:36:10 PM
to pytho...@python.org
On 24/02/2012 17:00, Eric Frederich wrote:
> I can sill get it to freeze and nothing is printed out from the other
> except block.
> Does it look like I'm doing anything wrong here?
>
[snip]
I don't normally use multiprocessing, so I forgot about a critical
detail. :-(

When the multiprocessing module starts a process, that process
_imports_ the module containing the function to be run (this is how
the 'spawn' start method used on Windows works). So what happens is:
your script runs and creates and starts workers; the multiprocessing
module makes a new process for each worker; each of those processes
then imports the script, which creates and starts workers of its own;
and so on, leading to an ever-increasing number of processes.
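
As a minimal sketch of that failure mode (the file name is
hypothetical):

# spawn_bomb.py -- hypothetical name, for illustration only.
import multiprocessing

def work():
    print 'working'

# Module-level start-up code: it runs when the script is executed,
# but it also runs when a child process imports this module, so
# every child tries to start yet another child. (On Windows,
# multiprocessing actually detects this and raises a RuntimeError
# during its bootstrapping phase.)
p = multiprocessing.Process(target=work)
p.start()
p.join()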

The solution is to ensure that the script/module distinguishes between
being run as the main script and being imported as a module:

#!/usr/bin/env python

import sys
import Queue
import multiprocessing
import time

def FOO(a, b, c):
    print 'foo', a, b, c
    return (a + b) * c

class MyWorker(multiprocessing.Process):
    def __init__(self, inbox, outbox):
        super(MyWorker, self).__init__()
        self.inbox = inbox
        self.outbox = outbox
        print >> sys.stderr, '1' * 80; sys.stderr.flush()
    def run(self):
        print >> sys.stderr, '2' * 80; sys.stderr.flush()
        while True:
            try:
                args = self.inbox.get_nowait()
            except Queue.Empty:
                break
            self.outbox.put(FOO(*args))

if __name__ == '__main__':
    # This file is being run as the main script. This part won't be
    # run if the file is imported.
    todo = multiprocessing.Queue()

    for i in xrange(100):
        todo.put((i, i + 1, i + 2))

    print >> sys.stderr, 'a' * 80; sys.stderr.flush()
    result_queue = multiprocessing.Queue()

    print >> sys.stderr, 'b' * 80; sys.stderr.flush()
    w1 = MyWorker(todo, result_queue)
    print >> sys.stderr, 'c' * 80; sys.stderr.flush()
    w2 = MyWorker(todo, result_queue)

    print >> sys.stderr, 'd' * 80; sys.stderr.flush()
    w1.start()
    print >> sys.stderr, 'e' * 80; sys.stderr.flush()
    w2.start()
    print >> sys.stderr, 'f' * 80; sys.stderr.flush()

    for i in xrange(100):
        print result_queue.get()
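
One further addition worth making at the end of that __main__ block
(my suggestion, not in the code above): join the workers so the main
process waits for them to exit cleanly:

    # After collecting all 100 results, wait for both workers
    # to exit before the main process terminates.
    w1.join()
    w2.join()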

MRAB

Feb 27, 2012, 9:38:26 PM
to pytho...@python.org
On 27/02/2012 16:57, Eric Frederich wrote:
> Still freezing sometimes, like 1 out of 10 times that I run it.
> Here is updated code and a couple of outputs.
>
[snip]
I don't know what the problem is. All I can suggest is a slightly
modified version.

If a worker says it's terminating without first saying that it's got
nothing, then I can only assume that the worker had some uncaught
exception.


#!/usr/bin/env python

import sys
import Queue
import multiprocessing
import time

def FOO(a, b, c):
    print 'foo', a, b, c
    return (a + b) * c

class MyWorker(multiprocessing.Process):
    def __init__(self, name, inbox, outbox):
        super(MyWorker, self).__init__()
        self.name = name
        self.inbox = inbox
        self.outbox = outbox
        print >> sys.stderr, 'Created %s' % self.name; sys.stderr.flush()
    def run(self):
        print >> sys.stderr, 'Running %s' % self.name; sys.stderr.flush()
        try:
            while True:
                try:
                    args = self.inbox.get_nowait()
                    print >> sys.stderr, '%s got something to do' % self.name; sys.stderr.flush()
                except Queue.Empty:
                    print >> sys.stderr, '%s got nothing' % self.name; sys.stderr.flush()
                    break
                self.outbox.put(FOO(*args))
        finally:
            print >> sys.stderr, '%s is terminating' % self.name; sys.stderr.flush()

if __name__ == '__main__':
    # This file is being run as the main script. This part won't be
    # run if the file is imported.

    print >> sys.stderr, 'Creating todo queue'; sys.stderr.flush()
    todo = multiprocessing.Queue()

    for i in xrange(100):
        todo.put((i, i + 1, i + 2))

    print >> sys.stderr, 'Creating results queue'; sys.stderr.flush()
    result_queue = multiprocessing.Queue()

    print >> sys.stderr, 'Creating Workers'; sys.stderr.flush()
    w1 = MyWorker('Worker 1', todo, result_queue)
    w2 = MyWorker('Worker 2', todo, result_queue)

    print >> sys.stderr, 'Starting Worker 1'; sys.stderr.flush()
    w1.start()
    print >> sys.stderr, 'Starting Worker 2'; sys.stderr.flush()
    w2.start()
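
A possible cause of the intermittent hang, for what it's worth:
with multiprocessing.Queue, items are fed to the underlying pipe by
a background thread, and the documentation warns that after a put()
there may be a small delay before get_nowait() stops raising
Queue.Empty. Here is a sketch of a variant that avoids relying on
Empty altogether, using a None sentinel (my choice of sentinel, not
from the code above) to mark the end of the work:

#!/usr/bin/env python

import multiprocessing

def FOO(a, b, c):
    print 'foo', a, b, c
    return (a + b) * c

def worker(inbox, outbox):
    # Block on get(); there's no race with the feeder thread
    # because we never ask "is the queue empty?".
    while True:
        args = inbox.get()
        if args is None:      # sentinel: no more work
            break
        outbox.put(FOO(*args))

if __name__ == '__main__':
    todo = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    for i in xrange(100):
        todo.put((i, i + 1, i + 2))
    todo.put(None)            # one sentinel per worker
    todo.put(None)

    w1 = multiprocessing.Process(target=worker, args=(todo, result_queue))
    w2 = multiprocessing.Process(target=worker, args=(todo, result_queue))
    w1.start()
    w2.start()

    for i in xrange(100):
        print result_queue.get()

    w1.join()
    w2.join()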

MRAB

Feb 28, 2012, 1:12:36 PM
to pytho...@python.org
On 28/02/2012 17:16, Eric Frederich wrote:
> If I do a time.sleep(0.001) right at the beginning of the run() method,
> then it completes fine.
> I was able to run it through a couple hundred times without problem.
> If I sleep for less time than that or not at all, it may or may not
> complete.
>
[snip]

To me that suggests that the OS timer works in units of 1
millisecond, so a sleep of less than 0.001 seconds is rounded down
to 0 milliseconds, i.e. no delay at all.
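
A quick way to check that theory (a rough sketch; the actual
granularity depends on the OS and timer hardware):

import time

# Time many short sleeps and report the average. If the OS timer
# really has 1 ms granularity, requests below 0.001 s won't scale
# proportionally with the requested time.
for requested in (0.0005, 0.001, 0.002):
    start = time.time()
    for _ in xrange(100):
        time.sleep(requested)
    average = (time.time() - start) / 100
    print 'requested %.4f s, average actual %.4f s' % (requested, average)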