[COMMIT scylla-cluster-tests master] fix(kafka_cluster): add support for auth

1 view
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Jul 2, 2024, 2:02:25 AMJul 2
to scylladb-dev@googlegroups.com, Israel Fruchter
From: Israel Fruchter <fr...@scylladb.com>
Committer: Israel Fruchter <israel....@gmail.com>
Branch: master

fix(kafka_cluster): add support for auth

We want the integration test env to use
auth by default.

We are adding this support to the kafka_cluster.
Since we now need to pass the params into that class,
the fixture for creating it switches to being function scoped.
For now it's not much of an issue, since we have only 2 tests using
it, but we might reconsider if we have more of those.

---
diff --git a/docker/scylla-sct/entry.sh b/docker/scylla-sct/entry.sh
--- a/docker/scylla-sct/entry.sh
+++ b/docker/scylla-sct/entry.sh
@@ -9,8 +9,8 @@ authenticator: 'PasswordAuthenticator'
authenticator_user: cassandra
authenticator_password: cassandra
authorizer: 'CassandraAuthorizer'
-
-enable_tablets: false
EOM

+sed -e '/enable_tablets:.*/s/true/false/g' -i /etc/scylla/scylla.yaml
+
/docker-entrypoint.py $*
diff --git a/docker/scylla-sct/entry_ssl.sh b/docker/scylla-sct/entry_ssl.sh
--- a/docker/scylla-sct/entry_ssl.sh
+++ b/docker/scylla-sct/entry_ssl.sh
@@ -16,7 +16,8 @@ client_encryption_options:
keyfile: /etc/scylla/ssl_conf/client-facing.key
truststore: /etc/scylla/ssl_conf/ca.pem

-enable_tablets: false
EOM

+sed -e '/enable_tablets:.*/s/true/false/g' -i /etc/scylla/scylla.yaml
+
/docker-entrypoint.py $*
diff --git a/sdcm/kafka/kafka_cluster.py b/sdcm/kafka/kafka_cluster.py
--- a/sdcm/kafka/kafka_cluster.py
+++ b/sdcm/kafka/kafka_cluster.py
@@ -29,8 +29,8 @@


