service/paxos/paxos_state.cc | 88 +++++++++++++++++-------------------
1 file changed, 42 insertions(+), 46 deletions(-)
diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc
index 8f0bf51121a..a0e83d560f4 100644
--- a/service/paxos/paxos_state.cc
+++ b/service/paxos/paxos_state.cc
@@ -194,60 +194,56 @@ future<bool> paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks,
future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
if (utils::get_local_injector().enter("paxos_error_before_learn")) {
- return make_exception_future<>(utils::injected_error("injected_error_before_learn"));
+ co_await coroutine::return_exception(utils::injected_error("injected_error_before_learn"));
utils::latency_counter lc;
- return do_with(std::move(decision), [&sp, &sys_ks, tr_state = std::move(tr_state), schema, timeout] (proposal& decision) {
- auto f = utils::get_local_injector().inject("paxos_state_learn_timeout", timeout);
- replica::table& cf = sp.get_db().local().find_column_family(schema);
- db_clock::time_point t = cf.get_truncation_time();
- auto truncated_at = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch());
- // When saving a decision, also delete the last accepted proposal. This is just an
- // optimization to save space.
- // Even though there is no guarantee we will see decisions in the right order,
- // because messages can get delayed, so this decision can be older than our current most
- // recent accepted proposal/committed decision, saving it is always safe due to column timestamps.
- // Since the mutation uses the decision ballot timestamp, if cell timestamp of any current cell
- // is strictly greater than the decision one, saving the decision will not erase it.
- //
- // The table may have been truncated since the proposal was initiated. In that case, we
- // don't want to perform the mutation and potentially resurrect truncated data.
- if (utils::UUID_gen::unix_timestamp(decision.ballot) >= truncated_at) {
- f = f.then([&sp, schema, &decision, timeout, tr_state] {
- logger.debug("Committing decision {}", decision);
- tracing::trace(tr_state, "Committing decision {}", decision);
- // In case current schema is not the same as the schema in the decision
- // try to look it up first in the local schema_registry cache and upgrade
- // the mutation using schema from the cache.
- //
- // If there's no schema in the cache, then retrieve persisted column mapping
- // for that version and upgrade the mutation with it.
- if (decision.update.schema_version() != schema->version()) {
- on_internal_error(logger, format("schema version in learn does not match current schema"));
- }
- return sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
- });
- } else {
- logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
- tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
- }
- return f.then([&sys_ks, &decision, schema, timeout] {
- // We don't need to lock the partition key if there is no gap between loading paxos
- // state and saving it, and here we're just blindly updating.
- return utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout, [&sys_ks, &decision, schema, timeout] {
- return sys_ks.save_paxos_decision(*schema, decision, timeout);
- });
- });
- }).finally([&sp, schema, lc] () mutable {
+ auto stats_updater = defer([&sp, schema, lc] () mutable {
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
+ co_await utils::get_local_injector().inject("paxos_state_learn_timeout", timeout);
+ replica::table& cf = sp.get_db().local().find_column_family(schema);
+ db_clock::time_point t = cf.get_truncation_time();
+ auto truncated_at = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch());
+ // When saving a decision, also delete the last accepted proposal. This is just an
+ // optimization to save space.
+ // There is no guarantee we will see decisions in the right order, because messages
+ // can get delayed, so this decision can be older than our current most recent accepted
+ // proposal/committed decision. Saving it is nevertheless always safe due to column timestamps.
+ // Since the mutation uses the decision ballot timestamp, if cell timestamp of any current cell
+ // is strictly greater than the decision one, saving the decision will not erase it.
+ //
+ // The table may have been truncated since the proposal was initiated. In that case, we
+ // don't want to perform the mutation and potentially resurrect truncated data.
+ if (utils::UUID_gen::unix_timestamp(decision.ballot) >= truncated_at) {
+ logger.debug("Committing decision {}", decision);
+ tracing::trace(tr_state, "Committing decision {}", decision);
+ // The decision's mutation is expected to carry the same schema version as the
+ // current schema. A mismatch is treated as an internal error (see the check below).
+ //
+ // NOTE(review): this comment used to describe looking up the schema in the local
+ // schema_registry cache, or a persisted column mapping, to upgrade the mutation;
+ // no such upgrade is performed by the code here.
+ if (decision.update.schema_version() != schema->version()) {
+ on_internal_error(logger, format("schema version in learn does not match current schema"));
+ }
+ co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
+ } else {
+ logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
+ tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
+ }
+ // We don't need to lock the partition key if there is no gap between loading paxos
+ // state and saving it, and here we're just blindly updating.
+ co_await utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout);
+ co_return co_await sys_ks.save_paxos_decision(*schema, decision, timeout);
future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,