I have a question about running Channels and Django code under Celery. Originally, I had some code that used neither Channels nor Celery, but simply ran as part of the view code:
def import_all(...):
    ...database access...
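For context, it was invoked directly from a view, something like this (the view name, argument and response are simplified stand-ins, not my real code):

from django.http import HttpResponse

def import_view(request):
    # Runs inline, so the request blocks until the import finishes.
    import_all(request.user)
    return HttpResponse("Import complete")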
The import is an expensive operation, and to avoid the browser timing out I needed to (a) run it under Celery, but also (b) provide progress updates to the client via a WebSocket. First, I implemented (b) using a Channels consumer and group_send, and that worked fine.
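The consumer is roughly as follows; the group name "import-progress" and the event type are simplified placeholders for what I actually use:

import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer

class ProgressConsumer(WebsocketConsumer):
    def connect(self):
        # Join the group that the import code sends progress updates to.
        async_to_sync(self.channel_layer.group_add)(
            "import-progress", self.channel_name)
        self.accept()

    def disconnect(self, close_code):
        async_to_sync(self.channel_layer.group_discard)(
            "import-progress", self.channel_name)

    def progress_update(self, event):
        # Dispatched for each group_send with type "progress.update";
        # relay the payload to the browser.
        self.send(text_data=json.dumps(event))

I then tried to implement (a) using a Celery task: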
@task
def import_all(...):
    ...database access...
    async_to_sync(group_send)(...)
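To be explicit, group_send there is the channel layer's group_send, obtained and called along these lines (the group name and message body are placeholders):

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    "import-progress", {"type": "progress.update", "done": 1})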
However, this caused Celery to be unhappy and the worker to die:
...
2020-08-22 13:33:08,303 [ERROR] MainProcess: Task handler raised error: WorkerLostError('Worker exited prematurely: exitcode 0.')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/billiard-3.6.3.0-py3.8.egg/billiard/pool.py", line 1265, in mark_as_worker_lost
    raise WorkerLostError(
billiard.exceptions.WorkerLostError: Worker exited prematurely: exitcode 0.
After quite a lot of Googling, and various detours, including one where the database access caused issues (it seems the Django ORM cannot be called from an async context), I ended up with this double-wrapped monster:
@task
def import_all(...):
    async_to_sync(_import_all)(...)

async def _import_all(...):
    await database_sync_to_async(__import_all)(...)

def __import_all(...):  # <<<<<<<<<< original code
    ...database access...
    async_to_sync(group_send)(...)
This seems to work (even though the Celery worker process eventually dies, it seems to have done its work first, and is replaced). Is this double-wrapping really the correct approach? Even if it is clumsy, is there any reason to think it won't be reliable?
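For completeness, the whole thing currently looks roughly like this (I have used shared_task here to keep the sketch self-contained; the task argument, model, group name and message format are all simplified stand-ins):

from asgiref.sync import async_to_sync
from celery import shared_task
from channels.db import database_sync_to_async
from channels.layers import get_channel_layer

@shared_task
def import_all(job_id):
    # Celery entry point: sync wrapper around the async shim.
    async_to_sync(_import_all)(job_id)

async def _import_all(job_id):
    # Async shim whose only purpose is to push the ORM work back
    # onto a worker thread via database_sync_to_async.
    await database_sync_to_async(__import_all)(job_id)

def __import_all(job_id):
    # The original synchronous code: ORM access plus a progress
    # update after each row.
    channel_layer = get_channel_layer()
    rows = ImportRow.objects.filter(job_id=job_id)  # ImportRow: stand-in model
    total = rows.count()
    for done, row in enumerate(rows, start=1):
        row.process()  # stand-in for the real per-row work
        async_to_sync(channel_layer.group_send)(
            "import-progress",
            {"type": "progress.update", "done": done, "total": total})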
TIA, Shaheed
P.S. I should mention that I am using Celery's default prefork pool, as opposed to, say, a greenlet-based pool like gevent or eventlet.
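For concreteness, the worker is started along these lines (the project name is a placeholder):

celery -A myproj worker --pool=prefork --concurrency=4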