Channels 2.0 - Exception when sending a message to client

658 views
Skip to first unread message

muha...@swordfish.co.za

unread,
Feb 24, 2018, 7:56:45 PM2/24/18
to Django users
I'm still trying to find my way around channels 2.0, so I'm not sure if my implementation is incorrect or if this is a valid issue. I'm going to post as much info as possible in the hope that it will assist with finding the problem.

I have a single page app which opens a JS websocket connection - with channels 1 I used to add a session_key to the querystring and that used to handle the authentication.

I see this is no longer the case, so I now have a custom middleware class that sets the user object on the scope:

from django.contrib.sessions.models import Session

from users.models import User


class QueryAuthMiddleware:

    def __init__(self, inner):
        # Store the ASGI application we were passed
        self.inner = inner

    def __call__(self, scope):
        # Look up user from query string (you should also do things like
        # check it's a valid user ID, or if scope["user"] is already populated)
        if scope.get("user", None) is None:
            session_obj = Session.objects.get(session_key=scope["query_string"].decode("utf-8").split("=")[1])
            session_decoded = session_obj.get_decoded()

            scope["user"] = User.objects.get(id=session_decoded.get("_auth_user_id"))

            # Return the inner application directly and let it run everything else
        return self.inner(scope)



This is in turn added to my routing (channels.py):

from django.conf.urls import url
from django.conf import settings
from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter

from notifications.consumer import TestWebsocketConsumer, TestConsumer
from notifications.middleware.query_auth_middleware import QueryAuthMiddleware

ROOT_PATH = "" if settings.DEBUG else "/ws/"


application = ProtocolTypeRouter({

    "websocket": QueryAuthMiddleware(
        URLRouter([
            url(f"^{ROOT_PATH}(?P<user_id>[-\w]+)/$", TestWebsocketConsumer),

        ])
    ),

    "channel": ChannelNameRouter({
      "user-notifications": TestConsumer,
    })


})




Here's my consumers.py:

from asgiref.sync import async_to_sync
from channels.consumer import SyncConsumer
from channels.generic.websocket import WebsocketConsumer


class TestWebsocketConsumer(WebsocketConsumer):
    def websocket_connect(self, message):
        async_to_sync(self.channel_layer.group_add)(str(self.scope["user"].id), "user-notifications")
        self.connect()


class TestConsumer(SyncConsumer):
    def notification_handler(self, message):

        self.send(
            {
             "type": "websocket.send",
             "text": message["text"]
            }
        )





The idea of the app is that each user that logs in on the front end is able to receive messages meant only for them sent by the back end.  I have been trying to test it like this:

>>> channel_layer = get_channel_layer()
>>> async_to_sync(channel_layer.send)("user-notifications", {"type": "notification.handler", "text": "My Message"})



Here's the traceback in the runworker output:

2018-02-25 02:34:14,002 - INFO - runworker - Running worker for channels ['user-notifications']
ERROR:root:Exception inside application: You must implement application_send()
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/consumer.py", line 54, in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/utils.py", line 48, in await_many_dispatch
    await dispatch(result)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", line 110, in __call__
    return await asyncio.wait_for(future, timeout=None)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py", line 333, in wait_for
    return (yield from fut)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/db.py", line 13, in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", line 125, in thread_handler
    return self.func(*args, **kwargs)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/consumer.py", line 99, in dispatch
    handler(message)
  File "/Users/muhammed/projects/xxx/my-app/app/notifications/consumer.py", line 18, in notification_handler
    "text": message["text"]
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/consumer.py", line 107, in send
    self.base_send(message)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", line 64, in __call__
    return call_result.result()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 405, in result
    return self.__get_result()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", line 78, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/server.py", line 71, in application_send
    raise NotImplementedError("You must implement application_send()")
  You must implement application_send()


OS: MacOS High Sierra
Python: 3.6.1

Django==2.0.2
channels==2.0.2
channels-redis==2.1.0
asgiref==2.1.6
daphne==2.0.4

Config from settings.py:

# -- Channels Details
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("localhost", 6379)]
}
}
}

ASGI_APPLICATION = "myapp.settings.channels.application"



Any help would be appreciated.

Ken Whitesell

unread,
Feb 24, 2018, 9:08:14 PM2/24/18
to Django users
I'm going to take a stab at this - with the warning that I'm extremely uncertain that I'm anywhere close to being correct, but I'm hoping that I'm at least close enough that it might give you some clues.

