I ran a few tests on the new Python 2.6 multiprocessing module before
migrating some threading code, and found that the locking code is not
working well. In this case, a pool of 5 processes is running, each
trying to get the lock and releasing it after waiting 0.2 seconds
(the action is repeated twice). It looks like the multiprocessing lock
allows multiple acquisitions after the second pass. Running the exact
same code with threads works correctly.
The test code is further down; the output is fine when running with
threads (the sequence of lock/unlock looks good), but it gets mixed up
(multiple locks in a row) when running with processes.
My setup: Mac OS X 10.5, running Python 2.6.1 from MacPython.
Did I do something wrong, or is there a limitation for multiprocessing
locks that I am not aware of?
Thank you for your help!
-- Fred
-------------------------------
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue, Lock
from Queue import Empty
from threading import Thread
import time

class test_lock_process(object):
    def __init__(self, lock, id, queue):
        self.lock = lock
        self.id = id
        self.queue = queue
        self.read_lock()

    def read_lock(self):
        for i in xrange(2):
            self.lock.acquire()
            self.queue.put('[proc%d] Got lock' % self.id)
            time.sleep(.2)
            self.queue.put('[proc%d] Released lock' % self.id)
            self.lock.release()

def test_lock(processes=10, lock=Lock(), process=True, queue=None):
    print_result = False
    if queue is None:
        print_result = True
        queue = Queue()
    threads = []
    for i in xrange(processes):
        if process:
            threads.append(Process(target=test_lock_process,
                                   args=(lock, i, queue)))
        else:
            threads.append(Thread(target=test_lock_process,
                                  args=(lock, i, queue)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    if print_result:
        try:
            while True:
                print queue.get(block=False)
        except Empty:
            pass

if __name__ == "__main__":
    #test_lock(processes=5, process=True)
    test_lock(processes=5)
> I ran a few tests on the new Python 2.6 multiprocessing module before
> migrating some threading code, and found that the locking code is not
> working well. In this case, a pool of 5 processes is running, each
> trying to get the lock and releasing it after waiting 0.2 seconds
> (the action is repeated twice). It looks like the multiprocessing lock
> allows multiple acquisitions after the second pass. Running the exact
> same code with threads works correctly.
I've tested your code on Windows and I think the problem is in the Queue
class. If you replace the Queue with print statements or write to a log
file, the lock/release sequence is OK.
You should file a bug report on http://bugs.python.org/
--
Gabriel Genellina
A little bit of instrumentation in the code reveals the problem: the
Queue class doesn't always return items in the order they were put in.
This should probably be either documented or fixed! I suspect it is
impossible to fix for a multi-producer Queue, though.
The first number is the time since the program started. Tested under
Linux.
0.048810 [proc0] Got lock
0.248679 [proc0] Released lock
0.248858 [proc0] Got lock
0.448666 [proc0] Released lock
0.448859 [proc2] Got lock
0.648639 [proc2] Released lock
0.648893 [proc3] Got lock
0.848633 [proc3] Released lock
0.848767 [proc3] Got lock
1.048635 [proc3] Released lock
1.049090 [proc1] Got lock
1.248617 [proc1] Released lock
1.248743 [proc1] Got lock
1.448634 [proc1] Released lock
1.448810 [proc4] Got lock
1.648674 [proc4] Released lock
1.648831 [proc4] Got lock
1.849867 [proc2] Got lock <--- out of order
1.849679 [proc4] Released lock <--- out of order
2.048683 [proc2] Released lock
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue, Lock
from Queue import Empty
from threading import Thread
import time

start = time.time()

def now():
    return time.time() - start

class test_lock_process(object):
    def __init__(self, lock, id, queue):
        self.lock = lock
        self.id = id
        self.queue = queue
        self.read_lock()

    def read_lock(self):
        for i in xrange(2):
            self.lock.acquire()
            self.queue.put('%9.6f [proc%d] Got lock' % (now(), self.id))
            time.sleep(.2)
            self.queue.put('%9.6f [proc%d] Released lock' % (now(), self.id))
            self.lock.release()

def test_lock(processes=10, lock=Lock(), process=True, queue=None):
    print_result = False
    if queue is None:
        print_result = True
        queue = Queue()
    threads = []
    for i in xrange(processes):
        if process:
            threads.append(Process(target=test_lock_process,
                                   args=(lock, i, queue)))
        else:
            threads.append(Thread(target=test_lock_process,
                                  args=(lock, i, queue)))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    if print_result:
        try:
            while True:
                print queue.get(block=False)
        except Empty:
            pass

if __name__ == "__main__":
    #test_lock(processes=5, process=True)
    test_lock(processes=5)
--
Nick Craig-Wood <ni...@craig-wood.com> -- http://www.craig-wood.com/nick
Thanks for your help, Gabriel. I just tested it without the queue and
it works! I'll file a bug about the queues.
Fred
For those interested, here is the code that works (well, it always did,
but this shows the real result):
from multiprocessing import Process, Lock
import logging
import time

class test_lock_process(object):
    def __init__(self, lock):
        self.lock = lock
        self.read_lock()

    def read_lock(self):
        for i in xrange(5):
            self.lock.acquire()
            logging.info('Got lock')
            time.sleep(.2)
            logging.info('Released lock')
            self.lock.release()

if __name__ == "__main__":
    logging.basicConfig(
        format='[%(process)04d@%(relativeCreated)04d] %(message)s',
        level=logging.DEBUG)
    lock = Lock()
    processes = []
    for i in xrange(2):
        processes.append(Process(target=test_lock_process, args=(lock,)))
    for t in processes:
        t.start()
    for t in processes:
        t.join()
Opened issue #4999 [http://bugs.python.org/issue4999] on the matter,
referencing this thread.
--
Frédéric Sagnes
Thanks, I've assigned it to myself. Hopefully I can get a fix put
together soonish, time permitting.
-jesse
Sounds like it might be hard or impossible to fix to me. I'd love to
be proved wrong though!
If you were thinking of passing time.time() /
clock_gettime(CLOCK_MONOTONIC) along in the Queue too, then you'll
want to know that it can differ by significant amounts on different
processors :-(
Good luck!
Consider my parade rained on. And after looking at it this morning,
yes - this is going to be hard, and should be fixed for a FIFO queue
:\
-jesse