[QUEUED scylladb next] test.py: topology pass ManagerClient instead of...

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 4, 2022, 5:51:26 AM10/4/22
to scylladb-dev@googlegroups.com, Alejo Sanchez
From: Alejo Sanchez <alejo....@scylladb.com>
Committer: Alejo Sanchez <alejo....@scylladb.com>
Branch: next

test.py: topology pass ManagerClient instead of...

cql connection

When there are topology changes, the driver needs to be updated. Instead
of passing the CassandraCluster.Connection, pass the ManagerClient
instance which manages the driver connection inside of it.

Remove workaround for test_raft_upgrade.

Signed-off-by: Alejo Sanchez <alejo....@scylladb.com>

---
diff --git a/test/pylib/random_tables.py b/test/pylib/random_tables.py
--- a/test/pylib/random_tables.py
+++ b/test/pylib/random_tables.py
@@ -38,6 +38,7 @@
from typing import Optional, Type, List, Set, Union, TYPE_CHECKING
if TYPE_CHECKING:
from cassandra.cluster import Session as CassandraSession # type: ignore
+ from test.pylib.manager_client import ManagerClient


logger = logging.getLogger('random_tables')
@@ -115,13 +116,13 @@ class RandomTable():
# Sequential unique id
newid = itertools.count(start=1).__next__

- def __init__(self, cql: CassandraSession, keyspace: str, ncolumns: Optional[int]=None,
+ def __init__(self, manager: ManagerClient, keyspace: str, ncolumns: Optional[int]=None,
columns: Optional[List[Column]]=None, pks: int=2, name: str=None):
"""Set up a new table definition from column definitions.
If column definitions not specified pick a random number of columns with random types.
By default there will be 4 columns with first column as Primary Key"""
self.id: int = RandomTable.newid()
- self.cql: CassandraSession = cql
+ self.manager: ManagerClient = manager
self.keyspace: str = keyspace
self.name: str = name if name is not None else f"t_{self.id:02}"
self.full_name: str = keyspace + "." + self.name
@@ -160,13 +161,15 @@ async def create(self) -> asyncio.Future:
pk_names = ", ".join(c.name for c in self.columns[:self.pks])
cql_stmt = f"CREATE TABLE {self.full_name} ({col_defs}, , primary key({pk_names}))"
logger.debug(cql_stmt)
- return await self.cql.run_async(cql_stmt)
+ assert self.manager.cql is not None
+ return await self.manager.cql.run_async(cql_stmt)

async def drop(self) -> asyncio.Future:
"""Drop this table"""
cql_stmt = f"DROP TABLE {self.full_name}"
logger.debug(cql_stmt)
- return await self.cql.run_async(cql_stmt)
+ assert self.manager.cql is not None
+ return await self.manager.cql.run_async(cql_stmt)

async def add_column(self, name: str = None, ctype: Type[ValueType] = None, column: Column = None):
if column is not None:
@@ -176,7 +179,9 @@ async def add_column(self, name: str = None, ctype: Type[ValueType] = None, colu
ctype = ctype if ctype is not None else TextType
column = Column(name, ctype=ctype)
self.columns.append(column)
- await self.cql.run_async(f"ALTER TABLE {self.full_name} ADD {column.name} {column.ctype.name}")
+ assert self.manager.cql is not None
+ await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} "
+ f"ADD {column.name} {column.ctype.name}")

async def drop_column(self, column: Union[Column, str] = None):
if column is None:
@@ -196,14 +201,16 @@ async def drop_column(self, column: Union[Column, str] = None):
assert len(self.columns) - 1 > self.pks, f"Cannot remove last value column {col.name} from {self.name}"
self.columns.remove(col)
self.removed_columns.append(col)
- await self.cql.run_async(f"ALTER TABLE {self.full_name} DROP {col.name}")
+ assert self.manager.cql is not None
+ await self.manager.cql.run_async(f"ALTER TABLE {self.full_name} DROP {col.name}")

async def insert_seq(self) -> asyncio.Future:
"""Insert a row of next sequential values"""
seed = self.next_seq()
- return await self.cql.run_async(f"INSERT INTO {self.full_name} ({self.all_col_names}) " +
- f"VALUES ({', '.join(['%s'] * len(self.columns)) })",
- parameters=[c.val(seed) for c in self.columns])
+ assert self.manager.cql is not None
+ return await self.manager.cql.run_async(f"INSERT INTO {self.full_name} ({self.all_col_names})"
+ f"VALUES ({', '.join(['%s'] * len(self.columns)) })",
+ parameters=[c.val(seed) for c in self.columns])

