From: Israel Fruchter <
fr...@scylladb.com>
Committer: Israel Fruchter <
israel....@gmail.com>
Branch: master
fix(kafka_cluster): add support for auth
since we want the integration test env to use auth by default,
we are adding support for it to the kafka_cluster.
since we now need to pass the params into that class,
the fixture for creating it was switched to be function scoped.
for now it's not much of an issue, since we have only 2 tests using
it, but we might reconsider if we have more of those.
---
diff --git a/docker/scylla-sct/entry.sh b/docker/scylla-sct/entry.sh
--- a/docker/scylla-sct/entry.sh
+++ b/docker/scylla-sct/entry.sh
@@ -9,8 +9,8 @@ authenticator: 'PasswordAuthenticator'
authenticator_user: cassandra
authenticator_password: cassandra
authorizer: 'CassandraAuthorizer'
-
-enable_tablets: false
EOM
+sed -e '/enable_tablets:.*/s/true/false/g' -i /etc/scylla/scylla.yaml
+
/docker-entrypoint.py $*
diff --git a/docker/scylla-sct/entry_ssl.sh b/docker/scylla-sct/entry_ssl.sh
--- a/docker/scylla-sct/entry_ssl.sh
+++ b/docker/scylla-sct/entry_ssl.sh
@@ -16,7 +16,8 @@ client_encryption_options:
keyfile: /etc/scylla/ssl_conf/client-facing.key
truststore: /etc/scylla/ssl_conf/ca.pem
-enable_tablets: false
EOM
+sed -e '/enable_tablets:.*/s/true/false/g' -i /etc/scylla/scylla.yaml
+
/docker-entrypoint.py $*
diff --git a/sdcm/kafka/kafka_cluster.py b/sdcm/kafka/kafka_cluster.py
--- a/sdcm/kafka/kafka_cluster.py
+++ b/sdcm/kafka/kafka_cluster.py
@@ -29,8 +29,8 @@
class LocalKafkaCluster(cluster.BaseCluster):
- def __init__(self, remoter=LOCALRUNNER):
- super().__init__(cluster_prefix="kafka", add_nodes=False)
+ def __init__(self, params, remoter=LOCALRUNNER):
+ super().__init__(cluster_prefix="kafka", add_nodes=False, params=params)
self.remoter = remoter
self.docker_compose_path = (
Path(get_sct_root_path()) / "kafka-stack-docker-compose"
@@ -94,7 +94,7 @@ def install_connector_from_url(self, connector_url: str):
if connector_url.endswith('.zip'):
self.remoter.run(
f'curl -L -o /tmp/connector.zip {connector_url} && '
- f'unzip /tmp/connector.zip -d {self.docker_compose_path / "connectors"} && rm /tmp/connector.zip'
+ f'unzip -o /tmp/connector.zip -d {self.docker_compose_path / "connectors"} && rm /tmp/connector.zip'
)
if connector_url.startswith("file://"):
connector_local_path = connector_url.replace("file://", "")
@@ -116,18 +116,24 @@ def create_connector(
connector_config: SctKafkaConfiguration,
):
# TODO: extend the number of tasks
- # TODO: handle user/password
# TODO: handle client encryption SSL
connector_data = connector_config.dict(by_alias=True, exclude_none=True)
match connector_config.config.connector_class:
case "io.connect.scylladb.ScyllaDbSinkConnector":
scylla_addresses = [node.cql_address for node in db_cluster.nodes]
connector_data["config"]["scylladb.contact.points"] = ",".join(scylla_addresses)
+ if credentials := self.get_db_auth():
+ connector_data["config"]["scylladb.security.enabled"] = True
+ connector_data["config"]["scylladb.username"] = credentials[0]
+ connector_data["config"]["scylladb.password"] = credentials[1]
case "com.scylladb.cdc.debezium.connector.ScyllaConnector":
scylla_addresses = [f"{node.cql_address}:{node.CQL_PORT}" for node in db_cluster.nodes]
connector_data["config"]["scylla.cluster.ip.addresses"] = ",".join(scylla_addresses)
+ if credentials := self.get_db_auth():
+ connector_data["config"]["scylla.user"] = credentials[0]
+ connector_data["config"]["scylla.password"] = credentials[1]
self.install_connector(connector_config.source)
diff --git a/sdcm/kafka/kafka_config.py b/sdcm/kafka/kafka_config.py
--- a/sdcm/kafka/kafka_config.py
+++ b/sdcm/kafka/kafka_config.py
@@ -22,7 +22,7 @@ class ConnectorConfiguration(BaseModel):
# see
https://github.com/scylladb/kafka-connect-scylladb/blob/master/documentation/CONFIG.md
scylladb_contact_points: Optional[str] = Field(alias="scylladb.contact.points")
scylladb_keyspace: Optional[str] = Field(alias="scylladb.keyspace")
- scylladb_user: Optional[str] = Field(alias="scylladb.user")
+ scylladb_username: Optional[str] = Field(alias="scylladb.username")
scylladb_password: Optional[str] = Field(alias="scylladb.password")
class Config:
diff --git a/sdcm/tester.py b/sdcm/tester.py
--- a/sdcm/tester.py
+++ b/sdcm/tester.py
@@ -1631,7 +1631,7 @@ def get_cluster_k8s_local_kind_cluster(self):
def get_cluster_kafka(self):
if kafka_backend := self.params.get('kafka_backend'):
if kafka_backend == 'localstack':
- self.kafka_cluster = LocalKafkaCluster()
+ self.kafka_cluster = LocalKafkaCluster(params=self.params)
self.kafka_cluster.start()
else:
raise NotImplementedError(f"{kafka_backend=} not implemented")
diff --git a/sdcm/utils/docker_remote.py b/sdcm/utils/docker_remote.py
--- a/sdcm/utils/docker_remote.py
+++ b/sdcm/utils/docker_remote.py
@@ -79,7 +79,8 @@ def sudo_needed(self):
def create_network(self, docker_network):
try:
- ret = self.node.remoter.run(f"docker network create {docker_network}").stdout.strip()
+ ret = self.node.remoter.run(
+ f"docker network create {docker_network} --label 'com.docker.compose.network=default'").stdout.strip()
LOGGER.debug(ret)
except (UnexpectedExit, Libssh2_UnexpectedExit) as ex:
if 'already exists' in str(ex):
diff --git a/unit_tests/test_cluster.py b/unit_tests/test_cluster.py
--- a/unit_tests/test_cluster.py
+++ b/unit_tests/test_cluster.py
@@ -763,7 +763,6 @@ def test_get_any_ks_cf_list(docker_scylla, params, events): # pylint: disable=u
'system_traces.sessions', 'system_traces.sessions_time_idx',
'system.role_attributes', 'system.role_members', 'system.role_permissions',
'system.roles', 'system.service_levels_v2',
- 'system.tablets',
'system.topology', 'system.topology_requests',
'system.cdc_generations_v3',
'"123_keyspace"."120users"', '"123_keyspace".users'}
diff --git a/unit_tests/test_kafka.py b/unit_tests/test_kafka.py
--- a/unit_tests/test_kafka.py
+++ b/unit_tests/test_kafka.py
@@ -27,10 +27,10 @@
LOGGER = logging.getLogger(__name__)
-...@pytest.fixture(name="kafka_cluster", scope="session")
-def fixture_kafka_cluster(tmp_path_factory):
+...@pytest.fixture(name="kafka_cluster", scope="function")
+def fixture_kafka_cluster(tmp_path_factory, params):
os.environ["_SCT_TEST_LOGDIR"] = str(tmp_path_factory.mktemp("logs"))
- kafka = LocalKafkaCluster()
+ kafka = LocalKafkaCluster(params=params)
kafka.start()
@@ -68,10 +68,10 @@ def test_01_kafka_cdc_source_connector(request, docker_scylla, kafka_cluster, pa
db_cluster=docker_scylla.parent_cluster, connector_config=connector_config
)
- loader_set = LocalLoaderSetDummy()
+ loader_set = LocalLoaderSetDummy(params=params)
cmd = (
- """cassandra-stress write cl=ONE n=500 -rate threads=10 """
+ """cassandra-stress write cl=ONE n=500 -mode cql3 native -rate threads=10 """
)
cs_thread = CassandraStressThread(