Indeed, that is exactly what I am doing: I run processing in the background with
task = asyncio.ensure_future(database_sync_to_async(self._process)(message))
and then keep track of the running tasks. Actually, I am quite happy with this except for one thing: I would like to limit the number of messages processed at the same time by a single worker. The logic is straightforward - once I start a new task, I check whether the "tasks per worker" limit has been reached, and if so, I just invoke
asyncio.wait(self._tasks, return_when=asyncio.FIRST_COMPLETED)
so I block the consumer's message handler until some task finishes. I have not run experiments, but from
the worker's code I can conclude that the worker will extract messages from the channel anyway and put them into the application's queue, which (according to
this) has unlimited size. And that is what bothers me the most. My logic is simple: if a worker has already reached the "tasks per worker" limit, then I do not want this worker to extract messages from the channel, because there is probably another worker process willing to process them. Frankly, I do not understand how to achieve this... sorry for bothering you, but perhaps you have some bright idea, please?
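To make the limiting logic above concrete, here is a minimal runnable sketch of what I mean. All names (ThrottledConsumer, MAX_TASKS, _process) are illustrative, not from the actual Channels worker code, and _process is a plain coroutine standing in for the real database_sync_to_async call:

```python
import asyncio

class ThrottledConsumer:
    """Sketch of the "tasks per worker" limiting logic described above."""

    MAX_TASKS = 10  # hypothetical per-worker limit

    def __init__(self):
        self._tasks = set()

    async def handle_message(self, message):
        # Start processing in the background; the real code wraps the
        # handler in database_sync_to_async(self._process)(message).
        task = asyncio.ensure_future(self._process(message))
        self._tasks.add(task)
        # Drop the task from the tracking set once it completes.
        task.add_done_callback(self._tasks.discard)
        if len(self._tasks) >= self.MAX_TASKS:
            # Block the message handler until at least one running task
            # finishes, keeping concurrency under the limit.
            await asyncio.wait(self._tasks,
                               return_when=asyncio.FIRST_COMPLETED)

    async def _process(self, message):
        await asyncio.sleep(0.01)  # stand-in for real database work
        return message
```

This throttles only the consumer's own handler; as noted, it does not stop the worker from pulling further messages off the channel into its in-memory queue.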
Anyway, thank you for the explanation you have already given me - it helps, and it is always a pleasure to chat with you :-)
On Wednesday, May 2, 2018 at 22:35:08 UTC+3, Andrew Godwin wrote: