[QUEUED scylladb next] tablets, raft topology: Add support for decommission with tablets

16 views
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
Sep 14, 2023, 10:16:57 AM9/14/23
to scylladb-dev@googlegroups.com, Tomasz Grabiec
From: Tomasz Grabiec <tgra...@scylladb.com>
Committer: Tomasz Grabiec <tgra...@scylladb.com>
Branch: next

tablets, raft topology: Add support for decommission with tablets

Load balancer will recognize decommissioning nodes and will
move tablet replicas away from such nodes with highest priority.

Topology changes have now an extra step called "tablet draining" which
calls the load balancer. The step will execute tablet migration track
as long as there are nodes which require draining. It will not do regular
load balancing.

If load balancer is unable to find new tablet replicas, because RF
cannot be met or availability is at risk due to insufficient node
distribution in racks, it will throw an exception. Currently, topology
change will retry in a loop. We should make this error cause topology
change to be paused so that admin becomes aware of the problem and
issues an abort on the topology change. There is no infrastructure for
aborts yet, so this is not implemented.

---
diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md
--- a/docs/dev/topology-over-raft.md
+++ b/docs/dev/topology-over-raft.md
@@ -76,17 +76,17 @@ such as `check_and_repair_cdc_streams`.
If there is no work for the state machine, tablet load balancer is invoked to
check if we need to rebalance. If so, it computes an incremental tablet migration
plan, persists it by moving tablets into transitional states, and moves the state machine
-into the tablet migration track. All this happens atomically form the perspective
+into the tablet migration track. All this happens atomically from the perspective
of group0 state machine.

The tablet migration track also invokes the load balancer and starts new migrations
to keep the cluster saturated with streaming. The load balancer is invoked
on transition of tablet stages, and also continuously as long as it generates
new migrations.

-If there is a pending topology change request, the load balancer
-will not be invoked to allow for current migrations to drain, after which the
-state machine will exit the tablet migration track and allow pending topology
+If there is a pending topology change request during tablet migration track,
+the load balancer will not be invoked to allow for current migrations to drain,
+after which the state machine will exit the tablet migration track and allow pending topology
operation to start.

The tablet migration track excludes with other topology changes, so node operations
@@ -103,6 +103,12 @@ that there are no tablet transitions in the system.

Tablets are migrated in parallel and independently.

+There is a variant of tablet migration track called tablet draining track, which is invoked
+as a step of certain topology operations (e.g. decommission). Its goal is to readjust tablet replicas
+so that a given topology change can proceed. For example, when decommissioning a node, we
+need to migrate tablet replicas away from the node being decommissioned.
+Tablet draining happens before making changes to vnode-based replication.
+
# Tablet migration

Each tablet has its own migration state machine stored in group0 which is part of the tablet state. It involves
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -427,6 +427,8 @@ future<> storage_service::topology_state_load() {
[[fallthrough]];
case topology::transition_state::commit_cdc_generation:
[[fallthrough]];
+ case topology::transition_state::tablet_draining:
+ [[fallthrough]];
case topology::transition_state::write_both_read_old:
return read_new_t::no;
case topology::transition_state::write_both_read_new:
@@ -1473,7 +1475,11 @@ class topology_coordinator {
}
}

