[COMMIT seastar master] rpc: remove `connection::_server` field

1 view
Skip to first unread message

Commit Bot

<bot@cloudius-systems.com>
unread,
May 25, 2023, 6:29:34 AM (5/25/23)
to seastar-dev@googlegroups.com, Kamil Braun
From: Kamil Braun <kbr...@scylladb.com>
Committer: Kamil Braun <kbr...@scylladb.com>
Branch: master

rpc: remove `connection::_server` field

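After adding `rpc::server& server` to `client_info`,
the `connection::_server` field is redundant. Remove it and replace its
usages with `get_server()`, which returns `_info.server`. Also add a `const`
overload of `get_server()` so that `const` methods such as
`max_request_size()` can use it.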

---
diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh
--- a/include/seastar/rpc/rpc.hh
+++ b/include/seastar/rpc/rpc.hh
@@ -550,7 +550,6 @@ private:

public:
class connection : public rpc::connection, public enable_shared_from_this<connection> {
- server& _server;
client_info _info;
connection_id _parent_id = invalid_connection_id;
std::optional<isolation_config> _isolation_config;
@@ -581,19 +580,22 @@ public:
// Resources will be released when this goes out of scope
future<resource_permit> wait_for_resources(size_t memory_consumed, std::optional<rpc_clock_type::time_point> timeout) {
if (timeout) {
- return get_units(_server._resources_available, memory_consumed, *timeout);
+ return get_units(get_server()._resources_available, memory_consumed, *timeout);
} else {
- return get_units(_server._resources_available, memory_consumed);
+ return get_units(get_server()._resources_available, memory_consumed);
}
}
size_t estimate_request_size(size_t serialized_size) {
- return rpc::estimate_request_size(_server._limits, serialized_size);
+ return rpc::estimate_request_size(get_server()._limits, serialized_size);
}
size_t max_request_size() const {
- return _server._limits.max_memory;
+ return get_server()._limits.max_memory;
}
server& get_server() {
- return _server;
+ return _info.server;
+ }
+ const server& get_server() const {
+ return _info.server;
}
future<> deregister_this_stream();
};
diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc
--- a/src/rpc/rpc.cc
+++ b/src/rpc/rpc.cc
@@ -861,8 +861,8 @@ namespace rpc {
switch (id) {
// supported features go here
case protocol_features::COMPRESS: {
- if (_server._options.compressor_factory) {
- _compressor = _server._options.compressor_factory->negotiate(e.second, true);
+ if (get_server()._options.compressor_factory) {
+ _compressor = get_server()._options.compressor_factory->negotiate(e.second, true);
if (_compressor) {
ret[protocol_features::COMPRESS] = _compressor->name();
}
@@ -874,20 +874,20 @@ namespace rpc {
ret[protocol_features::TIMEOUT] = "";
break;
case protocol_features::STREAM_PARENT: {
- if (!_server._options.streaming_domain) {
+ if (!get_server()._options.streaming_domain) {
f = f.then([] {
return make_exception_future<>(std::runtime_error("streaming is not configured for the server"));
});
} else {
_parent_id = deserialize_connection_id(e.second);
_is_stream = true;
// remove stream connection from rpc connection list
- _server._conns.erase(get_connection_id());
+ get_server()._conns.erase(get_connection_id());
f = f.then([this, c = shared_from_this()] () mutable {
return smp::submit_to(_parent_id.shard(), [this, c = make_foreign(static_pointer_cast<rpc::connection>(c))] () mutable {
- auto sit = _servers.find(*_server._options.streaming_domain);
+ auto sit = _servers.find(*get_server()._options.streaming_domain);
if (sit == _servers.end()) {
- throw std::logic_error(format("Shard {:d} does not have server with streaming domain {}", this_shard_id(), *_server._options.streaming_domain).c_str());
+ throw std::logic_error(format("Shard {:d} does not have server with streaming domain {}", this_shard_id(), *get_server()._options.streaming_domain).c_str());
}
auto s = sit->second;
auto it = s->_conns.find(_parent_id);
@@ -918,7 +918,7 @@ namespace rpc {

auto visitor = isolation_function_visitor(isolation_cookie);
f = f.then([visitor = std::move(visitor), this] () mutable {
- return std::visit(visitor, _server._limits.isolate_connection).then([this] (isolation_config conf) {
+ return std::visit(visitor, get_server()._limits.isolate_connection).then([this] (isolation_config conf) {
_isolation_config = conf;
});
});
@@ -930,7 +930,7 @@ namespace rpc {
;
}
}
- if (_server._options.streaming_domain) {
+ if (get_server()._options.streaming_domain) {
ret[protocol_features::CONNECTION_ID] = serialize_connection_id(_id);
}
return f.then([ret = std::move(ret)] {
@@ -1017,7 +1017,7 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
try {
// Send asynchronously.
// This is safe since connection::stop() will wait for background work.
- (void)with_gate(_server._reply_gate, [this, timeout, msg_id, data = std::move(data), permit = std::move(permit)] () mutable {
+ (void)with_gate(get_server()._reply_gate, [this, timeout, msg_id, data = std::move(data), permit = std::move(permit)] () mutable {
// workaround for https://gcc.gnu.org/bugzilla/show_bug.cgi?id=83268
auto c = shared_from_this();
return respond(-msg_id, std::move(data), timeout).then([c = std::move(c), permit = std::move(permit)] {});
@@ -1048,7 +1048,7 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
if (expire && *expire) {
timeout = relative_timeout_to_absolute(std::chrono::milliseconds(*expire));
}
- auto h = _server._proto->get_handler(type);
+ auto h = get_server()._proto->get_handler(type);
if (!h) {
return send_unknown_verb_reply(timeout, msg_id, type);
}
@@ -1059,7 +1059,7 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
return with_scheduling_group(sg, [this, timeout, msg_id, h, data = std::move(data.value())] () mutable {
return h->func(shared_from_this(), timeout, msg_id, std::move(data)).finally([this, h] {
// If anything between get_handler() and here throws, we leak put_handler
- _server._proto->put_handler(h);
+ get_server()._proto->put_handler(h);
});
});
}
@@ -1078,7 +1078,7 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ
_stream_queue.abort(std::make_exception_ptr(stream_closed()));
return stop_send_loop(ep).then_wrapped([this] (future<> f) {
f.ignore_ready_future();
- _server._conns.erase(get_connection_id());
+ get_server()._conns.erase(get_connection_id());
if (is_stream()) {
return deregister_this_stream();
} else {
@@ -1094,16 +1094,15 @@ future<> server::connection::send_unknown_verb_reply(std::optional<rpc_clock_typ

server::connection::connection(server& s, connected_socket&& fd, socket_address&& addr, const logger& l, void* serializer, connection_id id)
: rpc::connection(std::move(fd), l, serializer, id)
- , _server(s)
, _info{.addr{std::move(addr)}, .server{s}, .conn_id{id}} {
}

future<> server::connection::deregister_this_stream() {
- if (!_server._options.streaming_domain) {
+ if (!get_server()._options.streaming_domain) {
return make_ready_future<>();
}
return smp::submit_to(_parent_id.shard(), [this] () mutable {
- auto sit = server::_servers.find(*_server._options.streaming_domain);
+ auto sit = server::_servers.find(*get_server()._options.streaming_domain);
if (sit != server::_servers.end()) {
auto s = sit->second;
auto it = s->_conns.find(_parent_id);
Reply all
Reply to author
Forward
0 new messages