
Get multiprocessing.Queue to do priorities


uuid

May 9, 2009, 12:42:34 PM
Hello,
I was wondering whether there was a way to make multiprocessing.Queue
behave like a priority queue. Subclassing it and overriding put and get
with heappush and heappop doesn't work here the way it does for
Queue.Queue (multiprocessing.Queue seems to use different underlying
data structures than Queue.Queue?)

Could one create a heapq within the producer as a proxy, and then feed
a proper queue from that? Does anyone have an idea on how to deal with
the queue item flow control? (Since all of the sorting has to happen
within the heapq, it should only pass items to the real Queue if it's
empty?)

Thanks in advance!

uuid

May 9, 2009, 1:00:54 PM
I just read up and it seems that no matter the approach, it's futile to
use multiprocessing.Queue, since there is a bug that prevents true
FIFO. (http://bugs.python.org/issue4999)

Any recommendation on an alternate way to build a priority queue to use
with a "one producer, many consumers" type multiprocessing setup would
be welcomed!

:|

Scott David Daniels

May 9, 2009, 1:56:44 PM
uuid wrote:
> Any recommendation on an alternate way to build a priority queue to use
> with a "one producer, many consumers" type multiprocessing setup would
> be welcomed!

? "one producer, many consumers" ?
What would the priority queue do? Choose a consumer?

--Scott David Daniels
Scott....@Acm.Org

uuid

May 9, 2009, 2:23:25 PM
Scott David Daniels <Scott....@Acm.Org> wrote:
>
> ? "one producer, many consumers" ?
> What would the priority queue do? Choose a consumer?

Sorry, I should have provided a little more detail. There is one
producer thread, reading URLs from multiple files and external input.
These URLs have a certain priority, and are fed to multiple consumer
threads for fetching and further processing (XML parsing and other
CPU-intensive work). Since the content of the URLs changes over time,
it is crucial to have a certain amount of control over the order in
which the requests occur. So, to answer the question:
The priority queue would make sure that, out of a number of
asynchronously added items, those with a high priority are fetched first
by the worker threads. Sounds like a perfect case for a heap, if only I
could :)

Scott David Daniels

May 9, 2009, 4:00:36 PM

2.6 has a PriorityQueue in the Queue module.
If you aren't using 2.6, you could copy the code for your own version.
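
Untested, but usage would be roughly like this (items go in as
(priority, value) tuples; the smallest priority number comes out first):

------------------------
import Queue

pq = Queue.PriorityQueue()
pq.put((2, 'http://example.com/later'))
pq.put((0, 'http://example.com/urgent'))
pq.put((1, 'http://example.com/soon'))

while not pq.empty():
    priority, url = pq.get()
    print priority, url    # prints 0, 1, 2 -- smallest priority first
------------------------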

--Scott David Daniels
Scott....@Acm.Org

uuid

May 9, 2009, 6:11:21 PM
The Queue module, apparently, is thread-safe, but *not* process-safe.
If you try to use an ordinary Queue, it appears inaccessible to the
worker process. (Which, after all, is quite logical, since the
mechanisms for moving items between threads of the same process are
quite different from inter-process communication.) It appears that
creating a manager that holds a shared queue might be an option
(http://stackoverflow.com/questions/342556/python-2-6-multiprocessing-queue-compatible-with-threads).

Just for illustration, this shows that Queue.Queue doesn't work with processes:

------------------------
import multiprocessing
import time
import Queue

def worker(queue):
    while True:
        item = queue.get()
        print item
        queue.task_done()

queue_queue = Queue.Queue()

worker_thread = multiprocessing.Process(target=worker, args=(queue_queue,))
worker_thread.start()
for i in range(10):
    queue_queue.put(str(i))

time.sleep(10)
while True:
    try:
        print 'still on queue: ' + queue_queue.get(False)
    except Queue.Empty:
        break
# the worker process only ever sees its own (empty) copy of the queue,
# so it blocks in get() forever; terminate it instead of joining
worker_thread.terminate()
------------------------

This yields:

still on queue: 0
still on queue: 1
still on queue: 2
still on queue: 3
still on queue: 4
still on queue: 5
still on queue: 6
still on queue: 7
still on queue: 8
still on queue: 9

So no queue item ever arrives at the worker process.
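
By contrast, the manager approach mentioned above should work across
processes: register Queue.PriorityQueue with a SyncManager and hand the
resulting proxy to the workers. A rough, untested sketch (the class and
the example URLs are just made up for illustration):

------------------------
import Queue
import time
import multiprocessing
from multiprocessing.managers import SyncManager

class PQManager(SyncManager):
    # manager that serves a PriorityQueue to other processes
    pass

PQManager.register('PriorityQueue', Queue.PriorityQueue)

def worker(pq):
    while True:
        priority, url = pq.get()          # blocks until an item is available
        print 'fetching (%d) %s' % (priority, url)

if __name__ == '__main__':
    manager = PQManager()
    manager.start()
    pq = manager.PriorityQueue()          # proxy, safe to pass to child processes

    p = multiprocessing.Process(target=worker, args=(pq,))
    p.start()

    pq.put((1, 'http://example.com/important'))
    pq.put((5, 'http://example.com/can-wait'))

    time.sleep(2)                         # let the worker drain the queue
    p.terminate()
    manager.shutdown()
------------------------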

Jesse Noller

May 10, 2009, 9:35:03 AM
to uuid, Python List
On Sat, May 9, 2009 at 6:11 PM, uuid <M8R-g...@mailinator.com> wrote:
> The Queue module, apparently, is thread safe, but *not* process safe. If you
> try to use an ordinary Queue, it appears inaccessible to the worker process.
> (Which, after all, is quite logical, since methods for moving items between
> the threads of the same process are quite different from inter-process
> communication.) It appears that creating a manager that holds a shared queue
> might be an option
> (http://stackoverflow.com/questions/342556/python-2-6-multiprocessing-queue-compatible-with-threads).

Using a manager, or submitting a patch that adds a priority queue to
the multiprocessing.queues module, is the correct solution for this.

You can file an enhancement in the tracker, and assign/add me to it,
but without a patch it may take me a bit (wicked busy right now).

jesse

uuid

May 10, 2009, 10:30:53 AM
Dear Jesse,
thanks for the hint.

I see you are already assigned to the FIFO bug
(http://bugs.python.org/issue4999), so I won't burden you even more.
Clearly, a reliable FIFO behavior of multiprocessing.Queue helps more
than a priority queue, since it can be used to build one, so that
should really be the first thing to fix.

In the meantime, I think I'll whip up a hack that uses sort of a
bucket strategy: fill up a prioritized heapq, and then, at regular
intervals, unload its contents into a size-limited multiprocessing
queue.
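
Something along these lines, maybe (completely untested, names made up):

------------------------
import heapq
import multiprocessing
import Queue   # only for the Full exception raised by put_nowait

work_queue = multiprocessing.Queue(maxsize=10)   # size-limited shared queue
heap = []                                        # local priority buffer

def add_request(priority, url):
    heapq.heappush(heap, (priority, url))

def unload():
    # move as many items as currently fit into the shared queue,
    # smallest priority number first
    while heap:
        try:
            work_queue.put_nowait(heap[0])       # peek at the smallest item
        except Queue.Full:
            break                                # retry at the next interval
        heapq.heappop(heap)                      # pop only once it is safely queued
------------------------

unload() would be called from the producer every few seconds, from the
same loop that reads the URL files.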

I'll post this as soon as it works.

-u

madelei...@gmail.com

Apr 1, 2013, 6:35:44 PM
to uuid, Python List
Was this issue ever resolved? What is the current best practice for those wishing to use a priority queue with multiprocessing?