class LocalKafkaCluster(cluster.BaseCluster):
- def __init__(self, remoter=LOCALRUNNER):
- super().__init__(cluster_prefix="kafka", add_nodes=False)
+ def __init__(self, params, remoter=LOCALRUNNER):
+ super().__init__(cluster_prefix="kafka", add_nodes=False, params=params)
self.remoter = remoter
self.docker_compose_path = (
Path(get_sct_root_path()) / "kafka-stack-docker-compose"
@@ -94,7 +94,7 @@ def install_connector_from_url(self, connector_url: str):
if connector_url.endswith('.zip'):
self.remoter.run(
f'curl -L -o /tmp/connector.zip {connector_url} && '
- f'unzip /tmp/connector.zip -d {self.docker_compose_path / "connectors"} && rm /tmp/connector.zip'
+ f'unzip -o /tmp/connector.zip -d {self.docker_compose_path / "connectors"} && rm /tmp/connector.zip'
)
if connector_url.startswith("file://"):
connector_local_path = connector_url.replace("file://", "")
@@ -116,18 +116,24 @@ def create_connector(
connector_config: SctKafkaConfiguration,
):
# TODO: extend the number of tasks
- # TODO: handle user/password
# TODO: handle client encryption SSL

connector_data = connector_config.dict(by_alias=True, exclude_none=True)
match connector_config.config.connector_class:
case "io.connect.scylladb.ScyllaDbSinkConnector":
scylla_addresses = [node.cql_address for node in db_cluster.nodes]
connector_data["config"]["scylladb.contact.points"] = ",".join(scylla_addresses)
+ if credentials := self.get_db_auth():
+ connector_data["config"]["scylladb.security.enabled"] = True
+ connector_data["config"]["scylladb.username"] = credentials[0]
+ connector_data["config"]["scylladb.password"] = credentials[1]

case "com.scylladb.cdc.debezium.connector.ScyllaConnector":
scylla_addresses = [f"{node.cql_address}:{node.CQL_PORT}" for node in db_cluster.nodes]
connector_data["config"]["scylla.cluster.ip.addresses"] = ",".join(scylla_addresses)
+ if credentials := self.get_db_auth():
+ connector_data["config"]["scylla.user"] = credentials[0]
+ connector_data["config"]["scylla.password"] = credentials[1]

self.install_connector(connector_config.source)

diff --git a/sdcm/kafka/kafka_config.py b/sdcm/kafka/kafka_config.py
--- a/sdcm/kafka/kafka_config.py
+++ b/sdcm/kafka/kafka_config.py
@@ -22,7 +22,7 @@ class ConnectorConfiguration(BaseModel):
# see https://github.com/scylladb/kafka-connect-scylladb/blob/master/documentation/CONFIG.md
scylladb_contact_points: Optional[str] = Field(alias="scylladb.contact.points")
scylladb_keyspace: Optional[str] = Field(alias="scylladb.keyspace")
- scylladb_user: Optional[str] = Field(alias="scylladb.user")
+ scylladb_username: Optional[str] = Field(alias="scylladb.username")
scylladb_password: Optional[str] = Field(alias="scylladb.password")

class Config:
diff --git a/sdcm/tester.py b/sdcm/tester.py
--- a/sdcm/tester.py
+++ b/sdcm/tester.py
@@ -1631,7 +1631,7 @@ def get_cluster_k8s_local_kind_cluster(self):
def get_cluster_kafka(self):
if kafka_backend := self.params.get('kafka_backend'):
if kafka_backend == 'localstack':
- self.kafka_cluster = LocalKafkaCluster()
+ self.kafka_cluster = LocalKafkaCluster(params=self.params)
self.kafka_cluster.start()
else:
raise NotImplementedError(f"{kafka_backend=} not implemented")
diff --git a/sdcm/utils/docker_remote.py b/sdcm/utils/docker_remote.py
--- a/sdcm/utils/docker_remote.py
+++ b/sdcm/utils/docker_remote.py
@@ -79,7 +79,8 @@ def sudo_needed(self):

def create_network(self, docker_network):
try:
- ret = self.node.remoter.run(f"docker network create {docker_network}").stdout.strip()
+ ret = self.node.remoter.run(
+ f"docker network create {docker_network} --label 'com.docker.compose.network=default'").stdout.strip()
LOGGER.debug(ret)
except (UnexpectedExit, Libssh2_UnexpectedExit) as ex:
if 'already exists' in str(ex):
diff --git a/unit_tests/test_cluster.py b/unit_tests/test_cluster.py
--- a/unit_tests/test_cluster.py
+++ b/unit_tests/test_cluster.py
@@ -763,7 +763,6 @@ def test_get_any_ks_cf_list(docker_scylla, params, events): # pylint: disable=u
'system_traces.sessions', 'system_traces.sessions_time_idx',
'system.role_attributes', 'system.role_members', 'system.role_permissions',
'system.roles', 'system.service_levels_v2',
- 'system.tablets',
'system.topology', 'system.topology_requests',
'system.cdc_generations_v3',
'"123_keyspace"."120users"', '"123_keyspace".users'}
diff --git a/unit_tests/test_kafka.py b/unit_tests/test_kafka.py
--- a/unit_tests/test_kafka.py
+++ b/unit_tests/test_kafka.py
@@ -27,10 +27,10 @@
LOGGER = logging.getLogger(__name__)


-...@pytest.fixture(name="kafka_cluster", scope="session")
-def fixture_kafka_cluster(tmp_path_factory):
+...@pytest.fixture(name="kafka_cluster", scope="function")
+def fixture_kafka_cluster(tmp_path_factory, params):
os.environ["_SCT_TEST_LOGDIR"] = str(tmp_path_factory.mktemp("logs"))
- kafka = LocalKafkaCluster()
+ kafka = LocalKafkaCluster(params=params)

kafka.start()

@@ -68,10 +68,10 @@ def test_01_kafka_cdc_source_connector(request, docker_scylla, kafka_cluster, pa
db_cluster=docker_scylla.parent_cluster, connector_config=connector_config
)

- loader_set = LocalLoaderSetDummy()
+ loader_set = LocalLoaderSetDummy(params=params)

cmd = (
- """cassandra-stress write cl=ONE n=500 -rate threads=10 """
+ """cassandra-stress write cl=ONE n=500 -mode cql3 native -rate threads=10 """
)

cs_thread = CassandraStressThread(
Reply all
Reply to author
Forward
0 new messages