
how to use priority queue with multiprocessing


Marco Hornung

Jan 13, 2011, 12:07:47 PM
Hey,

------------------------------------------------------------------------------------------
question
------------------------------------------------------------------------------------------
How can I use a priority queue to schedule jobs within the "multiprocessing pool" module?

------------------------------------------------------------------------------------------
my scenario
------------------------------------------------------------------------------------------
I want to run several jobs on a server. The jobs are being sent by users. However, all jobs have a different priority, and high-priority jobs should be processed before any low-priority job gets touched.
Currently I just append all incoming jobs to the multiprocessing worker pool as follows:
import multiprocessing

### initialize worker pool
pool = multiprocessing.Pool(processes=worker_count)
process_handles = []

### distribute function execution over several processes
for job_parameter in job_parameter_list:
    handle = pool.apply_async(process_function, [job_parameter])
    process_handles.append(handle)

This only puts the jobs into an internal list and executes them in the order they arrive. Is it possible to use a priority queue for the process pool?

Kind Regards,
Marco

John Nagle

Jan 14, 2011, 1:57:59 PM

You'll probably have to track the available processes yourself,
starting a new job when there's a process available.

One way to do this is to have a management thread for each
process. Each management thread starts a subprocess, gets
a work item from the priority queue (blocking if necessary),
gives it to the subprocess, waits for the subprocess to
return a result, and goes back to get another work item.

This is straightforward, except for working out a way
to cleanly shut the thing down. One way to do that is
to have a "shutdown" flag visible to all the threads.
That's checked before getting a new task. If it's set,
the thread terminates its subprocess and returns.
Set the terminate flag in a signal handler for control-C.
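
Something like this minimal sketch, using only the standard library
(process_function and worker_count are the placeholder names from the
original post; the Pipe hand-off and the one-second poll are
illustrative choices, not the only way to do it):

import multiprocessing
import queue
import signal
import threading

def process_function(job_parameter):
    # stand-in for the real per-job work from the original post
    return job_parameter.upper()

def worker_loop(conn):
    # persistent subprocess: receive work items, send results back
    while True:
        job_parameter = conn.recv()
        if job_parameter is None:          # sentinel: terminate
            break
        conn.send(process_function(job_parameter))

def manager(job_queue, stop_event):
    # one management thread per worker slot; each owns one subprocess
    parent_conn, child_conn = multiprocessing.Pipe()
    worker = multiprocessing.Process(target=worker_loop, args=(child_conn,))
    worker.start()
    while not stop_event.is_set():
        try:
            priority, job_parameter = job_queue.get(timeout=1.0)
        except queue.Empty:
            continue                       # nothing queued; re-check the flag
        parent_conn.send(job_parameter)    # give the item to the subprocess
        result = parent_conn.recv()        # block until it returns a result
        print("done (priority %d): %s" % (priority, result))
    parent_conn.send(None)                 # flag is set: shut the subprocess down
    worker.join()

if __name__ == "__main__":
    stop = threading.Event()               # the "shutdown" flag
    signal.signal(signal.SIGINT, lambda signum, frame: stop.set())  # control-C
    jobs = queue.PriorityQueue()           # (priority, parameter), lowest first
    for item in [(2, "low"), (0, "urgent"), (1, "normal")]:
        jobs.put(item)
    worker_count = 2
    threads = [threading.Thread(target=manager, args=(jobs, stop))
               for _ in range(worker_count)]
    for t in threads:
        t.start()
    for t in threads:                      # runs until control-C sets the flag
        t.join()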

(I have something that manages multiple processes
using a priority queue, where the queue is implemented
using MySQL. This allows me to put a whole cluster to
work.)

John Nagle

Adam Tauno Williams

Jan 14, 2011, 3:16:48 PM
to pytho...@python.org
On Fri, 2011-01-14 at 10:57 -0800, John Nagle wrote:
> On 1/13/2011 9:07 AM, Marco Hornung wrote:
> > I want to run several jobs on a server. The jobs are being sent by
> > users. However, all jobs have a different priority, and high-priority
> > jobs should be processed before any low-priority job gets touched.
> > Currently I just append all incoming jobs to the multiprocessing
> > worker pool as follows:
> >
> > ### initialize worker pool
> > pool = multiprocessing.Pool(processes=worker_count)
> > process_handles = []
> >
> > ### distribute function execution over several processes
> > for job_parameter in job_parameter_list:
> >     handle = pool.apply_async(process_function, [job_parameter])
> >     process_handles.append(handle)
> >
> > This only puts the jobs into an internal list and executes them in
> > the order they arrive. Is it possible to use a priority queue for
> > the process pool?
> You'll probably have to track the available processes yourself,
> starting a new job when there's a process available.

Which is exactly what we do in OpenGroupware Coils' OIE.

There is a process [job] list sorted by priority, and the next process
in that list is started when a worker becomes available. We use
multiprocessing to create a *process*, rather than a thread, for each
job.
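
A rough sketch of that loop, with heapq standing in for the
priority-sorted process list (run_job and the job tuples are made-up
placeholders, not OIE code):

import heapq
import multiprocessing
import time

def run_job(parameter):
    print("running", parameter)       # stand-in for the real job

if __name__ == "__main__":
    worker_count = 2
    # (priority, parameter) pairs; heapq keeps the lowest number first
    job_list = [(2, "low"), (0, "urgent"), (1, "normal")]
    heapq.heapify(job_list)
    running = []
    while job_list or running:
        for p in [p for p in running if not p.is_alive()]:
            p.join()                  # reap a finished worker, freeing a slot
            running.remove(p)
        while job_list and len(running) < worker_count:
            priority, parameter = heapq.heappop(job_list)   # next by priority
            p = multiprocessing.Process(target=run_job, args=(parameter,))
            p.start()
            running.append(p)
        time.sleep(0.1)               # don't busy-wait between checks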

> One way to do this is to have a management thread for each
> process. Each management thread starts a subprocess, gets
> a work item from the priority queue (blocking if necessary),
> gives it to the subprocess, waits for the subprocess to
> return a result, and goes back to get another work item.

We have a manager process and an executor process. These communicate
via AMQ, but you could use any mechanism. The manager process controls
the process [job] list. When a process needs to be started, a message
is sent to the executor, which creates a worker process if an opening
is available; otherwise it messages the manager to place the process
in a queued state. When a worker process completes, it messages the
executor, which in turn messages the manager that a process slot may
be available. The manager then looks up the next available process and
messages the executor to start it; provided a worker slot is still
available, the executor will start the worker [otherwise the process
goes back into a queued state].
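
In outline it looks something like the sketch below. This is not the
Coils code: multiprocessing.Queue stands in for the AMQ bus, and the
message names ("start", "requeue", "slot-free", "shutdown") are
invented for the example.

import heapq
import multiprocessing
import queue

def job(parameter):
    print("worker running", parameter)        # stand-in for the real work

def executor(to_exec, to_mgr, worker_count):
    # creates workers on request; reports openings back to the manager
    running = []
    while True:
        for w in [w for w in running if not w.is_alive()]:
            w.join()                          # a worker completed:
            running.remove(w)
            to_mgr.put(("slot-free", None))   # a slot may be available
        try:
            msg, payload = to_exec.get(timeout=0.1)
        except queue.Empty:
            continue
        if msg == "start":
            if len(running) < worker_count:   # opening available
                w = multiprocessing.Process(target=job, args=(payload[1],))
                w.start()
                running.append(w)
            else:                             # no opening: send it back
                to_mgr.put(("requeue", payload))
        elif msg == "shutdown":
            for w in running:
                w.join()
            return

def manager(to_exec, to_mgr, jobs):
    # owns the priority-sorted process list; decides what runs next
    heapq.heapify(jobs)
    total, done = len(jobs), 0
    while jobs:                                   # submit everything; the
        to_exec.put(("start", heapq.heappop(jobs)))  # overflow is requeued
    while done < total:
        msg, payload = to_mgr.get()
        if msg == "requeue":                      # place it in a queued state
            heapq.heappush(jobs, payload)
        elif msg == "slot-free":                  # start the next by priority
            done += 1
            if jobs:
                to_exec.put(("start", heapq.heappop(jobs)))
    to_exec.put(("shutdown", None))

if __name__ == "__main__":
    to_exec, to_mgr = multiprocessing.Queue(), multiprocessing.Queue()
    jobs = [(2, "low"), (0, "urgent"), (1, "normal"), (1, "normal-2")]
    ex = multiprocessing.Process(target=executor, args=(to_exec, to_mgr, 2))
    ex.start()
    manager(to_exec, to_mgr, jobs)
    ex.join()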

> This is straightforward, except for working out a way
> to cleanly shut the thing down. One way to do that is
> to have a "shutdown" flag visible to all the threads.

Using a message bus helps a lot, and with multiprocessing you can just
do a join()/is_alive() to make sure a worker is still working.
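
For instance (a trivial illustration, not the Coils code):

import multiprocessing
import time

def work():
    time.sleep(1)                      # stand-in for a real job

if __name__ == "__main__":
    worker = multiprocessing.Process(target=work)
    worker.start()
    print(worker.is_alive())           # True: still working
    worker.join()                      # block until it finishes
    print(worker.is_alive())           # False: done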
