#36863: Under WSGI, multiple calls to asgiref.sync.async_to_sync within the same
request do not share the same event loop.
----------------------------------+--------------------------------------
Reporter: Mykhailo Havelia | Owner: (none)
Type: Bug | Status: new
Component: HTTP handling | Version: 6.0
Severity: Normal | Resolution:
Keywords: async, wsgi | Triage Stage: Unreviewed
Has patch: 0 | Needs documentation: 0
Needs tests: 0 | Patch needs improvement: 0
Easy pickings: 0 | UI/UX: 0
----------------------------------+--------------------------------------
Comment (by Vishy Algo):
With asgiref >= 3.10, we can leverage AsyncSingleThreadContext to enforce
a persistent execution context for the entire request lifecycle.
I suggest wrapping the {{{ WSGIHandler.__call__ }}} logic in this context.
Crucially, because the WSGI response iteration often outlives the {{{
__call__ }}} stack frame (e.g. StreamingHttpResponse), we cannot use a
simple {{{ with }}} block or decorator. Instead, we must manually manage
the context's lifecycle and attach the cleanup logic to
{{{ response.close() }}}.
I’ve verified this behavior with a test asserting that the thread is
successfully reused across calls.
{{{#!diff
diff --git a/django/core/handlers/wsgi.py b/django/core/handlers/wsgi.py
index aab9fe0c49..d531c6a564 100644
--- a/django/core/handlers/wsgi.py
+++ b/django/core/handlers/wsgi.py
@@ -118,30 +118,49 @@ class WSGIHandler(base.BaseHandler):
self.load_middleware()
def __call__(self, environ, start_response):
- set_script_prefix(get_script_name(environ))
- signals.request_started.send(sender=self.__class__,
environ=environ)
- request = self.request_class(environ)
- response = self.get_response(request)
-
- response._handler_class = self.__class__
-
- status = "%d %s" % (response.status_code, response.reason_phrase)
- response_headers = [
- *response.items(),
- *(("Set-Cookie", c.OutputString()) for c in
response.cookies.values()),
- ]
- start_response(status, response_headers)
- if getattr(response, "file_to_stream", None) is not None and
environ.get(
- "wsgi.file_wrapper"
- ):
- # If `wsgi.file_wrapper` is used the WSGI server does not
call
- # .close on the response, but on the file wrapper. Patch it
to use
- # response.close instead which takes care of closing all
files.
- response.file_to_stream.close = response.close
- response = environ["wsgi.file_wrapper"](
- response.file_to_stream, response.block_size
- )
- return response
+ async_context = AsyncSingleThreadContext()
+ async_context.__enter__()
+ try:
+ set_script_prefix(get_script_name(environ))
+ signals.request_started.send(sender=self.__class__,
environ=environ)
+ request = self.request_class(environ)
+ response = self.get_response(request)
+
+ response._handler_class = self.__class__
+
+ status = "%d %s" % (response.status_code,
response.reason_phrase)
+ response_headers = [
+ *response.items(),
+ *(("Set-Cookie", c.OutputString()) for c in
response.cookies.values()),
+ ]
+ start_response(status, response_headers)
+
+ original_close = response.close
+
+ def close():
+ try:
+ original_close()
+ finally:
+ async_context.__exit__(None, None, None)
+
+ # Install the wrapper; otherwise close() above is never invoked and
+ # the async context is only exited on the error path.
+ response.close = close
+
+ if getattr(response, "file_to_stream", None) is not None and
environ.get(
+ "wsgi.file_wrapper"
+ ):
+ # If `wsgi.file_wrapper` is used the WSGI server does not
call
+ # .close on the response, but on the file wrapper. Patch
it to use
+ # response.close instead which takes care of closing all
files.
+ response.file_to_stream.close = response.close
+ response = environ["wsgi.file_wrapper"](
+ response.file_to_stream, response.block_size
+ )
+ return response
+ except Exception:
+ async_context.__exit__(None, None, None)
+ raise
}}}
{{{#!python
def test_async_context_reuse(self):
    """
    Multiple calls to async_to_sync within a single request share the
    same thread and event loop via AsyncSingleThreadContext.
    """

    async def get_thread_ident():
        # Report the thread the coroutine is actually executed on.
        return threading.get_ident()

    class ProbingHandler(WSGIHandler):
        def __init__(self):
            # Deliberately skip the parent initializer: get_response() is
            # overridden below, so nothing it would set up is needed.
            pass

        def get_response(self, request):
            # Two independent async_to_sync() calls within one request.
            t1 = async_to_sync(get_thread_ident)()
            t2 = async_to_sync(get_thread_ident)()
            return HttpResponse(f"{t1}|{t2}")

    app = ProbingHandler()
    environ = self.request_factory._base_environ(PATH_INFO="/")

    def start_response(status, headers):
        pass

    response = app(environ, start_response)
    try:
        content = b"".join(response).decode("utf-8")
    finally:
        # PEP 3333 requires the consumer of the iterable to call close();
        # do so here so per-request cleanup runs even if iteration fails.
        response.close()
    t1, t2 = content.split("|")
    self.assertEqual(
        t1,
        t2,
        f"Failed: async_to_sync spawned new threads ({t1} vs {t2}). "
        "Context was not reused.",
    )
}}}
--
Ticket URL: <
https://code.djangoproject.com/ticket/36863#comment:4>