Using MPI_COMM_CONNECT and MPI_COMM_ACCEPT for intercommunication.

296 views
Skip to first unread message

Madison Stemm

unread,
Feb 19, 2014, 2:07:54 PM2/19/14
to mpi...@googlegroups.com

Hi everyone!

I have an issue I'm tasked with solving, involving intercommunication between two processes. Given a situation where master process A spawns a set of worker processes B, who each spawn their own unique worker process C, who then spawn there own worker process D, how can I open a communicator between D to A?

I'm trying to create a loop, using mpi4py, between several pieces of code that were written separately from one another while minimizing modifications to the codes. So, the general framework of the MPI code is going to be:

  1. Master A (one process) spawns 8 processes of worker B, and scatters an array to them.
  2. Each B process spawns a worker C, does some manipulation to the array, and broadcasts it to their own worker. C similarly spawns a worker D and later sends the array to it.
  3. Each worker D manipulates the array in their own way, and then (ideally) master A gathers an array back from each of C's arrays.

The only way I seem to understand doing this is to have each of C's processes call MPI_COMM_CONNECT, and then A must open a new communicator between them with a responding MPI_COMM_ACCEPT for each. The problem is, I can't find any documentation on how to use MPI_COMM_CONNECT or MPI_COMM_ACCEPT.

What is the proper syntax for these calls in mpi4py? And, if there's a better way to do this without the disorderly CONNECT and ACCEPT calls, how would I best achieve this kind of intercommunication?

Thank you so much for any help you can provide!
~Maddie

Addendum: I initially wrote the code with master A as a central hub, spawning B, C, and D. Information would be passed from A to B, back to A, then to C, back to A, then to D, then back to A, and it would repeat. A->B->A->C->A->D->A->B->A etc. I'm trying to avoid doing this, to improve performance. If the performance gain from the new structure wouldn't be very much, please let me know!

For reference, here's my original skeleton mockup I wrote, using A as the master hub.

#!/usr/bin/env python
from mpi4py import MPI
import numpy as np
import sys

#####
# A #
#####

# Create nprocs number of processes of eaach worker script, spawn communicators,
#  and send them the number of processes
nprocs
= 3
print("Spawning {0} processes.".format(nprocs))

comm1
= MPI.COMM_SELF.Spawn(sys.executable, args=["incon.py"],   maxprocs=nprocs)
comm2
= MPI.COMM_SELF.Spawn(sys.executable, args=["transit.py"], maxprocs=nprocs)
comm3
= MPI.COMM_SELF.Spawn(sys.executable, args=["outcon.py"],  maxprocs=nprocs)

procarray
= np.ones(nprocs, dtype='i')*nprocs

comm1
.Scatter([procarray, MPI.INT], None, root=MPI.ROOT)
comm2
.Scatter([procarray, MPI.INT], None, root=MPI.ROOT)
comm3
.Scatter([procarray, MPI.INT], None, root=MPI.ROOT)

comm1
.Barrier()
comm2
.Barrier()
comm3
.Barrier()

# Starting condition:
array1
= np.arange(2*nprocs, dtype='i')

print("Before DEMC loop: {0}".format(array1))

# Communication loop between input converter, transit, and output converter
# DEMC is the hub
for x in range(nprocs):

 
# Scatter array1 through input converter communicator
  comm1
.Barrier()
 
print(array1)
  comm1
.Scatter([array1, MPI.INT], None, root=MPI.ROOT)
  array2
= np.zeros(2*nprocs, dtype='i')
  comm1
.Gather(None, array2, root=MPI.ROOT)

 
# Scatter array2 through transit communicator
  comm2
.Barrier()
  comm2
.Scatter([array2, MPI.INT], None, root=MPI.ROOT)
  array3
= np.zeros(2*nprocs, dtype='d')
  comm2
.Gather(None, array3, root=MPI.ROOT)

 
# Scatter array3 through transit communicator
  comm3
.Barrier()
  comm3
.Scatter([array3, MPI.DOUBLE], None, root=MPI.ROOT)
  array4
= np.zeros(2*nprocs, dtype='i')
  comm3
