gevent.fork and queue issues (among others)


ams.fwd

Sep 4, 2014, 1:35:26 AM
to gev...@googlegroups.com
Hi All.

What I am trying to do is run a bunch of processes that run suites of tests and send reports to a web server. The way I do it is:

- instantiate a queue object
- start a pywsgi.WSGIServer instance in the main thread whose app updates the queue object with some data from the requests it gets
- use gevent.fork to start a bunch of child processes.
- wait for children to exit
- stop server
- tally queue data

When each child is done it sends an http request to the server.

I have run into two problems:

1. While the queue instance being updated in the app is the same instance as the one outside the app context, somehow all the data gets lost. Funnily enough, if I hit the server using wget the data is maintained.

2. If I start the server after forking (usually I start before) and make the child wait a few seconds so that the server is ready to accept connections, every http request the child makes basically times out.

The code for a reduction case is included below. Any help would be great.

Thanks.

AM

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from gevent import monkey
monkey.patch_all()

import httplib
import os
import sys

import gevent
from gevent import queue
from gevent.pywsgi import WSGIServer

def get(idx):
    conn = httplib.HTTPConnection('127.0.0.1', 8000, timeout=2)
    conn.request('GET', '/1?a=%d' % idx)
    response = conn.getresponse(buffering=True)
    response.read()
    conn.close()

def run_test():
    print 'Running test'
    gs = []
    for i in xrange(100):
        gs.append(gevent.spawn(get, i))

    gevent.joinall(gs)
    # uncomment to test using wget:
    #for i in `seq 10`; do wget -O- "http://127.0.0.1:8000/?a=$i"; done
    #gevent.sleep(10)
    print 'Stopping child'

def cli_main():
    result_queue = queue.Queue(0)

    def app(env, start_response):
        result_queue.put(env['QUERY_STRING'])
        print id(result_queue), result_queue.qsize()
        start_response('200 OK', [('Content-Type', 'application/json')])
        return []

    server = WSGIServer(('', 8000), app)
    print 'Starting server on port 8000'

    # uncomment to test with wget, but comment out the rest until the result print
    #try:
    #    server.serve_forever()
    #except KeyboardInterrupt:
    #    pass

    server.start()

    pid = gevent.fork()
    if pid == 0:
        #gevent.sleep(3) # if you want to move server init after the fork
        run_test()
        sys.exit(0)

    print 'Waiting for children to exit'
    status = os.waitpid(-1, 0)

    print 'Shutting down server'
    server.stop()

    print id(result_queue), result_queue.qsize()
    for _ in xrange(result_queue.qsize()):
        print result_queue.get()

if __name__ == '__main__':
    cli_main()



Damjan

Sep 4, 2014, 8:17:47 PM
to gev...@googlegroups.com
On Thursday, September 4, 2014 7:35:26 AM UTC+2, ams.fwd wrote:
Hi All.

What I am trying to do is run a bunch of processes that run suites of tests and send reports to a web server. The way I do it is:

- instantiate a queue object
- start a pywsgi.WSGIServer instance in the main thread whose app updates the queue object with some data from the requests it gets
- use gevent.fork to start a bunch of child processes.
- wait for children to exit
- stop server
- tally queue data


I don't think you can use gevent.queue across different processes, and gevent.fork creates a new process.
You should implement some kind of IPC (a pipe or similar) or maybe just use https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue


Liam Slusser

Sep 4, 2014, 8:33:05 PM
to gev...@googlegroups.com
Damjan is correct.  You cannot use a shared queue from forked processes.  I have something similar to what you're trying to do; what I ended up doing is having a thread that manages my queue and then sends things out via pipes to my forked worker processes.

thanks,
liam


--
You received this message because you are subscribed to the Google Groups "gevent: coroutine-based Python network library" group.
To unsubscribe from this group and stop receiving emails from it, send an email to gevent+un...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

AM

Sep 4, 2014, 8:44:36 PM
to gev...@googlegroups.com
So I am not sharing the queue across different processes. The queue is
used by the wsgi server which runs as a greenlet in the main process.
The forked processes merely make GET requests to the wsgi server.

The problem is that while making GET requests to the wsgi server from
(say) a bash shell populates the queue, making GET requests to the
server from the forked processes appears to populate the queue but after
the server is shutdown the data is no longer there.

AM

Hugh Cole-Baker

Sep 5, 2014, 4:05:26 PM
to gev...@googlegroups.com
On Thursday, 4 September 2014 06:35:26 UTC+1, ams.fwd wrote:
Hi All.
(...cut...)

I have run into two problems:

1. While the queue instance being updated in the app is the same instance as the one outside the app context, somehow all the data gets lost. Funnily enough, if I hit the server using wget the data is maintained.

2. If I start the server after forking (usually I start before) and make the child wait a few seconds so that the server is ready to accept connections, every http request the child makes basically times out.


My guess at what's happening in the first case: whenever you fork(), the child gets a copy of the state of the parent, including any listening sockets. So, when you start the WSGI server and then fork, there is actually a WSGI server listening on a socket in the child process as well as in the parent, and the child also has a copy of the parent's memory space, including a copy of result_queue...

Normally, when you open a socket and then fork, both processes share the listening socket, and incoming connections are divided between the two. However, in the parent you call os.waitpid() after forking. I think this call blocks the main thread in the parent process, and so blocks all greenlets in the parent from running. The WSGI server in the parent needs to spawn and run greenlets to accept connections, but it can't. So no connections can be accepted in the parent, and all the connections from the child actually go to the listening socket / server in the child process. Presumably the copy of result_queue in the child process is filling up. Then, when the child process exits, os.waitpid() returns and all the greenlets in the parent are un-blocked, but no connections were ever accepted by the parent process, so the queue in the parent is empty.
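
The memory-copy half of this is easy to see without gevent at all. A POSIX-only sketch (the list name is hypothetical): the child fills its own copy of a list, and after waitpid() returns, the parent's copy is still empty, because fork() gave the child separate memory:

```python
import os

results = []

pid = os.fork()
if pid == 0:
    # Child: this fills the *child's* copy of the list only.
    for i in range(5):
        results.append('a=%d' % i)
    os._exit(0)

# Parent: a blocking wait is fine in plain Python, but under gevent it
# would stall every greenlet in the process until the child exits.
os.waitpid(pid, 0)
# results is still [] here: the child's appends never reach the parent.
```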

In the second case, when you start the server in the parent after the fork(), there is no server running in the child, so only the parent is accepting connections on the socket. But when the parent calls os.waitpid(), it blocks the main thread in the parent process, so no greenlets can run and no connections can be accepted in the parent - and so all the connections from the child time out.

Dealing with fork(), subprocesses and signals in gevent is tricky. I think the proper non-blocking substitute for os.waitpid() would be to use libev child watchers. I suggest looking at using gipc - http://gehrcke.de/gipc/ - which should simplify all this for you.

Hugh Cole-Baker

Sep 7, 2014, 5:03:33 PM
to gev...@googlegroups.com
On Friday, 5 September 2014 01:44:36 UTC+1, ams.fwd wrote:
So I am not sharing the queue across different processes. The queue is
used by the wsgi server which runs as a greenlet in the main process.
The forked processes merely make GET requests to the wsgi server.

The problem is that while making GET requests to the wsgi server from
(say) a bash shell populates the queue, making GET requests to the
server from the forked processes appears to populate the queue but after
the server is shutdown the data is no longer there.

AM


Your problem is that when the parent calls os.waitpid(), that's a blocking call, so it blocks the entire thread and all greenlets that run within it.

So, the WSGI server in the parent can't respond to any requests, so the requests from the child time out (when you initialise the server in the parent after forking).

When you initialise the server before forking, the child gets a copy of the server, and it's the child which is responding to the requests, not the parent.

You need to avoid making blocking calls like waitpid() with gevent. You could try using the 'gipc' package which should handle the tricky parts of forking child processes for you.
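
One shape a non-blocking wait could take, as a stdlib-only sketch (the helper name is hypothetical; gevent's child watchers or gipc are the proper fix): poll with os.WNOHANG instead of blocking in waitpid(), sleeping between polls so other work can run. Under gevent, the time.sleep() call would become gevent.sleep(), which yields to the hub and keeps the WSGI server's greenlets responsive:

```python
import os
import time

def wait_for_child(pid, poll_interval=0.05):
    # Non-blocking reap: WNOHANG returns (0, 0) while the child is
    # still running, and (pid, status) once it has exited. Between
    # polls we sleep - under gevent, gevent.sleep() here lets every
    # other greenlet (e.g. the WSGI server) keep running.
    while True:
        finished, status = os.waitpid(pid, os.WNOHANG)
        if finished == pid:
            return status
        time.sleep(poll_interval)

pid = os.fork()
if pid == 0:
    os._exit(7)  # child exits immediately with a recognizable code

status = wait_for_child(pid)
exit_code = os.WEXITSTATUS(status)
```

This is POSIX-only and polling-based; libev child watchers (what gipc uses under the hood) achieve the same thing without the polling loop.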

AM

Sep 8, 2014, 2:36:53 AM
to gev...@googlegroups.com
On 09/07/2014 02:03 PM, Hugh Cole-Baker wrote:
> On Friday, 5 September 2014 01:44:36 UTC+1, ams.fwd wrote:
>
> So I am not sharing the queue across different processes. The
> queue is
> used by the wsgi server which runs as a greenlet in the main process.
> The forked processes merely make GET requests to the wsgi server.
>
> The problem is that while making GET requests to the wsgi server from
> (say) a bash shell populates the queue, making GET requests to the
> server from the forked processes appears to populate the queue but
> after
> the server is shutdown the data is no longer there.
>
> AM
>
>
> Your problem is that when the parent calls os.waitpid(), that's a
> blocking call, so it blocks the entire thread and all greenlets that
> run within it.
>
> So, the WSGI server in the parent can't respond to any requests, so
> the requests from the child time out (when you initialise the server
> in the parent after forking).
>
> When you initialise the server before forking, the child gets a copy
> of the server, and it's the child which is responding to the requests,
> not the parent.
>

Wow! I am not sure how I missed that. Thanks a ton.

I did try out gipc but could not really find examples of launching a
pool of processes instead of launching each one in a loop and keeping
track of all the readers and writers. I will take a look at it again and
see if I can do that easily.

Thanks again.
AM
