[PATCH 01/12] alternator: Keep storage_proxy on server

0 views
Skip to first unread message

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:23 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
It's already available on controller and will be needed by
API handlers in the next patch.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
alternator/server.hh | 3 ++-
alternator/controller.cc | 2 +-
alternator/server.cc | 4 +++-
3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/alternator/server.hh b/alternator/server.hh
index a13660eb0d..5a7094b3c0 100644
--- a/alternator/server.hh
+++ b/alternator/server.hh
@@ -46,6 +46,7 @@ class server {
http_server _https_server;
executor& _executor;
cql3::query_processor& _qp;
+ service::storage_proxy& _proxy;

key_cache _key_cache;
bool _enforce_authorization;
@@ -77,7 +78,7 @@ class server {
json_parser _json_parser;

public:
- server(executor& executor, cql3::query_processor& qp);
+ server(executor& executor, cql3::query_processor& qp, service::storage_proxy& proxy);

future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
diff --git a/alternator/controller.cc b/alternator/controller.cc
index a8b5020b8e..fc8d14312d 100644
--- a/alternator/controller.cc
+++ b/alternator/controller.cc
@@ -85,7 +85,7 @@ future<> controller::start() {
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };

_executor.start(std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get();
- _server.start(std::ref(_executor), std::ref(_qp)).get();
+ _server.start(std::ref(_executor), std::ref(_qp), std::ref(_proxy)).get();
std::optional<uint16_t> alternator_port;
if (_config.alternator_port()) {
alternator_port = _config.alternator_port();
diff --git a/alternator/server.cc b/alternator/server.cc
index 1403c27147..ef7ef7ae11 100644
--- a/alternator/server.cc
+++ b/alternator/server.cc
@@ -433,11 +433,12 @@ void server::set_routes(routes& r) {
//FIXME: A way to immediately invalidate the cache should be considered,
// e.g. when the system table which stores the keys is changed.
// For now, this propagation may take up to 1 minute.
-server::server(executor& exec, cql3::query_processor& qp)
+server::server(executor& exec, cql3::query_processor& qp, service::storage_proxy& proxy)
: _http_server("http-alternator")
, _https_server("https-alternator")
, _executor(exec)
, _qp(qp)
+ , _proxy(proxy)
, _key_cache(1024, 1min, slogger)
, _enforce_authorization(false)
, _enabled_servers{}
@@ -507,6 +508,7 @@ server::server(executor& exec, cql3::query_processor& qp)
return e.get_records(client_state, std::move(trace_state), std::move(permit), std::move(json_request));
}},
} {
+ (void)_proxy; // temporary
}

future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:23 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
There are few places that call global storage service, but all
are easily fixable without significant changes.

1. alternator -- needs token metadata, switch to using proxy
2. api -- calls methods from storage service, all handlers are
registered in main and can capture storage service from there
3. thrift -- calls methods from storage service, can carry the
reference via controller
4. view -- needs tokens, switch to using (global) proxy
5. storage_service -- (surprisingly) can use "this"

branch: https://github.com/xemul/scylla/tree/br-unglobal-storage-service
tests: unit(dev), dtest(simple_boot_shutdown, dev)

Pavel Emelyanov (12):
alternator: Keep storage_proxy on server
alternator: Take token metadata from server's storage_proxy
api: Carry sharded<storage_service>& down to some handlers
api: Capture and use sharded<storage_service>& in handlers
thrift: Carry sharded<storage_service>& down to handler
thrift: Use local storage service in handlers
view: Use proxy to get token metadata from
storage_service: Capture this when appropriate
storage_service: Unmark update_topology static
storage_service: Use container() in run_with(out)_api_lock
storage_service: Remove (de)?init_storage_service()
storage_service: Make it local

alternator/server.hh | 3 +-
api/api_init.hh | 5 +-
api/storage_proxy.hh | 5 +-
api/storage_service.hh | 2 +-
service/storage_service.hh | 34 +---------
thrift/controller.hh | 4 +-
thrift/handler.hh | 3 +-
thrift/server.hh | 4 +-
alternator/controller.cc | 2 +-
alternator/server.cc | 18 +++--
api/api.cc | 12 ++--
api/storage_proxy.cc | 6 +-
api/storage_service.cc | 131 ++++++++++++++++++-------------------
db/view/view.cc | 3 +-
main.cc | 20 +++---
service/storage_service.cc | 39 ++---------
test/boost/gossip_test.cc | 5 +-
test/lib/cql_test_env.cc | 6 +-
thrift/controller.cc | 9 +--
thrift/handler.cc | 27 ++++----
thrift/server.cc | 3 +-
21 files changed, 156 insertions(+), 185 deletions(-)

--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:23 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
There's a local_nodelist_handler serving API requests that calls
for global storage service to get token metadata from. Now it
can get storage proxy reference from server upon construction
and use it for tokens.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
alternator/server.cc | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/alternator/server.cc b/alternator/server.cc
index ef7ef7ae11..4069759ce8 100644
--- a/alternator/server.cc
+++ b/alternator/server.cc
@@ -30,9 +30,12 @@
#include "utils/rjson.hh"
#include "auth.hh"
#include <cctype>
+#include "service/storage_proxy.hh"
#include "cql3/query_processor.hh"
-#include "service/storage_service.hh"
+#include "locator/snitch_base.hh"
+#include "gms/gossiper.hh"
#include "utils/overloaded_functor.hh"
+#include "utils/fb_utilities.hh"

static logging::logger slogger("alternator-server");

@@ -191,8 +194,11 @@ class health_handler : public gated_handler {
};

class local_nodelist_handler : public gated_handler {
+ service::storage_proxy& _proxy;
public:
- local_nodelist_handler(seastar::gate& pending_requests) : gated_handler(pending_requests) {}
+ local_nodelist_handler(seastar::gate& pending_requests, service::storage_proxy& proxy)
+ : gated_handler(pending_requests)
+ , _proxy(proxy) {}
protected:
virtual future<std::unique_ptr<reply>> do_handle(const sstring& path, std::unique_ptr<request> req, std::unique_ptr<reply> rep) override {
rjson::value results = rjson::empty_array();
@@ -202,8 +208,7 @@ class local_nodelist_handler : public gated_handler {
sstring local_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(
utils::fb_utilities::get_broadcast_address());
std::unordered_set<gms::inet_address> local_dc_nodes =
- service::get_local_storage_service().get_token_metadata().
- get_topology().get_datacenter_endpoints().at(local_dc);
+ _proxy.get_token_metadata_ptr()->get_topology().get_datacenter_endpoints().at(local_dc);
for (auto& ip : local_dc_nodes) {
if (gms::get_local_gossiper().is_alive(ip)) {
rjson::push_back(results, rjson::from_string(ip.to_sstring()));
@@ -426,7 +431,7 @@ void server::set_routes(routes& r) {
// consider this to be a security risk, because an attacker can already
// scan an entire subnet for nodes responding to the health request,
// or even just scan for open ports.
- r.put(operation_type::GET, "/localnodes", new local_nodelist_handler(_pending_requests));
+ r.put(operation_type::GET, "/localnodes", new local_nodelist_handler(_pending_requests, _proxy));
r.put(operation_type::OPTIONS, "/", new options_handler(_pending_requests));
}

@@ -508,7 +513,6 @@ server::server(executor& exec, cql3::query_processor& qp, service::storage_proxy
return e.get_records(client_state, std::move(trace_state), std::move(permit), std::move(json_request));
}},
} {
- (void)_proxy; // temporary

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:24 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
Both set_server_storage_service and set_server_storage_proxy set up
API handlers that need storage service to work. Now they all call for
global storage service instance, but it's better if they receive one
from main. This patch carries the sharded storage service reference
down to handlers setting function, next patch will make use of it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
api/api_init.hh | 5 +++--
api/storage_proxy.hh | 5 ++++-
api/storage_service.hh | 2 +-
api/api.cc | 12 ++++++++----
api/storage_proxy.cc | 2 +-
api/storage_service.cc | 2 +-
main.cc | 4 ++--
7 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/api/api_init.hh b/api/api_init.hh
index ecba22f17f..ecda9c6626 100644
--- a/api/api_init.hh
+++ b/api/api_init.hh
@@ -30,6 +30,7 @@ namespace service {

class load_meter;
class storage_proxy;
+class storage_service;

} // namespace service

@@ -69,7 +70,7 @@ struct http_context {
future<> set_server_init(http_context& ctx);
future<> set_server_config(http_context& ctx);
future<> set_server_snitch(http_context& ctx);
-future<> set_server_storage_service(http_context& ctx);
+future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss);
future<> set_server_repair(http_context& ctx, sharded<repair_service>& repair);
future<> unset_server_repair(http_context& ctx);
future<> set_transport_controller(http_context& ctx, cql_transport::controller& ctl);
@@ -82,7 +83,7 @@ future<> set_server_gossip(http_context& ctx);
future<> set_server_load_sstable(http_context& ctx);
future<> set_server_messaging_service(http_context& ctx, sharded<netw::messaging_service>& ms);
future<> unset_server_messaging_service(http_context& ctx);
-future<> set_server_storage_proxy(http_context& ctx);
+future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_service>& ss);
future<> set_server_stream_manager(http_context& ctx);
future<> set_server_gossip_settle(http_context& ctx);
future<> set_server_cache(http_context& ctx);
diff --git a/api/storage_proxy.hh b/api/storage_proxy.hh
index de9a9cf8d3..a3de5d571c 100644
--- a/api/storage_proxy.hh
+++ b/api/storage_proxy.hh
@@ -21,10 +21,13 @@

#pragma once

+#include <seastar/core/sharded.hh>
#include "api.hh"

+namespace service { class storage_service; }
+
namespace api {

-void set_storage_proxy(http_context& ctx, routes& r);
+void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_service>& ss);

}
diff --git a/api/storage_service.hh b/api/storage_service.hh
index 89e2f89f4f..aec3d76bdd 100644
--- a/api/storage_service.hh
+++ b/api/storage_service.hh
@@ -33,7 +33,7 @@ class repair_service;

namespace api {

-void set_storage_service(http_context& ctx, routes& r);
+void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss);
void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair);
void unset_repair(http_context& ctx, routes& r);
void set_transport_controller(http_context& ctx, routes& r, cql_transport::controller& ctl);
diff --git a/api/api.cc b/api/api.cc
index bef2a736e3..8e45de6513 100644
--- a/api/api.cc
+++ b/api/api.cc
@@ -109,8 +109,10 @@ future<> unset_rpc_controller(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_rpc_controller(ctx, r); });
}

-future<> set_server_storage_service(http_context& ctx) {
- return register_api(ctx, "storage_service", "The storage service API", set_storage_service);
+future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss) {
+ return register_api(ctx, "storage_service", "The storage service API", [&ss] (http_context& ctx, routes& r) {
+ set_storage_service(ctx, r, ss);
+ });
}

future<> set_server_repair(http_context& ctx, sharded<repair_service>& repair) {
@@ -153,9 +155,11 @@ future<> unset_server_messaging_service(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_messaging_service(ctx, r); });
}