.Gather(None, array4, root=MPI.ROOT)

 
# Reformat array1 to match array4, restart the loop
  array1
= np.array(array4)

 
print("DEMC Loop {0}: {1}".format(x, array1))

# Orders each worker to halt until all have caught up
comm1
.Barrier()
comm2
.Barrier()
comm3
.Barrier()

# Close communicators
comm1
.Disconnect()
comm2
.Disconnect()
comm3
.Disconnect()

from mpi4py import MPI
import numpy as np

#####
# B #
#####

comm1
= MPI.Comm.Get_parent()
rank  
= comm1.Get_rank()
size  
= comm1.Get_size()
name  
= "InputConverter"

print("This is {0}.{1}. There are {2} of us.".format(name, rank, size))

# Get number processes
nprocs
= np.array(0, dtype='i')
comm1
.Scatter(None, nprocs, root=0)
comm1
.Barrier()

for x in range(nprocs):

 
# Pause until DEMC contacts input converter again
  comm1
.Barrier()

 
# Receive array1 from DEMC
  array1
= np.zeros(2, dtype='i')
  comm1
.Scatter(None, array1, root=0)
 
print("{0}.{1} received array1 {2}".format(name, rank, array1))

 
# Send array2 back to DEMC
  array2
= array1 * 3
  comm1
.Gather([array2, MPI.INT], None, root=0)
 
print("{0}.{1} sent off array2 {2}".format(name, rank, array2))

# Close communicators after all workers have caught up
comm1
.Barrier()
comm1
.Disconnect()

from mpi4py import MPI
import numpy as np

#####
# C #
#####

comm2
= MPI.Comm.Get_parent()
rank  
= comm2.Get_rank()
size  
= comm2.Get_size()
name  
= "Transit"

print("This is {0}.{1}. There are {2} of us.".format(name, rank, size))

# Get number processes
nprocs
= np.array(0, dtype='i')
comm2
.Scatter(None, nprocs, root=0)
comm2
.Barrier()

for x in range(nprocs):

 
# Pause until DEMC contacts Transit again
  comm2
.Barrier()

 
# Receive array2 from DEMC
  array2
= np.zeros(2, dtype='i')
  comm2
.Scatter(None, array2, root=0)
 
print("{0}.{1} received array2 {2}".format(name, rank, array2))

 
# Send off array3 to DEMC
  array3
= np.array(array2 / 1.7, dtype='d')
  comm2
.Gather([array3, MPI.INT], None, root=0)
 
print("{0}.{1} sent off array3 {2}".format(name, rank, array3))

# Close communicators after all workers have caught up
comm2
.Barrier()
comm2
.Disconnect()

from mpi4py import MPI
import numpy as np

#####
# D #
#####

comm3
= MPI.Comm.Get_parent()
rank  
= comm3.Get_rank()
size  
= comm3.Get_size()
name  
= "OutputConverter"

print("This is {0}.{1}. There are {2} of us.".format(name, rank, size))

# Get number processes
nprocs
= np.array(0, dtype='i')
comm3
.Scatter(None, nprocs, root=0)
comm3
.Barrier()

for x in range(nprocs):

 
# Pause until DEMC contacts Output Converter again
  comm3
.Barrier()

 
# Receive array3 from DEMC
  array3
= np.zeros(2, dtype='d')
  comm3
.Scatter(None, array3, root=0)
 
print("{0}.{1} received array3 {2}".format(name, rank, array3))

 
# Send off array4 to DEMC
  array4
= np.array(array3, dtype='i')
  comm3
.Gather([array4, MPI.INT], None, root=0)
 
print("{0}.{1} sent off array4 {2}".format(name, rank, array4))

# Close communicators after all workers have caught up
comm3
.Barrier()
comm3
.Disconnect()




Aron Ahmadia

unread,
Feb 19, 2014, 2:20:42 PM2/19/14
to mpi...@googlegroups.com
Hi Maddie,

Welcome!  Can I ask you a few questions about your project?

What code are you trying to connect?  Why do you need to spawn subprocesses?  MPI Spawn isn't well-supported on supercomputers, and it's usually even hard to use on a cluster.  

