Using RPyC & multiprocessing in a server-side RPyC service


Michael Mann

unread,
Sep 21, 2015, 8:37:11 AM9/21/15
to rpyc

I love the simplicity of RPyC - fantastic! However, I have one issue which is preventing me from using it.
Has anyone successfully implemented the multiprocessing module in a service exposed by RPyC?

My requirement is as follows:
I have a server process running on multiple machines; a client process centrally controls the schedule and dependencies of multiple "jobs" which are executed by the service on the server (a "job", for all intents and purposes, is a Linux shell script, Windows batch file, Python module, Java program, etc.).
As multiple jobs may be executed in parallel on the same server, I implemented this using the Python multiprocessing module so that each job runs in a separate process and I can run multiple "jobs" on the same server at the same time.
I do not know, nor am I able to limit, the number of "jobs" that will be executed in parallel on the server.

I get the following errors. I will paste my code in a subsequent post if it helps; however, someone may be able to advise whether this is a known issue or an RPyC constraint, or suggest a pattern or workaround for this problem.

ERROR ON CLIENT SIDE
...
...

"PicklingError: Can't pickle <type 'thread.lock'>: it's not found as thread.lock"


ERROR ON SERVER SIDE:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Python27\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Python27\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Python27\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Python27\lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError
Exception AttributeError: "'Connection' object has no attribute '_closed'" in  ignored

Any advice would be greatly appreciated.

Michael Mann

unread,
Sep 21, 2015, 8:43:50 AM9/21/15
to rpyc
Simplified code below...


SERVER CODE:


#!/usr/bin/env python
import os
import util
import job
import multiprocessing
import rpyc

class MyClass(rpyc.Service):
    """Manages jobs requested on a Node instance"""

    def exposed_submitJobObj(self, newJob):
        """Create a new Job using the input arguments"""
        self.addJobToQueue(newJob)

    def addJobToQueue(self, newJob):
        """Adds a new job to the node job execution queue and executes it"""
        # execute the job in a new process
        myJobWorker = multiprocessing.Process(target=self.jobWorker, args=(newJob,))
        myJobWorker.start()

    def jobWorker(self, currJob):
        """Multiprocess method to run job as a parallel process"""
        currJob.execute()


def startNode():
    from rpyc.utils.server import ThreadedServer
    t = ThreadedServer(MyClass, port=18861,
                       protocol_config={"allow_public_attrs": True, "allow_pickle": True})
    t.start()


if __name__ == '__main__':
    startNode()



CLIENT CODE:

#!/usr/bin/env python
import os
import job
import util
import rpyc

def testJobWrapper():

    c = rpyc.connect("localhost", 18861,
                     config={"allow_public_attrs": True, "allow_pickle": True})

    winenv = 'C:\\mydir\\mysubdir\\test.env'
    winarg = ['param1']
    wincmd = 'C:\\mydir\\mysubdir\\hellow.bat'
    winJob = job.Job(wincmd, winarg)
    testJob = job.JobWrapper()
    testJob.setJobAttributes(winenv, winJob)
    c.root.exposed_submitJobObj(testJob)


if __name__ == '__main__':
    #testJob()
    testJobWrapper()


Rüdiger Kessel

unread,
Sep 21, 2015, 9:47:48 AM9/21/15
to rp...@googlegroups.com
Hi,

your example suggests that the program hellow.bat is executed on the server.

The file path of hellow.bat is probably a local path on the server.

You create the Job object on the client side and you pass this Job object to the server.

How does the Job object (on the client side) know that the job is intended to be run on the server instead of on the client where it is created?

Why do you create the Job object on the client side, where it will never be executed?

My suggestion is to move the job.Job() related code from the client to the server and just pass the three strings to the server.

Then the Job object is created on the server side, where it will be executed. The client can still influence the environment, the arguments and the cmd.
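The shape of that change can be sketched without rpyc at all: the exposed method accepts only plain strings and lists, and builds the Job where it will run. The `Job` and `JobService` classes below are hypothetical stand-ins for the poster's `job` module, just to show the data flow.

```python
# Sketch: build the job server-side from plain-data arguments.
# Job and JobService are hypothetical stand-ins, not the poster's real classes.
class Job(object):
    def __init__(self, cmd, args):
        self.cmd = cmd
        self.args = args

class JobService(object):
    def __init__(self):
        self.queue = []

    def exposed_submitJob(self, env, jobcmd, arglst):
        # only strings and lists cross the wire; the Job object
        # itself is created here, on the server side
        newjob = Job(jobcmd, arglst)
        self.queue.append((env, newjob))
        return len(self.queue)

svc = JobService()
svc.exposed_submitJob('/srv/jobs/test.env', '/srv/jobs/hello.sh', ['param1'])
```

In the real service the same method body would live in the `rpyc.Service` subclass; the point is only that nothing connection-bound is handed across.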

In case you want the files for the environment and the commands to actually be supplied by the client to the server, you can handle this by supplying a callable (a callback function) to the server to access local files on the client. (You might want to limit access to certain files or directories for security reasons.)

Some time ago I wrote a numerical client-server tool (a Monte Carlo simulation) with rpyc, where a client can employ a number of servers to do repeated calculations. The code for the calculations is supplied by the client (using XML). The server communicates the calculation results back to the client using numpy arrays. In the end the system could handle any number of servers with any number of simultaneous (different) jobs and clients. By definition one client could execute one job at a time.

I think this pattern was similar to yours, and it was rather straightforward to implement using rpyc. In case you use callbacks from the server to the client, you need to make sure that a thread is listening for incoming rpyc requests from the server on the client side while the server is working.

Greetings
Rüdiger



--

---
You received this message because you are subscribed to the Google Groups "rpyc" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rpyc+uns...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Michael Mann

unread,
Sep 21, 2015, 10:12:19 AM9/21/15
to rpyc, ruedige...@gmail.com
Thank you for your reply.

Your understanding of my requirement sounds about right. However, I had tested the same approach you suggested (passing argument strings and lists to the server and having the server instantiate the class and execute the process; see code below with both exposed functions), and I was getting the same error with both approaches.

If I understand your question regarding "creating the Job object on the client and not using it" correctly - the reason I was creating the Job() on the client is that the details for the Job, and the output & return code from its execution, live in the Job() object for handling on the client side, and I do use it on the client side by passing it to the exposed service on the server. I could approach this differently, as you mention, by passing in the respective strings, creating the Job on the server, and retrieving the return code and output in a different way, but that doesn't resolve my error.

This is the code with both approaches


SERVER CODE:


import os
import util
import job
import multiprocessing
import rpyc
from rpyc.utils.server import ThreadedServer


######

class MyClass(rpyc.Service):
    """Manages jobs requested on a Node instance"""

    def on_connect(self):
        # code that runs when a connection is created
        # (to init the service, if needed)
        pass

    def on_disconnect(self):
        # code that runs when the connection has already closed
        # (to finalize the service, if needed)
        pass

    def exposed_submitJobObj(self, newJob):
        """Create a new Job using the input arguments"""
        self.addJobToQueue(newJob)

    def exposed_submitJob(self, env, jobType, jobcmd, arglst):
        """Create a new Job using the input arguments"""
        newjob = job.Job(jobcmd, arglst)
        jobwrapper = job.JobWrapper()
        jobwrapper.setJobAttributes(env, newjob)
        self.addJobToQueue(jobwrapper)

    def addJobToQueue(self, newJob):
        """Adds a new job to the node job execution queue and executes it"""
        # execute the job in a new process
        myJobWorker = multiprocessing.Process(target=self.jobWorker, args=(newJob,))
        myJobWorker.start()

    def jobWorker(self, currJob):
        """Multiprocess method to run job as a parallel process"""
        currJob.execute()


def startNode():
    # retrieve node name from node configuration file
    appConfig = util.config()

    t = ThreadedServer(MyClass, port=18861,
                       protocol_config={"allow_public_attrs": True, "allow_pickle": True})
    t.start()


######

if __name__ == '__main__':
    startNode()



CLIENT CODE:

import os
import job
import rpyc


def testJobWrapper():

    c = rpyc.connect("localhost", 18861,
                     config={"allow_public_attrs": True, "allow_pickle": True})

    c.root.exposed_test()

    winenv = 'C:\\py_sandbox\\schedulex_scripts\\test.env'
    wintype = 'Job'
    winarg = ['param1']
    wincmd = 'C:\\py_sandbox\\schedulex_scripts\\hellow.bat'

    # ----> APPROACH 1 - using Job() <----
    winJob = job.Job(wincmd, winarg)
    testJob = job.JobWrapper()
    testJob.setJobAttributes(winenv, winJob)
    c.root.exposed_submitJobObj(testJob)

    # ----> APPROACH 2 <----
    c.root.exposed_submitJob(winenv, wintype, wincmd, winarg)


if __name__ == '__main__':
    testJobWrapper()




FYI: this is Python 2.7 running on Windows.

Thanks

Michael Mann

unread,
Sep 21, 2015, 10:23:53 AM9/21/15
to rpyc, ruedige...@gmail.com
I should add that I believe this is directly related to using the multiprocessing module in the server, because I do not get an error when I execute the job directly (not using multiprocessing.Process()) in exposed_submitJobObj(self, newJob), by calling newJob.execute() instead of addJobToQueue(self, newJob).


Rüdiger Kessel

unread,
Sep 21, 2015, 1:48:05 PM9/21/15
to rp...@googlegroups.com
I think the problem is not rpyc related. It seems that no rpyc lib is involved in the traceback.

You should test calling exposed_submitJob() directly on the server without using rpyc. The problem will probably show up as well.

If you create the Job object on the client side and pass the object to the server, then the server just gets a reference (called a netref, if I remember correctly) to the object. If you now call currJob.execute() on the server, the server gets a reference to the execute() method of the Job object and executes that method. Now, where does the object exist? On the client side. So where is the execute method? Also on the client side. So you end up running the job on the client PC that way. Why use the whole server hocus-pocus just to end up running a local job?
If you want something to be executed on the server, you need to make sure that it exists as a callable on the server (and not just as a netref to the client) and that the server has all resources, including file access, to run it.
One way to achieve this is to create the Job object on the server. There might also be a way of copying byte code, but I do not know how to achieve this.
You can pass the object (as a netref) back to the client, and the client can access the object via the netref as if it existed locally.

So rpyc allows you to create objects anywhere and to use them anywhere. But the objects stay where they were created, and execution always happens where the objects were created. (The same is true for the object data, by the way.) rpyc does a very good job of hiding the details, and you can use remote objects as if they existed locally.

So the usual problem is that the client knows what the server should do, including some resources. The server has the computing power and other required resources. Now one needs to create a callable on the server side, based on the specification from the client, and execute it on the server. During the execution the client might supply some local resources (like files) to support the server. At the end, some results and status information should be available on the client side.

rpyc can organize the communication between client and server, and it can zerodeploy (http://rpyc.readthedocs.org/en/latest/docs/zerodeploy.html) rpyc itself. You might need something similar to deploy the "what to do list" (usually called a program) to the server. So you need to write a framework which deploys the "program" to the server where you want the execution to happen. You can use rpyc as a transport mechanism (e.g. remote file access). Then you can use rpyc to start the execution on the server (a standard remote call). During execution the server can access all objects which are not locally available via netrefs. At the end the client can access results and data if you pass a netref back to the client. You need to make sure that, as long as you want to access stuff via netrefs, a thread handling rpyc requests is running on both the server and the client.

In my Monte Carlo tool I only needed the computing power from the server. The programs are numerical calculations (equations) encoded in XML, which were parsed on the server side and compiled to a Python callable. The client provides an interface object for file access, standard IO, and status/control information.

The limitation of this approach is that the "program" is limited to what the parser can handle. But for my tool this limitation also existed in the local simulation tool which existed before. If you use Python as the parser, then you could transport Python source code to the server and compile it there, which would be more generic. Or you could copy batch files and run them on the server (similar to zerodeploy).

I hope this will help...

Greetings
Rüdiger







Michael

unread,
Sep 21, 2015, 11:11:23 PM9/21/15
to rp...@googlegroups.com
Great info and explanation, thank you Rüdiger.

Regarding the error, to try narrow down where to focus my effort I have tested the following:

  • Using RPyC and multiprocessing passing Job object from client to server --> FAIL
  • Using RPyC and multiprocessing passing strings and lists from client to server where the server creates Job Object --> FAIL
  • Using RPyC and not using multiprocessing --> PASS
  • Not Using RPyC and using multiprocessing --> PASS

The above behavior leads me to believe that the issue is neither RPyC nor multiprocessing in isolation, but rather trying to use them together.
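That PASS/FAIL pattern is consistent with the pickling error: on Windows, multiprocessing.Process pickles its target and args to send them to the child process, and anything tied to a live connection (such as a netref, or `self` when the target is a bound method of the service, since the service instance holds the connection) contains a thread lock and cannot be pickled, while plain strings and lists can. A minimal stdlib-only illustration (no rpyc involved; the lock here just stands in for a netref's connection state):

```python
import pickle
import threading

class FakeNetref(object):
    """Stand-in for an object tied to a live connection (hypothetical)."""
    def __init__(self):
        self._lock = threading.Lock()   # unpicklable, like thread.lock in the error

# handing such an object to multiprocessing.Process fails at pickling time
try:
    pickle.dumps(FakeNetref())
    pickled_ok = True
except (pickle.PicklingError, TypeError):  # PicklingError on py2, TypeError on py3
    pickled_ok = False
print("object with lock picklable:", pickled_ok)

# plain data - like the env/cmd/args strings - round-trips fine
payload = ('test.env', 'hellow.bat', ['param1'])
assert pickle.loads(pickle.dumps(payload)) == payload
```

This would also explain why approach 2 (passing strings) still failed: the strings themselves pickle fine, but `target=self.jobWorker` drags the service instance, and with it the connection, into the pickle.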







--
Michael Mann

Rüdiger Kessel

unread,
Sep 21, 2015, 11:55:49 PM9/21/15
to rp...@googlegroups.com
The multiprocessing module might rely on being called from the main thread, which did the initialization.
But the rpyc server always uses a new thread for every connection.

A solution might be to run the ThreadedServer in a new thread and operate a queue from the main thread. submitJob() would simply put the parameters into the queue. The main thread fetches them, creates the object, and calls execute(), which spawns a new process.
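Independent of rpyc, that hand-off can be sketched with the stdlib alone (written for Python 3; the `queue` module was `Queue` in 2.7): service threads only put plain parameters on a queue, and the main thread is the sole place where worker processes are spawned. `run_job` and the job-tuple shape are hypothetical stand-ins for the poster's Job.execute().

```python
import multiprocessing
import threading
import queue

def run_job(cmd, args):
    # hypothetical stand-in for Job(cmd, args).execute()
    print("running", cmd, args)

job_queue = queue.Queue()

def submit_job(env, cmd, args):
    # called from rpyc service threads: only plain data goes on the queue
    job_queue.put((env, cmd, args))

def main_loop():
    # runs in the main thread, which owns all multiprocessing calls
    while True:
        item = job_queue.get()
        if item is None:              # sentinel to shut down
            break
        env, cmd, args = item
        p = multiprocessing.Process(target=run_job, args=(cmd, args))
        p.start()
        p.join()                      # or track the processes and join later

# simulate two service-thread submissions, then drain the queue
t = threading.Thread(target=submit_job, args=('test.env', 'hello.sh', ['param1']))
t.start(); t.join()
submit_job('test.env', 'hello.sh', ['param2'])
job_queue.put(None)
main_loop()
print("queue drained:", job_queue.empty())
```

In the real server, `t = ThreadedServer(...)` would itself run in a background thread (`threading.Thread(target=t.start)`), leaving the main thread free for `main_loop()`; since only strings and lists are queued, nothing connection-bound ever reaches the pickler.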

 


