Django 2.0.2, Channels 2.0.2 and Celery 4.1 Issue

G Broten

Mar 2, 2018, 1:36:08 PM
to Django users
Hi All:
I'm migrating a small application from Django 1.x/Channels 1.x to Django 2.0.2 and Channels 2.0.2. I've run into an issue whose cause I'm trying to determine. It could be due to a failure on my part to implement the channel_layer correctly, or it could be due to an incompatibility with Celery 4.1. The basics are:
- Run a periodic Celery task
- Use the channel_layer to perform a group_send
- Have the consumer receive the group_send event and push a JSON message over the socket

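Shown below is my simple consumer.py module:
import logging

from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.conf import settings


class mstatusMessage(AsyncJsonWebsocketConsumer):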

    ##### WebSocket event handlers

    async def connect(self):
        """
        Called when the websocket is handshaking as part of initial connection.
        """
        logging.info("### Connected ###")
        # Accept the connection
        await self.accept()

        # Add to the group so they get messages
        await self.channel_layer.group_add(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    async def disconnect(self, code):
        """
        Called when the WebSocket closes for any reason.
        """
        # Remove them from the group
        await self.channel_layer.group_discard(
            settings.CHANNEL_GROUP,
            self.channel_name,
        )

    ##### Handlers for messages sent over the channel layer

    # These helper methods are named by the types we send - so epics.message becomes epics_message
    async def epics_message(self, event):
        """
        Called when the Celery task queries Epics.
        """
        logging.error("### Received Msg ###")
        # Send a message down to the client
        await self.send_json(
            {
                "text": event["text"],  # key must match the one the Celery task sends
            },
        )
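
As a rough illustration of the naming rule mentioned in the comment above (a message type of epics.message ends up in the epics_message method), here is a minimal standard-library sketch. SketchConsumer and its dispatch method are hypothetical stand-ins written for this example and are not the real Channels base class; the only assumption carried over is that dots in the type are replaced with underscores to find the handler.

```python
import asyncio


class SketchConsumer:
    """Hypothetical stand-in for a consumer; not the real Channels class."""

    def __init__(self):
        self.sent = []

    async def dispatch(self, event):
        # Map the event type to a method name: "epics.message" -> "epics_message".
        handler_name = event["type"].replace(".", "_")
        handler = getattr(self, handler_name, None)
        if handler is None:
            raise ValueError("No handler for message type %s" % event["type"])
        await handler(event)

    async def epics_message(self, event):
        # The handler receives the whole event dict, so it can only read
        # keys that the sender actually put there.
        self.sent.append({"text": event["text"]})


consumer = SketchConsumer()
loop = asyncio.new_event_loop()
try:
    loop.run_until_complete(
        consumer.dispatch({"type": "epics.message", "text": "Hello World"})
    )
finally:
    loop.close()
print(consumer.sent)
```

The handler sees exactly the dict that was passed to group_send, so the keys read in the handler have to match the keys used by the sender.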

The routing is simple:
application = ProtocolTypeRouter({
    "websocket":  mstatusMessage
})

The Celery task is as follows:
@shared_task
def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

The results are promising: the WebSocket connection opens successfully and the Celery task runs, as shown by the debugging output below:
127.0.0.1:59818 - - [02/Mar/2018:09:32:11] "GET /" 200 100639
127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECTING /epics/" - -
2018-03-02 09:32:12,280 INFO     ### Connected ###
127.0.0.1:59844 - - [02/Mar/2018:09:32:12] "WSCONNECT /epics/" - -
[2018-03-02 09:32:12,312: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[8d329e61-]: ##### updateData #####
[2018-03-02 09:32:13,310: ERROR/ForkPoolWorker-2] mstatus.tasks.updateData[786f51a6-]: ##### updateData #####

But although the Celery task runs, the consumer never receives a message via the channel layer. This could be due to an implementation error or, maybe, a compatibility issue. The application doesn't crash, but the following warning is issued:

[2018-03-02 09:32:02,105: WARNING/ForkPoolWorker-2] /mstatus/mstatus/tasks.py:33: RuntimeWarning: coroutine 'RedisChannelLayer.group_send' was never awaited
  {"type": "epics.message", "text": "Hello World"},

This warning appears to be related to Python's asyncio functionality. In the Celery task module, the channel_layer.group_send call doesn't use the await keyword, as its inclusion hangs the Celery task. I tried changing the Celery task to:
async def updateData(param):

    logger.error('##### updateData #####')

    # # Get an instance of the channel layer for
    # # inter task communications
    channel_layer = get_channel_layer()

    await channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

This results in the following runtime warning, and the Celery task fails to run (the debug message is never printed):

[2018-03-02 09:45:19,804: WARNING/ForkPoolWorker-2] /home/broteng/.pyenv/versions/3.6.3/envs/djchannels2/lib/python3.6/site-packages/billiard/pool.py:358: RuntimeWarning: coroutine 'updateData' was never awaited
  result = (True, prepare_result(fun(*args, **kwargs)))

I'm sure these warnings are understood by someone who can provide guidance with respect to a solution.

Thanks,

G Broten

Reference:

The application has been tried under two OS versions:
CentOS 7.4
Alpine Linux 3.7

A partial pip list of the significant packages:
asgi-redis (1.4.3)
asgiref (2.1.6)
async-timeout (2.0.0)
billiard (3.5.0.3)
cached-property (1.4.0)
celery (4.1.0)
channels (2.0.2)
channels-redis (2.1.0)
daphne (2.0.4)
Django (2.0.2)
redis (2.10.6)





Ken Whitesell

Mar 2, 2018, 1:57:01 PM
to Django users
Taking a stab at this - I believe the original problem may be here:

    channel_layer.group_send(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )

Your updateData method is a synchronous method. However, channel_layer.group_send is an asynchronous method.
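
To make the mismatch concrete, here is a small sketch that uses only the standard library. FakeChannelLayer is a hypothetical in-memory stand-in written for this example (it is not part of Channels); the only thing it shares with the real layer is that group_send is declared with async def. Calling it from synchronous code builds a coroutine object and nothing else, which is consistent with the "was never awaited" warning.

```python
import asyncio


class FakeChannelLayer:
    """Hypothetical in-memory stand-in for a channel layer."""

    def __init__(self):
        self.delivered = []

    async def group_send(self, group, message):
        # Runs only when the coroutine is awaited or driven by an event loop.
        self.delivered.append((group, message))


layer = FakeChannelLayer()

# A plain call from synchronous code: the body of group_send does not run.
pending = layer.group_send(
    "epics", {"type": "epics.message", "text": "Hello World"}
)
is_coroutine = asyncio.iscoroutine(pending)
delivered_before = list(layer.delivered)

# Close the unused coroutine explicitly so this sketch exits without the
# "never awaited" RuntimeWarning that the original task triggered.
pending.close()

print(is_coroutine, delivered_before)
```

Nothing reaches the layer because no event loop ever ran the coroutine.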

What you might try is wrapping the group_send method in the async_to_sync function.
See the documentation at http://channels.readthedocs.io/en/latest/topics/channel_layers.html#synchronous-functions

    async_to_sync(channel_layer.group_send)(
        settings.CHANNEL_GROUP,
        {"type": "epics.message", "text": "Hello World"},
    )


Your attempt to make updateData an asynchronous method might work, given some additional work to schedule that task on an event loop, but that answer is beyond me at the moment.
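
As a rough illustration of what that extra work could look like, assuming nothing beyond the standard library: the entry point that the worker calls stays synchronous, and it creates an event loop to run the coroutine to completion. FakeChannelLayer, push_update, and update_data are hypothetical names invented for this sketch, and the in-memory layer stands in for a real one. It is only meant to show the idea; in a real Channels project the async_to_sync wrapper shown above is the documented route.

```python
import asyncio


class FakeChannelLayer:
    """Hypothetical in-memory stand-in for a channel layer."""

    def __init__(self):
        self.delivered = []

    async def group_send(self, group, message):
        self.delivered.append((group, message))


async def push_update(layer, group):
    # The asynchronous part: awaiting group_send is legal here.
    await layer.group_send(
        group, {"type": "epics.message", "text": "Hello World"}
    )


def update_data(layer, group):
    """Synchronous entry point, the shape a task body would need to have."""
    # Create a private event loop, run the coroutine to completion, clean up.
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(push_update(layer, group))
    finally:
        loop.close()


layer = FakeChannelLayer()
update_data(layer, "epics")
print(layer.delivered)
```

The function handed to the worker is a plain def, so the caller never receives an un-awaited coroutine, while the awaiting still happens inside an event loop.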

Hope this helps,
     Ken

G Broten

Mar 2, 2018, 3:14:41 PM
to Django users
Two big thumbs up for Ken! 
His keen eyes spotted the problem, which was attempting an asynchronous send from the Celery task. I changed the code to use the synchronous send and, bingo, the consumer now receives events via the channel layer.

A big thank-you to Ken! And, I'm sure anyone else using Channels 2.0 with Celery will find this thread of use.

G Broten

Sergio Lopez

Apr 16, 2018, 1:18:58 PM
to Django users
Please, can you send us a basic example of Celery 4 and Channels 2?