[COMMIT scylla-cluster-tests master] feature(k8s-eks): Add EKS backend


Commit Bot

<bot@cloudius-systems.com>
Mar 11, 2021, 4:52:02 AM
to scylladb-dev@googlegroups.com, Dmitry Kropachev
From: Dmitry Kropachev <dmitry.k...@gmail.com>
Committer: Bentsi <ben...@scylladb.com>
Branch: master

feature(k8s-eks): Add EKS backend

---
diff --git a/defaults/k8s_eks_config.yaml b/defaults/k8s_eks_config.yaml
--- a/defaults/k8s_eks_config.yaml
+++ b/defaults/k8s_eks_config.yaml
@@ -0,0 +1,48 @@
+# To create an EKS cluster you must provide subnets from two different availability zones,
+# otherwise creation fails AWS validation.
+# At the same time, Scylla Operator cannot work properly when nodes within one rack are deployed
+# into different availability zones.
+# To satisfy both constraints, node groups are created with subnets from the first availability zone only.
+# As a result you need to specify two AZs, but Scylla will occupy resources only from the first one.
+
+availability_zone: 'a,b'
+instance_provision: 'on_demand'
+instance_type_db: 'i3.4xlarge'
+
+eks_cluster_version: '1.18'
+eks_role_arn: 'arn:aws:iam::797456418907:role/eksServicePolicy'
+eks_service_ipv4_cidr: '172.20.0.0/16'
+eks_vpc_cni_version: 'v1.7.5-eksbuild.1'
+eks_nodegroup_role_arn: 'arn:aws:iam::797456418907:role/helm-test-worker-nodes-NodeInstanceRole-6ACHDYEKNN3I'
+
+scylla_version: '4.2.1'
+scylla_mgmt_agent_version: '2.2.0'
+k8s_cert_manager_version: '1.0.3'
+
+# Currently the k8s monitoring stack does not work properly:
+# https://trello.com/c/8Hwc0nUB/3061-eks-k8s-monitoring-stack-health-monitoring-fails
+# Enable it once the issue is fixed.
+k8s_deploy_monitoring: false
+
+k8s_loader_cluster_name: 'sct-loaders'
+
+k8s_scylla_cluster_name: 'sct-cluster'
+k8s_scylla_datacenter: 'us-east1-b'
+k8s_scylla_disk_gi: 500
+k8s_scylla_disk_class: 'local-raid-disks'
+
+# NOTE: If 'k8s_scylla_operator_docker_image' is not set, the image from the Helm chart will be used.
+# To test nightly builds, define it like this: 'scylladb/scylla-operator:nightly'
+k8s_scylla_operator_docker_image: ''
+k8s_scylla_operator_helm_repo: 'https://storage.googleapis.com/scylla-operator-charts/latest'
+k8s_scylla_operator_chart_version: 'latest'
+
+k8s_scylla_rack: 'us-east1'
+
+n_monitor_nodes: 1
+n_loaders: 1
+n_db_nodes: 3
+
+append_scylla_args: ''
+docker_image: ''
+mgmt_docker_image: 'scylladb/scylla-manager:2.2.1'
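
For illustration only (not part of the diff above): a minimal boto3 sketch of the subnet-selection approach described in the config comments — the EKS control plane is validated with subnets from two availability zones, while node groups use subnets from the first zone only. The helper name pick_first_az_subnets is hypothetical.

    import boto3

    def pick_first_az_subnets(region: str, vpc_id: str, availability_zone: str = "a,b") -> list:
        """Sketch: return subnet IDs located in the first configured availability zone only."""
        first_az = f"{region}{availability_zone.split(',')[0]}"  # e.g. 'us-east-1' + 'a' -> 'us-east-1a'
        ec2 = boto3.client("ec2", region_name=region)
        subnets = ec2.describe_subnets(Filters=[
            {"Name": "vpc-id", "Values": [vpc_id]},
            {"Name": "availability-zone", "Values": [first_az]},
        ])["Subnets"]
        return [subnet["SubnetId"] for subnet in subnets]
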
diff --git a/defaults/k8s_gce_minikube_config.yaml b/defaults/k8s_gce_minikube_config.yaml
--- a/defaults/k8s_gce_minikube_config.yaml
+++ b/defaults/k8s_gce_minikube_config.yaml
@@ -27,9 +27,9 @@ k8s_deploy_monitoring: true
k8s_scylla_datacenter: 'gce'
k8s_scylla_rack: 'minikube'
k8s_scylla_cluster_name: 'sct-cluster'
-k8s_scylla_cpu_n: 1
-k8s_scylla_mem_gi: 2
k8s_scylla_disk_gi: 5
+k8s_scylla_disk_class: ''
+

gce_image: 'https://www.googleapis.com/compute/v1/projects/centos-cloud/global/images/family/centos-7'
gce_instance_type_loader: 'e2-standard-2'
diff --git a/defaults/k8s_gke_config.yaml b/defaults/k8s_gke_config.yaml
--- a/defaults/k8s_gke_config.yaml
+++ b/defaults/k8s_gke_config.yaml
@@ -4,7 +4,6 @@ gce_network: 'qa-vpc'
gce_image_username: 'scylla-test'

gke_cluster_version: '1.17.15-gke.800'
-gke_cluster_n_nodes: 3

gce_instance_type_db: 'n1-standard-8'
gce_root_disk_type_db: 'pd-ssd'
@@ -29,13 +28,10 @@ k8s_deploy_monitoring: true
k8s_scylla_datacenter: 'us-east1-b'
k8s_scylla_rack: 'us-east1'
k8s_scylla_cluster_name: 'sct-cluster'
-k8s_scylla_cpu_n: 6
-k8s_scylla_mem_gi: 16
k8s_scylla_disk_gi: 500
+k8s_scylla_disk_class: 'local-raid-disks'

k8s_loader_cluster_name: 'sct-loaders'
-k8s_loader_cpu_n: 3
-k8s_loader_mem_gi: 12
gce_instance_type_loader: 'e2-standard-4'

n_loaders: 1
diff --git a/docker/env/version b/docker/env/version
--- a/docker/env/version
+++ b/docker/env/version
@@ -1 +1 @@
-0.88
+0.88-pre-eks
diff --git a/docs/configuration_options.md b/docs/configuration_options.md
--- a/docs/configuration_options.md
+++ b/docs/configuration_options.md
@@ -153,20 +153,15 @@
| **<a href="#user-content-gce_root_disk_type_minikube" name="gce_root_disk_type_minikube">gce_root_disk_type_minikube</a>** | | N/A | SCT_GCE_ROOT_DISK_TYPE_MINIKUBE
| **<a href="#user-content-gce_root_disk_size_minikube" name="gce_root_disk_size_minikube">gce_root_disk_size_minikube</a>** | | N/A | SCT_GCE_ROOT_DISK_SIZE_MINIKUBE
| **<a href="#user-content-gke_cluster_version" name="gke_cluster_version">gke_cluster_version</a>** | | N/A | SCT_GKE_CLUSTER_VERSION
-| **<a href="#user-content-gke_cluster_n_nodes" name="gke_cluster_n_nodes">gke_cluster_n_nodes</a>** | | N/A | SCT_GKE_CLUSTER_N_NODES
| **<a href="#user-content-k8s_deploy_monitoring" name="k8s_deploy_monitoring">k8s_deploy_monitoring</a>** | | N/A | SCT_K8S_DEPLOY_MONITORING
| **<a href="#user-content-k8s_scylla_operator_docker_image" name="k8s_scylla_operator_docker_image">k8s_scylla_operator_docker_image</a>** | | N/A | SCT_K8S_SCYLLA_OPERATOR_DOCKER_IMAGE
| **<a href="#user-content-k8s_scylla_operator_helm_repo" name="k8s_scylla_operator_helm_repo">k8s_scylla_operator_helm_repo</a>** | | N/A | SCT_K8S_SCYLLA_OPERATOR_HELM_REPO
| **<a href="#user-content-k8s_scylla_operator_chart_version" name="k8s_scylla_operator_chart_version">k8s_scylla_operator_chart_version</a>** | | N/A | SCT_K8S_SCYLLA_OPERATOR_CHART_VERSION
| **<a href="#user-content-k8s_scylla_datacenter" name="k8s_scylla_datacenter">k8s_scylla_datacenter</a>** | | N/A | SCT_K8S_SCYLLA_DATACENTER
| **<a href="#user-content-k8s_scylla_rack" name="k8s_scylla_rack">k8s_scylla_rack</a>** | | N/A | SCT_K8S_SCYLLA_RACK
| **<a href="#user-content-k8s_scylla_cluster_name" name="k8s_scylla_cluster_name">k8s_scylla_cluster_name</a>** | | N/A | SCT_K8S_SCYLLA_CLUSTER_NAME
-| **<a href="#user-content-k8s_scylla_cpu_n" name="k8s_scylla_cpu_n">k8s_scylla_cpu_n</a>** | | N/A | SCT_K8S_SCYLLA_CPU_N
-| **<a href="#user-content-k8s_scylla_mem_gi" name="k8s_scylla_mem_gi">k8s_scylla_mem_gi</a>** | | N/A | SCT_K8S_SCYLLA_MEM_GI
| **<a href="#user-content-k8s_scylla_disk_gi" name="k8s_scylla_disk_gi">k8s_scylla_disk_gi</a>** | | N/A | SCT_K8S_SCYLLA_DISK_GI
| **<a href="#user-content-k8s_loader_cluster_name" name="k8s_loader_cluster_name">k8s_loader_cluster_name</a>** | | N/A | SCT_K8S_LOADER_CLUSTER_NAME
-| **<a href="#user-content-k8s_loader_cpu_n" name="k8s_loader_cpu_n">k8s_loader_cpu_n</a>** | | N/A | SCT_K8S_LOADER_CPU_N
-| **<a href="#user-content-k8s_loader_mem_gi" name="k8s_loader_mem_gi">k8s_loader_mem_gi</a>** | | N/A | SCT_K8S_LOADER_MEM_GI
| **<a href="#user-content-minikube_version" name="minikube_version">minikube_version</a>** | | N/A | SCT_MINIKUBE_VERSION
| **<a href="#user-content-k8s_cert_manager_version" name="k8s_cert_manager_version">k8s_cert_manager_version</a>** | | N/A | SCT_K8S_CERT_MANAGER_VERSION
| **<a href="#user-content-mgmt_docker_image" name="mgmt_docker_image">mgmt_docker_image</a>** | Scylla manager docker image, i.e. 'scylladb/scylla-manager:2.2.1' | N/A | SCT_MGMT_DOCKER_IMAGE
diff --git a/requirements.in b/requirements.in
--- a/requirements.in
+++ b/requirements.in
@@ -2,13 +2,13 @@ PyYAML==5.3.1
paramiko==2.7.2
fabric==2.5.0
invoke==1.3.1
-boto3==1.15.16
-boto3-stubs[s3,ec2,dynamodb,pricing]==1.15.16.0
-awscli==1.18.157
+boto3==1.17.3
+boto3-stubs[s3,ec2,dynamodb,pricing]==1.17.3.0
+awscli==1.19.3
aexpect==1.2.0
cassandra-driver==3.22.0
apache-libcloud==2.6.0
-requests==2.20.0
+requests==2.25.1
elasticsearch>=7.0.0,<8.0.0
sortedcontainers==1.5.9
jinja2==2.10.1
diff --git a/requirements.txt b/requirements.txt
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,18 +2,18 @@
# This file is autogenerated by pip-compile
# To update, run:
#
-# pip-compile --allow-unsafe --generate-hashes /home/fruch/Projects/scylla-cluster-tests/requirements.in
+# pip-compile --allow-unsafe --generate-hashes /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
#
aexpect==1.2.0 \
--hash=sha256:69a45a4e8498f7041b9e0e203edf392dc7ba2674f0f72b2221c574e4694a34d8 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
anyconfig==0.9.8 \
--hash=sha256:9de54c3252240b3c562e25ef070e883c266b7daad8c8bc4d5d604855edc7d739 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
apache-libcloud==2.6.0 \
--hash=sha256:201751f738109f25d58dcdfb5804e17216e0dc8f68b522e9e26ac16e0b9ff2ea \
--hash=sha256:40215db1bd489d17dc1abfdb289d7f035313c7297b6a7462c79d8287cbbeae91 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
appdirs==1.4.4 \
--hash=sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41 \
--hash=sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128 \
@@ -22,10 +22,10 @@ aspy.yaml==1.3.0 \
--hash=sha256:463372c043f70160a9ec950c3f1e4c3a82db5fca01d334b6bc89c7164d744bdc \
--hash=sha256:e7c742382eff2caed61f87a39d13f99109088e5e93f04d76eb8d4b28aa143f45 \
# via pre-commit
-astroid==2.4.2 \
- --hash=sha256:2f4078c2a41bf377eea06d71c9d2ba4eb8f6b1af2135bec27bbbb7d8f12bb703 \
- --hash=sha256:bc58d83eb610252fd8de6363e39d4f1d0619c894b0ed24603b881c02e64c7386 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in, pylint
+astroid==2.5 \
+ --hash=sha256:87ae7f2398b8a0ae5638ddecf9987f081b756e0e9fc071aeebdca525671fc4dc \
+ --hash=sha256:b31c92f545517dcc452f284bc9c044050862fbe6d93d2b3de4a215a6b384bf0d \
+ # via pylint
atomicwrites==1.4.0 \
--hash=sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197 \
--hash=sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a \
@@ -36,11 +36,11 @@ attrs==20.3.0 \
# via pytest
autopep8==1.5.3 \
--hash=sha256:60fd8c4341bab59963dafd5d2a566e94f547e660b9b396f772afe67d8481dbf0 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
-awscli==1.18.157 \
- --hash=sha256:59bb2bda1f85eafa380d39a06cdc7ecfa0ff37d0b5c2181ebe06cdb14ff58dc1 \
- --hash=sha256:bb52a78120b7bc7c7301bdf2734da882c9ff9ccef24d33bee5a63893c0b7cf5b \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
+awscli==1.19.3 \
+ --hash=sha256:062030352fe47b6d1f91df61d26183c4077c69cd5b9be9801273a39ab2d4f415 \
+ --hash=sha256:3f34cbaffe382cc66e6841b477b1836b762174cd9ea58581dce59323ff47b5c4 \
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
bcrypt==3.2.0 \
--hash=sha256:5b93c1726e50a93a033c36e5ca7fdcd29a5c7395af50a6892f5d9e7c6cfbfb29 \
--hash=sha256:63d4e3ff96188e5898779b6057878fecf3f11cfe6ec3b313ea09955d587ec7a7 \
@@ -50,17 +50,17 @@ bcrypt==3.2.0 \
--hash=sha256:cd1ea2ff3038509ea95f687256c46b79f5fc382ad0aa3664d200047546d511d1 \
--hash=sha256:cdcdcb3972027f83fe24a48b1e90ea4b584d35f1cc279d76de6fc4b13376239d \
# via paramiko
-boto3-stubs[dynamodb,ec2,pricing,s3]==1.15.16.0 \
- --hash=sha256:422b647a494cd0055b00a3952db937c28542d7eed6092da3d9dc9aa8327443e3 \
- --hash=sha256:c0f7cb5638e7f434b37668b403bbd984aaf35b96cb4b564c53afc99c30cf93f6 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
-boto3==1.15.16 \
- --hash=sha256:454a8dfb7b367a058c7967ef6b4e2a192c318f10761769fd1003cf7f2f5a7db9 \
- --hash=sha256:557320fe8b65cfc85953e6a63d2328e8efec95bf4ec383b92fa2d01119209716 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
-botocore==1.18.16 \
- --hash=sha256:e586e4d6eddbca31e6447a25df9972329ea3de64b1fb0eb17e7ab0c9b91f7720 \
- --hash=sha256:f0616d2c719691b94470307cee8adf89ceb1657b7b6f9aa1bf61f9de5543dbbb \
+boto3-stubs[dynamodb,ec2,pricing,s3]==1.17.3.0 \
+ --hash=sha256:54cd69236abe4742f90cb8b45ed76cbc8ba7885cf020d6d48b91fd3f554ba6ee \
+ --hash=sha256:c26c98203f94eb463ab0f13372e092ba605218002643a0915cf31cb53fcd957f \
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
+boto3==1.17.3 \
+ --hash=sha256:92041aa7589c886020cabd80eb58b89ace2f0094571792fccae24b9a8b3b97d7 \
+ --hash=sha256:9f132c34e20110dea019293c89cede49b0a56be615b3e1debf98390ed9f1f7b9 \
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
+botocore==1.20.3 \
+ --hash=sha256:1dae84c68b109f596f58cc2e9fa87704ccd40dcbc12144a89205f85efa7f9135 \
+ --hash=sha256:a0fdded1c9636899ab273f50bf123f79b91439a8c282b5face8b5f4a48b493cb \
# via awscli, boto3, s3transfer
cachetools==4.2.1 \
--hash=sha256:1d9d5f567be80f7c07d765e21b814326d78c61eb0c3a637dffc0e5d1796cb2e2 \
@@ -95,7 +95,7 @@ cassandra-driver==3.22.0 \
--hash=sha256:f3fef1fe2da69f8ed126755f3f26c85fce84ed3bc59f0ea41fde2125f7569799 \
--hash=sha256:f59267674c6b1c5f7303b08ff9dac0d3f218e3737a848fa105718e6cbe7d4bf5 \
--hash=sha256:f74eb7570c933ab4bd3d8f9080497eced9f57d24a79796c352e78a3af1a0a5f6 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
certifi==2020.12.5 \
--hash=sha256:1a4995114262bffbc2413b159f2a1a480c969de6e6eb13ee966d470af86af59c \
--hash=sha256:719a74fb9e33b9bd44cc7f3a8d94bc35e4049deebe19ba7d8e108280cfd59830 \
@@ -143,17 +143,17 @@ cfgv==3.2.0 \
--hash=sha256:32e43d604bbe7896fe7c248a9c2276447dbef840feb28fe20494f62af110211d \
--hash=sha256:cf22deb93d4bcf92f345a5c3cd39d3d41d6340adc60c78bbbd6588c384fda6a1 \
# via pre-commit
-chardet==3.0.4 \
- --hash=sha256:84ab92ed1c4d4f16916e05906b6b75a6c0fb5db821cc65e70cbd64a3e2a5eaae \
- --hash=sha256:fc323ffcaeaed0e0a02bf4d117757b98aed530d9ed4531e3e15460124c106691 \
+chardet==4.0.0 \
+ --hash=sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa \
+ --hash=sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5 \
# via mbstrdecoder, requests
click-completion==0.5.0 \
--hash=sha256:7600261cda0954b9794bd7afdaab35201be50a8c5404eaa4c0deb20086866c4c \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
click==7.0 \
--hash=sha256:2335065e6395b9e67ca716de5f7526736bfa6ceead690adf616d925bdc622b13 \
--hash=sha256:5b94b49521f6456670fdb30cd82a4eca9412788a93fa6dd6df72c94d5a8ff2d7 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in, click-completion, geomet, pip-tools
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in, click-completion, geomet, pip-tools
colorama==0.4.3 \
--hash=sha256:7d73d2a99753107a36ac6b455ee49046802e59d9d076ef8e47b61499fa29afff \
--hash=sha256:e96da0d330793e2cb9485e9ddfd918d456036c7149416295932478192f4436a1 \
@@ -178,7 +178,7 @@ distlib==0.3.1 \
docker==4.2.0 \
--hash=sha256:1c2ddb7a047b2599d1faec00889561316c674f7099427b9c51e8cb804114b553 \
--hash=sha256:ddae66620ab5f4bce769f64bcd7934f880c8abe6aa50986298db56735d0f722e \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in, tcconfig
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in, tcconfig
docutils==0.15.2 \
--hash=sha256:6c4f696463b79f1fb8ba0c594b63840ebd41f059e92b31957c46b74a4599b6d0 \
--hash=sha256:9e4d7ecfc600058e07ba661411a2b7de2fd0fafa17d1a7f7361cd47b1175c827 \
@@ -191,7 +191,7 @@ elasticsearch==7.11.0 \
fabric==2.5.0 \
--hash=sha256:160331934ea60036604928e792fa8e9f813266b098ef5562aa82b88527740389 \
--hash=sha256:24842d7d51556adcabd885ac3cf5e1df73fc622a1708bf3667bf5927576cdfa6 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
filelock==3.0.12 \
--hash=sha256:18d82244ee114f543149c66a6e0c14e9c4f8a1044b5cdaadd0f82159d6a6ff59 \
--hash=sha256:929b7d63ec5b7d6b71b0fa5ac14e030b3f70b75747cef1b10da9b879fef15836 \
@@ -206,14 +206,14 @@ google-api-core==1.26.0 \
google-api-python-client==1.12.8 \
--hash=sha256:3c4c4ca46b5c21196bec7ee93453443e477d82cbfa79234d1ce0645f81170eaf \
--hash=sha256:f3b9684442eec2cfe9f9bb48e796ef919456b82142c7528c5fd527e5224f08bb \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
google-auth-httplib2==0.0.4 \
--hash=sha256:8d092cc60fb16517b12057ec0bba9185a96e3b7169d86ae12eae98e645b7bc39 \
--hash=sha256:aeaff501738b289717fac1980db9711d77908a6c227f60e4aa1923410b43e2ee \
# via google-api-python-client
-google-auth==1.26.1 \
- --hash=sha256:1b461d079b5650efe492a7814e95c536ffa9e7a96e39a6d16189c1604f18554f \
- --hash=sha256:8ce6862cf4e9252de10045f05fa80393fde831da9c2b45c39288edeee3cde7f2 \
+google-auth==1.27.0 \
+ --hash=sha256:d3640ea61ee025d5af00e3ffd82ba0a06dd99724adaf50bdd52f49daf29f3f65 \
+ --hash=sha256:da5218cbf33b8461d7661d6b4ad91c12c0107e2767904d5e3ae6408031d5463e \
# via google-api-core, google-api-python-client, google-auth-httplib2, kubernetes
googleapis-common-protos==1.52.0 \
--hash=sha256:560716c807117394da12cecb0a54da5a451b5cf9866f1d37e9a5e2329a665351 \
@@ -231,9 +231,9 @@ identify==1.5.13 \
--hash=sha256:70b638cf4743f33042bebb3b51e25261a0a10e80f978739f17e7fd4837664a66 \
--hash=sha256:9dfb63a2e871b807e3ba62f029813552a24b5289504f5b071dea9b041aee9fe4 \
# via pre-commit
-idna==2.7 \
- --hash=sha256:156a6814fb5ac1fc6850fb002e0852d56c0c8d2531923a51032d1b70760e186e \
- --hash=sha256:684a38a6f903c1d71d6d5fac066b58d7768af4de2b832e426ec79c30daa94a16 \
+idna==2.10 \
+ --hash=sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6 \
+ --hash=sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0 \
# via requests
importlib-metadata==3.4.0 \
--hash=sha256:ace61d5fc652dc280e7b6b4ff732a9c2d40db2c0f92bc6cb74e07b73d53a1771 \
@@ -243,50 +243,53 @@ invoke==1.3.1 \
--hash=sha256:5a8558521dc5621b2483a1d90944567e2e104e09dda7be6ae4079eb3247f4a3b \
--hash=sha256:7e7659e290e375011adf828a491f07e341852b2126e05de14459408b66199915 \
--hash=sha256:dae041ff458e1ef05448aae3b76e8c2a176c4b7c6a9d5e8ce880f16251803661 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in, fabric
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in, fabric
isort==4.3.21 \
--hash=sha256:54da7e92468955c4fceacd0c86bd0ec997b0e1ee80d97f67c35a78b719dccab1 \
--hash=sha256:6e811fcb295968434526407adb8796944f1988c5b65e8139058f2014cbe100fd \
# via pylint
jinja2==2.10.1 \
--hash=sha256:065c4f02ebe7f7cf559e49ee5a95fb800a9e4528727aec6f24402a5374c65013 \
--hash=sha256:14dd6caf1527abb21f08f86c784eac40853ba93edb79552aa1e4b8aef1b61c7b \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in, click-completion
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in, click-completion
jmespath==0.10.0 \
--hash=sha256:b85d0567b8666149a93172712e68920734333c0ce7e89b78b3e987f71e5ed4f9 \
--hash=sha256:cdf6525904cc597730141d61b36f2e4b8ecc257c420fa2f4549bac2c2d0cb72f \
# via boto3, botocore
kubernetes==12.0.1 \
--hash=sha256:23c85d8571df8f56e773f1a413bc081537536dc47e2b5e8dc2e6262edb2c57ca \
--hash=sha256:ec52ea01d52e2ec3da255992f7e859f3a76f2bdb51cf65ba8cd71dfc309d8daa \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
-lazy-object-proxy==1.4.3 \
- --hash=sha256:0c4b206227a8097f05c4dbdd323c50edf81f15db3b8dc064d08c62d37e1a504d \
- --hash=sha256:194d092e6f246b906e8f70884e620e459fc54db3259e60cf69a4d66c3fda3449 \
- --hash=sha256:1be7e4c9f96948003609aa6c974ae59830a6baecc5376c25c92d7d697e684c08 \
- --hash=sha256:4677f594e474c91da97f489fea5b7daa17b5517190899cf213697e48d3902f5a \
- --hash=sha256:48dab84ebd4831077b150572aec802f303117c8cc5c871e182447281ebf3ac50 \
- --hash=sha256:5541cada25cd173702dbd99f8e22434105456314462326f06dba3e180f203dfd \
- --hash=sha256:59f79fef100b09564bc2df42ea2d8d21a64fdcda64979c0fa3db7bdaabaf6239 \
- --hash=sha256:8d859b89baf8ef7f8bc6b00aa20316483d67f0b1cbf422f5b4dc56701c8f2ffb \
- --hash=sha256:9254f4358b9b541e3441b007a0ea0764b9d056afdeafc1a5569eee1cc6c1b9ea \
- --hash=sha256:9651375199045a358eb6741df3e02a651e0330be090b3bc79f6d0de31a80ec3e \
- --hash=sha256:97bb5884f6f1cdce0099f86b907aa41c970c3c672ac8b9c8352789e103cf3156 \
- --hash=sha256:9b15f3f4c0f35727d3a0fba4b770b3c4ebbb1fa907dbcc046a1d2799f3edd142 \
- --hash=sha256:a2238e9d1bb71a56cd710611a1614d1194dc10a175c1e08d75e1a7bcc250d442 \
- --hash=sha256:a6ae12d08c0bf9909ce12385803a543bfe99b95fe01e752536a60af2b7797c62 \
- --hash=sha256:ca0a928a3ddbc5725be2dd1cf895ec0a254798915fb3a36af0964a0a4149e3db \
- --hash=sha256:cb2c7c57005a6804ab66f106ceb8482da55f5314b7fcb06551db1edae4ad1531 \
- --hash=sha256:d74bb8693bf9cf75ac3b47a54d716bbb1a92648d5f781fc799347cfc95952383 \
- --hash=sha256:d945239a5639b3ff35b70a88c5f2f491913eb94871780ebfabb2568bd58afc5a \
- --hash=sha256:eba7011090323c1dadf18b3b689845fd96a61ba0a1dfbd7f24b921398affc357 \
- --hash=sha256:efa1909120ce98bbb3777e8b6f92237f5d5c8ea6758efea36a473e1d38f7d3e4 \
- --hash=sha256:f3900e8a5de27447acbf900b4750b0ddfd7ec1ea7fbaf11dfa911141bc522af0 \
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
+lazy-object-proxy==1.5.2 \
+ --hash=sha256:1d33d6f789697f401b75ce08e73b1de567b947740f768376631079290118ad39 \
+ --hash=sha256:2f2de8f8ac0be3e40d17730e0600619d35c78c13a099ea91ef7fb4ad944ce694 \
+ --hash=sha256:3782931963dc89e0e9a0ae4348b44762e868ea280e4f8c233b537852a8996ab9 \
+ --hash=sha256:37d9c34b96cca6787fe014aeb651217944a967a5b165e2cacb6b858d2997ab84 \
+ --hash=sha256:38c3865bd220bd983fcaa9aa11462619e84a71233bafd9c880f7b1cb753ca7fa \
+ --hash=sha256:429c4d1862f3fc37cd56304d880f2eae5bd0da83bdef889f3bd66458aac49128 \
+ --hash=sha256:522b7c94b524389f4a4094c4bf04c2b02228454ddd17c1a9b2801fac1d754871 \
+ --hash=sha256:57fb5c5504ddd45ed420b5b6461a78f58cbb0c1b0cbd9cd5a43ad30a4a3ee4d0 \
+ --hash=sha256:5944a9b95e97de1980c65f03b79b356f30a43de48682b8bdd90aa5089f0ec1f4 \
+ --hash=sha256:6f4e5e68b7af950ed7fdb594b3f19a0014a3ace0fedb86acb896e140ffb24302 \
+ --hash=sha256:71a1ef23f22fa8437974b2d60fedb947c99a957ad625f83f43fd3de70f77f458 \
+ --hash=sha256:8a44e9901c0555f95ac401377032f6e6af66d8fc1fbfad77a7a8b1a826e0b93c \
+ --hash=sha256:b6577f15d5516d7d209c1a8cde23062c0f10625f19e8dc9fb59268859778d7d7 \
+ --hash=sha256:c8fe2d6ff0ff583784039d0255ea7da076efd08507f2be6f68583b0da32e3afb \
+ --hash=sha256:cadfa2c2cf54d35d13dc8d231253b7985b97d629ab9ca6e7d672c35539d38163 \
+ --hash=sha256:cd1bdace1a8762534e9a36c073cd54e97d517a17d69a17985961265be6d22847 \
+ --hash=sha256:ddbdcd10eb999d7ab292677f588b658372aadb9a52790f82484a37127a390108 \
+ --hash=sha256:e7273c64bccfd9310e9601b8f4511d84730239516bada26a0c9846c9697617ef \
+ --hash=sha256:e7428977763150b4cf83255625a80a23dfdc94d43be7791ce90799d446b4e26f \
+ --hash=sha256:e960e8be509e8d6d618300a6c189555c24efde63e85acaf0b14b2cd1ac743315 \
+ --hash=sha256:ecb5dd5990cec6e7f5c9c1124a37cb2c710c6d69b0c1a5c4aa4b35eba0ada068 \
+ --hash=sha256:ef3f5e288aa57b73b034ce9c1f1ac753d968f9069cd0742d1d69c698a0167166 \
+ --hash=sha256:fa5b2dee0e231fa4ad117be114251bdfe6afe39213bd629d43deb117b6a6c40a \
+ --hash=sha256:fa7fb7973c622b9e725bee1db569d2c2ee64d2f9a089201c5e8185d482c7352d \
# via astroid
ldap3==2.7 \
--hash=sha256:17f04298b70bf7ecaa5db8a7d8622b5a962ef7fc2b245b2eea705ac1c24338c0 \
--hash=sha256:81df4ac8b6df10fb1f05b17c18d0cb8c4c344d5a03083c382824960ed959cf5b \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
loguru==0.5.3 \
--hash=sha256:b28e72ac7a98be3d28ad28570299a393dfcd32e5e3f6a353dec94675767b6319 \
--hash=sha256:f8087ac396b5ee5f67c963b495d615ebbceac2796379599820e324419d53667c \
@@ -365,21 +368,21 @@ multi-key-dict==2.0.3 \
--hash=sha256:3a1e1fc705a30a7de1a153ec2992b3ca3655ccd9225d2e427fe6525c8f160d6d \
--hash=sha256:deebdec17aa30a1c432cb3f437e81f8621e1c0542a0c0617a74f71e232e9939e \
# via python-jenkins
-mypy-boto3-dynamodb==1.15.16.0 \
- --hash=sha256:1826097330c3fa2ec05273abc272f578ccf89022617d8432b5d4f66f8e5be90b \
- --hash=sha256:da040546acaa7c517968f69aa3dc5eac72b2c8e61a62441485bbaf05aeb221d4 \
+mypy-boto3-dynamodb==1.17.3.0 \
+ --hash=sha256:afe02e200c43212e43b82ca90dcb9ce90dfa2f7270af05b268bae3181083bfa6 \
+ --hash=sha256:c43c290dacaf5ca21e986c44fdf809a8f3fa81afc01e57062448190ef81382fa \
# via boto3-stubs
-mypy-boto3-ec2==1.15.16.0 \
- --hash=sha256:3b0a3e6b9fe205c92b518b0e66165613cc090d44401912848a821d2fc740bbf8 \
- --hash=sha256:b1d385a56fc5df0d3be748c75d54cc34caac90d173e1a4987b40678a9dac088e \
+mypy-boto3-ec2==1.17.3.0 \
+ --hash=sha256:4b5b6fb82dfd04070d228855e259912a9af507841d6702989a023354567eab67 \
+ --hash=sha256:b9416b78b47756fb916768bec09f76d5ec1658dd7f01c5f0386572899be47c65 \
# via boto3-stubs
-mypy-boto3-pricing==1.15.16.0 \
- --hash=sha256:2d282084c981dd4a0f8b3486a3301daae56209f4409889b65f5c05eef3ce34df \
- --hash=sha256:c8c012c82d764b7dea51ecf4826866227a86aed47d5650a6f42cc1ffd00c6de9 \
+mypy-boto3-pricing==1.17.3.0 \
+ --hash=sha256:2f0350933201e79f703c169580b80917fcaf8a1a8ab48cc464ea295c0e5cf10c \
+ --hash=sha256:ff40eb090740623ed16dce346488f2b944d0d12de5abd773530a95c7db2bd5a0 \
# via boto3-stubs
-mypy-boto3-s3==1.15.16.0 \
- --hash=sha256:0e543e687897701a467afceb368ff7b9134bc66494cbb6935199b3348b7a7733 \
- --hash=sha256:4a0ef6dc915f47cbd16066ec658656136f49b73a155576da31d53c3e19ea28c4 \
+mypy-boto3-s3==1.17.3.0 \
+ --hash=sha256:05e61765cfd62e9c7e4825fafdb0c29f6007ef08778808da914db4367a03957f \
+ --hash=sha256:422c12ebcd50f0bcac9c2f3d633ef9cfdc2c936945afed96853ffb9d0693b750 \
# via boto3-stubs
mysql-connector-python==8.0.20 \
--hash=sha256:0f4cf26d89a7d99ec215e01e47bac40935dc80c573005395dbc18890856f87a5 \
@@ -405,7 +408,7 @@ mysql-connector-python==8.0.20 \
--hash=sha256:eec4c549d96d9678fe9df663226fc175562dcd7187af892bfe4711c9a0d99228 \
--hash=sha256:f23c8ddc1550def024ea708d2aa7c999f301cd45ff17ca7bf9a762b48f0d4513 \
--hash=sha256:ffe0bdd9b22a987028ca4e70df770edef987d54899ac5029c364d9c3c04e9e9d \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
nodeenv==1.5.0 \
--hash=sha256:5304d424c529c997bc888453aeaa6362d242b6b4631e90f3d4bf1b290f1c84a9 \
--hash=sha256:ab45090ae383b716c4ef89e690c41ff8c2b257b85b309f01f3654df3d084bd7c \
@@ -421,11 +424,11 @@ packaging==20.9 \
parameterized==0.7.4 \
--hash=sha256:190f8cc7230eee0b56b30d7f074fd4d165f7c45e6077582d0813c8557e738490 \
--hash=sha256:59ab908e31c01505a987a2be78854e19cb1630c047bbab7848169c371d614d56 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
paramiko==2.7.2 \
--hash=sha256:4f3e316fef2ac628b05097a637af35685183111d4bc1b5979bd397c2ab7b5898 \
--hash=sha256:7f36f4ba2c0d81d219f4595e35f70d56cc94f9ac40a6acdf51d6ca210ce65035 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in, fabric
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in, fabric
path.py==12.5.0 \
--hash=sha256:8d885e8b2497aed005703d94e0fd97943401f035e42a136810308bff034529a8 \
--hash=sha256:a43e82eb2c344c3fd0b9d6352f6b856f40b8b7d3d65cc05978b42c3715668496 \
@@ -445,42 +448,44 @@ pbr==5.5.1 \
pip-tools==5.4.0 \
--hash=sha256:a4d3990df2d65961af8b41dacc242e600fdc8a65a2e155ed3d2fc18a5c209f20 \
--hash=sha256:b73f76fe6464b95e41d595a9c0302c55a786dbc54b63ae776c540c04e31914fb \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
pluggy==0.13.1 \
--hash=sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0 \
--hash=sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d \
# via pytest
pre-commit==1.14.4 \
--hash=sha256:d3d69c63ae7b7584c4b51446b0b583d454548f9df92575b2fe93a68ec800c4d3 \
--hash=sha256:fc512f129b9526e35e80d656a16a31c198f584c4fce3a5c739045b5140584917 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
prometheus_client==0.8.0 \
--hash=sha256:983c7ac4b47478720db338f1491ef67a100b474e3bc7dafcbaefb7d0b8f9b01c \
--hash=sha256:c6e6b706833a6bd1fd51711299edee907857be10ece535126a158f911ee80915 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
-protobuf==3.14.0 \
- --hash=sha256:0e247612fadda953047f53301a7b0407cb0c3cb4ae25a6fde661597a04039b3c \
- --hash=sha256:0fc96785262042e4863b3f3b5c429d4636f10d90061e1840fce1baaf59b1a836 \
- --hash=sha256:1c51fda1bbc9634246e7be6016d860be01747354ed7015ebe38acf4452f470d2 \
- --hash=sha256:1d63eb389347293d8915fb47bee0951c7b5dab522a4a60118b9a18f33e21f8ce \
- --hash=sha256:22bcd2e284b3b1d969c12e84dc9b9a71701ec82d8ce975fdda19712e1cfd4e00 \
- --hash=sha256:2a7e2fe101a7ace75e9327b9c946d247749e564a267b0515cf41dfe450b69bac \
- --hash=sha256:43b554b9e73a07ba84ed6cf25db0ff88b1e06be610b37656e292e3cbb5437472 \
- --hash=sha256:4b74301b30513b1a7494d3055d95c714b560fbb630d8fb9956b6f27992c9f980 \
- --hash=sha256:4e75105c9dfe13719b7293f75bd53033108f4ba03d44e71db0ec2a0e8401eafd \
- --hash=sha256:5b7a637212cc9b2bcf85dd828b1178d19efdf74dbfe1ddf8cd1b8e01fdaaa7f5 \
- --hash=sha256:5e9806a43232a1fa0c9cf5da8dc06f6910d53e4390be1fa06f06454d888a9142 \
- --hash=sha256:629b03fd3caae7f815b0c66b41273f6b1900a579e2ccb41ef4493a4f5fb84f3a \
- --hash=sha256:72230ed56f026dd664c21d73c5db73ebba50d924d7ba6b7c0d81a121e390406e \
- --hash=sha256:86a75477addde4918e9a1904e5c6af8d7b691f2a3f65587d73b16100fbe4c3b2 \
- --hash=sha256:8971c421dbd7aad930c9bd2694122f332350b6ccb5202a8b7b06f3f1a5c41ed5 \
- --hash=sha256:9616f0b65a30851e62f1713336c931fcd32c057202b7ff2cfbfca0fc7d5e3043 \
- --hash=sha256:b0d5d35faeb07e22a1ddf8dce620860c8fe145426c02d1a0ae2688c6e8ede36d \
- --hash=sha256:ecc33531a213eee22ad60e0e2aaea6c8ba0021f0cce35dbf0ab03dee6e2a23a1 \
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
+protobuf==3.15.1 \
+ --hash=sha256:03f6ee325710eb164bd85741721fbd4326c399b0ecf49dddba9172df9149c124 \
+ --hash=sha256:0644b70bc9d36329438de0da619e3337ab4eade784a9acc6ba8e5ed22f2e9e50 \
+ --hash=sha256:0938b13c2a5ad0ce2b75c19dc0c2082f721a61b97d3f11d73ee4412dfb6e06eb \
+ --hash=sha256:165071fdfaf4d7ff7a70d2197bba048fb301c7b957095eedf4bf8379d904adb1 \
+ --hash=sha256:17a26d5a7757211ce60032f0111dd426d4e5f44145ac6e86fa241e0cffe9df17 \
+ --hash=sha256:28daf1c44cf11c70f3852bd13f8fc6f7f1f211abbf068ffbeb25f8e4e2f6c98b \
+ --hash=sha256:3188af446c544df79525d66e2d987490053262b81226fc6fa3f00556135f7e8a \
+ --hash=sha256:509fba6d57f0c1dc483f91754a33a5d8632da1bf75d87b6c127bcf0e3966fa44 \
+ --hash=sha256:5810e9e3851ab8aa28624bdc947f9236ce7ec2be2f63b88b373fdc92791fbf86 \
+ --hash=sha256:60fd96bc77293d9770d133cdbf3af9ff2373ce11d2055d2ca581db2330fe6805 \
+ --hash=sha256:763a9444bafd2204cdeb29be54147ce7cfae04df805161507426c215a461ae6e \
+ --hash=sha256:824dbae3390fcc3ea1bf96748e6da951a601802894cf7e1465e72b4732538cab \
+ --hash=sha256:a8cccf2d2df2675f10a47f963f8010516f6aff09db7d134b0b0e57422ce07f78 \
+ --hash=sha256:c70647b71385302efb615d25c643f1b92784201f7b4ed2d9ff472e4c869ccad5 \
+ --hash=sha256:d3797255e8fbf234477332864f0304222b2492dfd91e95e6314326dbf0e235e2 \
+ --hash=sha256:d52494780f89d1277f982c209197ce1da91d416c27ba9f4495d339ac6a3bac02 \
+ --hash=sha256:d7576c8b59288c5feea161d9ed74925d26759963b51f850d8eadd7a88b4e0ddf \
+ --hash=sha256:de2e543ffb1701ea8fe7077ba571dbaa1980876d1817f6a70b984064dcb20e6f \
+ --hash=sha256:edae67da507393f377555531cb7afa1714c75a84404f3541ef5df36ce3637768 \
+ --hash=sha256:f49a1721f2a3d72466aa19f095cc3fe2883b5e1868f4a1e9f51043df8ecb0140 \
# via google-api-core, googleapis-common-protos, mysql-connector-python
ptable==0.9.2 \
--hash=sha256:aa7fc151cb40f2dabcd2275ba6f7fd0ff8577a86be3365cd3fb297cbe09cc292 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
py==1.10.0 \
--hash=sha256:21b81bda15b66ef5e1a777a21c4dcd9c20ad3efd0b3f817e7a809035269e1bd3 \
--hash=sha256:3b80836aa6d1feeaa108e046da6423ab8f6ceda6468545ae8d02d9d58d18818a \
@@ -537,11 +542,11 @@ pycryptodome==3.9.8 \
--hash=sha256:f521178e5a991ffd04182ed08f552daca1affcb826aeda0e1945cd989a9d4345 \
--hash=sha256:f78a68c2c820e4731e510a2df3eef0322f24fde1781ced970bf497b6c7d92982 \
--hash=sha256:fbe65d5cfe04ff2f7684160d50f5118bdefb01e3af4718eeb618bfed40f19d94 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
pylint==2.5.3 \
--hash=sha256:7dd78437f2d8d019717dbf287772d0b2dbdfd13fc016aa7faa08d67bccc46adc \
--hash=sha256:d0ece7d223fe422088b0e8f13fa0a1e8eb745ebffcb8ed53d3e95394b6101a1c \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
pynacl==1.4.0 \
--hash=sha256:06cbb4d9b2c4bd3c8dc0d267416aaed79906e7b33f114ddbf0911969794b1cc4 \
--hash=sha256:11335f09060af52c97137d4ac54285bcb7df0cef29014a1a4efe64ac065434c4 \
@@ -572,7 +577,7 @@ pyroute2==0.5.14 \
pytest==4.6.4 \
--hash=sha256:6aa9bc2f6f6504d7949e9df2a756739ca06e58ffda19b5e53c725f7b03fb4aae \
--hash=sha256:b77ae6f2d1a760760902a7676887b665c086f71e3461c64ed2a312afcedc00d6 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
python-dateutil==2.8.1 \
--hash=sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c \
--hash=sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a \
@@ -599,7 +604,7 @@ pyyaml==5.3.1 \
--hash=sha256:b8eac752c5e14d3eca0e6dd9199cd627518cb5ec06add0de9d32baeee6fe645d \
--hash=sha256:cc8955cfbfc7a115fa81d85284ee61147059a753344bc51098f3ccd69b0d7e0c \
--hash=sha256:d13155f591e6fcc1ec3b30685d50bf0711574e2c0dfffd7644babf8b5102ca1a \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in, aspy.yaml, awscli, kubernetes, pre-commit
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in, aspy.yaml, awscli, kubernetes, pre-commit
pyzmq==19.0.2 \
--hash=sha256:00dca814469436455399660247d74045172955459c0bd49b54a540ce4d652185 \
--hash=sha256:046b92e860914e39612e84fa760fc3f16054d268c11e0e25dcb011fb1bc6a075 \
@@ -633,18 +638,18 @@ pyzmq==19.0.2 \
--hash=sha256:e36f12f503511d72d9bdfae11cadbadca22ff632ff67c1b5459f69756a029c19 \
--hash=sha256:f1a25a61495b6f7bb986accc5b597a3541d9bd3ef0016f50be16dbb32025b302 \
--hash=sha256:fa411b1d8f371d3a49d31b0789eb6da2537dadbb2aef74a43aa99a78195c3f76 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
https://github.com/fruch/repodataParser/archive/py3.zip ; python_version > "3" \
--hash=sha256:3424db354bb58a6bd254546f3499510cb0b5c1543835cd92bc7dfe01b7bf828d \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
requests-oauthlib==1.3.0 \
--hash=sha256:7f71572defaecd16372f9006f33c2ec8c077c3cfa6f5911a9a90202beb513f3d \
--hash=sha256:b4261601a71fd721a8bd6d7aa1cc1d6a8a93b4a9f5e96626f8e4d91e8beeaa6a \
# via kubernetes
-requests==2.20.0 \
- --hash=sha256:99dcfdaaeb17caf6e526f32b6a7b780461512ab3f1d992187801694cba42770c \
- --hash=sha256:a84b8c9ab6239b578f22d1c21d51b696dcfe004032bb80ea832398d6909d7279 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in, apache-libcloud, docker, google-api-core, kubernetes, python-jenkins, requests-oauthlib
+requests==2.25.1 \
+ --hash=sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804 \
+ --hash=sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e \
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in, apache-libcloud, docker, google-api-core, kubernetes, python-jenkins, requests-oauthlib
rsa==4.5 \
--hash=sha256:35c5b5f6675ac02120036d97cf96f1fde4d49670543db2822ba5015e21a18032 \
--hash=sha256:4d409f5a7d78530a4a2062574c7bd80311bc3af29b364e293aa9b03eea77714f \
@@ -672,14 +677,14 @@ six==1.15.0 \
sortedcontainers==1.5.9 \
--hash=sha256:844daced0f20d75c02ce53f373d048ea2e401ad8a7b3a4c43b2aa544b569efb3 \
--hash=sha256:fb9e22cd6ee4b459f0d7b9b4189b19631031c72ac05715563139162014c13672 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
sqliteschema==1.0.5 \
--hash=sha256:443addd7d8874d4f576d418884a569944098c83fd49b80b21561a7905aaf38da \
--hash=sha256:5b1a217edf42bd51181002e87c9133379274ef64db2d4c5369e6713a1527a06c \
# via simplesqlite
https://github.com/dkropachev/ssh2-python/archive/fix_segmentation.zip \
--hash=sha256:da673418258b21c8267c042bfdb103fed32b1c664947242f4c79786c03e0c746 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
subprocrunner==1.2.1 \
--hash=sha256:1541823dbe804e881fcc781e1b15b3770ecd381febcfaff41ff2f69dba206a6d \
--hash=sha256:446b0763ee6896f8e20e8a4484387c26e3ebd8ba586b84580c7e1fe75e469724 \
@@ -691,11 +696,11 @@ tabledata==1.1.3 \
tcconfig==0.26.0 \
--hash=sha256:8c73d868f555e0c316033b6d76d2362828e3f017896918f059e7c8993ff2fe17 \
--hash=sha256:b4b064f96c28f7a171552e2854c04849585fd47aba6df46539a0de776a4eebbc \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
tenacity==5.0.4 \
--hash=sha256:a0c3c5f7ae0c33f5556c775ca059c12d6fd8ab7121613a713e8b7d649908571b \
--hash=sha256:b87c1934daa0b2ccc7db153c37b8bf91d12f165936ade8628e7b962b92dc7705 \
- # via -r /home/fruch/Projects/scylla-cluster-tests/requirements.in
+ # via -r /extra/scylladb/scylla-cluster-tests/dkropachev3/requirements.in
toml==0.10.2 \
--hash=sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b \
--hash=sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f \
@@ -708,9 +713,9 @@ uritemplate==3.0.1 \
--hash=sha256:07620c3f3f8eed1f12600845892b0e036a2420acf513c53f7de0abd911a5894f \
--hash=sha256:5af8ad10cec94f215e3f48112de2022e1d5a37ed427fbd88652fa908f2ab7cae \
# via google-api-python-client
-urllib3==1.24.3 \
- --hash=sha256:2393a695cd12afedd0dcb26fe5d50d0cf248e5a66f75dbd89a3d4eb333a61af4 \
- --hash=sha256:a637e5fae88995b256e3409dc4d52c2e2e0ba32c42a6365fee8bbd2238de3cfb \
+urllib3==1.26.3 \
+ --hash=sha256:1b465e494e3e0d8939b50680403e3aedaa2bc434b7d5af64dfd3c958d7f5ae80 \
+ --hash=sha256:de3eedaad74a2683334e282005cd8d7f22f4d55fa690a2a1020a416cb0a47e73 \
# via botocore, elasticsearch, kubernetes, requests, selenium
virtualenv==20.4.2 \
--hash=sha256:147b43894e51dd6bba882cf9c282447f780e2251cd35172403745fc381a0a80d \
diff --git a/sct.py b/sct.py
--- a/sct.py
+++ b/sct.py
@@ -38,9 +38,9 @@
list_logs_by_test_id, get_branched_ami, gce_meta_to_dict,
aws_tags_to_dict, list_elastic_ips_aws, get_builder_by_test_id,
clean_resources_according_post_behavior, clean_sct_runners,
- search_test_id_in_latest, get_testrun_dir, format_timestamp, list_clusters_gke)
+ search_test_id_in_latest, get_testrun_dir, format_timestamp, list_clusters_gke,
+ list_clusters_eks)
from sdcm.utils.jepsen import JepsenResults
-from sdcm.utils.docker_utils import ContainerManager
from sdcm.utils.monitorstack import (restore_monitoring_stack, get_monitoring_stack_services,
kill_running_monitoring_stack_services)
from sdcm.cluster import Setup
@@ -260,7 +260,7 @@ def list_resources(ctx, user, test_id, get_all, get_all_running, verbose):
else:
click.secho("No elastic ips found for selected filters in AWS!", fg="yellow")

- click.secho("Checking GCE...", fg='green')
+ click.secho("Checking GKE...", fg='green')
gke_clusters = list_clusters_gke(tags_dict=params, verbose=verbose)
if gke_clusters:
gke_table = PrettyTable(["Name", "Region-AZ", "TestId", "RunByUser", "CreateTime"])
@@ -277,6 +277,8 @@ def list_resources(ctx, user, test_id, get_all, get_all_running, verbose):
click.echo(gke_table.get_string(title="GKE clusters"))
else:
click.secho("Nothing found for selected filters in GKE!", fg="yellow")
+
+ click.secho("Checking GCE...", fg='green')
gce_instances = list_instances_gce(tags_dict=params, running=get_all_running, verbose=verbose)
if gce_instances:
gce_table = PrettyTable(table_header)
@@ -296,8 +298,6 @@ def list_resources(ctx, user, test_id, get_all, get_all_running, verbose):
else:
click.secho("Nothing found for selected filters in GCE!", fg="yellow")

-<<<<<<< HEAD
-=======
click.secho("Checking EKS...", fg='green')
eks_clusters = list_clusters_eks(tags_dict=params, verbose=verbose)
if eks_clusters:
@@ -316,7 +316,6 @@ def list_resources(ctx, user, test_id, get_all, get_all_running, verbose):
else:
click.secho("Nothing found for selected filters in EKS!", fg="yellow")

->>>>>>> 8bd6ee1f... fix(operator-jenkins-pipelines): Generalize test files and create EKS jenkins pipelines
click.secho("Checking Docker...", fg="green")
docker_resources = \
list_resources_docker(tags_dict=params, running=get_all_running, group_as_builder=True, verbose=verbose)
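
The new EKS branch above relies on list_clusters_eks from sdcm.utils.common. As a rough, hedged sketch only (not the actual implementation), listing EKS clusters by tag with boto3 could look like this:

    import boto3

    def list_eks_clusters_by_tags(region: str, tags_dict: dict) -> list:
        """Sketch: return descriptions of EKS clusters whose tags contain all of tags_dict."""
        eks = boto3.client("eks", region_name=region)
        matching = []
        for name in eks.list_clusters()["clusters"]:  # pagination omitted for brevity
            cluster = eks.describe_cluster(name=name)["cluster"]
            cluster_tags = cluster.get("tags", {})
            if all(cluster_tags.get(key) == value for key, value in tags_dict.items()):
                matching.append(cluster)
        return matching
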
diff --git a/sdcm/cluster.py b/sdcm/cluster.py
--- a/sdcm/cluster.py
+++ b/sdcm/cluster.py
@@ -2844,7 +2844,7 @@ def wait_for_machine_image_configured(self):
self.log.info("Waiting for Scylla Machine Image setup to finish...")
wait.wait_for(self.is_machine_image_configured, step=10, timeout=300)

- def get_sysctl_output(self) -> dict[str, str]:
+ def get_sysctl_output(self) -> Dict[str, str]:
properties = {}
result = self.remoter.sudo("sysctl -a", ignore_status=True)

@@ -4517,7 +4517,7 @@ def install_scylla_bench(node):

class BaseMonitorSet(): # pylint: disable=too-many-public-methods,too-many-instance-attributes
# This is a Mixin for monitoring cluster and should not be inherited
-
+ DB_NODES_IP_ADDRESS = 'ip_address'
json_file_params_for_replace = {"$test_name": get_test_name()}

def __init__(self, targets, params):
@@ -4776,7 +4776,8 @@ def configure_scylla_monitoring(self, node, sct_metrics=True, alert_manager=True
with open(local_template_tmp) as output_file:
templ_yaml = yaml.safe_load(output_file)
self.log.debug("Configs %s" % templ_yaml)
- loader_targets_list = ["[%s]:9103" % n.ip_address for n in self.targets["loaders"].nodes]
+ loader_targets_list = ["[%s]:9103" % getattr(node, self.DB_NODES_IP_ADDRESS)
+ for node in self.targets["loaders"].nodes]

# remove those jobs if exists, for support of 'reuse_cluster: true'
def remove_sct_metrics(metric):
@@ -4797,7 +4798,8 @@ def remove_sct_metrics(metric):
static_configs=[dict(targets=[cloud_prom_host])]))

if self.params.get('gemini_cmd'):
- gemini_loader_targets_list = ["%s:2112" % n.ip_address for n in self.targets["loaders"].nodes]
+ gemini_loader_targets_list = ["%s:2112" % getattr(node, self.DB_NODES_IP_ADDRESS)
+ for node in self.targets["loaders"].nodes]
scrape_configs.append(dict(job_name="gemini_metrics", honor_labels=True,
static_configs=[dict(targets=gemini_loader_targets_list)]))

@@ -4872,13 +4874,11 @@ def reconfigure_scylla_monitoring(self):
for node in self.nodes:
cluster_backend = self.params.get("cluster_backend")
monitoring_targets = []
- attr_name = f"{'public_' if cluster_backend == 'k8s-gke' else ''}ip_address"
for db_node in self.targets["db_cluster"].nodes:
- monitoring_targets.append(f"[{getattr(db_node, attr_name)}]:9180")
+ monitoring_targets.append(f"[{getattr(db_node, self.DB_NODES_IP_ADDRESS)}]:9180")
monitoring_targets = " ".join(monitoring_targets)
if self.params.get("ip_ssh_connections") == "ipv6":
monitoring_targets = monitoring_targets.replace("[", "").replace("]", "")
-
node.remoter.sudo(shell_script_cmd(f"""\
cd {self.monitor_install_path}
mkdir -p {self.monitoring_conf_dir}
@@ -4930,7 +4930,7 @@ def start_scylla_monitoring(self, node):
-D "{labels}" \
-s `realpath "{self.monitoring_conf_dir}/scylla_servers.yml"` \
-n `realpath "{self.monitoring_conf_dir}/node_exporter_servers.yml"` \
- {scylla_manager_servers_arg}
+ {scylla_manager_servers_arg} \
-d `realpath "{self.monitoring_data_dir}"` -l -v master,{self.monitoring_version} -b "-web.enable-admin-api"
""")
node.remoter.run("bash -ce '%s'" % run_script, verbose=True)
diff --git a/sdcm/cluster_aws.py b/sdcm/cluster_aws.py
--- a/sdcm/cluster_aws.py
+++ b/sdcm/cluster_aws.py
@@ -39,7 +39,8 @@
from sdcm.remote import LocalCmdRunner, shell_script_cmd, NETWORK_EXCEPTIONS
from sdcm.cluster import INSTANCE_PROVISION_ON_DEMAND
from sdcm.ec2_client import CreateSpotInstancesError
-from sdcm.utils.common import list_instances_aws, get_ami_tags, ec2_instance_wait_public_ip, MAX_SPOT_DURATION_TIME
+from sdcm.utils.aws_utils import tags_as_ec2_tags, ec2_instance_wait_public_ip
+from sdcm.utils.common import list_instances_aws, get_ami_tags, MAX_SPOT_DURATION_TIME
from sdcm.utils.decorators import retrying
from sdcm.sct_events.system import SpotTerminationEvent
from sdcm.sct_events.filters import DbEventsFilter
@@ -59,10 +60,6 @@
# pylint: disable=too-many-lines


-def _tags_as_ec2_tags(tags: Dict[str, str]) -> List[Dict[str, str]]:
- return [{"Key": key, "Value": value} for key, value in tags.items()]
-
-
class AWSCluster(cluster.BaseCluster): # pylint: disable=too-many-instance-attributes,abstract-method,

"""
@@ -76,7 +73,7 @@ def __init__(self, ec2_ami_id, ec2_subnet_id, ec2_security_group_ids, # pylint:
cluster_prefix='cluster',
node_prefix='node', n_nodes=10, params=None, node_type=None, extra_network_interface=False):
# pylint: disable=too-many-locals
- region_names = params.get('region_name').split()
+ region_names = params.region_names
if len(credentials) > 1 or len(region_names) > 1:
assert len(credentials) == len(region_names)
for idx, _ in enumerate(region_names):
@@ -495,7 +492,7 @@ def init(self):
# first we need to configure the both networks so we'll have public ip
self.allocate_and_attach_elastic_ip(self.parent_cluster, self.dc_idx)
resources_to_tag.append(self.eip_allocation_id)
- self._ec2_service.create_tags(Resources=resources_to_tag, Tags=_tags_as_ec2_tags(self.tags))
+ self._ec2_service.create_tags(Resources=resources_to_tag, Tags=tags_as_ec2_tags(self.tags))

self._wait_public_ip()
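
The tagging helper now imported from sdcm.utils.aws_utils replaces the removed module-local _tags_as_ec2_tags; assuming it keeps the removed function's behaviour, it simply converts an SCT tag dict into the Key/Value list the EC2 API expects:

    from typing import Dict, List

    def tags_as_ec2_tags(tags: Dict[str, str]) -> List[Dict[str, str]]:
        """Convert {'RunByUser': 'alice'} into [{'Key': 'RunByUser', 'Value': 'alice'}]."""
        return [{"Key": key, "Value": value} for key, value in tags.items()]

    # e.g. ec2.create_tags(Resources=[eip_allocation_id], Tags=tags_as_ec2_tags(self.tags))
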

diff --git a/sdcm/cluster_k8s/__init__.py b/sdcm/cluster_k8s/__init__.py
--- a/sdcm/cluster_k8s/__init__.py
+++ b/sdcm/cluster_k8s/__init__.py
@@ -15,14 +15,15 @@

from __future__ import annotations

+import abc
import contextlib
import logging
import os
import random as librandom
-import requests
import time
import base64
-import zipfile
+import socket
+import traceback

from copy import deepcopy
from datetime import datetime
@@ -31,26 +32,30 @@
from tempfile import NamedTemporaryFile, TemporaryDirectory
from textwrap import dedent
from threading import RLock
-from typing import Optional, Union, List, Dict, Any, ContextManager, Type, Callable
+from typing import Optional, Union, List, Dict, Any, ContextManager, Type, Tuple, Callable

import json
import yaml
import kubernetes as k8s
-from kubernetes.client import V1Container
+from kubernetes.client import V1Container, V1ResourceRequirements
from kubernetes.dynamic.resource import Resource, ResourceField, ResourceInstance, ResourceList, Subresource

from invoke.exceptions import CommandTimedOut

from sdcm import sct_abs_path, cluster, cluster_docker
+from sdcm.cluster import Setup
from sdcm.db_stats import PrometheusDBStats
-from sdcm.remote import LOCALRUNNER, NETWORK_EXCEPTIONS
+from sdcm.remote import NETWORK_EXCEPTIONS
from sdcm.remote.kubernetes_cmd_runner import KubernetesCmdRunner
from sdcm.coredump import CoredumpExportFileThread
from sdcm.mgmt import AnyManagerCluster
from sdcm.sct_events.health import ClusterHealthValidatorEvent
from sdcm.sct_events.system import TestFrameworkEvent
-from sdcm.utils.common import download_from_github, walk_thru_data
-from sdcm.utils.k8s import KubernetesOps, ApiCallRateLimiter, JSON_PATCH_TYPE, KUBECTL_TIMEOUT
+from sdcm.utils.common import download_from_github, shorten_cluster_name, walk_thru_data
+from sdcm.utils.k8s import KubernetesOps, ApiCallRateLimiter, JSON_PATCH_TYPE, KUBECTL_TIMEOUT, TokenUpdateThread, \
+ get_helm_pool_affinity_values, convert_cpu_value_from_k8s_to_units, convert_memory_value_from_k8s_to_units, \
+ convert_cpu_units_to_k8s_value, convert_memory_units_to_k8s_value, get_pool_affinity_modifiers, PortExposeService, \
+ HelmValues, add_pool_node_affinity
from sdcm.utils.decorators import log_run_info, retrying, timeout
from sdcm.utils.remote_logger import get_system_logging_thread, CertManagerLogger, ScyllaOperatorLogger, \
KubectlClusterEventsLogger, ScyllaManagerLogger
@@ -62,19 +67,170 @@
ANY_KUBERNETES_RESOURCE = Union[Resource, ResourceField, ResourceInstance, ResourceList, Subresource]

CERT_MANAGER_TEST_CONFIG = sct_abs_path("sdcm/k8s_configs/cert-manager-test.yaml")
+LOADER_CLUSTER_CONFIG = sct_abs_path("sdcm/k8s_configs/loaders.yaml")
+LOCAL_PROVISIONER_DIR = sct_abs_path("sdcm/k8s_configs/provisioner")
+
SCYLLA_API_VERSION = "scylla.scylladb.com/v1"
SCYLLA_CLUSTER_RESOURCE_KIND = "ScyllaCluster"
DEPLOY_SCYLLA_CLUSTER_DELAY = 15 # seconds
SCYLLA_OPERATOR_NAMESPACE = "scylla-operator-system"
SCYLLA_MANAGER_NAMESPACE = "scylla-manager-system"
SCYLLA_NAMESPACE = "scylla"

+# Resources that are used by containers deployed by scylla-operator on scylla nodes
+OPERATOR_CONTAINERS_RESOURCES = {
+ 'cpu': 2,
+ 'memory': 0.4
+}
+
+# Resources that are used by the sidecar injected by SCT into the scylla-operator statefulset
+# Look at ScyllaPodCluster.add_sidecar_injection()
+SIDECAR_CONTAINERS_RESOURCES = {
+ 'cpu': 0.1,
+ 'memory': 0.1
+}

LOGGER = logging.getLogger(__name__)


-class KubernetesCluster:
+class DnsPodResolver:
+ DNS_RESOLVER_CONFIG = sct_abs_path("sdcm/k8s_configs/dns-resolver-pod.yaml")
+
+ def __init__(self, k8s_cluster: KubernetesCluster, pod_name: str, pool: CloudK8sNodePool = None):
+ self.pod_name = pod_name
+ self.pool = pool
+ self.k8s_cluster = k8s_cluster
+
+ def deploy(self):
+ if self.pool:
+ affinity_modifiers = self.pool.affinity_modifiers
+ else:
+ affinity_modifiers = []
+
+ self.k8s_cluster.apply_file(
+ self.DNS_RESOLVER_CONFIG,
+ modifiers=affinity_modifiers,
+ environ={'POD_NAME': self.pod_name})
+
+ def resolve(self, hostname: str) -> str:
+ result = self.k8s_cluster.kubectl(f'exec {self.pod_name} -- host {hostname}', verbose=False).stdout
+ tmp = result.split()
+ # result = self.k8s_cluster.kubectl(f'exec {self.pod_name} -- dig +short {hostname}').stdout.splitlines()
+ if len(tmp) < 4:
+ raise RuntimeError("Got wrong result: %s", result)
+ return tmp[3]
+
+
+class CloudK8sNodePool(metaclass=abc.ABCMeta):
+ def __init__(
+ self,
+ k8s_cluster: 'KubernetesCluster',
+ name: str,
+ num_nodes: int,
+ instance_type: str,
+ image_type: str,
+ disk_size: int = None,
+ disk_type: str = None,
+ labels: dict = None,
+ tags: dict = None,
+ is_deployed: bool = False):
+ self.k8s_cluster = k8s_cluster
+ self.name = name
+ self.num_nodes = int(num_nodes)
+ self.instance_type = instance_type
+ self.disk_size = disk_size
+ self.disk_type = disk_type
+ self.image_type = image_type
+ self.labels = labels
+ self.tags = tags
+ self.is_deployed = is_deployed
+
+ @abc.abstractmethod
+ def deploy(self):
+ pass
+
+ def deploy_and_wait_till_ready(self):
+ if not self.is_deployed:
+ self.deploy()
+ self.wait_for_nodes_readiness()
+
+ @abc.abstractmethod
+ def undeploy(self):
+ pass
+
+ @abc.abstractmethod
+ def resize(self, num_nodes: int):
+ pass
+
+ def __str__(self):
+ data = [f'name="{self.name}"', *[f'{param}="{value}"' for param, value in self.__dict__.items() if
+ param not in ['name', 'k8s_cluster']]]
+ return f"<{self.__class__.__name__}:{', '.join(data)}>"
+
+ @cached_property
+ def affinity_modifiers(self) -> List[Callable]:
+ return get_pool_affinity_modifiers(self.pool_label_name, self.name)
+
+ @cached_property
+ def helm_affinity_values(self) -> HelmValues:
+ return HelmValues(get_helm_pool_affinity_values(self.pool_label_name, self.name))
+
+ @cached_property
+ def pool_label_name(self) -> str:
+ return self.k8s_cluster.POOL_LABEL_NAME
+
+ @cached_property
+ def cpu_and_memory_capacity(self) -> Tuple[float, float]:
+ for el in self.k8s_cluster.k8s_core_v1_api.list_node().items:
+ if el.metadata.labels.get(self.pool_label_name, '') == self.name:
+ capacity = el.status.allocatable
+ return convert_cpu_value_from_k8s_to_units(capacity['cpu']), convert_memory_value_from_k8s_to_units(
+ capacity['memory'])
+ raise RuntimeError("Can't find any node for pool '%s'", self.name)
+
+ @property
+ def cpu_capacity(self) -> float:
+ return self.cpu_and_memory_capacity[0]
+
+ @property
+ def memory_capacity(self) -> float:
+ return self.cpu_and_memory_capacity[1]
+
+ @property
+ def readiness_timeout(self) -> int:
+ return 10 + (10 * self.num_nodes)
+
+ def wait_for_nodes_readiness(self):
+ readiness_timeout = self.readiness_timeout
+
+ @timeout(
+ message=f"Wait for {self.num_nodes} node(s) in pool {self.name} to be ready...",
+ sleep_time=30,
+ timeout=readiness_timeout * 60)
+ def wait_nodes_are_ready():
+ # To make it more informative in worst case scenario made it repeat 5 times, by readiness_timeout // 5
+ result = self.k8s_cluster.kubectl_no_wait(
+ f"wait --timeout={self.readiness_timeout // 5}m -l {self.pool_label_name}={self.name} "
+ f"--for=condition=Ready node",
+ timeout=readiness_timeout // 5 * 60 + 10, verbose=False)
+ if result.stdout.count('condition met') != self.num_nodes:
+ raise RuntimeError('Not all nodes reported')
+
+ wait_nodes_are_ready()
+
+
+class KubernetesCluster(metaclass=abc.ABCMeta):
+ AUXILIARY_POOL_NAME = 'auxiliary-pool'
+ SCYLLA_POOL_NAME = 'scylla-pool'
+ MONITORING_POOL_NAME = 'monitoring-pool'
+ LOADER_POOL_NAME = 'loader-pool'
+ DNS_RESOLVER_POD_NAME = 'dns-resolver-pod'
+ POOL_LABEL_NAME: str = None
+ USE_POD_RESOLVER = False
+ USE_MONITORING_EXPOSE_SERVICE = False
+
api_call_rate_limiter: Optional[ApiCallRateLimiter] = None
+ k8s_monitoring_prometheus_expose_service: Optional[PortExposeService] = None

datacenter = ()
_cert_manager_journal_thread: Optional[CertManagerLogger] = None
@@ -84,6 +240,20 @@ class KubernetesCluster:

_scylla_operator_log_monitor_thread: Optional[ScyllaOperatorLogMonitoring] = None
_scylla_operator_status_monitor_thread: Optional[ScyllaOperatorStatusMonitoring] = None
+ _token_update_thread: Optional[TokenUpdateThread] = None
+ pools: Dict[str, CloudK8sNodePool]
+
+ def __init__(self, params: dict, user_prefix: str = '', region_name: str = None, cluster_uuid: str = None):
+ self.pools = {}
+ if cluster_uuid is None:
+ self.uuid = Setup.test_id()
+ else:
+ self.uuid = cluster_uuid
+ self.region_name = region_name
+ self.shortid = str(self.uuid)[:8]
+ self.name = '%s-%s' % (user_prefix, self.shortid)
+ self.params = params
+ self.api_call_rate_limiter = None

# NOTE: Following class attr(s) are defined for consumers of this class
# such as 'sdcm.utils.remote_logger.ScyllaOperatorLogger'.
@@ -95,6 +265,10 @@ class KubernetesCluster:
def k8s_server_url(self) -> Optional[str]:
return None

+ @cached_property
+ def short_cluster_name(self):
+ return shorten_cluster_name(self.name, 40).replace('_', '-')
+
kubectl_cmd = partialmethod(KubernetesOps.kubectl_cmd)
apply_file = partialmethod(KubernetesOps.apply_file)

@@ -117,23 +291,28 @@ def kubectl_multi_cmd(self, *command, namespace=None, timeout=KUBECTL_TIMEOUT, r
return KubernetesOps.kubectl_multi_cmd(self, *command, namespace=namespace, timeout=timeout, remoter=remoter,
ignore_status=ignore_status, verbose=verbose)

- @cached_property
+ @property
def helm(self):
if self.api_call_rate_limiter:
self.api_call_rate_limiter.wait()
return partial(cluster.Setup.tester_obj().localhost.helm, self)

- @cached_property
+ @property
def helm_install(self):
if self.api_call_rate_limiter:
self.api_call_rate_limiter.wait()
return partial(cluster.Setup.tester_obj().localhost.helm_install, self)

- @property
+ @cached_property
+ def kubectl_token_path(self):
+ return os.path.join(os.path.dirname(
+ os.path.expanduser(os.environ.get('KUBECONFIG', '~/.kube/config'))), 'kubectl.token')
+
+ @cached_property
def cert_manager_log(self) -> str:
return os.path.join(self.logdir, "cert_manager.log")

- @property
+ @cached_property
def scylla_manager_log(self) -> str:
return os.path.join(self.logdir, "scylla_manager.log")

@@ -146,21 +325,33 @@ def start_scylla_manager_journal_thread(self):
self._scylla_manager_journal_thread.start()

@log_run_info
- def deploy_cert_manager(self) -> None:
+ def deploy_cert_manager(self, pool_name: str = None) -> None:
+ if pool_name is None:
+ pool_name = self.AUXILIARY_POOL_NAME
LOGGER.info("Deploy cert-manager")
self.kubectl("create namespace cert-manager", ignore_status=True)
LOGGER.debug(self.helm("repo add jetstack https://charts.jetstack.io"))
- LOGGER.debug(self.helm(f"install cert-manager jetstack/cert-manager "
- f"--version v{self.params.get('k8s_cert_manager_version')} --set installCRDs=true",
- namespace="cert-manager"))
+
+ if pool_name:
+ helm_values = HelmValues(get_helm_pool_affinity_values(self.POOL_LABEL_NAME, pool_name))
+ else:
+ helm_values = HelmValues()
+
+ helm_values.set('installCRDs', True)
+
+ LOGGER.debug(self.helm(
+ f"install cert-manager jetstack/cert-manager --version v{self.params.get('k8s_cert_manager_version')}",
+ namespace="cert-manager", values=helm_values))
time.sleep(10)
+
self.kubectl("wait --timeout=10m --all --for=condition=Ready pod", namespace="cert-manager")
wait_for(
self.check_if_cert_manager_fully_functional,
text='Waiting for cert-manager to become fully operational',
timeout=10 * 60,
step=10,
throw_exc=True)
+
self.start_cert_manager_journal_thread()

@cached_property
@@ -189,45 +380,52 @@ def _scylla_operator_chart_version(self):
return chart_version

@log_run_info
- def deploy_scylla_manager(self) -> None:
+ def deploy_scylla_manager(self, pool_name: str = None) -> None:
# Calculate options values which must be set
#
# image.tag -> self.params.get('mgmt_docker_image').split(':')[-1]
# controllerImage.repository -> self.params.get(
# 'k8s_scylla_operator_docker_image').split('/')[0]
# controllerImage.tag -> self.params.get(
# 'k8s_scylla_operator_docker_image').split(':')[-1]
- set_options = []
+
+ if pool_name is None:
+ pool_name = self.AUXILIARY_POOL_NAME
+ LOGGER.info("Deploy scylla-manager")
+
+ helm_affinity = get_helm_pool_affinity_values(self.POOL_LABEL_NAME, pool_name) if pool_name else {}
+ values = HelmValues(**helm_affinity)

mgmt_docker_image_tag = self.params.get('mgmt_docker_image').split(':')[-1]
if mgmt_docker_image_tag:
- set_options.append(f"image.tag={mgmt_docker_image_tag}")
+ values.set('image.tag', mgmt_docker_image_tag)

scylla_operator_repo_base = self.params.get(
'k8s_scylla_operator_docker_image').split('/')[0]
if scylla_operator_repo_base:
- set_options.append(f"controllerImage.repository={scylla_operator_repo_base}")
+ values.set('controllerImage.repository', scylla_operator_repo_base)

scylla_operator_image_tag = self.params.get(
'k8s_scylla_operator_docker_image').split(':')[-1]
if scylla_operator_image_tag:
- set_options.append(f"controllerImage.tag={scylla_operator_image_tag}")
+ values.set('controllerImage.tag', scylla_operator_image_tag)

# Install and wait for initialization of the Scylla Manager chart
LOGGER.info("Deploy scylla-manager")
+ self.kubectl(f'create namespace {SCYLLA_MANAGER_NAMESPACE}')
LOGGER.debug(self.helm_install(
target_chart_name="scylla-manager",
source_chart_name="scylla-operator/scylla-manager",
version=self._scylla_operator_chart_version,
use_devel=True,
- set_options=",".join(set_options),
+ values=values,
namespace=SCYLLA_MANAGER_NAMESPACE,
))
+
time.sleep(10)
+
self.kubectl("wait --timeout=10m --all --for=condition=Ready pod",
namespace=SCYLLA_MANAGER_NAMESPACE)
-
- # Start the Scylla Manager logging thread
self.start_scylla_manager_journal_thread()

def check_if_cert_manager_fully_functional(self) -> bool:
@@ -260,22 +458,26 @@ def start_scylla_cluster_events_thread(self) -> None:
self._scylla_cluster_events_thread.start()

@log_run_info
- def deploy_scylla_operator(self) -> None:
+ def deploy_scylla_operator(self, pool_name: str = None) -> None:
+ if pool_name is None:
+ pool_name = self.AUXILIARY_POOL_NAME
+
+ values = HelmValues(**get_helm_pool_affinity_values(self.POOL_LABEL_NAME, pool_name) if pool_name else {})
+
# Calculate options values which must be set
#
# image.repository -> self.params.get('k8s_scylla_operator_docker_image').split('/')[0]
# image.tag -> self.params.get('k8s_scylla_operator_docker_image').split(':')[-1]
- set_options = []

scylla_operator_repo_base = self.params.get(
'k8s_scylla_operator_docker_image').split('/')[0]
if scylla_operator_repo_base:
- set_options.append(f"controllerImage.repository={scylla_operator_repo_base}")
+ values.set('controllerImage.repository', scylla_operator_repo_base)

scylla_operator_image_tag = self.params.get(
'k8s_scylla_operator_docker_image').split(':')[-1]
if scylla_operator_image_tag:
- set_options.append(f"controllerImage.tag={scylla_operator_image_tag}")
+ values.set('controllerImage.tag', scylla_operator_image_tag)

# Install and wait for initialization of the Scylla Operator chart
LOGGER.info("Deploy Scylla Operator")
@@ -285,15 +487,17 @@ def deploy_scylla_operator(self) -> None:
source_chart_name="scylla-operator/scylla-operator",
version=self._scylla_operator_chart_version,
use_devel=True,
- set_options=",".join(set_options),
namespace=SCYLLA_OPERATOR_NAMESPACE,
+ values=values
))
- time.sleep(10)
- self.kubectl("wait --timeout=5m --all --for=condition=Ready pod",
- namespace=SCYLLA_OPERATOR_NAMESPACE)

- # Start the Scylla Operator logging thread
- self.start_scylla_operator_journal_thread()
+ time.sleep(10)
+ KubernetesOps.wait_for_pods_readiness(
+ kluster=self,
+ total_pods=lambda pods: pods > 0,
+ readiness_timeout=5*60,
+ namespace=SCYLLA_OPERATOR_NAMESPACE
+ )

@log_run_info
def deploy_minio_s3_backend(self, minio_bucket_name):
@@ -306,14 +510,107 @@ def deploy_minio_s3_backend(self, minio_bucket_name):
'defaultBucket.policy=public --generate-name minio/minio',
namespace='minio')

- minio_ip_address = wait_for(
- lambda: self.minio_ip_address, text='Waiting for minio pod to popup', timeout=120, throw_exc=True)
+ wait_for(lambda: self.minio_ip_address, text='Waiting for minio pod to popup', timeout=120, throw_exc=True)

self.kubectl("wait --timeout=10m --all --for=condition=Ready pod", namespace="minio")

+ def get_scylla_cluster_helm_values(self, cpu_limit, memory_limit, pool_name: str = None) -> HelmValues:
+ return HelmValues({
+ 'nameOverride': '',
+ 'fullnameOverride': self.params.get('k8s_scylla_cluster_name'),
+ 'scyllaImage': {
+ 'repository': self.params.get('docker_image'),
+ 'tag': self.params.get('scylla_version')
+ },
+ 'agentImage': {
+ 'repository': 'scylladb/scylla-manager-agent',
+ 'tag': self.params.get('scylla_mgmt_agent_version')
+ },
+ 'serviceAccount': {
+ 'create': True,
+ 'annotations': {},
+ 'name': f"{self.params.get('k8s_scylla_cluster_name')}-member"
+ },
+ 'alternator': {
+ 'enabled': False,
+ 'port': 8000,
+ 'writeIsolation': 'always'
+ },
+ 'developerMode': False,
+ 'cpuset': True,
+ 'hostNetworking': True,
+ 'automaticOrphanedNodeCleanup': True,
+ 'sysctls': ["fs.aio-max-nr=2097152"],
+ 'serviceMonitor': {
+ 'create': False
+ },
+ 'datacenter': self.params.get('k8s_scylla_datacenter'),
+ 'racks': [
+ {
+ 'name': self.params.get('k8s_scylla_rack'),
+ 'scyllaConfig': 'scylla-confing',
+ 'scyllaAgentConfig': 'scylla-agent-config',
+ 'members': 0,
+ 'storage': {
+ 'storageClassName': 'local-raid-disks',
+ 'capacity': f"{self.params.get('k8s_scylla_disk_gi')}Gi"
+ },
+ 'resources': {
+ 'limits': {
+ 'cpu': cpu_limit,
+ 'memory': memory_limit
+ },
+ 'requests': {
+ 'cpu': cpu_limit,
+ 'memory': memory_limit
+ },
+ },
+ 'placement': add_pool_node_affinity({}, self.POOL_LABEL_NAME, pool_name) if pool_name else {}
+ }
+ ]
+ })
+
+ def wait_till_cluster_is_operational(self):
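+        # If an API call rate limiter is active, pause it until the k8s API has gone
+        # through its unstable phase and settled; node pool readiness is checked in either case.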
+ if self.api_call_rate_limiter:
+ with self.api_call_rate_limiter.pause:
+ self.api_call_rate_limiter.wait_till_api_become_not_operational(self)
+ self.api_call_rate_limiter.wait_till_api_become_stable(self)
+ self.wait_all_node_pools_to_be_ready()
+
@log_run_info
- def deploy_scylla_cluster(self, config: str, target_mgmt_agent_to_minio: bool = False) -> None:
+ def deploy_scylla_cluster(self, target_mgmt_agent_to_minio: bool = False,
+ node_pool: CloudK8sNodePool = None, node_prepare_config: str = None) -> None:
LOGGER.info("Create and initialize a Scylla cluster")
+ self.kubectl(f"create namespace {SCYLLA_NAMESPACE}")
+ affinity_modifiers = []
+
+ if node_pool:
+ affinity_modifiers.extend(node_pool.affinity_modifiers)
+ if node_prepare_config:
+ LOGGER.info("Install DaemonSets required by scylla nodes")
+ self.apply_file(node_prepare_config, modifiers=affinity_modifiers, envsubst=False)
+
+ LOGGER.info("Install local volume provisioner")
+ self.helm(f"install local-provisioner {LOCAL_PROVISIONER_DIR}", values=node_pool.helm_affinity_values)
+
+ self.deploy_node_pool(node_pool, wait_till_ready=False)
+ time.sleep(10)
+
+ self.wait_till_cluster_is_operational()
+
+            # Calculate cpu and memory limits so that scylla pods occupy all the available node resources
+ cpu_limit, memory_limit = node_pool.cpu_and_memory_capacity
+ # TBD: Remove reduction logic after https://github.com/scylladb/scylla-operator/issues/384 is fixed
+ cpu_limit = int(cpu_limit - OPERATOR_CONTAINERS_RESOURCES['cpu'] - SIDECAR_CONTAINERS_RESOURCES['cpu'])
+ memory_limit = int(
+ memory_limit - OPERATOR_CONTAINERS_RESOURCES['memory'] - SIDECAR_CONTAINERS_RESOURCES['memory'])
+ else:
+ cpu_limit = 1
+ memory_limit = 2
+
+ cpu_limit = int(cpu_limit)
+ memory_limit = convert_memory_units_to_k8s_value(memory_limit)
+
if target_mgmt_agent_to_minio:
# Create kubernetes secret that holds scylla manager agent configuration
self.update_secret_from_data('scylla-agent-config', 'scylla', {
@@ -333,63 +630,102 @@ def deploy_scylla_cluster(self, config: str, target_mgmt_agent_to_minio: bool =
source_chart_name="scylla-operator/scylla",
version=self._scylla_operator_chart_version,
use_devel=True,
- set_options="",
- values_file_path=config,
+ values=self.get_scylla_cluster_helm_values(
+ cpu_limit=cpu_limit,
+ memory_limit=memory_limit,
+ pool_name=node_pool.name if node_pool else None),
namespace=SCYLLA_NAMESPACE,
))

+ self.wait_till_cluster_is_operational()
+
LOGGER.debug("Check Scylla cluster")
self.kubectl("get scyllaclusters.scylla.scylladb.com", namespace=SCYLLA_NAMESPACE)
- self.kubectl("get pods", namespace=SCYLLA_NAMESPACE)
-
LOGGER.debug("Wait for %d secs before we start to apply changes to the cluster", DEPLOY_SCYLLA_CLUSTER_DELAY)
time.sleep(DEPLOY_SCYLLA_CLUSTER_DELAY)
self.start_scylla_cluster_events_thread()

@log_run_info
- def deploy_loaders_cluster(self, config: str) -> None:
+ def deploy_loaders_cluster(self, config: str, node_pool: CloudK8sNodePool = None) -> None:
LOGGER.info("Create and initialize a loaders cluster")
- self.apply_file(config)
+ if node_pool:
+ self.deploy_node_pool(node_pool)
+ cpu_limit, memory_limit = node_pool.cpu_and_memory_capacity
+ cpu_limit, memory_limit = cpu_limit - 1, memory_limit - 1
+ affinity_modifiers = node_pool.affinity_modifiers
+ else:
+ cpu_limit = 2
+ memory_limit = 4
+ affinity_modifiers = []
+
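+        # NOTE (assumption): the convert_*_units_to_k8s_value() helpers turn plain numbers
+        # into k8s resource quantity strings (e.g. cpu '2', memory '4Gi') before substitution
+        # into the loaders config.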
+ cpu_limit = convert_cpu_units_to_k8s_value(cpu_limit)
+ memory_limit = convert_memory_units_to_k8s_value(memory_limit)

+ self.apply_file(config, environ={'CPU_LIMIT': cpu_limit, 'MEMORY_LIMIT': memory_limit},
+ modifiers=affinity_modifiers)
LOGGER.debug("Check the loaders cluster")
self.kubectl("get statefulset", namespace="sct-loaders")
self.kubectl("get pods", namespace="sct-loaders")

@log_run_info
def deploy_monitoring_cluster(
- self, scylla_operator_tag: str, namespace: str = "monitoring", is_manager_deployed: bool = False) -> None:
+ self, scylla_operator_tag: str, namespace: str = "monitoring", is_manager_deployed: bool = False,
+ node_pool: CloudK8sNodePool = None) -> None:
"""
This procedure comes from scylla-operator repo:
https://github.com/scylladb/scylla-operator/blob/master/docs/source/generic.md#setting-up-monitoring

If it fails please consider reporting and fixing issue in scylla-operator repo too
"""
LOGGER.info("Create and initialize a monitoring cluster")
+ if node_pool:
+ self.deploy_node_pool(node_pool)
+ helm_values = node_pool.helm_affinity_values
+ cpu_limit, memory_limit = node_pool.cpu_and_memory_capacity
+ cpu_limit, memory_limit = cpu_limit - 1, memory_limit - 1
+ else:
+ helm_values = HelmValues()
+ cpu_limit = 2
+ memory_limit = 4
+
+ cpu_limit = convert_cpu_units_to_k8s_value(cpu_limit)
+ memory_limit = convert_memory_units_to_k8s_value(memory_limit)
+
+ helm_values.set('server.resources.limits.cpu', cpu_limit)
+ helm_values.set('server.resources.limits.memory', memory_limit)
+ helm_values.set('nodeExporter.enabled', False)
+
if scylla_operator_tag == 'nightly':
scylla_operator_tag = 'master'
elif scylla_operator_tag == '':
scylla_operator_tag = get_git_tag_from_helm_chart_version(
self._scylla_operator_chart_version)
+
with TemporaryDirectory() as tmp_dir_name:
scylla_operator_dir = os.path.join(tmp_dir_name, 'scylla-operator')
scylla_monitoring_dir = os.path.join(tmp_dir_name, 'scylla-monitoring')
+ LOGGER.info("Download scylla-operator sources")
download_from_github(
repo='scylladb/scylla-operator',
tag=scylla_operator_tag,
dst_dir=scylla_operator_dir)
+ LOGGER.info("Download scylla-monitoring sources")
download_from_github(
repo='scylladb/scylla-monitoring',
tag='scylla-monitoring-3.6.0',
dst_dir=scylla_monitoring_dir)
- self.kubectl(f'create namespace {namespace}')
+ LOGGER.info("Install prometheus-community/kube-prometheus-stack helm chart")
+ self.kubectl(f'create namespace {namespace}', ignore_status=True)
self.helm('repo add prometheus-community https://prometheus-community.github.io/helm-charts')
self.helm('repo update')
self.helm(
- f'install monitoring prometheus-community/kube-prometheus-stack '
- f'--values {os.path.join(scylla_operator_dir, "examples", "common", "monitoring", "values.yaml")} '
- '--set server.resources.limits.cpu=2 --set server.resources.limits.memory=4Gi'
- f'--create-namespace --namespace {namespace}')
+ f'install monitoring prometheus-community/kube-prometheus-stack --create-namespace '
+ f'-f {os.path.join(scylla_operator_dir, "examples", "common", "monitoring", "values.yaml")} ',
+ values=helm_values,
+ namespace=namespace
+ )

+ LOGGER.info("Install scylla-monitoring dashboards and monitoring services for scylla")
self.apply_file(os.path.join(scylla_operator_dir, "examples", "common", "monitoring",
"scylla-service-monitor.yaml"))
self.kubectl(
@@ -400,6 +736,7 @@ def deploy_monitoring_cluster(
namespace=namespace)

if is_manager_deployed:
+ LOGGER.info("Install monitoring services for scylla-manager")
self.apply_file(os.path.join(scylla_operator_dir, "examples", "common", "monitoring",
"scylla-manager-service-monitor.yaml"))
self.kubectl(
@@ -411,9 +748,21 @@ def deploy_monitoring_cluster(
"-p '{\"metadata\":{\"labels\":{\"grafana_dashboard\": \"1\"}}}'",
namespace=namespace)

+ LOGGER.info("Check the monitoring cluster")
time.sleep(10)
self.kubectl("wait --timeout=15m --all --for=condition=Ready pod", timeout=1000, namespace=namespace)
- LOGGER.debug("Check the monitoring cluster")
+ if self.USE_MONITORING_EXPOSE_SERVICE:
+ LOGGER.info("Expose ports for prometheus of the monitoring cluster")
+ self.k8s_monitoring_prometheus_expose_service = PortExposeService(
+ name='prometheus-expose-ports-service',
+ namespace='monitoring',
+ selector_key='operator.prometheus.io/name',
+ selector_value='monitoring-kube-prometheus-prometheus',
+ core_v1_api=self.k8s_core_v1_api,
+ resolver=self.resolve_dns_to_ip
+ )
+ self.k8s_monitoring_prometheus_expose_service.deploy()
+
self.kubectl("get statefulset", namespace=namespace)
self.kubectl("get pods", namespace=namespace)

@@ -455,6 +804,17 @@ def operator_pod_status(self):
pods = KubernetesOps.list_pods(self, namespace=SCYLLA_OPERATOR_NAMESPACE)
return pods[0].status if pods else None

+ @cached_property
+ def tags(self) -> Dict[str, str]:
+ return get_tags_from_params(self.params)
+
+ @cached_property
+ def logdir(self) -> str:
+ assert '_SCT_TEST_LOGDIR' in os.environ
+ logdir = os.path.join(os.environ['_SCT_TEST_LOGDIR'], self.name)
+ os.makedirs(logdir, exist_ok=True)
+ return logdir
+
@property
def k8s_core_v1_api(self) -> k8s.client.CoreV1Api:
return KubernetesOps.core_v1_api(self.api_client)
@@ -528,16 +888,113 @@ def create_secret_from_directory(self, secret_name: str, path: str, namespace: s
self.kubectl(cmd, namespace=namespace)

@property
- def monitoring_prometheus_pod(self):
+ def k8s_monitoring_prometheus_pod(self):
for pod in KubernetesOps.list_pods(self, namespace='monitoring'):
- for container in pod.spec.containers:
- if container.name == 'prometheus':
- return pod
+ if pod.metadata.labels.get('operator.prometheus.io/name', None) == 'monitoring-kube-prometheus-prometheus':
+ return pod
return None

+ @property
+ def k8s_monitoring_external_ip(self) -> str:
+ if self.USE_MONITORING_EXPOSE_SERVICE:
+ return self.k8s_monitoring_prometheus_expose_service.service_ip
+ else:
+ return self.k8s_monitoring_prometheus_pod.status.pod_ip
+
+ def patch_kubectl_config(self):
+ """
+ Patched kubectl config so that it will obtain cloud token from cache file
+ that is kept update by token update thread
+ """
+ self.create_kubectl_config()
+ self.start_token_update_thread()
+ KubernetesOps.patch_kube_config(self.kubectl_token_path)
+ wait_for(self.check_if_token_is_valid, timeout=120, throw_exc=True)
+
+ def check_if_token_is_valid(self) -> bool:
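+        # The cached token file is considered valid once it contains a non-empty JSON document.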
+ with open(self.kubectl_token_path, mode='r') as token_file:
+ return bool(json.load(token_file))
+
+ def start_token_update_thread(self):
+ if os.path.exists(self.kubectl_token_path):
+ os.unlink(self.kubectl_token_path)
+ self._token_update_thread = self.create_token_update_thread()
+ self._token_update_thread.start()
+        # Wait till the token update thread gets a token and dumps it to kubectl_token_path
+        wait_for(os.path.exists, timeout=30, step=5, text="Wait for cloud token", throw_exc=True,
+ path=self.kubectl_token_path)
+
+ def stop_token_update_thread(self):
+ if self._token_update_thread and self._token_update_thread.is_alive():
+ self._token_update_thread.stop()
+
+ def _add_pool(self, pool: CloudK8sNodePool) -> None:
+ if pool.name not in self.pools:
+ self.pools[pool.name] = pool
+
+ def wait_all_node_pools_to_be_ready(self):
+ for node_pool in self.pools.values():
+ node_pool.wait_for_nodes_readiness()
+
+ def resolve_dns_to_ip(self, hostname: str, timeout: int = None, step: int = 1) -> str:
+
+ def resolve_ip():
+ try:
+ return self._resolve_dns_to_ip(hostname)
+ except Exception as exc:
+ raise RuntimeError("Failed to resolve %s due to the %s", hostname, exc) from None
+
+ if not timeout:
+ return resolve_ip()
+
+ return wait_for(resolve_ip, timeout=timeout, step=step, throw_exc=True)
+
+ def _resolve_dns_to_ip(self, hostname: str) -> str:
+ if self.USE_POD_RESOLVER:
+ return self._resolve_dns_to_ip_via_resolver(hostname)
+ return self._resolve_dns_to_ip_directly(hostname)
+
+ def _resolve_dns_to_ip_directly(self, hostname: str) -> str:
+ ip_address = socket.gethostbyname(hostname)
+ if ip_address == '0.0.0.0':
+            raise RuntimeError(f'Failed to resolve {hostname}')
+ return ip_address
+
+ def _resolve_dns_to_ip_via_resolver(self, hostname: str) -> str:
+ return self.dns_resolver.resolve(hostname)
+
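+    # The resolver pod is deployed lazily on first use; the auxiliary node pool, when
+    # registered, is passed so the pod can be scheduled there.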
+ @cached_property
+ def dns_resolver(self):
+ resolver = DnsPodResolver(
+ k8s_cluster=self,
+ pod_name=self.DNS_RESOLVER_POD_NAME,
+ pool=self.pools.get(self.AUXILIARY_POOL_NAME, None)
+ )
+ resolver.deploy()
+ return resolver
+
+ @abc.abstractmethod
+ def deploy(self):
+ pass
+
+ @abc.abstractmethod
+ def create_kubectl_config(self):
+ pass
+
+ @abc.abstractmethod
+ def create_token_update_thread(self) -> TokenUpdateThread:
+ pass
+
+ @abc.abstractmethod
+ def deploy_node_pool(self, pool: CloudK8sNodePool, wait_till_ready=True) -> None:
+ pass
+

class BasePodContainer(cluster.BaseNode):
parent_cluster: PodCluster
+ expose_ports_service: Optional[PortExposeService] = None
+ public_ip_via_service: bool = False
+
pod_readiness_delay = 30 # seconds
pod_readiness_timeout = 5 # minutes
pod_terminate_timeout = 5 # minutes
@@ -574,6 +1031,11 @@ def _init_remoter(self, ssh_login_info):
def _init_port_mapping(self):
pass

+ def init(self) -> None:
+ if self.public_ip_via_service:
+ self.expose_ports()
+ super().init()
+
@property
def system_log(self):
return os.path.join(self.logdir, "system.log")
@@ -610,15 +1072,13 @@ def _pod_status(self):
return None

@property
- def _cluster_ip_service(self):
- services = KubernetesOps.list_services(self.parent_cluster, namespace=self.parent_cluster.namespace,
- field_selector=f"metadata.name={self.name}")
- return services[0] if services else None
+ def _node(self):
+ return KubernetesOps.get_node(self.parent_cluster, self.node_name)

@property
- def _loadbalancer_service(self):
+ def _cluster_ip_service(self):
services = KubernetesOps.list_services(self.parent_cluster, namespace=self.parent_cluster.namespace,
- field_selector=f"metadata.name={self.name}-loadbalancer")
+ field_selector=f"metadata.name={self.name}")
return services[0] if services else None

@property
@@ -638,15 +1098,95 @@ def _refresh_instance_state(self):
public_ips = []
private_ips = []

+ if self.expose_ports_service and self.expose_ports_service.is_deployed:
+ public_ips.append(self.expose_ports_service.service_ip)
+
if cluster_ip_service := self._cluster_ip_service:
- cluster_ip = cluster_ip_service.spec.cluster_ip
- private_ips.append(cluster_ip)
+ private_ips.append(cluster_ip_service.spec.cluster_ip)

if pod_status := self._pod_status:
public_ips.append(pod_status.host_ip)
private_ips.append(pod_status.pod_ip)
return (public_ips or [None, ], private_ips or [None, ])

+ @property
+ def k8s_pod_uid(self) -> str:
+ try:
+ return str(self._pod.metadata.uid)
+ except Exception:
+ return ''
+
+ @property
+ def k8s_pod_name(self) -> str:
+ return str(self._pod.metadata.name)
+
+ def expose_ports(self):
+ self.expose_ports_service = PortExposeService(
+ name=f'{self.k8s_pod_name}-lbc',
+ namespace='scylla',
+ selector_value=self.k8s_pod_name,
+ core_v1_api=self.parent_cluster.k8s_cluster.k8s_core_v1_api,
+ resolver=self.parent_cluster.k8s_cluster.resolve_dns_to_ip)
+ self.expose_ports_service.deploy()
+
+ def wait_till_k8s_pod_get_uid(self, timeout: int = None, ignore_uid=None) -> str:
+ """
+        Wait till the pod gets any valid uid.
+        If ignore_uid is provided, wait till the pod gets a valid uid different from ignore_uid
+ """
+ if timeout is None:
+ timeout = self.pod_replace_timeout
+ wait_for(lambda: self.k8s_pod_uid and self.k8s_pod_uid != ignore_uid, timeout=timeout,
+ text=f"Wait till host {self} get uid")
+ return self.k8s_pod_uid
+
+ def wait_for_k8s_node_readiness(self):
+ wait_for(self._wait_for_k8s_node_readiness,
+ text=f"Wait for k8s host {self.node_name} to be ready...",
+ timeout=self.pod_readiness_timeout * 60,
+ throw_exc=True)
+
+ def _wait_for_k8s_node_readiness(self):
+ if self.node_name is None:
+ raise RuntimeError(f"Can't find node for pod {self.name}")
+ result = self.parent_cluster.k8s_cluster.kubectl(
+ f"wait node --timeout={self.pod_readiness_timeout // 3}m --for=condition=Ready {self.node_name}",
+ namespace=self.parent_cluster.namespace,
+ timeout=self.pod_readiness_timeout // 3 * 60 + 10
+ )
+ if result.stdout.count('condition met') != 1:
+ raise RuntimeError('Node is not reported as ready')
+ return True
+
+ def wait_for_pod_to_appear(self):
+ wait_for(self._wait_for_pod_to_appear,
+ text="Wait for pod to appear...",
+ timeout=self.pod_readiness_timeout * 60,
+ throw_exc=True)
+
+ def _wait_for_pod_to_appear(self):
+ if self._pod is None:
+            raise RuntimeError('Pod has not appeared yet')
+ return True
+
+ def wait_for_pod_readiness(self):
+ time.sleep(self.pod_readiness_delay)
+
+        # The kubectl wait below uses a shorter timeout than the overall wait_for,
+        # so in the worst case the waiting text is repeated a few times
+ wait_for(self._wait_for_pod_readiness,
+ text=f"Wait for {self.name} pod to be ready...",
+ timeout=self.pod_readiness_timeout * 60,
+ throw_exc=True)
+
+ def _wait_for_pod_readiness(self):
+ result = self.parent_cluster.k8s_cluster.kubectl(
+ f"wait --timeout={self.pod_readiness_timeout // 3}m --for=condition=Ready pod {self.name}",
+ namespace=self.parent_cluster.namespace,
+ timeout=self.pod_readiness_timeout // 3 * 60 + 10)
+ if result.stdout.count('condition met') != 1:
+ raise RuntimeError('Pod is not ready')
+ return True
+
@property
def image(self) -> str:
return self._container_status.image
@@ -825,24 +1365,6 @@ def is_seed(self) -> bool:
def is_seed(self, value):
pass

- @property
- def k8s_pod_uid(self) -> str:
- try:
- return str(self._pod.metadata.uid)
- except Exception:
- return ''
-
- def wait_till_k8s_pod_get_uid(self, timeout: int = None, ignore_uid=None) -> str:
- """
- Wait till pod get any valid uid.
- If ignore_uid is provided it wait till any valid uid different from ignore_uid
- """
- if timeout is None:
- timeout = self.pod_replace_timeout
- wait_for(lambda: self.k8s_pod_uid and self.k8s_pod_uid != ignore_uid, timeout=timeout,
- text=f"Wait till host {self} get uid")
- return self.k8s_pod_uid
-
def mark_to_be_replaced(self, overwrite: bool = False):
if self.is_seed:
raise RuntimeError("Scylla-operator does not support seed nodes replacement")
@@ -863,53 +1385,6 @@ def _wait_for_svc(self):
f"get svc {self.name}", namespace=self.parent_cluster.namespace, verbose=False)
return True

- def wait_for_k8s_node_readiness(self):
- wait_for(self._wait_for_k8s_node_readiness,
- text=f"Wait for k8s host {self.node_name} to be ready...",
- timeout=self.pod_readiness_timeout * 60,
- throw_exc=True)
-
- def _wait_for_k8s_node_readiness(self):
- if self.node_name is None:
- raise RuntimeError(f"Can't find node for pod {self.name}")
- result = self.parent_cluster.k8s_cluster.kubectl(
- f"wait node --timeout={self.pod_readiness_timeout // 3}m --for=condition=Ready {self.node_name}",
- namespace=self.parent_cluster.namespace,
- timeout=self.pod_readiness_timeout // 3 * 60 + 10
- )
- if result.stdout.count('condition met') != 1:
- raise RuntimeError('Node is not reported as ready')
- return True
-
- def wait_for_pod_to_appear(self):
- wait_for(self._wait_for_pod_to_appear,
- text="Wait for pod(s) to apear...",
- timeout=self.pod_readiness_timeout * 60,
- throw_exc=True)
-
- def _wait_for_pod_to_appear(self):
- if self._pod is None:
- raise RuntimeError('Pod is not reported as ready')
- return True
-
- def wait_for_pod_readiness(self):
- time.sleep(self.pod_readiness_delay)
-
- # To make it more informative in worst case scenario it repeat waiting text 5 times
- wait_for(self._wait_for_pod_readiness,
- text="Wait for pod(s) to be ready...",
- timeout=self.pod_readiness_timeout * 60,
- throw_exc=True)
-
- def _wait_for_pod_readiness(self):
- result = self.parent_cluster.k8s_cluster.kubectl(
- f"wait --timeout={self.pod_readiness_timeout // 3}m --for=condition=Ready pod {self.name}",
- namespace=self.parent_cluster.namespace,
- timeout=self.pod_readiness_timeout // 3 * 60 + 10)
- if result.stdout.count('condition met') != 1:
- raise RuntimeError('Pod is not reported as ready')
- return True
-
def refresh_ip_address(self):
# Invalidate ip address cache
old_ip_info = (self.public_ip_address, self.private_ip_address)
@@ -933,10 +1408,13 @@ def __init__(self,
node_prefix: str = "node",
node_type: Optional[str] = None,
n_nodes: Union[list, int] = 3,
- params: Optional[dict] = None) -> None:
+ params: Optional[dict] = None,
+ node_pool: Optional[dict] = None,
+ ) -> None:
self.k8s_cluster = k8s_cluster
self.namespace = namespace
self.container = container
+ self.node_pool = node_pool

super().__init__(cluster_uuid=cluster_uuid,
cluster_prefix=cluster_prefix,
@@ -957,6 +1435,10 @@ def k8s_apps_v1_api(self):
def k8s_core_v1_api(self):
return self.k8s_cluster.k8s_core_v1_api

+ @cached_property
+ def pool_name(self):
+ return self.node_pool.get('name', None)
+
def _create_node(self, node_index: int, pod_name: str, dc_idx: int, rack: int) -> BasePodContainer:
node = self.PodContainerClass(parent_cluster=self,
name=pod_name,
@@ -974,7 +1456,11 @@ def add_nodes(self,
dc_idx: int = 0,
rack: int = 0,
enable_auto_bootstrap: bool = False) -> List[BasePodContainer]:
- # PodCluster only register new nodes and return whatever was registered
+
+        # Wait till the whole cluster (all racks), including the new nodes, is up and running
+ self.wait_for_pods_readiness(pods_to_wait=count, total_pods=len(self.nodes) + count)
+
+ # Register new nodes and return whatever was registered
k8s_pods = KubernetesOps.list_pods(self, namespace=self.namespace)
nodes = []
for pod in k8s_pods:
@@ -1019,34 +1505,36 @@ def get_nodes_readiness_delay(self) -> Union[float, int]:

def wait_for_pods_readiness(self, pods_to_wait: int, total_pods: int):
time.sleep(self.get_nodes_readiness_delay)
- readiness_timeout = self.get_nodes_reboot_timeout(pods_to_wait)
-
- @timeout(message="Wait for pod(s) to be ready...", timeout=readiness_timeout * 60)
- def wait_cluster_is_ready():
- # To make it more informative in worst case scenario made it repeat 5 times, by readiness_timeout // 5
- result = self.k8s_cluster.kubectl(
- f"wait --timeout={readiness_timeout // 5}m --all --for=condition=Ready pod",
- namespace=self.namespace,
- timeout=readiness_timeout // 5 * 60 + 10)
- if result.stdout.count('condition met') != total_pods:
- raise RuntimeError('Not all nodes reported')
+ KubernetesOps.wait_for_pods_readiness(
+ kluster=self.k8s_cluster,
+ total_pods=total_pods,
+ readiness_timeout=self.get_nodes_reboot_timeout(pods_to_wait),
+ namespace=self.namespace
+ )

- wait_cluster_is_ready()
+ def expose_ports(self, nodes=None):
+ if nodes is None:
+ nodes = self.nodes
+ for node in nodes:
+ node.expose_ports()


class ScyllaPodCluster(cluster.BaseScyllaCluster, PodCluster):
+ NODE_PREPARE_FILE = None
node_setup_requires_scylla_restart = False

def __init__(self,
k8s_cluster: KubernetesCluster,
- scylla_cluster_config: str,
scylla_cluster_name: Optional[str] = None,
user_prefix: Optional[str] = None,
n_nodes: Union[list, int] = 3,
- params: Optional[dict] = None) -> None:
+ params: Optional[dict] = None,
+ node_pool: CloudK8sNodePool = None,
+ ) -> None:
k8s_cluster.deploy_scylla_cluster(
- scylla_cluster_config,
- target_mgmt_agent_to_minio=bool(params.get('use_mgmt'))
+ target_mgmt_agent_to_minio=bool(params.get('use_mgmt')),
+ node_pool=node_pool,
+ node_prepare_config=self.NODE_PREPARE_FILE
)
self.scylla_yaml_lock = RLock()
self.scylla_yaml = {}
@@ -1059,7 +1547,8 @@ def __init__(self,
node_prefix=cluster.prepend_user_prefix(user_prefix, 'db-node'),
node_type="scylla-db",
n_nodes=n_nodes,
- params=params)
+ params=params,
+ node_pool=node_pool)

get_scylla_args = cluster_docker.ScyllaDockerCluster.get_scylla_args

@@ -1212,8 +1701,6 @@ def add_nodes(self,
current_members = self.scylla_cluster_spec.datacenter.racks[rack].members
self.replace_scylla_cluster_value(f"/spec/datacenter/racks/{rack}/members", current_members + count)

- # Wait while whole cluster (on all racks) including new nodes are up and running
- self.wait_for_pods_readiness(pods_to_wait=count, total_pods=len(self.nodes) + count)
return super().add_nodes(count=count,
ec2_user_data=ec2_user_data,
dc_idx=dc_idx,
@@ -1314,9 +1801,19 @@ def add_sidecar_injection(self) -> bool:
statefulset.spec.template.spec.containers.insert(
0,
V1Container(
- command=['/bin/sh', '-c', 'while true; do sleep 1 ; done'],
+ command=['/bin/sh', '-c', 'while true; do sleep 900 ; done'],
image='busybox:1.32.0',
- name='injected-busybox-sidecar'
+ name='injected-busybox-sidecar',
+ resources=V1ResourceRequirements(
+ limits={
+ 'cpu': '100m',
+ 'memory': '100Mi'
+ },
+ requests={
+ 'cpu': '100m',
+ 'memory': '100Mi'
+ }
+ )
)
)

@@ -1347,12 +1844,13 @@ def _check_kubernetes_monitoring_health(self, max_diff=0.1):

kubernetes_prometheus_host = None
try:
- kubernetes_prometheus_host = self.k8s_cluster.monitoring_prometheus_pod.status.pod_ip
+ kubernetes_prometheus_host = self.k8s_cluster.k8s_monitoring_external_ip
kubernetes_prometheus = PrometheusDBStats(host=kubernetes_prometheus_host)
except Exception as exc:
ClusterHealthValidatorEvent.MonitoringStatus(
- message=f'Failed to connect to kubernetes prometheus server at {kubernetes_prometheus_host},'
- f' due to the: {exc}').publish()
+                error=f'Failed to connect to kubernetes prometheus server at {kubernetes_prometheus_host},'
+                f' due to: \n' +
+                ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))).publish()

ClusterHealthValidatorEvent.Done(message="Kubernetes monitoring health check finished").publish()
return
@@ -1363,8 +1861,8 @@ def _check_kubernetes_monitoring_health(self, max_diff=0.1):
monitoring_prometheus = PrometheusDBStats(host=monitoring_prometheus_host)
except Exception as exc:
ClusterHealthValidatorEvent.MonitoringStatus(
- message=f'Failed to connect to monitoring prometheus server at {monitoring_prometheus_host},'
- f' due to the: {exc}').publish()
+ error=f'Failed to connect to monitoring prometheus server at {monitoring_prometheus_host},'
+                f' due to: {exc}').publish()

ClusterHealthValidatorEvent.Done(message="Kubernetes monitoring health check finished").publish()
return
@@ -1384,7 +1882,7 @@ def get_stat(name, source, method_name, params):
return average(getattr(source, method_name), params)
except Exception as exc:
ClusterHealthValidatorEvent.MonitoringStatus(
- message=f'Failed to get data from {name}: {exc}').publish()
+ error=f'Failed to get data from {name}: {exc}').publish()
ClusterHealthValidatorEvent.Done(message="Kubernetes monitoring health check finished").publish()
return

@@ -1412,7 +1910,7 @@ def get_stat(name, source, method_name, params):
f'which is more than 1% different from kubernetes monitoring result {kubernetes_data}')

for error in errors:
- ClusterHealthValidatorEvent.MonitoringStatus(message=error).publish()
+ ClusterHealthValidatorEvent.MonitoringStatus(error=error).publish()

ClusterHealthValidatorEvent.Done(message="Kubernetes monitoring health check finished").publish()

@@ -1439,7 +1937,9 @@ def __init__(self,
loader_cluster_name: Optional[str] = None,
user_prefix: Optional[str] = None,
n_nodes: Union[list, int] = 3,
- params: Optional[dict] = None) -> None:
+ params: Optional[dict] = None,
+ node_pool: CloudK8sNodePool = None,
+ ) -> None:

self.loader_cluster_config = loader_cluster_config
self.loader_cluster_name = loader_cluster_name
@@ -1454,7 +1954,9 @@ def __init__(self,
node_prefix=cluster.prepend_user_prefix(user_prefix, "loader-node"),
node_type="loader",
n_nodes=n_nodes,
- params=params)
+ params=params,
+ node_pool=node_pool,
+ )

def node_setup(self,
node: BasePodContainer,
@@ -1477,7 +1979,7 @@ def add_nodes(self,
if self.loader_cluster_created:
raise NotImplementedError("Changing number of nodes in LoaderPodCluster is not supported.")

- self.k8s_cluster.deploy_loaders_cluster(self.loader_cluster_config)
+ self.k8s_cluster.deploy_loaders_cluster(self.loader_cluster_config, node_pool=self.node_pool)
self.wait_for_pods_readiness(pods_to_wait=count, total_pods=count)
new_nodes = super().add_nodes(count=count,
ec2_user_data=ec2_user_data,
@@ -1487,3 +1989,13 @@ def add_nodes(self,
self.loader_cluster_created = True

return new_nodes
+
+
+def get_tags_from_params(params: dict) -> Dict[str, str]:
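+    # Pick the most conservative 'post_behavior_*_nodes' value across db/loader/monitor
+    # nodes: only when all of them are 'destroy' is the cluster tagged for termination.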
+ behaviors = ['keep', 'keep-on-failure', 'destroy']
+ picked_behavior_idx = 2
+ for node_type in ['db', 'loader', 'monitor']:
+ post_behavior_idx = behaviors.index(params.get(f"post_behavior_{node_type}_nodes").lower())
+ picked_behavior_idx = min(post_behavior_idx, picked_behavior_idx)
+ picked_behavior = behaviors[picked_behavior_idx]
+ return {**Setup.common_tags(), "keep_action": "terminate" if picked_behavior == "destroy" else "", }
diff --git a/sdcm/cluster_k8s/eks.py b/sdcm/cluster_k8s/eks.py
--- a/sdcm/cluster_k8s/eks.py
+++ b/sdcm/cluster_k8s/eks.py
@@ -0,0 +1,455 @@
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+#
+# See LICENSE for more details.
+#
+# Copyright (c) 2020 ScyllaDB
+import base64
+import os
+import logging
+import time
+from textwrap import dedent
+from typing import List, Dict, Optional, Union, Literal
+from functools import cached_property, partial
+
+import boto3
+from mypy_boto3_ec2.type_defs import LaunchTemplateBlockDeviceMappingRequestTypeDef, \
+ LaunchTemplateEbsBlockDeviceRequestTypeDef, RequestLaunchTemplateDataTypeDef, \
+ LaunchTemplateTagSpecificationRequestTypeDef
+
+from sdcm import sct_abs_path, cluster
+from sdcm.cluster_aws import AWSCluster, MonitorSetAWS
+from sdcm.localhost import LocalHost
+from sdcm.utils.aws_utils import tags_as_ec2_tags, EksClusterCleanupMixin
+from sdcm.utils.k8s import TokenUpdateThread
+from sdcm.wait import wait_for
+from sdcm.cluster_k8s import KubernetesCluster, ScyllaPodCluster, BaseScyllaPodContainer, CloudK8sNodePool
+from sdcm.cluster_k8s.iptables import IptablesPodIpRedirectMixin, IptablesClusterOpsMixin
+from sdcm.remote import LOCALRUNNER
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class EksNodePool(CloudK8sNodePool):
+ k8s_cluster: 'EksCluster'
+ disk_type: Literal["standard", "io1", "io2", "gp2", "sc1", "st1"]
+
+ def __init__(
+ self,
+ k8s_cluster: 'EksCluster',
+ name: str,
+ num_nodes: int,
+ disk_size: int,
+ instance_type: str,
+ role_arn: str,
+ labels: dict = None,
+ tags: dict = None,
+ security_group_ids: List[str] = None,
+ ec2_subnet_ids: List[str] = None,
+ ssh_key_pair_name: str = None,
+ provision_type: Literal['ON_DEMAND', 'SPOT'] = 'ON_DEMAND',
+ launch_template: str = None,
+ image_type: Literal['AL2_x86_64', 'AL2_x86_64_GPU', 'AL2_ARM_64'] = 'AL2_x86_64',
+ disk_type: Literal["standard", "io1", "io2", "gp2", "sc1", "st1"] = None,
+ k8s_version: str = None,
+ is_deployed: bool = False,
+ user_data: str = None,
+ ):
+ super().__init__(
+ k8s_cluster=k8s_cluster,
+ name=name,
+ num_nodes=num_nodes,
+ disk_size=disk_size,
+ disk_type=disk_type,
+ image_type=image_type,
+ instance_type=instance_type,
+ labels=labels,
+ tags=tags,
+ is_deployed=is_deployed,
+ )
+ self.role_arn = role_arn
+ self.ec2_subnet_ids = self.k8s_cluster.ec2_subnet_ids if ec2_subnet_ids is None else ec2_subnet_ids
+ self.ssh_key_pair_name = os.path.basename(
+ self.k8s_cluster.credentials[0].key_pair_name) if ssh_key_pair_name is None else ssh_key_pair_name
+ self.security_group_ids = \
+ self.k8s_cluster.ec2_security_group_ids[0] if security_group_ids is None else security_group_ids
+ self.provision_type = provision_type
+ self.launch_template = launch_template
+ self.k8s_version = self.k8s_cluster.eks_cluster_version if k8s_version is None else k8s_version
+ self.user_data = user_data
+
+ @property
+ def launch_template_name(self) -> str:
+ return f'sct-{self.k8s_cluster.short_cluster_name}-{self.name}'
+
+ @property
+ def is_launch_template_required(self) -> bool:
+ return bool(self.user_data)
+
+ @property
+ def _launch_template_cfg(self) -> dict:
+ block_devices = []
+ if self.disk_size:
+ root_disk_def = LaunchTemplateBlockDeviceMappingRequestTypeDef(
+ DeviceName='/dev/xvda',
+ Ebs=LaunchTemplateEbsBlockDeviceRequestTypeDef(
+ DeleteOnTermination=True,
+ Encrypted=False,
+ VolumeSize=self.disk_size,
+ VolumeType=self.disk_type
+ ))
+ block_devices.append(root_disk_def)
+ launch_template = RequestLaunchTemplateDataTypeDef(
+ KeyName=self.ssh_key_pair_name,
+ EbsOptimized=False,
+ BlockDeviceMappings=block_devices,
+ NetworkInterfaces=[{
+ "DeviceIndex": 0,
+ "SubnetId": self.ec2_subnet_ids[0],
+ }],
+ )
+ if self.user_data:
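+            # The EC2 API requires launch template user data to be base64-encoded.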
+ launch_template['UserData'] = base64.b64encode(self.user_data.encode('utf-8')).decode("ascii")
+ if self.tags:
+ launch_template['TagSpecifications'] = [LaunchTemplateTagSpecificationRequestTypeDef(
+ ResourceType="instance",
+ Tags=tags_as_ec2_tags(self.tags)
+ )]
+ return launch_template
+
+ @property
+ def _node_group_cfg(self) -> dict:
+ labels = {} if self.labels is None else self.labels
+ tags = {} if self.tags is None else self.tags
+ node_labels = labels.copy()
+ node_labels['node-pool'] = self.name
+ node_pool_config = {
+ 'clusterName': self.k8s_cluster.short_cluster_name,
+ 'nodegroupName': self.name,
+ 'scalingConfig': {
+ 'minSize': self.num_nodes,
+ 'maxSize': self.num_nodes,
+ 'desiredSize': self.num_nodes
+ },
+            # Subnets control AZ placement: if you specify subnets from multiple AZs,
+            # nodes will be spread across those AZs evenly, which can lead to failures and/or slowdowns
+ 'subnets': [self.ec2_subnet_ids[0]],
+ 'instanceTypes': [self.instance_type],
+ 'amiType': self.image_type,
+ 'nodeRole': self.role_arn,
+ 'labels': labels,
+ 'tags': tags,
+ 'capacityType': self.provision_type.upper(),
+ 'version': self.k8s_version
+ }
+ if self.is_launch_template_required:
+ node_pool_config['launchTemplate'] = {'name': self.launch_template_name}
+ else:
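+            # 'diskSize' and 'remoteAccess' are not allowed together with a launch template,
+            # so they are only set for plain node groups.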
+ node_pool_config['diskSize'] = self.disk_size
+ node_pool_config['remoteAccess'] = {
+ 'ec2SshKey': self.ssh_key_pair_name,
+ }
+ return node_pool_config
+
+ def deploy(self) -> None:
+ LOGGER.info("Deploy %s node pool with %d node(s)", self.name, self.num_nodes)
+ if self.is_launch_template_required:
+ LOGGER.info("Deploy launch template %s", self.launch_template_name)
+ self.k8s_cluster.ec2_client.create_launch_template(
+ LaunchTemplateName=self.launch_template_name,
+ LaunchTemplateData=self._launch_template_cfg,
+ )
+ self.k8s_cluster.eks_client.create_nodegroup(**self._node_group_cfg)
+ self.is_deployed = True
+
+ def resize(self, num_nodes):
+ LOGGER.info("Resize %s pool to %d node(s)", self.name, num_nodes)
+ self.k8s_cluster.eks_client.update_nodegroup_config(
+ clusterName=self.k8s_cluster.short_cluster_name,
+ nodegroupName=self.name,
+ scalingConfig={
+ 'minSize': num_nodes,
+ 'maxSize': num_nodes,
+ 'desiredSize': num_nodes
+ },
+ )
+ self.num_nodes = num_nodes
+
+ def undeploy(self):
+ try:
+ self.k8s_cluster.eks_client.delete_nodegroup(
+ clusterName=self.k8s_cluster.short_cluster_name,
+ nodegroupName=self.name)
+ except Exception as exc:
+ LOGGER.debug("Failed to delete nodegroup %s/%s, due to the following error:\n%s",
+ self.k8s_cluster.short_cluster_name, self.name, exc)
+
+
+class EksTokenUpdateThread(TokenUpdateThread):
+ update_period = 300
+
+ def __init__(self, aws_cmd: str, kubectl_token_path: str):
+ self._aws_cmd = aws_cmd
+ super().__init__(kubectl_token_path=kubectl_token_path)
+
+ def get_token(self) -> str:
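+        # 'aws eks get-token' prints an ExecCredential JSON document that kubectl
+        # consumes via the patched kubeconfig.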
+ return LOCALRUNNER.run(self._aws_cmd).stdout
+
+
+class EksCluster(KubernetesCluster, EksClusterCleanupMixin):
+ POOL_LABEL_NAME = 'eks.amazonaws.com/nodegroup'
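+    # EKS puts this label on every node of a managed node group, so it can be used for pod affinity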
+ USE_MONITORING_EXPOSE_SERVICE = True
+ USE_POD_RESOLVER = True
+ pools: Dict[str, EksNodePool]
+ short_cluster_name: str
+
+ def __init__(self,
+ eks_cluster_version,
+ ec2_security_group_ids,
+ ec2_subnet_ids,
+ ec2_role_arn,
+ credentials,
+ user_prefix,
+ service_ipv4_cidr,
+ vpc_cni_version,
+ nodegroup_role_arn,
+ params=None,
+ cluster_uuid=None,
+ region_name=None
+ ):
+ super().__init__(user_prefix=user_prefix, cluster_uuid=cluster_uuid, region_name=region_name, params=params)
+ self.credentials = credentials
+ self.eks_cluster_version = eks_cluster_version
+ self.ec2_role_arn = ec2_role_arn
+ self.nodegroup_role_arn = nodegroup_role_arn
+ self.ec2_security_group_ids = ec2_security_group_ids
+ self.ec2_subnet_ids = ec2_subnet_ids
+ self.service_ipv4_cidr = service_ipv4_cidr
+ self.vpc_cni_version = vpc_cni_version
+
+ def create_eks_cluster(self, wait_till_functional=True):
+ self.eks_client.create_cluster(
+ name=self.short_cluster_name,
+ version=self.eks_cluster_version,
+ roleArn=self.ec2_role_arn,
+ resourcesVpcConfig={
+ 'securityGroupIds': self.ec2_security_group_ids[0],
+ 'subnetIds': self.ec2_subnet_ids,
+ 'endpointPublicAccess': True,
+ 'endpointPrivateAccess': True,
+ 'publicAccessCidrs': [
+ '0.0.0.0/0',
+ ]
+ },
+ kubernetesNetworkConfig={
+ 'serviceIpv4Cidr': self.service_ipv4_cidr
+ },
+ logging={
+ 'clusterLogging': [
+ {
+ 'types': [
+ 'api',
+ 'audit',
+ 'authenticator',
+ 'controllerManager',
+ 'scheduler'
+ ],
+ 'enabled': True
+ },
+ ]
+ },
+ tags=self.tags,
+ )
+ self.eks_client.create_addon(
+ clusterName=self.short_cluster_name,
+ addonName='vpc-cni',
+ addonVersion=self.vpc_cni_version
+ )
+ if wait_till_functional:
+ wait_for(lambda: self.cluster_status == 'ACTIVE', step=60, throw_exc=True, timeout=1200,
+ text=f'Waiting till EKS cluster {self.short_cluster_name} become operational')
+
+ @property
+ def cluster_info(self) -> dict:
+ return self.eks_client.describe_cluster(name=self.short_cluster_name)['cluster']
+
+ @property
+ def cluster_status(self) -> str:
+ return self.cluster_info['status']
+
+ def __str__(self):
+ return f"{type(self).__name__} {self.name} | Version: {self.eks_cluster_version}"
+
+ def create_token_update_thread(self):
+ return EksTokenUpdateThread(
+ aws_cmd=f'aws eks --region {self.region_name} get-token --cluster-name {self.short_cluster_name}',
+ kubectl_token_path=self.kubectl_token_path
+ )
+
+ def create_kubectl_config(self):
+ LOCALRUNNER.run(f'aws eks --region {self.region_name} update-kubeconfig --name {self.short_cluster_name}')
+
+ def deploy(self):
+ LOGGER.info("Create EKS cluster `%s'", self.short_cluster_name)
+ self.create_eks_cluster()
+ LOGGER.info("Patch kubectl config")
+ self.patch_kubectl_config()
+
+ def aws_cli(self, cmd) -> str:
+ return LOCALRUNNER.run(cmd).stdout
+
+ def deploy_node_pool(self, pool: EksNodePool, wait_till_ready=True) -> None:
+ self._add_pool(pool)
+ if pool.is_deployed:
+ return
+ if wait_till_ready:
+ pool.deploy_and_wait_till_ready()
+ else:
+ pool.deploy()
+
+ def resize_node_pool(self, name: str, num_nodes: int, wait_till_ready=True) -> None:
+ self.pools[name].resize(num_nodes)
+ if wait_till_ready:
+ self.pools[name].wait_for_nodes_readiness()
+
+ def destroy(self):
+ EksClusterCleanupMixin.destroy(self)
+ self.stop_token_update_thread()
+
+ def get_ec2_instance_by_id(self, instance_id):
+ return boto3.resource('ec2', region_name=self.region_name).Instance(id=instance_id)
+
+
+class EksScyllaPodContainer(BaseScyllaPodContainer, IptablesPodIpRedirectMixin):
+ parent_cluster: 'EksScyllaPodCluster'
+ public_ip_via_service = True
+
+ pod_readiness_delay = 30 # seconds
+ pod_readiness_timeout = 30 # minutes
+ pod_terminate_timeout = 30 # minutes
+
+ @cached_property
+ def hydra_dest_ip(self) -> str:
+ return self.public_ip_address
+
+ @cached_property
+ def nodes_dest_ip(self) -> str:
+ return self.public_ip_address
+
+ @property
+ def ec2_host(self):
+ return self.parent_cluster.k8s_cluster.get_ec2_instance_by_id(self.ec2_instance_id)
+
+ @property
+ def ec2_instance_id(self):
+ return self._node.spec.provider_id.split('/')[-1]
+
+ def terminate_k8s_host(self):
+ self.log.info('terminate_k8s_host: EC2 instance of kubernetes node will be terminated, '
+ 'the following is affected :\n' + dedent('''
+ EC2 instance X <-
+ K8s node X
+ Scylla Pod X
+ Scylla node X
+ '''))
+ self._instance_wait_safe(self.ec2_instance_destroy)
+ self.wait_for_k8s_node_readiness()
+
+ def ec2_instance_destroy(self):
+ if self.ec2_host:
+ self.ec2_host.terminate()
+
+ def _instance_wait_safe(self, instance_method, *args, **kwargs):
+ """
+        Wrapper around EC2 instance methods that is safer to use.
+
+        Try a method and, if it fails, retry using an exponential
+        backoff algorithm, similar to what Amazon recommends for its own
+        service [1].
+
+ :see: [1] http://docs.aws.amazon.com/general/latest/gr/api-retries.html
+ """
+ threshold = 300
+ ok = False
+ retries = 0
+ max_retries = 9
+ while not ok and retries <= max_retries:
+ try:
+ return instance_method(*args, **kwargs)
+ except Exception as details: # pylint: disable=broad-except
+ self.log.error('Call to method %s (retries: %s) failed: %s',
+ instance_method, retries, details)
+ time.sleep(min((2 ** retries) * 2, threshold))
+ retries += 1
+
+ if not ok:
+            raise cluster.NodeError('EC2 instance %s method call error after '
+                                    'exponential backoff wait' % self.ec2_host.id)
+
+ def terminate_k8s_node(self):
+ """
+ Delete kubernetes node, which will terminate scylla node that is running on it
+ """
+ self.log.info('terminate_k8s_node: kubernetes node will be deleted, the following is affected :\n' + dedent('''
+ EC2 instance X <-
+ K8s node X <-
+ Scylla Pod X
+ Scylla node X
+ '''))
+ super().terminate_k8s_node()
+
+        # EKS does not clean up the instance automatically; to let the pool recover itself,
+        # the instance that holds the node must be terminated
+
+ self.ec2_instance_destroy()
+ self.wait_for_k8s_node_readiness()
+ self.wait_for_pod_readiness()
+
+
+class EksScyllaPodCluster(ScyllaPodCluster, IptablesClusterOpsMixin):
+ NODE_PREPARE_FILE = sct_abs_path("sdcm/k8s_configs/eks/scylla-node-prepare.yaml")
+
+ k8s_cluster: 'EksCluster'
+ PodContainerClass = EksScyllaPodContainer
+ nodes: List[EksScyllaPodContainer]
+
+ def __init__(self,
+ k8s_cluster: KubernetesCluster,
+ scylla_cluster_name: Optional[str] = None,
+ user_prefix: Optional[str] = None,
+ n_nodes: Union[list, int] = 3,
+ params: Optional[dict] = None,
+ node_pool: EksNodePool = None,
+ ) -> None:
+ super().__init__(k8s_cluster=k8s_cluster, scylla_cluster_name=scylla_cluster_name, user_prefix=user_prefix,
+ n_nodes=n_nodes, params=params, node_pool=node_pool)
+
+ def add_nodes(self,
+ count: int,
+ ec2_user_data: str = "",
+ dc_idx: int = 0,
+ rack: int = 0,
+ enable_auto_bootstrap: bool = False) -> List[EksScyllaPodContainer]:
+ new_nodes = super().add_nodes(count=count,
+ ec2_user_data=ec2_user_data,
+ dc_idx=dc_idx,
+ rack=rack,
+ enable_auto_bootstrap=enable_auto_bootstrap)
+ self.add_hydra_iptables_rules(nodes=new_nodes)
+ self.update_nodes_iptables_redirect_rules(nodes=new_nodes, loaders=False)
+ return new_nodes
+
+
+class MonitorSetEKS(MonitorSetAWS):
+    # Nodes outside of the EKS cluster can't reach its nodes via private addresses, so we have to enforce using public ip
+ DB_NODES_IP_ADDRESS = 'public_ip_address'
+
+ def install_scylla_manager(self, node, auth_token):
+ pass
diff --git a/sdcm/cluster_k8s/gke.py b/sdcm/cluster_k8s/gke.py
--- a/sdcm/cluster_k8s/gke.py
+++ b/sdcm/cluster_k8s/gke.py
@@ -11,7 +11,6 @@
#
# Copyright (c) 2020 ScyllaDB

-import os
import logging
import time
from textwrap import dedent
@@ -21,11 +20,10 @@
import yaml

from sdcm import sct_abs_path, cluster
-from sdcm.utils.k8s import ApiCallRateLimiter, K8S_CONFIGS_PATH_IN_CONTAINER
-from sdcm.utils.common import shorten_cluster_name
-from sdcm.utils.gce_utils import GcloudContextManager, GcloudTokenUpdateThread
-from sdcm.wait import wait_for
-from sdcm.cluster_k8s import KubernetesCluster, ScyllaPodCluster, BaseScyllaPodContainer
+from sdcm.utils.k8s import ApiCallRateLimiter, TokenUpdateThread
+from sdcm.utils.gce_utils import GcloudContextManager
+from sdcm.cluster_k8s import KubernetesCluster, ScyllaPodCluster, BaseScyllaPodContainer, CloudK8sNodePool
+
from sdcm.cluster_k8s.iptables import IptablesPodIpRedirectMixin, IptablesClusterOpsMixin
from sdcm.cluster_gce import MonitorSetGCE

@@ -34,45 +32,139 @@
GKE_API_CALL_QUEUE_SIZE = 1000 # ops
GKE_URLLIB_RETRY = 6 # How many times api request is retried before reporting failure

-SCYLLA_CLUSTER_CONFIG = f"{K8S_CONFIGS_PATH_IN_CONTAINER}/gke-cluster-chart-values.yaml"
LOADER_CLUSTER_CONFIG = sct_abs_path("sdcm/k8s_configs/gke-loaders.yaml")
CPU_POLICY_DAEMONSET = sct_abs_path("sdcm/k8s_configs/cpu-policy-daemonset.yaml")
RAID_DAEMONSET = sct_abs_path("sdcm/k8s_configs/raid-daemonset.yaml")
+LOGGER = logging.getLogger(__name__)

-SCYLLA_POD_POOL_NAME = 'default-pool'
-OPERATOR_POD_POOL_NAME = 'operator-pool'

-LOGGER = logging.getLogger(__name__)
+class GkeNodePool(CloudK8sNodePool):
+ k8s_cluster: 'GkeCluster'
+
+ def __init__(
+ self,
+ k8s_cluster: 'KubernetesCluster',
+ name: str,
+ num_nodes: int,
+ instance_type: str,
+ disk_size: int = None,
+ disk_type: str = None,
+ image_type: str = 'UBUNTU',
+ labels: dict = None,
+ tags: dict = None,
+ local_ssd_count: int = None,
+ gce_project: str = None,
+ gce_zone: str = None,
+ is_deployed: bool = False
+ ):
+ super().__init__(
+ k8s_cluster=k8s_cluster,
+ name=name,
+ num_nodes=num_nodes,
+ disk_size=disk_size,
+ disk_type=disk_type,
+ image_type=image_type,
+ instance_type=instance_type,
+ labels=labels,
+ tags=tags,
+ is_deployed=is_deployed,
+ )
+ self.local_ssd_count = local_ssd_count
+ self.gce_project = self.k8s_cluster.gce_project if gce_project is None else gce_project
+ self.gce_zone = self.k8s_cluster.gce_zone if gce_zone is None else gce_zone
+
+ @property
+ def _deploy_cmd(self) -> str:
+ cmd = [f"container --project {self.gce_project} node-pools create {self.name}",
+ f"--zone {self.gce_zone}",
+ f"--cluster {self.k8s_cluster.short_cluster_name}",
+ f"--num-nodes {self.num_nodes}",
+ f"--machine-type {self.instance_type}",
+ f"--image-type UBUNTU",
+ f"--no-enable-autoupgrade",
+ f"--no-enable-autorepair"
+ ]
+ if self.disk_type:
+ cmd.append(f"--disk-type {self.disk_type}")
+ if self.disk_size:
+ cmd.append(f"--disk-size {self.disk_size}")
+ if self.local_ssd_count:
+ cmd.append(f"--local-ssd-count {self.local_ssd_count}")
+ return ' '.join(cmd)
+
+ def deploy(self) -> None:
+ self.k8s_cluster.gcloud.run(self._deploy_cmd)
+ self.is_deployed = True
+
+ def resize(self, num_nodes: int):
+ self.k8s_cluster.gcloud.run(
+ f"container clusters resize {self.k8s_cluster.short_cluster_name} --project {self.gce_project} "
+ f"--zone {self.gce_zone} --node-pool {self.name} --num-nodes {num_nodes} --quiet")
+ self.num_nodes = int(num_nodes)
+ self.wait_for_nodes_readiness()
+
+ def undeploy(self):
+ pass
+
+ @property
+ def instance_group_name(self) -> str:
+ try:
+ group_link = yaml.load(
+ self.k8s_cluster.gcloud.run(
+ f'container node-pools describe {self.name} '
+ f'--zone {self.gce_zone} --project {self.gce_project} '
+ f'--cluster {self.k8s_cluster.short_cluster_name}')
+ ).get('instanceGroupUrls')[0]
+ return group_link.split('/')[-1]
+ except Exception as exc:
+ raise RuntimeError(f"Can't get instance group name due to the: {exc}")
+
+ def remove_instance(self, instance_name: str):
+ self.k8s_cluster.gcloud.run(f'compute instance-groups managed delete-instances {self.name} '
+ f'--zone={self.gce_zone} --instances={instance_name}')


-class GkeCluster(KubernetesCluster, cluster.BaseCluster):
+class GcloudTokenUpdateThread(TokenUpdateThread):
+ def __init__(self, gcloud, kubectl_token_path: str, token_min_duration: int = 60):
+ self._gcloud = gcloud
+ self._token_min_duration = token_min_duration
+ super().__init__(kubectl_token_path=kubectl_token_path)
+
+ def get_token(self) -> str:
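+        # 'gcloud config config-helper --format=json' returns credential info, including an
+        # access token that stays valid for at least token_min_duration minutes.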
+ return self._gcloud.run(
+ f'config config-helper --min-expiry={self._token_min_duration * 60} --format=json')
+
+
+class GkeCluster(KubernetesCluster):
+    AUXILIARY_POOL_NAME = 'default-pool'  # This is the default pool that is deployed with the cluster
+ POOL_LABEL_NAME = 'cloud.google.com/gke-nodepool'
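+    # GKE labels every node with its node pool name, so this label can be used for pod affinity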
+ pools: Dict[str, GkeNodePool]

def __init__(self,
gke_cluster_version,
gce_image_type,
gce_image_size,
gce_network,
services,
- credentials,
- gce_n_local_ssd=0,
- gce_instance_type="n1-highmem-8",
- n_nodes=3,
+ gce_instance_type='n1-standard-4',
user_prefix=None,
params=None,
- gce_datacenter=None):
- cluster_prefix = cluster.prepend_user_prefix(user_prefix, "k8s-gke")
- node_prefix = cluster.prepend_user_prefix(user_prefix, "node")
- self._pools_info: Dict[str: int] = {}
- self._gcloud_token_thread = None
+ gce_datacenter=None,
+ cluster_uuid=None,
+ n_nodes=1
+ ):
+ super().__init__(
+ params=params,
+ cluster_uuid=cluster_uuid,
+ user_prefix=user_prefix
+ )
self.gke_cluster_version = gke_cluster_version
self.gce_image_type = gce_image_type
self.gce_image_size = gce_image_size
self.gce_network = gce_network
self.gce_services = services
- self.credentials = credentials
self.gce_instance_type = gce_instance_type
- self.gce_n_local_ssd = int(gce_n_local_ssd) if gce_n_local_ssd else 0
-
+ self.n_nodes = n_nodes
self.gce_project = services[0].project
self.gce_user = services[0].key
self.gce_zone = gce_datacenter[0]
@@ -85,124 +177,76 @@ def __init__(self,
)
self.api_call_rate_limiter.start()

- super().__init__(cluster_prefix=cluster_prefix,
- node_prefix=node_prefix,
- n_nodes=n_nodes,
- params=params,
- region_names=gce_datacenter,
- node_type="scylla-db")
-
- @cached_property
- def gke_cluster_name(self):
- return shorten_cluster_name(self.name, 40)
-
def __str__(self):
return f"{type(self).__name__} {self.name} | Zone: {self.gce_zone} | Version: {self.gke_cluster_version}"

- def add_nodes(self, count, ec2_user_data='', dc_idx=0, rack=0, enable_auto_bootstrap=False):
- if not self.gke_cluster_created:
- self.setup_gke_cluster(num_nodes=count)
- self.gke_cluster_created = True
- else:
- raise NotImplementedError
-
- @property
- def gcloud(self) -> GcloudContextManager:
- return cluster.Setup.tester_obj().localhost.gcloud
-
- def setup_gke_cluster(self, num_nodes: int) -> None:
- LOGGER.info("Create GKE cluster `%s' with %d node(s) in %s and 1 node in %s",
- self.gke_cluster_name, num_nodes, SCYLLA_POD_POOL_NAME, OPERATOR_POD_POOL_NAME)
+ def deploy(self):
+ LOGGER.info("Create GKE cluster `%s' with %d node(s) in %s",
+ self.short_cluster_name, self.n_nodes, self.AUXILIARY_POOL_NAME)
tags = ",".join(f"{key}={value}" for key, value in self.tags.items())
with self.gcloud as gcloud:
- gcloud.run(f"container --project {self.gce_project} clusters create {self.gke_cluster_name}"
+ gcloud.run(f"container --project {self.gce_project} clusters create {self.short_cluster_name}"
f" --zone {self.gce_zone}"
f" --cluster-version {self.gke_cluster_version}"
f" --username admin"
f" --network {self.gce_network}"
- f" --num-nodes {num_nodes}"
+ f" --num-nodes {self.n_nodes}"
f" --machine-type {self.gce_instance_type}"
f" --image-type UBUNTU"
f" --disk-type {self.gce_image_type}"
f" --disk-size {self.gce_image_size}"
- f" --local-ssd-count {self.gce_n_local_ssd}"
- f" --node-taints role=scylla-clusters:NoSchedule"
f" --enable-stackdriver-kubernetes"
f" --no-enable-autoupgrade"
f" --no-enable-autorepair"
f" --metadata {tags}")
- self.set_nodes_in_pool(SCYLLA_POD_POOL_NAME, num_nodes)
- gcloud.run(f"container --project {self.gce_project} node-pools create {OPERATOR_POD_POOL_NAME}"
- f" --zone {self.gce_zone}"
- f" --cluster {self.gke_cluster_name}"
- f" --num-nodes 1"
- f" --machine-type n1-standard-4"
- f" --image-type UBUNTU"
- f" --disk-type pd-ssd"
- f" --disk-size 20"
- f" --no-enable-autoupgrade"
- f" --no-enable-autorepair")
- self.set_nodes_in_pool(OPERATOR_POD_POOL_NAME, 1)
-
- LOGGER.info("Get credentials for GKE cluster `%s'", self.name)
- gcloud.run(f"container clusters get-credentials {self.gke_cluster_name} --zone {self.gce_zone}")
- self.start_gcloud_token_update_thread()
- self.patch_kube_config()
+ self.patch_kubectl_config()
+ self.deploy_node_pool(GkeNodePool(
+ name=self.AUXILIARY_POOL_NAME,
+ num_nodes=self.n_nodes,
+ disk_size=self.gce_image_size,
+ disk_type=self.gce_image_type,
+ k8s_cluster=self,
+ instance_type=self.gce_instance_type,
+ is_deployed=True
+ ))

LOGGER.info("Setup RBAC for GKE cluster `%s'", self.name)
- self.kubectl(f"create clusterrolebinding cluster-admin-binding"
- f" --clusterrole cluster-admin"
- f" --user {self.gce_user}")
-
- LOGGER.info("Install RAID DaemonSet to GKE cluster `%s'", self.name)
- self.apply_file(RAID_DAEMONSET, envsubst=False)
+ self.kubectl("create clusterrolebinding cluster-admin-binding --clusterrole cluster-admin "
+ f"--user {self.gce_user}")

- LOGGER.info("Install CPU policy DaemonSet to GKE cluster `%s'", self.name)
- self.apply_file(CPU_POLICY_DAEMONSET, envsubst=False)
-
- LOGGER.info("Install local volume provisioner to GKE cluster `%s'", self.name)
- self.helm(f"install local-provisioner {K8S_CONFIGS_PATH_IN_CONTAINER}/provisioner")
-
- def get_nodes_in_pool(self, pool_name: str) -> int:
- return self._pools_info.get(pool_name, 0)
+ @cached_property
+ def gcloud(self) -> GcloudContextManager:
+ return cluster.Setup.tester_obj().localhost.gcloud

- def set_nodes_in_pool(self, pool_name: str, num: int):
- self._pools_info[pool_name] = num
+ def deploy_node_pool(self, pool: GkeNodePool, wait_till_ready=True) -> None:
+ self._add_pool(pool)
+ if pool.is_deployed:
+ return
+ LOGGER.info("Create %s pool with %d node(s) in GKE cluster `%s'", pool.name, pool.num_nodes, self.name)
+ if wait_till_ready:
+ with self.api_call_rate_limiter.pause:
+ pool.deploy_and_wait_till_ready()
+ self.api_call_rate_limiter.wait_till_api_become_stable(self)
+ else:
+ pool.deploy()

- def add_gke_pool(self, name: str, num_nodes: int, instance_type: str) -> None:
- LOGGER.info("Create %s pool with %d node(s) in GKE cluster `%s'", name, num_nodes, self.name)
+ def wait_all_node_pools_to_be_ready(self):
with self.api_call_rate_limiter.pause:
- self.gcloud.run(f"container --project {self.gce_project} node-pools create {name}"
- f" --zone {self.gce_zone}"
- f" --cluster {self.gke_cluster_name}"
- f" --num-nodes {num_nodes}"
- f" --machine-type {instance_type}"
- f" --image-type UBUNTU"
- f" --node-taints role=sct-loaders:NoSchedule"
- f" --no-enable-autoupgrade"
- f" --no-enable-autorepair")
- self.api_call_rate_limiter.wait_till_api_become_not_operational(self)
+ super().wait_all_node_pools_to_be_ready()
self.api_call_rate_limiter.wait_till_api_become_stable(self)
- self.kubectl_no_wait('wait --timeout=15m --all --for=condition=Ready node')
- self.set_nodes_in_pool(name, num_nodes)

- def resize_gke_pool(self, name: str, num_nodes: int) -> None:
- LOGGER.info("Resize %s pool with %d node(s) in GKE cluster `%s'", name, num_nodes, self.name)
+ def resize_node_pool(self, name: str, num_nodes: int) -> None:
with self.api_call_rate_limiter.pause:
- self.gcloud.run(f"container clusters resize {self.gke_cluster_name} --project {self.gce_project} "
- f"--zone {self.gce_zone} --node-pool {name} --num-nodes {num_nodes} --quiet")
- self.api_call_rate_limiter.wait_till_api_become_not_operational(self)
+ self.pools[name].resize(num_nodes)
self.api_call_rate_limiter.wait_till_api_become_stable(self)
- self.kubectl_no_wait('wait --timeout=15m --all --for=condition=Ready node')
- self.set_nodes_in_pool(name, num_nodes)

def get_instance_group_name_for_pool(self, pool_name: str, default=None) -> str:
try:
group_link = yaml.load(
self.gcloud.run(
f'container node-pools describe {pool_name} '
f'--zone {self.gce_zone} --project {self.gce_project} '
- f'--cluster {self.gke_cluster_name}')
+ f'--cluster {self.short_cluster_name}')
).get('instanceGroupUrls')[0]
return group_link.split('/')[-1]
except Exception as exc:
@@ -214,64 +258,15 @@ def delete_instance_that_belong_to_instance_group(self, group_name: str, instanc
self.gcloud.run(f'compute instance-groups managed delete-instances {group_name} '
f'--zone={self.gce_zone} --instances={instance_name}')

- def get_kubectl_config_for_user(self, config, username):
- for user in config["users"]:
- if user["name"] == username:
- return user["user"]["auth-provider"]["config"]
- return None
+ def create_token_update_thread(self):
+ return GcloudTokenUpdateThread(self.gcloud, self.kubectl_token_path)

- @cached_property
- def gcloud_token_path(self):
- return os.path.join(self.logdir, 'gcloud.output')
-
- def start_gcloud_token_update_thread(self):
- self._gcloud_token_thread = GcloudTokenUpdateThread(self.gcloud, self.gcloud_token_path)
- self._gcloud_token_thread.start()
- # Wait till GcloudTokenUpdateThread get tokens and dump them to gcloud_token_path
- wait_for(os.path.exists, timeout=30, step=5, text="Wait for gcloud token", throw_exc=True,
- path=self.gcloud_token_path)
-
- def patch_kube_config(self) -> None:
- # It assumes that config is already created by gcloud
- # It patches kube config so that instead of running gcloud each time
- # we will get it's output from the cache file located at gcloud_token_path
- # To keep this cache file updated we run GcloudTokenUpdateThread thread
- kube_config_path = os.path.expanduser(os.environ.get('KUBECONFIG', '~/.kube/config'))
- user_name = f"gke_{self.gce_project}_{self.gce_zone}_{self.gke_cluster_name}"
- LOGGER.debug("Patch %s to use dockerized gcloud for auth against GKE cluster `%s'", kube_config_path, self.name)
-
- with open(kube_config_path) as kube_config:
- data = yaml.safe_load(kube_config)
- user_config = self.get_kubectl_config_for_user(data, user_name)
-
- if user_config is None:
- raise RuntimeError(f"Unable to find configuration for `{user_name}' in ~/.kube/config")
-
- user_config["cmd-args"] = self.gcloud_token_path
- user_config["cmd-path"] = "cat"
-
- with open(kube_config_path, "w") as kube_config:
- yaml.safe_dump(data, kube_config)
-
- self.log.debug(f'Patched kubectl config at {kube_config_path} '
- f'with static gcloud config from {self.gcloud_token_path}')
-
- @cluster.wait_for_init_wrap
- def wait_for_init(self):
- LOGGER.info("--- List of nodes in GKE cluster `%s': ---\n%s\n", self.name, self.kubectl("get nodes").stdout)
- LOGGER.info("--- List of pods in GKE cluster `%s': ---\n%s\n", self.name, self.kubectl("get pods -A").stdout)
-
- LOGGER.info("Wait for readiness of all pods in default namespace...")
- self.kubectl("wait --timeout=15m --all --for=condition=Ready pod", timeout=15*60+10)
-
- # Create namespaces which are required in several different optional places
- self.kubectl(f"create namespace {self._scylla_namespace}")
- self.kubectl(f"create namespace {self._scylla_manager_namespace}")
+ def create_kubectl_config(self):
+ self.gcloud.run(f"container clusters get-credentials {self.short_cluster_name} --zone {self.gce_zone}")

def destroy(self):
self.api_call_rate_limiter.stop()
- if self._gcloud_token_thread:
- self._gcloud_token_thread.stop()
+ self.stop_token_update_thread()


class GkeScyllaPodContainer(BaseScyllaPodContainer, IptablesPodIpRedirectMixin):
@@ -281,7 +276,7 @@ class GkeScyllaPodContainer(BaseScyllaPodContainer, IptablesPodIpRedirectMixin):
pod_readiness_timeout = 30 # minutes
pod_terminate_timeout = 30 # minutes

- @cached_property
+ @property
def gce_node_ips(self):
gce_node = self.k8s_node
return gce_node.public_ips, gce_node.private_ips
@@ -359,21 +354,23 @@ def terminate_k8s_node(self):
Scylla Pod X
Scylla node X
'''))
- group_name = self.parent_cluster.k8s_cluster.get_instance_group_name_for_pool(SCYLLA_POD_POOL_NAME)
super().terminate_k8s_node()

# Removing GKE instance and adding one node back to the cluster
# TBD: Remove below lines when https://issuetracker.google.com/issues/178302655 is fixed

- self.parent_cluster.k8s_cluster.delete_instance_that_belong_to_instance_group(group_name, self.node_name)
- self.parent_cluster.k8s_cluster.resize_gke_pool(
- SCYLLA_POD_POOL_NAME,
- self.parent_cluster.k8s_cluster.get_nodes_in_pool(SCYLLA_POD_POOL_NAME)
+ self.parent_cluster.node_pool.remove_instance(instance_name=self.node_name)
+ self.parent_cluster.k8s_cluster.resize_node_pool(
+ self.parent_cluster.pool_name,
+ self.parent_cluster.node_pool.num_nodes
)


class GkeScyllaPodCluster(ScyllaPodCluster, IptablesClusterOpsMixin):
+ NODE_PREPARE_FILE = sct_abs_path("sdcm/k8s_configs/gke/scylla-node-prepare.yaml")
+
k8s_cluster: 'GkeCluster'
+ node_pool: 'GkeNodePool'
PodContainerClass = GkeScyllaPodContainer

def add_nodes(self,
@@ -395,5 +392,7 @@ def add_nodes(self,


class MonitorSetGKE(MonitorSetGCE):
+ DB_NODES_IP_ADDRESS = 'public_ip_address'
+
def install_scylla_manager(self, node, auth_token):
pass
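
A side note on the token handling above: GcloudTokenUpdateThread fetches a short-lived credential via 'gcloud config config-helper', and the TokenUpdateThread base class presumably keeps writing it to kubectl_token_path so that kubectl reads a static file instead of shelling out to gcloud on every invocation (the same idea the removed cat-based patch_kube_config implemented). A minimal sketch of that refresh-loop shape, under those assumptions; StaticTokenRefresher and fetch_token are illustrative names only, not part of the patch:

    import threading


    class StaticTokenRefresher(threading.Thread):
        """Illustrative stand-in for the TokenUpdateThread machinery used above."""

        def __init__(self, fetch_token, token_path: str, interval: int = 300):
            super().__init__(daemon=True)
            self._fetch_token = fetch_token      # e.g. a callable wrapping 'gcloud config config-helper'
            self._token_path = token_path        # the file kubectl is configured to read
            self._interval = interval
            self._stop_event = threading.Event()

        def run(self):
            while not self._stop_event.is_set():
                with open(self._token_path, "w") as token_file:
                    token_file.write(self._fetch_token())
                self._stop_event.wait(self._interval)

        def stop(self):
            self._stop_event.set()
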
diff --git a/sdcm/cluster_k8s/minikube.py b/sdcm/cluster_k8s/minikube.py
--- a/sdcm/cluster_k8s/minikube.py
+++ b/sdcm/cluster_k8s/minikube.py
@@ -18,19 +18,18 @@

from invoke.exceptions import UnexpectedExit

-from sdcm import sct_abs_path, cluster, cluster_gce
+from sdcm import cluster, cluster_gce
from sdcm.remote import LOCALRUNNER
from sdcm.remote.kubernetes_cmd_runner import KubernetesCmdRunner
from sdcm.cluster_k8s import KubernetesCluster, BaseScyllaPodContainer, ScyllaPodCluster
from sdcm.cluster_k8s.iptables import IptablesPodPortsRedirectMixin, IptablesClusterOpsMixin
from sdcm.cluster_gce import MonitorSetGCE
-from sdcm.utils.k8s import KubernetesOps, K8S_CONFIGS_PATH_IN_CONTAINER
+from sdcm.utils.k8s import KubernetesOps
from sdcm.utils.common import get_free_port, wait_for_port
from sdcm.utils.decorators import retrying
from sdcm.utils.docker_utils import ContainerManager


-SCYLLA_CLUSTER_CONFIG = f"{K8S_CONFIGS_PATH_IN_CONTAINER}/minikube-cluster-chart-values.yaml"
KUBECTL_PROXY_PORT = 8001
KUBECTL_PROXY_CONTAINER = "auto_ssh:kubectl_proxy"
SCYLLA_POD_EXPOSED_PORTS = [3000, 9042, 9180, ]
@@ -172,6 +171,18 @@ def docker_pull(self, image):
LOGGER.info("Pull `%s' to Minikube' Docker environment", image)
self.remoter.run(f"docker pull -q {image}")

+ def deploy(self):
+ pass
+
+ def create_kubectl_config(self):
+ pass
+
+ def create_token_update_thread(self):
+ pass
+
+ def deploy_node_pool(self, pool, wait_till_ready=True) -> None:
+ raise NotImplementedError("Not supported in Minikube")
+

class GceMinikubeCluster(MinikubeCluster, cluster_gce.GCECluster):
def __init__(self, minikube_version, gce_image, gce_image_type, gce_image_size, gce_network, services, credentials, # pylint: disable=too-many-arguments
@@ -182,6 +193,7 @@ def __init__(self, minikube_version, gce_image, gce_image_type, gce_image_size,

cluster_prefix = cluster.prepend_user_prefix(user_prefix, "k8s-minikube")
node_prefix = cluster.prepend_user_prefix(user_prefix, "node")
+ # pylint: disable=unexpected-keyword-arg,no-value-for-parameter
super().__init__(gce_image=gce_image,
gce_image_type=gce_image_type,
gce_image_size=gce_image_size,
diff --git a/sdcm/ec2_client.py b/sdcm/ec2_client.py
--- a/sdcm/ec2_client.py
+++ b/sdcm/ec2_client.py
@@ -8,7 +8,6 @@
from botocore.exceptions import ClientError, NoRegionError

from sdcm.utils.decorators import retrying
-from sdcm.utils.pricing import AWSPricing

LOGGER = logging.getLogger(__name__)

@@ -140,6 +139,7 @@ def _get_spot_price(self, instance_type):
:return: spot bid price
"""
LOGGER.info('Calculating spot price based on OnDemand price')
+ from sdcm.utils.pricing import AWSPricing
aws_pricing = AWSPricing()
on_demand_price = float(aws_pricing.get_on_demand_instance_price(self.region_name, instance_type))

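
The module-level AWSPricing import moves inside _get_spot_price(), presumably to break an import cycle introduced by the reshuffled sdcm.utils modules. A minimal sketch of the lazy-import pattern using the same call that appears in this hunk (the stand-alone function name is illustrative):

    def get_on_demand_price(region_name: str, instance_type: str) -> float:
        # Import inside the function so sdcm.utils.pricing is only loaded when a
        # spot price is actually requested (assumed reason: avoiding a circular import).
        from sdcm.utils.pricing import AWSPricing
        return float(AWSPricing().get_on_demand_instance_price(region_name, instance_type))
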
diff --git a/sdcm/es.py b/sdcm/es.py
--- a/sdcm/es.py
+++ b/sdcm/es.py
@@ -1,4 +1,5 @@
import logging
+from typing import Any

import elasticsearch

@@ -69,3 +70,7 @@ def delete_doc(self, index, doc_type, doc_id):
"""
if self.get_doc(index, doc_id, doc_type):
self.delete(index=index, doc_type=doc_type, id=doc_id)
+
+ def search(self, **kwargs) -> Any:
+ # Needed to make pre-commit happy
+ return super().search(**kwargs)
diff --git a/sdcm/k8s_configs/cpu-policy-daemonset.yaml b/sdcm/k8s_configs/cpu-policy-daemonset.yaml
--- a/sdcm/k8s_configs/cpu-policy-daemonset.yaml
+++ b/sdcm/k8s_configs/cpu-policy-daemonset.yaml
@@ -1,132 +0,0 @@
-# ClusterRole for cpu-policy-daemonset.
-apiVersion: rbac.authorization.k8s.io/v1
-kind: ClusterRole
-metadata:
- name: cpu-policy-daemonset
-rules:
- - apiGroups:
- - ""
- resources:
- - nodes
- verbs:
- - get
- - patch
- - apiGroups:
- - ""
- resources:
- - pods
- verbs:
- - list
- - apiGroups:
- - apps
- - extensions
- resources:
- - daemonsets
- verbs:
- - get
- - apiGroups:
- - ""
- resources:
- - pods/eviction
- verbs:
- - create
-
-
----
-
-# ServiceAccount for cpu-policy daemonset.
-apiVersion: v1
-kind: ServiceAccount
-metadata:
- name: cpu-policy-daemonset
- namespace: default
----
-# Bind cpu-policy daemonset ServiceAccount with ClusterRole.
-kind: ClusterRoleBinding
-apiVersion: rbac.authorization.k8s.io/v1
-metadata:
- name: cpu-policy-daemonset
-roleRef:
- apiGroup: rbac.authorization.k8s.io
- kind: ClusterRole
- name: cpu-policy-daemonset
-subjects:
-- kind: ServiceAccount
- name: cpu-policy-daemonset
- namespace: default
----
-# Daemonset that will change cpuManagerPolicy to static.
-apiVersion: apps/v1
-kind: DaemonSet
-metadata:
- name: cpu-policy
-spec:
- selector:
- matchLabels:
- name: cpu-policy
- template:
- metadata:
- labels:
- name: cpu-policy
- spec:
- hostPID: true
- hostIPC: true
- affinity:
- nodeAffinity:
- requiredDuringSchedulingIgnoredDuringExecution:
- nodeSelectorTerms:
- - matchExpressions:
- - key: cloud.google.com/gke-nodepool
- operator: NotIn
- values:
- - operator-pool
- tolerations:
- - key: role
- operator: Equal
- value: scylla-clusters
- effect: NoSchedule
- - key: role
- operator: Equal
- value: cassandra-stress
- effect: NoSchedule
- serviceAccountName: cpu-policy-daemonset
- containers:
- - name: cpu-policy
- image: scylladb/kubectl:1.11.5
- imagePullPolicy: Always
- env:
- - name: NODE
- valueFrom:
- fieldRef:
- fieldPath: spec.nodeName
- securityContext:
- privileged: true
- volumeMounts:
- - name: hostfs
- mountPath: /mnt/hostfs
- mountPropagation: Bidirectional
- volumes:
- - name: hostfs
- hostPath:
- path: /
- - name: dbus
- hostPath:
- path: /var/run/dbus
- - name: systemd
- hostPath:
- path: /run/systemd
- - name: systemctl
- hostPath:
- path: /bin/systemctl
- - name: system
- hostPath:
- path: /etc/systemd/system
- - name: usr
- hostPath:
- path: /usr
- - name: lib
- hostPath:
- path: /lib/systemd
- - name: lib-linux
- hostPath:
- path: /lib/systemd
diff --git a/sdcm/k8s_configs/dns-resolver-pod.yaml b/sdcm/k8s_configs/dns-resolver-pod.yaml
--- a/sdcm/k8s_configs/dns-resolver-pod.yaml
+++ b/sdcm/k8s_configs/dns-resolver-pod.yaml
@@ -0,0 +1,22 @@
+apiVersion: v1
+kind: Pod
+metadata:
+ name: ${POD_NAME}
+ namespace: default
+spec:
+ containers:
+ - name: ${POD_NAME}
+ image: gcr.io/kubernetes-e2e-test-images/dnsutils:1.3
+ command:
+ - /bin/sh
+ - -c
+ - while true; do sleep 900 ; done
+ imagePullPolicy: IfNotPresent
+ resources:
+ limits:
+ cpu: 100m
+ memory: 100Mi
+ requests:
+ cpu: 100m
+ memory: 100Mi
+ restartPolicy: Always
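
This pod manifest is parametrised with ${POD_NAME}. SCT's apply_file helper has an envsubst switch (see the apply_file(..., envsubst=False) calls removed above), so placeholders like this one are resolved from the environment before the file reaches kubectl. A stand-alone illustration of that substitution step; render_manifest is a hypothetical helper, not part of the patch:

    import os
    import string


    def render_manifest(path: str, **overrides) -> str:
        # envsubst-style pass: ${POD_NAME} and similar placeholders are replaced
        # from the environment (plus explicit overrides) before 'kubectl apply'.
        env = {**os.environ, **overrides}
        with open(path) as manifest:
            return string.Template(manifest.read()).safe_substitute(env)

    # rendered = render_manifest("sdcm/k8s_configs/dns-resolver-pod.yaml", POD_NAME="sct-dns-resolver")
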
diff --git a/sdcm/k8s_configs/eks/scylla-node-prepare.yaml b/sdcm/k8s_configs/eks/scylla-node-prepare.yaml
--- a/sdcm/k8s_configs/eks/scylla-node-prepare.yaml
+++ b/sdcm/k8s_configs/eks/scylla-node-prepare.yaml
@@ -0,0 +1,108 @@
+# ClusterRole for node-setup-daemonset.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ name: node-setup-daemonset
+rules:
+ - apiGroups:
+ - ""
+ resources:
+ - nodes
+ verbs:
+ - get
+ - patch
+ - apiGroups:
+ - ""
+ resources:
+ - pods
+ verbs:
+ - list
+ - apiGroups:
+ - apps
+ - extensions
+ resources:
+ - daemonsets
+ verbs:
+ - get
+ - apiGroups:
+ - ""
+ resources:
+ - pods/eviction
+ verbs:
+ - create
+
+
+---
+
+# ServiceAccount for node-setup daemonset.
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ name: node-setup-daemonset
+ namespace: default
+---
+# Bind node-setup daemonset ServiceAccount with ClusterRole.
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: node-setup-daemonset
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: ClusterRole
+ name: node-setup-daemonset
+subjects:
+- kind: ServiceAccount
+ name: node-setup-daemonset
+ namespace: default
+---
+# Daemonset that configures disks and networking interfaces on the node.
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+ name: node-setup
+spec:
+ selector:
+ matchLabels:
+ name: node-setup
+ template:
+ metadata:
+ labels:
+ name: node-setup
+ spec:
+ hostPID: true
+ hostIPC: true
+ hostNetwork: true
+ serviceAccountName: node-setup-daemonset
+ containers:
+ - name: node-setup
+ image: scyllazimnx/scylla-machine-image-k8s-aws:666.development-20201001.eea319c
+ imagePullPolicy: Always
+ args:
+ - --all
+ env:
+ - name: ROOT_DISK
+ value: /mnt/hostfs/mnt/raid-disks/disk0
+ - name: SCYLLAD_CONF_MOUNT
+ value: /mnt/scylla.d/
+ securityContext:
+ privileged: true
+ volumeMounts:
+ - name: hostfs
+ mountPath: /mnt/hostfs
+ mountPropagation: Bidirectional
+ - name: hostetcscyllad
+ mountPath: /mnt/scylla.d
+ mountPropagation: Bidirectional
+ - name: hostirqbalanceconfig
+ mountPath: /etc/conf.d/irqbalance
+ mountPropagation: Bidirectional
+ volumes:
+ - name: hostfs
+ hostPath:
+ path: /
+ - name: hostetcscyllad
+ hostPath:
+ path: /etc/scylla.d/
+ - name: hostirqbalanceconfig
+ hostPath:
+ path: /etc/sysconfig/irqbalance
diff --git a/sdcm/k8s_configs/gke-cluster-chart-values.yaml b/sdcm/k8s_configs/gke-cluster-chart-values.yaml
--- a/sdcm/k8s_configs/gke-cluster-chart-values.yaml
+++ b/sdcm/k8s_configs/gke-cluster-chart-values.yaml
@@ -1,61 +0,0 @@
----
-nameOverride: ""
-fullnameOverride: ${SCT_K8S_SCYLLA_CLUSTER_NAME}
-
-scyllaImage:
- repository: scylladb/scylla
- tag: ${SCT_SCYLLA_VERSION}
-agentImage:
- repository: scylladb/scylla-manager-agent
- tag: ${SCT_SCYLLA_MGMT_AGENT_VERSION}
-
-serviceAccount:
- create: true
- annotations: {}
- name: ${SCT_K8S_SCYLLA_CLUSTER_NAME}-member
-
-alternator:
- enabled: false
- port: 8000
- writeIsolation: "always"
-
-developerMode: false
-cpuset: true
-hostNetworking: true
-automaticOrphanedNodeCleanup: true
-sysctls: ["fs.aio-max-nr=2097152"]
-backups: []
-repairs: []
-serviceMonitor:
- create: ${SCT_K8S_DEPLOY_MONITORING}
-
-datacenter: ${SCT_K8S_SCYLLA_DATACENTER}
-racks:
- - name: ${SCT_K8S_SCYLLA_RACK}
- scyllaConfig: "scylla-config"
- scyllaAgentConfig: "scylla-agent-config"
- members: 0
- storage:
- storageClassName: local-raid-disks
- capacity: ${SCT_K8S_SCYLLA_DISK_GI}Gi
- resources:
- limits:
- cpu: ${SCT_K8S_SCYLLA_CPU_N}
- memory: ${SCT_K8S_SCYLLA_MEM_GI}Gi
- requests:
- cpu: ${SCT_K8S_SCYLLA_CPU_N}
- memory: ${SCT_K8S_SCYLLA_MEM_GI}Gi
- placement:
- nodeAffinity:
- requiredDuringSchedulingIgnoredDuringExecution:
- nodeSelectorTerms:
- - matchExpressions:
- - key: failure-domain.beta.kubernetes.io/zone
- operator: In
- values:
- - ${SCT_K8S_SCYLLA_DATACENTER}
- tolerations:
- - key: role
- operator: Equal
- value: scylla-clusters
- effect: NoSchedule
diff --git a/sdcm/k8s_configs/gke/scylla-node-prepare.yaml b/sdcm/k8s_configs/gke/scylla-node-prepare.yaml
--- a/sdcm/k8s_configs/gke/scylla-node-prepare.yaml
+++ b/sdcm/k8s_configs/gke/scylla-node-prepare.yaml
@@ -1,3 +1,118 @@
+# ClusterRole for cpu-policy-daemonset.
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+ name: cpu-policy-daemonset
+rules:
+ - apiGroups:
+ - ""
+ resources:
+ - nodes
+ verbs:
+ - get
+ - patch
+ - apiGroups:
+ - ""
+ resources:
+ - pods
+ verbs:
+ - list
+ - apiGroups:
+ - apps
+ - extensions
+ resources:
+ - daemonsets
+ verbs:
+ - get
+ - apiGroups:
+ - ""
+ resources:
+ - pods/eviction
+ verbs:
+ - create
+
+
+---
+
+# ServiceAccount for cpu-policy daemonset.
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+ name: cpu-policy-daemonset
+ namespace: default
+---
+# Bind cpu-policy daemonset ServiceAccount with ClusterRole.
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+ name: cpu-policy-daemonset
+roleRef:
+ apiGroup: rbac.authorization.k8s.io
+ kind: ClusterRole
+ name: cpu-policy-daemonset
+subjects:
+- kind: ServiceAccount
+ name: cpu-policy-daemonset
+ namespace: default
+---
+# Daemonset that will change cpuManagerPolicy to static.
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+ name: cpu-policy
+spec:
+ selector:
+ matchLabels:
+ name: cpu-policy
+ template:
+ metadata:
+ labels:
+ name: cpu-policy
+ spec:
+ hostPID: true
+ hostIPC: true
+ serviceAccountName: cpu-policy-daemonset
+ containers:
+ - name: cpu-policy
+ image: scylladb/kubectl:1.11.5
+ imagePullPolicy: Always
+ env:
+ - name: NODE
+ valueFrom:
+ fieldRef:
+ fieldPath: spec.nodeName
+ securityContext:
+ privileged: true
+ volumeMounts:
+ - name: hostfs
+ mountPath: /mnt/hostfs
+ mountPropagation: Bidirectional
+ volumes:
+ - name: hostfs
+ hostPath:
+ path: /
+ - name: dbus
+ hostPath:
+ path: /var/run/dbus
+ - name: systemd
+ hostPath:
+ path: /run/systemd
+ - name: systemctl
+ hostPath:
+ path: /bin/systemctl
+ - name: system
+ hostPath:
+ path: /etc/systemd/system
+ - name: usr
+ hostPath:
+ path: /usr
+ - name: lib
+ hostPath:
+ path: /lib/systemd
+ - name: lib-linux
+ hostPath:
+ path: /lib/systemd
+---
# Daemonset that will group NUM_DISKS disks following the pattern '/dev/ssd{i}'
# into a raid0 array and mount that array onto RAID_DIR.
apiVersion: apps/v1
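
The part of this file kept from the earlier manifest (the RAID daemonset the trailing comment describes) groups NUM_DISKS local SSDs matching /dev/ssd{i} into a raid0 array and mounts it at RAID_DIR. Roughly, the work it performs on each node amounts to the following, assuming an mdadm-based setup; the actual container script is not part of this diff and the mount point is only inferred from the ROOT_DISK value used in the EKS manifest above:

    import subprocess


    def assemble_raid0(num_disks: int, raid_dir: str = "/mnt/raid-disks/disk0") -> None:
        # Group /dev/ssd0../dev/ssd{num_disks-1} into one raid0 array, format it
        # and mount it where the local volume provisioner expects to find disks.
        devices = [f"/dev/ssd{i}" for i in range(num_disks)]
        subprocess.run(["mdadm", "--create", "/dev/md0", "--level=0",
                        f"--raid-devices={num_disks}", *devices], check=True)
        subprocess.run(["mkfs.xfs", "/dev/md0"], check=True)
        subprocess.run(["mount", "/dev/md0", raid_dir], check=True)
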
diff --git a/sdcm/k8s_configs/loaders.yaml b/sdcm/k8s_configs/loaders.yaml
--- a/sdcm/k8s_configs/loaders.yaml
+++ b/sdcm/k8s_configs/loaders.yaml
@@ -34,11 +34,11 @@ spec:
name: docker-socket
resources:
limits:
- cpu: ${SCT_K8S_LOADER_CPU_N}
- memory: ${SCT_K8S_LOADER_MEM_GI}Gi
+ cpu: ${CPU_LIMIT}
+ memory: ${MEMORY_LIMIT}
requests:
- cpu: ${SCT_K8S_LOADER_CPU_N}
- memory: ${SCT_K8S_LOADER_MEM_GI}Gi
+ cpu: ${CPU_LIMIT}
+ memory: ${MEMORY_LIMIT}
securityContext:
privileged: true
volumes:
@@ -51,17 +51,3 @@ spec:
path: /var/run/docker.sock
type: Socket
hostNetwork: true
- affinity:
- nodeAffinity:
- requiredDuringSchedulingIgnoredDuringExecution:
- nodeSelectorTerms:
- - matchExpressions:
- - key: failure-domain.beta.kubernetes.io/zone
- operator: In
- values:
- - ${SCT_K8S_SCYLLA_DATACENTER}
- tolerations:
- - key: role
- operator: Equal
- value: ${SCT_K8S_LOADER_CLUSTER_NAME}
- effect: NoSchedule
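
The loader pod limits are now filled from generic ${CPU_LIMIT}/${MEMORY_LIMIT} placeholders instead of the dropped k8s_loader_cpu_n/k8s_loader_mem_gi options; the sct_config.py hunk further down imports convert_cpu_units_to_k8s_value and convert_memory_units_to_k8s_value, presumably to produce these values. The helpers below are simplified stand-ins that show the intended shape of the output, not the real implementations:

    def cpu_to_k8s_value(cpu: float) -> str:
        # Whole CPUs render as "2", fractional CPUs as millicores such as "500m".
        return str(int(cpu)) if float(cpu).is_integer() else f"{int(cpu * 1000)}m"


    def memory_to_k8s_value(mem_gi: float) -> str:
        return f"{int(mem_gi)}Gi" if float(mem_gi).is_integer() else f"{mem_gi}Gi"


    print(cpu_to_k8s_value(0.5), memory_to_k8s_value(2))  # -> 500m 2Gi
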
diff --git a/sdcm/k8s_configs/minikube-cluster-chart-values.yaml b/sdcm/k8s_configs/minikube-cluster-chart-values.yaml
--- a/sdcm/k8s_configs/minikube-cluster-chart-values.yaml
+++ b/sdcm/k8s_configs/minikube-cluster-chart-values.yaml
@@ -1,46 +0,0 @@
----
-nameOverride: ""
-fullnameOverride: ${SCT_K8S_SCYLLA_CLUSTER_NAME}
-
-scyllaImage:
- repository: scylladb/scylla
- tag: ${SCT_SCYLLA_VERSION}
-agentImage:
- repository: scylladb/scylla-manager-agent
- tag: ${SCT_SCYLLA_MGMT_AGENT_VERSION}
-
-serviceAccount:
- create: true
- annotations: {}
- name: ${SCT_K8S_SCYLLA_CLUSTER_NAME}-member
-
-alternator:
- enabled: false
- port: 8000
- writeIsolation: "always"
-
-developerMode: true
-cpuset: false
-hostNetworking: false
-automaticOrphanedNodeCleanup: false
-sysctls: []
-backups: []
-repairs: []
-serviceMonitor:
- create: ${SCT_K8S_DEPLOY_MONITORING}
-
-datacenter: ${SCT_K8S_SCYLLA_DATACENTER}
-racks:
- - name: ${SCT_K8S_SCYLLA_RACK}
- scyllaConfig: "scylla-config"
- scyllaAgentConfig: "scylla-agent-config"
- members: 0
- storage:
- capacity: ${SCT_K8S_SCYLLA_DISK_GI}Gi
- resources:
- limits:
- cpu: ${SCT_K8S_SCYLLA_CPU_N}
- memory: ${SCT_K8S_SCYLLA_MEM_GI}Gi
- requests:
- cpu: ${SCT_K8S_SCYLLA_CPU_N}
- memory: ${SCT_K8S_SCYLLA_MEM_GI}Gi
diff --git a/sdcm/k8s_configs/provisioner/templates/provisioner.yaml b/sdcm/k8s_configs/provisioner/templates/provisioner.yaml
--- a/sdcm/k8s_configs/provisioner/templates/provisioner.yaml
+++ b/sdcm/k8s_configs/provisioner/templates/provisioner.yaml
@@ -68,6 +68,10 @@ spec:
nodeSelector:
{{ .Values.daemonset.nodeSelector | toYaml | trim | indent 8 }}
{{- end }}
+{{- if .Values.affinity }}
+ affinity:
+{{ .Values.affinity | toYaml | trim | indent 8 }}
+{{- end }}
{{- if .Values.daemonset.tolerations }}
tolerations:
{{ .Values.daemonset.tolerations | toYaml | trim | indent 8 }}
diff --git a/sdcm/k8s_configs/provisioner/values.yaml b/sdcm/k8s_configs/provisioner/values.yaml
--- a/sdcm/k8s_configs/provisioner/values.yaml
+++ b/sdcm/k8s_configs/provisioner/values.yaml
@@ -111,15 +111,7 @@ daemonset:
#
# Node tolerations for local-volume-provisioner scheduling to nodes with taints.
# Ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
- tolerations:
- - key: role
- operator: Equal
- value: scylla-clusters
- effect: NoSchedule
- - key: role
- operator: Equal
- value: cassandra-stress
- effect: NoSchedule
+ tolerations: {}
#
# If configured, resources will set the requests/limits field to the Daemonset PodSpec.
# Ref: https://kubernetes.io/docs/concepts/configuration/manage-compute-resources-container/
@@ -144,3 +136,5 @@ prometheus:
## [Kube Prometheus Selector Label](https://github.com/coreos/prometheus-operator/blob/master/helm/kube-prometheus/values.yaml#L298)
selector:
prometheus: kube-prometheus
+
+affinity: {}
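
With the hard-coded tolerations gone, the chart now exposes an optional affinity value that the template hunk above renders verbatim into the DaemonSet pod spec. A hedged example of the kind of value a caller could supply to pin the provisioner to one node pool; the pool label comes from POOL_LABEL_NAME ('cloud.google.com/gke-nodepool') used earlier in this patch, while the pool name and the dict-shaped representation are purely illustrative:

    # The Python equivalent of what would sit under 'affinity:' in values.yaml;
    # the chart only needs it to serialise to a standard nodeAffinity stanza.
    affinity = {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [{
                    "matchExpressions": [{
                        "key": "cloud.google.com/gke-nodepool",
                        "operator": "In",
                        "values": ["scylla-pool"],  # assumed pool name, for illustration only
                    }],
                }],
            },
        },
    }
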
diff --git a/sdcm/results_analyze/test.py b/sdcm/results_analyze/test.py
--- a/sdcm/results_analyze/test.py
+++ b/sdcm/results_analyze/test.py
@@ -544,7 +544,7 @@ def get_prior_tests(self, filter_path=None) -> typing.List['TestResultClass']:
es_result = es_result.get('hits', {}).get('hits', None) if es_result else None
if not es_result:
return output
- for es_data in es_result:
+ for es_data in es_result: # pylint: disable=not-an-iterable
test = TestResultClass(es_data)
output.append(test)
return output
diff --git a/sdcm/sct_config.py b/sdcm/sct_config.py
--- a/sdcm/sct_config.py
+++ b/sdcm/sct_config.py
@@ -28,9 +28,11 @@
import anyconfig

from sdcm import sct_abs_path
+from sdcm.cluster_k8s import eks, gke
from sdcm.utils import alternator
from sdcm.utils.common import find_scylla_repo, get_scylla_ami_versions, get_branched_ami, get_ami_tags, \
ami_built_by_scylla, MAX_SPOT_DURATION_TIME
+from sdcm.utils.k8s import convert_cpu_units_to_k8s_value, convert_memory_units_to_k8s_value
from sdcm.utils.version_utils import get_branch_version, get_branch_version_for_multiple_repositories, \
get_scylla_docker_repo_from_version, resolve_latest_repo_symlink
from sdcm.sct_events.base import add_severity_limit_rules, print_critical_events
@@ -87,7 +89,8 @@ class SCTConfiguration(dict):
"""
Class the hold the SCT configuration
"""
- available_backends = ['aws', 'gce', 'docker', 'baremetal', 'aws-siren', 'k8s-gce-minikube', 'k8s-gke', 'gce-siren']
+ available_backends = [
+ 'aws', 'gce', 'docker', 'baremetal', 'aws-siren', 'k8s-gce-minikube', 'k8s-gke', 'gce-siren', 'k8s-eks']

config_options = [
dict(name="config_files", env="SCT_CONFIG_FILES", type=str_or_list,
@@ -625,11 +628,25 @@ class SCTConfiguration(dict):
dict(name="gce_root_disk_size_minikube", env="SCT_GCE_ROOT_DISK_SIZE_MINIKUBE", type=int,
help=""),

- # k8s-gke options
- dict(name="gke_cluster_version", env="SCT_GKE_CLUSTER_VERSION", type=str,
+ # k8s-eks options
+
+ dict(name="eks_service_ipv4_cidr", env="SCT_EKS_SERVICE_IPV4_CIDR", type=str,
+ help=""),
+
+ dict(name="eks_vpc_cni_version", env="SCT_EKS_VPC_CNI_VERSION", type=str,
+ help=""),
+
+ dict(name="eks_role_arn", env="SCT_EKS_ROLE_ARN", type=str,
help=""),

- dict(name="gke_cluster_n_nodes", env="SCT_GKE_CLUSTER_N_NODES", type=int,
+ dict(name="eks_cluster_version", env="SCT_EKS_CLUSTER_VERSION", type=str,
+ help=""),
+
+ dict(name="eks_nodegroup_role_arn", env="SCT_EKS_NODEGROUP_ROLE_ARN", type=str,
+ help=""),
+
+ # k8s-gke options
+ dict(name="gke_cluster_version", env="SCT_GKE_CLUSTER_VERSION", type=str,
help=""),

# k8s options
@@ -657,22 +674,13 @@ class SCTConfiguration(dict):
dict(name="k8s_scylla_cluster_name", env="SCT_K8S_SCYLLA_CLUSTER_NAME", type=str,
help=""),

- dict(name="k8s_scylla_cpu_n", env="SCT_K8S_SCYLLA_CPU_N", type=int,
- help=""),
-
- dict(name="k8s_scylla_mem_gi", env="SCT_K8S_SCYLLA_MEM_GI", type=int,
- help=""),
-
dict(name="k8s_scylla_disk_gi", env="SCT_K8S_SCYLLA_DISK_GI", type=int,
help=""),

- dict(name="k8s_loader_cluster_name", env="SCT_K8S_LOADER_CLUSTER_NAME", type=str,
- help=""),
-
- dict(name="k8s_loader_cpu_n", env="SCT_K8S_LOADER_CPU_N", type=int,
+ dict(name="k8s_scylla_disk_class", env="SCT_K8S_SCYLLA_DISK_CLASS", type=str,
help=""),

- dict(name="k8s_loader_mem_gi", env="SCT_K8S_LOADER_MEM_GI", type=int,
+ dict(name="k8s_loader_cluster_name", env="SCT_K8S_LOADER_CLUSTER_NAME", type=str,
help=""),

dict(name="minikube_version", env="SCT_MINIKUBE_VERSION", type=str,
@@ -1121,19 +1129,28 @@ class SCTConfiguration(dict):
'k8s-gce-minikube': ['gce_image_minikube', 'gce_instance_type_minikube', 'gce_root_disk_type_minikube',
'gce_root_disk_size_minikube', 'user_credentials_path', 'scylla_version',
'scylla_mgmt_agent_version', 'k8s_scylla_operator_helm_repo', 'k8s_scylla_datacenter',
- 'k8s_scylla_rack', 'k8s_scylla_cluster_name', 'k8s_scylla_cpu_n', 'k8s_scylla_mem_gi',
+ 'k8s_scylla_rack', 'k8s_scylla_cluster_name',
'k8s_scylla_disk_gi', 'gce_image', 'gce_instance_type_loader', 'gce_root_disk_type_loader',
'gce_n_local_ssd_disk_loader', 'gce_instance_type_monitor', 'gce_root_disk_type_monitor',
'gce_root_disk_size_monitor', 'gce_n_local_ssd_disk_monitor', 'minikube_version',
'mgmt_docker_image'],

- 'k8s-gke': ['gke_cluster_version', 'gke_cluster_n_nodes', 'gce_instance_type_db', 'gce_root_disk_type_db',
+ 'k8s-gke': ['gke_cluster_version', 'gce_instance_type_db', 'gce_root_disk_type_db',
'gce_root_disk_size_db', 'gce_n_local_ssd_disk_db', 'user_credentials_path', 'scylla_version',
'scylla_mgmt_agent_version', 'k8s_scylla_operator_helm_repo', 'k8s_scylla_datacenter',
- 'k8s_scylla_rack', 'k8s_scylla_cluster_name', 'k8s_scylla_cpu_n', 'k8s_scylla_mem_gi',
- 'k8s_loader_cluster_name', 'k8s_loader_cpu_n', 'k8s_loader_mem_gi', 'gce_instance_type_loader',
+ 'k8s_scylla_rack', 'k8s_scylla_cluster_name',
+ 'k8s_loader_cluster_name', 'gce_instance_type_loader',
'gce_image_monitor', 'gce_instance_type_monitor', 'gce_root_disk_type_monitor',
'gce_root_disk_size_monitor', 'gce_n_local_ssd_disk_monitor', 'mgmt_docker_image'],
+
+ 'k8s-eks': ['instance_type_loader', 'instance_type_monitor', 'instance_type_db', 'region_name',
+ 'ami_id_db_scylla', 'ami_id_monitor', 'aws_root_disk_size_monitor',
+ 'aws_root_disk_name_monitor', 'ami_db_scylla_user', 'ami_monitor_user', 'user_credentials_path',
+ 'scylla_version', 'scylla_mgmt_agent_version', 'k8s_scylla_operator_docker_image',
+ 'k8s_scylla_datacenter', 'k8s_scylla_rack', 'k8s_scylla_cluster_name',
+ 'k8s_loader_cluster_name',
+ 'mgmt_docker_image', 'eks_service_ipv4_cidr', 'eks_vpc_cni_version', 'eks_role_arn',
+ 'eks_cluster_version', 'eks_nodegroup_role_arn'],
}

defaults_config_files = {
@@ -1145,6 +1162,7 @@ class SCTConfiguration(dict):
"gce-siren": [sct_abs_path('defaults/gce_config.yaml')],
"k8s-gce-minikube": [sct_abs_path('defaults/k8s_gce_minikube_config.yaml')],
"k8s-gke": [sct_abs_path('defaults/k8s_gke_config.yaml')],
+ "k8s-eks": [sct_abs_path('defaults/aws_config.yaml'), sct_abs_path('defaults/k8s_eks_config.yaml')],
}

multi_region_params = [
@@ -1178,13 +1196,13 @@ def __init__(self):
del self['regions_data']

# 2.2) load the region data
- region_names = (self.get('region_name') or '').split()
- region_names = env.get('region_name', region_names)

cluster_backend = self.get('cluster_backend')
cluster_backend = env.get('cluster_backend', cluster_backend)

- if 'aws' in cluster_backend:
+ region_names = self.region_names
+
+ if cluster_backend in ['aws', 'k8s-eks']:
for region in region_names:
for key, value in regions_data[region].items():
if key not in self.keys():
@@ -1201,7 +1219,7 @@ def __init__(self):

# 5) assume multi dc by n_db_nodes set size
if 'aws' in cluster_backend:
- num_of_regions = len((self.get('region_name')).split())
+ num_of_regions = len(region_names)
num_of_db_nodes_sets = len(str(self.get('n_db_nodes')).split(' '))
if num_of_db_nodes_sets > num_of_regions:
for region in list(regions_data.keys())[:num_of_db_nodes_sets]:
@@ -1224,7 +1242,7 @@ def __init__(self):
self.log.info("Assume that Scylla Docker image has repo file pre-installed.")
elif not self.get('ami_id_db_scylla') and self.get('cluster_backend') == 'aws':
ami_list = []
- for region in self.get('region_name').split():
+ for region in region_names:
if ':' in scylla_version:
amis = get_branched_ami(scylla_version, region_name=region)
ami_list.append(amis[0].id)
@@ -1258,7 +1276,7 @@ def __init__(self):
if oracle_scylla_version:
if not self.get('ami_id_db_oracle') and self.get('cluster_backend') == 'aws':
ami_list = []
- for region in self.get('region_name').split():
+ for region in region_names:
if ':' in oracle_scylla_version:
amis = get_branched_ami(oracle_scylla_version, region_name=region)
ami_list.append(amis[0].id)
@@ -1328,6 +1346,24 @@ def __init__(self):
def log_config(self):
self.log.info(self.dump_config())

+ @property
+ def region_names(self) -> List[str]:
+ region_names = self._env.get('region_name')
+ if region_names is None:
+ region_names = self.get('region_name')
+ if region_names is None:
+ region_names = ''
+ if isinstance(region_names, str):
+ region_names = region_names.split()
+ output = []
+ for region_name in region_names:
+ output.extend(region_name.split())
+ return output
+
+ @property
+ def _env(self) -> dict:
+ return self._load_environment_variables()
+
@classmethod
def get_config_option(cls, name):
return [o for o in cls.config_options if o['name'] == name][0]
@@ -1477,7 +1513,7 @@ def _check_backend_defaults(backend, required_params):
if not all(region_count['region_name'] == x for x in region_count.values()):
raise ValueError("not all multi region values are equal: \n\t{}".format(region_count))

- if 'extra_network_interface' in self and len(self.get('region_name').split()) >= 2:
+ if 'extra_network_interface' in self and len(self.region_names) >= 2:
raise ValueError("extra_network_interface isn't supported for multi region use cases")

# validate seeds number
@@ -1508,7 +1544,7 @@ def _check_backend_defaults(backend, required_params):
# verify that the AMIs used all have 'user_data_format_version' tag
if 'aws' in backend:
ami_id_db_scylla = self.get('ami_id_db_scylla').split()
- region_names = self.get('region_name').split()
+ region_names = self.region_names
ami_id_db_oracle = self.get('ami_id_db_oracle').split()

for ami_list in [ami_id_db_scylla, ami_id_db_oracle]:
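
Several call sites that previously did self.get('region_name').split() now go through the new region_names property, which also tolerates a missing value and a list of space-separated strings. A stand-alone re-implementation of just the flattening step, to show the accepted input shapes (the real property additionally consults the SCT_* environment overrides):

    def flatten_region_names(region_name) -> list:
        # Mirrors SCTConfiguration.region_names: None -> [], a space-separated
        # string -> its parts, a list of such strings -> one flat list.
        if region_name is None:
            region_name = ''
        if isinstance(region_name, str):
            region_name = region_name.split()
        output = []
        for name in region_name:
            output.extend(name.split())
        return output


    assert flatten_region_names(None) == []
    assert flatten_region_names('eu-north-1 us-east-1') == ['eu-north-1', 'us-east-1']
    assert flatten_region_names(['eu-north-1 us-east-1', 'us-west-2']) == ['eu-north-1', 'us-east-1', 'us-west-2']
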
diff --git a/sdcm/sct_runner.py b/sdcm/sct_runner.py
--- a/sdcm/sct_runner.py
+++ b/sdcm/sct_runner.py
@@ -12,7 +12,7 @@

from sdcm.keystore import KeyStore
from sdcm.remote import RemoteCmdRunnerBase
-from sdcm.utils.common import ec2_instance_wait_public_ip, ec2_ami_get_root_device_name
+from sdcm.utils.aws_utils import ec2_instance_wait_public_ip, ec2_ami_get_root_device_name
from sdcm.utils.get_username import get_username
from sdcm.utils.prepare_region import AwsRegion

diff --git a/sdcm/tester.py b/sdcm/tester.py
--- a/sdcm/tester.py
+++ b/sdcm/tester.py
@@ -15,6 +15,7 @@
import logging
import os
import re
+import stat
import time
import random
import unittest
@@ -28,7 +29,6 @@
import sys
import traceback

-import boto3.session
from invoke.exceptions import UnexpectedExit, Failure

from cassandra.concurrent import execute_concurrent_with_args # pylint: disable=no-name-in-module
@@ -45,11 +45,14 @@
from sdcm.cluster_aws import ScyllaAWSCluster
from sdcm.cluster_aws import LoaderSetAWS
from sdcm.cluster_aws import MonitorSetAWS
-from sdcm.cluster_k8s import minikube, gke
+from sdcm.cluster_k8s import minikube, gke, eks, LOADER_CLUSTER_CONFIG
+from sdcm.cluster_k8s.eks import MonitorSetEKS
from sdcm.scylla_bench_thread import ScyllaBenchThread
+from sdcm.utils.aws_utils import init_monitoring_info_from_params, get_ec2_network_configuration, get_ec2_services, \
+ get_common_params, init_db_info_from_params, ec2_ami_get_root_device_name
from sdcm.utils.common import format_timestamp, wait_ami_available, tag_ami, update_certificates, \
download_dir_from_cloud, get_post_behavior_actions, get_testrun_status, download_encrypt_keys, PageFetcher, \
- rows_to_list, ec2_ami_get_root_device_name
+ rows_to_list
from sdcm.utils.get_username import get_username
from sdcm.utils.decorators import log_run_info, retrying
from sdcm.utils.ldap import LDAP_USERS, LDAP_PASSWORD, LDAP_ROLE, LDAP_BASE_OBJECT
@@ -66,7 +69,6 @@
from sdcm.sct_events.events_analyzer import stop_events_analyzer
from sdcm.stress_thread import CassandraStressThread
from sdcm.gemini_thread import GeminiStressThread
-from sdcm.utils.prepare_region import AwsRegion
from sdcm.ycsb_thread import YcsbStressThread
from sdcm.ndbench_thread import NdBenchStressThread
from sdcm.kcl_thread import KclStressThread, CompareTablesSizesThread
@@ -83,7 +85,6 @@
from sdcm.keystore import KeyStore
from sdcm.utils.latency import calculate_latency

-
try:
import cluster_cloud
except ImportError:
@@ -101,7 +102,6 @@
except ImportError:
pass

-
warnings.filterwarnings(action="ignore", message="unclosed",
category=ResourceWarning)
TEST_LOG = logging.getLogger(__name__)
@@ -115,6 +115,7 @@ def teardown_on_exception(method):
:param method: ScyllaClusterTester method to wrap.
:return: Wrapped method.
"""
+
@wraps(method)
def wrapper(*args, **kwargs):
try:
@@ -128,6 +129,7 @@ def wrapper(*args, **kwargs):
TEST_LOG.exception("Exception in %s. Will call tearDown", method.__name__)
args[0].tearDown()
raise
+
return wrapper


@@ -175,6 +177,7 @@ def decor(*args, **kwargs):
except Exception as exc: # pylint: disable=broad-except
self.log.debug(f"Finished '{name}'. {str(type(exc))} exception was silenced.")
self._store_test_result(args[0], exc, exc.__traceback__, name)
+
return decor

def __exit__(self, exc_type, exc_val, exc_tb):
@@ -204,7 +207,8 @@ def critical_failure_handler(signum, frame): # pylint: disable=unused-argument
signal.signal(signal.SIGUSR2, critical_failure_handler)


-class ClusterTester(db_stats.TestStatsMixin, unittest.TestCase): # pylint: disable=too-many-instance-attributes,too-many-public-methods
+class ClusterTester(db_stats.TestStatsMixin,
+ unittest.TestCase): # pylint: disable=too-many-instance-attributes,too-many-public-methods
log = None
localhost = None
events_processes_registry = None
@@ -313,6 +317,7 @@ def _init_test_timeout_thread(self) -> threading.Timer:

def kill_the_test():
TestTimeoutEvent(start_time=self.start_time, duration=self.test_duration).publish()
+
th = threading.Timer(60 * int(self.test_duration), kill_the_test)
th.daemon = True
th.start()
@@ -322,15 +327,19 @@ def _init_localhost(self):
return LocalHost(user_prefix=self.params.get("user_prefix"), test_id=Setup.test_id())

def _move_kubectl_config(self):
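+        # Owner-only access (equivalent to chmod 0700) for the kubectl and helm config paths created below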
+ secure_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
os.environ['KUBECONFIG'] = self.kubectl_config_path
- if not(os.path.exists(self.kubectl_config_path)):
+ if not (os.path.exists(self.kubectl_config_path)):
os.makedirs(os.path.dirname(self.kubectl_config_path), exist_ok=True)
with open(self.kubectl_config_path, 'w') as kube_config_file:
kube_config_file.write('')
kube_config_file.flush()
+ os.chmod(os.path.dirname(self.kubectl_config_path), mode=secure_mode)
+ os.chmod(self.kubectl_config_path, mode=secure_mode)
os.environ['HELM_CONFIG_HOME'] = self.helm_config_path
- if not(os.path.exists(self.helm_config_path)):
+ if not (os.path.exists(self.helm_config_path)):
os.makedirs(os.path.dirname(self.helm_config_path), exist_ok=True)
+ os.chmod(os.path.dirname(self.helm_config_path), mode=secure_mode)

@cached_property
def kubectl_config_path(self):
@@ -444,7 +453,8 @@ def setUp(self):
self.params['update_db_packages'] = download_dir_from_cloud(update_db_packages)

append_scylla_yaml = self.params.get('append_scylla_yaml')
- if append_scylla_yaml and ('system_key_directory' in append_scylla_yaml or 'system_info_encryption' in append_scylla_yaml or 'kmip_hosts:' in append_scylla_yaml):
+ if append_scylla_yaml and (
+ 'system_key_directory' in append_scylla_yaml or 'system_info_encryption' in append_scylla_yaml or 'kmip_hosts:' in append_scylla_yaml):
download_encrypt_keys()

self.init_resources()
@@ -588,7 +598,7 @@ def get_cluster_gce(self, loader_info, db_info, monitor_info):
user_prefix = self.params.get('user_prefix')

services = get_gce_services(self.params.get('gce_datacenter').split())
- assert len(services) in (1, len(db_info['n_nodes']), )
+ assert len(services) in (1, len(db_info['n_nodes']),)
gce_datacenter, services = list(services.keys()), list(services.values())
TEST_LOG.info("Using GCE AZs: %s", gce_datacenter)

@@ -694,88 +704,24 @@ def get_cluster_aws(self, loader_info, db_info, monitor_info):
else:
loader_info['device_mappings'] = []

- if db_info['n_nodes'] is None:
- n_db_nodes = self.params.get('n_db_nodes')
- if isinstance(n_db_nodes, int): # legacy type
- db_info['n_nodes'] = [n_db_nodes]
- elif isinstance(n_db_nodes, str): # latest type to support multiple datacenters
- db_info['n_nodes'] = [int(n) for n in n_db_nodes.split()]
- else:
- self.fail('Unsupported parameter type: {}'.format(type(n_db_nodes)))
- if db_info['type'] is None:
- db_info['type'] = self.params.get('instance_type_db')
- if db_info['disk_size'] is None:
- db_info['disk_size'] = self.params.get('aws_root_disk_size_db')
- if db_info['device_mappings'] is None and self.params.get('ami_id_db_scylla'):
- if db_info['disk_size']:
- db_info['device_mappings'] = [{
- "DeviceName": ec2_ami_get_root_device_name(image_id=self.params.get('ami_id_db_scylla').split()[0],
- region=regions[0]),
- "Ebs": {
- "VolumeSize": db_info['disk_size'],
- "VolumeType": "gp2"
- }
- }]
- else:
- db_info['device_mappings'] = []
-
- if monitor_info['n_nodes'] is None:
- monitor_info['n_nodes'] = self.params.get('n_monitor_nodes')
- if monitor_info['type'] is None:
- monitor_info['type'] = self.params.get('instance_type_monitor')
- if monitor_info['disk_size'] is None:
- monitor_info['disk_size'] = self.params.get('aws_root_disk_size_monitor')
- if monitor_info['device_mappings'] is None:
- if monitor_info['disk_size']:
- monitor_info['device_mappings'] = [{
- "DeviceName": ec2_ami_get_root_device_name(image_id=self.params.get('ami_id_monitor').split()[0],
- region=regions[0]),
- "Ebs": {
- "VolumeSize": monitor_info['disk_size'],
- "VolumeType": "gp2"
- }
- }]
- else:
- monitor_info['device_mappings'] = []
+ init_db_info_from_params(db_info, params=self.params, regions=regions)
+ init_monitoring_info_from_params(monitor_info, params=self.params, regions=regions)
user_prefix = self.params.get('user_prefix')

user_credentials = self.params.get('user_credentials_path')
- services = []
+
regions = self.params.get('region_name').split()
- for region in regions:
- session = boto3.session.Session(region_name=region)
- service = session.resource('ec2')
- services.append(service)
+ services = get_ec2_services(regions)
+
+ for _ in regions:
self.credentials.append(UserRemoteCredentials(key_file=user_credentials))

ami_ids = self.params.get('ami_id_db_scylla').split()
for idx, ami_id in enumerate(ami_ids):
wait_ami_available(services[idx].meta.client, ami_id)
- ec2_security_group_ids = []
- ec2_subnet_ids = []
- cluster_backend = self.params.get('cluster_backend')
- if "aws" in cluster_backend:
- availability_zone = self.params.get("availability_zone")
- for region in regions:
- aws_region = AwsRegion(region_name=region)
- sct_subnet = aws_region.sct_subnet(region_az=region + availability_zone)
- assert sct_subnet, f"No SCT subnet configured for {region}! Run 'hydra prepare-aws-region'"
- ec2_subnet_ids.append(sct_subnet.subnet_id)
- sct_sg = aws_region.sct_security_group
- assert sct_sg, f"No SCT security group configured for {region}! Run 'hydra prepare-aws-region'"
- ec2_security_group_ids.append([sct_sg.group_id])
- else:
- for i in self.params.get('security_group_ids').split():
- ec2_security_group_ids.append(i.split(','))
- ec2_subnet_ids = self.params.get('subnet_id').split()

- common_params = dict(ec2_security_group_ids=ec2_security_group_ids,
- ec2_subnet_id=ec2_subnet_ids,
- services=services,
- credentials=self.credentials,
- user_prefix=user_prefix,
- params=self.params
- )
+ common_params = get_common_params(params=self.params, regions=regions, credentials=self.credentials,
+ services=services)

def create_cluster(db_type='scylla'):
cl_params = dict(
@@ -939,14 +885,13 @@ def get_cluster_k8s_gce_minikube(self):

self.db_cluster = \
minikube.MinikubeScyllaPodCluster(k8s_cluster=self.k8s_cluster,
- scylla_cluster_config=minikube.SCYLLA_CLUSTER_CONFIG,
scylla_cluster_name=self.params.get("k8s_scylla_cluster_name"),
user_prefix=self.params.get("user_prefix"),
n_nodes=self.params.get("n_db_nodes"),
params=self.params)

self.log.debug("Update startup script with iptables rules")
- startup_script = "\n".join((Setup.get_startup_script(), *self.db_cluster.nodes_iptables_redirect_rules(), ))
+ startup_script = "\n".join((Setup.get_startup_script(), *self.db_cluster.nodes_iptables_redirect_rules(),))
Setup.get_startup_script = lambda: startup_script

self.loaders = LoaderSetGCE(gce_image=self.params.get("gce_image"),
@@ -993,47 +938,66 @@ def get_cluster_k8s_gke(self):
gce_image_size=self.params.get("gce_root_disk_size_db"),
gce_network=self.params.get("gce_network"),
services=services,
- credentials=self.credentials,
- gce_n_local_ssd=self.params.get("gce_n_local_ssd_disk_db"),
- gce_instance_type=self.params.get("gce_instance_type_db"),
user_prefix=self.params.get("user_prefix"),
- n_nodes=self.params.get("gke_cluster_n_nodes"),
+ n_nodes=1,
params=self.params,
gce_datacenter=gce_datacenter)
- self.k8s_cluster.wait_for_init()
-
+ self.k8s_cluster.deploy()
+ scylla_pool = gke.GkeNodePool(
+ name=self.k8s_cluster.SCYLLA_POOL_NAME,
+ local_ssd_count=self.params.get("gce_n_local_ssd_disk_db"),
+ disk_size=self.params.get("gce_root_disk_size_db"),
+ disk_type=self.params.get("gce_root_disk_type_db"),
+ instance_type=self.params.get("gce_instance_type_db"),
+ num_nodes=int(self.params.get("n_db_nodes")) + 1,
+ k8s_cluster=self.k8s_cluster)
+ self.k8s_cluster.deploy_node_pool(scylla_pool, wait_till_ready=False)
+ loader_pool = gke.GkeNodePool(
+ name=self.k8s_cluster.LOADER_POOL_NAME,
+ instance_type=self.params.get("gce_instance_type_loader"),
+ num_nodes=self.params.get("n_loaders"),
+ k8s_cluster=self.k8s_cluster)
+ self.k8s_cluster.deploy_node_pool(loader_pool, wait_till_ready=False)
+
+ monitor_pool = None
if self.params.get('k8s_deploy_monitoring'):
- self.k8s_cluster.add_gke_pool(name="monitoring",
- num_nodes=1,
- instance_type=self.params.get("gce_instance_type_monitor"))
- self.k8s_cluster.deploy_monitoring_cluster(
- self.params.get('k8s_scylla_operator_docker_image').split(':')[-1],
- is_manager_deployed=self.params.get('use_mgmt')
- )
-
+ monitor_pool = gke.GkeNodePool(
+ name=self.k8s_cluster.MONITORING_POOL_NAME,
+ local_ssd_count=self.params.get("gce_n_local_ssd_disk_monitor"),
+ disk_size=self.params.get("gce_root_disk_size_monitor"),
+ disk_type=self.params.get("gce_root_disk_type_monitor"),
+ instance_type=self.params.get("gce_instance_type_monitor"),
+ num_nodes=self.params.get("n_monitor_nodes"),
+ k8s_cluster=self.k8s_cluster)
+ self.k8s_cluster.deploy_node_pool(monitor_pool, wait_till_ready=False)
+ self.k8s_cluster.wait_all_node_pools_to_be_ready()
self.k8s_cluster.deploy_cert_manager()
self.k8s_cluster.deploy_scylla_operator()
if self.params.get('use_mgmt'):
self.k8s_cluster.deploy_minio_s3_backend(minio_bucket_name=self.params.get('backup_bucket_location'))
self.k8s_cluster.deploy_scylla_manager()

self.db_cluster = gke.GkeScyllaPodCluster(k8s_cluster=self.k8s_cluster,
- scylla_cluster_config=gke.SCYLLA_CLUSTER_CONFIG,
scylla_cluster_name=self.params.get("k8s_scylla_cluster_name"),
user_prefix=self.params.get("user_prefix"),
n_nodes=self.params.get("n_db_nodes"),
- params=self.params)
+ params=self.params,
+ node_pool=scylla_pool)

- self.k8s_cluster.add_gke_pool(name=self.params.get("k8s_loader_cluster_name"),
- num_nodes=self.params.get("n_loaders"),
- instance_type=self.params.get("gce_instance_type_loader"))
+ if self.params.get('k8s_deploy_monitoring'):
+ self.k8s_cluster.deploy_monitoring_cluster(
+ scylla_operator_tag=self.params.get('k8s_scylla_operator_docker_image').split(':')[-1],
+ is_manager_deployed=self.params.get('use_mgmt'),
+ node_pool=monitor_pool
+ )

self.loaders = cluster_k8s.LoaderPodCluster(k8s_cluster=self.k8s_cluster,
- loader_cluster_config=gke.LOADER_CLUSTER_CONFIG,
+ loader_cluster_config=LOADER_CLUSTER_CONFIG,
loader_cluster_name=self.params.get("k8s_loader_cluster_name"),
user_prefix=self.params.get("user_prefix"),
n_nodes=self.params.get("n_loaders"),
- params=self.params)
+ params=self.params,
+ node_pool=loader_pool)

if self.params.get("n_monitor_nodes") > 0:
self.log.debug("Update startup script with iptables rules")
@@ -1058,6 +1022,135 @@ def get_cluster_k8s_gke(self):
else:
self.monitors = NoMonitorSet()

+ def get_cluster_k8s_eks(self):
+ self.credentials.append(UserRemoteCredentials(key_file=self.params.get('user_credentials_path')))
+ regions = self.params.region_names
+
+ ec2_security_group_ids, ec2_subnet_ids = get_ec2_network_configuration(
+ regions=regions,
+ availability_zones=self.params.get('availability_zone').split(','),
+ )
+
+ services = get_ec2_services(regions)
+
+ common_params = get_common_params(params=self.params, regions=regions, credentials=self.credentials,
+ services=services)
+
+ monitor_info = {'n_nodes': None, 'type': None, 'disk_size': None, 'disk_type': None, 'n_local_ssd': None,
+ 'device_mappings': None}
+ db_info = {'n_nodes': None, 'type': None, 'disk_size': None, 'disk_type': None, 'n_local_ssd': None,
+ 'device_mappings': None}
+
+ init_db_info_from_params(db_info, params=self.params, regions=regions)
+ init_monitoring_info_from_params(monitor_info, params=self.params, regions=regions)
+
+ self.k8s_cluster = eks.EksCluster(eks_cluster_version=self.params.get("eks_cluster_version"),
+ ec2_security_group_ids=ec2_security_group_ids,
+ ec2_subnet_ids=ec2_subnet_ids,
+ credentials=self.credentials,
+ ec2_role_arn=self.params.get("eks_role_arn"),
+ nodegroup_role_arn=self.params.get("eks_nodegroup_role_arn"),
+ service_ipv4_cidr=self.params.get("eks_service_ipv4_cidr"),
+ vpc_cni_version=self.params.get("eks_vpc_cni_version"),
+ user_prefix=self.params.get("user_prefix"),
+ params=self.params,
+ region_name=self.params.region_names[0]
+ )
+
+ self.k8s_cluster.deploy()
+ self.k8s_cluster.deploy_node_pool(
+ eks.EksNodePool(
+ name=self.k8s_cluster.AUXILIARY_POOL_NAME,
+ num_nodes=1,
+ instance_type="t3.xlarge",
+                # It should have at least 3 vCPUs to be able to hold all the pods
+ disk_size=40,
+ role_arn=self.k8s_cluster.nodegroup_role_arn,
+ provision_type=self.params.get('instance_provision'),
+ k8s_cluster=self.k8s_cluster),
+ wait_till_ready=False)
+
+ scylla_pool = eks.EksNodePool(
+ name=self.k8s_cluster.SCYLLA_POOL_NAME,
+ num_nodes=self.params.get("n_db_nodes") + 1,
+ instance_type=self.params.get('instance_type_db'),
+ role_arn=self.params.get('eks_nodegroup_role_arn'),
+ provision_type=self.params.get('instance_provision'),
+ disk_size=self.params.get('aws_root_disk_size_db'),
+ k8s_cluster=self.k8s_cluster
+ )
+ self.k8s_cluster.deploy_node_pool(scylla_pool, wait_till_ready=False)
+
+ loader_pool = eks.EksNodePool(
+ name=self.k8s_cluster.LOADER_POOL_NAME,
+ num_nodes=self.params.get("n_loaders"),
+ instance_type=self.params.get("instance_type_monitor"),
+ role_arn=self.params.get('eks_nodegroup_role_arn'),
+ provision_type=self.params.get('instance_provision'),
+ disk_size=self.params.get('aws_root_disk_size_monitor'),
+ k8s_cluster=self.k8s_cluster)
+ self.k8s_cluster.deploy_node_pool(loader_pool, wait_till_ready=False)
+
+ monitor_pool = None
+ if self.params.get('k8s_deploy_monitoring'):
+ monitor_pool = eks.EksNodePool(
+ name=self.k8s_cluster.MONITORING_POOL_NAME,
+ num_nodes=1,
+ instance_type=self.params.get("instance_type_monitor"),
+ role_arn=self.params.get('eks_nodegroup_role_arn'),
+ provision_type=self.params.get('instance_provision'),
+ disk_size=self.params.get('aws_root_disk_size_monitor'),
+ k8s_cluster=self.k8s_cluster
+ )
+ self.k8s_cluster.deploy_node_pool(monitor_pool, wait_till_ready=False)
+ self.k8s_cluster.wait_all_node_pools_to_be_ready()
+
+ self.k8s_cluster.deploy_cert_manager()
+ self.k8s_cluster.deploy_scylla_operator()
+ if self.params.get('use_mgmt'):
+ self.k8s_cluster.deploy_scylla_manager()
+
+ self.db_cluster = eks.EksScyllaPodCluster(
+ k8s_cluster=self.k8s_cluster,
+ scylla_cluster_name=self.params.get("k8s_scylla_cluster_name"),
+ user_prefix=self.params.get("user_prefix"),
+ n_nodes=self.params.get("n_db_nodes"),
+ params=self.params,
+ node_pool=scylla_pool
+ )
+
+ if self.params.get('k8s_deploy_monitoring'):
+ self.k8s_cluster.deploy_monitoring_cluster(
+ scylla_operator_tag=self.params.get('k8s_scylla_operator_docker_image').split(':')[-1],
+ is_manager_deployed=self.params.get('use_mgmt'),
+ node_pool=monitor_pool
+ )
+
+ self.loaders = cluster_k8s.LoaderPodCluster(k8s_cluster=self.k8s_cluster,
+ loader_cluster_config=LOADER_CLUSTER_CONFIG,
+ loader_cluster_name=self.params.get("k8s_loader_cluster_name"),
+ user_prefix=self.params.get("user_prefix"),
+ n_nodes=self.params.get("n_loaders"),
+ params=self.params,
+ node_pool=loader_pool)
+
+ if monitor_info['n_nodes']:
+ self.log.debug("Update startup script with iptables rules")
+ startup_script = "\n".join((Setup.get_startup_script(), *self.db_cluster.nodes_iptables_redirect_rules(),))
+ Setup.get_startup_script = lambda: startup_script
+
+ self.monitors = MonitorSetEKS(
+ ec2_ami_id=self.params.get('ami_id_monitor').split(),
+ ec2_ami_username=self.params.get('ami_monitor_user'),
+ ec2_instance_type=monitor_info['type'],
+ ec2_block_device_mappings=monitor_info['device_mappings'],
+ n_nodes=monitor_info['n_nodes'],
+ targets=dict(db_cluster=self.db_cluster,
+ loaders=self.loaders),
+ **common_params)
+ else:
+ self.monitors = NoMonitorSet()
+
def init_resources(self, loader_info=None, db_info=None,
monitor_info=None):
# pylint: disable=too-many-locals,too-many-statements,too-many-branches
@@ -1090,6 +1183,8 @@ def init_resources(self, loader_info=None, db_info=None,
self.get_cluster_k8s_gce_minikube()
elif cluster_backend == 'k8s-gke':
self.get_cluster_k8s_gke()
+ elif cluster_backend == 'k8s-eks':
+ self.get_cluster_k8s_eks()

def _cs_add_node_flag(self, stress_cmd):
if '-node' not in stress_cmd:
@@ -1106,7 +1201,8 @@ def run_stress(self, stress_cmd, duration=None):
duration=duration)
self.verify_stress_thread(cs_thread_pool=cs_thread_pool)

- def run_stress_thread(self, stress_cmd, duration=None, stress_num=1, keyspace_num=1, profile=None, prefix='', # pylint: disable=too-many-arguments
+ def run_stress_thread(self, stress_cmd, duration=None, stress_num=1, keyspace_num=1, profile=None, prefix='',
+ # pylint: disable=too-many-arguments
round_robin=False, stats_aggregate_cmds=True, keyspace_name=None, use_single_loader=False,
stop_test_on_failure=True):

@@ -1131,8 +1227,10 @@ def run_stress_thread(self, stress_cmd, duration=None, stress_num=1, keyspace_nu
else:
raise ValueError(f'Unsupported stress command: "{stress_cmd[:50]}..."')

- def run_stress_cassandra_thread(self, stress_cmd, duration=None, stress_num=1, keyspace_num=1, profile=None, prefix='', # pylint: disable=too-many-arguments,unused-argument
- round_robin=False, stats_aggregate_cmds=True, keyspace_name=None, use_single_loader=False, # pylint: disable=too-many-arguments,unused-argument
+ def run_stress_cassandra_thread(self, stress_cmd, duration=None, stress_num=1, keyspace_num=1, profile=None,
+ prefix='', # pylint: disable=too-many-arguments,unused-argument
+ round_robin=False, stats_aggregate_cmds=True, keyspace_name=None,
+ use_single_loader=False, # pylint: disable=too-many-arguments,unused-argument
stop_test_on_failure=True): # pylint: disable=too-many-arguments,unused-argument
# stress_cmd = self._cs_add_node_flag(stress_cmd)
timeout = self.get_duration(duration)
@@ -1159,8 +1257,10 @@ def run_stress_cassandra_thread(self, stress_cmd, duration=None, stress_num=1, k
self.db_cluster.wait_for_schema_agreement()
return cs_thread

- def run_stress_thread_bench(self, stress_cmd, duration=None, stress_num=1, keyspace_num=1, profile=None, prefix='', # pylint: disable=too-many-arguments,unused-argument
- round_robin=False, stats_aggregate_cmds=True, keyspace_name=None, use_single_loader=False,
+ def run_stress_thread_bench(self, stress_cmd, duration=None, stress_num=1, keyspace_num=1, profile=None, prefix='',
+ # pylint: disable=too-many-arguments,unused-argument
+ round_robin=False, stats_aggregate_cmds=True, keyspace_name=None,
+ use_single_loader=False,
stop_test_on_failure=True): # pylint: disable=too-many-arguments,unused-argument

timeout = self.get_duration(duration)
@@ -1187,9 +1287,12 @@ def run_stress_thread_bench(self, stress_cmd, duration=None, stress_num=1, keysp
self.alter_test_tables_encryption(scylla_encryption_options=scylla_encryption_options)
return bench_thread

- def run_ycsb_thread(self, stress_cmd, duration=None, stress_num=1, prefix='', # pylint: disable=too-many-arguments,unused-argument
- round_robin=False, stats_aggregate_cmds=True, # pylint: disable=too-many-arguments,unused-argument
- keyspace_num=None, keyspace_name=None, profile=None, use_single_loader=False): # pylint: disable=too-many-arguments,unused-argument
+ def run_ycsb_thread(self, stress_cmd, duration=None, stress_num=1, prefix='',
+ # pylint: disable=too-many-arguments,unused-argument
+ round_robin=False, stats_aggregate_cmds=True,
+ # pylint: disable=too-many-arguments,unused-argument
+ keyspace_num=None, keyspace_name=None, profile=None,
+ use_single_loader=False): # pylint: disable=too-many-arguments,unused-argument

timeout = self.get_duration(duration)

@@ -1203,9 +1306,12 @@ def run_ycsb_thread(self, stress_cmd, duration=None, stress_num=1, prefix='', #
node_list=self.db_cluster.nodes,
round_robin=round_robin, params=self.params).run()

- def run_hydra_kcl_thread(self, stress_cmd, duration=None, stress_num=1, prefix='', # pylint: disable=too-many-arguments,unused-argument
- round_robin=False, stats_aggregate_cmds=True, # pylint: disable=too-many-arguments,unused-argument
- keyspace_num=None, keyspace_name=None, profile=None, use_single_loader=False): # pylint: disable=too-many-arguments,unused-argument
+ def run_hydra_kcl_thread(self, stress_cmd, duration=None, stress_num=1, prefix='',
+ # pylint: disable=too-many-arguments,unused-argument
+ round_robin=False, stats_aggregate_cmds=True,
+ # pylint: disable=too-many-arguments,unused-argument
+ keyspace_num=None, keyspace_name=None, profile=None,
+ use_single_loader=False): # pylint: disable=too-many-arguments,unused-argument

timeout = self.get_duration(duration)

@@ -1219,9 +1325,12 @@ def run_hydra_kcl_thread(self, stress_cmd, duration=None, stress_num=1, prefix='
node_list=self.db_cluster.nodes,
round_robin=round_robin, params=self.params).run()

- def run_table_compare_thread(self, stress_cmd, duration=None, stress_num=1, prefix='', # pylint: disable=too-many-arguments,unused-argument
- round_robin=False, stats_aggregate_cmds=True, # pylint: disable=too-many-arguments,unused-argument
- keyspace_num=None, keyspace_name=None, profile=None, use_single_loader=False): # pylint: disable=too-many-arguments,unused-argument
+ def run_table_compare_thread(self, stress_cmd, duration=None, stress_num=1, prefix='',
+ # pylint: disable=too-many-arguments,unused-argument
+ round_robin=False, stats_aggregate_cmds=True,
+ # pylint: disable=too-many-arguments,unused-argument
+ keyspace_num=None, keyspace_name=None, profile=None,
+ use_single_loader=False): # pylint: disable=too-many-arguments,unused-argument

timeout = self.get_duration(duration)

@@ -1232,9 +1341,11 @@ def run_table_compare_thread(self, stress_cmd, duration=None, stress_num=1, pref
node_list=self.db_cluster.nodes,
round_robin=round_robin, params=self.params).run()

- def run_ndbench_thread(self, stress_cmd, duration=None, stress_num=1, prefix='', # pylint: disable=too-many-arguments
+ def run_ndbench_thread(self, stress_cmd, duration=None, stress_num=1, prefix='',
+ # pylint: disable=too-many-arguments
round_robin=False, stats_aggregate_cmds=True,
- keyspace_num=None, keyspace_name=None, profile=None, use_single_loader=False): # pylint: disable=unused-argument
+ keyspace_num=None, keyspace_name=None, profile=None,
+ use_single_loader=False): # pylint: disable=unused-argument

timeout = self.get_duration(duration)

@@ -1248,9 +1359,10 @@ def run_ndbench_thread(self, stress_cmd, duration=None, stress_num=1, prefix='',
node_list=self.db_cluster.nodes,
round_robin=round_robin, params=self.params).run()

- def run_cdclog_reader_thread(self, stress_cmd, duration=None, stress_num=1, prefix='', # pylint: disable=too-many-arguments
+ def run_cdclog_reader_thread(self, stress_cmd, duration=None, stress_num=1, prefix='',
+ # pylint: disable=too-many-arguments
round_robin=False, stats_aggregate_cmds=True, enable_batching=True,
- keyspace_name=None, base_table_name=None): # pylint: disable=unused-argument
+ keyspace_name=None, base_table_name=None): # pylint: disable=unused-argument
timeout = self.get_duration(duration)

if self.create_stats:
@@ -1431,7 +1543,8 @@ def is_keyspace_in_cluster(session, keyspace_name):
keyspace_list = [row.keyspace_name.lower() for row in query_result.current_rows]
return keyspace_name.lower() in keyspace_list

- def wait_validate_keyspace_existence(self, session, keyspace_name, timeout=180, step=5): # pylint: disable=invalid-name
+ def wait_validate_keyspace_existence(self, session, keyspace_name, timeout=180,
+ step=5): # pylint: disable=invalid-name
text = 'waiting for the keyspace "{}" to be created in the cluster'.format(keyspace_name)
does_keyspace_exist = wait.wait_for(func=self.is_keyspace_in_cluster, step=step, text=text, timeout=timeout,
session=session, keyspace_name=keyspace_name)
@@ -1510,7 +1623,7 @@ def create_table(self, name, key_type="varchar", # pylint: disable=too-many-arg
compaction_clause = " 'class': '%s'" % compaction
if sstable_size:
compaction_clause += ", 'sstable_size_in_mb' : '%s'" % sstable_size
- query += prefix+compaction_clause+postfix
+ query += prefix + compaction_clause + postfix

if read_repair is not None:
query = '%s AND read_repair_chance=%f' % (query, read_repair)
@@ -1536,7 +1649,8 @@ def truncate_cf(self, ks_name, table_name, session):
except Exception as ex: # pylint: disable=broad-except
self.log.debug('Failed to truncate base table {0}.{1}. Error: {2}'.format(ks_name, table_name, str(ex)))

- def create_materialized_view(self, ks_name, base_table_name, mv_name, mv_partition_key, mv_clustering_key, session, # pylint: disable=too-many-arguments
+ def create_materialized_view(self, ks_name, base_table_name, mv_name, mv_partition_key, mv_clustering_key, session,
+ # pylint: disable=too-many-arguments
mv_columns='*', speculative_retry=None, read_repair=None, compression=None,
gc_grace=None, compact_storage=False):

@@ -1556,12 +1670,15 @@ def create_materialized_view(self, ks_name, base_table_name, mv_name, mv_partiti
cl_clause = ', '.join(cl for cl in mv_clustering_key)

query = 'CREATE MATERIALIZED VIEW {ks}.{mv_name} AS SELECT {mv_columns} FROM {ks}.{table_name} ' \
- 'WHERE {where_clause} PRIMARY KEY ({pk}, {cl}) WITH comment=\'test MV\''.format(ks=ks_name, mv_name=mv_name,
- mv_columns=mv_columns_str,
- table_name=base_table_name,
- where_clause=' and '.join
- (wc for wc in where_clause),
- pk=pk_clause, cl=cl_clause)
+ 'WHERE {where_clause} PRIMARY KEY ({pk}, {cl}) WITH comment=\'test MV\''.format(ks=ks_name,
+ mv_name=mv_name,
+ mv_columns=mv_columns_str,
+ table_name=base_table_name,
+ where_clause=' and '.join
+ (wc for wc in
+ where_clause),
+ pk=pk_clause,
+ cl=cl_clause)
if compression is not None:
query = ('%s AND compression = { \'sstable_compression\': '
'\'%sCompressor\' }' % (query, compression))
@@ -1610,7 +1727,8 @@ def _wait_for_view_build_start(self, session, key_space, view, seconds_to_wait=2

def _check_build_started():
result = self.rows_to_list(session.execute("SELECT last_token FROM system.views_builds_in_progress "
- "WHERE keyspace_name='{0}' AND view_name='{1}'".format(key_space, view)))
+ "WHERE keyspace_name='{0}' AND view_name='{1}'".format(key_space,
+ view)))
self.log.debug('View build in progress: {}'.format(result))
return result != []

@@ -1624,7 +1742,8 @@ def _check_build_started():
def rows_to_list(rows):
return [list(row) for row in rows]

- def copy_table(self, node, src_keyspace, src_table, dest_keyspace, dest_table, columns_list=None, copy_data=False): # pylint: disable=too-many-arguments
+ def copy_table(self, node, src_keyspace, src_table, dest_keyspace, dest_table, columns_list=None,
+ copy_data=False): # pylint: disable=too-many-arguments
"""
Create table with same structure as <src_keyspace>.<src_table>.
If columns_list is supplied, the table with create with the columns that in the columns_list
@@ -1642,12 +1761,13 @@ def copy_table(self, node, src_keyspace, src_table, dest_keyspace, dest_table, c
result = self.copy_data_between_tables(node, src_keyspace, src_table,
dest_keyspace, dest_table, columns_list)
except Exception as error: # pylint: disable=broad-except
- self.log. error(f'Copying data from {src_table} to {dest_table} failed with error: {error}')
+ self.log.error(f'Copying data from {src_table} to {dest_table} failed with error: {error}')
return False

return result

- def copy_view(self, node, src_keyspace, src_view, dest_keyspace, dest_table, columns_list=None, copy_data=False): # pylint: disable=too-many-arguments
+ def copy_view(self, node, src_keyspace, src_view, dest_keyspace, dest_table, columns_list=None,
+ copy_data=False): # pylint: disable=too-many-arguments
"""
Create table with same structure as <src_keyspace>.<src_view>.
If columns_list is supplied, the table with create with the columns that in the columns_list
@@ -1666,12 +1786,13 @@ def copy_view(self, node, src_keyspace, src_view, dest_keyspace, dest_table, col
result = self.copy_data_between_tables(node, src_keyspace, src_view,
dest_keyspace, dest_table, columns_list)
except Exception as error: # pylint: disable=broad-except
- self.log. error(f'Copying data from {src_view} to {dest_table} failed with error {error}')
+ self.log.error(f'Copying data from {src_view} to {dest_table} failed with error {error}')
return False

return result

- def create_table_as(self, node, src_keyspace, src_table, dest_keyspace, dest_table, create_statement, # pylint: disable=too-many-arguments,too-many-locals,inconsistent-return-statements
+ def create_table_as(self, node, src_keyspace, src_table, dest_keyspace, dest_table, create_statement,
+ # pylint: disable=too-many-arguments,too-many-locals,inconsistent-return-statements
columns_list=None):
""" Create table with same structure as another table or view
If columns_list is supplied, the table with create with the columns that in the columns_list
@@ -1701,9 +1822,10 @@ def create_table_as(self, node, src_keyspace, src_table, dest_keyspace, dest_tab

for column in columns_list or result.column_names:
column_kind = session.execute("select kind from system_schema.columns where keyspace_name='{ks}' "
- "and table_name='{name}' and column_name='{column}'".format(ks=src_keyspace,
- name=src_table,
- column=column))
+ "and table_name='{name}' and column_name='{column}'".format(
+ ks=src_keyspace,
+ name=src_table,
+ column=column))
if column_kind.current_rows[0].kind in ['partition_key', 'clustering']:
primary_keys.append(column)

@@ -1734,11 +1856,12 @@ def fetch_all_rows(self, session, default_fetch_size, statement, verbose=True):
fetcher = PageFetcher(result).request_all()
current_rows = fetcher.all_data()
if verbose and current_rows:
- dataset_size = sum(sys.getsizeof(e) for e in current_rows[0])*len(current_rows)
+ dataset_size = sum(sys.getsizeof(e) for e in current_rows[0]) * len(current_rows)
self.log.debug(f"Size of fetched rows: {dataset_size} bytes")
return current_rows

- def copy_data_between_tables(self, node, src_keyspace, src_table, dest_keyspace, # pylint: disable=too-many-arguments,too-many-locals
+ def copy_data_between_tables(self, node, src_keyspace, src_table, dest_keyspace,
+ # pylint: disable=too-many-arguments,too-many-locals
dest_table, columns_list=None):
""" Copy all data from one table/view to another table
Structure of the tables has to be same
@@ -1781,15 +1904,16 @@ def copy_data_between_tables(self, node, src_keyspace, src_table, dest_keyspace,

session.default_consistency_level = ConsistencyLevel.QUORUM

- results = execute_concurrent_with_args(session=session, statement=insert_statement, parameters=source_table_rows,
+ results = execute_concurrent_with_args(session=session, statement=insert_statement,
+ parameters=source_table_rows,
concurrency=max_workers, results_generator=True)

try:
succeeded_rows = sum(1 for (success, result) in results if success)
if succeeded_rows != len(source_table_rows):
- self. log.warning(f'Problem during copying data. Not all rows were inserted.'
- f'Rows expected to be inserted: {len(source_table_rows)}; '
- f'Actually inserted rows: {succeeded_rows}.')
+ self.log.warning(f'Problem during copying data. Not all rows were inserted.'
+ f'Rows expected to be inserted: {len(source_table_rows)}; '
+ f'Actually inserted rows: {succeeded_rows}.')
return False
except Exception as exc:
self.log.warning(f'Problem during copying data: {exc}')
@@ -1798,9 +1922,9 @@ def copy_data_between_tables(self, node, src_keyspace, src_table, dest_keyspace,
result = session.execute(f"SELECT count(*) FROM {dest_keyspace}.{dest_table}")
if result:
if result.current_rows[0].count != len(source_table_rows):
- self. log.warning(f'Problem during copying data. '
- f'Rows in source table: {len(source_table_rows)}; '
- f'Rows in destination table: {len(result.current_rows)}.')
+ self.log.warning(f'Problem during copying data. '
+ f'Rows in source table: {len(source_table_rows)}; '
+ f'Rows in destination table: {len(result.current_rows)}.')
return False
self.log.debug(f'All rows have been copied from {src_table} to {dest_table}')
return True
@@ -2175,7 +2299,8 @@ def alter_table_encryption(self, table, scylla_encryption_options=None, upgrades
Update table encryption
"""
if not scylla_encryption_options:
- self.log.debug('scylla_encryption_options is not set, skipping to enable encryption at-rest for all test tables')
+ self.log.debug(
+ 'scylla_encryption_options is not set, skipping to enable encryption at-rest for all test tables')
else:
with self.db_cluster.cql_connection_patient(self.db_cluster.nodes[0]) as session:
query = "ALTER TABLE {table} WITH scylla_encryption_options = {scylla_encryption_options};".format(
@@ -2223,8 +2348,8 @@ def wait_for_hints_to_be_sent(self, node, num_dest_nodes):
num_shards = self.get_num_shards(node)
hints_after_send_completed = num_shards * num_dest_nodes
# after hints were sent to all nodes, the number of files should be 1 per shard per destination
- assert self.get_num_of_hint_files(node) <= hints_after_send_completed, "Waiting until the number of hint files"\
- " will be %s." % hints_after_send_completed
+ assert self.get_num_of_hint_files(node) <= hints_after_send_completed, "Waiting until the number of hint files" \
+ " will be %s." % hints_after_send_completed
assert self.hints_sending_in_progress() is False, "Waiting until Prometheus hints counter will not change"

def verify_no_drops_and_errors(self, starting_from):
@@ -2249,13 +2374,13 @@ def get_data_set_size(self, cs_cmd):
@retrying(n=60, sleep_time=60, allowed_exceptions=(AssertionError,))
def wait_data_dir_reaching(self, size, node):
query = '(sum(node_filesystem_size_bytes{{mountpoint="{0.scylla_dir}", ' \
- 'instance=~"{1.private_ip_address}"}})-sum(node_filesystem_avail_bytes{{mountpoint="{0.scylla_dir}", ' \
- 'instance=~"{1.private_ip_address}"}}))'.format(self, node)
+ 'instance=~"{1.private_ip_address}"}})-sum(node_filesystem_avail_bytes{{mountpoint="{0.scylla_dir}", ' \
+ 'instance=~"{1.private_ip_address}"}}))'.format(self, node)
res = self.prometheus_db.query(query=query, start=time.time(), end=time.time())
assert res, "No results from Prometheus"
used = int(res[0]["values"][0][1]) / (2 ** 10)
assert used >= size, f"Waiting for Scylla data dir to reach '{size}', " \
- f"current size is: '{used}'"
+ f"current size is: '{used}'"

def check_latency_during_ops(self):
results_analyzer = LatencyDuringOperationsPerformanceAnalyzer(es_index=self._test_index,
@@ -2341,6 +2466,7 @@ def is_compactions_done():
if results:
assert any([float(v[1]) for v in results[0]["values"]]) is False, \
"Waiting until all compactions settle down"
+
is_compactions_done()

def metric_has_data(self, metric_query, n=80, sleep_time=60, ):
@@ -2407,7 +2533,7 @@ def collect_logs(self) -> None:
{"name": "k8s_cluster",
"nodes": getattr(self, "k8s_cluster", None) and self.k8s_cluster.nodes,
"collector": KubernetesLogCollector,
- "logname": "k8s_minikube_log", }, )
+ "logname": "k8s_minikube_log", },)

for cluster in clusters:
if not cluster["nodes"]:
@@ -2500,7 +2626,7 @@ def get_used_capacity(self, node) -> float: # pylint: disable=too-many-locals
self.log.debug(f"filesystem_capacity_query: {filesystem_capacity_query}")

fs_size_res = self.prometheus_db.query(query=filesystem_capacity_query,
- start=int(time.time())-5, end=int(time.time()))
+ start=int(time.time()) - 5, end=int(time.time()))
assert fs_size_res, "No results from Prometheus"
if not fs_size_res[0]: # if no returned values - try the old metric names.
filesystem_capacity_query = f'{fs_size_metric_old}{capacity_query_postfix}'
@@ -2516,7 +2642,7 @@ def get_used_capacity(self, node) -> float: # pylint: disable=too-many-locals
self.log.debug("used_capacity_query: {}".format(used_capacity_query))

used_cap_res = self.prometheus_db.query(
- query=used_capacity_query, start=int(time.time())-5, end=int(time.time()))
+ query=used_capacity_query, start=int(time.time()) - 5, end=int(time.time()))
self.log.debug("used_cap_res: {}".format(used_cap_res))

assert used_cap_res, "No results from Prometheus"
diff --git a/sdcm/utils/aws_utils.py b/sdcm/utils/aws_utils.py
--- a/sdcm/utils/aws_utils.py
+++ b/sdcm/utils/aws_utils.py
@@ -0,0 +1,313 @@
+from functools import cached_property
+from typing import List, Dict
+
+import boto3
+import logging
+import time
+
+from botocore.exceptions import ClientError
+
+from sdcm.utils.decorators import retrying
+from sdcm.utils.prepare_region import AwsRegion
+from sdcm.wait import wait_for
+
+
+LOGGER = logging.getLogger(__name__)
+
+
+class EksClusterCleanupMixin:
+ short_cluster_name: str
+ region_name: str
+
+ @cached_property
+ def eks_client(self):
+ return boto3.client('eks', region_name=self.region_name)
+
+ @cached_property
+ def ec2_client(self):
+ return boto3.client('ec2', region_name=self.region_name)
+
+ @cached_property
+ def elb_client(self):
+ return boto3.client('elb', region_name=self.region_name)
+
+ @property
+ def owned_object_tag_name(self):
+ return f'kubernetes.io/cluster/{self.short_cluster_name}'
+
+ @cached_property
+ def cluster_owned_objects_filter(self):
+ return [{"Name": f"tag:{self.owned_object_tag_name}", 'Values': ['owned']}]
+
+ @property
+ def attached_security_group_ids(self) -> List[str]:
+ return [group_desc['GroupId'] for group_desc in
+ self.ec2_client.describe_security_groups(Filters=self.cluster_owned_objects_filter)['SecurityGroups']]
+
+ @property
+ def attached_nodegroup_names(self) -> List[str]:
+ return self._get_attached_nodegroup_names()
+
+ @property
+ def failed_to_delete_nodegroup_names(self) -> List[str]:
+ return self._get_attached_nodegroup_names(status='DELETE_FAILED')
+
+ @property
+ def deleting_nodegroup_names(self) -> List[str]:
+ return self._get_attached_nodegroup_names(status='DELETING')
+
+ def _get_attached_nodegroup_names(self, status: str = None) -> List[str]:
+ if status is None:
+ return self.eks_client.list_nodegroups(clusterName=self.short_cluster_name)['nodegroups']
+ output = []
+ for nodegroup_name in self.attached_nodegroup_names:
+ if status == self.eks_client.describe_nodegroup(
+ clusterName=self.short_cluster_name, nodegroupName=nodegroup_name)['nodegroup']['status']:
+ output.append(nodegroup_name)
+ return output
+
+ @property
+ def all_load_balancers_names(self) -> List[str]:
+ return [elb_desc['LoadBalancerName'] for elb_desc in
+ self.elb_client.describe_load_balancers()['LoadBalancerDescriptions']]
+
+ @property
+ def attached_load_balancers_names(self) -> List[str]:
+ output = []
+ for tags_data in self.elb_client.describe_tags(
+ LoadBalancerNames=self.all_load_balancers_names)['TagDescriptions']:
+ for tag_data in tags_data['Tags']:
+ if tag_data['Key'] == self.owned_object_tag_name:
+ output.append(tags_data['LoadBalancerName'])
+ return output
+
+ @property
+ def cluster_exists(self) -> bool:
+ if self.short_cluster_name in self.eks_client.list_clusters()['clusters']:
+ return True
+ return False
+
+ def destroy(self):
+ for n in range(2):
+ self.destroy_nodegroups()
+ if self.failed_to_delete_nodegroup_names:
+ self.destroy_nodegroups(status='DELETE_FAILED')
+ self.destroy_cluster()
+ self.destroy_attached_load_balancers()
+        # Destroying the security groups affects both load balancers and node groups, so to avoid
+        # disturbing the load balancer cleanup we run destroy_attached_security_groups after
+        # destroy_attached_load_balancers, but before the next destroy_nodegroups retry
+ self.destroy_attached_security_groups()
+ if not self.cluster_exists:
+ return
+
+ def check_if_all_network_interfaces_detached(self, sg_id):
+ for interface_description in self.ec2_client.describe_network_interfaces(
+ Filters=[{'Name': 'group-id', 'Values': [sg_id]}])['NetworkInterfaces']:
+ if attachment := interface_description.get('Attachment'):
+ if attachment.get('AttachmentId'):
+ return False
+ return True
+
+ def delete_network_interfaces_of_sg(self, sg_id: str):
+ network_interfaces = self.ec2_client.describe_network_interfaces(
+ Filters=[{'Name': 'group-id', 'Values': [sg_id]}])['NetworkInterfaces']
+
+ for interface_description in network_interfaces:
+ network_interface_id = interface_description['NetworkInterfaceId']
+ if attachment := interface_description.get('Attachment'):
+ if attachment_id := attachment.get('AttachmentId'):
+ try:
+ self.ec2_client.detach_network_interface(AttachmentId=attachment_id, Force=True)
+ except Exception as exc:
+ LOGGER.debug("Failed to detach network interface (%s) attachment %s:\n%s",
+ network_interface_id, attachment_id, exc)
+
+ wait_for(self.check_if_all_network_interfaces_detached, sg_id=sg_id, timeout=120, throw_exc=False)
+
+ for interface_description in network_interfaces:
+ network_interface_id = interface_description['NetworkInterfaceId']
+ try:
+ self.ec2_client.delete_network_interface(NetworkInterfaceId=network_interface_id)
+ except Exception as exc:
+ LOGGER.debug("Failed to delete network interface %s :\n%s", network_interface_id, exc)
+
+ def destroy_attached_security_groups(self):
+        # EKS infra does not clean up security groups perfectly and some of them can be left alive
+        # even after the cluster is gone
+ try:
+ sg_list = self.attached_security_group_ids
+ except Exception as exc:
+ LOGGER.debug("Failed to get list of security groups:\n%s", exc)
+ return
+
+ for security_group_id in sg_list:
+            # EKS nodegroup deletion can fail when network interfaces are stuck in the attached state
+            # even though the instance is gone.
+            # In this case we forcefully detach the interfaces and delete them to make nodegroup deletion possible.
+ try:
+ self.delete_network_interfaces_of_sg(security_group_id)
+ except Exception:
+ pass
+
+ try:
+ self.ec2_client.delete_security_group(GroupId=security_group_id)
+ except Exception as exc:
+ LOGGER.debug("Failed to delete security groups %s, due to the following error:\n%s",
+ security_group_id, exc)
+
+ def destroy_nodegroups(self, status=None):
+
+ def _destroy_attached_nodegroups():
+ for node_group_name in self._get_attached_nodegroup_names(status=status):
+ try:
+ self.eks_client.delete_nodegroup(clusterName=self.short_cluster_name, nodegroupName=node_group_name)
+ except Exception as exc:
+ LOGGER.debug("Failed to delete nodegroup %s/%s, due to the following error:\n%s",
+ self.short_cluster_name, node_group_name, exc)
+ time.sleep(10)
+ return wait_for(lambda: not self._get_attached_nodegroup_names(status='DELETING'),
+ text='Waiting till target nodegroups are deleted',
+ step=10,
+ timeout=300)
+
+ wait_for(_destroy_attached_nodegroups, timeout=400, throw_exc=False)
+
+ def destroy_attached_load_balancers(self):
+        # EKS infra does not clean up load balancers and some of them can be left alive even after the cluster is gone
+ try:
+ elb_names = self.attached_load_balancers_names
+ except Exception as exc:
+ LOGGER.debug("Failed to get list of load balancers:\n%s", exc)
+ return
+
+ for elb_name in elb_names:
+ try:
+ self.elb_client.delete_load_balancer(LoadBalancerName=elb_name)
+ except Exception as exc:
+ LOGGER.debug("Failed to delete load balancer %s, due to the following error:\n%s", elb_name, exc)
+
+ def destroy_cluster(self):
+ try:
+ self.eks_client.delete_cluster(name=self.short_cluster_name)
+ except Exception as exc:
+ LOGGER.debug("Failed to delete cluster %s, due to the following error:\n%s",
+ self.short_cluster_name, exc)
+
+
+def init_monitoring_info_from_params(monitor_info: dict, params: dict, regions: List):
+ if monitor_info['n_nodes'] is None:
+ monitor_info['n_nodes'] = params.get('n_monitor_nodes')
+ if monitor_info['type'] is None:
+ monitor_info['type'] = params.get('instance_type_monitor')
+ if monitor_info['disk_size'] is None:
+ monitor_info['disk_size'] = params.get('aws_root_disk_size_monitor')
+ if monitor_info['device_mappings'] is None:
+ if monitor_info['disk_size']:
+ monitor_info['device_mappings'] = [{
+ "DeviceName": ec2_ami_get_root_device_name(image_id=params.get('ami_id_monitor').split()[0],
+ region=regions[0]),
+ "Ebs": {
+ "VolumeSize": monitor_info['disk_size'],
+ "VolumeType": "gp2"
+ }
+ }]
+ else:
+ monitor_info['device_mappings'] = []
+ return monitor_info
+
+
+def init_db_info_from_params(db_info: dict, params: dict, regions: List, root_device: str = None):
+ if db_info['n_nodes'] is None:
+ n_db_nodes = params.get('n_db_nodes')
+ if isinstance(n_db_nodes, int): # legacy type
+ db_info['n_nodes'] = [n_db_nodes]
+ elif isinstance(n_db_nodes, str): # latest type to support multiple datacenters
+ db_info['n_nodes'] = [int(n) for n in n_db_nodes.split()]
+ else:
+            raise RuntimeError(f'Unsupported parameter type: {type(n_db_nodes)}')
+ if db_info['type'] is None:
+ db_info['type'] = params.get('instance_type_db')
+ if db_info['disk_size'] is None:
+ db_info['disk_size'] = params.get('aws_root_disk_size_db')
+ if db_info['device_mappings'] is None and (root_device or params.get('ami_id_db_scylla')):
+ if db_info['disk_size']:
+ root_device = root_device if root_device else ec2_ami_get_root_device_name(
+ image_id=params.get('ami_id_db_scylla').split()[0],
+ region=regions[0])
+ db_info['device_mappings'] = [{
+ "DeviceName": root_device,
+ "Ebs": {
+ "VolumeSize": db_info['disk_size'],
+ "VolumeType": "gp2"
+ }
+ }]
+ else:
+ db_info['device_mappings'] = []
+ return db_info
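
To make the n_db_nodes handling above concrete, a quick standalone check; the params dict below is a trimmed stand-in for the real SCT configuration object:

# legacy int means a single datacenter; a space-separated string means one entry per datacenter
from sdcm.utils.aws_utils import init_db_info_from_params

db_info = {'n_nodes': None, 'type': None, 'disk_size': None, 'device_mappings': None}
params = {'n_db_nodes': '3 3', 'instance_type_db': 'i3.4xlarge', 'aws_root_disk_size_db': None}

result = init_db_info_from_params(db_info, params=params, regions=['eu-west-1'])
assert result['n_nodes'] == [3, 3]
assert result['type'] == 'i3.4xlarge'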
+
+
+def get_common_params(params: dict, regions: List, credentials: List, services: List) -> dict:
+ ec2_security_group_ids, ec2_subnet_ids = get_ec2_network_configuration(
+ regions=regions,
+ availability_zones=params.get('availability_zone').split(','),
+ )
+ return dict(ec2_security_group_ids=ec2_security_group_ids,
+ ec2_subnet_id=ec2_subnet_ids,
+ services=services,
+ credentials=credentials,
+ user_prefix=params.get('user_prefix'),
+ params=params,
+ )
+
+
+def get_ec2_network_configuration(regions, availability_zones):
+ ec2_security_group_ids = []
+ ec2_subnet_ids = []
+ for region in regions:
+ aws_region = AwsRegion(region_name=region)
+ for availability_zone in availability_zones:
+ sct_subnet = aws_region.sct_subnet(region_az=region + availability_zone)
+ assert sct_subnet, f"No SCT subnet configured for {region}! Run 'hydra prepare-aws-region'"
+ ec2_subnet_ids.append(sct_subnet.subnet_id)
+ sct_sg = aws_region.sct_security_group
+ assert sct_sg, f"No SCT security group configured for {region}! Run 'hydra prepare-aws-region'"
+ ec2_security_group_ids.append([sct_sg.group_id])
+ return ec2_security_group_ids, ec2_subnet_ids
+
+
+def get_ec2_services(regions):
+ services = []
+ for region in regions:
+ session = boto3.session.Session(region_name=region)
+ service = session.resource('ec2')
+ services.append(service)
+ return services
+
+
+def tags_as_ec2_tags(tags: Dict[str, str]) -> List[Dict[str, str]]:
+ return [{"Key": key, "Value": value} for key, value in tags.items()]
+
+
+class PublicIpNotReady(Exception):
+ pass
+
+
+@retrying(n=7, sleep_time=10, allowed_exceptions=(PublicIpNotReady,),
+ message="Waiting for instance to get public ip")
+def ec2_instance_wait_public_ip(instance):
+ instance.reload()
+ if instance.public_ip_address is None:
+ raise PublicIpNotReady(instance)
+ LOGGER.debug(f"[{instance}] Got public ip: {instance.public_ip_address}")
+
+
+def ec2_ami_get_root_device_name(image_id, region):
+ ec2 = boto3.resource('ec2', region)
+ image = ec2.Image(image_id)
+ try:
+ if image.root_device_name:
+ return image.root_device_name
+ except (TypeError, ClientError):
+ raise AssertionError(f"Image '{image_id}' details not found in '{region}'")
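
For reference, a minimal sketch of how the cleanup mixin above is meant to be driven; the wrapper class and the name/region values are hypothetical, the mixin itself only needs short_cluster_name and region_name (the EksCluster class added to sdcm/utils/common.py below follows the same pattern):

# destroy() deletes nodegroups, the cluster, owned load balancers and security groups,
# retrying the sequence up to two times until the EKS cluster is gone.
from sdcm.utils.aws_utils import EksClusterCleanupMixin

class EksClusterCleaner(EksClusterCleanupMixin):  # hypothetical wrapper
    def __init__(self, name: str, region: str):
        self.short_cluster_name = name
        self.region_name = region

EksClusterCleaner("sct-cluster", "eu-north-1").destroy()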
diff --git a/sdcm/utils/common.py b/sdcm/utils/common.py
--- a/sdcm/utils/common.py
+++ b/sdcm/utils/common.py
@@ -20,7 +20,6 @@
import os
import logging
import random
-import re
import socket
import time
import datetime
@@ -37,7 +36,7 @@
import requests
import uuid
import zipfile
-from typing import Iterable, List, Callable, Optional, Dict, Union, Literal, Tuple, Any
+from typing import Iterable, List, Callable, Optional, Dict, Union, Literal, Any
from urllib.parse import urlparse
from unittest.mock import Mock

@@ -51,7 +50,6 @@
import boto3
from mypy_boto3_s3 import S3Client, S3ServiceResource
from mypy_boto3_ec2 import EC2Client, EC2ServiceResource
-from botocore.exceptions import ClientError
import docker # pylint: disable=wrong-import-order; false warning because of docker import (local file vs. package)
import libcloud.storage.providers
import libcloud.storage.types
@@ -60,11 +58,11 @@
from libcloud.compute.types import Provider
from packaging.version import Version

+from sdcm.utils.aws_utils import EksClusterCleanupMixin
from sdcm.utils.ssh_agent import SSHAgent
from sdcm.utils.decorators import retrying
from sdcm import wait

-
LOGGER = logging.getLogger('utils')
DEFAULT_AWS_REGION = "eu-west-1"
DOCKER_CGROUP_RE = re.compile("/docker/([0-9a-f]+)")
@@ -481,6 +479,7 @@ def clean_cloud_resources(tags_dict, dry_run=False):
clean_instances_aws(tags_dict, dry_run=dry_run)
clean_elastic_ips_aws(tags_dict, dry_run=dry_run)
clean_clusters_gke(tags_dict, dry_run=dry_run)
+ clean_clusters_eks(tags_dict, dry_run=dry_run)
clean_instances_gce(tags_dict, dry_run=dry_run)
clean_resources_docker(tags_dict, dry_run=dry_run)
return True
@@ -964,6 +963,64 @@ def __del__(self):
return clusters


+class EksCluster(EksClusterCleanupMixin):
+ def __init__(self, name: str, region: str):
+ self.short_cluster_name = name
+ self.name = name
+ self.region_name = region
+ self.body = self.eks_client.describe_cluster(name=name)['cluster']
+
+ @cached_property
+ def extra(self) -> dict:
+ metadata = self.body['tags'].items()
+ return {"metadata": {"items": [{"key": key, "value": value} for key, value in metadata], }, }
+
+ @cached_property
+ def create_time(self):
+ return self.body['createdAt']
+
+
+def list_clusters_eks(tags_dict: Optional[dict] = None, verbose: bool = False) -> List[EksCluster]:
+
+ class EksCleaner:
+ name = f"eks-cleaner-{uuid.uuid4()!s:.8}"
+ _containers = {}
+ tags = {}
+
+ @cached_property
+ def eks_client(self):
+ return
+
+ def list_clusters(self) -> list:
+ eks_clusters = []
+ for aws_region in all_aws_regions():
+ try:
+ cluster_names = boto3.client('eks', region_name=aws_region).list_clusters()['clusters']
+ except Exception as exc:
+ LOGGER.error("Failed to get list of clusters on EKS: %s", exc)
+ return []
+ for cluster_name in cluster_names:
+ try:
+ eks_clusters.append(EksCluster(cluster_name, aws_region))
+ except Exception as exc:
+ LOGGER.error("Failed to get body of cluster on EKS: %s", exc)
+ return eks_clusters
+
+ clusters = EksCleaner().list_clusters()
+
+ if 'NodeType' in tags_dict:
+ tags_dict = tags_dict.copy()
+ del tags_dict['NodeType']
+
+ if tags_dict:
+ clusters = filter_gce_by_tags(tags_dict=tags_dict, instances=clusters)
+
+ if verbose:
+ LOGGER.info("Done. Found total of %s GKE clusters.", len(clusters))
+
+ return clusters
+
+
def clean_instances_gce(tags_dict, dry_run=False):
"""
Remove all instances with specific tags GCE
@@ -1000,12 +1057,31 @@ def delete_cluster(cluster):
if not dry_run:
try:
res = cluster.destroy()
+ LOGGER.info("%s deleted=%s", cluster.name, res)
except Exception as exc:
LOGGER.error(exc)
- LOGGER.info("%s deleted=%s", cluster.name, res)
ParallelObject(gke_clusters_to_clean, timeout=180).run(delete_cluster, ignore_exceptions=True)


+def clean_clusters_eks(tags_dict: dict, dry_run: bool = False) -> None:
+ assert tags_dict, "tags_dict not provided (can't clean all clusters)"
+ eks_clusters_to_clean = list_clusters_eks(tags_dict=tags_dict)
+
+ if not eks_clusters_to_clean:
+ LOGGER.info("There are no clusters to remove in EKS")
+ return
+
+ def delete_cluster(cluster):
+ LOGGER.info("Going to delete: %s", cluster.name)
+ if not dry_run:
+ try:
+ res = cluster.destroy()
+ LOGGER.info("%s deleted=%s", cluster.name, res)
+ except Exception as exc:
+ LOGGER.error(exc)
+ ParallelObject(eks_clusters_to_clean, timeout=180).run(delete_cluster, ignore_exceptions=True)
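
A dry-run sketch of the new EKS cleaner, mirroring how clean_cloud_resources() calls it; the tag key and value are placeholders:

# With dry_run=True this only lists matching clusters and logs what would be deleted.
from sdcm.utils.common import clean_clusters_eks

clean_clusters_eks({"TestId": "0a1b2c3d"}, dry_run=True)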
+
+
_SCYLLA_AMI_CACHE = defaultdict(dict)


@@ -2036,29 +2112,6 @@ def clean_enospc_on_node(target_node, sleep_time):
target_node.wait_db_up()


-class PublicIpNotReady(Exception):
- pass
-
-
-@retrying(n=7, sleep_time=10, allowed_exceptions=(PublicIpNotReady,),
- message="Waiting for instance to get public ip")
-def ec2_instance_wait_public_ip(instance):
- instance.reload()
- if instance.public_ip_address is None:
- raise PublicIpNotReady(instance)
- LOGGER.debug(f"[{instance}] Got public ip: {instance.public_ip_address}")
-
-
-def ec2_ami_get_root_device_name(image_id, region):
- ec2 = boto3.resource('ec2', region)
- image = ec2.Image(image_id)
- try:
- if image.root_device_name:
- return image.root_device_name
- except (TypeError, ClientError):
- raise AssertionError(f"Image '{image_id}' details not found in '{region}'")
-
-
def parse_nodetool_listsnapshots(listsnapshots_output: str) -> defaultdict:
"""
listsnapshots output:
diff --git a/sdcm/utils/docker_utils.py b/sdcm/utils/docker_utils.py
--- a/sdcm/utils/docker_utils.py
+++ b/sdcm/utils/docker_utils.py
@@ -27,7 +27,7 @@
from sdcm.keystore import pub_key_from_private_key_file
from sdcm.utils.common import deprecation
from sdcm.utils.decorators import retrying, Retry
-
+from sdcm.wait import wait_for

DOCKER_API_CALL_TIMEOUT = 180 # seconds

@@ -203,6 +203,7 @@ def run_container(cls, instance: object, name: str, **extra_run_args) -> Contain
elif container.status != 'running':
LOGGER.warning("Re-run container %s", container)
container.start()
+            LOGGER.info('Container %s status %s', container, container.status)
else:
LOGGER.debug("Container %s is running already.", container)
return container
diff --git a/sdcm/utils/gce_utils.py b/sdcm/utils/gce_utils.py
--- a/sdcm/utils/gce_utils.py
+++ b/sdcm/utils/gce_utils.py
@@ -55,8 +55,12 @@ def __init__(self, instance: 'GcloudContainerMixin', name: str):
self._instance = instance
self._name = name
self._container = None
+ self._span_counter = 0

def _span_container(self):
+ self._span_counter += 1
+ if self._container:
+ return
try:
self._container = self._instance._get_gcloud_container() # pylint: disable=protected-access
except Exception as exc:
@@ -67,6 +71,9 @@ def _span_container(self):
raise exc from None

def _destroy_container(self):
+ self._span_counter -= 1
+ if self._span_counter != 0:
+ return
try:
ContainerManager.destroy_container(self._instance, self._name)
except Exception: # pylint: disable=broad-except
@@ -103,17 +110,19 @@ class GcloudContainerMixin:
_gcloud_container_instance = None

def gcloud_container_run_args(self) -> dict:
- kube_config_path = os.path.expanduser(os.environ.get('KUBECONFIG', '~/.kube/config'))
+ kube_config_path = os.environ.get('KUBECONFIG', '~/.kube/config')
+ kube_config_dir_path = os.path.expanduser(kube_config_path)
volumes = {
- os.path.dirname(kube_config_path): {"bind": "/.kube", "mode": "rw"}
+ os.path.dirname(kube_config_dir_path): {"bind": os.path.dirname(kube_config_dir_path), "mode": "rw"},
}
return dict(image=GOOGLE_CLOUD_SDK_IMAGE,
command="cat",
tty=True,
name=f"{self.name}-gcloud",
volumes=volumes,
user=f"{os.getuid()}:{os.getgid()}",
- tmpfs={'/.config': f'size=50M,uid={os.getuid()}'}
+ tmpfs={'/.config': f'size=50M,uid={os.getuid()}'},
+ environment={'KUBECONFIG': kube_config_path},
)

def _get_gcloud_container(self) -> Container:
@@ -141,34 +150,3 @@ def _get_gcloud_container(self) -> Container:
@property
def gcloud(self) -> GcloudContextManager:
return GcloudContextManager(self, 'gcloud')
-
-
-class GcloudTokenUpdateThread(threading.Thread):
- update_period = 1800
-
- def __init__(self, gcloud: GcloudContextManager, config_path: str, token_min_duration: int = 60):
- self._gcloud = gcloud
- self._config_path = config_path
- self._token_min_duration = token_min_duration
- self._termination_event = threading.Event()
- super().__init__(daemon=True)
-
- def run(self):
- wait_time = 0.01
- while not self._termination_event.wait(wait_time):
- try:
- gcloud_config = self._gcloud.run(
- f'config config-helper --min-expiry={self._token_min_duration * 60} --format=json')
- with open(self._config_path, 'w') as gcloud_config_file:
- gcloud_config_file.write(gcloud_config)
- gcloud_config_file.flush()
- LOGGER.debug('Gcloud token has been updated and stored at %s', self._config_path)
- except Exception as exc: # pylint: disable=broad-except
- LOGGER.debug('Failed to read gcloud config: %s', exc)
- wait_time = 5
- else:
- wait_time = self.update_period
-
- def stop(self, timeout=None):
- self._termination_event.set()
- self.join(timeout)
diff --git a/sdcm/utils/k8s.py b/sdcm/utils/k8s.py
--- a/sdcm/utils/k8s.py
+++ b/sdcm/utils/k8s.py
@@ -12,23 +12,29 @@
# Copyright (c) 2020 ScyllaDB

# pylint: disable=too-many-arguments
-
+import abc
import os
import time
import queue
import logging
+import re
import threading
import multiprocessing
import contextlib
-from typing import Optional
+from tempfile import NamedTemporaryFile
+from typing import Optional, Union, Callable, List
from functools import cached_property

import kubernetes as k8s
+import yaml
+from kubernetes.client import V1ObjectMeta, V1Service, V1ServiceSpec, V1ContainerPort, \
+ V1ServicePort
+import invoke
from urllib3.util.retry import Retry

from sdcm import sct_abs_path
from sdcm.remote import LOCALRUNNER
-from sdcm.utils.decorators import timeout as timeout_decor
+from sdcm.utils.decorators import timeout as timeout_decor, timeout, retrying
from sdcm.utils.docker_utils import ContainerManager, DockerException, Container
from sdcm.wait import wait_for

@@ -39,15 +45,148 @@
KUBECTL_TIMEOUT = 300 # seconds

K8S_CONFIGS_PATH_SCT = sct_abs_path("sdcm/k8s_configs")
-K8S_CONFIGS_PATH_IN_CONTAINER = "/apps/k8s_configs"

JSON_PATCH_TYPE = "application/json-patch+json"

LOGGER = logging.getLogger(__name__)
+K8S_MEM_CPU_RE = re.compile('^([0-9]+)([a-zA-Z]*)$')
+K8S_MEM_CONVERSION_MAP = {
+ 'e': lambda x: x * 1073741824,
+ 'p': lambda x: x * 1048576,
+ 't': lambda x: x * 1024,
+ 'g': lambda x: x,
+ 'm': lambda x: x / 1024,
+ 'k': lambda x: x / 1048576,
+ '': lambda x: x,
+}
+K8S_CPU_CONVERSION_MAP = {
+ 'm': lambda x: x / 1000,
+ '': lambda x: x,
+}

logging.getLogger("kubernetes.client.rest").setLevel(logging.INFO)


+class PortExposeService:
+ service_hostname: str = None
+ service_ip: str = None
+
+ def __init__(self,
+ name: str,
+ selector_value: str,
+ ports: List[V1ServicePort] = None,
+ selector_key: str = 'statefulset.kubernetes.io/pod-name',
+ namespace: str = 'default',
+ core_v1_api: k8s.client.CoreV1Api = None,
+ pod_container_name: str = None,
+ resolver: Callable = None
+ ):
+ if core_v1_api is None:
+ k8s_configuration = k8s.client.Configuration()
+ k8s.config.load_kube_config(
+ config_file=os.path.expanduser(os.environ.get('KUBECONFIG', '~/.kube/config')),
+ client_configuration=k8s_configuration)
+ self.core_v1_api = KubernetesOps.core_v1_api(KubernetesOps.api_client(k8s_configuration))
+ else:
+ self.core_v1_api = core_v1_api
+ self.ports = ports
+ self.selector_key = selector_key
+ self.selector_value = selector_value
+ self.namespace = namespace
+ self.name = name
+ self.pod_container_name = pod_container_name
+ self.resolver = resolver
+ self.is_deployed = False
+
+ @property
+ def target_pod(self):
+ for pod in self.core_v1_api.list_namespaced_pod(self.namespace).items:
+ if pod.metadata.labels.get(self.selector_key) == self.selector_value:
+ return pod
+ raise ValueError("Can't find target pod")
+
+ @property
+ def pod_ports(self) -> List[V1ContainerPort]:
+ ports = []
+ for container in self.target_pod.spec.containers:
+ if self.pod_container_name and container.name != self.pod_container_name:
+ continue
+ if container.ports:
+ ports.extend(container.ports)
+ return ports
+
+ @property
+ def ports_to_expose(self) -> List[V1ServicePort]:
+ if self.ports:
+ return self.ports
+ ports = []
+ for port in self.pod_ports:
+ if port.container_port is None:
+ continue
+ ports.append(V1ServicePort(
+ port=port.container_port,
+ target_port=port.container_port,
+ name=port.name,
+ protocol=port.protocol
+ ))
+ return ports
+
+ @property
+ def service_definition(self):
+ return V1Service(
+ metadata=V1ObjectMeta(
+ name=self.name,
+ namespace=self.namespace
+ ),
+ spec=V1ServiceSpec(
+ ports=self.ports_to_expose,
+ publish_not_ready_addresses=True,
+ session_affinity=None,
+ type='LoadBalancer',
+ selector={self.selector_key: self.selector_value}
+ )
+ )
+
+ def deploy(self):
+ events = KubernetesOps.watch_events(self.core_v1_api, name=self.name, namespace=self.namespace)
+ self.core_v1_api.create_namespaced_service(namespace=self.namespace, body=self.service_definition)
+ service_hostname = wait_for(self.get_service_hostname, timeout=300, throw_exc=False)
+ if not service_hostname:
+            error_message = f"Failed to create load balancer {self.name}, \n" \
+                             "it can happen due to the lack of ip addresses in the subnet, \n" \
+                             "or due to reaching the quota limit on load balancers"
+            if events:
+                error_message += ', last events:\n' + ('\n'.join([event['object'].message for event in events]))
+            raise RuntimeError(error_message)
+ service_ip = self.get_service_ip()
+ if not service_ip:
+ raise RuntimeError("Failed to resolve hostname %s for load balancer %s", service_hostname, self.name)
+
+ self.service_hostname = service_hostname
+ self.service_ip = service_ip
+ self.is_deployed = True
+
+ @property
+ def service(self) -> V1Service:
+ services = self.core_v1_api.list_namespaced_service(
+ namespace=self.namespace,
+ field_selector=f"metadata.name={self.name}").items
+ if not services:
+ raise RuntimeError(f"No service with name %s found under namespace %s", self.name, self.namespace)
+ return services[0]
+
+ def get_service_hostname(self) -> str:
+ try:
+ return self.service.status.load_balancer.ingress[0].hostname
+ except Exception:
+ return ''
+
+ def get_service_ip(self):
+ if self.resolver is None:
+ raise ValueError("In order to get ip address you need to provide resolver")
+ return self.resolver(self.get_service_hostname(), timeout=300)
+
+
class ApiLimiterClient(k8s.client.ApiClient):
_api_rate_limiter: 'ApiCallRateLimiter' = None

@@ -153,7 +292,7 @@ def check_if_api_not_operational(self, kluster, num_requests=20):
passed += 1
except Exception: # pylint: disable=broad-except
time.sleep(1 / self.rate_limit)
- return passed > num_requests / 0.8
+ return passed < num_requests * 0.8

def stop(self):
self.running.clear()
@@ -189,7 +328,7 @@ def create_k8s_configuration(kluster) -> k8s.client.Configuration:
k8s_configuration.host = kluster.k8s_server_url
else:
k8s.config.load_kube_config(
- config_file=os.environ.get('KUBECONFIG', '~/.kube/config'),
+ config_file=os.path.expanduser(os.environ.get('KUBECONFIG', '~/.kube/config')),
client_configuration=k8s_configuration)
return k8s_configuration

@@ -227,11 +366,16 @@ def list_pods(cls, kluster, namespace=None, **kwargs):
return kluster.k8s_core_v1_api.list_pod_for_all_namespaces(watch=False, **kwargs).items
return kluster.k8s_core_v1_api.list_namespaced_pod(namespace=namespace, watch=False, **kwargs).items

+ @classmethod
+ @timeout_decor(timeout=600)
+ def get_node(cls, kluster, name, **kwargs):
+ return kluster.k8s_core_v1_api.read_node(name, **kwargs)
+
@classmethod
@timeout_decor(timeout=600)
def list_services(cls, kluster, namespace=None, **kwargs):
if namespace is None:
- return kluster.k8s_core_v1_api.list_service_all_namespaces(watch=False, **kwargs).items
+ return kluster.k8s_core_v1_api.list_service_for_all_namespaces(watch=False, **kwargs).items
return kluster.k8s_core_v1_api.list_namespaced_service(namespace=namespace, watch=False, **kwargs).items

@staticmethod
@@ -270,10 +414,41 @@ def kubectl_multi_cmd(cls, kluster, *command, namespace: Optional[str] = None, t
return remoter.run(final_command, timeout=timeout, ignore_status=ignore_status, verbose=verbose)

@classmethod
- def apply_file(cls, kluster, config_path, namespace=None, timeout=KUBECTL_TIMEOUT, envsubst=True):
- if envsubst:
- config_path = f"<(envsubst<{config_path})"
- cls.kubectl(kluster, "apply", "-f", config_path, namespace=namespace, timeout=timeout)
+ def apply_file(cls, kluster, config_path, namespace=None, timeout=KUBECTL_TIMEOUT, environ=None, envsubst=True,
+ modifiers: List[Callable] = None):
+ if environ:
+ environ_str = (' '.join([f'{name}="{value}"' for name, value in environ.items()])) + ' '
+ else:
+ environ_str = ''
+
+ with NamedTemporaryFile(mode='tw') as temp_file:
+ resulted_content = []
+ if envsubst:
+ data = LOCALRUNNER.run(f'{environ_str}envsubst<{config_path}', verbose=False).stdout
+ else:
+ with open(config_path, 'r') as file:
+ data = file.read()
+ file_content = yaml.load_all(data)
+
+ for doc in file_content:
+ if modifiers:
+ for modifier in modifiers:
+ modifier(doc)
+ resulted_content.append(doc)
+ temp_file.write(yaml.safe_dump_all(resulted_content))
+ temp_file.flush()
+
+ @retrying(n=0, sleep_time=5, timeout=timeout, allowed_exceptions=RuntimeError)
+ def run_kubectl():
+ try:
+ cls.kubectl(kluster, "apply", "-f", temp_file.name, namespace=namespace, timeout=timeout)
+ except invoke.exceptions.UnexpectedExit as exc:
+ if 'did you specify the right host or port' in exc.result.stderr:
+ raise RuntimeError(str(exc)) from None
+ else:
+ raise
+
+ run_kubectl()

@classmethod
def copy_file(cls, kluster, src, dst, container=None, timeout=KUBECTL_TIMEOUT):
@@ -297,38 +472,106 @@ def expose_pod_ports(cls, kluster, pod_name, ports, labels=None, selector=None,
def unexpose_pod_ports(cls, kluster, pod_name, namespace=None, timeout=KUBECTL_TIMEOUT):
cls.kubectl(kluster, f"delete service {pod_name}-loadbalancer", namespace=namespace, timeout=timeout)

+ @classmethod
+ def get_kubectl_auth_config_for_first_user(cls, config):
+ for user in config["users"]:
+ for auth_type in ['exec', 'auth-provider']:
+ if auth_type in user["user"]:
+ return auth_type, user["user"][auth_type]
+ return None, None
+
+ @classmethod
+ def patch_kubectl_auth_config(cls, config, auth_type, cmd: str, args: list):
+ if auth_type == 'exec':
+ config['command'] = cmd
+ config['args'] = args
+ elif auth_type == 'auth-provider':
+ config['config']['cmd-args'] = ' '.join(args)
+ config['config']['cmd-path'] = cmd
+ else:
+ raise ValueError('Unknown auth-type %s', auth_type)
+
+ @staticmethod
+ def wait_for_pods_readiness(kluster, total_pods: Union[int, Callable], readiness_timeout: float, namespace: str,
+ sleep: int = 10):
+ @timeout(message=f"Wait for {total_pods} pod(s) from {namespace} namespace to become ready...",
+ timeout=readiness_timeout * 60,
+ sleep_time=sleep)
+ def wait_cluster_is_ready():
+            # To make the worst case more informative, split the wait into 5 iterations of readiness_timeout // 5 minutes
+ result = kluster.kubectl(
+ f"wait --timeout={readiness_timeout // 5}m --all --for=condition=Ready pod",
+ namespace=namespace,
+ timeout=readiness_timeout // 5 * 60 + 10)
+ count = result.stdout.count('condition met')
+ if isinstance(total_pods, (int, float)):
+ if total_pods != count:
+ raise RuntimeError('Not all pods reported')
+ elif callable(total_pods):
+ if not total_pods(count):
+ raise RuntimeError('Not all pods reported')
+
+ wait_cluster_is_ready()
+
+ @classmethod
+ def patch_kube_config(cls, static_token_path, kube_config_path: str = None) -> None:
+        # It assumes that the config has already been created by gcloud
+        # It patches the kube config so that instead of running gcloud each time
+        # we will get its output from the cache file located at static_token_path
+        # To keep this cache file updated we run a TokenUpdateThread thread
+ if kube_config_path is None:
+ kube_config_path = os.path.expanduser(os.environ.get('KUBECONFIG', '~/.kube/config'))
+ LOGGER.debug("Patch %s to use file token %s", kube_config_path, static_token_path)
+
+ with open(kube_config_path) as kube_config:
+ data = yaml.safe_load(kube_config)
+ auth_type, user_config = KubernetesOps.get_kubectl_auth_config_for_first_user(data)
+
+ if user_config is None:
+ raise RuntimeError(f"Unable to find user configuration in ~/.kube/config")
+ KubernetesOps.patch_kubectl_auth_config(user_config, auth_type, "cat", [static_token_path])
+
+ with open(kube_config_path, "w") as kube_config:
+ yaml.safe_dump(data, kube_config)
+
+ LOGGER.debug(f'Patched kubectl config at {kube_config_path} with static kubectl token from {static_token_path}')
+
+ @classmethod
+ def watch_events(cls, k8s_core_v1_api: k8s.client.CoreV1Api, name: str = None, namespace: str = None,
+ timeout: int = None):
+ field_selector = f'involvedObject.name={name}' if name is not None else None
+ if namespace is None:
+ return k8s.watch.Watch().stream(k8s_core_v1_api.list_event_for_all_namespaces,
+ field_selector=field_selector, timeout_seconds=timeout)
+ return k8s.watch.Watch().stream(k8s_core_v1_api.list_namespaced_event, namespace=namespace,
+ field_selector=field_selector, timeout_seconds=timeout)
+

class HelmContainerMixin:
def helm_container_run_args(self) -> dict:
- kube_config_path = os.path.expanduser(os.environ.get('KUBECONFIG', '~/.kube/config'))
+ kube_config_path = os.environ.get('KUBECONFIG', '~/.kube/config')
+ kube_config_dir_path = os.path.expanduser(kube_config_path)
helm_config_path = os.path.expanduser(os.environ.get('HELM_CONFIG_HOME', '~/.helm'))
volumes = {
- os.path.dirname(kube_config_path): {"bind": "/root/.kube", "mode": "rw"},
+ os.path.dirname(kube_config_dir_path): {"bind": os.path.dirname(kube_config_dir_path), "mode": "rw"},
helm_config_path: {"bind": "/root/.helm", "mode": "rw"},
- K8S_CONFIGS_PATH_SCT: {"bind": K8S_CONFIGS_PATH_IN_CONTAINER, "mode": "ro"},
+ sct_abs_path(""): {"bind": sct_abs_path(""), "mode": "ro"},
'/tmp': {"bind": "/tmp", "mode": "rw"},
}
return dict(image=HELM_IMAGE,
entrypoint="/bin/cat",
tty=True,
name=f"{self.name}-helm",
network_mode="host",
- volumes=volumes)
+ volumes=volumes,
+ environment={'KUBECONFIG': kube_config_path},
+ )

@cached_property
def _helm_container(self) -> Container:
- container = ContainerManager.run_container(self, "helm")
+ return ContainerManager.run_container(self, "helm")

- # NOTE: Install 'gettext' package that contains 'envsubst' binary
- # needed for installation of helm charts uisng "values.yaml" files.
- res = container.exec_run(["sh", "-c", "apk add gettext"])
- if res.exit_code:
- raise DockerException(f"{container}: {res.output.decode('utf-8')}")
-
- return container
-
- def helm(self, kluster, *command: str, namespace: Optional[str] = None,
- prepend_command=None) -> str:
+ def helm(self, kluster, *command: str, namespace: Optional[str] = None, values: 'HelmValues' = None, prepend_command=None) -> str:
cmd = ["helm", ]
if prepend_command:
if isinstance(prepend_command, list):
@@ -339,42 +582,231 @@ def helm(self, kluster, *command: str, namespace: Optional[str] = None,
cmd.extend(("--kube-apiserver", kluster.k8s_server_url, ))
if namespace:
cmd.extend(("--namespace", namespace, ))
+ values_file = None
cmd.extend(command)
+
+ if values:
+ helm_values_file = NamedTemporaryFile(mode='tw')
+ helm_values_file.write(yaml.safe_dump(values.as_dict()))
+ helm_values_file.flush()
+ cmd.extend(("-f", helm_values_file.name))
+ values_file = helm_values_file
+
cmd = " ".join(cmd)
- environment = [f"SCT_{k.upper()}={v}" for k, v in kluster.params.items()]

LOGGER.debug("Execute `%s'", cmd)
- res = self._helm_container.exec_run(["sh", "-c", cmd], environment=environment)
- if res.exit_code:
- raise DockerException(f"{self._helm_container}: {res.output.decode('utf-8')}")
- return res.output.decode("utf-8")
+ try:
+ res = self._helm_container.exec_run(["sh", "-c", cmd])
+ if res.exit_code:
+ raise DockerException(f"{self._helm_container}: {res.output.decode('utf-8')}")
+ return res.output.decode("utf-8")
+ finally:
+ if values_file:
+ values_file.close()

def helm_install(self, kluster,
target_chart_name: str,
source_chart_name: str,
version: str = "",
use_devel: bool = False,
- set_options: str = "",
- values_file_path: str = "",
+ values: 'HelmValues' = None,
namespace: Optional[str] = None) -> str:
command = ["install", target_chart_name, source_chart_name]
prepend_command = []
if version:
command.extend(("--version", version))
if use_devel:
command.extend(("--devel",))
- if set_options:
- command.extend(("--set", set_options))
- if values_file_path:
- # NOTE: It must look like following:
- # $ envsubst < {values_file_path} | helm install \
- # scylla scylla-operator/scylla --version {v} --devel --values -
- command.extend(("--values", "-"))
- prepend_command = ["envsubst", "<", values_file_path, "|"]

return self.helm(
kluster,
*command,
prepend_command=prepend_command,
namespace=namespace,
+ values=values
)
+
+
+class TokenUpdateThread(threading.Thread, metaclass=abc.ABCMeta):
+ update_period = 1800
+
+ def __init__(self, kubectl_token_path: str):
+ self._kubectl_token_path = kubectl_token_path
+ self._termination_event = threading.Event()
+        super().__init__(daemon=True, name=self.__class__.__name__)
+
+ def run(self):
+ wait_time = 0.01
+ while not self._termination_event.wait(wait_time):
+ try:
+ mode = 'r+' if os.path.exists(self._kubectl_token_path) else 'w'
+ with open(self._kubectl_token_path, mode) as gcloud_config_file:
+ gcloud_config_file.write(self.get_token())
+ gcloud_config_file.flush()
+ LOGGER.debug('Cloud token has been updated and stored at %s', self._kubectl_token_path)
+ except Exception as exc: # pylint: disable=broad-except
+                LOGGER.debug('Failed to update cloud token: %s', exc)
+ wait_time = 5
+ else:
+ wait_time = self.update_period
+
+ @abc.abstractmethod
+ def get_token(self) -> str:
+ pass
+
+ def stop(self, timeout=None):
+ self._termination_event.set()
+ self.join(timeout)
+
+
+def convert_cpu_units_to_k8s_value(cpu: Union[float, int]) -> str:
+ if isinstance(cpu, float):
+ if not cpu.is_integer():
+ return f'{int(cpu * 1000)}m'
+ return f'{int(cpu)}'
+
+
+def convert_memory_units_to_k8s_value(memory: Union[float, int]) -> str:
+ if isinstance(memory, float):
+ if not memory.is_integer():
+ return f'{int(memory * 1024)}Mi'
+ return f'{int(memory)}Gi'
+
+
+def convert_memory_value_from_k8s_to_units(memory: str) -> float:
+ match = K8S_MEM_CPU_RE.match(memory).groups()
+ if len(match) == 1:
+ value = int(match[0])
+ units = 'gb'
+ else:
+ value = int(match[0])
+ units = match[1].lower().rstrip('ib')
+ convertor = K8S_MEM_CONVERSION_MAP.get(units)
+ if convertor is None:
+ raise ValueError('Unknown memory units %s', units)
+ return float(convertor(value))
+
+
+def convert_cpu_value_from_k8s_to_units(cpu: str) -> float:
+ match = K8S_MEM_CPU_RE.match(cpu).groups()
+ if len(match) == 1:
+ value = float(match[0])
+ units = ''
+ else:
+ value = float(match[0])
+ units = match[1].lower()
+ convertor = K8S_CPU_CONVERSION_MAP.get(units)
+ if convertor is None:
+ raise ValueError(f'Unknown cpu units: {units}')
+ return float(convertor(value))
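
The reverse converters depend on K8S_MEM_CPU_RE and the two *_CONVERSION_MAP dictionaries defined earlier in this module (outside this hunk), so only their intended behaviour can be sketched here; the exact accepted suffixes are an assumption:

    convert_cpu_value_from_k8s_to_units('500m')      # -> 0.5
    convert_cpu_value_from_k8s_to_units('2')         # -> 2.0
    convert_memory_value_from_k8s_to_units('16Gi')   # -> 16.0
    convert_memory_value_from_k8s_to_units('512Mi')  # -> 0.5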
+
+
+def add_pool_node_affinity(value, pool_label_name, pool_name):
+ target_selector_term = {'matchExpressions': [{'operator': 'In', 'values': [pool_name], 'key': pool_label_name}]}
+ value['nodeAffinity'] = node_affinity = value.get('nodeAffinity', {})
+ node_affinity['requiredDuringSchedulingIgnoredDuringExecution'] = required_during = \
+ node_affinity.get('requiredDuringSchedulingIgnoredDuringExecution', {})
+ required_during['nodeSelectorTerms'] = node_selectors = required_during.get('nodeSelectorTerms', [])
+
+ exists = False
+ for node_selector in node_selectors:
+ if node_selector == target_selector_term:
+ exists = True
+ break
+
+ if not exists:
+ node_selectors.append(target_selector_term)
+
+ return value
+
+
+def get_helm_pool_affinity_values(pool_label_name, pool_name):
+ return {'affinity': add_pool_node_affinity({}, pool_label_name, pool_name)}
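
add_pool_node_affinity() builds, idempotently, the standard requiredDuringScheduling node-affinity block for a pool label, and get_helm_pool_affinity_values() wraps it under 'affinity' so it can go straight into chart values. For example (the label and pool names are illustrative):

    get_helm_pool_affinity_values('eks.amazonaws.com/nodegroup', 'scylla-pool')
    # -> {'affinity': {'nodeAffinity': {
    #        'requiredDuringSchedulingIgnoredDuringExecution': {'nodeSelectorTerms': [
    #            {'matchExpressions': [{'operator': 'In',
    #                                   'values': ['scylla-pool'],
    #                                   'key': 'eks.amazonaws.com/nodegroup'}]}]}}}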
+
+
+def get_pool_affinity_modifiers(pool_label_name, pool_name):
+ def add_statefulset_or_daemonset_pool_affinity(x):
+ if x['kind'] in ['StatefulSet', 'DaemonSet']:
+ x['spec']['template']['spec']['affinity'] = add_pool_node_affinity(
+ x['spec']['template']['spec'].get('affinity', {}),
+ pool_label_name,
+ pool_name)
+
+ def add_scylla_cluster_pool_affinity(x):
+ if x['kind'] == 'ScyllaCluster':
+ for rack in x['spec']['datacenter']['racks']:
+ rack['placement'] = add_pool_node_affinity(
+ rack.get('placement', {}),
+ pool_label_name,
+ pool_name)
+
+ return [add_statefulset_or_daemonset_pool_affinity, add_scylla_cluster_pool_affinity]
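
The returned modifiers are meant to be run over parsed manifests before they are applied, pinning StatefulSet/DaemonSet pods and every ScyllaCluster rack to the given pool; each modifier is a no-op for kinds it does not recognise. A sketch of a call site, assuming PyYAML and an arbitrary manifest string:

    import yaml

    manifest = yaml.safe_load(manifest_text)  # manifest_text is a hypothetical input
    for modifier in get_pool_affinity_modifiers('eks.amazonaws.com/nodegroup', 'scylla-pool'):
        modifier(manifest)
    patched_text = yaml.safe_dump(manifest)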
+
+
+class HelmValues:
+ def __init__(self, *args, **kwargs):
+ if len(args) == 1 and type(args[0]) is dict:
+ self._data = args[0]
+ else:
+ self._data = dict(**kwargs)
+
+ def get(self, path):
+ current = self._data
+ for name in path.split('.'):
+ if current is None:
+ return None
+ if name.isalnum() and isinstance(current, (list, tuple, set)):
+ try:
+ current = current[int(name)]
+ except Exception:
+ current = None
+ continue
+ current = current.get(name, None)
+ return current
+
+ def set(self, path, value):
+ current = self._data
+ chain = []
+ types = []
+ for attr in path.split('.'):
+ types.append(dict)
+ if attr[-1] == ']':
+ idx = attr.find('[')
+ chain.extend([attr[0:idx], int(attr[idx + 1:-1])])
+ types.append(list)
+ continue
+ chain.append(attr)
+
+ # last_item = chain.pop()
+ types.pop(0)
+ types.append(None)
+
+ for num, (attr, next_item_type) in enumerate(zip(chain, types)):
+ try:
+ if isinstance(attr, int) and isinstance(current, dict):
+ raise TypeError()
+ attr_value = current[attr]
+ except (KeyError, IndexError):
+ attr_value = None
+ except TypeError:
+ raise ValueError("Wrong type provided at section")
+
+ if None is attr_value:
+ if next_item_type is None:
+ attr_value = value
+ else:
+ attr_value = next_item_type()
+ if type(current) is dict:
+ current[attr] = attr_value
+ else:
+ if attr > len(current):
+ raise ValueError("Can add items to tail of the list only")
+ elif attr == len(current):
+ current.append(attr_value)
+ else:
+ current[attr] = attr_value
+
+ current = attr_value
+
+ def as_dict(self):
+ return self._data
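
HelmValues is a thin dotted-path wrapper over a plain dict: set() accepts 'a.b[0].c'-style paths and creates the intermediate dicts/lists on demand, get() accepts dotted paths with bare numeric segments for list indices, and as_dict() hands the structure back for serialisation. For example (the value names are illustrative):

    values = HelmValues(developerMode=True)
    values.set('resources.limits.cpu', '6')
    values.set('racks[0].name', 'us-east1')
    values.get('racks.0.name')  # -> 'us-east1'
    values.as_dict()
    # -> {'developerMode': True,
    #     'resources': {'limits': {'cpu': '6'}},
    #     'racks': [{'name': 'us-east1'}]}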
diff --git a/test-cases/scylla-operator/kubernetes-scylla-upgrade-minikube.yaml b/test-cases/scylla-operator/kubernetes-scylla-upgrade-minikube.yaml
--- a/test-cases/scylla-operator/kubernetes-scylla-upgrade-minikube.yaml
+++ b/test-cases/scylla-operator/kubernetes-scylla-upgrade-minikube.yaml
@@ -8,8 +8,6 @@ stress_cmd_w: cassandra-stress write no-warmup cl=QUORUM n=2010020 -schema 'keys

gce_instance_type_minikube: 'n1-highmem-16'
gce_root_disk_size_minikube: 240
-k8s_scylla_cpu_n: 2
-k8s_scylla_mem_gi: 8
k8s_scylla_disk_gi: 30

n_db_nodes: 3
diff --git a/test-cases/scylla-operator/longevity-scylla-operator-3h-minikube.yaml b/test-cases/scylla-operator/longevity-scylla-operator-3h-minikube.yaml
--- a/test-cases/scylla-operator/longevity-scylla-operator-3h-minikube.yaml
+++ b/test-cases/scylla-operator/longevity-scylla-operator-3h-minikube.yaml
@@ -5,8 +5,6 @@ stress_cmd: ["cassandra-stress write cl=QUORUM duration=180m -schema 'replicatio

gce_instance_type_minikube: 'n1-highmem-16'
gce_root_disk_size_minikube: 240
-k8s_scylla_cpu_n: 2
-k8s_scylla_mem_gi: 8
k8s_scylla_disk_gi: 30

n_db_nodes: 3
diff --git a/unit_tests/test_remoter.py b/unit_tests/test_remoter.py
--- a/unit_tests/test_remoter.py
+++ b/unit_tests/test_remoter.py
@@ -37,6 +37,18 @@ class FakeKluster(KubernetesCluster):
def __init__(self, k8s_server_url):
self.k8s_server_url = k8s_server_url

+ def deploy(self):
+ pass
+
+ def create_kubectl_config(self):
+ pass
+
+ def create_token_update_thread(self):
+ pass
+
+ def deploy_node_pool(self, pool, wait_till_ready=True) -> None:
+ pass
+

for ip in ['::1', '127.0.0.1']:
for cmd in [
diff --git a/vars/byoOperatorPipeline.groovy b/vars/byoOperatorPipeline.groovy
--- a/vars/byoOperatorPipeline.groovy
+++ b/vars/byoOperatorPipeline.groovy
@@ -16,15 +16,15 @@ def call(Map pipelineParams) {
SCT_CLUSTER_BACKEND = 'k8s-gce-minikube'
}
parameters {
- string(defaultValue: '',
- description: '',
- name: 'k8s_scylla_operator_docker_image')
string(defaultValue: 'https://storage.googleapis.com/scylla-operator-charts/latest',
description: '',
name: 'k8s_scylla_operator_helm_repo')
string(defaultValue: 'latest',
description: '',
name: 'k8s_scylla_operator_chart_version')
+ string(defaultValue: '',
+ description: '',
+ name: 'k8s_scylla_operator_docker_image')
string(defaultValue: '4.0.0',
description: '',
name: 'scylla_version')
@@ -73,12 +73,11 @@ def call(Map pipelineParams) {
rm -fv ./latest

export SCT_CONFIG_FILES=${test_config}
- if [[ -n "${params.k8s_scylla_operator_docker_image}" ]]; then
- export SCT_K8S_SCYLLA_OPERATOR_DOCKER_IMAGE=${params.k8s_scylla_operator_docker_image}
- fi
- if [[ -n "${params.k8s_scylla_operator_helm_repo}" ]]; then
+
+ if [[ -n "${params.k8s_scylla_operator_helm_repo ? params.k8s_scylla_operator_helm_repo : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_HELM_REPO=${params.k8s_scylla_operator_helm_repo}
fi
+
if [[ -n "${params.k8s_scylla_operator_chart_version}" ]]; then
export SCT_K8S_SCYLLA_OPERATOR_CHART_VERSION=${params.k8s_scylla_operator_chart_version}
fi
diff --git a/vars/createSctRunner.groovy b/vars/createSctRunner.groovy
--- a/vars/createSctRunner.groovy
+++ b/vars/createSctRunner.groovy
@@ -1,8 +1,8 @@
#!groovy

def call(Map params, Integer test_duration, String region) {
+ def cloud_provider = getCloudProviderFromBackend(params.backend)

- def cloud_provider = params.backend.trim().toLowerCase()
println(params)
sh """
#!/bin/bash
diff --git a/vars/getCloudProviderFromBackend.groovy b/vars/getCloudProviderFromBackend.groovy
--- a/vars/getCloudProviderFromBackend.groovy
+++ b/vars/getCloudProviderFromBackend.groovy
@@ -0,0 +1,10 @@
+
+def call(String backend) {
+ def backend_to_provider = [
+ 'k8s-eks': 'aws',
+ 'k8s-gke': 'gce',
+ 'k8s-gce-minikube': 'gce',
+ ]
+ def cloud_provider = backend.trim().toLowerCase()
+ return backend_to_provider.get(cloud_provider, cloud_provider)
+ }
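
The new helper maps every k8s-* backend onto the cloud provider that actually hosts the builders and runners, and leaves any other backend name untouched; this is what lets getJenkinsLabels and createSctRunner drop their per-backend special cases. The same lookup, as a Python sketch for clarity:

    BACKEND_TO_PROVIDER = {
        'k8s-eks': 'aws',
        'k8s-gke': 'gce',
        'k8s-gce-minikube': 'gce',
    }

    def get_cloud_provider_from_backend(backend: str) -> str:
        backend = backend.strip().lower()
        return BACKEND_TO_PROVIDER.get(backend, backend)

    assert get_cloud_provider_from_backend('k8s-eks') == 'aws'
    assert get_cloud_provider_from_backend('GCE') == 'gce'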
diff --git a/vars/getJenkinsLabels.groovy b/vars/getJenkinsLabels.groovy
--- a/vars/getJenkinsLabels.groovy
+++ b/vars/getJenkinsLabels.groovy
@@ -15,19 +15,19 @@ def call(String backend, String aws_region=null) {
'aws-eu-central-1': 'aws-sct-builders-eu-central-1',
'aws-us-east-1' : 'aws-sct-builders-us-east-1-new',
'gce': 'gce-sct-builders',
- 'k8s-gce-minikube': 'gce-sct-builders',
- 'k8s-gke': 'gce-sct-builders',
'docker': 'sct-builders']

- if (backend == 'aws' && aws_region)
+ def cloud_provider = getCloudProviderFromBackend(backend)
+
+ if (cloud_provider == 'aws' && aws_region)
{
println("Finding builder for AWS region: " + aws_region)
if (aws_region == "random"){
def aws_supported_regions = ["eu-west-2", "eu-north-1", "eu-central-1"]
Collections.shuffle(aws_supported_regions)
aws_region = aws_supported_regions[0]
}
- def cp_region = backend + "-" + aws_region
+ def cp_region = cloud_provider + "-" + aws_region
println("Checking if we have a label for " + cp_region)
def label = jenkins_labels.get(cp_region, null)
if (label != null){
@@ -41,6 +41,6 @@ def call(String backend, String aws_region=null) {
}
else
{
- return [ "label": jenkins_labels[backend] ]
+ return [ "label": jenkins_labels[cloud_provider] ]
}
}
diff --git a/vars/getJobTimeouts.groovy b/vars/getJobTimeouts.groovy
--- a/vars/getJobTimeouts.groovy
+++ b/vars/getJobTimeouts.groovy
@@ -7,16 +7,16 @@ List<Integer> call(Map params, String region){
#!/bin/bash
export SCT_CLUSTER_BACKEND="${params.backend}"
export SCT_CONFIG_FILES=${test_config}
+
if [[ -n "${params.k8s_scylla_operator_docker_image ? params.k8s_scylla_operator_docker_image : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_DOCKER_IMAGE=${params.k8s_scylla_operator_docker_image}
fi
- if [[ -n "${params.k8s_scylla_operator_helm_repo}" ]]; then
+ if [[ -n "${params.k8s_scylla_operator_helm_repo ? params.k8s_scylla_operator_helm_repo : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_HELM_REPO=${params.k8s_scylla_operator_helm_repo}
fi
- if [[ -n "${params.k8s_scylla_operator_chart_version}" ]]; then
+ if [[ -n "${params.k8s_scylla_operator_chart_version ? params.k8s_scylla_operator_chart_version : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_CHART_VERSION=${params.k8s_scylla_operator_chart_version}
fi
-
if [[ -n "${params.scylla_mgmt_agent_version ? params.scylla_mgmt_agent_version : ''}" ]] ; then
export SCT_SCYLLA_MGMT_AGENT_VERSION=${params.scylla_mgmt_agent_version}
fi
diff --git a/vars/longevityPipeline.groovy b/vars/longevityPipeline.groovy
--- a/vars/longevityPipeline.groovy
+++ b/vars/longevityPipeline.groovy
@@ -78,6 +78,19 @@ def call(Map pipelineParams) {
string(defaultValue: "${pipelineParams.get('test_name', '')}",
description: 'Name of the test to run',
name: 'test_name')
+
+ string(defaultValue: "${pipelineParams.get('k8s_scylla_operator_helm_repo', '')}",
+ description: 'Scylla Operator helm repo',
+ name: 'k8s_scylla_operator_helm_repo')
+
+ string(defaultValue: "${pipelineParams.get('k8s_scylla_operator_chart_version', '')}",
+ description: 'Scylla Operator helm chart version',
+ name: 'k8s_scylla_operator_chart_version')
+
+ string(defaultValue: "${pipelineParams.get('k8s_scylla_operator_docker_image', '')}",
+ description: 'Scylla Operator docker image',
+ name: 'k8s_scylla_operator_docker_image')
+
}
options {
timestamps()
diff --git a/vars/operatorPipeline.groovy b/vars/operatorPipeline.groovy
--- a/vars/operatorPipeline.groovy
+++ b/vars/operatorPipeline.groovy
@@ -2,7 +2,7 @@

def call(Map pipelineParams) {

- def builder = getJenkinsLabels('gce', null)
+ def builder = getJenkinsLabels(pipelineParams.backend, pipelineParams.aws_region)

pipeline {
agent {
@@ -16,18 +16,18 @@ def call(Map pipelineParams) {
SCT_CLUSTER_BACKEND = "${pipelineParams.get('backend', params.backend)}"
}
parameters {
- choice(choices: ['k8s-gke', 'k8s-gce-minikube'],
+ choice(choices: ['k8s-gke', 'k8s-eks', 'k8s-gce-minikube'],
description: '',
name: 'backend')
- string(defaultValue: '',
- description: '',
- name: 'k8s_scylla_operator_docker_image')
string(defaultValue: 'https://storage.googleapis.com/scylla-operator-charts/latest',
description: '',
name: 'k8s_scylla_operator_helm_repo')
string(defaultValue: 'latest',
description: '',
name: 'k8s_scylla_operator_chart_version')
+ string(defaultValue: '',
+ description: '',
+ name: 'k8s_scylla_operator_docker_image')
string(defaultValue: '',
description: '',
name: 'scylla_version')
@@ -68,19 +68,19 @@ def call(Map pipelineParams) {
rm -fv ./latest

export SCT_CONFIG_FILES=${pipelineParams.test_config}
- if [[ -n "${params.k8s_scylla_operator_docker_image}" ]]; then
+ if [[ -n "${params.k8s_scylla_operator_docker_image ? params.k8s_scylla_operator_docker_image : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_DOCKER_IMAGE=${params.k8s_scylla_operator_docker_image}
fi
- if [[ -n "${params.k8s_scylla_operator_helm_repo}" ]]; then
+ if [[ -n "${params.k8s_scylla_operator_helm_repo ? params.k8s_scylla_operator_helm_repo : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_HELM_REPO=${params.k8s_scylla_operator_helm_repo}
fi
- if [[ -n "${params.k8s_scylla_operator_chart_version}" ]]; then
+ if [[ -n "${params.k8s_scylla_operator_chart_version ? params.k8s_scylla_operator_chart_version : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_CHART_VERSION=${params.k8s_scylla_operator_chart_version}
fi
- if [[ -n "${params.scylla_version}" ]]; then
+ if [[ -n "${params.scylla_version ? params.scylla_version : ''}" ]] ; then
export SCT_SCYLLA_VERSION=${params.scylla_version}
fi
- if [[ -n "${params.scylla_mgmt_agent_version}" ]]; then
+ if [[ -n "${params.scylla_mgmt_agent_version ? params.scylla_mgmt_agent_version : ''}" ]] ; then
export SCT_SCYLLA_MGMT_AGENT_VERSION=${params.scylla_mgmt_agent_version}
fi

diff --git a/vars/runSctTest.groovy b/vars/runSctTest.groovy
--- a/vars/runSctTest.groovy
+++ b/vars/runSctTest.groovy
@@ -28,22 +28,23 @@ def call(Map params, String region){
if [[ -n "${params.k8s_scylla_operator_docker_image ? params.k8s_scylla_operator_docker_image : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_DOCKER_IMAGE=${params.k8s_scylla_operator_docker_image}
fi
- if [[ -n "${params.k8s_scylla_operator_helm_repo}" ]]; then
+ if [[ -n "${params.k8s_scylla_operator_helm_repo ? params.k8s_scylla_operator_helm_repo : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_HELM_REPO=${params.k8s_scylla_operator_helm_repo}
fi
- if [[ -n "${params.k8s_scylla_operator_chart_version}" ]]; then
+ if [[ -n "${params.k8s_scylla_operator_chart_version ? params.k8s_scylla_operator_chart_version : ''}" ]] ; then
export SCT_K8S_SCYLLA_OPERATOR_CHART_VERSION=${params.k8s_scylla_operator_chart_version}
fi

if [[ -n "${params.scylla_mgmt_agent_version ? params.scylla_mgmt_agent_version : ''}" ]] ; then
export SCT_SCYLLA_MGMT_AGENT_VERSION=${params.scylla_mgmt_agent_version}
fi

- if [[ -n "${params.scylla_ami_id ? params.scylla_ami_id : ''}" ]] ; then
+ if [[ "${params.backend ? params.backend : ''}" == *"k8s"* ]] ; then
+ echo "Kubernetes backend can have empty scylla version"
+ elif [[ -n "${params.scylla_ami_id ? params.scylla_ami_id : ''}" ]] ; then
export SCT_AMI_ID_DB_SCYLLA="${params.scylla_ami_id}"
elif [[ -n "${params.gce_image_db ? params.gce_image_db : ''}" ]] ; then
export SCT_GCE_IMAGE_DB="${params.gce_image_db}"
-
elif [[ -n "${params.scylla_version ? params.scylla_version : ''}" ]] ; then
export SCT_SCYLLA_VERSION="${params.scylla_version}"
elif [[ -n "${params.scylla_repo ? params.scylla_repo : ''}" ]] ; then