Pulsar, Flask and multithreading


Sebastian Krause

Jul 7, 2014, 10:51:05 AM
to python...@googlegroups.com
Hello,

I'm very interested in Pulsar because it currently seems to be the only(!) WSGI server which natively supports multiple processes on Windows using a single shared socket; at least with Python 3, but I'm using 3.4 anyway (the Github issue a few days ago was mine). I'm going to use an existing application built on Flask (which, according to its documentation, is not really built for asynchronous servers: http://flask.pocoo.org/docs/design/#thread-locals). My plan is the following:
  1. Let Pulsar serve all static files directly for best performance.
  2. Route everything else through Flask with wait_for_body_middleware.
The number of workers will be about the same as the number of CPUs, since I have some CPU-intensive tasks where the GIL is the limitation, but I don't want to waste memory with even more processes. The WsgiHandler I've built looks like this:

wsgi.WsgiHandler([wsgi.MediaRouter('ui/', ui, show_indexes=False),
                  wait_for_body_middleware,
                  app])

During my tests I have found some concerns and problems which I don't really understand yet, so I better ask here if Pulsar is actually the right thing for me.

1. Pulsar's documentation mentions the option thread_workers, which is supposed to control the number of threads used for blocking calls or CPU-intensive tasks. But no matter what I try, I can't get Pulsar to actually process more than one request per process at a time. I've created a small example program at https://gist.github.com/realpath/1f00083433740b4d684f which simply blocks the call for 0.1 seconds and prints the current thread ID. No matter how high I set the concurrency level in ApacheBench, it always receives exactly 10 requests per second, which is expected when we sleep 0.1 seconds and serve all requests in sequential order. It also always prints the same thread ID, so my assumption is that no multithreading is actually happening within a process:

<_MainThread(MainThread, started 5928)>
<_MainThread(MainThread, started 5928)>
<_MainThread(MainThread, started 5928)>
<_MainThread(MainThread, started 5928)>
[...]
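For reference, the gist boils down to roughly this: a plain WSGI callable that sleeps for 0.1 seconds and reports the serving thread (details may differ slightly from the actual gist):

```python
import threading
import time

def app(environ, start_response):
    # Block the worker for 0.1 s to simulate a CPU-bound / blocking call
    time.sleep(0.1)
    # Report which thread served this request
    body = repr(threading.current_thread()).encode()
    print(body.decode())
    start_response('200 OK', [('Content-Type', 'text/plain'),
                              ('Content-Length', str(len(body)))])
    return [body]
```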

The performance behavior is the same with workers=0 and workers=1. Do I need any other options or have I simply misunderstood Pulsar's multithreading?

2. Assuming I can get multithreading to work, will Pulsar work fine with Flask's thread locals? We need the guarantee that one thread = one request; a thread can be reused only after a request is done.

3. My last concern is with wait_for_body_middleware. It seems to read the whole response into memory first, but I might send large dynamically generated files directly from Flask as a stream which I can't store in memory. Is there any way I could work around that problem? Maybe by writing my own wait_for_body_middleware?

Sebastian

Luca Sbardella

Jul 7, 2014, 11:48:23 AM
to Sebastian Krause, python...@googlegroups.com
Hi Sebastian,
 

I'm very interested in Pulsar because it currently seems to be the only(!) WSGI server which natively supports multiple processes on Windows using a single shared socket; at least with Python 3, but I'm using 3.4 anyway (the Github issue a few days ago was mine). I'm going to use an existing application built on Flask (which, according to its documentation, is not really built for asynchronous servers: http://flask.pocoo.org/docs/design/#thread-locals). My plan is the following:
  1. Let Pulsar serve all static files directly for best performance.
  2. Route everything else through Flask with wait_for_body_middleware.
Makes sense.
 
The number of workers will be about the same as the number of CPUs, since I have some CPU-intensive tasks where the GIL is the limitation, but I don't want to waste memory with even more processes. The WsgiHandler I've built looks like this:

wsgi.WsgiHandler([wsgi.MediaRouter('ui/', ui, show_indexes=False),
                  wait_for_body_middleware,
                  app])
 
The performance behavior is the same with workers=0 and workers=1. Do I need any other options or have I simply misunderstood Pulsar's multithreading?


To run your application in Pulsar's executors (thread pools) you need to wrap your WSGI callable with the middleware_in_executor utility:

wsgi.WsgiHandler([wsgi.MediaRouter('ui/', ui, show_indexes=False),
                  wait_for_body_middleware,
                  wsgi.middleware_in_executor(app)])

The documentation is not very clear on this topic, but this is how the Django integration application is implemented.

2. Assuming I can get multithreading to work, will Pulsar work fine with Flask's thread locals? We need the guarantee that one thread = one request; a thread can be reused only after a request is done.

Yes.
 

3. My last concern is with wait_for_body_middleware. It seems to read the whole response into memory first, but I might send large dynamically generated files directly from Flask as a stream which I can't store in memory. Is there any way I could work around that problem? Maybe by writing my own wait_for_body_middleware?


If you need to do streaming on a given URL, then don't use wait_for_body_middleware; you can use a specialised Router to handle the URLs used for streaming.
The middleware is placed before standard (blocking) WSGI handlers such as Django and Flask.

Luca

Sebastian Krause

Jul 8, 2014, 3:50:10 AM
to python...@googlegroups.com, dazo...@gmail.com
Hi Luca,

Thanks a lot for the hint, the middleware_in_executor step really did solve all multithreading problems.

 
If you need to do streaming on a given URL, then don't use wait_for_body_middleware; you can use a specialised Router to handle the URLs used for streaming.
The middleware is placed before standard (blocking) WSGI handlers such as Django and Flask.

Unfortunately the generation of the stream depends on a lot of Flask-internal structures (user authentication, Flask-SQLAlchemy etc.), so not routing it through Flask would be really difficult. But once the stream has actually been opened it can be read without any more need for Flask (the return object is a werkzeug.wrappers.Response object with the property "is_stream" set to True). Would it be possible with a modified middleware that Pulsar gets the stream from Flask, releases the executor thread for the next request, and then serves the stream asynchronously without reading it all into memory first (maybe through a generator expression)?

Sebastian

Luca Sbardella

Jul 8, 2014, 4:50:18 AM
to Sebastian Krause, python...@googlegroups.com

Unfortunately the generation of the stream depends on a lot of Flask-internal structures (user authentication, Flask-SQLAlchemy etc.), so not routing it through Flask would be really difficult. But once the stream has actually been opened it can be read without any more need for Flask (the return object is a werkzeug.wrappers.Response object with the property "is_stream" set to True). Would it be possible with a modified middleware that Pulsar gets the stream from Flask, releases the executor thread for the next request, and then serves the stream asynchronously without reading it all into memory first (maybe through a generator expression)?


Yes,
you can do it, but you will probably need two types of Flask applications, one for streaming and one for standard HTTP.
The streaming_app can be wrapped by a function which does the streaming asynchronously once Flask returns the response.
In other words, your WSGI handler would look something like:

wsgi.WsgiHandler([wsgi.MediaRouter('ui/', ui, show_indexes=False),
                  streaming_middleware(streaming_app),
                  wait_for_body_middleware,
                  wsgi.middleware_in_executor(app)])
                               
You will need to write the streaming_middleware using Pulsar's async framework.
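A rough sketch of the dispatching part of such a middleware (the '/stream/' prefix and streaming_app are placeholders, and I'm assuming the WsgiHandler convention that a middleware returning None passes the request on to the next middleware in the list):

```python
def streaming_middleware(streaming_app):
    """Wrap a WSGI app so that only streaming URLs are handled by it."""
    def middleware(environ, start_response):
        # '/stream/' is an illustrative prefix, not part of the thread above
        if not environ.get('PATH_INFO', '').startswith('/stream/'):
            # Returning None lets the WsgiHandler fall through to the
            # next middleware (wait_for_body_middleware, the normal app)
            return None
        return streaming_app(environ, start_response)
    return middleware
```

The asynchronous writing of the returned iterable would still have to be implemented on top of this.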

Sebastian Krause

Jul 8, 2014, 7:11:58 AM
to python...@googlegroups.com, dazo...@gmail.com
Hi Luca,


you can do it, but you will probably need two types of Flask applications, one for streaming and one for standard HTTP.
The streaming_app can be wrapped by a function which does the streaming asynchronously once Flask returns the response.
In other words, your WSGI handler would look something like:

wsgi.WsgiHandler([wsgi.MediaRouter('ui/', ui, show_indexes=False),
                  streaming_middleware(streaming_app),
                  wait_for_body_middleware,
                  wsgi.middleware_in_executor(app)])
                               
You will need to write the streaming_middleware using Pulsar's async framework.

My idea was to just use a single app, but check in streaming_middleware what Flask/Werkzeug has actually returned (or hint through some hidden HTTP headers from the Flask app that we have a streaming response). If it's a stream, the streaming_middleware will return some kind of async wrapper for the stream which breaks the middleware chain. If it's a "normal" response, streaming_middleware will return None so that wait_for_body_middleware will be executed.

I've tried to debug my way through the middlewares, but I already fail to find out where the response's HTTP body actually passes through and what kind of object it is. In the function _wait_for_body_middleware() in middleware.py we have the following lines:

stream = environ['wsgi.input']
chunk = stream.read()

I initially thought that "chunk" would now contain the response body, but I always only see an empty byte string b''. My mistake is probably that I haven't really understood Python's asyncio and Pulsar's async framework yet. Which component from http://pythonhosted.org/pulsar/tutorials/coroutine.html would be best for wrapping a stream anyway?

Sebastian

Sebastian Krause

Jul 8, 2014, 7:56:27 AM
to python...@googlegroups.com, dazo...@gmail.com
On Tuesday, July 8, 2014 1:11:58 PM UTC+2, Sebastian Krause wrote:
I've tried to debug my way through the middlewares, but I already fail to find out where the response's HTTP body actually passes through and what kind of object it is. In the function _wait_for_body_middleware() in middleware.py we have the following lines:

stream = environ['wsgi.input']
chunk = stream.read()

I initially thought that "chunk" would now contain the response body, but I always only see an empty byte string b''.

I think I just realized a big misunderstanding on my side: wait_for_body_middleware is only for the request body and has nothing to do with the response body, right? Then it's obvious that "chunk" was always empty, since I've only been testing with GET requests. But where does Pulsar actually read out the response body to send it to the client? Request bodies won't be too large; the response is where I got the MemoryError when I tried to send really big files.

Sebastian

Luca Sbardella

Jul 8, 2014, 8:35:04 AM
to Sebastian Krause, python...@googlegroups.com

I think I just realized a big misunderstanding on my side: wait_for_body_middleware is only for the request body and has nothing to do with the response body, right? Then it's obvious that "chunk" was always empty, since I've only been testing with GET requests. But where does Pulsar actually read out the response body to send it to the client? Request bodies won't be too large; the response is where I got the MemoryError when I tried to send really big files.

Correct,

wait_for_body_middleware is almost a standard for any WSGI handler.

The exception is when you write an asynchronous proxy server, such as the one in http://pythonhosted.org/pulsar/tutorials/proxy.html, where the incoming body may be a large file (a movie, for example) that you need to send back to the original client (request body and response body are the same).

In your case the request body is small, therefore you should use wait_for_body_middleware to simplify things.

The implementation of an asynchronous response depends on how you receive the data you want to send back to the client.
Is the data already loaded in memory or are you accumulating it over time?
Did you get MemoryError using pulsar? If so what did your handler return?

Luca Sbardella

Jul 8, 2014, 9:03:59 AM
to Sebastian Krause, python...@googlegroups.com
But where does Pulsar actually read out the response body to send it to the client?


In the _response method of the HttpServerResponse class.

Sebastian Krause

Jul 8, 2014, 9:23:52 AM
to python...@googlegroups.com, dazo...@gmail.com
Hi Luca,


The implementation of an asynchronous response depends on how you receive the data you want to send back to the client.
Is the data already loaded in memory or are you accumulating it over time?
Did you get MemoryError using pulsar? If so what did your handler return?

When I send a very large file or stream using Flask's send_file() method (http://flask.pocoo.org/docs/api/#flask.send_file), it returns a werkzeug.wsgi.FileWrapper object (http://werkzeug.pocoo.org/docs/wsgi/#werkzeug.wsgi.FileWrapper), which is simply a utility class that converts a file-like object into an iterable. Pulsar does actually iterate through this file chunk by chunk in pulsar.apps.wsgi.server.HttpServerResponse._response() (line 457) and calls the self.write method on each chunk. However, I still get a MemoryError.

I've created a small test program that demonstrates the MemoryError issue by generating 4 GB of response data using a generator function: https://gist.github.com/realpath/67f36488f7e2e9b4980d
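The test program is essentially the following (sizes are illustrative and the handler is a plain WSGI callable; the actual gist may differ in details):

```python
def app(environ, start_response):
    # Stream 4 GB in 8 KiB chunks via a generator: nothing should ever
    # require the whole body to be in memory at once.
    chunk = b'x' * 8192
    total = 4 * 1024 ** 3  # 4 GB

    def generate():
        for _ in range(total // len(chunk)):
            yield chunk

    start_response('200 OK', [('Content-Type', 'application/octet-stream'),
                              ('Content-Length', str(total))])
    return generate()
```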

You might need a 32-bit Python to reproduce the problem since 64-bit processes usually don't have a problem reading in 4 GB of data. I'm using the 32-bit Windows version of Python 3.4.1 on a 64-bit Windows 7. This is what happens here with the test program (when I run wget http://localhost:8080/ -O /dev/null):

When I set a debugging break point in the "for chunk in response" loop (pulsar.apps.wsgi.server.HttpServerResponse._response, line 457) everything seems to look fine: it reads a chunk and writes it to the client. wget reports that it has received 8192 bytes of data. But when I just let the program run at full speed, Python works at 100% until the following exception happens:

Traceback (most recent call last):
  File "D:\virtualenv\lib\site-packages\pulsar\apps\wsgi\server.py", line 459, in _response
    self.write(chunk)
  File "D:\virtualenv\lib\site-packages\pulsar\apps\wsgi\server.py", line 426, in write
    self.transport.write(b''.join(chunks))
  File "D:\Python\Python-3.4.1\lib\asyncio\selector_events.py", line 516, in write
    self._buffer.extend(data)
MemoryError

Only after the exception does it actually start to send data to wget, until it eventually fails after a few hundred MB. At first glance it almost looks like a GIL problem to me: the Python interpreter is so busy writing data from the iterable into the transport, never releasing the GIL, that the transport never gets the chance to forward the data from the buffer to the client. But even without the GIL I think the same problem could happen when the client simply receives the data much slower than the buffers fill up. Maybe the transport should just block the iterator when its buffer has reached a certain size so that buffers don't get too large? Not sure if that works with asyncio...
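With plain asyncio streams, the usual cure for this is write flow control: await drain() after each write, so the producer suspends while the transport's buffer is above its high-water mark. A minimal, non-Pulsar sketch of the idea:

```python
import asyncio

async def stream_with_backpressure(writer, chunks):
    """Write chunks with back-pressure instead of buffering everything."""
    for chunk in chunks:
        writer.write(chunk)
        # drain() suspends this coroutine while the transport's write
        # buffer is above its high-water mark, so a slow client throttles
        # the producer instead of exhausting memory.
        await writer.drain()
    writer.close()

async def main():
    async def handle(reader, writer):
        # Serve 64 KiB in 1 KiB chunks from a generator
        await stream_with_backpressure(writer, (b'x' * 1024 for _ in range(64)))

    server = await asyncio.start_server(handle, '127.0.0.1', 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    data = await reader.read()  # read until the server closes the connection
    writer.close()
    server.close()
    await server.wait_closed()
    return len(data)

print(asyncio.run(main()))  # 65536
```

Whether Pulsar's protocol/transport layer exposes an equivalent hook is exactly the open question here.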

Sebastian

Luca Sbardella

Jul 8, 2014, 12:17:31 PM
to Sebastian Krause, python...@googlegroups.com
Hi Sebastian,


When I send a very large file or stream using Flask's send_file() method (http://flask.pocoo.org/docs/api/#flask.send_file), it returns a werkzeug.wsgi.FileWrapper object (http://werkzeug.pocoo.org/docs/wsgi/#werkzeug.wsgi.FileWrapper), which is simply a utility class that converts a file-like object into an iterable. Pulsar does actually iterate through this file chunk by chunk in pulsar.apps.wsgi.server.HttpServerResponse._response() (line 457) and calls the self.write method on each chunk. However, I still get a MemoryError.

I've created a small test program that demonstrates the MemoryError issue by generating 4 GB of response data using a generator function: https://gist.github.com/realpath/67f36488f7e2e9b4980d

You might need a 32-bit Python to reproduce the problem since 64-bit processes usually don't have a problem reading in 4 GB of data. I'm using the 32-bit Windows version of Python 3.4.1 on a 64-bit Windows 7. This is what happens here with the test program (when I run wget http://localhost:8080/ -O /dev/null):
 
When I set a debugging break point in the "for chunk in response" loop (pulsar.apps.wsgi.server.HttpServerResponse._response, line 457) everything seems to look fine: it reads a chunk and writes it to the client. wget reports that it has received 8192 bytes of data. But when I just let the program run at full speed, Python works at 100% until the following exception happens:

Traceback (most recent call last):
  File "D:\virtualenv\lib\site-packages\pulsar\apps\wsgi\server.py", line 459, in _response
    self.write(chunk)
  File "D:\virtualenv\lib\site-packages\pulsar\apps\wsgi\server.py", line 426, in write
    self.transport.write(b''.join(chunks))
  File "D:\Python\Python-3.4.1\lib\asyncio\selector_events.py", line 516, in write
    self._buffer.extend(data)
MemoryError

This looks like a bug, do you mind filing an Issue on github?

For starters, I've noticed that the connection class does not reset the idle_timeout when it writes (only when it reads), and therefore after a given time (the keep-alive parameter defaults to 15 seconds) it closes the transport and no more writing occurs.
I'm not yet sure how the memory error occurs.
 

Only after the exception does it actually start to send data to wget, until it eventually fails after a few hundred MB. At first glance it almost looks like a GIL problem to me: the Python interpreter is so busy writing data from the iterable into the transport, never releasing the GIL, that the transport never gets the chance to forward the data from the buffer to the client. But even without the GIL I think the same problem could happen when the client simply receives the data much slower than the buffers fill up. Maybe the transport should just block the iterator when its buffer has reached a certain size so that buffers don't get too large? Not sure if that works with asyncio...


Nothing to do with the GIL; this has to do with the protocol-transport interaction.


Sebastian Krause

Jul 8, 2014, 12:59:08 PM
to python...@googlegroups.com, dazo...@gmail.com
Hi Luca,

This looks like a bug, do you mind filing an Issue on github?

 
For starters, I've noticed that the connection class does not reset the idle_timeout when it writes (only when it reads), and therefore after a given time (the keep-alive parameter defaults to 15 seconds) it closes the transport and no more writing occurs.
I'm not yet sure how the memory error occurs.

I'll investigate a bit more tomorrow. If I should try something specific, just tell me. So far this looks like the only critical problem I've found during my testing today that stops me from using Pulsar in my project. :)

Sebastian