[PATCH v1 0/6] co-routinize paxos_state functions

Gleb Natapov

<gleb@scylladb.com>
Jun 30, 2024, 3:38:09 AM
to scylladb-dev@googlegroups.com
Co-routinize paxos_state functions to make them more readable

Also at scylla-dev gleb/coroutineze-paxos-state
CI: https://jenkins.scylladb.com/job/scylla-master/job/scylla-ci/10061/

Gleb Natapov (6):
paxos: introduce get_replica_lock() function to take RAII guard for
local paxos table access
paxos: co-routinize paxos_state::prepare function
paxos: co-routinize paxos_state::accept function
paxos: remove no longer used with_locked_key functions
paxos: co-routinize paxos_state::learn function
paxos: simplify paxos_state::prepare code to not work with raw futures

service/paxos/paxos_state.hh | 56 ++----
service/paxos/paxos_state.cc | 345 +++++++++++++++++------------------
service/storage_proxy.cc | 2 +-
3 files changed, 191 insertions(+), 212 deletions(-)

--
2.44.0

Gleb Natapov

<gleb@scylladb.com>
Jun 30, 2024, 3:38:10 AM
to scylladb-dev@googlegroups.com
---
service/paxos/paxos_state.cc | 187 ++++++++++++++++++-----------------
1 file changed, 94 insertions(+), 93 deletions(-)

diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc
index 400a1e32810..244449cd21f 100644
--- a/service/paxos/paxos_state.cc
+++ b/service/paxos/paxos_state.cc
@@ -7,7 +7,9 @@
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
+#include <exception>
#include <seastar/core/coroutine.hh>
+#include <seastar/coroutine/all.hh>
#include "service/storage_proxy.hh"
#include "service/paxos/proposal.hh"
#include "service/paxos/paxos_state.hh"
@@ -51,101 +53,100 @@ future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, cloc
future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema,
const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
- return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &sys_ks, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] {
- dht::token token = dht::get_token(*schema, key);
- utils::latency_counter lc;
- lc.start();
- // FIXME: Handle tablet intra-node migration: #16594.
- // The shard can change concurrently, so we cannot rely on locking on this shard.
- return with_locked_key(token, timeout, [&sp, &sys_ks, &cmd, token, &key, ballot, tr_state, schema, only_digest, da, timeout] () mutable {
- // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
- // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
- // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
- // amount of re-submit will fix this (because the node on which the commit has expired will have a
- // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
- auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
+ co_await utils::get_local_injector().inject("paxos_prepare_timeout", timeout);
+ dht::token token = dht::get_token(*schema, key);
+ utils::latency_counter lc;
+ lc.start();

- auto f = sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
- return f.then([&sp, &sys_ks, &cmd, token = std::move(token), &key, ballot, tr_state, schema, only_digest, da, timeout] (paxos_state state) {
- // If received ballot is newer that the one we already accepted it has to be accepted as well,
- // but we will return the previously accepted proposal so that the new coordinator will use it instead of
- // its own.
- if (ballot.timestamp() > state._promised_ballot.timestamp()) {
- logger.debug("Promising ballot {}", ballot);
- tracing::trace(tr_state, "Promising ballot {}", ballot);
- if (utils::get_local_injector().enter("paxos_error_before_save_promise")) {
- return make_exception_future<prepare_response>(utils::injected_error("injected_error_before_save_promise"));
- }
- auto f1 = futurize_invoke([&] {
- return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout);
- });
- auto f2 = futurize_invoke([&] {
- return do_with(dht::partition_range_vector({dht::partition_range::make_singular({token, key})}),
- [&sp, tr_state, schema, &cmd, only_digest, da, timeout] (const dht::partition_range_vector& prv) {
- return sp.get_db().local().query(schema, cmd,
- {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
- prv, tr_state, timeout);
- });
- });
- return when_all(std::move(f1), std::move(f2)).then([state = std::move(state), only_digest, schema, &sys_ks] (auto t) mutable {
- if (utils::get_local_injector().enter("paxos_error_after_save_promise")) {
- return make_exception_future<prepare_response>(utils::injected_error("injected_error_after_save_promise"));
- }
- auto&& f1 = std::get<0>(t);
- auto&& f2 = std::get<1>(t);
- if (f1.failed()) {
- f2.ignore_ready_future();
- // Failed to save promise. Nothing we can do but throw.
- return make_exception_future<prepare_response>(f1.get_exception());
- }
- std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
- if (!f2.failed()) {
- auto&& [result, hit_rate] = f2.get();
- if (only_digest) {
- data_or_digest = *result->digest();
- } else {
- data_or_digest = std::move(make_foreign(std::move(result)));
- }
- } else {
- // Don't return errors querying the current value, just debug-log them, as the caller is prepared to fall back
- // on querying it by itself in case it's missing in the response.
- auto ex = f2.get_exception();
- logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
- }
- auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional<proposal> p) {
- if (!p || p->update.schema_version() == schema->version()) {
- return make_ready_future<std::optional<proposal>>(std::move(p));
- }
- // In case current schema is not the same as the schema in the proposal
- // try to look it up first in the local schema_registry cache and upgrade
- // the mutation using schema from the cache.
- //
- // If there's no schema in the cache, then retrieve persisted column mapping
- // for that version and upgrade the mutation with it.
- logger.debug("Stored mutation references outdated schema version. "
- "Trying to upgrade the accepted proposal mutation to the most recent schema version.");
- return service::get_column_mapping(sys_ks, p->update.column_family_id(), p->update.schema_version()).then([schema, p = std::move(p)] (const column_mapping& cm) {
- return make_ready_future<std::optional<proposal>>(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
- });
- };
- return when_all_succeed(upgrade_if_needed(std::move(state._accepted_proposal)), upgrade_if_needed(std::move(state._most_recent_commit))).then([data_or_digest = std::move(data_or_digest)] (auto&& u) mutable {
- return prepare_response(promise(std::move(std::get<0>(u)), std::move(std::get<1>(u)), std::move(data_or_digest)));
- });
- });
- } else {
- logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
- tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
- // Return the currently promised ballot (rather than, e.g., the ballot of the last
- // accepted proposal) so the coordinator can make sure it uses a newer ballot next
- // time (#5667).
- return make_ready_future<prepare_response>(prepare_response(std::move(state._promised_ballot)));
- }
- });
- }).finally([&sp, schema, lc] () mutable {
- auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
- stats.cas_prepare.mark(lc.stop().latency());
- });
+ auto stats_updater = defer([&sp, schema, lc] () mutable {
+ auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
+ stats.cas_prepare.mark(lc.stop().latency());
});
+
+ auto guard = co_await get_replica_lock(token, timeout);
+ // FIXME: Handle tablet intra-node migration: #16594.
+ // The shard can change concurrently, so we cannot rely on locking on this shard.
+
+ // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+ // is expired or not) across nodes, otherwise we may have a window where a Most Recent Decision shows up
+ // on some replica and not others during a new proposal (in storage_proxy::begin_and_repair_paxos()), and no
+ // amount of re-submit will fix this (because the node on which the commit has expired will have a
+ // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
+ auto now_in_sec = utils::UUID_gen::unix_timestamp_in_sec(ballot);
+
+ paxos_state state = co_await sys_ks.load_paxos_state(key, schema, gc_clock::time_point(now_in_sec), timeout);
+ // If received ballot is newer that the one we already accepted it has to be accepted as well,
+ // but we will return the previously accepted proposal so that the new coordinator will use it instead of
+ // its own.
+ if (ballot.timestamp() > state._promised_ballot.timestamp()) {
+ logger.debug("Promising ballot {}", ballot);
+ tracing::trace(tr_state, "Promising ballot {}", ballot);
+ if (utils::get_local_injector().enter("paxos_error_before_save_promise")) {
+ co_await coroutine::return_exception(utils::injected_error("injected_error_before_save_promise"));
+ }
+
+ auto [f1, f2] = co_await when_all(
+ [&] {
+ return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout);
+ },
+ coroutine::lambda([&] () -> future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>> {
+ co_return co_await sp.get_db().local().query(schema, cmd,
+ {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
+ dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), tr_state, timeout);
+ })
+ );
+
+ if (utils::get_local_injector().enter("paxos_error_after_save_promise")) {
+ co_await coroutine::return_exception(utils::injected_error("injected_error_after_save_promise"));
+ }
+
+ if (f1.failed()) {
+ f2.ignore_ready_future();
+ // Failed to save promise. Nothing we can do but throw.
+ co_return coroutine::exception(f1.get_exception());
+ }
+ std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
+ if (!f2.failed()) {
+ auto&& [result, hit_rate] = f2.get();
+ if (only_digest) {
+ data_or_digest = *result->digest();
+ } else {
+ data_or_digest = make_foreign(std::move(result));
+ }
+ } else {
+ // Don't return errors querying the current value, just debug-log them, as the caller is prepared to fall back
+ // on querying it by itself in case it's missing in the response.
+ auto ex = f2.get_exception();
+ logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
+ }
+ auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional<proposal> p) -> future<std::optional<proposal>> {
+ if (!p || p->update.schema_version() == schema->version()) {
+ co_return std::move(p);
+ }
+ // In case current schema is not the same as the schema in the proposal
+ // try to look it up first in the local schema_registry cache and upgrade
+ // the mutation using schema from the cache.
+ //
+ // If there's no schema in the cache, then retrieve persisted column mapping
+ // for that version and upgrade the mutation with it.
+ logger.debug("Stored mutation references outdated schema version. "
+ "Trying to upgrade the accepted proposal mutation to the most recent schema version.");
+ const column_mapping& cm = co_await service::get_column_mapping(sys_ks, p->update.column_family_id(), p->update.schema_version());
+
+ co_return std::make_optional(proposal(p->ballot, freeze(p->update.unfreeze_upgrading(schema, cm))));
+ };
+
+ auto [u1, u2] = co_await coroutine::all(std::bind(upgrade_if_needed, std::move(state._accepted_proposal)), std::bind(upgrade_if_needed, std::move(state._most_recent_commit)));
+
+ co_return prepare_response(promise(std::move(u1), std::move(u2), std::move(data_or_digest)));
+ } else {
+ logger.debug("Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
+ tracing::trace(tr_state, "Promise rejected; {} is not sufficiently newer than {}", ballot, state._promised_ballot);
+ // Return the currently promised ballot (rather than, e.g., the ballot of the last
+ // accepted proposal) so the coordinator can make sure it uses a newer ballot next
+ // time (#5667).
+ co_return std::move(state._promised_ballot);
+ }
}

future<bool> paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema, dht::token token, const proposal& proposal,
--
2.44.0

Gleb Natapov

<gleb@scylladb.com>
Jun 30, 2024, 3:38:11 AM
to scylladb-dev@googlegroups.com
---
service/paxos/paxos_state.cc | 88 +++++++++++++++++-------------------
1 file changed, 42 insertions(+), 46 deletions(-)

diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc
index 8f0bf51121a..a0e83d560f4 100644
--- a/service/paxos/paxos_state.cc
+++ b/service/paxos/paxos_state.cc
@@ -194,60 +194,56 @@ future<bool> paxos_state::accept(storage_proxy& sp, db::system_keyspace& sys_ks,
future<> paxos_state::learn(storage_proxy& sp, db::system_keyspace& sys_ks, schema_ptr schema, proposal decision, clock_type::time_point timeout,
tracing::trace_state_ptr tr_state) {
if (utils::get_local_injector().enter("paxos_error_before_learn")) {
- return make_exception_future<>(utils::injected_error("injected_error_before_learn"));
+ co_await coroutine::return_exception(utils::injected_error("injected_error_before_learn"));
}

utils::latency_counter lc;
lc.start();

- return do_with(std::move(decision), [&sp, &sys_ks, tr_state = std::move(tr_state), schema, timeout] (proposal& decision) {
- auto f = utils::get_local_injector().inject("paxos_state_learn_timeout", timeout);
-
- replica::table& cf = sp.get_db().local().find_column_family(schema);
- db_clock::time_point t = cf.get_truncation_time();
- auto truncated_at = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch());
- // When saving a decision, also delete the last accepted proposal. This is just an
- // optimization to save space.
- // Even though there is no guarantee we will see decisions in the right order,
- // because messages can get delayed, so this decision can be older than our current most
- // recent accepted proposal/committed decision, saving it is always safe due to column timestamps.
- // Since the mutation uses the decision ballot timestamp, if cell timestamp of any current cell
- // is strictly greater than the decision one, saving the decision will not erase it.
- //
- // The table may have been truncated since the proposal was initiated. In that case, we
- // don't want to perform the mutation and potentially resurrect truncated data.
- if (utils::UUID_gen::unix_timestamp(decision.ballot) >= truncated_at) {
- f = f.then([&sp, schema, &decision, timeout, tr_state] {
- logger.debug("Committing decision {}", decision);
- tracing::trace(tr_state, "Committing decision {}", decision);
-
- // In case current schema is not the same as the schema in the decision
- // try to look it up first in the local schema_registry cache and upgrade
- // the mutation using schema from the cache.
- //
- // If there's no schema in the cache, then retrieve persisted column mapping
- // for that version and upgrade the mutation with it.
- if (decision.update.schema_version() != schema->version()) {
- on_internal_error(logger, format("schema version in learn does not match current schema"));
- }
-
- return sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
- });
- } else {
- logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
- tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
- }
- return f.then([&sys_ks, &decision, schema, timeout] {
- // We don't need to lock the partition key if there is no gap between loading paxos
- // state and saving it, and here we're just blindly updating.
- return utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout, [&sys_ks, &decision, schema, timeout] {
- return sys_ks.save_paxos_decision(*schema, decision, timeout);
- });
- });
- }).finally([&sp, schema, lc] () mutable {
+ auto stats_updater = defer([&sp, schema, lc] () mutable {
auto& stats = sp.get_db().local().find_column_family(schema).get_stats();
stats.cas_learn.mark(lc.stop().latency());
});
+
+ co_await utils::get_local_injector().inject("paxos_state_learn_timeout", timeout);
+
+ replica::table& cf = sp.get_db().local().find_column_family(schema);
+ db_clock::time_point t = cf.get_truncation_time();
+ auto truncated_at = std::chrono::duration_cast<std::chrono::milliseconds>(t.time_since_epoch());
+ // When saving a decision, also delete the last accepted proposal. This is just an
+ // optimization to save space.
+ // Even though there is no guarantee we will see decisions in the right order,
+ // because messages can get delayed, so this decision can be older than our current most
+ // recent accepted proposal/committed decision, saving it is always safe due to column timestamps.
+ // Since the mutation uses the decision ballot timestamp, if cell timestamp of any current cell
+ // is strictly greater than the decision one, saving the decision will not erase it.
+ //
+ // The table may have been truncated since the proposal was initiated. In that case, we
+ // don't want to perform the mutation and potentially resurrect truncated data.
+ if (utils::UUID_gen::unix_timestamp(decision.ballot) >= truncated_at) {
+ logger.debug("Committing decision {}", decision);
+ tracing::trace(tr_state, "Committing decision {}", decision);
+
+ // In case current schema is not the same as the schema in the decision
+ // try to look it up first in the local schema_registry cache and upgrade
+ // the mutation using schema from the cache.
+ //
+ // If there's no schema in the cache, then retrieve persisted column mapping
+ // for that version and upgrade the mutation with it.
+ if (decision.update.schema_version() != schema->version()) {
+ on_internal_error(logger, format("schema version in learn does not match current schema"));
+ }
+
+ co_await sp.mutate_locally(schema, decision.update, tr_state, db::commitlog::force_sync::yes, timeout);
+ } else {
+ logger.debug("Not committing decision {} as ballot timestamp predates last truncation time", decision);
+ tracing::trace(tr_state, "Not committing decision {} as ballot timestamp predates last truncation time", decision);
+ }
+
+ // We don't need to lock the partition key if there is no gap between loading paxos
+ // state and saving it, and here we're just blindly updating.
+ co_await utils::get_local_injector().inject("paxos_timeout_after_save_decision", timeout);
+ co_return co_await sys_ks.save_paxos_decision(*schema, decision, timeout);
}

future<> paxos_state::prune(db::system_keyspace& sys_ks, schema_ptr schema, const partition_key& key, utils::UUID ballot, clock_type::time_point timeout,
--
2.44.0
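
Editorial note on the patch above: it replaces a trailing `.finally(...)` continuation with a `defer(...)` object declared near the top of the coroutine, so the latency statistic is recorded when the scope exits, whether normally or through an exception. The sketch below illustrates that scope-exit idea in plain C++ under stated assumptions: `scope_guard` and `work_with_guard` are hypothetical names invented for this illustration, and the code is not Seastar's `defer` implementation.

```cpp
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical minimal scope guard: runs the stored callable when the
// enclosing scope exits, on both the normal and the exceptional path.
template <typename Func>
class scope_guard {
    Func _func;
public:
    explicit scope_guard(Func f) : _func(std::move(f)) {}
    scope_guard(const scope_guard&) = delete;
    scope_guard& operator=(const scope_guard&) = delete;
    ~scope_guard() { _func(); }
};

// Hypothetical function standing in for a body that must always record
// a statistic before returning.
int work_with_guard(std::vector<std::string>& log, bool fail) {
    scope_guard guard([&log] { log.push_back("stats updated"); });
    if (fail) {
        throw std::runtime_error("injected failure");
    }
    log.push_back("work done");
    return 1;
}
```

Calling `work_with_guard(log, false)` appends "work done" and then "stats updated"; calling it with `fail` set to true still appends "stats updated" while the exception propagates to the caller.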

Gleb Natapov

<gleb@scylladb.com>
Jun 30, 2024, 3:38:12 AM
to scylladb-dev@googlegroups.com
---
service/paxos/paxos_state.cc | 43 +++++++++++++++---------------------
1 file changed, 18 insertions(+), 25 deletions(-)

diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc
index a0e83d560f4..8a2a21931c0 100644
--- a/service/paxos/paxos_state.cc
+++ b/service/paxos/paxos_state.cc
@@ -86,40 +86,33 @@ future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keys
co_await coroutine::return_exception(utils::injected_error("injected_error_before_save_promise"));
}

- auto [f1, f2] = co_await when_all(
+ // The all() below throws only if save_paxos_promise fails.
+ // If querying the result fails we continue without read round optimization
+ auto [data_or_digest] = co_await coroutine::all(
[&] {
return sys_ks.save_paxos_promise(*schema, std::ref(key), ballot, timeout);
},
- coroutine::lambda([&] () -> future<std::tuple<lw_shared_ptr<query::result>, cache_temperature>> {
- co_return co_await sp.get_db().local().query(schema, cmd,
- {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
- dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), tr_state, timeout);
- })
+ [&] () -> future<std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>>> {
+ try {
+ auto&& [result, hit_rate] = co_await sp.get_db().local().query(schema, cmd,
+ {only_digest ? query::result_request::only_digest : query::result_request::result_and_digest, da},
+ dht::partition_range_vector({dht::partition_range::make_singular({token, key})}), tr_state, timeout);
+ if (only_digest) {
+ co_return *result->digest();
+ } else {
+ co_return make_foreign(std::move(result));
+ }
+ } catch(...) {
+ logger.debug("Failed to get data or digest: {}. Ignored.", std::current_exception());
+ co_return std::nullopt;
+ }
+ }
);

if (utils::get_local_injector().enter("paxos_error_after_save_promise")) {
co_await coroutine::return_exception(utils::injected_error("injected_error_after_save_promise"));
}

- if (f1.failed()) {
- f2.ignore_ready_future();
- // Failed to save promise. Nothing we can do but throw.
- co_return coroutine::exception(f1.get_exception());
- }
- std::optional<std::variant<foreign_ptr<lw_shared_ptr<query::result>>, query::result_digest>> data_or_digest;
- if (!f2.failed()) {
- auto&& [result, hit_rate] = f2.get();
- if (only_digest) {
- data_or_digest = *result->digest();
- } else {
- data_or_digest = make_foreign(std::move(result));
- }
- } else {
- // Don't return errors querying the current value, just debug-log them, as the caller is prepared to fall back
- // on querying it by itself in case it's missing in the response.
- auto ex = f2.get_exception();
- logger.debug("Failed to get data or digest: {}. Ignored.", std::move(ex));
- }
auto upgrade_if_needed = [schema = std::move(schema), &sys_ks] (std::optional<proposal> p) -> future<std::optional<proposal>> {
if (!p || p->update.schema_version() == schema->version()) {
co_return std::move(p);
--
2.44.0
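
Editorial note on the patch above: its new comment says the combined wait throws only if saving the promise fails, while a failed query merely yields an empty result. The sketch below shows that error-handling shape in plain C++, assuming hypothetical helpers `run_optional` and `save_and_read` that do not exist in Scylla or Seastar. The two steps run one after another here, whereas the patch runs them concurrently with `coroutine::all`; only the treatment of failures is illustrated.

```cpp
#include <functional>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical helper: run the optional step and turn any exception into
// std::nullopt, so the caller never sees that step's failure.
std::optional<std::string> run_optional(const std::function<std::string()>& step) {
    try {
        return step();
    } catch (...) {
        return std::nullopt;
    }
}

// Hypothetical helper: the mandatory step propagates its exception; the
// optional step only decides whether a value is attached to the result.
std::optional<std::string> save_and_read(const std::function<void()>& save,
                                         const std::function<std::string()>& read) {
    auto value = run_optional(read); // a failed read becomes std::nullopt
    save();                          // throws if the mandatory step fails
    return value;
}
```

With this shape the caller needs a single error path for the mandatory step and can treat a missing value as "fall back to querying separately", which matches the intent stated in the removed comment about the caller being prepared for a missing result.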

Kamil Braun

<kbraun@scylladb.com>
Jul 1, 2024, 6:19:16 AM
to Gleb Natapov, scylladb-dev@googlegroups.com


On 6/30/24 09:36, Gleb Natapov wrote:
> ---
> service/paxos/paxos_state.cc | 187 ++++++++++++++++++-----------------
> 1 file changed, 94 insertions(+), 93 deletions(-)
>
> diff --git a/service/paxos/paxos_state.cc b/service/paxos/paxos_state.cc
> index 400a1e32810..244449cd21f 100644
> --- a/service/paxos/paxos_state.cc
> +++ b/service/paxos/paxos_state.cc
> @@ -7,7 +7,9 @@
> /*
> * SPDX-License-Identifier: AGPL-3.0-or-later
> */
> +#include <exception>
> #include <seastar/core/coroutine.hh>
> +#include <seastar/coroutine/all.hh>
> #include "service/storage_proxy.hh"
> #include "service/paxos/proposal.hh"
> #include "service/paxos/paxos_state.hh"
> @@ -51,101 +53,100 @@ future<paxos_state::guard> paxos_state::get_cas_lock(const dht::token& key, cloc
> future<prepare_response> paxos_state::prepare(storage_proxy& sp, db::system_keyspace& sys_ks, tracing::trace_state_ptr tr_state, schema_ptr schema,
> const query::read_command& cmd, const partition_key& key, utils::UUID ballot,
> bool only_digest, query::digest_algorithm da, clock_type::time_point timeout) {
> - return utils::get_local_injector().inject("paxos_prepare_timeout", timeout, [&sp, &sys_ks, &cmd, &key, ballot, tr_state, schema, only_digest, da, timeout] {
I don't understand what happened here.
All this code was living... inside the injection?
That doesn't make any sense.
Was `paxos_state::prepare` a no-op in release mode? 😅️

Gleb Natapov

<gleb@scylladb.com>
Jul 1, 2024, 6:25:07 AM
to Kamil Braun, scylladb-dev@googlegroups.com
This is a sleep injection. It injects a sleep and then executes the
function. A release build simply executes the function directly.
--
Gleb.
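
Editorial note on the reply above: the behavior it describes (sleep first when the injection is active, then run the wrapped function; in a release build run the function directly) can be sketched in plain C++. Everything named here is an assumption made for illustration: `toy_injector`, its `compiled_in` flag and its `enable` method are hypothetical and are not the Scylla error-injection API, which works with futures and a timeout rather than a blocking sleep.

```cpp
#include <chrono>
#include <string>
#include <thread>
#include <unordered_set>
#include <utility>

// Hypothetical stand-in for an injection registry. `compiled_in` models the
// difference between a build with injections and a release build.
class toy_injector {
    std::unordered_set<std::string> _enabled;
    bool _compiled_in;
public:
    explicit toy_injector(bool compiled_in) : _compiled_in(compiled_in) {}

    void enable(const std::string& name) { _enabled.insert(name); }

    // If the named injection is compiled in and enabled, sleep first.
    // In every case the wrapped function runs and its result is returned.
    template <typename Func>
    auto inject(const std::string& name, std::chrono::milliseconds delay, Func&& func) {
        if (_compiled_in && _enabled.count(name) != 0) {
            std::this_thread::sleep_for(delay);
        }
        return std::forward<Func>(func)();
    }
};
```

The wrapped function is therefore never skipped: a `toy_injector` constructed with `compiled_in` set to false calls it immediately, which is why the pre-patch `prepare` body could live inside the injection call without becoming a no-op in release mode.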

Kamil Braun

<kbraun@scylladb.com>
Jul 1, 2024, 6:43:57 AM
to Gleb Natapov, scylladb-dev@googlegroups.com
Right, I got confused. I don't understand why the code didn't simply use
`then`, but ok.

Kamil Braun

<kbraun@scylladb.com>
Jul 2, 2024, 5:52:15 AM
to Gleb Natapov, scylladb-dev@googlegroups.com
LGTM

Avi Kivity

<avi@scylladb.com>
Jul 2, 2024, 11:07:32 AM
to Gleb Natapov, scylladb-dev@googlegroups.com
Too bad we don't have a coroutine::all() variant that returns futures. It's more efficient.

Gleb Natapov

<gleb@scylladb.com>
Jul 2, 2024, 11:11:39 AM
to Avi Kivity, scylladb-dev@googlegroups.com
On Tue, Jul 02, 2024 at 06:07:24PM +0300, 'Avi Kivity' via ScyllaDB development wrote:
> > +    paxos_state state = co_await sys_ks.load_paxos_state(key,
> > schema, gc_clock::time_point(now_in_sec), timeout);
> > +    // If received ballot is newer that the one we already accepted
> > it has to be accepted as well,
> > +    // but we will return the previously accepted proposal so that
> > the new coordinator will use it instead of
> > +    // its own.
> > +    if (ballot.timestamp() > state._promised_ballot.timestamp()) {
> > +        logger.debug("Promising ballot {}", ballot);
> > +        tracing::trace(tr_state, "Promising ballot {}", ballot);
> > +        if
> > (utils::get_local_injector().enter("paxos_error_before_save_promise")
> > ) {
> > +            co_await
> > coroutine::return_exception(utils::injected_error("injected_error_bef
> > ore_save_promise"));
> > +        }
> > +
> > +        auto [f1, f2] = co_await when_all(
>
> Too bad we don't have a coroutine::all() variant that returns futures.
> It's more efficient.
>

Look at the last patch of the series.

--
Gleb.

Avi Kivity

<avi@scylladb.com>
Jul 2, 2024, 11:14:12 AM
to Gleb Natapov, scylladb-dev@googlegroups.com
Nice.
