Hi All:
I'm migrating a small application from Django 1.x/Channels 1.x to Django 2.0.2 and Channels 2.0. I've run into an issue whose cause I'm trying to determine. It could be due to a failure on my part to correctly implement the channel_layer or it could be due to an
incompatibility with Celery 4.1. The basics are:
- Run a periodic Celery task
- Use the channel_layer to perform a group_send
- Have the consumer receive the group_send event and push a json message over the socket
Show below is my simple consumer.py module:
class mstatusMessage(AsyncJsonWebsocketConsumer):
##### WebSocket event handlers
async def connect(self):
"""
Called when the websocket is handshaking as part of initial connection.
"""
# Accept the connection
await self.accept()
# Add to the group so they get messages
await self.channel_layer.group_add(
settings.CHANNEL_GROUP,
self.channel_name,
)
async def disconnect(self, code):
"""
Called when the WebSocket closes for any reason.
"""
# Remove them from the group
await self.channel_layer.group_discard(
settings.CHANNEL_GROUP,
self.channel_name,
)
##### Handlers for messages sent over the channel layer
# These helper methods are named by the types we send - so epics.join becomes epics_join
async def epics_message(self, event):
"""
Called when the Celery task queries Epics.
"""
logging.error("### Received Msg ###")
# Send a message down to the client
await self.send_json(
{
"text": event["message"],
},
)
The routing is simple:
application = ProtocolTypeRouter({
"websocket": mstatusMessage
})
The Celery task is as follows:
@shared_task
def updateData(param):
logger.error('##### updateData #####')
# # Get an instance of the channel layer for
# # inter task communications
channel_layer = get_channel_layer()
channel_layer.group_send(
settings.CHANNEL_GROUP,
{"type": "epics.message", "text": "Hello World"},
)
The results are promising as the websocket connect opens successfully and the Celery task run as show by the debugging output given below:
2018-03-02 09:32:12,280 INFO ### Connected ###
[2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
[2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[786f51a6-]: ##### updateData #####
BUT ............... although the Celery task runs the consumer never receives a message via the channel layer. This could be due to an
implementation error or, maybe, a compatibility issue. The application doesn't crash but the following warning is issued:
[2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 'RedisChannelLayer.group_send' was never awaited
{"type": "epics.message", "text": "Hello World"},
This warning appears related to the Python asyncio functionality. Under the Celery task module, the channel_layer.group_send
doesn't use the await directive as it's inclusion hangs the Celery task. Changing the Celery task to:
async def updateData(param):
logger.error('##### updateData #####')
# # Get an instance of the channel layer for
# # inter task communications
channel_layer = get_channel_layer()
await channel_layer.group_send(
settings.CHANNEL_GROUP,
{"type": "epics.message", "text": "Hello World"},
)
This results in the following runtime warning and the Celery task fails to run (the debug message is never printed) :
[2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358: RuntimeWarning: coroutine 'updateData' was never awaited
result = (True, prepare_result(fun(*args, **kwargs)))
I'm sure these warnings are understood by someone who can provide guidance with respect to a solution.
Thanks,
G Broten
Reference:
The application has be tried under two OS versions:
CentOS 7.4
Alpine Linux 3.7
A partial pip list of the significant packages:
asgi-redis (1.4.3)
asgiref (2.1.6)
async-timeout (2.0.0)
billiard (3.5.0.3)
cached-property (1.4.0)
celery (4.1.0)
channels (2.0.2)
channels-redis (2.1.0)
daphne (2.0.4)
Django (2.0.2)