Could one create a heapq within the producer as a proxy, and then feed
a proper queue from that? Does anyone have an idea on how to deal with
the queue item flow control? (Since all of the sorting has to happen
within the heapq, should it only pass items on to the real Queue when
the Queue is empty?)
Thanks in advance!
Any recommendation on an alternate way to build a priority queue to use
with a "one producer, many consumers" type multiprocessing setup would
be welcomed!
:|
? "one producer, many consumers" ?
What would the priority queue do? Choose a consumer?
--Scott David Daniels
Scott....@Acm.Org
Sorry, I should have provided a little more detail. There is one
producer thread, reading urls from multiple files and external input.
These urls have a certain priority, and are fed to multiple consumer
threads for fetching and further processing (XML parsing and such CPU
intensive stuff). Since the content of the urls is changing over time,
it is crucial to have a certain amount of control over the order in
which the requests occur. So, to answer the question:
The priority queue would make sure that out of a number of
asynchronously added items, those with a high priority are fetched first
by the worker threads. Sounds like a perfect case for a heap, if only I
could :)
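For what it's worth, within a single process a plain heapq already gives exactly that ordering, since tuples compare element-by-element. A minimal sketch (the URLs and priority numbers are made up for illustration):

```python
import heapq

# A heap of (priority, url) tuples; a lower number means "fetch sooner".
heap = []
heapq.heappush(heap, (5, "http://example.com/rarely-changes"))
heapq.heappush(heap, (1, "http://example.com/breaking-news"))
heapq.heappush(heap, (3, "http://example.com/hourly-feed"))

# heappop always returns the smallest tuple, i.e. the best priority.
order = [heapq.heappop(heap)[1] for _ in range(3)]
```

The catch, as discussed below, is that a bare heapq is neither thread- nor process-safe.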
2.6 has a PriorityQueue in the Queue module.
If you aren't using 2.6, you could copy the code for your own version.
--Scott David Daniels
Scott....@Acm.Org
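A sketch of that PriorityQueue in use (note the module was renamed to lowercase queue in Python 3, hence the guarded import):

```python
try:
    import Queue as queue_mod   # Python 2 name
except ImportError:
    import queue as queue_mod   # renamed to 'queue' in Python 3

# Thread-safe priority queue: entries are (priority, url) tuples,
# and get() returns the entry with the lowest priority number first.
pq = queue_mod.PriorityQueue()
pq.put((2, "http://example.com/b"))
pq.put((1, "http://example.com/a"))
pq.put((3, "http://example.com/c"))

first = pq.get()   # the (1, ...) entry, regardless of insertion order
```

Note that this only synchronizes threads within one process, which is exactly the limitation the rest of the thread runs into.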
Just for illustration, this shows that Queue.Queue doesn't work across processes:
------------------------
import multiprocessing
import time
import Queue

def worker(queue):
    while True:
        item = queue.get()
        print item
        queue.task_done()

queue_queue = Queue.Queue()
worker_process = multiprocessing.Process(target=worker, args=(queue_queue,))
worker_process.start()
for i in range(10):
    queue_queue.put(str(i))
time.sleep(10)
while True:
    try:
        print 'still on queue: ' + queue_queue.get(False)
    except Queue.Empty:
        break
# The worker never received anything (it blocks on its own copy of the
# queue forever), so terminate it instead of joining.
worker_process.terminate()
worker_process.join()
------------------------
This yields:
still on queue: 0
still on queue: 1
still on queue: 2
still on queue: 3
still on queue: 4
still on queue: 5
still on queue: 6
still on queue: 7
still on queue: 8
still on queue: 9
So no queue item ever arrives at the worker process.
Using a manager, or submitting a patch which adds a priority queue to
the multiprocessing.queues module, is the correct solution for this.
You can file an enhancement in the tracker, and assign/add me to it,
but without a patch it may take me a bit (wicked busy right now).
jesse
I see you are already assigned to the FIFO bug
(http://bugs.python.org/issue4999), so I won't burden you even more.
Clearly, a reliable FIFO behavior of multiprocessing.Queue helps more
than a priority queue, since it can be used to build one, so that
should really be the first thing to fix.
In the meantime, I think I'll whip up a hack that uses a sort of
bucket strategy: fill up a prioritized heapq, and then, at regular
intervals, unload its contents into a size-limited multiprocessing
queue.
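The unload step of that strategy could look something like this; a sketch under the assumption that the producer owns the heap and calls drain() periodically (the drain name and the queue size are made up):

```python
import heapq
import multiprocessing

try:
    import Queue as queue_mod   # Python 2; put_nowait raises Queue.Full
except ImportError:
    import queue as queue_mod   # Python 3

def drain(heap, mp_queue):
    # Move items from the local heap into the bounded multiprocessing
    # queue, best priority first, stopping when the queue is full.
    while heap:
        item = heapq.heappop(heap)
        try:
            mp_queue.put_nowait(item)
        except queue_mod.Full:
            heapq.heappush(heap, item)  # no room: keep it for next round
            break

heap = []
for prio, url in [(3, "http://example.com/c"),
                  (1, "http://example.com/a"),
                  (2, "http://example.com/b")]:
    heapq.heappush(heap, (prio, url))

mp_queue = multiprocessing.Queue(maxsize=2)
drain(heap, mp_queue)   # two best-priority items fit; the third stays behind
```

The obvious trade-off is that an urgent item arriving after a drain has to wait until the next interval, so the queue's maxsize should stay small.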
I'll post this as soon as it works.
-u