Hi everyone!
I have an issue I'm tasked with solving, involving intercommunication between processes. Given a situation where master process A spawns a set of worker processes B, each of which spawns its own unique worker process C, which in turn spawns its own worker process D, how can I open a communicator between D and A?
I'm trying to create a loop, using mpi4py, between several pieces of code that were written independently of one another, while minimizing modifications to those codes. So the general framework of the MPI code is going to be: A spawns the B workers, each B spawns a C, and each C spawns a D.
The only way I can see to do this is to have each of C's processes call MPI_Comm_connect, and have A open a new intercommunicator with a matching MPI_Comm_accept for each. The problem is that I can't find any documentation on how to use MPI_Comm_connect or MPI_Comm_accept.
What is the proper syntax for these calls in mpi4py? And, if there's a better way to do this without the disorderly CONNECT and ACCEPT calls, how would I best achieve this kind of intercommunication?
Thank you so much for any help you can provide!
~Maddie
Addendum: I initially wrote the code with master A as a central hub, spawning B, C, and D. Information would be passed from A to B, back to A, then to C, back to A, then to D, then back to A, and it would repeat. A->B->A->C->A->D->A->B->A etc. I'm trying to avoid doing this, to improve performance. If the performance gain from the new structure wouldn't be very much, please let me know!
For reference, here's my original skeleton mockup I wrote, using A as the master hub.
#!/usr/bin/env python
from mpi4py import MPI
import numpy as np
import sys
#####
# A #
#####
# Create nprocs processes of each worker script, spawn communicators,
# and send them the number of processes
nprocs = 3
print("Spawning {0} processes.".format(nprocs))
comm1 = MPI.COMM_SELF.Spawn(sys.executable, args=["incon.py"], maxprocs=nprocs)
comm2 = MPI.COMM_SELF.Spawn(sys.executable, args=["transit.py"], maxprocs=nprocs)
comm3 = MPI.COMM_SELF.Spawn(sys.executable, args=["outcon.py"], maxprocs=nprocs)
procarray = np.ones(nprocs, dtype='i')*nprocs
comm1.Scatter([procarray, MPI.INT], None, root=MPI.ROOT)
comm2.Scatter([procarray, MPI.INT], None, root=MPI.ROOT)
comm3.Scatter([procarray, MPI.INT], None, root=MPI.ROOT)
comm1.Barrier()
comm2.Barrier()
comm3.Barrier()
# Starting condition:
array1 = np.arange(2*nprocs, dtype='i')
print("Before DEMC loop: {0}".format(array1))
# Communication loop between input converter, transit, and output converter
# DEMC is the hub
for x in range(nprocs):
    # Scatter array1 through the input converter communicator
    comm1.Barrier()
    print(array1)
    comm1.Scatter([array1, MPI.INT], None, root=MPI.ROOT)
    array2 = np.zeros(2*nprocs, dtype='i')
    comm1.Gather(None, array2, root=MPI.ROOT)
    # Scatter array2 through the transit communicator
    comm2.Barrier()
    comm2.Scatter([array2, MPI.INT], None, root=MPI.ROOT)
    array3 = np.zeros(2*nprocs, dtype='d')
    comm2.Gather(None, array3, root=MPI.ROOT)
    # Scatter array3 through the output converter communicator
    comm3.Barrier()
    comm3.Scatter([array3, MPI.DOUBLE], None, root=MPI.ROOT)
    array4 = np.zeros(2*nprocs, dtype='i')
    comm3.Gather(None, array4, root=MPI.ROOT)
    # Reformat array1 to match array4, restart the loop
    array1 = np.array(array4)
    print("DEMC Loop {0}: {1}".format(x, array1))
# Orders each worker to halt until all have caught up
comm1.Barrier()
comm2.Barrier()
comm3.Barrier()
# Close communicators
comm1.Disconnect()
comm2.Disconnect()
comm3.Disconnect()
from mpi4py import MPI
import numpy as np
#####
# B #
#####
comm1 = MPI.Comm.Get_parent()
rank = comm1.Get_rank()
size = comm1.Get_size()
name = "InputConverter"
print("This is {0}.{1}. There are {2} of us.".format(name, rank, size))
# Get number processes
nprocs = np.array(0, dtype='i')
comm1.Scatter(None, nprocs, root=0)
comm1.Barrier()
for x in range(nprocs):
    # Pause until DEMC contacts the input converter again
    comm1.Barrier()
    # Receive array1 from DEMC
    array1 = np.zeros(2, dtype='i')
    comm1.Scatter(None, array1, root=0)
    print("{0}.{1} received array1 {2}".format(name, rank, array1))
    # Send array2 back to DEMC
    array2 = array1 * 3
    comm1.Gather([array2, MPI.INT], None, root=0)
    print("{0}.{1} sent off array2 {2}".format(name, rank, array2))
# Close communicators after all workers have caught up
comm1.Barrier()
comm1.Disconnect()
from mpi4py import MPI
import numpy as np
#####
# C #
#####
comm2 = MPI.Comm.Get_parent()
rank = comm2.Get_rank()
size = comm2.Get_size()
name = "Transit"
print("This is {0}.{1}. There are {2} of us.".format(name, rank, size))
# Get number processes
nprocs = np.array(0, dtype='i')
comm2.Scatter(None, nprocs, root=0)
comm2.Barrier()
for x in range(nprocs):
    # Pause until DEMC contacts Transit again
    comm2.Barrier()
    # Receive array2 from DEMC
    array2 = np.zeros(2, dtype='i')
    comm2.Scatter(None, array2, root=0)
    print("{0}.{1} received array2 {2}".format(name, rank, array2))
    # Send off array3 to DEMC (array3 is double precision, so MPI.DOUBLE)
    array3 = np.array(array2 / 1.7, dtype='d')
    comm2.Gather([array3, MPI.DOUBLE], None, root=0)
    print("{0}.{1} sent off array3 {2}".format(name, rank, array3))
# Close communicators after all workers have caught up
comm2.Barrier()
comm2.Disconnect()
from mpi4py import MPI
import numpy as np
#####
# D #
#####
comm3 = MPI.Comm.Get_parent()
rank = comm3.Get_rank()
size = comm3.Get_size()
name = "OutputConverter"
print("This is {0}.{1}. There are {2} of us.".format(name, rank, size))
# Get number processes
nprocs = np.array(0, dtype='i')
comm3.Scatter(None, nprocs, root=0)
comm3.Barrier()
for x in range(nprocs):
    # Pause until DEMC contacts the output converter again
    comm3.Barrier()
    # Receive array3 from DEMC
    array3 = np.zeros(2, dtype='d')
    comm3.Scatter(None, array3, root=0)
    print("{0}.{1} received array3 {2}".format(name, rank, array3))
    # Send off array4 to DEMC
    array4 = np.array(array3, dtype='i')
    comm3.Gather([array4, MPI.INT], None, root=0)
    print("{0}.{1} sent off array4 {2}".format(name, rank, array4))
# Close communicators after all workers have caught up
comm3.Barrier()
comm3.Disconnect()
mpirun -np 1 python A.py : -np 8 python B.py : -np 8 python C.py : -np 8 python D.py
--
You received this message because you are subscribed to the Google Groups "mpi4py" group.
To view this discussion on the web visit https://groups.google.com/d/msgid/mpi4py/91bbc857-2966-48f7-b13a-6ab03fa4891d%40googlegroups.com.
I will be using Cython to connect it to Python (it's a useful module!), but that doesn't solve the issue of passing variables between the two codes, nor the parallel processing. The plan was to use Cython to call the C code for worker C, and parallel process it all (and communicate back and forth between the two codes) with MPI.
That would explain the lack of documentation! I've mostly been getting by with lectures I've found online, mostly PowerPoint slides and some video lectures with demonstrations. But knowing MPI properly would definitely make this task a lot easier!
So, this is going to be a very ignorant question, but if I don't use Spawn, how do I use MPI.COMM_WORLD and subcommunicators to handle communications? I imagine I need to create groups somehow, right?
~Maddie
I'm definitely not opposed to writing my own MPI code, though, and I read through your example. It does help... however (I'm going to be such a pain here), is there any way to do something like this with MPI that involves multiple programs? A "Multiple Program Multiple Data" paradigm, so to speak?
If I were to execute multiple codes with "mpirun -np 1 python demoA.py : -np 4 python demoB.py", how would I find the rank numbers of demoB's processes to put them into a group, and then open a communicator between demoA and demoB? I ask this because, given the large size and complexity of the codes I'm working with, writing them all into one program would be exceedingly difficult.