async def add_index(self, column: Union[Column, str], name: str = None) -> str:
if isinstance(column, int):
@@ -219,13 +226,15 @@ async def add_index(self, column: Union[Column, str], name: str = None) -> str:
raise TypeError(f"Wrong column type {type(column)} given to add_column")

name = name if name is not None else f"{self.name}_{col_name}_{self.next_idx_id():02}"
- await self.cql.run_async(f"CREATE INDEX {name} on {self.full_name} ({col_name})")
+ assert self.manager.cql is not None
+ await self.manager.cql.run_async(f"CREATE INDEX {name} on {self.full_name} ({col_name})")
self.indexes.add(name)
return name

async def drop_index(self, name: str) -> None:
self.indexes.remove(name)
- await self.cql.run_async(f"DROP INDEX {self.keyspace}.{name}")
+ assert self.manager.cql is not None
+ await self.manager.cql.run_async(f"DROP INDEX {self.keyspace}.{name}")
self.removed_indexes.add(name)

def __str__(self):
@@ -234,30 +243,28 @@ def __str__(self):

class RandomTables():
"""A list of managed random tables"""
- def __init__(self, test_name: str, cql: CassandraSession, keyspace: str):
+ def __init__(self, test_name: str, manager: ManagerClient, keyspace: str):
self.test_name = test_name
- self.cql = cql
+ self.manager = manager
self.keyspace = keyspace
self.tables: List[RandomTable] = []
self.removed_tables: List[RandomTable] = []
- cql.execute(f"CREATE KEYSPACE {keyspace} WITH REPLICATION = "
- "{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")
-
- def set_cql(self, cql: CassandraSession) -> None:
- self.cql = cql
+ assert self.manager.cql is not None
+ self.manager.cql.execute(f"CREATE KEYSPACE {keyspace} WITH REPLICATION = "
+ "{ 'class' : 'NetworkTopologyStrategy', 'replication_factor' : 1 }")

async def add_tables(self, ntables: int = 1, ncolumns: int = 5) -> None:
"""Add random tables to the list.
ntables specifies how many tables.
ncolumns specifies how many random columns per table."""
- tables = [RandomTable(self.cql, self.keyspace, ncolumns) for _ in range(ntables)]
+ tables = [RandomTable(self.manager, self.keyspace, ncolumns) for _ in range(ntables)]
await asyncio.gather(*(t.create() for t in tables))
self.tables.extend(tables)