- future<> handle_tablet_migration(group0_guard guard) {
+ // When "drain" is true, we migrate tablets only as long as there are nodes to drain
+ // and then change the transition state to write_both_read_old. Also, while draining,
+ // we ignore pending topology requests which normally interrupt load balancing.
+ // When "drain" is false, we do regular load balancing.
+ future<> handle_tablet_migration(group0_guard guard, bool drain) {
// This step acts like a pump which advances state machines of individual tablets,
// batching barriers and group0 updates.
// If progress cannot be made, e.g. because all transitions are streaming, we block
@@ -1575,17 +1581,25 @@ class topology_coordinator {

// In order to keep the cluster saturated, ask the load balancer for more transitions.
// Unless there is a pending topology change operation.
- auto ts = guard.write_timestamp();
- auto [preempt, new_guard] = should_preempt_balancing(std::move(guard));
- guard = std::move(new_guard);
- if (ts != guard.write_timestamp()) {
- // We rely on the fact that should_preempt_balancing() does not release the guard
- // so that tablet metadata reading and updates are atomic.
- on_internal_error(slogger, "should_preempt_balancing() retook the guard");
+ bool preempt = false;
+ if (!drain) {
+ // When draining, this method is invoked with an active node transition, which
+ // would normally cause preemption, which we don't want here.
+ auto ts = guard.write_timestamp();
+ auto [new_preempt, new_guard] = should_preempt_balancing(std::move(guard));
+ preempt = new_preempt;
+ guard = std::move(new_guard);
+ if (ts != guard.write_timestamp()) {
+ // We rely on the fact that should_preempt_balancing() does not release the guard
+ // so that tablet metadata reading and updates are atomic.
+ on_internal_error(slogger, "should_preempt_balancing() retook the guard");
+ }
}
if (!preempt) {
auto plan = co_await _tablet_allocator.balance_tablets(get_token_metadata_ptr());
- co_await generate_migration_updates(updates, guard, plan);
+ if (!drain || plan.has_nodes_to_drain()) {
+ co_await generate_migration_updates(updates, guard, plan);
+ }
}

// The updates have to be executed under the same guard which was used to read tablet metadata
@@ -1627,11 +1641,19 @@ class topology_coordinator {
co_return;
}

- updates.emplace_back(
- topology_mutation_builder(guard.write_timestamp())
- .del_transition_state()
- .set_version(_topo_sm._topology.version + 1)
- .build());
+ if (drain) {
+ updates.emplace_back(
+ topology_mutation_builder(guard.write_timestamp())
+ .set_transition_state(topology::transition_state::write_both_read_old)
+ .set_version(_topo_sm._topology.version + 1)
+ .build());
+ } else {
+ updates.emplace_back(
+ topology_mutation_builder(guard.write_timestamp())
+ .del_transition_state()
+ .set_version(_topo_sm._topology.version + 1)
+ .build());
+ }
co_await update_topology_state(std::move(guard), std::move(updates), "Finished tablet migration");
}

@@ -1769,6 +1791,9 @@ class topology_coordinator {
co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
}
break;
+ case topology::transition_state::tablet_draining:
+ co_await handle_tablet_migration(std::move(guard), true);
+ break;
case topology::transition_state::write_both_read_old: {
auto node = get_node_to_work_on(std::move(guard));

@@ -1921,7 +1946,7 @@ class topology_coordinator {
}
break;
case topology::transition_state::tablet_migration:
- co_await handle_tablet_migration(std::move(guard));
+ co_await handle_tablet_migration(std::move(guard), false);
break;
}
co_return true;
@@ -1969,7 +1994,7 @@ class topology_coordinator {
// start decommission and put tokens of decommissioning nodes into write_both_read_old state
// meaning that reads will go to the replica being decommissioned
// but writes will go to new owner as well
- builder.set_transition_state(topology::transition_state::write_both_read_old)
+ builder.set_transition_state(topology::transition_state::tablet_draining)
.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", node_state::decommissioning)
@@ -1979,10 +2004,7 @@ class topology_coordinator {
break;
case topology_request::remove:
assert(node.rs->ring);
- // start removing and put tokens of a node been removed into write_both_read_old state
- // meaning that reads will go to the replica being removed (it is dead though)
- // but writes will go to new owner as well
- builder.set_transition_state(topology::transition_state::write_both_read_old)
+ builder.set_transition_state(topology::transition_state::tablet_draining)
.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", node_state::removing)
@@ -1996,10 +2018,7 @@ class topology_coordinator {
auto it = _topo_sm._topology.normal_nodes.find(replaced_id);
assert(it != _topo_sm._topology.normal_nodes.end());
assert(it->second.ring && it->second.state == node_state::normal);
- // start replacing and take ownership of the tokens of a node been replaced
- // and put them into write_both_read_old state meaning that reads will go
- // to the replica being removed (it is dead though) but writes will go to new owner as well
- builder.set_transition_state(topology::transition_state::write_both_read_old)
+ builder.set_transition_state(topology::transition_state::tablet_draining)
.set_version(_topo_sm._topology.version + 1)
.with_node(node.id)
.set("node_state", node_state::replacing)
diff --git a/service/tablet_allocator.cc b/service/tablet_allocator.cc
--- a/service/tablet_allocator.cc
+++ b/service/tablet_allocator.cc
@@ -141,6 +141,14 @@ class load_balancer_stats_manager {
/// means that many under-loaded nodes can be driven forward to balance concurrently because the load balancer
/// will alternate between them across make_plan() calls.
///
+/// The algorithm behaves differently when there are decommissioning nodes which have tablet replicas.
+/// In this case, we move those tablets away first. The balancing works in the opposite direction.
+/// Rather than picking a single least-loaded target and moving tablets into it from many sources,
+/// we have a single source and move tablets to multiple targets. This process necessarily disregards
+/// convergence checks, and the stop condition is that the source is drained. We still take target
+/// load into consideration and pick least-loaded targets first. When draining is not possible
+/// because there is no viable new replica for a tablet, load balancing will throw an exception.
+///
/// If the algorithm is called with active tablet migrations in tablet metadata, those are treated
/// by load balancer as if they were already completed. This allows the algorithm to incrementally
/// make decision which when executed with active migrations will produce the desired result.
@@ -303,6 +311,8 @@ class load_balancer {
}

future<migration_plan> make_plan(dc_name dc) {
+ migration_plan plan;
+
_stats.for_dc(dc).calls++;
lblogger.info("Examining DC {}", dc);

@@ -317,18 +327,33 @@ class load_balancer {
// Select subset of nodes to balance.

std::unordered_map<host_id, node_load> nodes;
+ std::unordered_set<host_id> nodes_to_drain;
topo.for_each_node([&] (const locator::node* node_ptr) {
- if (node_ptr->get_state() == locator::node::state::normal && node_ptr->dc_rack().dc == dc) {
+ if (node_ptr->dc_rack().dc != dc) {
+ return;
+ }
+ if (node_ptr->get_state() == locator::node::state::normal
+ || node_ptr->get_state() == locator::node::state::being_decommissioned) {
node_load& load = nodes[node_ptr->host_id()];
load.id = node_ptr->host_id();
load.shard_count = node_ptr->get_shard_count();
load.shards.resize(load.shard_count);
if (!load.shard_count) {
throw std::runtime_error(format("Shard count of {} not found in topology", node_ptr->host_id()));
}
+ if (node_ptr->get_state() == locator::node::state::being_decommissioned) {
+ lblogger.info("Will drain node {} from DC {}", node_ptr->host_id(), dc);
+ nodes_to_drain.emplace(node_ptr->host_id());
+ }
}
});

+ if (nodes.empty()) {
+ lblogger.debug("No nodes to balance.");
+ _stats.for_dc(dc).stop_balance++;
+ co_return plan;
+ }
+
// Compute tablet load on nodes.

for (auto&& [table, tmap_] : _tm->tablets().all_tables()) {
@@ -351,36 +376,55 @@ class load_balancer {
});
}

+ // Detect finished drain.
+
+ for (auto i = nodes_to_drain.begin(); i != nodes_to_drain.end();) {
+ if (nodes[*i].tablet_count == 0) {
+ lblogger.info("Node {} is already drained, ignoring", *i);
+ nodes.erase(*i);
+ i = nodes_to_drain.erase(i);
+ } else {
+ ++i;
+ }
+ }
+
+ plan.set_has_nodes_to_drain(!nodes_to_drain.empty());
+
// Compute load imbalance.

load_type max_load = 0;
load_type min_load = 0;
std::optional<host_id> min_load_node = std::nullopt;
for (auto&& [host, load] : nodes) {
load.update();
- if (!min_load_node || load.avg_load < min_load) {
- min_load = load.avg_load;
- min_load_node = host;
- }
- if (load.avg_load > max_load) {
- max_load = load.avg_load;
- }
_stats.for_node(dc, host).load = load.avg_load;
- }

- if (!shuffle && max_load == min_load) {
- // load is balanced.
- // TODO: Evaluate and fix intra-node balance.
- _stats.for_dc(dc).stop_balance++;
- co_return migration_plan();
+ if (!nodes_to_drain.contains(host)) {
+ if (!min_load_node || load.avg_load < min_load) {
+ min_load = load.avg_load;
+ min_load_node = host;
+ }
+ if (load.avg_load > max_load) {
+ max_load = load.avg_load;
+ }
+ }
}

for (auto&& [host, load] : nodes) {
- lblogger.info("Node {}: rack={} avg_load={}, tablets={}, shards={}",
- host, topo.find_node(host)->dc_rack().rack, load.avg_load, load.tablet_count, load.shard_count);
+ auto& node = topo.get_node(host);
+ lblogger.info("Node {}: rack={} avg_load={}, tablets={}, shards={}, state={}",
+ host, node.dc_rack().rack, load.avg_load, load.tablet_count, load.shard_count, node.get_state());
+ }
+
+ if (nodes_to_drain.empty()) {
+ if (!shuffle && max_load == min_load) {
+ // load is balanced.
+ // TODO: Evaluate and fix intra-node balance.
+ _stats.for_dc(dc).stop_balance++;
+ co_return plan;
+ }
+ lblogger.info("target node: {}, avg_load: {}, max: {}", *min_load_node, min_load, max_load);
}
- lblogger.info("target node: {}, avg_load: {}, max: {}", *min_load_node, min_load, max_load);
- auto target = *min_load_node;

// We want to saturate the target node so we migrate several tablets in parallel, one for each shard
// on the target node. This assumes that the target node is well-balanced and that tablet migrations
@@ -392,8 +436,8 @@ class load_balancer {
// will suffer because more loaded shards will not participate, which will under-utilize the node.
// FIXME: To handle the above, we should rebalance the target node before migrating tablets from other nodes.

- auto target_node = topo.find_node(target);
- auto batch_size = target_node->get_shard_count();
+ auto target = *min_load_node;
+ auto batch_size = nodes[target].shard_count;

// Compute per-shard load and candidate tablets.

@@ -435,12 +479,25 @@ class load_balancer {

// Prepare candidate nodes and shards for heap-based balancing.

+ // Any given node is either in nodes_by_load or nodes_by_load_dst, but not both.
+ // This means that either of the heap needs to be updated when the node's load changes, not both.
+
// heap which tracks most-loaded nodes in terms of avg_load.
+ // It is used to find source tablet candidates.
std::vector<host_id> nodes_by_load;
nodes_by_load.reserve(nodes.size());
+
+ // heap which tracks least-loaded nodes in terms of avg_load.
+ // Used to find candidates for target nodes.
+ std::vector<host_id> nodes_by_load_dst;
+ nodes_by_load_dst.reserve(nodes.size());
+
auto nodes_cmp = [&] (const host_id& a, const host_id& b) {
return nodes[a].avg_load < nodes[b].avg_load;
};
+ auto nodes_dst_cmp = [&] (const host_id& a, const host_id& b) {
+ return nodes_cmp(b, a);
+ };

for (auto&& [host, node_load] : nodes) {
if (lblogger.is_enabled(seastar::log_level::debug)) {
@@ -452,21 +509,27 @@ class load_balancer {
}
}

- nodes_by_load.push_back(host);
- std::make_heap(node_load.shards_by_load.begin(), node_load.shards_by_load.end(), node_load.shards_by_load_cmp());
+ if (host != target && (nodes_to_drain.empty() || nodes_to_drain.contains(host))) {
+ nodes_by_load.push_back(host);
+ std::make_heap(node_load.shards_by_load.begin(), node_load.shards_by_load.end(),
+ node_load.shards_by_load_cmp());
+ } else {
+ nodes_by_load_dst.push_back(host);
+ }
}

std::make_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
+ std::make_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);

- migration_plan plan;
const tablet_metadata& tmeta = _tm->tablets();
load_type max_off_candidate_load = 0; // max load among nodes which ran out of candidates.
- auto& target_info = nodes[target];
- const size_t max_skipped_migrations = target_info.shards.size() * 2;
+ const size_t max_skipped_migrations = nodes[target].shards.size() * 2;
size_t skipped_migrations = 0;
while (plan.size() < batch_size) {
co_await coroutine::maybe_yield();

+ // Pick a source tablet.
+
if (nodes_by_load.empty()) {
lblogger.debug("No more candidate nodes");
_stats.for_dc(dc).stop_no_candidates++;
@@ -477,7 +540,110 @@ class load_balancer {
auto src_host = nodes_by_load.back();
auto& src_node_info = nodes[src_host];

- if (!shuffle) {
+ if (src_node_info.shards_by_load.empty()) {
+ lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining.",
+ src_host, src_node_info.tablet_count);
+ max_off_candidate_load = std::max(max_off_candidate_load, src_node_info.avg_load);
+ nodes_by_load.pop_back();
+ continue;
+ }
+ auto push_back_node_candidate = seastar::defer([&] {
+ std::push_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
+ });
+
+ std::pop_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
+ auto src_shard = src_node_info.shards_by_load.back();
+ auto src = tablet_replica{src_host, src_shard};
+ auto&& src_shard_info = src_node_info.shards[src_shard];
+ if (src_shard_info.candidates.empty()) {
+ lblogger.debug("shard {} ran out of candidates with {} tablets remaining.", src, src_shard_info.tablet_count);
+ src_node_info.shards_by_load.pop_back();
+ continue;
+ }
+ auto push_back_shard_candidate = seastar::defer([&] {
+ std::push_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
+ });
+
+ auto source_tablet = *src_shard_info.candidates.begin();
+ src_shard_info.candidates.erase(source_tablet);
+ auto& tmap = tmeta.get_tablet_map(source_tablet.table);
+
+ // Pick a target node.
+
+ if (nodes_by_load_dst.empty()) {
+ lblogger.debug("No more target nodes");
+ _stats.for_dc(dc).stop_no_candidates++;
+ break;
+ }
+
+ // The post-condition of this block is that nodes_by_load_dst.back() is a viable target node
+ // for the source tablet.
+ if (nodes_to_drain.empty()) {
+ std::pop_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
+ } else {
+ std::unordered_set<host_id> replicas;
+ std::unordered_map<sstring, int> rack_load;
+ int max_rack_load = 0;
+ for (auto&& r : tmap.get_tablet_info(source_tablet.tablet).replicas) {
+ replicas.insert(r.host);
+ if (nodes.contains(r.host)) {
+ const locator::node& node = topo.get_node(r.host);
+ rack_load[node.dc_rack().rack] += 1;
+ max_rack_load = std::max(max_rack_load, rack_load[node.dc_rack().rack]);
+ }
+ }
+
+ auto end = nodes_by_load_dst.end();
+ while (true) {
+ if (nodes_by_load_dst.begin() == end) {
+ throw std::runtime_error(format("Unable to find new replica for tablet {} on {} when draining {}",
+ source_tablet, src, nodes_to_drain));
+ }
+
+ pop_heap(nodes_by_load_dst.begin(), end, nodes_dst_cmp);
+ --end;
+ auto new_target = *end;
+
+ if (replicas.contains(new_target)) {
+ lblogger.debug("next best target {} (avg_load={}) skipped because it is already a replica for {}",
+ new_target, nodes[new_target].avg_load, source_tablet);
+ continue;
+ }
+
+ const locator::node& target_node = topo.get_node(new_target);
+ const locator::node& source_node = topo.get_node(src_host);
+ if (target_node.dc_rack().rack != source_node.dc_rack().rack
+ && (rack_load[target_node.dc_rack().rack] + 1 > max_rack_load)) {
+ lblogger.debug("next best target {} (avg_load={}) skipped because it would overload rack {} "
+ "with {} replicas of {}, current max is {}",
+ new_target, nodes[new_target].avg_load, target_node.dc_rack().rack,
+ rack_load[target_node.dc_rack().rack] + 1, source_tablet, max_rack_load);
+ continue;
+ }
+
+ // Found a viable target, restore the heap
+ std::swap(*end, nodes_by_load_dst.back());
+ while (end != std::prev(nodes_by_load_dst.end())) {
+ ++end;
+ push_heap(nodes_by_load_dst.begin(), end, nodes_dst_cmp);
+ }
+ break;
+ }
+ }
+
+ target = nodes_by_load_dst.back();
+ auto& target_info = nodes[target];
+ const locator::node& target_node = topo.get_node(target);
+ auto push_back_target_node = seastar::defer([&] {
+ std::push_heap(nodes_by_load_dst.begin(), nodes_by_load_dst.end(), nodes_dst_cmp);
+ });
+
+ lblogger.debug("target node: {}, avg_load={}", target, target_info.avg_load);
+
+ // Check convergence conditions.
+
+ // When draining nodes, disable convergence checks so that all tablets are migrated away.
+ if (!shuffle && nodes_to_drain.empty()) {
// Check if all nodes reached the same avg_load. There are three sets of nodes: target, candidates (nodes_by_load)
// and off-candidates (removed from nodes_by_load). At any time, the avg_load for target is not greater than
// that of any candidate, and avg_load of any candidate is not greater than that of any in the off-candidates set.
@@ -513,48 +679,24 @@ class load_balancer {
}
}

- if (src_node_info.shards_by_load.empty()) {
- lblogger.debug("candidate node {} ran out of candidate shards with {} tablets remaining.",
- src_host, src_node_info.tablet_count);
- max_off_candidate_load = std::max(max_off_candidate_load, src_node_info.avg_load);
- nodes_by_load.pop_back();
- continue;
- }
- auto push_back_node_candidate = seastar::defer([&] {
- std::push_heap(nodes_by_load.begin(), nodes_by_load.end(), nodes_cmp);
- });
-
- std::pop_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
- auto src_shard = src_node_info.shards_by_load.back();
- auto src = tablet_replica{src_host, src_shard};
- auto&& src_shard_info = src_node_info.shards[src_shard];
- if (src_shard_info.candidates.empty()) {
- lblogger.debug("shard {} ran out of candidates with {} tablets remaining.", src, src_shard_info.tablet_count);
- src_node_info.shards_by_load.pop_back();
- continue;
- }
- auto push_back_shard_candidate = seastar::defer([&] {
- std::push_heap(src_node_info.shards_by_load.begin(), src_node_info.shards_by_load.end(), src_node_info.shards_by_load_cmp());
- });
-
- auto source_tablet = *src_shard_info.candidates.begin();
- src_shard_info.candidates.erase(source_tablet);
-
// Check replication strategy constraints.

- auto same_rack = target_node->dc_rack().rack == topo.get_node(src.host).dc_rack().rack;
- std::unordered_map<sstring, int> rack_load; // Will be built if !same_rack
+ bool check_rack_load = false;
bool has_replica_on_target = false;
- auto& tmap = tmeta.get_tablet_map(source_tablet.table);
- for (auto&& r : tmap.get_tablet_info(source_tablet.tablet).replicas) {
- if (r.host == target) {
- has_replica_on_target = true;
- break;
- }
- if (!same_rack) {
- const locator::node& node = topo.get_node(r.host);
- if (node.dc_rack().dc == dc) {
- rack_load[node.dc_rack().rack] += 1;
+ std::unordered_map<sstring, int> rack_load; // Will be built if check_rack_load
+
+ if (nodes_to_drain.empty()) {
+ check_rack_load = target_node.dc_rack().rack != topo.get_node(src.host).dc_rack().rack;
+ for (auto&& r: tmap.get_tablet_info(source_tablet.tablet).replicas) {
+ if (r.host == target) {
+ has_replica_on_target = true;
+ break;
+ }
+ if (check_rack_load) {
+ const locator::node& node = topo.get_node(r.host);
+ if (node.dc_rack().dc == dc) {
+ rack_load[node.dc_rack().rack] += 1;
+ }
}
}
}
@@ -566,13 +708,13 @@ class load_balancer {
}

// Make sure we don't increase level of duplication of racks in the replica list.
- if (!same_rack) {
+ if (check_rack_load) {
auto max_rack_load = std::max_element(rack_load.begin(), rack_load.end(),
[] (auto& a, auto& b) { return a.second < b.second; })->second;
- auto new_rack_load = rack_load[target_node->dc_rack().rack] + 1;
+ auto new_rack_load = rack_load[target_node.dc_rack().rack] + 1;
if (new_rack_load > max_rack_load) {
lblogger.debug("candidate tablet {} skipped because it would increase load on rack {} to {}, max={}",
- source_tablet, target_node->dc_rack().rack, new_rack_load, max_rack_load);
+ source_tablet, target_node.dc_rack().rack, new_rack_load, max_rack_load);
_stats.for_dc(dc).tablets_skipped_rack++;
continue;
}
diff --git a/service/tablet_allocator.hh b/service/tablet_allocator.hh
--- a/service/tablet_allocator.hh
+++ b/service/tablet_allocator.hh
@@ -27,7 +27,11 @@ public:
using migrations_vector = utils::chunked_vector<tablet_migration_info>;
private:
migrations_vector _migrations;
+ bool _has_nodes_to_drain = false;
public:
+ /// Returns true iff there are decommissioning nodes which own some tablet replicas.
+ bool has_nodes_to_drain() const { return _has_nodes_to_drain; }
+
const migrations_vector& migrations() const { return _migrations; }
bool empty() const { return _migrations.empty(); }
size_t size() const { return _migrations.size(); }
@@ -38,6 +42,11 @@ public:

void merge(migration_plan&& other) {
std::move(other._migrations.begin(), other._migrations.end(), std::back_inserter(_migrations));
+ _has_nodes_to_drain |= other._has_nodes_to_drain;
+ }
+
+ void set_has_nodes_to_drain(bool b) {
+ _has_nodes_to_drain = b;
}
};

diff --git a/service/topology_state_machine.cc b/service/topology_state_machine.cc
--- a/service/topology_state_machine.cc
+++ b/service/topology_state_machine.cc
@@ -79,6 +79,7 @@ static std::unordered_map<topology::transition_state, sstring> transition_state_
{topology::transition_state::write_both_read_old, "write both read old"},
{topology::transition_state::write_both_read_new, "write both read new"},
{topology::transition_state::tablet_migration, "tablet migration"},
+ {topology::transition_state::tablet_draining, "tablet draining"},
};

std::ostream& operator<<(std::ostream& os, topology::transition_state s) {
diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh
--- a/service/topology_state_machine.hh
+++ b/service/topology_state_machine.hh
@@ -97,6 +97,7 @@ struct topology_features {
struct topology {
enum class transition_state: uint16_t {
commit_cdc_generation,
+ tablet_draining,
write_both_read_old,
write_both_read_new,
tablet_migration,
diff --git a/test/boost/tablets_test.cc b/test/boost/tablets_test.cc
--- a/test/boost/tablets_test.cc
+++ b/test/boost/tablets_test.cc
@@ -761,6 +761,335 @@ SEASTAR_THREAD_TEST_CASE(test_load_balancing_with_empty_node) {
}).get();
}

+SEASTAR_THREAD_TEST_CASE(test_decommission_rf_met) {
+ // Verifies that load balancer moves tablets out of the decommissioned node.
+ // The scenario is such that replication factor of tablets can be satisfied after decommission.
+ do_with_cql_env_thread([](auto& e) {
+ inet_address ip1("192.168.0.1");
+ inet_address ip2("192.168.0.2");
+ inet_address ip3("192.168.0.3");
+
+ auto host1 = host_id(next_uuid());
+ auto host2 = host_id(next_uuid());
+ auto host3 = host_id(next_uuid());
+
+ auto table1 = table_id(next_uuid());
+
+ semaphore sem(1);
+ shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
+ locator::topology::config {
+ .this_endpoint = ip1,
+ .local_dc_rack = locator::endpoint_dc_rack::default_location
+ }
+ });
+
+ stm.mutate_token_metadata([&](auto& tm) {
+ const unsigned shard_count = 2;
+
+ tm.update_host_id(host1, ip1);
+ tm.update_host_id(host2, ip2);
+ tm.update_host_id(host3, ip3);
+ tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
+ tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
+ tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned,
+ shard_count);
+
+ tablet_map tmap(4);
+ auto tid = tmap.first_tablet();
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host1, 0},
+ tablet_replica {host2, 1},
+ }
+ });
+ tid = *tmap.next_tablet(tid);
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host1, 0},
+ tablet_replica {host2, 1},
+ }
+ });
+ tid = *tmap.next_tablet(tid);
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host1, 0},
+ tablet_replica {host3, 0},
+ }
+ });
+ tid = *tmap.next_tablet(tid);
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host2, 1},
+ tablet_replica {host3, 1},
+ }
+ });
+ tablet_metadata tmeta;
+ tmeta.set_tablet_map(table1, std::move(tmap));
+ tm.set_tablets(std::move(tmeta));
+ return make_ready_future<>();
+ }).get();
+
+ rebalance_tablets(e.get_tablet_allocator().local(), stm);
+
+ {
+ load_sketch load(stm.get());
+ load.populate().get();
+ BOOST_REQUIRE(load.get_avg_shard_load(host1) == 2);
+ BOOST_REQUIRE(load.get_avg_shard_load(host2) == 2);
+ BOOST_REQUIRE(load.get_avg_shard_load(host3) == 0);
+ }
+
+ stm.mutate_token_metadata([&](auto& tm) {
+ tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::left);
+ return make_ready_future<>();
+ }).get();
+
+ rebalance_tablets(e.get_tablet_allocator().local(), stm);
+
+ {
+ load_sketch load(stm.get());
+ load.populate().get();
+ BOOST_REQUIRE(load.get_avg_shard_load(host1) == 2);
+ BOOST_REQUIRE(load.get_avg_shard_load(host2) == 2);
+ BOOST_REQUIRE(load.get_avg_shard_load(host3) == 0);
+ }
+ }).get();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_decommission_two_racks) {
+ // Verifies that load balancer moves tablets out of the decommissioned node.
+ // The scenario is such that replication constraints of tablets can be satisfied after decommission.
+ do_with_cql_env_thread([](auto& e) {
+ inet_address ip1("192.168.0.1");
+ inet_address ip2("192.168.0.2");
+ inet_address ip3("192.168.0.3");
+ inet_address ip4("192.168.0.4");
+
+ auto host1 = host_id(next_uuid());
+ auto host2 = host_id(next_uuid());
+ auto host3 = host_id(next_uuid());
+ auto host4 = host_id(next_uuid());
+
+ std::vector<endpoint_dc_rack> racks = {
+ endpoint_dc_rack{ "dc1", "rack-1" },
+ endpoint_dc_rack{ "dc1", "rack-2" }
+ };
+
+ auto table1 = table_id(next_uuid());
+
+ semaphore sem(1);
+ shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
+ locator::topology::config {
+ .this_endpoint = ip1,
+ .local_dc_rack = racks[0]
+ }
+ });
+
+ stm.mutate_token_metadata([&](auto& tm) {
+ const unsigned shard_count = 1;
+
+ tm.update_host_id(host1, ip1);
+ tm.update_host_id(host2, ip2);
+ tm.update_host_id(host3, ip3);
+ tm.update_host_id(host4, ip4);
+ tm.update_topology(ip1, racks[0], std::nullopt, shard_count);
+ tm.update_topology(ip2, racks[1], std::nullopt, shard_count);
+ tm.update_topology(ip3, racks[0], std::nullopt, shard_count);
+ tm.update_topology(ip4, racks[1], node::state::being_decommissioned,
+ shard_count);
+
+ tablet_map tmap(4);
+ auto tid = tmap.first_tablet();
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host1, 0},
+ tablet_replica {host2, 0},
+ }
+ });
+ tid = *tmap.next_tablet(tid);
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host2, 0},
+ tablet_replica {host3, 0},
+ }
+ });
+ tid = *tmap.next_tablet(tid);
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host3, 0},
+ tablet_replica {host4, 0},
+ }
+ });
+ tid = *tmap.next_tablet(tid);
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host1, 0},
+ tablet_replica {host2, 0},
+ }
+ });
+ tablet_metadata tmeta;
+ tmeta.set_tablet_map(table1, std::move(tmap));
+ tm.set_tablets(std::move(tmeta));
+ return make_ready_future<>();
+ }).get();
+
+ rebalance_tablets(e.get_tablet_allocator().local(), stm);
+
+ {
+ load_sketch load(stm.get());
+ load.populate().get();
+ BOOST_REQUIRE(load.get_avg_shard_load(host1) >= 2);
+ BOOST_REQUIRE(load.get_avg_shard_load(host2) >= 2);
+ BOOST_REQUIRE(load.get_avg_shard_load(host3) >= 2);
+ BOOST_REQUIRE(load.get_avg_shard_load(host4) == 0);
+ }
+
+ // Verify replicas are not collocated on racks
+ {
+ auto tm = stm.get();
+ auto& tmap = tm->tablets().get_tablet_map(table1);
+ tmap.for_each_tablet([&](auto tid, auto& tinfo) {
+ auto rack1 = tm->get_topology().get_rack(tinfo.replicas[0].host);
+ auto rack2 = tm->get_topology().get_rack(tinfo.replicas[1].host);
+ BOOST_REQUIRE(rack1 != rack2);
+ }).get();
+ }
+ }).get();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_decommission_rack_load_failure) {
+ // Verifies that load balancer moves tablets out of the decommissioned node.
+ // The scenario is such that it is impossible to distribute replicas without violating rack uniqueness.
+ do_with_cql_env_thread([](auto& e) {
+ inet_address ip1("192.168.0.1");
+ inet_address ip2("192.168.0.2");
+ inet_address ip3("192.168.0.3");
+ inet_address ip4("192.168.0.4");
+
+ auto host1 = host_id(next_uuid());
+ auto host2 = host_id(next_uuid());
+ auto host3 = host_id(next_uuid());
+ auto host4 = host_id(next_uuid());
+
+ std::vector<endpoint_dc_rack> racks = {
+ endpoint_dc_rack{ "dc1", "rack-1" },
+ endpoint_dc_rack{ "dc1", "rack-2" }
+ };
+
+ auto table1 = table_id(next_uuid());
+
+ semaphore sem(1);
+ shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
+ locator::topology::config {
+ .this_endpoint = ip1,
+ .local_dc_rack = racks[0]
+ }
+ });
+
+ stm.mutate_token_metadata([&](auto& tm) {
+ const unsigned shard_count = 1;
+
+ tm.update_host_id(host1, ip1);
+ tm.update_host_id(host2, ip2);
+ tm.update_host_id(host3, ip3);
+ tm.update_host_id(host4, ip4);
+ tm.update_topology(ip1, racks[0], std::nullopt, shard_count);
+ tm.update_topology(ip2, racks[0], std::nullopt, shard_count);
+ tm.update_topology(ip3, racks[0], std::nullopt, shard_count);
+ tm.update_topology(ip4, racks[1], node::state::being_decommissioned,
+ shard_count);
+
+ tablet_map tmap(4);
+ auto tid = tmap.first_tablet();
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host1, 0},
+ tablet_replica {host4, 0},
+ }
+ });
+ tid = *tmap.next_tablet(tid);
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host2, 0},
+ tablet_replica {host4, 0},
+ }
+ });
+ tid = *tmap.next_tablet(tid);
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host3, 0},
+ tablet_replica {host4, 0},
+ }
+ });
+ tid = *tmap.next_tablet(tid);
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host1, 0},
+ tablet_replica {host4, 0},
+ }
+ });
+ tablet_metadata tmeta;
+ tmeta.set_tablet_map(table1, std::move(tmap));
+ tm.set_tablets(std::move(tmeta));
+ return make_ready_future<>();
+ }).get();
+
+ BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error);
+ }).get();
+}
+
+SEASTAR_THREAD_TEST_CASE(test_decommission_rf_not_met) {
+ // Verifies that load balancer moves tablets out of the decommissioned node.
+ // The scenario is such that replication factor of tablets can be satisfied after decommission.
+ do_with_cql_env_thread([](auto& e) {
+ inet_address ip1("192.168.0.1");
+ inet_address ip2("192.168.0.2");
+ inet_address ip3("192.168.0.3");
+
+ auto host1 = host_id(next_uuid());
+ auto host2 = host_id(next_uuid());
+ auto host3 = host_id(next_uuid());
+
+ auto table1 = table_id(next_uuid());
+
+ semaphore sem(1);
+ shared_token_metadata stm([&sem]() noexcept { return get_units(sem, 1); }, locator::token_metadata::config {
+ locator::topology::config {
+ .this_endpoint = ip1,
+ .local_dc_rack = locator::endpoint_dc_rack::default_location
+ }
+ });
+
+ stm.mutate_token_metadata([&](auto& tm) {
+ const unsigned shard_count = 2;
+
+ tm.update_host_id(host1, ip1);
+ tm.update_host_id(host2, ip2);
+ tm.update_host_id(host3, ip3);
+ tm.update_topology(ip1, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
+ tm.update_topology(ip2, locator::endpoint_dc_rack::default_location, std::nullopt, shard_count);
+ tm.update_topology(ip3, locator::endpoint_dc_rack::default_location, node::state::being_decommissioned,
+ shard_count);
+
+ tablet_map tmap(1);
+ auto tid = tmap.first_tablet();
+ tmap.set_tablet(tid, tablet_info {
+ tablet_replica_set {
+ tablet_replica {host1, 0},
+ tablet_replica {host2, 0},
+ tablet_replica {host3, 0},
+ }
+ });
+ tablet_metadata tmeta;
+ tmeta.set_tablet_map(table1, std::move(tmap));
+ tm.set_tablets(std::move(tmeta));
+ return make_ready_future<>();
+ }).get();
+
+ BOOST_REQUIRE_THROW(rebalance_tablets(e.get_tablet_allocator().local(), stm), std::runtime_error);
+ }).get();
+}
+
SEASTAR_THREAD_TEST_CASE(test_load_balancing_works_with_in_progress_transitions) {
do_with_cql_env_thread([] (auto& e) {
// Tests the scenario of bootstrapping a single node.
diff --git a/test/topology_experimental_raft/test_tablets.py b/test/topology_experimental_raft/test_tablets.py
--- a/test/topology_experimental_raft/test_tablets.py
+++ b/test/topology_experimental_raft/test_tablets.py
@@ -143,7 +143,7 @@ async def test_table_drop_with_auto_snapshot(manager: ManagerClient):


@pytest.mark.asyncio
-async def test_bootstrap(manager: ManagerClient):
+async def test_topology_changes(manager: ManagerClient):
logger.info("Bootstrapping cluster")
servers = [await manager.server_add(), await manager.server_add(), await manager.server_add()]

@@ -178,4 +178,8 @@ async def check():
time.sleep(5) # Give load balancer some time to do work
await check()

+ await manager.decommission_node(servers[0].server_id)
+
+ await check()
+
await cql.run_async("DROP KEYSPACE test;")

Commit Bot

<bot@cloudius-systems.com>
unread,
Sep 14, 2023, 6:25:15 PM9/14/23
to scylladb-dev@googlegroups.com, Tomasz Grabiec
From: Tomasz Grabiec <tgra...@scylladb.com>
Committer: Tomasz Grabiec <tgra...@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages