Spawning too many processes hangs


jmla...@gmail.com

Mar 5, 2014, 4:23:30 PM
to mpi...@googlegroups.com
Hello, I am trying to implement a manager/worker implementation of mpi4py where the worker executes a large simulation (which may or may not involve MPI). The "simulation" is called function.py and currently just squares the number given to it. 

Below is a small manager/worker that hangs on my machine, right before the last MPI.COMM_SELF.Spawn. I have an 8-core machine and I use the following call:

mpiexec -n 2 python manager_calling_workers.py

I believe the code is well documented. Oddly, if I change the initial call from 
results = main(range(1,9),spawn_function_evaluation_using_MPI)
to 
results = main(range(1,8),spawn_function_evaluation_using_MPI)

I receive no hang. Thank you very much for your time.



---------------------- Begin function.py -----------------------------
from mpi4py import MPI
import sys
import numpy
    
comm_to_function_wrapper = MPI.Comm.Get_parent()
rank = comm_to_function_wrapper.Get_rank()
size = comm_to_function_wrapper.Get_size()
        
# Initialize and receive the starting point 
x = numpy.array(0.0)
comm_to_function_wrapper.Bcast(x,root=0) 
    
local_y = (x**2)/size 
        
comm_to_function_wrapper.Reduce(local_y, None, op=MPI.SUM, root=0) # Give the value back 
comm_to_function_wrapper.Disconnect()
----------------------- End function.py ------------------------------




---------------------- Begin manager_calling_workers.py --------------------
from mpi4py import MPI
import time
import numpy as np
import sys

WORKTAG = 0
DIETAG = 1

class Work():
    def __init__(self, work_items):
        self.work_items = work_items[:]

    def get_next_item(self):
        if len(self.work_items) == 0:
            return None
        return self.work_items.pop()

def manager(wi):
    all_data = []
    size = MPI.COMM_WORLD.Get_size()
    current_work = Work(wi)
    comm = MPI.COMM_WORLD
    status = MPI.Status()

    # Send a task to each worker (provided there are enough tasks)
    print "\n\n\n" + "-"*80 + "\n starting initial allocation", "results %r" % all_data, "work items %r" % current_work.work_items
    for i in range(1, size):
        anext = current_work.get_next_item()
        if not anext: break
        comm.send(obj=anext, dest=i, tag=WORKTAG)

    # Keep appending received data to all_data and sending next tasks as long as they exist
    print "\n" + "-"*80 + "\n starting while loop", "results %r" % all_data, "work items %r" % current_work.work_items
    while 1:
        anext = current_work.get_next_item()
        if not anext: break
        data = comm.recv(obj=None, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        all_data.append(data)
        print "results %r" % all_data, "work items %r " % current_work.work_items
        comm.send(obj=anext, dest=status.Get_source(), tag=WORKTAG)
    print "ending while loop"

    # Receive from all processes. 
    print all_data, "\n" + "-"*80 + "\n starting cleanup for-loop"
    for i in range(1,size):
        data = comm.recv(obj=None, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG)
        all_data.append(data)
        print "results %r" % all_data
    print "ending cleanup for-loop"

    # Send a kill signal to each rank
    for i in range(1,size):
        comm.send(obj=None, dest=i, tag=DIETAG)

    return all_data

def worker(do_work):
    comm = MPI.COMM_WORLD
    status = MPI.Status()

    # Receive command do_work from manager and data. Kill when DIETAG is passed
    while 1:
        data = comm.recv(obj=None, source=0, tag=MPI.ANY_TAG, status=status)
        print "Worker: %d; Received %r" % (comm.Get_rank(),data)
        if status.Get_tag(): break
        print "Worker: %d; Starting to work on %r" % (comm.Get_rank(),data)
        k = do_work(data)
        print "Worker: %d; Finished work on %r; Now sending" % (comm.Get_rank(),data)
        comm.send(obj=k, dest=0)
        print "Worker: %d; Finished work on %r; finished sending" % (comm.Get_rank(),data)

def main(work_list, do_work):
    rank = MPI.COMM_WORLD.Get_rank()
    size = MPI.COMM_WORLD.Get_size()

    if rank == 0:
        all_dat = manager(work_list)
        return all_dat
    else:
        worker(do_work)

# A do_work function, passed to the workers, that spawns new MPI ranks
def spawn_function_evaluation_using_MPI(a):
    start_time = time.time()
    x = float(a)

    print "before spawn"
    comm_to_function = MPI.COMM_SELF.Spawn(sys.executable,args=['function.py'],maxprocs=1) # Spawn an instance of function.py 
    print "after spawn"

    comm_to_function.Bcast(np.array([x]),root=MPI.ROOT) # Broadcast the starting point to function.py

    y = np.array(0.0) # Initialize the variable that will store the function evaluation
    comm_to_function.Reduce(None, y, op=MPI.SUM, root=MPI.ROOT) # Sum the output from function.py

    comm_to_function.Disconnect()

    end_time = time.time()

    return y

if __name__ == "__main__":
    results = main(range(1,9),spawn_function_evaluation_using_MPI)
------------------------------- End manager_calling_workers.py -----------------------

Aron Ahmadia

Mar 5, 2014, 4:33:04 PM
to mpi...@googlegroups.com
64 is a pretty common limit for the number of processes on a single compute node, and is a limitation of either the operating system or the MPI implementation.  This isn't a bug in mpi4py, but a limitation you're going to encounter on your system.  You may be able to spawn more using a different MPI implementation, but you should really reconsider your approach if it revolves around spawning lots of processes, which MPI is not really well optimized for.
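
If you want a quick check of what your implementation advertises, the optional MPI_UNIVERSE_SIZE attribute is one hint (it is only a hint, and many implementations leave it unset); a minimal check with mpi4py might look like this:

from mpi4py import MPI

# MPI_UNIVERSE_SIZE is an optional attribute: a hint about how many
# processes the implementation expects to be able to run in total.
# It may well come back as None.
usize = MPI.COMM_WORLD.Get_attr(MPI.UNIVERSE_SIZE)
print("UNIVERSE_SIZE: %r" % usize)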

A




jmla...@gmail.com

Mar 5, 2014, 5:07:40 PM
to mpi...@googlegroups.com, ar...@ahmadia.net
Thank you very much for your response. 

I just want to be clear that I'm not spawning a bunch of processes at the same time. As the code is currently written, I spawn a single function.py process (with only 1 rank) for each integer given to a worker, and my entire task list is only [1,2,3,4,5,6,7,8]. Since I am calling mpiexec with only 2 processes, there is only one worker (and one manager). 

After the worker is done, it then spawns another function.py. Is there a way to "nicely clean up" the no-longer-in-use function.py that was spawned before?

Aron Ahmadia

Mar 5, 2014, 5:13:35 PM
to mpi...@googlegroups.com
Now we're delving into territory I'm not very familiar with.  My understanding is that once you've spawned a new MPI process, the MPI implementation has no responsibility to clean it up for you, so there may be a limit to how many processes you can spawn before your implementation bogs down.

I'll also make this general recommendation.  You should not treat MPI as a way to dynamically spawn and destroy workers.  That's not how the majority of developers who use MPI work with it, and it's not how it's best supported.  MPI is designed for use on large, static partitions of a well-connected high-performance computing machine.

If you want dynamic work allocation, start up a pool of workers and distribute work among them.  There are a number of libraries out there for doing this sort of thing, including many non-MPI libraries that will help with this kind of parallelism.
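
For instance, one way to do that with Spawn itself is to spawn the pool once and keep reusing the intercommunicator. A rough sketch (the script names pool_manager.py and pool_worker.py are just placeholders, and it reuses your WORKTAG/DIETAG idea):

---------------------- Begin pool_manager.py (sketch) ----------------------
from mpi4py import MPI
import sys

WORKTAG, DIETAG = 0, 1

# One Spawn for the whole run instead of one Spawn per task
pool = MPI.COMM_SELF.Spawn(sys.executable, args=['pool_worker.py'], maxprocs=4)
nworkers = pool.Get_remote_size()

tasks = list(range(1, 9))
results = []
status = MPI.Status()

# Prime each worker with one task
primed = 0
while primed < nworkers and tasks:
    pool.send(tasks.pop(), dest=primed, tag=WORKTAG)
    primed += 1

# Workers that never got a task can be told to quit right away
for w in range(primed, nworkers):
    pool.send(None, dest=w, tag=DIETAG)

# Hand out remaining tasks as results come back, then shut the rest down
active = primed
while active:
    results.append(pool.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status))
    if tasks:
        pool.send(tasks.pop(), dest=status.Get_source(), tag=WORKTAG)
    else:
        pool.send(None, dest=status.Get_source(), tag=DIETAG)
        active -= 1

pool.Disconnect()
print(results)
----------------------- End pool_manager.py (sketch) -----------------------

---------------------- Begin pool_worker.py (sketch) -----------------------
from mpi4py import MPI

DIETAG = 1
comm = MPI.Comm.Get_parent()
status = MPI.Status()
while True:
    x = comm.recv(source=0, tag=MPI.ANY_TAG, status=status)
    if status.Get_tag() == DIETAG:
        break
    comm.send(x ** 2, dest=0)   # the "simulation"
comm.Disconnect()
----------------------- End pool_worker.py (sketch) ------------------------

The important part is that Spawn happens once per run rather than once per task.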

A


Lisandro Dalcin

Mar 6, 2014, 4:14:38 AM
to mpi4py
On 6 March 2014 00:23, <jmla...@gmail.com> wrote:
> Hello, I am trying to implement a manager/worker implementation of mpi4py
> where the worker executes a large simulation (which may or may not involve
> MPI). The "simulation" is called function.py and currently just squares the
> number given to it.
>
> Below is a small manager/worker that hangs on my machine, right before the
> last MPI.COMM_SELF.Spawn. I have an 8-core machine and I use the following
> call:
>
> mpiexec -n 2 python manager_calling_workers.py
>
> I believe the code is well documented. Oddly, if I change the initial call
> from
> results = main(range(1,9),spawn_function_evaluation_using_MPI)
> to
> results = main(range(1,8),spawn_function_evaluation_using_MPI)
>
> I receive no hang. Thank you very much for your time.


I've tried your code on my machine (Fedora 20, 8 cores, mpich-3.0.4), and it works just fine; it does not hang.

PS: Please, next time send your code as attachments.


--
Lisandro Dalcin
---------------
CIMEC (UNL/CONICET)
Predio CONICET-Santa Fe
Colectora RN 168 Km 472, Paraje El Pozo
3000 Santa Fe, Argentina
Tel: +54-342-4511594 (ext 1016)
Tel/Fax: +54-342-4511169

jmla...@gmail.com

Mar 6, 2014, 10:01:05 AM
to mpi...@googlegroups.com
Thank you very much for your response. I thought that I had updated to the latest version of mpiexec, but I hadn't correctly added it to my PATH. 

If you want to see the behavior with a for loop spawning tasks, the output is below (a rough sketch of the loop itself follows the timings). Notice the big pauses before tasks 8 and 16; they go away after pointing to the latest mpiexec version:
--------------- Before ---------------------------
$ mpiexec --version | grep Version
    Version:                                 1.4.1
$ mpiexec -n 1 python for_loop_spawning_tasks.py
  Time since start: 0.023376
  I am rank: 0 out of 1, and I am working on array(1.0)
  Time since start: 0.082610
  I am rank: 0 out of 1, and I am working on array(2.0)
  Time since start: 0.142399
  I am rank: 0 out of 1, and I am working on array(3.0)
  Time since start: 0.202576
  I am rank: 0 out of 1, and I am working on array(4.0)
  Time since start: 0.262836
  I am rank: 0 out of 1, and I am working on array(5.0)
  Time since start: 0.322562
  I am rank: 0 out of 1, and I am working on array(6.0)
  Time since start: 0.382178
  I am rank: 0 out of 1, and I am working on array(7.0)
  Time since start: 15.445694
  I am rank: 0 out of 1, and I am working on array(8.0)
  Time since start: 15.506626
  I am rank: 0 out of 1, and I am working on array(9.0)
  Time since start: 15.568428
  I am rank: 0 out of 1, and I am working on array(10.0)
  Time since start: 15.629240
  I am rank: 0 out of 1, and I am working on array(11.0)
  Time since start: 15.688019
  I am rank: 0 out of 1, and I am working on array(12.0)
  Time since start: 15.749711
  I am rank: 0 out of 1, and I am working on array(13.0)
  Time since start: 15.814348
  I am rank: 0 out of 1, and I am working on array(14.0)
  Time since start: 15.876091
  I am rank: 0 out of 1, and I am working on array(15.0)
  Time since start: 29.941570
  I am rank: 0 out of 1, and I am working on array(16.0)

--------------- After ---------------------------
$ PATH=/home/user/software/mpich-install/bin:$PATH; export PATH
$ mpiexec --version | grep Version
    Version:                                 3.1
$ mpiexec -n 1 python for_loop.py
 Time since start: 0.016905
 I am rank: 0 out of 1, and I am working on array(1.0)
 Time since start: 0.082583
 I am rank: 0 out of 1, and I am working on array(2.0)
 Time since start: 0.147218
 I am rank: 0 out of 1, and I am working on array(3.0)
 Time since start: 0.209403
 I am rank: 0 out of 1, and I am working on array(4.0)
 Time since start: 0.285737
 I am rank: 0 out of 1, and I am working on array(5.0)
 Time since start: 0.354851
 I am rank: 0 out of 1, and I am working on array(6.0)
 Time since start: 0.415769
 I am rank: 0 out of 1, and I am working on array(7.0)
 Time since start: 0.477238
 I am rank: 0 out of 1, and I am working on array(8.0)
 Time since start: 0.538507
 I am rank: 0 out of 1, and I am working on array(9.0)
 Time since start: 0.609821
 I am rank: 0 out of 1, and I am working on array(10.0)
 Time since start: 0.678219
 I am rank: 0 out of 1, and I am working on array(11.0)
 Time since start: 0.739566
 I am rank: 0 out of 1, and I am working on array(12.0)
 Time since start: 0.800696
 I am rank: 0 out of 1, and I am working on array(13.0)
 Time since start: 0.861690
 I am rank: 0 out of 1, and I am working on array(14.0)
 Time since start: 0.930769
 I am rank: 0 out of 1, and I am working on array(15.0)
 Time since start: 0.992717
 I am rank: 0 out of 1, and I am working on array(16.0)



Thank you again for your responses.