So where I think the issue may be is that you're trying to do a "self.send" from a worker task, not from a websocket task. If you look at the docs for the "Worker and Background Tasks", you'll see that to send an event you call "self.channel_layer.send" with a first parameter of the channel to which you wish to send the message.

Since this worker task isn't running in the context of a websocket consumer, it doesn't automatically know to which channel a response should be sent. (I _know_ that my wording there is imprecise and probably inaccurate, but I think I'm sufficiently close for casual conversation.)

If you want the worker task to send a message back to a client, what _I_ have done is create another method in my WebSocket consumer that takes a message and forwards it back out to the client. In thinking about it, I guess it may be possible that if I pass something (whatever that "something" might be) to the worker task, I might be able to send a message back out without going back through the consumer - again, that's a guess for something I haven't tried yet.

Anyway, I hope this helps.

Ken

Andrew Godwin

unread,
Feb 24, 2018, 9:55:46 PM2/24/18
to django...@googlegroups.com
You are entirely right, Ken - the problem here is that "self.send()" on consumers where their scope type is "channel" is not defined. I suspect we could do with a better error message in this case rather than the one shown here.

The confusion probably comes as this would sort have worked in Channels 1, but in Channels 2, the only consumer that can send to a websocket is the one that's attached to it. All anything else can do is send messages over the channel layer to the websocket consumer to trigger _it_ to run code and send something.

Andrew

--
You received this message because you are subscribed to the Google Groups "Django users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to django-users+unsubscribe@googlegroups.com.
To post to this group, send email to django...@googlegroups.com.
Visit this group at https://groups.google.com/group/django-users.
To view this discussion on the web visit https://groups.google.com/d/msgid/django-users/c5c4bef6-0469-401a-9624-1eed4374c1cf%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

muha...@swordfish.co.za

unread,
Feb 25, 2018, 4:42:27 AM2/25/18
to Django users
Thanks for the update - I think I understand now.

I updated my channel name routing and pointed it at the Websocket consumer:

"channel": ChannelNameRouter({
        "user-notifications": TestWebsocketConsumer,
})


Moved the message handler into TestWebsocketConsumer:

from asgiref.sync import async_to_sync

from channels.generic.websocket import WebsocketConsumer


class TestWebsocketConsumer(WebsocketConsumer):
    def websocket_connect(self, message):
        async_to_sync(self.channel_layer.group_add)(str(self.scope["user"].id), "user-notifications")
        self.connect()

    def notification_handler(self, message):
        self.send(
            {
             "text": message["text"]
            }
        )



This raises the same exception:

ERROR:root:Exception inside application: You must implement application_send()
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/consumer.py", line 54, in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/utils.py", line 48, in await_many_dispatch
    await dispatch(result)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", line 110, in __call__
    return await asyncio.wait_for(future, timeout=None)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py", line 333, in wait_for
    return (yield from fut)
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/db.py", line 13, in thread_handler
    return super().thread_handler(loop, *args, **kwargs)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", line 125, in thread_handler
    return self.func(*args, **kwargs)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/consumer.py", line 99, in dispatch
    handler(message)
  File "/Users/muhammed/projects/xxx/myapp/app/notifications/consumer.py", line 15, in notification_handler
    "text": message["text"]
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/generic/websocket.py", line 56, in send
    {"type": "websocket.send", "text": text_data},
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/channels/consumer.py", line 107, in send
    self.base_send(message)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", line 64, in __call__
    return call_result.result()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 405, in result
    return self.__get_result()
  File "/usr/local/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/concurrent/futures/_base.py", line 357, in __get_result
    raise self._exception
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/sync.py", line 78, in main_wrap
    result = await self.awaitable(*args, **kwargs)
  File "/Users/muhammed/projects/xxx/lib/python3.6/site-packages/asgiref/server.py", line 71, in application_send
    raise NotImplementedError("You must implement application_send()")
  You must implement application_send()

In my channels 1.x implementation, the channel name was routed to a method where I did a group send. I also tried this implementation in the websocket consumer:

def notification_handler(self, message):
async_to_sync(self.channel_layer.group_send)(
"1", # The group id
{
"text": message["text"]
}
)
print("done")


In my runworker output I see the print statement output, however on my front end app, the websocket onmessage callback is not fired.

Apologies again if it's something obvious.

muha...@swordfish.co.za

unread,
Feb 25, 2018, 5:04:13 AM2/25/18
to Django users
While I'm at it, would you mind confirming if I understand the following changes in channels 2.0 correctly ?

1) Channel names are not auto-detected and must be specified when running a worker
2) runserver no long starts up a worker by default, this has be done manually

