[QUEUED scylladb next] test.py: HTTP client helper

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 4, 2022, 5:51:27 AM10/4/22
to scylladb-dev@googlegroups.com, Alejo Sanchez
From: Alejo Sanchez <alejo....@scylladb.com>
Committer: Alejo Sanchez <alejo....@scylladb.com>
Branch: next

test.py: HTTP client helper

Split aiohttp client to a shared helper file.

While there, move aiohttp session setup back to constructors. When there
were teardown issues it looked it could be caused by aiohttp session
being created outside a coroutine. But this is proven not to be the case
after recent fixes. So move it back to the ManagerClient constructor.

On th other hand, create a close() coroutine to stop the aiohttp session.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>

---
diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py
--- a/test/pylib/manager_client.py
+++ b/test/pylib/manager_client.py
@@ -9,11 +9,9 @@
Manages driver refresh when cluster is cycled.
"""

-import os.path
from typing import List, Optional, Callable
import logging
-import aiohttp # type: ignore
-import aiohttp.web # type: ignore
+from test.pylib.rest_client import UnixRESTClient
from cassandra.cluster import Session as CassandraSession # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import Cluster as CassandraCluster # type: ignore # pylint: disable=no-name-in-module

@@ -27,9 +25,6 @@ class ManagerClient():
sock_path (str): path to an AF_UNIX socket where Manager server is listening
con_gen (Callable): generator function for CQL driver connection to a cluster
"""
- # pylint: disable=too-many-instance-attributes
- conn: aiohttp.UnixConnector
- session: aiohttp.ClientSession

def __init__(self, sock_path: str, port: int, use_ssl: bool,
con_gen: Optional[Callable[[List[str], int, bool], CassandraSession]]) -> None:
@@ -38,18 +33,12 @@ def __init__(self, sock_path: str, port: int, use_ssl: bool,
self.con_gen = con_gen
self.ccluster: Optional[CassandraCluster] = None
self.cql: Optional[CassandraSession] = None
- # API
- self.sock_path = sock_path
- self.sock_name = os.path.basename(sock_path)
-
- async def start(self):
- """Setup connection to Manager server"""
- self.conn = aiohttp.UnixConnector(path=self.sock_path)
- self.session = aiohttp.ClientSession(connector=self.conn)
+ # A client for communicating with ScyllaClusterManager (server)
+ self.client = UnixRESTClient(sock_path)

async def stop(self):
- """Close client session and close driver"""
- await self.session.close()
+ """Close client session and driver"""
+ await self.client.close()
self.driver_close()

async def driver_connect(self) -> None:
@@ -80,7 +69,7 @@ async def before_test(self, test_case_name: str) -> None:
dirty = await self.is_dirty()
if dirty:
self.driver_close() # Close driver connection to old cluster
- resp = await self._get(f"/cluster/before-test/{test_case_name}")
+ resp = await self.client.get(f"/cluster/before-test/{test_case_name}")
if resp.status != 200:
raise RuntimeError(f"Failed before test check {await resp.text()}")
if self.cql is None:
@@ -91,93 +80,75 @@ async def before_test(self, test_case_name: str) -> None:
async def after_test(self, test_case_name: str) -> None:
"""Tell harness this test finished"""
logger.debug("after_test for %s", test_case_name)
- await self._get(f"/cluster/after-test")
-
- async def _get(self, resource: str) -> aiohttp.ClientResponse:
- # Can raise exception. See https://docs.aiohttp.org/en/latest/web_exceptions.html
- # NOTE: using Python requests style URI for Unix domain sockets to avoid using "localhost"
- resp = await self.session.get(self._resource_uri(resource))
- if resp.status != 200:
- text = await resp.text()
- raise Exception(f'status code: {resp.status}, body text: {text}')
- return resp
-
- async def _get_text(self, resource: str) -> str:
- resp = await self._get(resource)
- return await resp.text()
-
- async def _put_json(self, resource: str, json: dict) -> aiohttp.ClientResponse:
- return await self.session.request(method='PUT', url=self._resource_uri(resource), json=json)
-
- def _resource_uri(self, resource: str) -> str:
- return f"http+unix://{self.sock_name}{resource}"
+ await self.client.get(f"/cluster/after-test")

async def is_manager_up(self) -> bool:
"""Check if Manager server is up"""
- ret = await self._get_text("/up")
+ ret = await self.client.get_text("/up")
return ret == "True"

async def is_cluster_up(self) -> bool:
"""Check if cluster is up"""
- ret = await self._get_text("/cluster/up")
+ ret = await self.client.get_text("/cluster/up")
return ret == "True"

async def is_dirty(self) -> bool:
"""Check if current cluster dirty."""
- dirty = await self._get_text("/cluster/is-dirty")
+ dirty = await self.client.get_text("/cluster/is-dirty")
return dirty == "True"

async def replicas(self) -> int:
"""Get number of configured replicas for the cluster (replication factor)"""
- resp = await self._get_text("/cluster/replicas")
+ resp = await self.client.get_text("/cluster/replicas")
return int(resp)

async def servers(self) -> List[str]:
"""Get list of running servers"""
- host_list = await self._get_text("/cluster/servers")
- return host_list.split(',')
+ host_list = await self.client.get_text("/cluster/servers")
+ return host_list.split(",")

async def mark_dirty(self) -> None:
"""Manually mark current cluster dirty.
To be used when a server was modified outside of this API."""
- await self._get_text("/cluster/mark-dirty")
+ await self.client.get_text("/cluster/mark-dirty")

async def server_stop(self, server_id: str) -> None:
"""Stop specified server"""
logger.debug("ManagerClient stopping %s", server_id)
- await self._get_text(f"/cluster/server/{server_id}/stop")
+ await self.client.get_text(f"/cluster/server/{server_id}/stop")

async def server_stop_gracefully(self, server_id: str) -> None:
"""Stop specified server gracefully"""
logger.debug("ManagerClient stopping gracefully %s", server_id)
- await self._get_text(f"/cluster/server/{server_id}/stop_gracefully")
+ await self.client.get_text(f"/cluster/server/{server_id}/stop_gracefully")

async def server_start(self, server_id: str) -> None:
"""Start specified server"""
logger.debug("ManagerClient starting %s", server_id)
- await self._get_text(f"/cluster/server/{server_id}/start")
+ await self.client.get_text(f"/cluster/server/{server_id}/start")
self._driver_update()

async def server_restart(self, server_id: str) -> None:
"""Restart specified server"""
logger.debug("ManagerClient restarting %s", server_id)
- await self._get_text(f"/cluster/server/{server_id}/restart")
+ await self.client.get_text(f"/cluster/server/{server_id}/restart")
self._driver_update()

async def server_add(self) -> str:
"""Add a new server"""
- server_id = await self._get_text("/cluster/addserver")
+ server_id = await self.client.get_text("/cluster/addserver")
self._driver_update()
logger.debug("ManagerClient added %s", server_id)
return server_id

async def server_get_config(self, server_id: str) -> dict[str, object]:
- resp = await self._get(f"/cluster/server/{server_id}/get_config")
+ resp = await self.client.get(f"/cluster/server/{server_id}/get_config")
if resp.status != 200:
raise Exception(await resp.text())
return await resp.json()

async def server_update_config(self, server_id: str, key: str, value: object) -> None:
- resp = await self._put_json(f"/cluster/server/{server_id}/update_config", {'key': key, 'value': value})
+ resp = await self.client.put_json(f"/cluster/server/{server_id}/update_config",
+ {"key": key, "value": value})
if resp.status != 200:
raise Exception(await resp.text())
diff --git a/test/pylib/rest_client.py b/test/pylib/rest_client.py
--- a/test/pylib/rest_client.py
+++ b/test/pylib/rest_client.py
@@ -0,0 +1,112 @@
+#
+# Copyright (C) 2022-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+"""Asynchronous helper for Scylla REST API operations.
+"""
+import os.path
+from typing import Optional
+import aiohttp
+
+
+class RESTSession:
+ def __init__(self, connector: aiohttp.BaseConnector = None):
+ self.session: aiohttp.ClientSession = aiohttp.ClientSession(connector = connector)
+
+ async def close(self) -> None:
+ """End session"""
+ await self.session.close()
+
+ async def get(self, resource_uri: str) -> aiohttp.ClientResponse:
+ """Fetch remote resource or raise"""
+ # Can raise exception. See https://docs.aiohttp.org/en/latest/web_exceptions.html
+ resp = await self.session.get(resource_uri)
+ if resp.status != 200:
+ text = await resp.text()
+ raise RuntimeError(f"status code: {resp.status}, body text: {text}")
+ return resp
+
+ async def get_text(self, resource_uri: str) -> str:
+ """Fetch remote resource text response or raise"""
+ resp = await self.get(resource_uri)
+ return await resp.text()
+
+ async def post(self, resource_uri: str, params: Optional[dict[str, str]]) \
+ -> aiohttp.ClientResponse:
+ """Post to remote resource or raise"""
+ resp = await self.session.post(resource_uri, params=params)
+ if resp.status != 200:
+ text = await resp.text()
+ raise RuntimeError(f"status code: {resp.status}, body text: {text}, "
+ f"resource {resource_uri} params {params}")
+ return resp
+
+ async def put_json(self, resource_uri: str, json: dict) \
+ -> aiohttp.ClientResponse:
+ """Put JSON"""
+ return await self.session.request(method="PUT", url=resource_uri, json=json)
+
+
+class UnixRESTClient:
+ """An async helper for REST API operations using AF_UNIX socket"""
+
+ def __init__(self, sock_path: str):
+ self.sock_name: str = os.path.basename(sock_path)
+ self.session = RESTSession(aiohttp.UnixConnector(path=sock_path))
+
+ async def close(self) -> None:
+ """End session"""
+ await self.session.close()
+
+ async def get(self, resource: str) -> aiohttp.ClientResponse:
+ return await self.session.get(self._resource_uri(resource))
+
+ async def put_json(self, resource: str, json: dict) -> aiohttp.ClientResponse:
+ """Put JSON"""
+ return await self.session.put_json(self._resource_uri(resource), json=json)
+
+ async def get_text(self, resource: str) -> str:
+ """Fetch remote resource text response or raise"""
+ return await self.session.get_text(self._resource_uri(resource))
+
+ async def post(self, resource: str, params: Optional[dict[str, str]] = None) \
+ -> aiohttp.ClientResponse:
+ """Post to remote resource or raise"""
+ return await self.session.post(self._resource_uri(resource), params)
+
+ def _resource_uri(self, resource: str) -> str:
+ # NOTE: using Python requests style URI for Unix domain sockets to avoid using "localhost"
+ # host parameter is ignored
+ return f"http+unix://{self.sock_name}{resource}"
+
+
+class TCPRESTClient:
+ """An async helper for REST API operations"""
+
+ def __init__(self, port: int):
+ self.port: int = port
+ self.session = RESTSession()
+
+ async def close(self) -> None:
+ """End session"""
+ await self.session.close()
+
+ async def get(self, resource: str, host: str) -> aiohttp.ClientResponse:
+ return await self.session.get(self._resource_uri(resource, host))
+
+ async def put_json(self, resource: str, host: str, json: dict) -> aiohttp.ClientResponse:
+ """Put JSON"""
+ return await self.session.put_json(self._resource_uri(resource, host), json=json)
+
+ async def get_text(self, resource: str, host: str) -> str:
+ """Fetch remote resource text response or raise"""
+ return await self.session.get_text(self._resource_uri(resource, host))
+
+ async def post(self, resource: str, host: str, params: Optional[dict[str, str]] = None) \
+ -> aiohttp.ClientResponse:
+ """Post to remote resource or raise"""
+ return await self.session.post(self._resource_uri(resource, host), params)
+
+ def _resource_uri(self, resource: str, host: str) -> str:
+ return f"http://{host}:{self.port}{resource}"
diff --git a/test/topology/conftest.py b/test/topology/conftest.py
--- a/test/topology/conftest.py
+++ b/test/topology/conftest.py
@@ -150,7 +150,6 @@ async def manager_internal(event_loop, request):
port = int(request.config.getoption('port'))
ssl = bool(request.config.getoption('ssl'))
manager_int = ManagerClient(request.config.getoption('manager_api'), port, ssl, cluster_con)
- await manager_int.start()
yield manager_int
await manager_int.stop() # Stop client session and close driver after last test

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 4, 2022, 2:50:23 PM10/4/22
to scylladb-dev@googlegroups.com, Alejo Sanchez
From: Alejo Sanchez <alejo....@scylladb.com>
Committer: Alejo Sanchez <alejo....@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages