From: Kamil Braun <kbr...@scylladb.com>
Committer: Kamil Braun <kbr...@scylladb.com>
Branch: next
test: raft: randomized_nemesis_test: fix `rpc` reply ID generation

When `rpc` wants to perform a two-way RPC call, it sends a message
containing a `reply_id`. The other side sends the `reply_id` back in its
answer, so the original side can match the response to the promise whose
future the RPC caller is waiting on.
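
To make the matching concrete, here is a minimal sketch, not the test's
actual code. The hypothetical `toy_rpc` class keeps a map from `reply_id`
to a pending call, much like `_reply_promises` in the diff, but it stores
plain `std::function` callbacks instead of Seastar promises so that it
builds with only the standard library. The names `start_call`,
`handle_reply` and `in_flight` are illustrative assumptions.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for the test's `reply_id_t`.
using reply_id_t = std::uint64_t;

// Toy two-way RPC endpoint. Instead of Seastar promises it stores one
// callback per in-flight call; invoking the callback plays the role of
// fulfilling the promise that the RPC caller's future is waiting on.
class toy_rpc {
    std::unordered_map<reply_id_t, std::function<void(const std::string&)>> _pending;
    reply_id_t _counter = 0;  // per-instance counter, like the pre-patch `_counter`

public:
    // Registers an in-flight call and returns the `reply_id` to put in the request.
    reply_id_t start_call(std::function<void(const std::string&)> on_reply) {
        auto id = _counter++;
        _pending.emplace(id, std::move(on_reply));
        return id;
    }

    // Handles a response carrying `id`. Returns false (and drops the
    // response) if no call with that `reply_id` is waiting.
    bool handle_reply(reply_id_t id, const std::string& payload) {
        auto it = _pending.find(id);
        if (it == _pending.end()) {
            return false;
        }
        auto on_reply = std::move(it->second);
        _pending.erase(it);
        on_reply(payload);
        return true;
    }

    // Number of calls still waiting for a response.
    std::size_t in_flight() const { return _pending.size(); }
};
```

Because responses are looked up by `reply_id` alone, they may arrive in
any order; a response whose `reply_id` matches no pending call is dropped.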

Previously, each `rpc` instance generated reply IDs independently, as
increasing integers starting from 0. However, the network delivers
messages based on Raft server IDs, so a response may be delivered not to
the instance that invoked the RPC but to a new instance with the same
Raft server ID. This happens after we simulate a server crash/stop and
restart: the new server has the same ID and reuses the previous
instance's `persistence` instance, but gets a new `rpc`.

The new instance could have started an RPC call of its own using the same
`reply_id` as a call that the previous instance still had in flight. It
could then receive and handle a response intended for the previous
instance, completing its own promise with a reply to a different request
and leading to hard-to-diagnose bugs.
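
The failure can be reproduced with a similar sketch under the same
assumptions: hypothetical `per_instance_rpc` and `toy_network` types,
callbacks in place of promises, and an `int` in place of
`raft::server_id`. The toy network routes responses purely by server ID,
and each endpoint numbers its calls with its own counter, as the test did
before this commit.

```cpp
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

using reply_id_t = std::uint64_t;  // hypothetical stand-in for the test's type
using server_id = int;             // hypothetical stand-in for raft::server_id

// Toy endpoint with a per-instance counter, as before this commit.
class per_instance_rpc {
    std::unordered_map<reply_id_t, std::function<void(const std::string&)>> _pending;
    reply_id_t _counter = 0;

public:
    reply_id_t start_call(std::function<void(const std::string&)> on_reply) {
        auto id = _counter++;
        _pending.emplace(id, std::move(on_reply));
        return id;
    }

    bool handle_reply(reply_id_t id, const std::string& payload) {
        auto it = _pending.find(id);
        if (it == _pending.end()) {
            return false;  // no matching in-flight call: drop the response
        }
        auto on_reply = std::move(it->second);
        _pending.erase(it);
        on_reply(payload);
        return true;
    }
};

// Toy network: responses are addressed by server ID only, so they reach
// whichever `rpc` instance is currently registered under that ID.
struct toy_network {
    std::unordered_map<server_id, per_instance_rpc*> endpoints;

    bool deliver_reply(server_id dst, reply_id_t id, const std::string& payload) {
        auto it = endpoints.find(dst);
        return it != endpoints.end() && it->second->handle_reply(id, payload);
    }
};

// Returns the payload that the restarted instance's caller receives.
std::string reproduce_collision() {
    toy_network net;
    const server_id srv = 1;

    // Before the crash: the first instance starts a call (reply_id 0).
    auto before_crash = std::make_unique<per_instance_rpc>();
    net.endpoints[srv] = before_crash.get();
    reply_id_t stale_id = before_crash->start_call([](const std::string&) {});

    // Simulated crash and restart: same server ID, brand-new `rpc` object,
    // whose counter starts from 0 again.
    auto after_restart = std::make_unique<per_instance_rpc>();
    net.endpoints[srv] = after_restart.get();
    std::string received;
    after_restart->start_call([&received](const std::string& r) { received = r; });

    // The reply to the pre-crash call arrives and is routed by server ID.
    net.deliver_reply(srv, stale_id, "reply for the pre-crash call");
    return received;
}
```

Here `reproduce_collision()` returns "reply for the pre-crash call": the
restarted instance's caller is completed with a response to a request it
never sent.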

Fix this by replacing the per-instance reply ID counters with a global
counter, so that every two-way RPC call gets a unique reply ID.
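
Applying the fix to the same sketch: the hypothetical `global_id_rpc`
draws IDs from a single process-wide counter, mirroring `new_reply_id()`
in the patch. The stale response is still routed to the restarted
instance, but its `reply_id` no longer matches any pending call, so it is
dropped. `restart_outcome` and `simulate_restart` are illustrative names.

```cpp
#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

using reply_id_t = std::uint64_t;  // hypothetical stand-in for the test's type

// Mirrors the patch: one counter shared by every `rpc` instance in the
// process. Not synchronized, so this assumes all instances run on one thread.
static reply_id_t new_reply_id() {
    static reply_id_t counter = 0;
    return counter++;
}

class global_id_rpc {
    std::unordered_map<reply_id_t, std::function<void(const std::string&)>> _pending;

public:
    reply_id_t start_call(std::function<void(const std::string&)> on_reply) {
        auto id = new_reply_id();
        _pending.emplace(id, std::move(on_reply));
        return id;
    }

    bool handle_reply(reply_id_t id, const std::string& payload) {
        auto it = _pending.find(id);
        if (it == _pending.end()) {
            return false;  // no matching in-flight call: drop the response
        }
        auto on_reply = std::move(it->second);
        _pending.erase(it);
        on_reply(payload);
        return true;
    }
};

struct restart_outcome {
    bool stale_reply_matched;       // did the stale reply complete any call?
    std::string new_call_received;  // payload seen by the post-restart caller
};

// Same crash/restart scenario, but with globally unique reply IDs.
restart_outcome simulate_restart() {
    global_id_rpc before_crash;
    reply_id_t stale_id = before_crash.start_call([](const std::string&) {});

    // Restarted instance: new object, but the shared counter keeps going.
    global_id_rpc after_restart;
    std::string received;
    reply_id_t fresh_id =
        after_restart.start_call([&received](const std::string& r) { received = r; });

    // Routing by server ID still hands the stale reply to the new instance,
    // but its reply_id matches no pending call.
    bool matched = after_restart.handle_reply(stale_id, "reply for the pre-crash call");
    // The genuine reply for the new call still finds its caller.
    after_restart.handle_reply(fresh_id, "reply for the new call");
    return {matched, received};
}
```

A plain, unsynchronized `static` counter like this assumes all `rpc`
instances run on the same thread; if they could run concurrently,
`std::atomic` would be the usual choice.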
---
diff --git a/test/raft/randomized_nemesis_test.cc b/test/raft/randomized_nemesis_test.cc
--- a/test/raft/randomized_nemesis_test.cc
+++ b/test/raft/randomized_nemesis_test.cc
@@ -419,7 +419,6 @@ class rpc : public raft::rpc {
promise<raft::add_entry_reply>
>;
std::unordered_map<reply_id_t, reply_promise> _reply_promises;
- reply_id_t _counter = 0;
// Used to ensure that when `abort()` returns there are
// no more in-progress methods running on this object.
@@ -439,6 +438,11 @@ class rpc : public raft::rpc {
}
}
+ static reply_id_t new_reply_id() {
+ static size_t counter = 0;
+ return counter++;
+ }
+
public:
rpc(raft::server_id id, snapshots_t<State>& snaps, send_message_t send)
: _id(id), _snapshots(snaps), _send(std::move(send)) {
@@ -610,7 +614,7 @@ class rpc : public raft::rpc {
throw snapshot_not_found{ .id = ins.snp.id };
}
- auto id = _counter++;
+ auto id = new_reply_id();
promise<raft::snapshot_reply> p;
auto f = p.get_future();
_reply_promises.emplace(id, std::move(p));
@@ -645,7 +649,7 @@ class rpc : public raft::rpc {
virtual future<raft::add_entry_reply> send_add_entry(raft::server_id dst, const raft::command& cmd) override {
co_return co_await with_gate([&] () -> future<raft::add_entry_reply> {
- auto id = _counter++;
+ auto id = new_reply_id();
promise<raft::add_entry_reply> p;
auto f = p.get_future();
_reply_promises.emplace(id, std::move(p));
@@ -670,7 +674,7 @@ class rpc : public raft::rpc {
const std::vector<raft::server_address>& add,
const std::vector<raft::server_id>& del) override {
co_return co_await with_gate([&] () -> future<raft::add_entry_reply> {
- auto id = _counter++;
+ auto id = new_reply_id();
promise<raft::add_entry_reply> p;
auto f = p.get_future();
_reply_promises.emplace(id, std::move(p));
@@ -694,7 +698,7 @@ class rpc : public raft::rpc {
}
virtual future<raft::read_barrier_reply> execute_read_barrier_on_leader(raft::server_id dst) override {
co_return co_await with_gate([&] () -> future<raft::read_barrier_reply> {
- auto id = _counter++;
+ auto id = new_reply_id();
promise<raft::read_barrier_reply> p;
auto f = p.get_future();
_reply_promises.emplace(id, std::move(p));