On Sunday, February 25, 2018 at 2:56:45 AM UTC+2, muha...@swordfish.co.za wrote:

Ken Whitesell

unread,
Feb 25, 2018, 10:37:13 AM2/25/18
to Django users
Before I get to the guts of my answer, I'd suggest you get Andrew Godwin's "channels-examples" application and read it completely until you believe you understand everything that it's doing. (https://github.com/andrewgodwin/channels-examples)

Now, to the extent of my understanding, the "ChannelNameRouter" is used for background tasks - it's not used for connections by a websocket client- so the API is different than the router used for a websocket.

The applicable excerpt from the routing.py file in the channels-examples app (comments removed - see the full file for more details):
application = ProtocolTypeRouter({
    "websocket": AuthMiddlewareStack(
        URLRouter([
            # URLRouter just takes standard Django path() or url() entries.
            path("chat/stream/", ChatConsumer),
        ]),
    ),
    "channel": ChannelNameRouter({
        "mpge-timer": TimerConsumer
    }),

})

What this is saying to me is that in the channels world, there are (at least) two fundamentally different types of objects - 
1) There are websocket consumers, which are connected to by an external client using a websocket connection, _and_ can send and receive messages on the internal channel. These are defined in the "websocket" key of the ProtocolTypeRouter object.
When they are created by a connection being made, they are given a "scope" which exists for the life of the connection. This scope include a system-generated channel name - but it's implicit in the API that that consumer uses to send & receive messages - you don't specify it when making calls. (Don't know if that's universally true, but it's my current impression.)

2) There are worker tasks, which do _not_ create or use websockets to make a connection to the outside world. They exist to send and receive events via channels. (They can also make changes to your models, so strictly speaking, channels aren't their only means of communication to other tasks or processes.)

So for your task, you'll want to define your TestWebSocketConsumer within the websocket key of the ProtocolTypeRouter

Keep in mind that a websocket is created as an "upgrade" of a regular HTTP connection - hence the requirement of a url as part of the routing. Effectively, the client is issuing an HTTP GET request for a url, and also says, "Oh, by the way, I want to upgrade this connection to be a websocket". (The actual protocol is a little bit more complex than that - but not too much.) So the server gets the GET, and needs to route it to something that will handle that URL and provide the upgraded protocol. That is handled by Daphne at the lowest layers - this definition is just information that Daphne uses to figure out what object gets to handle each url.

Hope this helps.
Ken

Disclaimer: I am not Andrew Godwin, I do not play him on TV, and my code will never be mistaken for his. In any situation where there's a discrepancy between anything he has written and my comments, I am wrong. At this point I'm guessing I'm just about a step and a half ahead of you in working my way though this.

Ken Whitesell

unread,
Feb 25, 2018, 10:41:43 AM2/25/18
to Django users
In so far as you are talking about background worker tasks, you are correct.

But those statements do not pertain to a websocket consumer that accepts connections from a browser. You can run an application without ever creating a worker task. (Again, I'll refer you to the channels-examples app for such a demonstration.)

Ken

Ken Whitesell

unread,
Feb 25, 2018, 10:54:44 AM2/25/18
to Django users
And while we're at it, I'm going to toss out another opinion.

I've found it extremely beneficial _for me_ to not try to relate channels 2.0 to channels 1.1. I'm approaching it as if I'm learning about a completely separate package - working from a clean slate so-to-speak.

Ken

On Sunday, February 25, 2018 at 5:04:13 AM UTC-5, muha...@swordfish.co.za wrote:

muha...@swordfish.co.za

unread,
Feb 25, 2018, 2:17:07 PM2/25/18
to Django users
Thank you so much for the detailed explanation.

1) I now understand where my error was - the custom channel I used was passing events to a background worker that was not related to the websocket consumer. This also answers my question about how channel names must be included in the runworker command explicitly, similar to celery.

2) To fix my own issue, all I had to do was add the group to the right channel (self.channel_name in this case). I did read the multichat example a few times but missed the reference to the implicit channel name. 

I can now send messages to my front end app perfectly.


Andrew/Ken - Thank you for taking the time to assist and helping me solve the issue.


On Sunday, February 25, 2018 at 2:56:45 AM UTC+2, muha...@swordfish.co.za wrote:
Reply all
Reply to author
Forward
0 new messages