-future<> set_server_storage_proxy(http_context& ctx) {
+future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_service>& ss) {
return register_api(ctx, "storage_proxy",
- "The storage proxy API", set_storage_proxy);
+ "The storage proxy API", [&ss] (http_context& ctx, routes& r) {
+ set_storage_proxy(ctx, r, ss);
+ });
}

future<> set_server_stream_manager(http_context& ctx) {
diff --git a/api/storage_proxy.cc b/api/storage_proxy.cc
index 0bd3b17305..ec6fa46b07 100644
--- a/api/storage_proxy.cc
+++ b/api/storage_proxy.cc
@@ -193,7 +193,7 @@ sum_timer_stats_storage_proxy(distributed<proxy>& d,
});
}

-void set_storage_proxy(http_context& ctx, routes& r) {
+void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
sp::get_total_hints.set(r, [](std::unique_ptr<request> req) {
//TBD
unimplemented();
diff --git a/api/storage_service.cc b/api/storage_service.cc
index 46a6623a29..039286ea9f 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -294,7 +294,7 @@ void unset_repair(http_context& ctx, routes& r) {
ss::force_terminate_all_repair_sessions_new.unset(r);
}

-void set_storage_service(http_context& ctx, routes& r) {
+void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
ss::local_hostid.set(r, [](std::unique_ptr<request> req) {
return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) {
return make_ready_future<json::json_return_type>(id.to_sstring());
diff --git a/main.cc b/main.cc
index 9c102639f4..831b50c37e 100644
--- a/main.cc
+++ b/main.cc
@@ -1097,7 +1097,7 @@ int main(int ac, char** av) {
}).get();
api::set_server_gossip(ctx).get();
api::set_server_snitch(ctx).get();
- api::set_server_storage_proxy(ctx).get();
+ api::set_server_storage_proxy(ctx, ss).get();
api::set_server_load_sstable(ctx).get();
static seastar::sharded<memory_threshold_guard> mtg;
//FIXME: discarded future
@@ -1191,7 +1191,7 @@ int main(int ac, char** av) {
auto stop_messaging_api = defer_verbose_shutdown("messaging service API", [&ctx] {
api::unset_server_messaging_service(ctx).get();
});
- api::set_server_storage_service(ctx).get();
+ api::set_server_storage_service(ctx, ss).get();
api::set_server_repair(ctx, repair).get();
auto stop_repair_api = defer_verbose_shutdown("repair API", [&ctx] {
api::unset_server_repair(ctx).get();
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:25 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
The thrift_handler class' methods need storage service. This
patch makes sure this class has sharded storage service
reference on board.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
thrift/controller.hh | 4 +++-
thrift/handler.hh | 3 ++-
thrift/server.hh | 4 +++-
main.cc | 2 +-
thrift/controller.cc | 9 +++++----
thrift/handler.cc | 17 +++++++++++------
thrift/server.cc | 3 ++-
7 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/thrift/controller.hh b/thrift/controller.hh
index feff7b5305..6626c2d514 100644
--- a/thrift/controller.hh
+++ b/thrift/controller.hh
@@ -32,6 +32,7 @@ class thrift_server;
class database;
namespace auth { class service; }
namespace cql3 { class query_processor; }
+namespace service { class storage_service; }

class thrift_controller {
std::unique_ptr<distributed<thrift_server>> _server;
@@ -42,12 +43,13 @@ class thrift_controller {
sharded<auth::service>& _auth_service;
sharded<cql3::query_processor>& _qp;
sharded<service::memory_limiter>& _mem_limiter;
+ sharded<service::storage_service>& _ss;

future<> do_start_server();
future<> do_stop_server();

public:
- thrift_controller(distributed<database>&, sharded<auth::service>&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&);
+ thrift_controller(distributed<database>&, sharded<auth::service>&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&, sharded<service::storage_service>& ss);
future<> start_server();
future<> stop_server();
future<> stop();
diff --git a/thrift/handler.hh b/thrift/handler.hh
index 0340000505..f36904efb2 100644
--- a/thrift/handler.hh
+++ b/thrift/handler.hh
@@ -31,7 +31,8 @@

struct timeout_config;
class service_permit;
+namespace service { class storage_service; }

-std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, timeout_config, service_permit& current_permit);
+std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, auth::service&, timeout_config, service_permit& current_permit);

#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */
diff --git a/thrift/server.hh b/thrift/server.hh
index 3cccf7e657..4e4ed47ec2 100644
--- a/thrift/server.hh
+++ b/thrift/server.hh
@@ -76,6 +76,8 @@ namespace auth {
class service;
}

+namespace service { class storage_service; }
+
struct thrift_server_config {
::timeout_config timeout_config;
uint64_t max_request_size;
@@ -127,7 +129,7 @@ class thrift_server {
boost::intrusive::list<connection> _connections_list;
seastar::gate _stop_gate;
public:
- thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, service::memory_limiter& ml, thrift_server_config config);
+ thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, auth::service&, service::memory_limiter& ml, thrift_server_config config);
~thrift_server();
future<> listen(socket_address addr, bool keepalive);
future<> stop();
diff --git a/main.cc b/main.cc
index 831b50c37e..d9796e86f0 100644
--- a/main.cc
+++ b/main.cc
@@ -1372,7 +1372,7 @@ int main(int ac, char** av) {
api::unset_transport_controller(ctx).get();
});

- ::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter);
+ ::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter, ss);

ss.local().register_client_shutdown_hook("rpc server", [&thrift_ctl] {
thrift_ctl.stop().get();
diff --git a/thrift/controller.cc b/thrift/controller.cc
index 0f1f2f840e..28da121b2e 100644
--- a/thrift/controller.cc
+++ b/thrift/controller.cc
@@ -27,13 +27,14 @@

static logging::logger clogger("thrift_controller");

-thrift_controller::thrift_controller(distributed<database>& db, sharded<auth::service>& auth, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml)
+thrift_controller::thrift_controller(distributed<database>& db, sharded<auth::service>& auth, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml, sharded<service::storage_service>& ss)
: _ops_sem(1)
, _db(db)
, _auth_service(auth)
, _qp(qp)
- , _mem_limiter(ml) {
-}
+ , _mem_limiter(ml)
+ , _ss(ss)
+{ }

future<> thrift_controller::start_server() {
return smp::submit_to(0, [this] {
@@ -63,7 +64,7 @@ future<> thrift_controller::do_start_server() {
tsc.timeout_config = make_timeout_config(cfg);
tsc.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20);
return gms::inet_address::lookup(addr, family, preferred).then([this, tserver, addr, port, keepalive, tsc] (gms::inet_address ip) {
- return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_auth_service), std::ref(_mem_limiter), tsc).then([tserver, port, addr, ip, keepalive] {
+ return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_ss), std::ref(_auth_service), std::ref(_mem_limiter), tsc).then([tserver, port, addr, ip, keepalive] {
// #293 - do not stop anything
//engine().at_exit([tserver] {
// return tserver->stop();
diff --git a/thrift/handler.cc b/thrift/handler.cc
index 3eac70c14d..d756423528 100644
--- a/thrift/handler.cc
+++ b/thrift/handler.cc
@@ -203,6 +203,7 @@ enum class query_order { no, yes };
class thrift_handler : public CassandraCobSvIf {
distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor;
+ sharded<service::storage_service>& _ss;
::timeout_config _timeout_config;
service::client_state _client_state;
service::query_state _query_state;
@@ -220,9 +221,10 @@ class thrift_handler : public CassandraCobSvIf {
});
}
public:
- explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
+ explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
: _db(db)
, _query_processor(qp)
+ , _ss(ss)
, _timeout_config(timeout_config)
, _client_state(service::client_state::external_tag{}, auth_service, nullptr, _timeout_config, socket_address(), true)
// FIXME: Handlers are not created per query, but rather per connection, so it makes little sense to store
@@ -230,7 +232,7 @@ class thrift_handler : public CassandraCobSvIf {
// for CQL queries which piggy-back on Thrift protocol.
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
, _current_permit(current_permit)
- { }
+ { (void)_ss; /* temporary */ }

const sstring& current_keyspace() const {
return _query_state.get_client_state().get_raw_keyspace();
@@ -2018,19 +2020,21 @@ class thrift_handler : public CassandraCobSvIf {
class handler_factory : public CassandraCobSvIfFactory {
distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor;
+ sharded<service::storage_service>& _ss;
auth::service& _auth_service;
timeout_config _timeout_config;
service_permit& _current_permit;
public:
explicit handler_factory(distributed<database>& db,
distributed<cql3::query_processor>& qp,
+ sharded<service::storage_service>& ss,
auth::service& auth_service,
::timeout_config timeout_config,
service_permit& current_permit)
- : _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {}
+ : _db(db), _query_processor(qp), _ss(ss), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {}
typedef CassandraCobSvIf Handler;
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
- return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config, _current_permit);
+ return new thrift_handler(_db, _query_processor, _ss, _auth_service, _timeout_config, _current_permit);
}
virtual void releaseHandler(CassandraCobSvIf* handler) {
delete handler;
@@ -2038,7 +2042,8 @@ class handler_factory : public CassandraCobSvIfFactory {
};

std::unique_ptr<CassandraCobSvIfFactory>
-create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service,
+create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp,
+ sharded<service::storage_service>& ss, auth::service& auth_service,
::timeout_config timeout_config, service_permit& current_permit) {
- return std::make_unique<handler_factory>(db, qp, auth_service, timeout_config, current_permit);
+ return std::make_unique<handler_factory>(db, qp, ss, auth_service, timeout_config, current_permit);
}
diff --git a/thrift/server.cc b/thrift/server.cc
index 0a89e741dc..e3f51a8953 100644
--- a/thrift/server.cc
+++ b/thrift/server.cc
@@ -68,11 +68,12 @@ class thrift_stats {

thrift_server::thrift_server(distributed<database>& db,
distributed<cql3::query_processor>& qp,
+ sharded<service::storage_service>& ss,
auth::service& auth_service,
service::memory_limiter& ml,
thrift_server_config config)
: _stats(new thrift_stats(*this))
- , _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config, _current_permit).release())
+ , _handler_factory(create_handler_factory(db, qp, ss, auth_service, config.timeout_config, _current_permit).release())
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory))
, _memory_available(ml.get_semaphore())
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:25 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
thrift/handler.cc | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/thrift/handler.cc b/thrift/handler.cc
index d756423528..0415bb95c9 100644
--- a/thrift/handler.cc
+++ b/thrift/handler.cc
@@ -232,7 +232,7 @@ class thrift_handler : public CassandraCobSvIf {
// for CQL queries which piggy-back on Thrift protocol.
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
, _current_permit(current_permit)
- { (void)_ss; /* temporary */ }
+ { }

const sstring& current_keyspace() const {
return _query_state.get_client_state().get_raw_keyspace();
@@ -719,8 +719,8 @@ class thrift_handler : public CassandraCobSvIf {

void describe_schema_versions(thrift_fn::function<void(std::map<std::string, std::vector<std::string> > const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
service_permit permit = obtain_permit();
- with_cob(std::move(cob), std::move(exn_cob), [] {
- return service::get_local_storage_service().describe_schema_versions().then([](auto&& m) {
+ with_cob(std::move(cob), std::move(exn_cob), [this] {
+ return _ss.local().describe_schema_versions().then([](auto&& m) {
std::map<std::string, std::vector<std::string>> ret;
for (auto&& p : m) {
ret[p.first] = std::vector<std::string>(p.second.begin(), p.second.end());
@@ -760,7 +760,7 @@ class thrift_handler : public CassandraCobSvIf {
throw make_exception<InvalidRequestException>("There is no ring for the keyspace: %s", keyspace);
}

- auto ring = service::get_local_storage_service().describe_ring(keyspace, local);
+ auto ring = _ss.local().describe_ring(keyspace, local);
std::vector<TokenRange> ret;
ret.reserve(ring.size());
std::transform(ring.begin(), ring.end(), std::back_inserter(ret), [](auto&& tr) {
@@ -794,8 +794,8 @@ class thrift_handler : public CassandraCobSvIf {

void describe_token_map(thrift_fn::function<void(std::map<std::string, std::string> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
service_permit permit = obtain_permit();
- with_cob(std::move(cob), std::move(exn_cob), [] {
- auto m = service::get_local_storage_service().get_token_to_endpoint_map();
+ with_cob(std::move(cob), std::move(exn_cob), [this] {
+ auto m = _ss.local().get_token_to_endpoint_map();
std::map<std::string, std::string> ret;
for (auto&& p : m) {
ret[format("{}", p.first)] = p.second.to_sstring();
@@ -851,7 +851,7 @@ class thrift_handler : public CassandraCobSvIf {
auto tend = end_token.empty() ? dht::maximum_token() : dht::token::from_sstring(sstring(end_token));
range<dht::token> r({{ std::move(tstart), false }}, {{ std::move(tend), true }});
auto cf = sstring(cfName);
- auto splits = service::get_local_storage_service().get_splits(current_keyspace(), cf, std::move(r), keys_per_split);
+ auto splits = _ss.local().get_splits(current_keyspace(), cf, std::move(r), keys_per_split);

std::vector<CfSplit> res;
for (auto&& s : splits) {
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:25 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
The reference in question is already there, handlers that need
storage service can capture it and use. These handlers are not
yet stopped, but neither is the storage service itself, so the
potentially dangling reference is not being set up here.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
api/storage_proxy.cc | 4 +-
api/storage_service.cc | 129 ++++++++++++++++++++---------------------
2 files changed, 66 insertions(+), 67 deletions(-)

diff --git a/api/storage_proxy.cc b/api/storage_proxy.cc
index ec6fa46b07..28345ad40d 100644
--- a/api/storage_proxy.cc
+++ b/api/storage_proxy.cc
@@ -363,8 +363,8 @@ void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_se
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_repaired_background);
});

- sp::get_schema_versions.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().describe_schema_versions().then([] (auto result) {
+ sp::get_schema_versions.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
for (auto e : result) {
sp::mapper_list entry;
diff --git a/api/storage_service.cc b/api/storage_service.cc
index 039286ea9f..52ebed73c1 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -134,12 +134,12 @@ seastar::future<json::json_return_type> run_toppartitions_query(db::toppartition
});
}

-future<json::json_return_type> set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
+future<json::json_return_type> set_tables_autocompaction(http_context& ctx, service::storage_service& ss, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
if (tables.empty()) {
tables = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
}

- return service::get_local_storage_service().set_tables_autocompaction(keyspace, tables, enabled).then([]{
+ return ss.set_tables_autocompaction(keyspace, tables, enabled).then([]{
return make_ready_future<json::json_return_type>(json_void());
});
}
@@ -318,8 +318,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return ctx.db.local().commitlog()->active_config().commit_log_location;
});

- ss::get_token_endpoint.set(r, [] (std::unique_ptr<request> req) {
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().get_token_to_endpoint_map(), [](const auto& i) {
+ ss::get_token_endpoint.set(r, [&ss] (std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().get_token_to_endpoint_map(), [](const auto& i) {
storage_service_json::mapper val;
val.key = boost::lexical_cast<std::string>(i.first);
val.value = boost::lexical_cast<std::string>(i.second);
@@ -395,15 +395,15 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return container_to_vec(addr);
});

- ss::get_release_version.set(r, [](const_req req) {
- return service::get_local_storage_service().get_release_version();
+ ss::get_release_version.set(r, [&ss](const_req req) {
+ return ss.local().get_release_version();
});

ss::get_scylla_release_version.set(r, [](const_req req) {
return scylla_version();
});
- ss::get_schema_version.set(r, [](const_req req) {
- return service::get_local_storage_service().get_schema_version();
+ ss::get_schema_version.set(r, [&ss](const_req req) {
+ return ss.local().get_schema_version();
});

ss::get_all_data_file_locations.set(r, [&ctx](const_req req) {
@@ -414,10 +414,10 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return ctx.db.local().get_config().saved_caches_directory();
});

- ss::get_range_to_endpoint_map.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::get_range_to_endpoint_map.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
std::vector<ss::maplist_mapper> res;
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().get_range_to_address_map(keyspace),
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().get_range_to_address_map(keyspace),
[](const std::pair<dht::token_range, inet_address_vector_replica_set>& entry){
ss::maplist_mapper m;
if (entry.first.start()) {
@@ -445,13 +445,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(res);
});

- ss::describe_any_ring.set(r, [&ctx](std::unique_ptr<request> req) {
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(""), token_range_endpoints_to_json));
+ ss::describe_any_ring.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().describe_ring(""), token_range_endpoints_to_json));
});

- ss::describe_ring.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::describe_ring.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(keyspace), token_range_endpoints_to_json));
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().describe_ring(keyspace), token_range_endpoints_to_json));
});

ss::get_host_id_map.set(r, [&ctx](const_req req) {
@@ -483,14 +483,14 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});

- ss::get_natural_endpoints.set(r, [&ctx](const_req req) {
+ ss::get_natural_endpoints.set(r, [&ctx, &ss](const_req req) {
auto keyspace = validate_keyspace(ctx, req.param);
- return container_to_vec(service::get_local_storage_service().get_natural_endpoints(keyspace, req.get_query_param("cf"),
+ return container_to_vec(ss.local().get_natural_endpoints(keyspace, req.get_query_param("cf"),
req.get_query_param("key")));
});

- ss::cdc_streams_check_and_repair.set(r, [&ctx] (std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_cdc_generation_service().check_and_repair_cdc_streams().then([] {
+ ss::cdc_streams_check_and_repair.set(r, [&ctx, &ss] (std::unique_ptr<request> req) {
+ return ss.local().get_cdc_generation_service().check_and_repair_cdc_streams().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -514,13 +514,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});

- ss::force_keyspace_cleanup.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto column_families = split_cf(req->get_query_param("cf"));
if (column_families.empty()) {
column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
}
- return service::get_local_storage_service().is_cleanup_allowed(keyspace).then([&ctx, keyspace,
+ return ss.local().is_cleanup_allowed(keyspace).then([&ctx, keyspace,
column_families = std::move(column_families)] (bool is_cleanup_allowed) mutable {
if (!is_cleanup_allowed) {
return make_exception_future<json::json_return_type>(
@@ -584,20 +584,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});


- ss::decommission.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().decommission().then([] {
+ ss::decommission.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().decommission().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::move.set(r, [] (std::unique_ptr<request> req) {
+ ss::move.set(r, [&ss] (std::unique_ptr<request> req) {
auto new_token = req->get_query_param("new_token");
- return service::get_local_storage_service().move(new_token).then([] {
+ return ss.local().move(new_token).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::remove_node.set(r, [](std::unique_ptr<request> req) {
+ ss::remove_node.set(r, [&ss](std::unique_ptr<request> req) {
auto host_id = req->get_query_param("host_id");
std::vector<sstring> ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ",");
auto ignore_nodes = std::list<gms::inet_address>();
@@ -614,19 +614,19 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
}
}
- return service::get_local_storage_service().removenode(host_id, std::move(ignore_nodes)).then([] {
+ return ss.local().removenode(host_id, std::move(ignore_nodes)).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::get_removal_status.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_removal_status().then([] (auto status) {
+ ss::get_removal_status.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().get_removal_status().then([] (auto status) {
return make_ready_future<json::json_return_type>(status);
});
});

- ss::force_remove_completion.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().force_remove_completion().then([] {
+ ss::force_remove_completion.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().force_remove_completion().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -650,20 +650,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(res);
});

- ss::get_operation_mode.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_operation_mode().then([] (auto mode) {
+ ss::get_operation_mode.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(mode);
});
});

- ss::is_starting.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_starting().then([] (auto starting) {
+ ss::is_starting.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_starting().then([] (auto starting) {
return make_ready_future<json::json_return_type>(starting);
});
});

- ss::get_drain_progress.set(r, [](std::unique_ptr<request> req) {
- return service::get_storage_service().map_reduce(adder<service::storage_service::drain_progress>(), [] (auto& ss) {
+ ss::get_drain_progress.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.map_reduce(adder<service::storage_service::drain_progress>(), [] (auto& ss) {
return ss.get_drain_progress();
}).then([] (auto&& progress) {
auto progress_str = format("Drained {}/{} ColumnFamilies", progress.remaining_cfs, progress.total_cfs);
@@ -671,8 +671,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});

- ss::drain.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().drain().then([] {
+ ss::drain.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().drain().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -703,20 +703,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});

- ss::stop_gossiping.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().stop_gossiping().then([] {
+ ss::stop_gossiping.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().stop_gossiping().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::start_gossiping.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().start_gossiping().then([] {
+ ss::start_gossiping.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().start_gossiping().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::is_gossip_running.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_gossip_running().then([] (bool running){
+ ss::is_gossip_running.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_gossip_running().then([] (bool running){
return make_ready_future<json::json_return_type>(running);
});
});
@@ -728,8 +728,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});

- ss::is_initialized.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_initialized().then([] (bool initialized) {
+ ss::is_initialized.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_initialized().then([] (bool initialized) {
return make_ready_future<json::json_return_type>(initialized);
});
});
@@ -738,8 +738,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});

- ss::is_joined.set(r, [] (std::unique_ptr<request> req) {
- return make_ready_future<json::json_return_type>(service::get_local_storage_service().is_joined());
+ ss::is_joined.set(r, [&ss] (std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>(ss.local().is_joined());
});

ss::set_stream_throughput_mb_per_sec.set(r, [](std::unique_ptr<request> req) {
@@ -804,9 +804,9 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});

- ss::rebuild.set(r, [](std::unique_ptr<request> req) {
+ ss::rebuild.set(r, [&ss](std::unique_ptr<request> req) {
auto source_dc = req->get_query_param("source_dc");
- return service::get_local_storage_service().rebuild(std::move(source_dc)).then([] {
+ return ss.local().rebuild(std::move(source_dc)).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -831,7 +831,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});

- ss::load_new_ss_tables.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::load_new_ss_tables.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto ks = validate_keyspace(ctx, req->param);
auto cf = req->get_query_param("cf");
auto stream = req->get_query_param("load_and_stream");
@@ -843,7 +843,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
// No need to add the keyspace, since all we want is to avoid always sending this to the same
// CPU. Even then I am being overzealous here. This is not something that happens all the time.
auto coordinator = std::hash<sstring>()(cf) % smp::count;
- return service::get_storage_service().invoke_on(coordinator,
+ return ss.invoke_on(coordinator,
[ks = std::move(ks), cf = std::move(cf),
load_and_stream, primary_replica_only] (service::storage_service& s) {
return s.load_new_sstables(ks, cf, load_and_stream, primary_replica_only);
@@ -933,18 +933,18 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
});

- ss::enable_auto_compaction.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::enable_auto_compaction.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto tables = split_cf(req->get_query_param("cf"));

- return set_tables_autocompaction(ctx, keyspace, tables, true);
+ return set_tables_autocompaction(ctx, ss.local(), keyspace, tables, true);
});

- ss::disable_auto_compaction.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::disable_auto_compaction.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto tables = split_cf(req->get_query_param("cf"));

- return set_tables_autocompaction(ctx, keyspace, tables, false);
+ return set_tables_autocompaction(ctx, ss.local(), keyspace, tables, false);
});

ss::deliver_hints.set(r, [](std::unique_ptr<request> req) {
@@ -1012,8 +1012,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return get_cf_stats(ctx, &column_family_stats::live_disk_space_used);
});

- ss::get_exceptions.set(r, [](const_req req) {
- return service::get_local_storage_service().get_exception_count();
+ ss::get_exceptions.set(r, [&ss](const_req req) {
+ return ss.local().get_exception_count();
});

ss::get_total_hints_in_progress.set(r, [](std::unique_ptr<request> req) {
@@ -1028,25 +1028,25 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(0);
});

- ss::get_ownership.set(r, [] (std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_ownership().then([] (auto&& ownership) {
+ ss::get_ownership.set(r, [&ss] (std::unique_ptr<request> req) {
+ return ss.local().get_ownership().then([] (auto&& ownership) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
});

- ss::get_effective_ownership.set(r, [&ctx] (std::unique_ptr<request> req) {
+ ss::get_effective_ownership.set(r, [&ctx, &ss] (std::unique_ptr<request> req) {
auto keyspace_name = req->param["keyspace"] == "null" ? "" : validate_keyspace(ctx, req->param);
- return service::get_local_storage_service().effective_ownership(keyspace_name).then([] (auto&& ownership) {
+ return ss.local().effective_ownership(keyspace_name).then([] (auto&& ownership) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
});

- ss::view_build_statuses.set(r, [&ctx] (std::unique_ptr<request> req) {
+ ss::view_build_statuses.set(r, [&ctx, &ss] (std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto view = req->param["view"];
- return service::get_local_storage_service().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map<sstring, sstring> status) {
+ return ss.local().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map<sstring, sstring> status) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(std::move(status), res));
});
@@ -1177,7 +1177,6 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
});
-
}

void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_ctl) {
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:26 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
Some storage_service methods call for global storage service instance
while they can enjoy "this" pointer.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/storage_service.cc | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/service/storage_service.cc b/service/storage_service.cc
index e37163e349..c2c8d14bbb 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1778,13 +1778,13 @@ future<std::unordered_map<sstring, std::vector<sstring>>> storage_service::descr
auto version = host_and_version.second ? host_and_version.second->to_sstring() : UNREACHABLE;
results.try_emplace(version).first->second.emplace_back(host_and_version.first.to_sstring());
return results;
- }).then([] (auto results) {
+ }).then([this] (auto results) {
// we're done: the results map is ready to return to the client. the rest is just debug logging:
auto it_unreachable = results.find(UNREACHABLE);
if (it_unreachable != results.end()) {
slogger.debug("Hosts not in agreement. Didn't get a response from everybody: {}", ::join( ",", it_unreachable->second));
}
- auto my_version = get_local_storage_service().get_schema_version();
+ auto my_version = get_schema_version();
for (auto&& entry : results) {
// check for version disagreement. log the hosts that don't agree.
if (entry.first == UNREACHABLE || entry.first == my_version) {
@@ -3602,8 +3602,8 @@ future<> storage_service::update_topology(inet_address endpoint) {
}

void storage_service::init_messaging_service() {
- _messaging.local().register_replication_finished([] (gms::inet_address from) {
- return get_local_storage_service().confirm_replication(from);
+ _messaging.local().register_replication_finished([this] (gms::inet_address from) {
+ return confirm_replication(from);
});

_messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:26 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
The mutate_MV() call needs token metadata and it gets them from
global storage service. Fixing it not to use globals is a huge
refactoring, so for now just get the tokens from global storage
proxy.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
db/view/view.cc | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/db/view/view.cc b/db/view/view.cc
index 9e044cabc9..6aad31d9ec 100644
--- a/db/view/view.cc
+++ b/db/view/view.cc
@@ -73,7 +73,6 @@
#include "mutation.hh"
#include "mutation_partition.hh"
#include "service/migration_manager.hh"
-#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "view_info.hh"
#include "view_update_checks.hh"
@@ -1280,7 +1279,7 @@ future<> mutate_MV(
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto& keyspace_name = mut.s->ks_name();
auto target_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token);
- auto remote_endpoints = service::get_local_storage_service().get_token_metadata().pending_endpoints_for(view_token, keyspace_name);
+ auto remote_endpoints = service::get_local_storage_proxy().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name);
auto sem_units = pending_view_updates.split(mut.fm.representation().size());

// First, find the local endpoint and ensure that if it exists,
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:27 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
And use container() to reshard to shard 0. This removes one
more call for global storage service instance.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/storage_service.hh | 2 +-
service/storage_service.cc | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index 8465fbd65e..cebd4b69d2 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -255,7 +255,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
future<> keyspace_changed(const sstring& ks_name);
void register_metrics();
future<> snitch_reconfigured();
- static future<> update_topology(inet_address endpoint);
+ future<> update_topology(inet_address endpoint);
future<> publish_schema_version();
void install_schema_version_change_listener();

diff --git a/service/storage_service.cc b/service/storage_service.cc
index c2c8d14bbb..1cd0a24523 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -3590,7 +3590,7 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
}

future<> storage_service::update_topology(inet_address endpoint) {
- return service::get_storage_service().invoke_on(0, [endpoint] (auto& ss) {
+ return container().invoke_on(0, [endpoint] (auto& ss) {
return ss.mutate_token_metadata([&ss, endpoint] (mutable_token_metadata_ptr tmptr) mutable {
// initiate the token metadata endpoints cache reset
tmptr->invalidate_cached_rings();
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:28 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
One of them just re-wraps arguments in std::ref and calls for
global storage service. The other one is dead code which also
calls the global s._s. Remove both and fix the only caller.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/storage_service.hh | 16 ----------------
main.cc | 10 +++++-----
service/storage_service.cc | 27 ---------------------------
3 files changed, 5 insertions(+), 48 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index b6f2787817..0ea45ad41a 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -911,20 +911,4 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
bool is_repair_based_node_ops_enabled();
};

-future<> init_storage_service(sharded<abort_source>& abort_sources,
- distributed<database>& db,
- sharded<gms::gossiper>& gossiper,
- sharded<db::system_distributed_keyspace>& sys_dist_ks,
- sharded<db::view::view_update_generator>& view_update_generator,
- sharded<gms::feature_service>& feature_service,
- storage_service_config config,
- sharded<service::migration_manager>& mm,
- sharded<locator::shared_token_metadata>& stm,
- sharded<netw::messaging_service>& ms,
- sharded<cdc::generation_service>&,
- sharded<repair_service>& repair,
- sharded<raft_group_registry>& raft_gr,
- sharded<endpoint_lifecycle_notifier>& elc_notif);
-future<> deinit_storage_service();
-
}
diff --git a/main.cc b/main.cc
index d9796e86f0..c84660a1ba 100644
--- a/main.cc
+++ b/main.cc
@@ -888,11 +888,11 @@ int main(int ac, char** av) {
supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
- service::init_storage_service(stop_signal.as_sharded_abort_source(),
- db, gossiper, sys_dist_ks, view_update_generator,
- feature_service, sscfg, mm, token_metadata,
- messaging, cdc_generation_service, repair,
- raft_gr, lifecycle_notifier).get();
+ ss.start(std::ref(stop_signal.as_sharded_abort_source()),
+ std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(view_update_generator),
+ std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata),
+ std::ref(messaging), std::ref(cdc_generation_service), std::ref(repair),
+ std::ref(raft_gr), std::ref(lifecycle_notifier)).get();
supervisor::notify("starting per-shard database core");

sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 1cd0a24523..6a91969b4b 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -3810,33 +3810,6 @@ storage_service::view_build_statuses(sstring keyspace, sstring view_name) const
});
}

-future<> init_storage_service(sharded<abort_source>& abort_source, distributed<database>& db, sharded<gms::gossiper>& gossiper,
- sharded<db::system_distributed_keyspace>& sys_dist_ks,
- sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service,
- storage_service_config config,
- sharded<service::migration_manager>& mm, sharded<locator::shared_token_metadata>& stm,
- sharded<netw::messaging_service>& ms,
- sharded<cdc::generation_service>& cdc_gen_service,
- sharded<repair_service>& repair,
- sharded<service::raft_group_registry>& raft_gr,
- sharded<service::endpoint_lifecycle_notifier>& elc_notif) {
- return
- service::get_storage_service().start(std::ref(abort_source),
- std::ref(db), std::ref(gossiper),
- std::ref(sys_dist_ks),
- std::ref(view_update_generator),
- std::ref(feature_service), config, std::ref(mm),
- std::ref(stm), std::ref(ms),
- std::ref(cdc_gen_service),
- std::ref(repair),
- std::ref(raft_gr),
- std::ref(elc_notif));
-}
-
-future<> deinit_storage_service() {
- return service::get_storage_service().stop();
-}
-
future<> endpoint_lifecycle_notifier::notify_down(gms::inet_address endpoint) {
return seastar::async([this, endpoint] {
_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) {
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:28 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/storage_service.hh | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index cebd4b69d2..b6f2787817 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -879,7 +879,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi

template <typename Func>
auto run_with_api_lock(sstring operation, Func&& func) {
- return get_storage_service().invoke_on(0, [operation = std::move(operation),
+ return container().invoke_on(0, [operation = std::move(operation),
func = std::forward<Func>(func)] (storage_service& ss) mutable {
if (!ss._operation_in_progress.empty()) {
throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
@@ -893,7 +893,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi

template <typename Func>
auto run_with_no_api_lock(Func&& func) {
- return get_storage_service().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
+ return container().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
return func(ss);
});
}
--
2.20.1

Pavel Emelyanov

unread,
Jul 29, 2021, 4:07:29 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
There are 3 places that can now declare local instance:

- main
- cql_test_env
- boost gossiper test

The global pointer is saved in debug namespace for debugging.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/storage_service.hh | 12 ------------
main.cc | 4 +++-
service/storage_service.cc | 2 --
test/boost/gossip_test.cc | 5 +++--
test/lib/cql_test_env.cc | 6 +++---
5 files changed, 9 insertions(+), 20 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index 0ea45ad41a..5ff130fa4b 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -106,18 +106,6 @@ namespace service {
class storage_service;
class migration_manager;

-extern distributed<storage_service> _the_storage_service;
-// DEPRECATED, DON'T USE!
-// Pass references to services through constructor/function parameters. Don't use globals.
-inline distributed<storage_service>& get_storage_service() {
- return _the_storage_service;
-}
-// DEPRECATED, DON'T USE!
-// Pass references to services through constructor/function parameters. Don't use globals.
-inline storage_service& get_local_storage_service() {
- return _the_storage_service.local();
-}
-
enum class disk_error { regular, commit };

struct bind_messaging_port_tag {};
diff --git a/main.cc b/main.cc
index c84660a1ba..5dc7135d43 100644
--- a/main.cc
+++ b/main.cc
@@ -416,6 +416,7 @@ sharded<netw::messaging_service>* the_messaging_service;
sharded<cql3::query_processor>* the_query_processor;
sharded<qos::service_level_controller>* the_sl_controller;
sharded<service::migration_manager>* the_migration_manager;
+sharded<service::storage_service>* the_storage_service;
}

int main(int ac, char** av) {
@@ -494,7 +495,7 @@ int main(int ac, char** av) {
service::load_meter load_meter;
debug::db = &db;
auto& proxy = service::get_storage_proxy();
- auto& ss = service::get_storage_service();
+ sharded<service::storage_service> ss;
sharded<service::migration_manager> mm;
api::http_context ctx(db, proxy, load_meter, token_metadata);
httpd::http_server_control prometheus_server;
@@ -888,6 +889,7 @@ int main(int ac, char** av) {
supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
+ debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(view_update_generator),
std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata),
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 6a91969b4b..5918900b68 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -101,8 +101,6 @@ namespace service {

static logging::logger slogger("storage_service");

-distributed<storage_service> _the_storage_service;
-
storage_service::storage_service(abort_source& abort_source,
distributed<database>& db, gms::gossiper& gossiper,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
diff --git a/test/boost/gossip_test.cc b/test/boost/gossip_test.cc
index 10d4761980..197e34c83d 100644
--- a/test/boost/gossip_test.cc
+++ b/test/boost/gossip_test.cc
@@ -98,7 +98,8 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
elc_notif.start().get();
auto stop_elc_notif = defer([&elc_notif] { elc_notif.stop().get(); });

- service::get_storage_service().start(std::ref(abort_sources),
+ sharded<service::storage_service> ss;
+ ss.start(std::ref(abort_sources),
std::ref(db), std::ref(gms::get_gossiper()),
std::ref(sys_dist_ks),
std::ref(view_update_generator),
@@ -108,7 +109,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
std::ref(cdc_generation_service), std::ref(repair),
std::ref(raft_gr), std::ref(elc_notif),
true).get();
- auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
+ auto stop_ss = defer([&] { ss.stop().get(); });

sharded<semaphore> sst_dir_semaphore;
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc
index 3562669bf6..c0e4274755 100644
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -542,7 +542,7 @@ class single_node_cql_env : public cql_test_env {
raft_gr.start(std::ref(ms), std::ref(gms::get_gossiper()), std::ref(qp)).get();
auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });

- auto& ss = service::get_storage_service();
+ sharded<service::storage_service> ss;
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
ss.start(std::ref(abort_sources), std::ref(db),
@@ -684,8 +684,8 @@ class single_node_cql_env : public cql_test_env {
cdc.stop().get();
});

- service::get_local_storage_service().init_server(service::bind_messaging_port(false)).get();
- service::get_local_storage_service().join_cluster().get();
+ ss.local().init_server(service::bind_messaging_port(false)).get();
+ ss.local().join_cluster().get();

auth::permissions_cache_config perm_cache_config;
perm_cache_config.max_entries = cfg->permissions_cache_max_entries();
--
2.20.1

Commit Bot

unread,
Jul 29, 2021, 4:47:48 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

alternator: Keep storage_proxy on server

It's already available on controller and will be needed by
API handlers in the next patch.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/alternator/controller.cc b/alternator/controller.cc
--- a/alternator/controller.cc
+++ b/alternator/controller.cc
@@ -85,7 +85,7 @@ future<> controller::start() {
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };

_executor.start(std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get();
- _server.start(std::ref(_executor), std::ref(_qp)).get();
+ _server.start(std::ref(_executor), std::ref(_qp), std::ref(_proxy)).get();
std::optional<uint16_t> alternator_port;
if (_config.alternator_port()) {
alternator_port = _config.alternator_port();
diff --git a/alternator/server.cc b/alternator/server.cc
--- a/alternator/server.cc
+++ b/alternator/server.cc
@@ -433,11 +433,12 @@ void server::set_routes(routes& r) {
//FIXME: A way to immediately invalidate the cache should be considered,
// e.g. when the system table which stores the keys is changed.
// For now, this propagation may take up to 1 minute.
-server::server(executor& exec, cql3::query_processor& qp)
+server::server(executor& exec, cql3::query_processor& qp, service::storage_proxy& proxy)
: _http_server("http-alternator")
, _https_server("https-alternator")
, _executor(exec)
, _qp(qp)
+ , _proxy(proxy)
, _key_cache(1024, 1min, slogger)
, _enforce_authorization(false)
, _enabled_servers{}
@@ -507,6 +508,7 @@ server::server(executor& exec, cql3::query_processor& qp)
return e.get_records(client_state, std::move(trace_state), std::move(permit), std::move(json_request));
}},
} {
+ (void)_proxy; // temporary
}

future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
diff --git a/alternator/server.hh b/alternator/server.hh

Commit Bot

unread,
Jul 29, 2021, 4:47:49 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

alternator: Take token metadata from server's storage_proxy

There's a local_nodelist_handler serving API requests that calls
for global storage service to get token metadata from. Now it
can get storage proxy reference from server upon construction
and use it for tokens.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/alternator/server.cc b/alternator/server.cc
--- a/alternator/server.cc
+++ b/alternator/server.cc
@@ -508,7 +513,6 @@ server::server(executor& exec, cql3::query_processor& qp, service::storage_proxy
return e.get_records(client_state, std::move(trace_state), std::move(permit), std::move(json_request));
}},
} {
- (void)_proxy; // temporary

Commit Bot

unread,
Jul 29, 2021, 4:47:51 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

api: Carry sharded<storage_service>& down to some handlers

Both set_server_storage_service and set_server_storage_proxy set up
API handlers that need storage service to work. Now they all call for
global storage service instance, but it's better if they receive one
from main. This patch carries the sharded storage service reference
down to handlers setting function, next patch will make use of it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/api/api.cc b/api/api.cc
--- a/api/api.cc
+++ b/api/api.cc
@@ -109,8 +109,10 @@ future<> unset_rpc_controller(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_rpc_controller(ctx, r); });
}

-future<> set_server_storage_service(http_context& ctx) {
- return register_api(ctx, "storage_service", "The storage service API", set_storage_service);
+future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss) {
+ return register_api(ctx, "storage_service", "The storage service API", [&ss] (http_context& ctx, routes& r) {
+ set_storage_service(ctx, r, ss);
+ });
}

future<> set_server_repair(http_context& ctx, sharded<repair_service>& repair) {
@@ -153,9 +155,11 @@ future<> unset_server_messaging_service(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_messaging_service(ctx, r); });
}

-future<> set_server_storage_proxy(http_context& ctx) {
+future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_service>& ss) {
return register_api(ctx, "storage_proxy",
- "The storage proxy API", set_storage_proxy);
+ "The storage proxy API", [&ss] (http_context& ctx, routes& r) {
+ set_storage_proxy(ctx, r, ss);
+ });
}

future<> set_server_stream_manager(http_context& ctx) {
diff --git a/api/api_init.hh b/api/api_init.hh
diff --git a/api/storage_proxy.cc b/api/storage_proxy.cc
--- a/api/storage_proxy.cc
+++ b/api/storage_proxy.cc
@@ -193,7 +193,7 @@ sum_timer_stats_storage_proxy(distributed<proxy>& d,
});
}

-void set_storage_proxy(http_context& ctx, routes& r) {
+void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
sp::get_total_hints.set(r, [](std::unique_ptr<request> req) {
//TBD
unimplemented();
diff --git a/api/storage_proxy.hh b/api/storage_proxy.hh
--- a/api/storage_proxy.hh
+++ b/api/storage_proxy.hh
@@ -21,10 +21,13 @@

#pragma once

+#include <seastar/core/sharded.hh>
#include "api.hh"

+namespace service { class storage_service; }
+
namespace api {

-void set_storage_proxy(http_context& ctx, routes& r);
+void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_service>& ss);

}
diff --git a/api/storage_service.cc b/api/storage_service.cc
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -294,7 +294,7 @@ void unset_repair(http_context& ctx, routes& r) {
ss::force_terminate_all_repair_sessions_new.unset(r);
}

-void set_storage_service(http_context& ctx, routes& r) {
+void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
ss::local_hostid.set(r, [](std::unique_ptr<request> req) {
return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) {
return make_ready_future<json::json_return_type>(id.to_sstring());
diff --git a/api/storage_service.hh b/api/storage_service.hh
--- a/api/storage_service.hh
+++ b/api/storage_service.hh
@@ -33,7 +33,7 @@ class repair_service;

namespace api {

-void set_storage_service(http_context& ctx, routes& r);
+void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss);
void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair);
void unset_repair(http_context& ctx, routes& r);
void set_transport_controller(http_context& ctx, routes& r, cql_transport::controller& ctl);
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc

Commit Bot

unread,
Jul 29, 2021, 4:47:52 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

api: Capture and use sharded<storage_service>& in handlers

The reference in question is already there, handlers that need
storage service can capture it and use. These handlers are not
yet stopped, but neither is the storage service itself, so the
potentially dangling reference is not being set up here.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/api/storage_proxy.cc b/api/storage_proxy.cc
--- a/api/storage_proxy.cc
+++ b/api/storage_proxy.cc
@@ -363,8 +363,8 @@ void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_se
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_repaired_background);
});

- sp::get_schema_versions.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().describe_schema_versions().then([] (auto result) {
+ sp::get_schema_versions.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
for (auto e : result) {
sp::mapper_list entry;
diff --git a/api/storage_service.cc b/api/storage_service.cc
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -650,29 +650,29 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(res);
});

- ss::get_operation_mode.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_operation_mode().then([] (auto mode) {
+ ss::get_operation_mode.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(mode);
});
});

- ss::is_starting.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_starting().then([] (auto starting) {
+ ss::is_starting.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_starting().then([] (auto starting) {
return make_ready_future<json::json_return_type>(starting);
});
});

- ss::get_drain_progress.set(r, [](std::unique_ptr<request> req) {
- return service::get_storage_service().map_reduce(adder<service::storage_service::drain_progress>(), [] (auto& ss) {
+ ss::get_drain_progress.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.map_reduce(adder<service::storage_service::drain_progress>(), [] (auto& ss) {
return ss.get_drain_progress();
}).then([] (auto&& progress) {
auto progress_str = format("Drained {}/{} ColumnFamilies", progress.remaining_cfs, progress.total_cfs);
return make_ready_future<json::json_return_type>(std::move(progress_str));

Commit Bot

unread,
Jul 29, 2021, 4:47:53 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

thrift: Carry sharded<storage_service>& down to handler

The thrift_handler class' methods need storage service. This
patch makes sure this class has sharded storage service
reference on board.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
@@ -1372,7 +1372,7 @@ int main(int ac, char** av) {
api::unset_transport_controller(ctx).get();
});

- ::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter);
+ ::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter, ss);

ss.local().register_client_shutdown_hook("rpc server", [&thrift_ctl] {
thrift_ctl.stop().get();
diff --git a/thrift/controller.cc b/thrift/controller.cc
diff --git a/thrift/controller.hh b/thrift/controller.hh
--- a/thrift/controller.hh
+++ b/thrift/controller.hh
@@ -32,6 +32,7 @@ class thrift_server;
class database;
namespace auth { class service; }
namespace cql3 { class query_processor; }
+namespace service { class storage_service; }

class thrift_controller {
std::unique_ptr<distributed<thrift_server>> _server;
@@ -42,12 +43,13 @@ class thrift_controller {
sharded<auth::service>& _auth_service;
sharded<cql3::query_processor>& _qp;
sharded<service::memory_limiter>& _mem_limiter;
+ sharded<service::storage_service>& _ss;

future<> do_start_server();
future<> do_stop_server();

public:
- thrift_controller(distributed<database>&, sharded<auth::service>&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&);
+ thrift_controller(distributed<database>&, sharded<auth::service>&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&, sharded<service::storage_service>& ss);
future<> start_server();
future<> stop_server();
future<> stop();
diff --git a/thrift/handler.cc b/thrift/handler.cc
--- a/thrift/handler.cc
+++ b/thrift/handler.cc
@@ -203,6 +203,7 @@ enum class query_order { no, yes };
class thrift_handler : public CassandraCobSvIf {
distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor;
+ sharded<service::storage_service>& _ss;
::timeout_config _timeout_config;
service::client_state _client_state;
service::query_state _query_state;
@@ -220,17 +221,18 @@ class thrift_handler : public CassandraCobSvIf {
});
}
public:
- explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
+ explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
: _db(db)
, _query_processor(qp)
+ , _ss(ss)
, _timeout_config(timeout_config)
, _client_state(service::client_state::external_tag{}, auth_service, nullptr, _timeout_config, socket_address(), true)
// FIXME: Handlers are not created per query, but rather per connection, so it makes little sense to store
// service permits in here. The query state should be reinstantiated per query - AFAIK it's only used
// for CQL queries which piggy-back on Thrift protocol.
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
, _current_permit(current_permit)
- { }
+ { (void)_ss; /* temporary */ }

const sstring& current_keyspace() const {
return _query_state.get_client_state().get_raw_keyspace();
@@ -2018,27 +2020,30 @@ class thrift_handler : public CassandraCobSvIf {
class handler_factory : public CassandraCobSvIfFactory {
distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor;
+ sharded<service::storage_service>& _ss;
auth::service& _auth_service;
timeout_config _timeout_config;
service_permit& _current_permit;
public:
explicit handler_factory(distributed<database>& db,
distributed<cql3::query_processor>& qp,
+ sharded<service::storage_service>& ss,
auth::service& auth_service,
::timeout_config timeout_config,
service_permit& current_permit)
- : _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {}
+ : _db(db), _query_processor(qp), _ss(ss), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {}
typedef CassandraCobSvIf Handler;
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
- return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config, _current_permit);
+ return new thrift_handler(_db, _query_processor, _ss, _auth_service, _timeout_config, _current_permit);
}
virtual void releaseHandler(CassandraCobSvIf* handler) {
delete handler;
}
};

std::unique_ptr<CassandraCobSvIfFactory>
-create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service,
+create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp,
+ sharded<service::storage_service>& ss, auth::service& auth_service,
::timeout_config timeout_config, service_permit& current_permit) {
- return std::make_unique<handler_factory>(db, qp, auth_service, timeout_config, current_permit);
+ return std::make_unique<handler_factory>(db, qp, ss, auth_service, timeout_config, current_permit);
}
diff --git a/thrift/handler.hh b/thrift/handler.hh
--- a/thrift/handler.hh
+++ b/thrift/handler.hh
@@ -31,7 +31,8 @@

struct timeout_config;
class service_permit;
+namespace service { class storage_service; }

-std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, timeout_config, service_permit& current_permit);
+std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, auth::service&, timeout_config, service_permit& current_permit);

#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */
diff --git a/thrift/server.cc b/thrift/server.cc
--- a/thrift/server.cc
+++ b/thrift/server.cc
@@ -68,11 +68,12 @@ class thrift_stats {

thrift_server::thrift_server(distributed<database>& db,
distributed<cql3::query_processor>& qp,
+ sharded<service::storage_service>& ss,
auth::service& auth_service,
service::memory_limiter& ml,
thrift_server_config config)
: _stats(new thrift_stats(*this))
- , _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config, _current_permit).release())
+ , _handler_factory(create_handler_factory(db, qp, ss, auth_service, config.timeout_config, _current_permit).release())
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory))
, _memory_available(ml.get_semaphore())
diff --git a/thrift/server.hh b/thrift/server.hh
--- a/thrift/server.hh
+++ b/thrift/server.hh
@@ -76,6 +76,8 @@ namespace auth {
class service;
}

+namespace service { class storage_service; }
+
struct thrift_server_config {
::timeout_config timeout_config;
uint64_t max_request_size;
@@ -127,7 +129,7 @@ private:

Commit Bot

unread,
Jul 29, 2021, 4:47:54 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

thrift: Use local storage service in handlers

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/thrift/handler.cc b/thrift/handler.cc
--- a/thrift/handler.cc
+++ b/thrift/handler.cc
@@ -232,7 +232,7 @@ class thrift_handler : public CassandraCobSvIf {
// for CQL queries which piggy-back on Thrift protocol.
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
, _current_permit(current_permit)
- { (void)_ss; /* temporary */ }
+ { }

const sstring& current_keyspace() const {
return _query_state.get_client_state().get_raw_keyspace();

Commit Bot

unread,
Jul 29, 2021, 4:47:56 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

view: Use proxy to get token metadata from

The mutate_MV() call needs token metadata and it gets them from
global storage service. Fixing it not to use globals is a huge
refactoring, so for now just get the tokens from global storage
proxy.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/db/view/view.cc b/db/view/view.cc

Commit Bot

unread,
Jul 29, 2021, 4:47:57 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

storage_service: Capture this when appropriate

Some storage_service methods call for global storage service instance
while they can enjoy "this" pointer.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc

Commit Bot

unread,
Jul 29, 2021, 4:47:58 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

storage_service: Unmark update_topology static

And use container() to reshard to shard 0. This removes one
more call for global storage service instance.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -3590,7 +3590,7 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
}

future<> storage_service::update_topology(inet_address endpoint) {
- return service::get_storage_service().invoke_on(0, [endpoint] (auto& ss) {
+ return container().invoke_on(0, [endpoint] (auto& ss) {
return ss.mutate_token_metadata([&ss, endpoint] (mutable_token_metadata_ptr tmptr) mutable {
// initiate the token metadata endpoints cache reset
tmptr->invalidate_cached_rings();
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -255,7 +255,7 @@ private:

Commit Bot

unread,
Jul 29, 2021, 4:48:00 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

storage_service: Use container() in run_with(out)_api_lock

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -879,7 +879,7 @@ public:

template <typename Func>
auto run_with_api_lock(sstring operation, Func&& func) {
- return get_storage_service().invoke_on(0, [operation = std::move(operation),
+ return container().invoke_on(0, [operation = std::move(operation),
func = std::forward<Func>(func)] (storage_service& ss) mutable {
if (!ss._operation_in_progress.empty()) {
throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
@@ -893,7 +893,7 @@ public:

Commit Bot

unread,
Jul 29, 2021, 4:48:01 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

storage_service: Remove (de)?init_storage_service()

One of them just re-wraps arguments in std::ref and calls for
global storage service. The other one is dead code which also
calls the global s._s. Remove both and fix the only caller.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
@@ -888,11 +888,11 @@ int main(int ac, char** av) {
supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
- service::init_storage_service(stop_signal.as_sharded_abort_source(),
- db, gossiper, sys_dist_ks, view_update_generator,
- feature_service, sscfg, mm, token_metadata,
- messaging, cdc_generation_service, repair,
- raft_gr, lifecycle_notifier).get();
+ ss.start(std::ref(stop_signal.as_sharded_abort_source()),
+ std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(view_update_generator),
+ std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata),
+ std::ref(messaging), std::ref(cdc_generation_service), std::ref(repair),
+ std::ref(raft_gr), std::ref(lifecycle_notifier)).get();
supervisor::notify("starting per-shard database core");

sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -911,20 +911,4 @@ public:

Commit Bot

unread,
Jul 29, 2021, 4:48:02 AMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: next

storage_service: Make it local

There are 3 places that can now declare local instance:

- main
- cql_test_env
- boost gossiper test

The global pointer is saved in debug namespace for debugging.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
@@ -416,6 +416,7 @@ sharded<netw::messaging_service>* the_messaging_service;
sharded<cql3::query_processor>* the_query_processor;
sharded<qos::service_level_controller>* the_sl_controller;
sharded<service::migration_manager>* the_migration_manager;
+sharded<service::storage_service>* the_storage_service;
}

int main(int ac, char** av) {
@@ -494,7 +495,7 @@ int main(int ac, char** av) {
service::load_meter load_meter;
debug::db = &db;
auto& proxy = service::get_storage_proxy();
- auto& ss = service::get_storage_service();
+ sharded<service::storage_service> ss;
sharded<service::migration_manager> mm;
api::http_context ctx(db, proxy, load_meter, token_metadata);
httpd::http_server_control prometheus_server;
@@ -888,6 +889,7 @@ int main(int ac, char** av) {
supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
+ debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(view_update_generator),
std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata),
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -101,8 +101,6 @@ namespace service {

static logging::logger slogger("storage_service");

-distributed<storage_service> _the_storage_service;
-
storage_service::storage_service(abort_source& abort_source,
distributed<database>& db, gms::gossiper& gossiper,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -106,18 +106,6 @@ namespace service {
class storage_service;
class migration_manager;

-extern distributed<storage_service> _the_storage_service;
-// DEPRECATED, DON'T USE!
-// Pass references to services through constructor/function parameters. Don't use globals.
-inline distributed<storage_service>& get_storage_service() {
- return _the_storage_service;
-}
-// DEPRECATED, DON'T USE!
-// Pass references to services through constructor/function parameters. Don't use globals.
-inline storage_service& get_local_storage_service() {
- return _the_storage_service.local();
-}
-
enum class disk_error { regular, commit };

struct bind_messaging_port_tag {};
diff --git a/test/boost/gossip_test.cc b/test/boost/gossip_test.cc
--- a/test/boost/gossip_test.cc
+++ b/test/boost/gossip_test.cc
@@ -98,7 +98,8 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
elc_notif.start().get();
auto stop_elc_notif = defer([&elc_notif] { elc_notif.stop().get(); });

- service::get_storage_service().start(std::ref(abort_sources),
+ sharded<service::storage_service> ss;
+ ss.start(std::ref(abort_sources),
std::ref(db), std::ref(gms::get_gossiper()),
std::ref(sys_dist_ks),
std::ref(view_update_generator),
@@ -108,7 +109,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
std::ref(cdc_generation_service), std::ref(repair),
std::ref(raft_gr), std::ref(elc_notif),
true).get();
- auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
+ auto stop_ss = defer([&] { ss.stop().get(); });

sharded<semaphore> sst_dir_semaphore;
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -542,7 +542,7 @@ class single_node_cql_env : public cql_test_env {
raft_gr.start(std::ref(ms), std::ref(gms::get_gossiper()), std::ref(qp)).get();
auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });

- auto& ss = service::get_storage_service();
+ sharded<service::storage_service> ss;
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();

Avi Kivity

unread,
Jul 29, 2021, 4:48:47 AMJul 29
to Pavel Emelyanov, scylla...@googlegroups.com

On 29/07/2021 11.07, Pavel Emelyanov wrote:
> There are few places that call global storage service, but all
> are easily fixable without significant changes.
>
> 1. alternator -- needs token metadata, switch to using proxy


This is a good change, storage_proxy is lower level, and I can also
remember what it does.


> 2. api -- calls methods from storage service, all handlers are
> registered in main and can capture storage service from there
> 3. thrift -- calls methods from storage service, can carry the
> reference via controller
> 4. view -- needs tokens, switch to using (global) proxy


Here too.

Commit Bot

unread,
Jul 29, 2021, 1:19:45 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

alternator: Keep storage_proxy on server

It's already available on controller and will be needed by
API handlers in the next patch.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/alternator/controller.cc b/alternator/controller.cc
--- a/alternator/controller.cc
+++ b/alternator/controller.cc
@@ -85,7 +85,7 @@ future<> controller::start() {
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };

_executor.start(std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get();
- _server.start(std::ref(_executor), std::ref(_qp)).get();
+ _server.start(std::ref(_executor), std::ref(_qp), std::ref(_proxy)).get();
std::optional<uint16_t> alternator_port;
if (_config.alternator_port()) {
alternator_port = _config.alternator_port();
diff --git a/alternator/server.cc b/alternator/server.cc
--- a/alternator/server.cc
+++ b/alternator/server.cc
@@ -433,11 +433,12 @@ void server::set_routes(routes& r) {
//FIXME: A way to immediately invalidate the cache should be considered,
// e.g. when the system table which stores the keys is changed.
// For now, this propagation may take up to 1 minute.
-server::server(executor& exec, cql3::query_processor& qp)
+server::server(executor& exec, cql3::query_processor& qp, service::storage_proxy& proxy)
: _http_server("http-alternator")
, _https_server("https-alternator")
, _executor(exec)
, _qp(qp)
+ , _proxy(proxy)
, _key_cache(1024, 1min, slogger)
, _enforce_authorization(false)
, _enabled_servers{}
@@ -507,6 +508,7 @@ server::server(executor& exec, cql3::query_processor& qp)
return e.get_records(client_state, std::move(trace_state), std::move(permit), std::move(json_request));
}},
} {
+ (void)_proxy; // temporary
}

future<> server::init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,

Commit Bot

unread,
Jul 29, 2021, 1:19:46 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

alternator: Take token metadata from server's storage_proxy

There's a local_nodelist_handler serving API requests that calls
for global storage service to get token metadata from. Now it
can get storage proxy reference from server upon construction
and use it for tokens.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/alternator/server.cc b/alternator/server.cc
--- a/alternator/server.cc
+++ b/alternator/server.cc
@@ -508,7 +513,6 @@ server::server(executor& exec, cql3::query_processor& qp, service::storage_proxy
return e.get_records(client_state, std::move(trace_state), std::move(permit), std::move(json_request));
}},
} {
- (void)_proxy; // temporary

Commit Bot

unread,
Jul 29, 2021, 1:19:48 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

api: Carry sharded<storage_service>& down to some handlers

Both set_server_storage_service and set_server_storage_proxy set up
API handlers that need storage service to work. Now they all call for
global storage service instance, but it's better if they receive one
from main. This patch carries the sharded storage service reference
down to handlers setting function, next patch will make use of it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/api/storage_proxy.cc b/api/storage_proxy.cc
--- a/api/storage_proxy.cc
+++ b/api/storage_proxy.cc
@@ -193,7 +193,7 @@ sum_timer_stats_storage_proxy(distributed<proxy>& d,
});
}

-void set_storage_proxy(http_context& ctx, routes& r) {
+void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
sp::get_total_hints.set(r, [](std::unique_ptr<request> req) {
//TBD
unimplemented();
diff --git a/api/storage_proxy.hh b/api/storage_proxy.hh
--- a/api/storage_proxy.hh
+++ b/api/storage_proxy.hh
@@ -21,10 +21,13 @@

#pragma once

+#include <seastar/core/sharded.hh>
#include "api.hh"

+namespace service { class storage_service; }
+
namespace api {

-void set_storage_proxy(http_context& ctx, routes& r);
+void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_service>& ss);

}
diff --git a/api/storage_service.cc b/api/storage_service.cc
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -294,7 +294,7 @@ void unset_repair(http_context& ctx, routes& r) {
ss::force_terminate_all_repair_sessions_new.unset(r);
}

-void set_storage_service(http_context& ctx, routes& r) {
+void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
ss::local_hostid.set(r, [](std::unique_ptr<request> req) {
return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) {
return make_ready_future<json::json_return_type>(id.to_sstring());
diff --git a/api/storage_service.hh b/api/storage_service.hh
--- a/api/storage_service.hh
+++ b/api/storage_service.hh
@@ -33,7 +33,7 @@ class repair_service;

namespace api {

-void set_storage_service(http_context& ctx, routes& r);
+void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss);
void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair);
void unset_repair(http_context& ctx, routes& r);
void set_transport_controller(http_context& ctx, routes& r, cql_transport::controller& ctl);
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc

Commit Bot

unread,
Jul 29, 2021, 1:19:50 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

api: Capture and use sharded<storage_service>& in handlers

The reference in question is already there, handlers that need
storage service can capture it and use. These handlers are not
yet stopped, but neither is the storage service itself, so the
potentially dangling reference is not being set up here.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/api/storage_proxy.cc b/api/storage_proxy.cc
--- a/api/storage_proxy.cc
+++ b/api/storage_proxy.cc
@@ -363,8 +363,8 @@ void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_se
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_repaired_background);
});

- sp::get_schema_versions.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().describe_schema_versions().then([] (auto result) {
+ sp::get_schema_versions.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
for (auto e : result) {
sp::mapper_list entry;
diff --git a/api/storage_service.cc b/api/storage_service.cc
--- a/api/storage_service.cc
+++ b/api/storage_service.cc

Commit Bot

unread,
Jul 29, 2021, 1:19:50 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

thrift: Carry sharded<storage_service>& down to handler

The thrift_handler class' methods need storage service. This
patch makes sure this class has sharded storage service
reference on board.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
diff --git a/thrift/handler.cc b/thrift/handler.cc
--- a/thrift/handler.cc
+++ b/thrift/handler.cc
@@ -203,6 +203,7 @@ enum class query_order { no, yes };
class thrift_handler : public CassandraCobSvIf {
distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor;
+ sharded<service::storage_service>& _ss;
::timeout_config _timeout_config;
service::client_state _client_state;
service::query_state _query_state;
@@ -220,17 +221,18 @@ class thrift_handler : public CassandraCobSvIf {
});
}
public:
- explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
+ explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
: _db(db)
, _query_processor(qp)
+ , _ss(ss)
, _timeout_config(timeout_config)
, _client_state(service::client_state::external_tag{}, auth_service, nullptr, _timeout_config, socket_address(), true)
// FIXME: Handlers are not created per query, but rather per connection, so it makes little sense to store
// service permits in here. The query state should be reinstantiated per query - AFAIK it's only used
// for CQL queries which piggy-back on Thrift protocol.
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
, _current_permit(current_permit)
- { }
+ { (void)_ss; /* temporary */ }

const sstring& current_keyspace() const {
return _query_state.get_client_state().get_raw_keyspace();
+namespace service { class storage_service; }
+

Commit Bot

unread,
Jul 29, 2021, 1:19:51 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

thrift: Use local storage service in handlers

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/thrift/handler.cc b/thrift/handler.cc
--- a/thrift/handler.cc
+++ b/thrift/handler.cc
@@ -232,7 +232,7 @@ class thrift_handler : public CassandraCobSvIf {
// for CQL queries which piggy-back on Thrift protocol.
, _query_state(_client_state, /*FIXME: pass real permit*/empty_service_permit())
, _current_permit(current_permit)
- { (void)_ss; /* temporary */ }
+ { }

const sstring& current_keyspace() const {
return _query_state.get_client_state().get_raw_keyspace();

Commit Bot

unread,
Jul 29, 2021, 1:19:53 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

view: Use proxy to get token metadata from

The mutate_MV() call needs token metadata and it gets them from
global storage service. Fixing it not to use globals is a huge
refactoring, so for now just get the tokens from global storage
proxy.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---

Commit Bot

unread,
Jul 29, 2021, 1:19:54 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

storage_service: Capture this when appropriate

Some storage_service methods call for global storage service instance
while they can enjoy "this" pointer.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc

Commit Bot

unread,
Jul 29, 2021, 1:19:55 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

storage_service: Unmark update_topology static

And use container() to reshard to shard 0. This removes one
more call for global storage service instance.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -3590,7 +3590,7 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
}

future<> storage_service::update_topology(inet_address endpoint) {
- return service::get_storage_service().invoke_on(0, [endpoint] (auto& ss) {
+ return container().invoke_on(0, [endpoint] (auto& ss) {
return ss.mutate_token_metadata([&ss, endpoint] (mutable_token_metadata_ptr tmptr) mutable {
// initiate the token metadata endpoints cache reset
tmptr->invalidate_cached_rings();
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh

Commit Bot

unread,
Jul 29, 2021, 1:19:56 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

storage_service: Use container() in run_with(out)_api_lock

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh

Commit Bot

unread,
Jul 29, 2021, 1:19:58 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

storage_service: Remove (de)?init_storage_service()

One of them just re-wraps arguments in std::ref and calls for
global storage service. The other one is dead code which also
calls the global s._s. Remove both and fix the only caller.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
@@ -888,11 +888,11 @@ int main(int ac, char** av) {
supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
- service::init_storage_service(stop_signal.as_sharded_abort_source(),
- db, gossiper, sys_dist_ks, view_update_generator,
- feature_service, sscfg, mm, token_metadata,
- messaging, cdc_generation_service, repair,
- raft_gr, lifecycle_notifier).get();
+ ss.start(std::ref(stop_signal.as_sharded_abort_source()),
+ std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(view_update_generator),
+ std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata),
+ std::ref(messaging), std::ref(cdc_generation_service), std::ref(repair),
+ std::ref(raft_gr), std::ref(lifecycle_notifier)).get();
supervisor::notify("starting per-shard database core");

sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh

Commit Bot

unread,
Jul 29, 2021, 1:19:59 PMJul 29
to scylla...@googlegroups.com, Pavel Emelyanov
From: Pavel Emelyanov <xe...@scylladb.com>
Committer: Pavel Emelyanov <xe...@scylladb.com>
Branch: master

storage_service: Make it local

There are 3 places that can now declare local instance:

- main
- cql_test_env
- boost gossiper test

The global pointer is saved in debug namespace for debugging.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>

---
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
@@ -416,6 +416,7 @@ sharded<netw::messaging_service>* the_messaging_service;
sharded<cql3::query_processor>* the_query_processor;
sharded<qos::service_level_controller>* the_sl_controller;
sharded<service::migration_manager>* the_migration_manager;
+sharded<service::storage_service>* the_storage_service;
}

int main(int ac, char** av) {
@@ -494,7 +495,7 @@ int main(int ac, char** av) {
service::load_meter load_meter;
debug::db = &db;
auto& proxy = service::get_storage_proxy();
- auto& ss = service::get_storage_service();
+ sharded<service::storage_service> ss;
sharded<service::migration_manager> mm;
api::http_context ctx(db, proxy, load_meter, token_metadata);
httpd::http_server_control prometheus_server;
@@ -888,6 +889,7 @@ int main(int ac, char** av) {
supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
+ debug::the_storage_service = &ss;
ss.start(std::ref(stop_signal.as_sharded_abort_source()),
std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(view_update_generator),
std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata),
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -101,8 +101,6 @@ namespace service {

static logging::logger slogger("storage_service");

-distributed<storage_service> _the_storage_service;
-
storage_service::storage_service(abort_source& abort_source,
distributed<database>& db, gms::gossiper& gossiper,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
Reply all
Reply to author
Forward
0 new messages