Queue of multiprocessing doesn't work well with gevent


Tony Wang

May 17, 2017, 12:14:02 PM
to gevent: coroutine-based Python network library

Hi guys


Have you guys encountered this case? The gevent tutorial says that multiprocessing.Queue will hang when monkey-patched. Is there a better solution for this case? Thanks!


Details about question:


It's a producer/worker workflow using multiprocessing and gevent. I want to share some data between processes with a multiprocessing Queue, and at the same time gevent producers and workers get data from and put tasks onto the Queue.


task1_producer generates some data and puts it into q1; task1_worker consumes the data from q1 and puts generated data into q2 and q3.


Then task2 does the same.


But the problem is that data has been inserted into q3 and q4, yet nothing happens in task2. If you add some logging in task2, you will find that q3 is empty. Why does this happen? What's the best way to share data between processes?


Demo code on Stack Overflow:
http://stackoverflow.com/questions/44016677/queue-of-multiprocessing-doesnt-work-well-with-gevent
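A minimal sketch of the pipeline described above, using plain multiprocessing only (the function names, the sentinel convention, and the doubling step are illustrative assumptions, not the original demo code):

```python
from multiprocessing import Process, Queue

def task1_producer(q1):
    # Generate some data and put it into q1.
    for i in range(5):
        q1.put(i)
    q1.put(None)  # sentinel: no more data

def task1_worker(q1, q2):
    # Consume data from q1 and put derived data into q2.
    while True:
        item = q1.get()
        if item is None:
            q2.put(None)  # pass the sentinel downstream
            break
        q2.put(item * 2)

def task2_worker(q2, out):
    # Second stage: consume q2 and report a result.
    total = 0
    while True:
        item = q2.get()
        if item is None:
            break
        total += item
    out.put(total)

if __name__ == "__main__":
    q1, q2, out = Queue(), Queue(), Queue()
    procs = [Process(target=task1_producer, args=(q1,)),
             Process(target=task1_worker, args=(q1, q2)),
             Process(target=task2_worker, args=(q2, out))]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(out.get())  # 2 * (0+1+2+3+4) -> 20
```

Without gevent's monkey patching in the picture, the multiprocessing.Queue handoff between stages works as expected.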

Matt Billenstein

May 17, 2017, 3:10:01 PM
to gev...@googlegroups.com
I wouldn't mix the two -- either single process with gevent or all
multiprocessing. And I think multiprocessing has primitives for
(de)serializing data between processes using pickle.

For true data sharing between disparate processes, an external queue is
probably the best option. Redis, rq, nsq, or beanstalkd are simple ways to
implement this.
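To illustrate the "all multiprocessing" option: multiprocessing.Pipe pickles and unpickles objects for you on send()/recv(), so any picklable object (such as a dict) can cross the process boundary. A minimal sketch (names are illustrative):

```python
from multiprocessing import Process, Pipe

def child(conn):
    # recv() unpickles the object sent by the parent.
    msg = conn.recv()
    msg["seen"] = True
    # send() pickles the modified dict back.
    conn.send(msg)
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    p = Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send({"task": 1})   # any picklable object works
    print(parent_conn.recv())       # {'task': 1, 'seen': True}
    p.join()
```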

m



--
Matt Billenstein
ma...@vazor.com
http://www.vazor.com/

Grady Player

May 17, 2017, 10:47:15 PM
to gev...@googlegroups.com
You could use Giovanna

Grady


Sent from my iPhone

Grady Player

May 17, 2017, 10:51:05 PM
to gev...@googlegroups.com
Sorry for my previous insane reply; I was trying to type that you could use gipc to send the objects across processes, but they essentially have to be something that can be serialized, like a dictionary of key/value pairs, etc.

Grady


Sent from my iPhone


Tony Wang

May 18, 2017, 8:24:57 AM
to gev...@googlegroups.com
Hi Matt

Thanks for your suggestion. I have used RabbitMQ and Redis; both are great software for messaging. I thought a library like gevent combined with multiprocessing.Queue could solve it more easily in such a "lightweight" case.

Hi Grady

Thanks for your reply. Have you tried gipc in a case like mine? I'm curious whether there are any disadvantages.

Tony Wang 

Warm Regards

Grady Player

May 18, 2017, 2:28:59 PM
to gev...@googlegroups.com
Yeah, I have used it in a manner similar to that. What I have is something like this (whitespace mangled to make it fit in email; try not to copy-paste):

Setup pipes (msgpipe is my own wrapper around gipc pipes):

myPipe, yourPipe = msgpipe.CreatePipeEndpoints(msgpipe.T_ANON)
messagePipe = msgpipe.MessagePipe(myPipe)
messagePipe.AddHandlers({'done_obj': done_obj})
helper = gipc.start_process(helper_main, args=(yourPipe,))

In the helper process:

def helper_main(_mypipe):
    messagePipe = msgpipe.MessagePipe(_mypipe)
    messagePipe.AddHandlers({"add_obj": add_obj})




Main                                  Helper
 obj
  |
 json = obj.json()
  |
 messagePipe.Send("add_obj",json)

                                      add_obj(json):
                                         Q.push(Obj.fromJSON(json))

                                      do_stuff_loop():
                                         obj = Q.pop()
                                         doStuff(obj)
                                         returnPipe.Send("done_obj",obj.json)


On the return I have to correlate the returned object; you can do that by overriding __eq__() or by iterating a list and checking the variables you care about, etc.
In my case I have a 3-tuple of values that I care about, so I can drop or retry the obj, etc.
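The __eq__-based correlation can be sketched like this (the Task record and its fields are hypothetical; a dataclass auto-generates a field-wise __eq__, which is what lets you match a returned object against the one you sent):

```python
from dataclasses import dataclass

@dataclass
class Task:
    # Hypothetical 3-tuple of values we care about.
    host: str
    port: int
    job_id: int

# Tasks we sent to the helper and are waiting on.
pending = [Task("a", 1, 10), Task("b", 2, 11)]

# e.g. deserialized from a "done_obj" message coming back.
returned = Task("b", 2, 11)

# remove() finds the match via the generated field-wise __eq__.
pending.remove(returned)
print(pending)  # [Task(host='a', port=1, job_id=10)]
```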

Tony Wang

May 19, 2017, 10:10:39 AM
to gev...@googlegroups.com
Hi Grady, great!

I will give gipc a test. Thanks again for your kind help.

Tony Wang

Jun 26, 2017, 5:20:56 PM
to gev...@googlegroups.com
Hi all

In this case, when I changed the Queue from multiprocessing to gevent.queue, the script works well. I tested it on Python 2.7 and Python 3.6; you're welcome to run more tests to see whether it works for you.

from multiprocessing import Value, Process
from gevent.queue import Queue

Communication through this Queue is much more efficient than through a third-party message queue such as Redis.

Comparison (seconds per operation):

Queue consuming time: 4.76837158203125e-06

Redis rpop consuming time: 0.046524763107299805

* Redis server runs on the same machine
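The in-process number above can be reproduced in spirit with the standard library alone: time a single get() from a pre-filled queue with time.perf_counter (absolute figures will vary by machine; this only shows the measurement approach, not gevent.queue itself):

```python
import queue
import time

q = queue.Queue()
q.put("task")

start = time.perf_counter()
item = q.get()
elapsed = time.perf_counter() - start

# An in-process get() is just a lock plus a deque pop, so it
# completes in microseconds; a Redis round trip adds a socket
# hop and server work even on the same machine.
print(item, elapsed)
```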
