Hello Andrey and Community!
We built an application with aiohttp and ran into some strange behaviour of ClientSession.
Here are the relevant parts of the code:
import asyncio
import aiohttp
import aiorun
class Consul:
    """Minimal Consul client that keeps a Consul session renewed.

    Constructing an instance schedules a background task that sends a
    session-renew request through the shared HTTP session every 10 seconds.
    """

    def __init__(self, endpoint: str, session: aiohttp.ClientSession) -> None:
        """Store the connection settings and schedule the renewal task.

        Args:
            endpoint: Base URL of the Consul HTTP API.
            session: aiohttp client session used for every request.
        """
        self.__endpoint = endpoint
        self.__session = session
        # Keep a strong reference to the task: the event loop itself only
        # holds weak references, so an otherwise unreferenced task may be
        # garbage-collected before it finishes.
        self.__renew_task = asyncio.ensure_future(self.__renew_session())

    async def __renew_session(self) -> None:
        """Send a renew request for the Consul session every 10 seconds, forever."""
        # NOTE(review): ``__session_id`` is not assigned anywhere in the
        # visible code -- presumably it is set by code elided from this
        # excerpt; confirm, otherwise this raises AttributeError.
        while True:
            url = f"{self.__endpoint}/v1/session/renew/{self.__session_id}"
            # Use the response as a context manager so its connection is
            # released deterministically instead of whenever the response
            # object happens to be finalized.
            async with self.__session.put(url):
                pass
            await asyncio.sleep(10)
class Nomad:
    """Thin client for the two Nomad HTTP API endpoints used by the service."""

    def __init__(self, endpoint: str, session: aiohttp.ClientSession) -> None:
        """Remember the API base URL and the HTTP session used for requests.

        Args:
            endpoint: Base URL of the Nomad HTTP API.
            session: aiohttp client session used for every request.
        """
        self.__session = session
        self.__endpoint = endpoint

    async def allocations(self, index: int) -> int:
        """Issue a GET on ``/v1/allocations`` and wait for the response.

        The request carries ``stale``, ``index`` and a five-minute ``wait``
        query parameter. The response body is not read.

        Args:
            index: Value sent as the ``index`` query parameter.

        Returns:
            Always ``1`` once the response has arrived.
        """
        url = f"{self.__endpoint}/v1/allocations"
        query = {'stale': '', 'index': index, 'wait': f"{5 * 60 * 1000}ms"}
        # timeout=0 is passed through as-is; per the accompanying description
        # it is meant to run this request without a client-side timeout.
        async with self.__session.get(url, params=query, timeout=0) as _response:
            return 1

    async def jobs(self) -> dict:
        """Fetch ``/v1/jobs`` with a 5-second timeout and return the decoded JSON."""
        url = f"{self.__endpoint}/v1/jobs"
        async with self.__session.get(url, timeout=5) as response:
            payload = await response.json()
        return payload
class Service:
    """Runs the wait-then-fetch loop against Nomad."""

    def __init__(self, nomad: Nomad, consul: Consul) -> None:
        """Keep references to the Nomad and Consul clients.

        Args:
            nomad: Client used for the allocations and jobs requests.
            consul: Consul client; stored but not used by ``run``.
        """
        self.__consul = consul
        self.__nomad = nomad

    async def run(self) -> None:
        """Loop forever: wait on the allocations request, then fetch the jobs."""
        nomad = self.__nomad
        while True:
            # Wait for the allocations request to return before asking for jobs.
            await nomad.allocations(1)
            jobs = await nomad.jobs()
            # some logic
async def main() -> None:
    """Create one shared HTTP session, wire up the clients and run the service."""
    # The context manager closes the session (and its pooled connections)
    # when ``run`` exits through an exception or cancellation; previously the
    # session was never closed.
    # NOTE(review): the endpoints carry no URL scheme -- presumably
    # placeholders for e.g. "http://localhost:8500"; confirm.
    async with aiohttp.ClientSession() as session:
        consul = Consul("localhost:8500", session)
        nomad = Nomad("localhost:4646", session)
        await Service(nomad, consul).run()


aiorun.run(main())
Let me explain it.
In the Service while loop we wait for updates from the Nomad service (which service it is does not matter) by issuing a blocking query without a timeout (the Nomad.allocations method). Then we ask the service what actually changed (the Nomad.jobs method). Sometimes we got stuck there, so we had to add timeout=5 to that call.
This is a tcpdump screenshot from the server where Nomad is running; all the applications run on one server in different VMs.
Unfortunately, I cannot share the .pcap file publicly because it contains some sensitive information, but I can send it privately for investigation.

As you can see, the service sent a reply with JSON (packet N13), but our application didn't receive it and killed the connection after 5 seconds (packet N15).
As a workaround we replaced self.__session in both the jobs and allocations methods with a new aiohttp.ClientSession(), and even removed timeout=5 from the jobs method; it works fine for now.
That is, we changed this:
async with self.__session.get
to this:
async with aiohttp.ClientSession() as session:
async with session.get
So what are we doing wrong?
Thanks!