If a code needs to be "tightly coupled", then it's worth spending some time figuring out if:

a] MPI is really your best option for this part of the work
b] You really need dynamic spawning

Assuming the answer to both of those questions is "Yes!", then you're looking for MPI Intercommunicators.  There are some examples in test/test_cco_obj_inter.py on how to use these.  

Let me know if this helps.

-A

Madison Stemm

unread,
Feb 19, 2014, 2:30:29 PM2/19/14
to mpi...@googlegroups.com, ar...@ahmadia.net
Hi Aron,

Thanks for the response. Excellent questions! I'll be the first to admit that I'm new to MPI, and spawn may not be the best way to approach this. MPI might not be either.

To answer your questions:
I'm trying to connect two main scripts, one written in Python and one written in C. One runs a markov chain Monte Carlo method, and the other runs a simulation. The results of the simulation are fed back to the MC-MC method for further atunement. We've written two further scripts to act as a variable converter between the MC-MC method and the sim. In the skeleton example I provided above, the MC-MC routine is the master A, and the simulation, "Transit", is C.

I used MPI spawn only because it seemed the most convenient way to open a communicator between two processes. Would it be easier/optimised to start all of the processes at launch? ie, with
mpirun -np 1 A.py -np 8 B.py -np 8 C.py -np 8 D.py

MPI is the option my advisor currently wants to use to take advantage of ability to handle multiple processors from workers B, C, and D. But we don't need dynamic spawning, that was just the method I latched onto as MPI documentation has been a little spartan and daunting for a new user.

With that additional information, can you think of a better way to tackle this? In the meantime, I'll check out the intercommunicator example! =)

~Maddie

Aron Ahmadia

unread,
Feb 19, 2014, 2:39:32 PM2/19/14
to mpi...@googlegroups.com
Hi Maddie,

Yes, you should be using Cython or the CFFI to connect your C script to Python :)


See if this tutorial will help you write a Cython extension module:


-A


--
You received this message because you are subscribed to the Google Groups "mpi4py" group.
To unsubscribe from this group and stop receiving emails from it, send an email to mpi4py+un...@googlegroups.com.
To post to this group, send email to mpi...@googlegroups.com.
Visit this group at http://groups.google.com/group/mpi4py.
To view this discussion on the web visit https://groups.google.com/d/msgid/mpi4py/91bbc857-2966-48f7-b13a-6ab03fa4891d%40googlegroups.com.

For more options, visit https://groups.google.com/groups/opt_out.

Madison Stemm

unread,
Feb 19, 2014, 2:42:26 PM2/19/14
to mpi...@googlegroups.com, ar...@ahmadia.net
Hi Aron,

I will be using Cython to connect it to python — it's a useful module! — but that dosn't solve the issue of passing variables between the two codes, nor the parallel processing. The plan was to use Cython to call the C code for worker C, and parallel process it all (and communicate back and forth between the two codes) with MPI.

~Maddie

Aron Ahmadia

unread,
Feb 19, 2014, 2:55:59 PM2/19/14
to mpi...@googlegroups.com

I will be using Cython to connect it to python — it's a useful module! — but that dosn't solve the issue of passing variables between the two codes, nor the parallel processing. The plan was to use Cython to call the C code for worker C, and parallel process it all (and communicate back and forth between the two codes) with MPI.

