[COMMIT scylla-cluster-tests master] improvement(parallel-operations): grow-shrink cluster

0 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Jun 27, 2024, 10:47:12 AM
to scylladb-dev@googlegroups.com, Lukasz Sojka
From: Lukasz Sojka <lukasz...@scylladb.com>
Committer: Israel Fruchter <israel....@gmail.com>
Branch: master

improvement(parallel-operations): grow-shrink cluster

Adding and decommissioning nodes should be run in parallel as this is
currently Scylla's default behavior.

Refactored the code to support it. Parallel operation can be disabled
by setting the `parallel_node_operations` parameter to false.

---
diff --git a/sdcm/cluster.py b/sdcm/cluster.py
--- a/sdcm/cluster.py
+++ b/sdcm/cluster.py
@@ -3985,6 +3985,7 @@ def parallel_startup(self):
LOGGER.info("Starting nodes in parallel")
else:
LOGGER.info("Starting nodes sequentially")
+ self.parallel_node_operations = False # disable parallel operations regardless of test config

return parallel_startup

diff --git a/sdcm/cluster_aws.py b/sdcm/cluster_aws.py
--- a/sdcm/cluster_aws.py
+++ b/sdcm/cluster_aws.py
@@ -389,9 +389,21 @@ def add_nodes(self, count, ec2_user_data='', dc_idx=0, rack=0, enable_auto_boots
instance_dc = 0 if self.params.get("simulated_regions") else dc_idx
# if simulated_racks, create all instances in the same az
instance_az = 0 if self.params.get("simulated_racks") else rack
- instances = self._create_or_find_instances(
- count=count, ec2_user_data=ec2_user_data, dc_idx=instance_dc, az_idx=instance_az,
- instance_type=instance_type)
+ instances = []
+ if rack is None:
+ # define how many nodes should be created on each rack, e.g. for 3 nodes and 2 racks it will be [2, 1]
+ base = count // self.racks_count
+ extra = count % self.racks_count
+ rack_distribution = [base + 1 if i < extra else base for i in range(self.racks_count)]
+ else:
+ # otherwise create all nodes on the specified rack
+ rack_distribution = [count if i == rack else 0 for i in range(self.racks_count)]
+ self.log.info('rack distribution: %s', rack_distribution)
+ for rack_idx, rack_count in enumerate(rack_distribution):
+ if rack_count == 0:
+ continue # when provisioning fewer nodes than racks
+ instances.extend(self._create_or_find_instances(
+ count=rack_count, ec2_user_data=ec2_user_data, dc_idx=instance_dc, az_idx=rack_idx, instance_type=instance_type))
for node_index, instance in enumerate(instances):
self._node_index += 1
# in case rack is not specified, spread nodes to different racks
diff --git a/sdcm/nemesis.py b/sdcm/nemesis.py
--- a/sdcm/nemesis.py
+++ b/sdcm/nemesis.py
@@ -73,6 +73,7 @@
from sdcm.provision.scylla_yaml import SeedProvider
from sdcm.provision.helpers.certificate import update_certificate, TLSAssets
from sdcm.remote.libssh2_client.exceptions import UnexpectedExit as Libssh2UnexpectedExit
+from sdcm.rest.remote_curl_client import RemoteCurlClient
from sdcm.sct_events import Severity
from sdcm.sct_events.database import DatabaseLogEvent
from sdcm.sct_events.decorators import raise_event_on_failure
@@ -1224,8 +1225,8 @@ def add_ldap_configuration_to_node(node):
if not ContainerManager.is_running(self.tester.localhost, 'ldap'):
raise LdapNotRunning("LDAP server was supposed to be running, but it is not")

- def _add_and_init_new_cluster_node(self, old_node_ip=None, host_id=None,
- timeout=MAX_TIME_WAIT_FOR_NEW_NODE_UP, rack=0):
+ def _replace_cluster_node(self, old_node_ip=None, host_id=None,
+ timeout=MAX_TIME_WAIT_FOR_NEW_NODE_UP, rack=0):
"""When old_node_ip or host_id are not None then replacement node procedure is initiated"""
# TODO: make it work on K8S when we have decommissioned (by nodetool) nodes.
# Now it will fail because pod which hosts decommissioned Scylla member is reported
@@ -1264,6 +1265,35 @@ def _add_and_init_new_cluster_node(self, old_node_ip=None, host_id=None,
InfoEvent(message="FinishEvent - New Node is up and normal").publish()
return new_node

+ def _add_and_init_new_cluster_nodes(self, count, timeout=MAX_TIME_WAIT_FOR_NEW_NODE_UP, rack=None):
+ if rack is None and self._is_it_on_kubernetes():
+ rack = 0
+ self.log.info("Adding %s new nodes to cluster...", count)
+ InfoEvent(message=f'StartEvent - Adding {count} new nodes to cluster').publish()
+ new_nodes = self.cluster.add_nodes(
+ count=count, dc_idx=self.target_node.dc_idx, enable_auto_bootstrap=True, rack=rack)
+ self.monitoring_set.reconfigure_scylla_monitoring()
+ for new_node in new_nodes:
+ self.set_current_running_nemesis(node=new_node)
+ try:
+ with adaptive_timeout(Operations.NEW_NODE, node=self.cluster.nodes[0], timeout=timeout):
+ self.cluster.wait_for_init(node_list=new_nodes, timeout=timeout, check_node_health=False)
+ self.cluster.set_seeds()
+ self.cluster.update_seed_provider()
+ except (NodeSetupFailed, NodeSetupTimeout):
+ self.log.warning("TestConfig of the '%s' failed, removing them from list of nodes" % new_nodes)
+ for node in new_nodes:
+ self.cluster.nodes.remove(node)
+ self.log.warning("Nodes will not be terminated. Please terminate manually!!!")
+ raise
+ for new_node in new_nodes:
+ new_node.wait_native_transport()
+ self.cluster.wait_for_nodes_up_and_normal(nodes=new_nodes)
+ for new_node in new_nodes:
+ self.unset_current_running_nemesis(node=new_node)
+ InfoEvent(message="FinishEvent - New Nodes are up and normal").publish()
+ return new_nodes
+
@decorate_with_context(ignore_ycsb_connection_refused)
def _terminate_cluster_node(self, node):
self.cluster.terminate_node(node)
@@ -1278,7 +1308,7 @@ def _nodetool_decommission(self, add_node=True):
if add_node:
# When adding node after decommission the node is declared as up only after it completed bootstrapping,
# increasing the timeout for now
- new_node = self._add_and_init_new_cluster_node(rack=self.target_node.rack)
+ new_node = self._add_and_init_new_cluster_nodes(count=1, rack=self.target_node.rack)
# after decomission and add_node, the left nodes have data that isn't part of their tokens anymore.
# In order to eliminate cases that we miss a "data loss" bug because of it, we cleanup this data.
# This fix important when just user profile is run in the test and "keyspace1" doesn't exist.
@@ -1319,7 +1349,7 @@ def _terminate_and_wait(self, target_node, sleep_time=300):

@latency_calculator_decorator(legend="Replace a node in cluster with new one")
def replace_node(self, old_node_ip, host_id, rack=0):
- return self._add_and_init_new_cluster_node(old_node_ip, host_id, rack=rack)
+ return self._replace_cluster_node(old_node_ip, host_id, rack=rack)

def _verify_resharding_on_k8s(self, cpus, dc_idx):
nodes_data = []
@@ -1496,7 +1526,7 @@ def _disrupt_kubernetes_then_decommission_and_add_scylla_node(self, disruption_m
self.log.info('Decommission %s', node)
self.cluster.decommission(node, timeout=MAX_TIME_WAIT_FOR_DECOMMISSION)

- new_node = self.add_new_node(rack=node.rack)
+ new_node = self.add_new_nodes(count=1, rack=node.rack)
self.unset_current_running_nemesis(new_node)

# NOTE: wait for all other neighbour pods become ready
@@ -3546,7 +3576,7 @@ def remove_node():
"Node was not removed properly (Node status:{})".format(removed_node_status)

# add new node
- new_node = self._add_and_init_new_cluster_node(rack=self.target_node.rack)
+ new_node = self._add_and_init_new_cluster_nodes(count=1, rack=self.target_node.rack)
# in case the removed node was not last seed.
if node_to_remove.is_seed and num_of_seed_nodes > 1:
new_node.set_seed_flag(True)
@@ -3830,7 +3860,7 @@ def decommission_post_action():
self.log.error('Unexpected exception raised in checking decommission status: %s', exc)

self.log.info('Decommission might complete before stopping it. Re-add a new node')
- new_node = self._add_and_init_new_cluster_node(rack=self.target_node.rack)
+ new_node = self._add_and_init_new_cluster_nodes(count=1, rack=self.target_node.rack)
if new_node.is_seed != target_is_seed:
new_node.set_seed_flag(target_is_seed)
self.cluster.update_seed_provider()
@@ -3985,17 +4015,36 @@ def disrupt_corrupt_then_scrub(self):

self.clear_snapshots()

- @latency_calculator_decorator(legend="Adding new node")
- def add_new_node(self, rack=0):
- return self._add_and_init_new_cluster_node(rack=rack)
+ @latency_calculator_decorator(legend="Adding new nodes")
+ def add_new_nodes(self, count, rack=None):
+ nodes = self._add_and_init_new_cluster_nodes(count, rack=rack)
+ self._wait_for_tablets_balanced(nodes[0])
+ return nodes

- @latency_calculator_decorator(legend="Decommission node: remove node from cluster")
- def decommission_node(self, node):
- self.cluster.decommission(node)
+ @latency_calculator_decorator(legend="Decommission nodes: remove nodes from cluster")
+ def decommission_nodes(self, nodes):

- def decommission_nodes(self, add_nodes_number, rack, is_seed: Optional[Union[bool, DefaultValue]] = DefaultValue,
- dc_idx: Optional[int] = None):
- for idx in range(add_nodes_number):
+ def _decommission(node):
+ try:
+ InfoEvent(f'StartEvent - ShrinkCluster started decommissioning a node {node}').publish()
+ self.cluster.decommission(node)
+ InfoEvent(f'FinishEvent - ShrinkCluster has done decommissioning a node {node}').publish()
+ except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
+ InfoEvent(f'FinishEvent - ShrinkCluster failed decommissioning a node {self.target_node} with error '
+ f'{str(exc)}').publish()
+ raise
+
+ num_workers = None if (self.cluster.parallel_node_operations and nodes[0].raft.is_enabled) else 1
+ parallel_obj = ParallelObject(objects=nodes, timeout=7200, num_workers=num_workers)
+ InfoEvent(f'StartEvent - ShrinkCluster started decommissioning {len(nodes)} nodes').publish()
+ parallel_obj.run(_decommission, ignore_exceptions=False, unpack_objects=True)
+ self.monitoring_set.reconfigure_scylla_monitoring()
+ InfoEvent(f'FinishEvent - ShrinkCluster has done decommissioning {len(nodes)} nodes').publish()
+
+ def _decommission_nodes(self, nodes_number, rack, is_seed: Optional[Union[bool, DefaultValue]] = DefaultValue,
+ dc_idx: Optional[int] = None):
+ nodes_to_decommission = []
+ for idx in range(nodes_number):
if self._is_it_on_kubernetes():
if rack is None and self._is_it_on_kubernetes():
rack = 0
@@ -4004,15 +4053,17 @@ def decommission_nodes(self, add_nodes_number, rack, is_seed: Optional[Union[boo
rack_idx = rack if rack is not None else idx % self.cluster.racks_count
# if rack is not specified, round-robin racks
self.set_target_node(is_seed=is_seed, dc_idx=dc_idx, rack=rack_idx)
- self.log.info("Next node will be removed %s", self.target_node)
-
- try:
- InfoEvent(f'StartEvent - ShrinkCluster started decommissioning a node {self.target_node}').publish()
- self.decommission_node(self.target_node)
- InfoEvent(f'FinishEvent - ShrinkCluster has done decommissioning a node {self.target_node}').publish()
- except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
- InfoEvent(f'FinishEvent - ShrinkCluster failed decommissioning a node {self.target_node} with error '
- f'{str(exc)}').publish()
+ nodes_to_decommission.append(self.target_node)
+ self.target_node = None # otherwise node.running_nemesis will be taken off the node by self.set_target_node
+ try:
+ if self.cluster.parallel_node_operations:
+ self.decommission_nodes(nodes_to_decommission)
+ else:
+ for node in nodes_to_decommission:
+ self.decommission_nodes([node])
+ except Exception as exc: # pylint: disable=broad-except # noqa: BLE001
+ InfoEvent(f'FinishEvent - ShrinkCluster failed decommissioning a node {self.target_node} with error '
+ f'{str(exc)}').publish()

def disrupt_grow_shrink_cluster(self):
sleep_time_between_ops = self.cluster.params.get('nemesis_sequence_sleep_between_ops')
@@ -4037,21 +4088,19 @@ def _grow_cluster(self, rack=None):
if rack is None and self._is_it_on_kubernetes():
rack = 0
add_nodes_number = self.tester.params.get('nemesis_add_node_cnt')
- self.log.info("Start grow cluster on %s nodes", add_nodes_number)
- InfoEvent(message=f"Start grow cluster on {add_nodes_number} nodes").publish()
- for idx in range(add_nodes_number):
- # if rack is not specified, round-robin racks to spread nodes evenly
- rack_idx = rack if rack is not None else idx % self.cluster.racks_count
- InfoEvent(message=f'GrowCluster - Add New node to {rack_idx} rack').publish()
- added_node = self.add_new_node(rack=rack_idx)
- self.unset_current_running_nemesis(added_node)
- InfoEvent(message=f'GrowCluster - Done adding New node {added_node.name}').publish()
+ InfoEvent(message=f"Start grow cluster by {add_nodes_number} nodes").publish()
+ if self.cluster.parallel_node_operations:
+ self.add_new_nodes(count=add_nodes_number, rack=rack)
+ else:
+ for idx in range(add_nodes_number):
+ # if rack is not specified, round-robin racks to spread nodes evenly
+ rack_idx = rack if rack is not None else idx % self.cluster.racks_count
+ self.add_new_nodes(count=1, rack=rack_idx)
self.log.info("Finish cluster grow")
time.sleep(self.interval)

def _shrink_cluster(self, rack=None):
add_nodes_number = self.tester.params.get('nemesis_add_node_cnt')
- self.log.info("Start shrink cluster by %s nodes", add_nodes_number)
InfoEvent(message=f'Start shrink cluster by {add_nodes_number} nodes').publish()
# Check that number of nodes is enough for decommission:
cur_num_nodes_in_dc = len([n for n in self.cluster.nodes if n.dc_idx == self.target_node.dc_idx])
@@ -4074,7 +4123,7 @@ def _shrink_cluster(self, rack=None):
# Currently on kubernetes first two nodes of each rack are getting seed status
# Because of such behavior only way to get them decommission is to enable decommissioning
# TBD: After https://github.com/scylladb/scylla-operator/issues/292 is fixed remove is_seed parameter
- self.decommission_nodes(
+ self._decommission_nodes(
decommission_nodes_number,
rack,
is_seed=None if self._is_it_on_kubernetes() else DefaultValue,
@@ -5019,6 +5068,30 @@ def disrupt_disable_binary_gossip_execute_major_compaction(self):
self.target_node.restart_scylla_server()
raise

+ def _wait_for_tablets_balanced(self, node):
+ """
+ Waiting for tablets to be balanced using REST API.
+
+ doing it several times as there's a risk of:
+ "currently a small time window after adding nodes and before load balancing starts during which
+ topology may appear as quiesced because the state machine goes through an idle state before it enters load balancing state"
+ """
+ if not node.raft.is_enabled:
+ self.log.info("Raft is disabled, skipping wait for balance")
+ return
+ with self.cluster.cql_connection_patient(node=node) as session:
+ if not is_tablets_feature_enabled(session):
+ self.log.info("Tablets are disabled, skipping wait for balance")
+ return
+ time.sleep(60) # one minute gap before checking, just to give some time to the state machine
+ client = RemoteCurlClient(host="127.0.0.1:10000", endpoint="", node=node)
+ self.log.info("Waiting for tablets to be balanced")
+ for _ in range(3):
+ client.run_remoter_curl(method="POST", path="storage_service/quiesce_topology",
+ params={}, timeout=3600, retry=3)
+ time.sleep(5)
+ self.log.info("Tablets are balanced")
+

def disrupt_method_wrapper(method, is_exclusive=False): # pylint: disable=too-many-statements # noqa: PLR0915
"""

Commit Bot

<bot@cloudius-systems.com>
unread,
Jun 30, 2024, 2:22:27 PM
to scylladb-dev@googlegroups.com, Lukasz Sojka
From: Lukasz Sojka <lukasz...@scylladb.com>
Committer: Israel Fruchter <israel....@gmail.com>
Branch: branch-2024.2

improvement(parallel-operations): grow-shrink cluster

Adding and decommissioning nodes should be run in parallel as this is
currently Scylla's default behavior.

Refactored the code to support it. Parallel operation can be disabled
by setting the `parallel_node_operations` parameter to false.

(cherry picked from commit 798228e278ad721eb580331697f13fa822806205)

---
diff --git a/sdcm/cluster.py b/sdcm/cluster.py
--- a/sdcm/cluster.py
+++ b/sdcm/cluster.py
@@ -3976,6 +3976,7 @@ def parallel_startup(self):

Commit Bot

<bot@cloudius-systems.com>
unread,
Jun 30, 2024, 2:24:43 PM
to scylladb-dev@googlegroups.com, Lukasz Sojka
From: Lukasz Sojka <lukasz...@scylladb.com>
Committer: Israel Fruchter <israel....@gmail.com>
Branch: branch-6.0

improvement(parallel-operations): grow-shrink cluster

Adding and decommissioning nodes should be run in parallel as this is
currently Scylla's default behavior.

Refactored the code to support it. Parallel operation can be disabled
by setting the `parallel_node_operations` parameter to false.

(cherry picked from commit 798228e278ad721eb580331697f13fa822806205)

---
diff --git a/sdcm/cluster.py b/sdcm/cluster.py
--- a/sdcm/cluster.py
+++ b/sdcm/cluster.py
@@ -3978,6 +3978,7 @@ def parallel_startup(self):
@@ -72,6 +72,7 @@
from sdcm.prometheus import nemesis_metrics_obj
from sdcm.provision.scylla_yaml import SeedProvider
from sdcm.remote.libssh2_client.exceptions import UnexpectedExit as Libssh2UnexpectedExit
+from sdcm.rest.remote_curl_client import RemoteCurlClient
from sdcm.sct_events import Severity
from sdcm.sct_events.database import DatabaseLogEvent
from sdcm.sct_events.decorators import raise_event_on_failure
@@ -1228,8 +1229,8 @@ def add_ldap_configuration_to_node(node):
if not ContainerManager.is_running(self.tester.localhost, 'ldap'):
raise LdapNotRunning("LDAP server was supposed to be running, but it is not")

- def _add_and_init_new_cluster_node(self, old_node_ip=None, host_id=None,
- timeout=MAX_TIME_WAIT_FOR_NEW_NODE_UP, rack=0):
+ def _replace_cluster_node(self, old_node_ip=None, host_id=None,
+ timeout=MAX_TIME_WAIT_FOR_NEW_NODE_UP, rack=0):
"""When old_node_ip or host_id are not None then replacement node procedure is initiated"""
# TODO: make it work on K8S when we have decommissioned (by nodetool) nodes.
# Now it will fail because pod which hosts decommissioned Scylla member is reported
@@ -1268,6 +1269,35 @@ def _add_and_init_new_cluster_node(self, old_node_ip=None, host_id=None,
@@ -1282,7 +1312,7 @@ def _nodetool_decommission(self, add_node=True):
if add_node:
# When adding node after decommission the node is declared as up only after it completed bootstrapping,
# increasing the timeout for now
- new_node = self._add_and_init_new_cluster_node(rack=self.target_node.rack)
+ new_node = self._add_and_init_new_cluster_nodes(count=1, rack=self.target_node.rack)
# after decomission and add_node, the left nodes have data that isn't part of their tokens anymore.
# In order to eliminate cases that we miss a "data loss" bug because of it, we cleanup this data.
# This fix important when just user profile is run in the test and "keyspace1" doesn't exist.
@@ -1323,7 +1353,7 @@ def _terminate_and_wait(self, target_node, sleep_time=300):

@latency_calculator_decorator(legend="Replace a node in cluster with new one")
def replace_node(self, old_node_ip, host_id, rack=0):
- return self._add_and_init_new_cluster_node(old_node_ip, host_id, rack=rack)
+ return self._replace_cluster_node(old_node_ip, host_id, rack=rack)

def _verify_resharding_on_k8s(self, cpus, dc_idx):
nodes_data = []
@@ -1500,7 +1530,7 @@ def _disrupt_kubernetes_then_decommission_and_add_scylla_node(self, disruption_m
self.log.info('Decommission %s', node)
self.cluster.decommission(node, timeout=MAX_TIME_WAIT_FOR_DECOMMISSION)

- new_node = self.add_new_node(rack=node.rack)
+ new_node = self.add_new_nodes(count=1, rack=node.rack)
self.unset_current_running_nemesis(new_node)

# NOTE: wait for all other neighbour pods become ready
@@ -3550,7 +3580,7 @@ def remove_node():
"Node was not removed properly (Node status:{})".format(removed_node_status)

# add new node
- new_node = self._add_and_init_new_cluster_node(rack=self.target_node.rack)
+ new_node = self._add_and_init_new_cluster_nodes(count=1, rack=self.target_node.rack)
# in case the removed node was not last seed.
if node_to_remove.is_seed and num_of_seed_nodes > 1:
new_node.set_seed_flag(True)
@@ -3834,7 +3864,7 @@ def decommission_post_action():
self.log.error('Unexpected exception raised in checking decommission status: %s', exc)

self.log.info('Decommission might complete before stopping it. Re-add a new node')
- new_node = self._add_and_init_new_cluster_node(rack=self.target_node.rack)
+ new_node = self._add_and_init_new_cluster_nodes(count=1, rack=self.target_node.rack)
if new_node.is_seed != target_is_seed:
new_node.set_seed_flag(target_is_seed)
self.cluster.update_seed_provider()
@@ -3989,17 +4019,36 @@ def disrupt_corrupt_then_scrub(self):
@@ -4008,15 +4057,17 @@ def decommission_nodes(self, add_nodes_number, rack, is_seed: Optional[Union[boo
@@ -4041,21 +4092,19 @@ def _grow_cluster(self, rack=None):
@@ -4078,7 +4127,7 @@ def _shrink_cluster(self, rack=None):
# Currently on kubernetes first two nodes of each rack are getting seed status
# Because of such behavior only way to get them decommission is to enable decommissioning
# TBD: After https://github.com/scylladb/scylla-operator/issues/292 is fixed remove is_seed parameter
- self.decommission_nodes(
+ self._decommission_nodes(
decommission_nodes_number,
rack,
is_seed=None if self._is_it_on_kubernetes() else DefaultValue,
@@ -5023,6 +5072,30 @@ def disrupt_disable_binary_gossip_execute_major_compaction(self):
Reply all
Reply to author
Forward
0 new messages