I am using Django 2.0.5, Python 3.6.2, Django Channels 2.1.1, aiohttp 3.1.3.
Background:
I am connecting to an external websocket, asynchronously, using aiohttp within a background channels worker. Once triggered, the socket connection stays alive, listening/streaming, and sends incoming messages to a consumer through a group_send. The consumer receives it at the appropriate handler, however, after some time, it appears that the channels websocket dies and the handler can no longer be found - I get 'No handler for message type....' even though it has been working successfully until that point. Second issue is that 'await t1' seems to block in the background. I route the 'toggle' to backgroundUpdater.py.
consumers.py:
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class MyConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.channel_layer.group_add('myGroup', self.channel_name)
await self.accept()
async def disconnect(self, closeCode):
await self.channel_layer.group_discard('myGroup', self.channel_name)
async def receive(self, text_data):
eventData = json.loads(text_data)
print(eventData)
if eventData['myToggle'] == 'on':
await self.channel_layer.send('toggle',{"type": "openConnection"})
elif eventData['myToggle'] == 'off':
await self.channel_layer.send('toggle',{"type": "closeConnection"})
async def backgroundWorkerUpdate(self, data):
await self.send(text_data=json.dumps(data['updateTime']))
backgroundUpdater.py:
from channels.consumer import AsyncConsumer
from channels.layers import get_channel_layer
import asyncio
import aiohttp
import json
from dashboard.models import Info
import datetime
class backgroundConsumer(AsyncConsumer):
turnOnListener = True
channelLayer = get_channel_layer()
loadComplete = False
async def listen(self):
infoList = Info.objects.filter(active=True)
subInfoList = []
for i in infoList:
subInfoList.append('info'+str(t))\
self.loadComplete = False
async with aiohttp.ClientSession() as session:
await ws.send_str(str(subInfoList))
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data.startswith('2'):
await ws.send_str('3')
else:
await self.parseListenData(msg)
async def parseListenData(self, msgContent):
# handles data in here, including a call to datetime
updateMsg = {'type': 'backgroundWorkerUpdate',
'updateTime': str(updateTime),
}
print('built update msg and now sending to group')
await self.channelLayer.group_send('myGroup', updateMsg)
async def openConnection(self, event):
if(self.turnOnListener):
await self.channelLayer.group_add(
'myGroup',
self.channel_name,
)
self.turnOnListener = False
loop = asyncio.get_event_loop()
t1 = loop.create_task(self.listen())
await t1
print('awaiting t1') # THIS NEVER PRINTS UNTIL THE WEBSOCKET IN consumers.py CRASHES
else:
print('already turned on listener')
async def closeConnection(self, event):
print('in close connection') # CANNOT SEEM TO TRIGGER THIS ONCE await t1 is called above...
if(not self.turnOnListener):
print('turn it off')
Questions:
1. Why is does this work for some time until the websocket in consumers.py crashes? I am assuming it has crashed because of the missing handler (i.e. 'No handler...' error).
2. Why is the 'await t1' call blocking? When I try to send 'await self.channel_layer.send('toggleMarket',{"type": "closeConnection"})', it sends, but nothing seems to be received?
I am have been trying to get this to run in different ways, however, I cannot seem to resolve these two issues. Any help/suggestions would be greatly appreciated.
Thanks!