Yes, I apologize for the lack of documentation :(

Right now, mpi4py is most useful to folks who already know the C MPI interface well, and are using Python to wrap their modules or libraries.  We run a tutorial at supercomputing every year, but it's mostly oriented towards folks who already know MPI, so we don't spend a lot of time teaching it :/

First off, you should avoid using Spawn.  Just use processes that are responsible for the various pieces of your simulation.

This will give you access to a communicator called MPI.COMM_WORLD that includes all the processes you launched at the start.  You can then split off subcommunicators to handle each simulation needed by the MCMC solver.

Does this make sense?

A

Madison Stemm

unread,
Feb 19, 2014, 8:42:33 PM2/19/14
to mpi...@googlegroups.com, ar...@ahmadia.net
That would explain the lack of documentation! I've been mostly getting by with lectures I've found online, mostly powerpoints and some video lectures with demonstrations. But knowing MPI would definitely make this task a lot easier!

So, this is going to be a very ignorant question, but if I don't use Spawn, how do I use MPI.COMM_WORLD and subcommunicators to handle communications? I imagine I need to create groups somehow, right?

~Maddie

Aron Ahmadia

unread,
Feb 19, 2014, 10:29:35 PM2/19/14
to mpi...@googlegroups.com
On Wed, Feb 19, 2014 at 8:42 PM, Madison Stemm <astro...@gmail.com> wrote:
That would explain the lack of documentation! I've been mostly getting by with lectures I've found online, mostly powerpoints and some video lectures with demonstrations. But knowing MPI would definitely make this task a lot easier!

So, this is going to be a very ignorant question, but if I don't use Spawn, how do I use MPI.COMM_WORLD and subcommunicators to handle communications? I imagine I need to create groups somehow, right?

~Maddie

Hi Maddie,

There are two routes you could use.  You could try and build atop a parallel Python process pool (there are many implemented, and I think there might be one that is astronomy-specific).  I'm not really familiar with these tools, though, so I would need to look around to try and connect you.

The other route is to write some MPI code yourself.  MPI is a bit weird to write, since it follows the Single Program Multiple Data paradigm usually.  That is, you write one program, and then it gets executes as multiple processes, then does different things depending on one variable (usually, the process 'rank').  I've put together an example of the sort of code you'd write to sort out your tasks, take a look here:


I'm putting that code in the public domain, so feel free to start from that.   You can try running it with something like:

mpirun -np 4 python group_demo.py

Let me know how it goes.

-A

Madison Stemm

unread,
Feb 20, 2014, 3:25:46 PM2/20/14
to mpi...@googlegroups.com, ar...@ahmadia.net
Hi Aron,

The first option, if you can find anyone to connect me to, would be the absolute best!

I'm definitely not opposed to writing my own MPI code, though, and I read through your example. It does help... however (I'm going to be such a pain here) is there anyway to do somethign like this with MPI that involves multiple programs? A "Multiple Program Multiple Data" paradigm, so to speak?

If I were to execute multiple codes with "mpirun -np 1 demoA.py -np 4 demoB.py", how would I find the rank numbers of demoB's processes to put them into a group, and then open a communicator between demoA and demoB? I ask this because, given the large size and complexity of the codes I'm working with, writing them all into one program would be exceedingly difficult.

Thanks for being so patient and helpful =)

~Maddie

Aron Ahmadia

unread,
Feb 20, 2014, 3:34:00 PM2/20/14
to mpi...@googlegroups.com
Maddie,

It is really okay if this is taking a while to understand :)

On Thu, Feb 20, 2014 at 3:25 PM, Madison Stemm <astro...@gmail.com> wrote:
I'm definitely not opposed to writing my own MPI code, though, and I read through your example. It does help... however (I'm going to be such a pain here) is there anyway to do somethign like this with MPI that involves multiple programs? A "Multiple Program Multiple Data" paradigm, so to speak?


Are you familiar with working with Python modules?  You certainly don't need to put all the source into a single file.  You can also have (to use my example) taskA and taskB launch other programs if needed.
 
If I were to execute multiple codes with "mpirun -np 1 demoA.py -np 4 demoB.py", how would I find the rank numbers of demoB's processes to put them into a group, and then open a communicator between demoA and demoB? I ask this because, given the large size and complexity of the codes I'm working with, writing them all into one program would be exceedingly difficult.

We're effectively doing this by having all the processes divide themselves by rank.  Let's say you start a job with 8 processes.  They will have ids 0-7.  It's up to you to divide that group of processes into how you want them.  In the example, process 0 would be the controller, 1 would be responsible for task A, and 2-7 would be working together on task A.  The communicators were created by the "split" call, where each process chose the communicator it would join by picking a color.  process 0 and 1 ended up in their own communicators, and 2-7 ended up in a third communicator.

A

Reply all
Reply to author
Forward
0 new messages