async def add_table(self, ncolumns: int = None, columns: List[Column] = None,
pks: int = 2, name: str = None) -> RandomTable:
"""Add a random table. See random_tables.RandomTable()"""
- table = RandomTable(self.cql, self.keyspace, ncolumns=ncolumns, columns=columns,
+ table = RandomTable(self.manager, self.keyspace, ncolumns=ncolumns, columns=columns,
pks=pks, name=name)
await table.create()
self.tables.append(table)
@@ -285,7 +292,8 @@ async def drop_table(self, table: Union[str, RandomTable]) -> RandomTable:

def drop_all(self) -> None:
"""Drop keyspace (and tables)"""
- self.cql.execute(f"DROP KEYSPACE {self.keyspace}")
+ assert self.manager.cql is not None
+ self.manager.cql.execute(f"DROP KEYSPACE {self.keyspace}")

async def verify_schema(self, table: Union[RandomTable, str] = None) -> None:
"""Verify schema of all active managed random tables"""
@@ -305,7 +313,8 @@ async def verify_schema(self, table: Union[RandomTable, str] = None) -> None:
f"WHERE keyspace_name = '{self.keyspace}'"

logger.debug(cql_stmt1)
- res1 = {row.table_name for row in await self.cql.run_async(cql_stmt1)}
+ assert self.manager.cql is not None
+ res1 = {row.table_name for row in await self.manager.cql.run_async(cql_stmt1)}
assert not tables - res1, f"Tables {tables - res1} not present"

for table_name in tables:
@@ -315,7 +324,8 @@ async def verify_schema(self, table: Union[RandomTable, str] = None) -> None:
cql_stmt2 = f"SELECT column_name, position, kind, type FROM system_schema.columns " \
f"WHERE keyspace_name = '{self.keyspace}' AND table_name = '{table_name}'"
logger.debug(cql_stmt2)
- res2 = {row.column_name: row for row in await self.cql.run_async(cql_stmt2)}
+ assert self.manager.cql is not None
+ res2 = {row.column_name: row for row in await self.manager.cql.run_async(cql_stmt2)}
assert res2.keys() == cols.keys(), f"Column names for {table_name} do not match " \
f"expected ({', '.join(cols.keys())}) " \
f"got ({', '.join(res2.keys())})"
diff --git a/test/topology/conftest.py b/test/topology/conftest.py
--- a/test/topology/conftest.py
+++ b/test/topology/conftest.py
@@ -176,13 +176,15 @@ def cql(manager):
# These tests should use the "fails_without_raft" fixture. When Raft mode
# becomes the default, this fixture can be removed.
@pytest.fixture(scope="function")
-def check_pre_raft(cql):
+def check_pre_raft(manager):
# If not running on Scylla, return false.
- names = [row.table_name for row in cql.execute("SELECT * FROM system_schema.tables WHERE keyspace_name = 'system'")]
+ names = [row.table_name for row in manager.cql.execute(
+ "SELECT * FROM system_schema.tables WHERE keyspace_name = 'system'")]
if not any('scylla' in name for name in names):
return False
# In Scylla, we check Raft mode by inspecting the configuration via CQL.
- experimental_features = list(cql.execute("SELECT value FROM system.config WHERE name = 'experimental_features'"))[0].value
+ experimental_features = list(manager.cql.execute(
+ "SELECT value FROM system.config WHERE name = 'experimental_features'"))[0].value
return not '"raft"' in experimental_features


@@ -195,7 +197,7 @@ def fails_without_raft(request, check_pre_raft):
# "random_tables" fixture: Creates and returns a temporary RandomTables object
# used in tests to make schema changes. Tables are dropped after finished.
@pytest.fixture(scope="function")
-def random_tables(request, cql):
- tables = RandomTables(request.node.name, cql, unique_name())
+def random_tables(request, manager):
+ tables = RandomTables(request.node.name, manager, unique_name())
yield tables
tables.drop_all()
diff --git a/test/topology/test_random_tables.py b/test/topology/test_random_tables.py
--- a/test/topology/test_random_tables.py
+++ b/test/topology/test_random_tables.py
@@ -9,7 +9,9 @@

# Simple test of schema helper
@pytest.mark.asyncio
-async def test_new_table(cql, random_tables):
+async def test_new_table(manager, random_tables):
+ cql = manager.cql
+ assert cql is not None
table = await random_tables.add_table(ncolumns=5)
await cql.run_async(f"INSERT INTO {table} ({','.join(c.name for c in table.columns)})" \
f"VALUES ({', '.join(['%s'] * len(table.columns))})",
@@ -29,8 +31,10 @@ async def test_new_table(cql, random_tables):

# Simple test of schema helper with alter
@pytest.mark.asyncio
-async def test_alter_verify_schema(cql, random_tables):
+async def test_alter_verify_schema(manager, random_tables):
"""Verify table schema"""
+ cql = manager.cql
+ assert cql is not None
await random_tables.add_tables(ntables=4, ncolumns=5)
await random_tables.verify_schema()
# Manually remove a column
@@ -41,7 +45,9 @@ async def test_alter_verify_schema(cql, random_tables):


@pytest.mark.asyncio
-async def test_new_table_insert_one(cql, random_tables):
+async def test_new_table_insert_one(manager, random_tables):
+ cql = manager.cql
+ assert cql is not None
table = await random_tables.add_table(ncolumns=5)
await table.insert_seq()
pk_col = table.columns[0]
@@ -54,8 +60,10 @@ async def test_new_table_insert_one(cql, random_tables):


@pytest.mark.asyncio
-async def test_drop_column(cql, random_tables):
+async def test_drop_column(manager, random_tables):
"""Drop a random column from a table"""
+ cql = manager.cql
+ assert cql is not None
table = await random_tables.add_table(ncolumns=5)
await table.insert_seq()
await table.drop_column()
@@ -70,7 +78,7 @@ async def test_drop_column(cql, random_tables):


@pytest.mark.asyncio
-async def test_add_index(cql, random_tables):
+async def test_add_index(random_tables):
"""Add and drop an index"""
table = await random_tables.add_table(ncolumns=5)
with pytest.raises(AssertionError, match='partition key'):
diff --git a/test/topology_raft_disabled/test_raft_upgrade.py b/test/topology_raft_disabled/test_raft_upgrade.py
--- a/test/topology_raft_disabled/test_raft_upgrade.py
+++ b/test/topology_raft_disabled/test_raft_upgrade.py
@@ -50,8 +50,6 @@ async def restart(srv):
manager.driver_close()
await manager.driver_connect()
cql = manager.cql
- assert(cql)
- random_tables.set_cql(cql)

deadline = time.time() + 300
# Using `servers` doesn't work for the `host` parameter in `cql.run_async` (we need objects of type `Host`).

Commit Bot

<bot@cloudius-systems.com>
unread,
Oct 4, 2022, 2:50:21 PM10/4/22
to scylladb-dev@googlegroups.com, Alejo Sanchez
From: Alejo Sanchez <alejo....@scylladb.com>
Committer: Alejo Sanchez <alejo....@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages