Issues with non-blocking (asynchronous) communication (Please help ASAP)


Mahdi Morafah

Jun 25, 2020, 12:59:15 PM
to mpi4py
Hello,

We are working on a project and need to implement a neural network optimization task on several nodes with asynchronous communication. There are a number of nodes (agents), each training a neural network, that need to send/receive gradients of the weights to/from some of the other agents. We are using mpi4py for the asynchronous communication and TensorFlow for training and computing the gradients of the neural network.
Here is what our code looks like:

# ... doing some process ...

for neighbor in self.Nin:
    if self.COMM.Iprobe(source=neighbor + 1,
                        tag=self.n_variables + self.shift_in[neighbor]**self.n_variables + 2):
        VV = []
        rrho = []
        for i in range(self.n_variables):
            data1 = np.zeros(self.Z[i].shape, dtype='float32') + 1e-12
            self.COMM.Irecv([data1, MPI.FLOAT], source=neighbor + 1,
                            tag=(i + 1) + self.shift_in[neighbor]**(i + 1) + 1).wait()
            VV.append(tf.Variable(data1))
            del data1

            data2 = np.zeros(self.Z[i].shape, dtype='float32') + 1e-12
            self.COMM.Irecv([data2, MPI.FLOAT], source=neighbor + 1,
                            tag=(i + 1) + self.shift_in[neighbor]**(i + 1) + 2).wait()
            rrho.append(tf.Variable(data2))
            del data2

        # ... doing some other process ...
    else:
        pass  # ... doing some process ...

# ... doing some process ...

for neighbor in self.Nout:
    for i in range(self.n_variables):
        data1 = np.array([…])
        data2 = np.array([…])
        self.COMM.Isend([data1, MPI.FLOAT], dest=neighbor + 1,
                        tag=(i + 1) + self.shift_out[neighbor]**(i + 1) + 1)
        self.COMM.Isend([data2, MPI.FLOAT], dest=neighbor + 1,
                        tag=(i + 1) + self.shift_out[neighbor]**(i + 1) + 2)



However, after running this code I run into errors that sound like memory issues.

When I run the code on one of my systems using (mpiexec -np 5 python main.py), I get the error

[acesgpu-SYS-7048GR-TR:42345] Read -1, expected 18446744073709547520, errno = 22

But my code keeps running, and after a while the run stalls.

When I run the code on my other system using (mpiexec -np 5 python main.py), at first I get a warning:

A process has executed an operation involving a call to the
"fork()" system call to create a child process.  Open MPI is currently
operating in a condition that could result in memory corruption or
other system errors; your job may hang, crash, or produce silent
data corruption.  The use of fork() (or system() or other calls that
create child processes) is strongly discouraged.

The process that invoked fork was:

  Local host:          [[44768,1],2] (PID 200246)

If you are *absolutely sure* that your application will successfully
and correctly survive a call to fork(), you may disable this warning
by setting the mpi_warn_on_fork MCA parameter to 0.


The code then runs, but after a while I get an error from the TensorFlow package when it tries to assign a value to a variable; the error seems to be due to insufficient memory.

I am stuck trying to find out why this happens. I believe I have implemented the mpi4py (asynchronous communication) part correctly, and the TensorFlow (neural network training) part is 100% correct, but I cannot understand why I get these errors. Could you please help me? I really need to debug this code as soon as possible.


Thanks :))
               

Lisandro Dalcin

Jun 26, 2020, 4:39:09 AM
to mpi...@googlegroups.com
There are a few things in your code that do not look right to me. But heck, what you sent is like pseudocode, impossible to tell whether the errors come from the transcription or the original code.

1.- Your for loop with the Iprobe() calls looks suspicious. I cannot make full sense of it, because I do not know the details. I also have no idea how Nin, Nout, and shift_in/shift_out are filled, so I cannot assert whether your code is correct.

2.- You do COMM.Irecv(...).wait(). But a) if you use the uppercase Irecv(), you should use the uppercase Wait() on the returned request, and b) Irecv(...).Wait() is simply equivalent to a blocking Recv(), so your code just complicates things for zero return.
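
To make point 2b concrete, the two forms below are interchangeable (a sketch with a made-up buffer, source, and tag; a matching send for each receive is assumed to exist elsewhere):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
buf = np.empty(10, dtype='float32')

# form (a): nonblocking receive, immediately waited on
comm.Irecv([buf, MPI.FLOAT], source=0, tag=0).Wait()

# form (b): the equivalent blocking receive
comm.Recv([buf, MPI.FLOAT], source=0, tag=0)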

3.- The error about fork() is not mpi4py's fault. You are using some other package or library that ends up making a fork() system call. Some MPI implementations do not like/play well with that.

My best guess so far is that you have some logic error in the communication calls that may lead to deadlock. I'm quite good at spotting them, but it is impossible with what I have at hand. If you are in a hurry to get help, next time you should really send a minimal working example that reproduces the problem and allows people to reason about it.




--
Lisandro Dalcin
============
Research Scientist
Extreme Computing Research Center (ECRC)
King Abdullah University of Science and Technology (KAUST)
http://ecrc.kaust.edu.sa/

Mahdi Morafah

Jun 26, 2020, 8:18:58 PM
to mpi4py

Hi Lisandro,

Thank you very much for your response. Well, the way the code is written, we have a class that creates a neural network model; calling its update_step method computes the batch gradients and sends/receives gradients to/from the neighbors. In main.py we just have a for loop that calls update_step. The code I sent is exactly what update_step does and what causes the issues. Here is a working example of the code:

COMM = MPI.COMM_WORLD
SIZE = COMM.Get_size()
RANK = COMM.Get_rank()

name = RANK
Nout = [1, 3, 5]
Nin = [2, 0, 4]
n_variables = 6
shift_out = {i: (i + 1) for i in Nout}
shift_in = {i: (name + 1) for i in Nin}

trainable_variables = [tf.Variable(np.zeros((3, 3, 1, 32), dtype='float32')),
                       tf.Variable(np.zeros((32,), dtype='float32')),
                       tf.Variable(np.zeros((21632, 128), dtype='float32')),
                       tf.Variable(np.zeros((128,), dtype='float32')),
                       tf.Variable(np.zeros((128, 10), dtype='float32')),
                       tf.Variable(np.zeros((10,), dtype='float32'))]

send_buff_rho = {j: [[np.zeros(trainable_variables[i].shape, dtype='float32') + 1e-15
                      for i in range(n_variables)] for _ in range(30)] for j in Nout}

send_buff_V = {j: [[np.zeros(trainable_variables[i].shape, dtype='float32') + 1e-15
                    for i in range(n_variables)] for _ in range(30)] for j in Nout}

send_ind = 0

# ... doing some process ...

for neighbor in Nin:
    if COMM.Iprobe(source=neighbor, tag=n_variables + shift_in[neighbor]**n_variables + 2):
        VV = []
        rrho = []
        for i in range(n_variables):
            data1 = np.zeros(trainable_variables[i].shape, dtype='float32') + 1e-12
            COMM.Irecv([data1, MPI.FLOAT], source=neighbor,
                       tag=(i + 1) + shift_in[neighbor]**(i + 1) + 1).Wait()
            VV.append(tf.Variable(data1))
            del data1

            data2 = np.zeros(trainable_variables[i].shape, dtype='float32') + 1e-12
            COMM.Irecv([data2, MPI.FLOAT], source=neighbor,
                       tag=(i + 1) + shift_in[neighbor]**(i + 1) + 2).Wait()
            rrho.append(tf.Variable(data2))
            del data2

        # ... doing some other process ...
    else:
        pass  # ... doing some process ...

# ... doing some process ...

for neighbor in Nout:
    temp1 = []
    temp2 = []
    for i in range(n_variables):
        data1 = np.random.randn(trainable_variables[i].shape, dtype='float32')
        data2 = np.random.randn(trainable_variables[i].shape, dtype='float32')
        temp1.append(data1)
        temp2.append(data2)

        if np.mod(send_ind, 30) == 0.0:
            send_ind = 0

        send_buff_V[neighbor][self.send_ind] = temp1
        send_buff_rho[neighbor][self.send_ind] = temp2

        del temp1
        del temp2
        del data1
        del data2

        COMM.Isend([send_buff_V[neighbor][-1][i], MPI.FLOAT], dest=neighbor,
                   tag=(i + 1) + shift_out[neighbor]**(i + 1) + 1)
        COMM.Isend([send_buff_rho[neighbor][-1][i], MPI.FLOAT], dest=neighbor,
                   tag=(i + 1) + shift_out[neighbor]**(i + 1) + 2)



I have provided as much information as possible in the code above. It is exactly what the code I am using looks like.

I use send_buff_V and send_buff_rho to store the send buffers and keep them from being overwritten or freed while a message has not yet been received. I also use Iprobe to check whether a message has arrived before taking it with Irecv. I know that Irecv().Wait() is the same as Recv(); I used Irecv().Wait() because if Iprobe returns True the message has already arrived, so there should be no waiting.

I would also like to note that I plan to vectorize the variables to be sent, so that instead of a for loop of send/recv calls there is just one message per neighbor; I am not sure whether that will run into bandwidth issues, since the message would become huge (especially once we use deeper neural networks). This is pretty much all the information I have; please let me know if you need more. My question is how to make sure the asynchronous communication is implemented correctly, and how to avoid the memory issues in this code.
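
Regarding the vectorization idea above, something like this sketch is what I have in mind (hypothetical pack/unpack helper names):

import numpy as np

def pack(arrays):
    # flatten a list of arrays into a single 1-D message buffer
    return np.concatenate([a.ravel() for a in arrays])

def unpack(flat, shapes):
    # split the received buffer back into arrays of the original shapes
    out, pos = [], 0
    for shape in shapes:
        n = int(np.prod(shape))
        out.append(flat[pos:pos + n].reshape(shape))
        pos += n
    return out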

I also do not understand this error:

[acesgpu-SYS-7048GR-TR:42345] Read -1, expected 18446744073709547520, errno = 22

I am also not sure whether the fork() warning can be ignored or whether it is going to be troublesome.

I really appreciate your help and time. Thank you very much


Lisandro Dalcin

Jun 27, 2020, 5:14:21 AM
to mpi...@googlegroups.com

Your code is not a minimal working example; it seems to use TensorFlow (is that what `tf` is in there?). I do not have TensorFlow installed.

1.- Send an example that uses only mpi4py and numpy.
2.- Do not paste the code in the email body. Put it in a main.py script and attach it. If you paste, things do not go well: I get weird quotation marks, and if I copy and paste it back, the code does not run (indentation errors).
3.- Make sure that the code in main.py is complete and self-contained. The code you sent was missing import statements. Maybe these are trivial imports, but why should I waste time figuring them out?
4.- PLEASE RUN YOUR CODE before sending it, so that you are sure the code runs with no import/syntax error and fails as expected.
5.- Remember to confirm how many MPI processes you are using to run the code.

Please understand that help here is provided for free and out of good will. Do not make the life of those trying to help miserable by forcing them to guess missing information and fight with trivial bugs (missing import statements, weird quotation marks in the code, using heavy dependencies like TensorFlow when it is obviously not needed, indentation errors, etc.). If you do not follow these rules, the outcome is a slower resolution of your issue.

All that being said, I still think that your code has some logic bugs that lead to deadlocks.

I said it before and I'm going to say it again. Irecv().Wait() is the same as Recv(), things will work the same, and there is no "eliminate waiting" argument to make.

How much experience do you have with MPI? Have you used it before? Have you ever coded something similar to what you are trying to do now? The code, as written the last time, seems to have send-to-self calls. Is that intended?

You are using Isend() and discarding the resulting Request objects. You should collect them all in a list and eventually call MPI.Request.Waitall(list_of_requests).
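
Something along these lines (a sketch with hypothetical buffers and a ring neighbor as destination; the peer is assumed to post matching receives):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
peer = (comm.Get_rank() + 1) % comm.Get_size()  # hypothetical destination

# one buffer per outgoing message; they must stay alive and unmodified
buffers = [np.full(4, i, dtype='float32') for i in range(3)]

requests = []
for i, buf in enumerate(buffers):
    requests.append(comm.Isend([buf, MPI.FLOAT], dest=peer, tag=i))

# ... do useful work here ...

MPI.Request.Waitall(requests)  # eventually complete all the sends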

Mahdi Morafah

Jun 28, 2020, 1:38:09 AM
to mpi4py
Hi Lisandro,

I am very thankful for your time and help, and my apologies for the code and any inconvenience. I have written an executable example using only mpi4py, numpy, and time. The code is attached to this message (main.py). I am running it with 6 processes (mpiexec -np 6 python main.py).

Well, I do not have much experience with MPI; previously I worked mostly with blocking communication.

The main thing we're trying to implement is asynchronous sending and receiving. Each process needs to send messages and continue doing its work, and each process should receive messages from the specified other processes once they are available and delivered (if a message has not been delivered yet, the process should not wait for it). [Using MPI.Request.Waitall(list_of_requests) pauses the process until the communication is done, and we don't want that.] [And the send-to-self calls are not intended.]

We want to make sure that the code attached to this message does exactly that and that there are no issues with it.

We also want to use this code with more processes and more send/recv messages, so we want to make sure we don't run into memory or bandwidth issues.

For now, we have the issues I stated earlier in this thread, and I really appreciate your response, help, and time.

Thank you very much for your time and help.
main.py

Lisandro Dalcin

Jun 29, 2020, 7:01:27 AM
to mpi...@googlegroups.com
On Sun, 28 Jun 2020 at 08:38, Mahdi Morafah <morafa...@gmail.com> wrote:
> Hi Lisandro,
>
> I am very thankful for your time and help, and my apologies for the code and any inconvenience. I have written an executable example using only mpi4py, numpy, and time. The code is attached to this message (main.py). I am running it with 6 processes (mpiexec -np 6 python main.py).


Your script requires so much memory to run that it almost hung my tiny desktop computer with 16GB RAM. The mpiexec run was eventually terminated, but at that time most of my GNOME Shell extensions crashed (most likely because of out of memory), and I had to reboot. Buddy, with all my respect and love: apologies are not enough, you have to be more careful in the future, otherwise you will piss people off. A minimal working example/reproducer should also attempt to use little resources and run quickly, you cannot expect the people helping you to have a server-grade computer to debug a piece of code.

So, again, I was not able to run your script. BTW, beware that MPI messages are limited to counts of at most 2**31 - 1 entries (the count is a C int); beyond that you need to implement some special things (google for Jeff Hammond's BigMPI; these tricks are easy to implement with mpi4py).
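
The basic trick is nothing more than splitting the data into chunks whose counts stay below the limit; a sketch of the idea (not BigMPI itself, and assuming a C-contiguous float32 array):

from mpi4py import MPI
import numpy as np

CHUNK = 2**30  # any count safely below the 2**31 - 1 limit

def send_big(comm, array, dest, tag):
    flat = array.ravel()  # assumes the array is C-contiguous
    for pos in range(0, flat.size, CHUNK):
        comm.Send([flat[pos:pos + CHUNK], MPI.FLOAT], dest=dest, tag=tag)

def recv_big(comm, array, source, tag):
    flat = array.ravel()  # must be a view, not a copy, to fill `array`
    for pos in range(0, flat.size, CHUNK):
        comm.Recv([flat[pos:pos + CHUNK], MPI.FLOAT], source=source, tag=tag)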

 
> Well, I do not have much experience with MPI; previously I worked mostly with blocking communication.


Well, I still cannot make sense of your code. I'm not sure what you are trying to do, and the code does not look correct to me. But because I cannot manage to run it, I cannot tell where the problem is. Maybe you can send the equivalent code that uses blocking Send()/Recv(); I assume you have a working version using blocking communication. But please, please, I have to stress it again: self-contained code, communicating small arrays, so that it can run on a low-powered laptop.

> The main thing we're trying to implement is asynchronous sending and receiving. Each process needs to send messages and continue doing its work, and each process should receive messages from the specified other processes once they are available and delivered (if a message has not been delivered yet, the process should not wait for it). [Using MPI.Request.Waitall(list_of_requests) pauses the process until the communication is done, and we don't want that.] [And the send-to-self calls are not intended.]

If you use Isend(), you need to store the returned Request object and eventually Test()/Wait()/Waitall() it; otherwise your program is not a correct MPI program. Sorry, but you cannot bend MPI to your will; you have to follow the rules: waiting on initiated requests is MANDATORY in MPI. A good MPI program overlaps computation and communication such that by the time Wait() is called, the operation has completed (or almost completed) and the Wait() time is nearly zero, but a good (and thus correct) MPI program DOES have these Wait() calls.
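
Schematically, a correct nonblocking send looks like this sketch (hypothetical buffer and ring neighbor):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
peer = (comm.Get_rank() + 1) % comm.Get_size()  # hypothetical destination
sendbuf = np.zeros(8, dtype='float32')

req = comm.Isend([sendbuf, MPI.FLOAT], dest=peer, tag=0)  # keep the request

# ... overlap: do computation while the message is in flight,
# without touching sendbuf ...

req.Wait()  # MANDATORY; by now it should return (almost) immediately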

Mahdi Morafah

Jun 30, 2020, 1:44:28 AM
to mpi4py
Thanks again for your time and help.

I have modified the code, reducing the number of messages to two and the numpy arrays to very small sizes. I have also reduced the number of processes to 4. The code is attached to this post (main.py) and uses 4 processes. I hope there will be no other issues running it on a low-powered laptop.

The algorithm that we're trying to implement can best be described as below. Each process does the following:

for i = 1:100
    do some work

    if a message from another process is available:
        receive the message
    else:
        do some work

    do some work

    send messages to the other processes

The above is the algorithm each process should run. Each process sends messages to some other processes and receives messages from processes that are not necessarily the same as the ones it sends to. For each process, the send-to set (OUT neighbors) and the receive-from set (IN neighbors) are not necessarily the same. The algorithm is for directed graphs, where N_in and N_out differ from node to node.

The sending and receiving should be asynchronous: the algorithm should not stop at the sending step to wait for the other processes to receive the messages, and messages should be read only once they have been delivered and are available.

This is what we're trying to implement. I have added MPI.Request.Waitall(list_of_requests) at the end of the sending part in the new code, but I am not sure whether this will affect the asynchronous setup explained above.
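
In mpi4py terms, here is a minimal sketch of what I understand one run should look like (tiny arrays, and a hypothetical ring graph standing in for our IN/OUT neighbors; the completion phase at the end is my attempt to follow your Waitall advice):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

NITER = 100
in_neighbors = [(rank - 1) % size]   # stand-in for the IN neighbors
out_neighbors = [(rank + 1) % size]  # stand-in for the OUT neighbors

send_requests = []
send_buffers = []  # keep send buffers alive until their requests complete
recv_count = {src: 0 for src in in_neighbors}

for it in range(NITER):
    # ... do some work ...

    # take a message only if it has already arrived; never block here
    for src in in_neighbors:
        if comm.Iprobe(source=src, tag=0):
            data = np.empty(4, dtype='float32')
            comm.Recv([data, MPI.FLOAT], source=src, tag=0)
            recv_count[src] += 1
        # else: ... do some other work ...

    # ... do some work ...

    # nonblocking sends: keep every request and the buffer it uses
    buf = np.full(4, float(it), dtype='float32')
    send_buffers.append(buf)
    for dst in out_neighbors:
        send_requests.append(comm.Isend([buf, MPI.FLOAT], dest=dst, tag=0))

# completion phase: first receive every message still owed to us,
# then wait on all of our own sends
for src in in_neighbors:
    while recv_count[src] < NITER:
        data = np.empty(4, dtype='float32')
        comm.Recv([data, MPI.FLOAT], source=src, tag=0)
        recv_count[src] += 1
MPI.Request.Waitall(send_requests)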

Thanks again for your time and kindness.
main.py

Lisandro Dalcin

Jun 30, 2020, 5:51:22 AM
to mpi...@googlegroups.com
OK, now your script is free of trivial issues, but it deadlocks. This means that there is some logic bug; your program is not a correct MPI program. And I cannot easily advise on how to do things, because I do not know the details of your application, nor do I have the time to learn all those details to assist you. I insist that your algorithm looks weird. Maybe you think MPI works in a way it actually does not. Did you try to make your algorithm work with just two processes?

You said before that you have a version of this algorithm using BLOCKING communication. Was that version working without any issue? You did not send that code. That would be the right starting point. A simple example using just a couple of MPI processes and using BLOCKING Send()/Recv() calls. From there, I may be able to provide suggestions about how to transform it to use Isend()/Irecv()/Wait() and maybe Iprobe(). Otherwise, I'm sorry, but I don't know how to help you further.
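
I mean something as small as this (a generic two-process sketch, not your algorithm):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# run with: mpiexec -np 2 python main.py
data = np.empty(4, dtype='float32')
if rank == 0:
    data[:] = 3.14
    comm.Send([data, MPI.FLOAT], dest=1, tag=7)
elif rank == 1:
    comm.Recv([data, MPI.FLOAT], source=0, tag=7)
    print("rank 1 received:", data)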

PS: The primary purpose of this group is to ask/answer about issues related to mpi4py as a software library, not to discuss/teach people the basics about how to code proper MPI programs. We do it anyway, but there is a limit on how much I can personally help. Take into account that I'm the main developer and lone maintainer of mpi4py, and I do all that for free using time I should rather spend having some rest. Other users in this group may be able to assist you further if I run out of spare cycles.



Mahdi Morafah

Jul 2, 2020, 1:47:38 PM
to mpi4py
Hi Lisandro,

Sorry for the delayed response; I did not have a synchronous (blocking) version of the algorithm we wanted to implement, so I needed a bit of time to think about it and make it work. I have attached two scripts to this post. Both use the blocking version of our algorithm: the first uses Recv, then waits, then Send (main1.py); the other uses Send and Recv at the end of the algorithm (main2.py). Both are OK; whichever can be converted to the asynchronous version more easily and robustly is good for our purpose. I really appreciate your help and time.

Also, I tried the implementation with two processes, and it worked without the strange warnings or errors.

Again, thanks a million for your help and time.
main1.py
main2.py