I have system under test(SUT) that requires socketio-server. This server would response to SUT in some functionality. So this server is necessary enviroment for my SUT.
For socketio-server i chose aiohttp with python-socketio.
Example of functionality my socketio-server:
from aiohttp import web
import socketio
sio = socketio.AsyncServer()
app = web.Application()
sio.attach(app)
@sio.on('commands')
async def message_handler(sid, msg):
if msg['data']['command'] == 'terminal_settings_info':
response_msg = {'handler': msg['handler'],
'data': {'result': 1,
'is_success': True,
'TerminalID': 5,
'request_id': msg['data']['request_id']}}
await sio.emit('commands', response_msg)
elif msg['data']['command'] == 'get_agent_info':
response_msg = {'handler': msg['handler'],
'data': {'result': 1,
'is_success': True,
'terminal_address': 'Zelenograd',
'agent_support_phone': '8-888-88-88-88',
'mf_retail': True,
'request_id': msg['data']['request_id']}}
await sio.emit('commands', response_msg)
else:
raise ValueError('Unknown request.\nMessage: ' + msg.__repr__())
web.run_app(app, host='127.0.0.1', port=1234)I want to SetUp this socketio-server from pytest fixture before run tests and TearDown after tests complete. So, this server must non-blocks pytest thread. This is my purpose. How can i do this?
I'm pretty good in pytest and syncrhonous code execution in Python, but newbi for async programming(asyncio, aiohttp) and newbi for using threading, subprocess or multiprocessing.
from aiohttp import web
import socketio
sio = socketio.AsyncServer()
app = web.Application()
sio.attach(app)
@sio.on('commands')
async def message_handler(sid, msg):
if msg['data']['command'] == 'terminal_settings_info':
response_msg = {'handler': msg['handler'],
'data': {'result': 1,
'is_success': True,
'TerminalID': 5,
'request_id': msg['data']['request_id']}}
await sio.emit('commands', response_msg)
else:
raise ValueError('Unknown request.\nMessage: ' + msg.__repr__())
web.run_app(app, host='127.0.0.1', port=1234)
import pytest
import socketio
from aiohttp import web
from selenium.webdriver.common.by import By
@pytest.fixture(scope='module')
async def bridge():
sio = socketio.AsyncServer()
app = web.Application()
sio.attach(app)
@sio.on('commands')
async def message_handler(sid, msg):
if msg['data']['command'] == 'terminal_settings_info':
response_msg = {'handler': msg['handler'],
'data': {'result': 1,
'is_success': True,
'TerminalID': 5,
'request_id': msg['data']['request_id']}}
await sio.emit('commands', response_msg)
else:
raise ValueError('Unknown request.\nMessage: ' + msg.__repr__())
await web.run_app(app, host='127.0.0.1', port=1234)
def test_initialization(bridge, driver):
assert driver.find_element(By.ID, 'success-initialization-element')