How to run aiohttp app from pytest fixture

286 views
Skip to first unread message

andrew...@gmail.com

unread,
Jul 14, 2017, 11:11:33 AM7/14/17
to aio-libs

I have system under test(SUT) that requires socketio-server. This server would response to SUT in some functionality. So this server is necessary enviroment for my SUT.

For socketio-server i chose aiohttp with python-socketio.

Example of functionality my socketio-server:


from aiohttp import web
import socketio


sio = socketio.AsyncServer()
app = web.Application()
sio.attach(app)


@sio.on('commands')
async def message_handler(sid, msg):
    if msg['data']['command'] == 'terminal_settings_info':
        response_msg = {'handler': msg['handler'],
                        'data': {'result': 1,
                                 'is_success': True,
                                 'TerminalID': 5,
                                 'request_id': msg['data']['request_id']}}
        await sio.emit('commands', response_msg)

    elif msg['data']['command'] == 'get_agent_info':
        response_msg = {'handler': msg['handler'],
                        'data': {'result': 1,
                                 'is_success': True,
                                 'terminal_address': 'Zelenograd',
                                 'agent_support_phone': '8-888-88-88-88',
                                 'mf_retail': True,
                                 'request_id': msg['data']['request_id']}}
        await sio.emit('commands', response_msg)

    else:
        raise ValueError('Unknown request.\nMessage: ' + msg.__repr__())


web.run_app(app, host='127.0.0.1', port=1234)


I want to SetUp this socketio-server from pytest fixture before run tests and TearDown after tests complete. So, this server must non-blocks pytest thread. This is my purpose. How can i do this?

I'm pretty good in pytest and syncrhonous code execution in Python, but newbi for async programming(asyncio, aiohttp) and newbi for using threading, subprocess or multiprocessing.

Andrew Svetlov

unread,
Jul 14, 2017, 5:53:29 PM7/14/17
to aio-libs, andrew...@gmail.com
Sorry, your question is too broad.
What did you try and what don't work?
Message has been deleted

andrew...@gmail.com

unread,
Jul 18, 2017, 9:39:17 AM7/18/17
to aio-libs, andrew...@gmail.com
If i run following code from CLI(> python sio_server.py) it works perfectly.

sio_server.py
from aiohttp import web
import socketio


sio
= socketio.AsyncServer()
app
= web.Application()
sio
.attach(app)


@sio.on('commands')
async def message_handler(sid, msg):

   
if msg['data']['command'] == 'terminal_settings_info':
        response_msg
= {'handler': msg['handler'],
                       
'data': {'result': 1,
                                 
'is_success': True,
                                 
'TerminalID': 5,
                                 
'request_id': msg['data']['request_id']}}
       
await sio.emit('commands', response_msg)


    else:

       
raise ValueError('Unknown request.\nMessage: ' + msg.__repr__())


web
.run_app(app, host='127.0.0.1', port=1234)


But if i run this code from pytest as a fixture(> pytest test_init.py) it doesn't work: server doesn't start during test and i have "RuntimeWarning: coroutine 'bridge' was never awaited" in stdout.

test_init.py
import pytest
import socketio

from aiohttp import web
from selenium.webdriver.common.by import By


@pytest.fixture(scope='module')
async def bridge():

    sio
= socketio.AsyncServer()
    app
= web.Application()
    sio
.attach(app)

   
@sio.on('commands')
   
async def message_handler(sid, msg):

       
if msg['data']['command'] == 'terminal_settings_info':
            response_msg
= {'handler': msg['handler'],
                           
'data': {'result': 1,
                                     
'is_success': True,
                                     
'TerminalID': 5,
                                     
'request_id': msg['data']['request_id']}}
           
await sio.emit('commands', response_msg)


        else:

           
raise ValueError('Unknown request.\nMessage: ' + msg.__repr__())


    await web.run_app(app, host='127.0.0.1', port=1234)


def test_initialization(bridge, driver):
    assert
driver.find_element(By.ID, 'success-initialization-element')

I saw your answer on stackoverflow with link to http://aiohttp.readthedocs.io/en/stable/testing.html# but didnt realize my error.

Andrew Svetlov

unread,
Jul 18, 2017, 6:43:57 PM7/18/17
to aio-libs, andrew...@gmail.com
Of course `await web.run_app()` doesn't work because `run_app` is not a coroutine.
Try `pytest-aiohttp` 
Reply all
Reply to author
Forward
0 new messages