[QUEUED scylla next] Merge "Remove global storage service instance" from Pavel E

0 views
Skip to first unread message

Commit Bot

unread,
Jul 29, 2021, 4:48:07 AMJul 29
to scylla...@googlegroups.com, Avi Kivity
From: Avi Kivity <a...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: next

Merge "Remove global storage service instance" from Pavel E

"
There are few places that call global storage service, but all
are easily fixable without significant changes.

1. alternator -- needs token metadata, switch to using proxy
2. api -- calls methods from storage service, all handlers are
registered in main and can capture storage service from there
3. thrift -- calls methods from storage service, can carry the
reference via controller
4. view -- needs tokens, switch to using (global) proxy
5. storage_service -- (surprisingly) can use "this"

tests: unit(dev), dtest(simple_boot_shutdown, dev)
"

* 'br-unglobal-storage-service' of https://github.com/xemul/scylla:
storage_service: Make it local
storage_service: Remove (de)?init_storage_service()
storage_service: Use container() in run_with(out)_api_lock
storage_service: Unmark update_topology static
storage_service: Capture this when appropriate
view: Use proxy to get token metadata from
thrift: Use local storage service in handlers
thrift: Carry sharded<storage_service>& down to handler
api: Capture and use sharded<storage_service>& in handlers
api: Carry sharded<storage_service>& down to some handlers
alternator: Take token metadata from server's storage_proxy
alternator: Keep storage_proxy on server

---
diff --git a/alternator/controller.cc b/alternator/controller.cc
--- a/alternator/controller.cc
+++ b/alternator/controller.cc
@@ -85,7 +85,7 @@ future<> controller::start() {
auto get_cdc_metadata = [] (cdc::generation_service& svc) { return std::ref(svc.get_cdc_metadata()); };

_executor.start(std::ref(_proxy), std::ref(_mm), std::ref(_sys_dist_ks), sharded_parameter(get_cdc_metadata, std::ref(_cdc_gen_svc)), _ssg.value()).get();
- _server.start(std::ref(_executor), std::ref(_qp)).get();
+ _server.start(std::ref(_executor), std::ref(_qp), std::ref(_proxy)).get();
std::optional<uint16_t> alternator_port;
if (_config.alternator_port()) {
alternator_port = _config.alternator_port();
diff --git a/alternator/server.cc b/alternator/server.cc
--- a/alternator/server.cc
+++ b/alternator/server.cc
@@ -30,9 +30,12 @@
#include "utils/rjson.hh"
#include "auth.hh"
#include <cctype>
+#include "service/storage_proxy.hh"
#include "cql3/query_processor.hh"
-#include "service/storage_service.hh"
+#include "locator/snitch_base.hh"
+#include "gms/gossiper.hh"
#include "utils/overloaded_functor.hh"
+#include "utils/fb_utilities.hh"

static logging::logger slogger("alternator-server");

@@ -191,8 +194,11 @@ class health_handler : public gated_handler {
};

class local_nodelist_handler : public gated_handler {
+ service::storage_proxy& _proxy;
public:
- local_nodelist_handler(seastar::gate& pending_requests) : gated_handler(pending_requests) {}
+ local_nodelist_handler(seastar::gate& pending_requests, service::storage_proxy& proxy)
+ : gated_handler(pending_requests)
+ , _proxy(proxy) {}
protected:
virtual future<std::unique_ptr<reply>> do_handle(const sstring& path, std::unique_ptr<request> req, std::unique_ptr<reply> rep) override {
rjson::value results = rjson::empty_array();
@@ -202,8 +208,7 @@ class local_nodelist_handler : public gated_handler {
sstring local_dc = locator::i_endpoint_snitch::get_local_snitch_ptr()->get_datacenter(
utils::fb_utilities::get_broadcast_address());
std::unordered_set<gms::inet_address> local_dc_nodes =
- service::get_local_storage_service().get_token_metadata().
- get_topology().get_datacenter_endpoints().at(local_dc);
+ _proxy.get_token_metadata_ptr()->get_topology().get_datacenter_endpoints().at(local_dc);
for (auto& ip : local_dc_nodes) {
if (gms::get_local_gossiper().is_alive(ip)) {
rjson::push_back(results, rjson::from_string(ip.to_sstring()));
@@ -426,18 +431,19 @@ void server::set_routes(routes& r) {
// consider this to be a security risk, because an attacker can already
// scan an entire subnet for nodes responding to the health request,
// or even just scan for open ports.
- r.put(operation_type::GET, "/localnodes", new local_nodelist_handler(_pending_requests));
+ r.put(operation_type::GET, "/localnodes", new local_nodelist_handler(_pending_requests, _proxy));
r.put(operation_type::OPTIONS, "/", new options_handler(_pending_requests));
}

//FIXME: A way to immediately invalidate the cache should be considered,
// e.g. when the system table which stores the keys is changed.
// For now, this propagation may take up to 1 minute.
-server::server(executor& exec, cql3::query_processor& qp)
+server::server(executor& exec, cql3::query_processor& qp, service::storage_proxy& proxy)
: _http_server("http-alternator")
, _https_server("https-alternator")
, _executor(exec)
, _qp(qp)
+ , _proxy(proxy)
, _key_cache(1024, 1min, slogger)
, _enforce_authorization(false)
, _enabled_servers{}
diff --git a/alternator/server.hh b/alternator/server.hh
--- a/alternator/server.hh
+++ b/alternator/server.hh
@@ -46,6 +46,7 @@ class server {
http_server _https_server;
executor& _executor;
cql3::query_processor& _qp;
+ service::storage_proxy& _proxy;

key_cache _key_cache;
bool _enforce_authorization;
@@ -77,7 +78,7 @@ class server {
json_parser _json_parser;

public:
- server(executor& executor, cql3::query_processor& qp);
+ server(executor& executor, cql3::query_processor& qp, service::storage_proxy& proxy);

future<> init(net::inet_address addr, std::optional<uint16_t> port, std::optional<uint16_t> https_port, std::optional<tls::credentials_builder> creds,
bool enforce_authorization, semaphore* memory_limiter, utils::updateable_value<uint32_t> max_concurrent_requests);
diff --git a/api/api.cc b/api/api.cc
--- a/api/api.cc
+++ b/api/api.cc
@@ -109,8 +109,10 @@ future<> unset_rpc_controller(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_rpc_controller(ctx, r); });
}

-future<> set_server_storage_service(http_context& ctx) {
- return register_api(ctx, "storage_service", "The storage service API", set_storage_service);
+future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss) {
+ return register_api(ctx, "storage_service", "The storage service API", [&ss] (http_context& ctx, routes& r) {
+ set_storage_service(ctx, r, ss);
+ });
}

future<> set_server_repair(http_context& ctx, sharded<repair_service>& repair) {
@@ -153,9 +155,11 @@ future<> unset_server_messaging_service(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_messaging_service(ctx, r); });
}

-future<> set_server_storage_proxy(http_context& ctx) {
+future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_service>& ss) {
return register_api(ctx, "storage_proxy",
- "The storage proxy API", set_storage_proxy);
+ "The storage proxy API", [&ss] (http_context& ctx, routes& r) {
+ set_storage_proxy(ctx, r, ss);
+ });
}

future<> set_server_stream_manager(http_context& ctx) {
diff --git a/api/api_init.hh b/api/api_init.hh
--- a/api/api_init.hh
+++ b/api/api_init.hh
@@ -30,6 +30,7 @@ namespace service {

class load_meter;
class storage_proxy;
+class storage_service;

} // namespace service

@@ -69,7 +70,7 @@ struct http_context {
future<> set_server_init(http_context& ctx);
future<> set_server_config(http_context& ctx);
future<> set_server_snitch(http_context& ctx);
-future<> set_server_storage_service(http_context& ctx);
+future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss);
future<> set_server_repair(http_context& ctx, sharded<repair_service>& repair);
future<> unset_server_repair(http_context& ctx);
future<> set_transport_controller(http_context& ctx, cql_transport::controller& ctl);
@@ -82,7 +83,7 @@ future<> set_server_gossip(http_context& ctx);
future<> set_server_load_sstable(http_context& ctx);
future<> set_server_messaging_service(http_context& ctx, sharded<netw::messaging_service>& ms);
future<> unset_server_messaging_service(http_context& ctx);
-future<> set_server_storage_proxy(http_context& ctx);
+future<> set_server_storage_proxy(http_context& ctx, sharded<service::storage_service>& ss);
future<> set_server_stream_manager(http_context& ctx);
future<> set_server_gossip_settle(http_context& ctx);
future<> set_server_cache(http_context& ctx);
diff --git a/api/storage_proxy.cc b/api/storage_proxy.cc
--- a/api/storage_proxy.cc
+++ b/api/storage_proxy.cc
@@ -193,7 +193,7 @@ sum_timer_stats_storage_proxy(distributed<proxy>& d,
});
}

-void set_storage_proxy(http_context& ctx, routes& r) {
+void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
sp::get_total_hints.set(r, [](std::unique_ptr<request> req) {
//TBD
unimplemented();
@@ -363,8 +363,8 @@ void set_storage_proxy(http_context& ctx, routes& r) {
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_repaired_background);
});

- sp::get_schema_versions.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().describe_schema_versions().then([] (auto result) {
+ sp::get_schema_versions.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
for (auto e : result) {
sp::mapper_list entry;
diff --git a/api/storage_proxy.hh b/api/storage_proxy.hh
--- a/api/storage_proxy.hh
+++ b/api/storage_proxy.hh
@@ -21,10 +21,13 @@

#pragma once

+#include <seastar/core/sharded.hh>
#include "api.hh"

+namespace service { class storage_service; }
+
namespace api {

-void set_storage_proxy(http_context& ctx, routes& r);
+void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_service>& ss);

}
diff --git a/api/storage_service.cc b/api/storage_service.cc
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -134,12 +134,12 @@ seastar::future<json::json_return_type> run_toppartitions_query(db::toppartition
});
}

-future<json::json_return_type> set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
+future<json::json_return_type> set_tables_autocompaction(http_context& ctx, service::storage_service& ss, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
if (tables.empty()) {
tables = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
}

- return service::get_local_storage_service().set_tables_autocompaction(keyspace, tables, enabled).then([]{
+ return ss.set_tables_autocompaction(keyspace, tables, enabled).then([]{
return make_ready_future<json::json_return_type>(json_void());
});
}
@@ -294,7 +294,7 @@ void unset_repair(http_context& ctx, routes& r) {
ss::force_terminate_all_repair_sessions_new.unset(r);
}

-void set_storage_service(http_context& ctx, routes& r) {
+void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss) {
ss::local_hostid.set(r, [](std::unique_ptr<request> req) {
return db::system_keyspace::get_local_host_id().then([](const utils::UUID& id) {
return make_ready_future<json::json_return_type>(id.to_sstring());
@@ -318,8 +318,8 @@ void set_storage_service(http_context& ctx, routes& r) {
return ctx.db.local().commitlog()->active_config().commit_log_location;
});

- ss::get_token_endpoint.set(r, [] (std::unique_ptr<request> req) {
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().get_token_to_endpoint_map(), [](const auto& i) {
+ ss::get_token_endpoint.set(r, [&ss] (std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().get_token_to_endpoint_map(), [](const auto& i) {
storage_service_json::mapper val;
val.key = boost::lexical_cast<std::string>(i.first);
val.value = boost::lexical_cast<std::string>(i.second);
@@ -395,15 +395,15 @@ void set_storage_service(http_context& ctx, routes& r) {
return container_to_vec(addr);
});

- ss::get_release_version.set(r, [](const_req req) {
- return service::get_local_storage_service().get_release_version();
+ ss::get_release_version.set(r, [&ss](const_req req) {
+ return ss.local().get_release_version();
});

ss::get_scylla_release_version.set(r, [](const_req req) {
return scylla_version();
});
- ss::get_schema_version.set(r, [](const_req req) {
- return service::get_local_storage_service().get_schema_version();
+ ss::get_schema_version.set(r, [&ss](const_req req) {
+ return ss.local().get_schema_version();
});

ss::get_all_data_file_locations.set(r, [&ctx](const_req req) {
@@ -414,10 +414,10 @@ void set_storage_service(http_context& ctx, routes& r) {
return ctx.db.local().get_config().saved_caches_directory();
});

- ss::get_range_to_endpoint_map.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::get_range_to_endpoint_map.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
std::vector<ss::maplist_mapper> res;
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().get_range_to_address_map(keyspace),
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().get_range_to_address_map(keyspace),
[](const std::pair<dht::token_range, inet_address_vector_replica_set>& entry){
ss::maplist_mapper m;
if (entry.first.start()) {
@@ -445,13 +445,13 @@ void set_storage_service(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(res);
});

- ss::describe_any_ring.set(r, [&ctx](std::unique_ptr<request> req) {
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(""), token_range_endpoints_to_json));
+ ss::describe_any_ring.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().describe_ring(""), token_range_endpoints_to_json));
});

- ss::describe_ring.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::describe_ring.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(keyspace), token_range_endpoints_to_json));
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().describe_ring(keyspace), token_range_endpoints_to_json));
});

ss::get_host_id_map.set(r, [&ctx](const_req req) {
@@ -483,14 +483,14 @@ void set_storage_service(http_context& ctx, routes& r) {
});
});

- ss::get_natural_endpoints.set(r, [&ctx](const_req req) {
+ ss::get_natural_endpoints.set(r, [&ctx, &ss](const_req req) {
auto keyspace = validate_keyspace(ctx, req.param);
- return container_to_vec(service::get_local_storage_service().get_natural_endpoints(keyspace, req.get_query_param("cf"),
+ return container_to_vec(ss.local().get_natural_endpoints(keyspace, req.get_query_param("cf"),
req.get_query_param("key")));
});

- ss::cdc_streams_check_and_repair.set(r, [&ctx] (std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_cdc_generation_service().check_and_repair_cdc_streams().then([] {
+ ss::cdc_streams_check_and_repair.set(r, [&ctx, &ss] (std::unique_ptr<request> req) {
+ return ss.local().get_cdc_generation_service().check_and_repair_cdc_streams().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -514,13 +514,13 @@ void set_storage_service(http_context& ctx, routes& r) {
});
});

- ss::force_keyspace_cleanup.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto column_families = split_cf(req->get_query_param("cf"));
if (column_families.empty()) {
column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
}
- return service::get_local_storage_service().is_cleanup_allowed(keyspace).then([&ctx, keyspace,
+ return ss.local().is_cleanup_allowed(keyspace).then([&ctx, keyspace,
column_families = std::move(column_families)] (bool is_cleanup_allowed) mutable {
if (!is_cleanup_allowed) {
return make_exception_future<json::json_return_type>(
@@ -584,20 +584,20 @@ void set_storage_service(http_context& ctx, routes& r) {
});


- ss::decommission.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().decommission().then([] {
+ ss::decommission.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().decommission().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::move.set(r, [] (std::unique_ptr<request> req) {
+ ss::move.set(r, [&ss] (std::unique_ptr<request> req) {
auto new_token = req->get_query_param("new_token");
- return service::get_local_storage_service().move(new_token).then([] {
+ return ss.local().move(new_token).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::remove_node.set(r, [](std::unique_ptr<request> req) {
+ ss::remove_node.set(r, [&ss](std::unique_ptr<request> req) {
auto host_id = req->get_query_param("host_id");
std::vector<sstring> ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ",");
auto ignore_nodes = std::list<gms::inet_address>();
@@ -614,19 +614,19 @@ void set_storage_service(http_context& ctx, routes& r) {
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
}
}
- return service::get_local_storage_service().removenode(host_id, std::move(ignore_nodes)).then([] {
+ return ss.local().removenode(host_id, std::move(ignore_nodes)).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::get_removal_status.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_removal_status().then([] (auto status) {
+ ss::get_removal_status.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().get_removal_status().then([] (auto status) {
return make_ready_future<json::json_return_type>(status);
});
});

- ss::force_remove_completion.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().force_remove_completion().then([] {
+ ss::force_remove_completion.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().force_remove_completion().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -650,29 +650,29 @@ void set_storage_service(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(res);
});

- ss::get_operation_mode.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_operation_mode().then([] (auto mode) {
+ ss::get_operation_mode.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(mode);
});
});

- ss::is_starting.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_starting().then([] (auto starting) {
+ ss::is_starting.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_starting().then([] (auto starting) {
return make_ready_future<json::json_return_type>(starting);
});
});

- ss::get_drain_progress.set(r, [](std::unique_ptr<request> req) {
- return service::get_storage_service().map_reduce(adder<service::storage_service::drain_progress>(), [] (auto& ss) {
+ ss::get_drain_progress.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.map_reduce(adder<service::storage_service::drain_progress>(), [] (auto& ss) {
return ss.get_drain_progress();
}).then([] (auto&& progress) {
auto progress_str = format("Drained {}/{} ColumnFamilies", progress.remaining_cfs, progress.total_cfs);
return make_ready_future<json::json_return_type>(std::move(progress_str));
});
});

- ss::drain.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().drain().then([] {
+ ss::drain.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().drain().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -703,20 +703,20 @@ void set_storage_service(http_context& ctx, routes& r) {
});
});

- ss::stop_gossiping.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().stop_gossiping().then([] {
+ ss::stop_gossiping.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().stop_gossiping().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::start_gossiping.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().start_gossiping().then([] {
+ ss::start_gossiping.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().start_gossiping().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});

- ss::is_gossip_running.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_gossip_running().then([] (bool running){
+ ss::is_gossip_running.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_gossip_running().then([] (bool running){
return make_ready_future<json::json_return_type>(running);
});
});
@@ -728,8 +728,8 @@ void set_storage_service(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(json_void());
});

- ss::is_initialized.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_initialized().then([] (bool initialized) {
+ ss::is_initialized.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_initialized().then([] (bool initialized) {
return make_ready_future<json::json_return_type>(initialized);
});
});
@@ -738,8 +738,8 @@ void set_storage_service(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(json_void());
});

- ss::is_joined.set(r, [] (std::unique_ptr<request> req) {
- return make_ready_future<json::json_return_type>(service::get_local_storage_service().is_joined());
+ ss::is_joined.set(r, [&ss] (std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>(ss.local().is_joined());
});

ss::set_stream_throughput_mb_per_sec.set(r, [](std::unique_ptr<request> req) {
@@ -804,9 +804,9 @@ void set_storage_service(http_context& ctx, routes& r) {
});
});

- ss::rebuild.set(r, [](std::unique_ptr<request> req) {
+ ss::rebuild.set(r, [&ss](std::unique_ptr<request> req) {
auto source_dc = req->get_query_param("source_dc");
- return service::get_local_storage_service().rebuild(std::move(source_dc)).then([] {
+ return ss.local().rebuild(std::move(source_dc)).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -831,7 +831,7 @@ void set_storage_service(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(json_void());
});

- ss::load_new_ss_tables.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::load_new_ss_tables.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto ks = validate_keyspace(ctx, req->param);
auto cf = req->get_query_param("cf");
auto stream = req->get_query_param("load_and_stream");
@@ -843,7 +843,7 @@ void set_storage_service(http_context& ctx, routes& r) {
// No need to add the keyspace, since all we want is to avoid always sending this to the same
// CPU. Even then I am being overzealous here. This is not something that happens all the time.
auto coordinator = std::hash<sstring>()(cf) % smp::count;
- return service::get_storage_service().invoke_on(coordinator,
+ return ss.invoke_on(coordinator,
[ks = std::move(ks), cf = std::move(cf),
load_and_stream, primary_replica_only] (service::storage_service& s) {
return s.load_new_sstables(ks, cf, load_and_stream, primary_replica_only);
@@ -933,18 +933,18 @@ void set_storage_service(http_context& ctx, routes& r) {
}
});

- ss::enable_auto_compaction.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::enable_auto_compaction.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto tables = split_cf(req->get_query_param("cf"));

- return set_tables_autocompaction(ctx, keyspace, tables, true);
+ return set_tables_autocompaction(ctx, ss.local(), keyspace, tables, true);
});

- ss::disable_auto_compaction.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::disable_auto_compaction.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto tables = split_cf(req->get_query_param("cf"));

- return set_tables_autocompaction(ctx, keyspace, tables, false);
+ return set_tables_autocompaction(ctx, ss.local(), keyspace, tables, false);
});

ss::deliver_hints.set(r, [](std::unique_ptr<request> req) {
@@ -1012,8 +1012,8 @@ void set_storage_service(http_context& ctx, routes& r) {
return get_cf_stats(ctx, &column_family_stats::live_disk_space_used);
});

- ss::get_exceptions.set(r, [](const_req req) {
- return service::get_local_storage_service().get_exception_count();
+ ss::get_exceptions.set(r, [&ss](const_req req) {
+ return ss.local().get_exception_count();
});

ss::get_total_hints_in_progress.set(r, [](std::unique_ptr<request> req) {
@@ -1028,25 +1028,25 @@ void set_storage_service(http_context& ctx, routes& r) {
return make_ready_future<json::json_return_type>(0);
});

- ss::get_ownership.set(r, [] (std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_ownership().then([] (auto&& ownership) {
+ ss::get_ownership.set(r, [&ss] (std::unique_ptr<request> req) {
+ return ss.local().get_ownership().then([] (auto&& ownership) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
});

- ss::get_effective_ownership.set(r, [&ctx] (std::unique_ptr<request> req) {
+ ss::get_effective_ownership.set(r, [&ctx, &ss] (std::unique_ptr<request> req) {
auto keyspace_name = req->param["keyspace"] == "null" ? "" : validate_keyspace(ctx, req->param);
- return service::get_local_storage_service().effective_ownership(keyspace_name).then([] (auto&& ownership) {
+ return ss.local().effective_ownership(keyspace_name).then([] (auto&& ownership) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
});

- ss::view_build_statuses.set(r, [&ctx] (std::unique_ptr<request> req) {
+ ss::view_build_statuses.set(r, [&ctx, &ss] (std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto view = req->param["view"];
- return service::get_local_storage_service().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map<sstring, sstring> status) {
+ return ss.local().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map<sstring, sstring> status) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(std::move(status), res));
});
@@ -1177,7 +1177,6 @@ void set_storage_service(http_context& ctx, routes& r) {
});
});
});
-
}

void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_ctl) {
diff --git a/api/storage_service.hh b/api/storage_service.hh
--- a/api/storage_service.hh
+++ b/api/storage_service.hh
@@ -33,7 +33,7 @@ class repair_service;

namespace api {

-void set_storage_service(http_context& ctx, routes& r);
+void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss);
void set_repair(http_context& ctx, routes& r, sharded<repair_service>& repair);
void unset_repair(http_context& ctx, routes& r);
void set_transport_controller(http_context& ctx, routes& r, cql_transport::controller& ctl);
diff --git a/db/view/view.cc b/db/view/view.cc
--- a/db/view/view.cc
+++ b/db/view/view.cc
@@ -73,7 +73,6 @@
#include "mutation.hh"
#include "mutation_partition.hh"
#include "service/migration_manager.hh"
-#include "service/storage_service.hh"
#include "service/storage_proxy.hh"
#include "view_info.hh"
#include "view_update_checks.hh"
@@ -1280,7 +1279,7 @@ future<> mutate_MV(
auto view_token = dht::get_token(*mut.s, mut.fm.key());
auto& keyspace_name = mut.s->ks_name();
auto target_endpoint = get_view_natural_endpoint(keyspace_name, base_token, view_token);
- auto remote_endpoints = service::get_local_storage_service().get_token_metadata().pending_endpoints_for(view_token, keyspace_name);
+ auto remote_endpoints = service::get_local_storage_proxy().get_token_metadata_ptr()->pending_endpoints_for(view_token, keyspace_name);
auto sem_units = pending_view_updates.split(mut.fm.representation().size());

// First, find the local endpoint and ensure that if it exists,
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
@@ -416,6 +416,7 @@ sharded<netw::messaging_service>* the_messaging_service;
sharded<cql3::query_processor>* the_query_processor;
sharded<qos::service_level_controller>* the_sl_controller;
sharded<service::migration_manager>* the_migration_manager;
+sharded<service::storage_service>* the_storage_service;
}

int main(int ac, char** av) {
@@ -494,7 +495,7 @@ int main(int ac, char** av) {
service::load_meter load_meter;
debug::db = &db;
auto& proxy = service::get_storage_proxy();
- auto& ss = service::get_storage_service();
+ sharded<service::storage_service> ss;
sharded<service::migration_manager> mm;
api::http_context ctx(db, proxy, load_meter, token_metadata);
httpd::http_server_control prometheus_server;
@@ -888,11 +889,12 @@ int main(int ac, char** av) {
supervisor::notify("initializing storage service");
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
- service::init_storage_service(stop_signal.as_sharded_abort_source(),
- db, gossiper, sys_dist_ks, view_update_generator,
- feature_service, sscfg, mm, token_metadata,
- messaging, cdc_generation_service, repair,
- raft_gr, lifecycle_notifier).get();
+ debug::the_storage_service = &ss;
+ ss.start(std::ref(stop_signal.as_sharded_abort_source()),
+ std::ref(db), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(view_update_generator),
+ std::ref(feature_service), sscfg, std::ref(mm), std::ref(token_metadata),
+ std::ref(messaging), std::ref(cdc_generation_service), std::ref(repair),
+ std::ref(raft_gr), std::ref(lifecycle_notifier)).get();
supervisor::notify("starting per-shard database core");

sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
@@ -1097,7 +1099,7 @@ int main(int ac, char** av) {
}).get();
api::set_server_gossip(ctx).get();
api::set_server_snitch(ctx).get();
- api::set_server_storage_proxy(ctx).get();
+ api::set_server_storage_proxy(ctx, ss).get();
api::set_server_load_sstable(ctx).get();
static seastar::sharded<memory_threshold_guard> mtg;
//FIXME: discarded future
@@ -1191,7 +1193,7 @@ int main(int ac, char** av) {
auto stop_messaging_api = defer_verbose_shutdown("messaging service API", [&ctx] {
api::unset_server_messaging_service(ctx).get();
});
- api::set_server_storage_service(ctx).get();
+ api::set_server_storage_service(ctx, ss).get();
api::set_server_repair(ctx, repair).get();
auto stop_repair_api = defer_verbose_shutdown("repair API", [&ctx] {
api::unset_server_repair(ctx).get();
@@ -1372,7 +1374,7 @@ int main(int ac, char** av) {
api::unset_transport_controller(ctx).get();
});

- ::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter);
+ ::thrift_controller thrift_ctl(db, auth_service, qp, service_memory_limiter, ss);

ss.local().register_client_shutdown_hook("rpc server", [&thrift_ctl] {
thrift_ctl.stop().get();
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -101,8 +101,6 @@ namespace service {

static logging::logger slogger("storage_service");

-distributed<storage_service> _the_storage_service;
-
storage_service::storage_service(abort_source& abort_source,
distributed<database>& db, gms::gossiper& gossiper,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
@@ -1778,13 +1776,13 @@ future<std::unordered_map<sstring, std::vector<sstring>>> storage_service::descr
auto version = host_and_version.second ? host_and_version.second->to_sstring() : UNREACHABLE;
results.try_emplace(version).first->second.emplace_back(host_and_version.first.to_sstring());
return results;
- }).then([] (auto results) {
+ }).then([this] (auto results) {
// we're done: the results map is ready to return to the client. the rest is just debug logging:
auto it_unreachable = results.find(UNREACHABLE);
if (it_unreachable != results.end()) {
slogger.debug("Hosts not in agreement. Didn't get a response from everybody: {}", ::join( ",", it_unreachable->second));
}
- auto my_version = get_local_storage_service().get_schema_version();
+ auto my_version = get_schema_version();
for (auto&& entry : results) {
// check for version disagreement. log the hosts that don't agree.
if (entry.first == UNREACHABLE || entry.first == my_version) {
@@ -3590,7 +3588,7 @@ future<> storage_service::keyspace_changed(const sstring& ks_name) {
}

future<> storage_service::update_topology(inet_address endpoint) {
- return service::get_storage_service().invoke_on(0, [endpoint] (auto& ss) {
+ return container().invoke_on(0, [endpoint] (auto& ss) {
return ss.mutate_token_metadata([&ss, endpoint] (mutable_token_metadata_ptr tmptr) mutable {
// initiate the token metadata endpoints cache reset
tmptr->invalidate_cached_rings();
@@ -3602,8 +3600,8 @@ future<> storage_service::update_topology(inet_address endpoint) {
}

void storage_service::init_messaging_service() {
- _messaging.local().register_replication_finished([] (gms::inet_address from) {
- return get_local_storage_service().confirm_replication(from);
+ _messaging.local().register_replication_finished([this] (gms::inet_address from) {
+ return confirm_replication(from);
});

_messaging.local().register_node_ops_cmd([this] (const rpc::client_info& cinfo, node_ops_cmd_request req) {
@@ -3810,33 +3808,6 @@ storage_service::view_build_statuses(sstring keyspace, sstring view_name) const
});
}

-future<> init_storage_service(sharded<abort_source>& abort_source, distributed<database>& db, sharded<gms::gossiper>& gossiper,
- sharded<db::system_distributed_keyspace>& sys_dist_ks,
- sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service,
- storage_service_config config,
- sharded<service::migration_manager>& mm, sharded<locator::shared_token_metadata>& stm,
- sharded<netw::messaging_service>& ms,
- sharded<cdc::generation_service>& cdc_gen_service,
- sharded<repair_service>& repair,
- sharded<service::raft_group_registry>& raft_gr,
- sharded<service::endpoint_lifecycle_notifier>& elc_notif) {
- return
- service::get_storage_service().start(std::ref(abort_source),
- std::ref(db), std::ref(gossiper),
- std::ref(sys_dist_ks),
- std::ref(view_update_generator),
- std::ref(feature_service), config, std::ref(mm),
- std::ref(stm), std::ref(ms),
- std::ref(cdc_gen_service),
- std::ref(repair),
- std::ref(raft_gr),
- std::ref(elc_notif));
-}
-
-future<> deinit_storage_service() {
- return service::get_storage_service().stop();
-}
-
future<> endpoint_lifecycle_notifier::notify_down(gms::inet_address endpoint) {
return seastar::async([this, endpoint] {
_subscribers.for_each([endpoint] (endpoint_lifecycle_subscriber* subscriber) {
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -106,18 +106,6 @@ namespace service {
class storage_service;
class migration_manager;

-extern distributed<storage_service> _the_storage_service;
-// DEPRECATED, DON'T USE!
-// Pass references to services through constructor/function parameters. Don't use globals.
-inline distributed<storage_service>& get_storage_service() {
- return _the_storage_service;
-}
-// DEPRECATED, DON'T USE!
-// Pass references to services through constructor/function parameters. Don't use globals.
-inline storage_service& get_local_storage_service() {
- return _the_storage_service.local();
-}
-
enum class disk_error { regular, commit };

struct bind_messaging_port_tag {};
@@ -255,7 +243,7 @@ private:
future<> keyspace_changed(const sstring& ks_name);
void register_metrics();
future<> snitch_reconfigured();
- static future<> update_topology(inet_address endpoint);
+ future<> update_topology(inet_address endpoint);
future<> publish_schema_version();
void install_schema_version_change_listener();

@@ -879,7 +867,7 @@ public:

template <typename Func>
auto run_with_api_lock(sstring operation, Func&& func) {
- return get_storage_service().invoke_on(0, [operation = std::move(operation),
+ return container().invoke_on(0, [operation = std::move(operation),
func = std::forward<Func>(func)] (storage_service& ss) mutable {
if (!ss._operation_in_progress.empty()) {
throw std::runtime_error(format("Operation {} is in progress, try again", ss._operation_in_progress));
@@ -893,7 +881,7 @@ public:

template <typename Func>
auto run_with_no_api_lock(Func&& func) {
- return get_storage_service().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
+ return container().invoke_on(0, [func = std::forward<Func>(func)] (storage_service& ss) mutable {
return func(ss);
});
}
@@ -911,20 +899,4 @@ public:
bool is_repair_based_node_ops_enabled();
};

-future<> init_storage_service(sharded<abort_source>& abort_sources,
- distributed<database>& db,
- sharded<gms::gossiper>& gossiper,
- sharded<db::system_distributed_keyspace>& sys_dist_ks,
- sharded<db::view::view_update_generator>& view_update_generator,
- sharded<gms::feature_service>& feature_service,
- storage_service_config config,
- sharded<service::migration_manager>& mm,
- sharded<locator::shared_token_metadata>& stm,
- sharded<netw::messaging_service>& ms,
- sharded<cdc::generation_service>&,
- sharded<repair_service>& repair,
- sharded<raft_group_registry>& raft_gr,
- sharded<endpoint_lifecycle_notifier>& elc_notif);
-future<> deinit_storage_service();
-
}
diff --git a/test/boost/gossip_test.cc b/test/boost/gossip_test.cc
--- a/test/boost/gossip_test.cc
+++ b/test/boost/gossip_test.cc
@@ -98,7 +98,8 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
elc_notif.start().get();
auto stop_elc_notif = defer([&elc_notif] { elc_notif.stop().get(); });

- service::get_storage_service().start(std::ref(abort_sources),
+ sharded<service::storage_service> ss;
+ ss.start(std::ref(abort_sources),
std::ref(db), std::ref(gms::get_gossiper()),
std::ref(sys_dist_ks),
std::ref(view_update_generator),
@@ -108,7 +109,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
std::ref(cdc_generation_service), std::ref(repair),
std::ref(raft_gr), std::ref(elc_notif),
true).get();
- auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });
+ auto stop_ss = defer([&] { ss.stop().get(); });

sharded<semaphore> sst_dir_semaphore;
sst_dir_semaphore.start(cfg->initial_sstable_loading_concurrency()).get();
diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -542,7 +542,7 @@ class single_node_cql_env : public cql_test_env {
raft_gr.start(std::ref(ms), std::ref(gms::get_gossiper()), std::ref(qp)).get();
auto stop_raft = defer([&raft_gr] { raft_gr.stop().get(); });

- auto& ss = service::get_storage_service();
+ sharded<service::storage_service> ss;
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
ss.start(std::ref(abort_sources), std::ref(db),
@@ -684,8 +684,8 @@ class single_node_cql_env : public cql_test_env {
cdc.stop().get();
});

- service::get_local_storage_service().init_server(service::bind_messaging_port(false)).get();
- service::get_local_storage_service().join_cluster().get();
+ ss.local().init_server(service::bind_messaging_port(false)).get();
+ ss.local().join_cluster().get();

auth::permissions_cache_config perm_cache_config;
perm_cache_config.max_entries = cfg->permissions_cache_max_entries();
diff --git a/thrift/controller.cc b/thrift/controller.cc
--- a/thrift/controller.cc
+++ b/thrift/controller.cc
@@ -27,13 +27,14 @@

static logging::logger clogger("thrift_controller");

-thrift_controller::thrift_controller(distributed<database>& db, sharded<auth::service>& auth, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml)
+thrift_controller::thrift_controller(distributed<database>& db, sharded<auth::service>& auth, sharded<cql3::query_processor>& qp, sharded<service::memory_limiter>& ml, sharded<service::storage_service>& ss)
: _ops_sem(1)
, _db(db)
, _auth_service(auth)
, _qp(qp)
- , _mem_limiter(ml) {
-}
+ , _mem_limiter(ml)
+ , _ss(ss)
+{ }

future<> thrift_controller::start_server() {
return smp::submit_to(0, [this] {
@@ -63,7 +64,7 @@ future<> thrift_controller::do_start_server() {
tsc.timeout_config = make_timeout_config(cfg);
tsc.max_request_size = cfg.thrift_max_message_length_in_mb() * (uint64_t(1) << 20);
return gms::inet_address::lookup(addr, family, preferred).then([this, tserver, addr, port, keepalive, tsc] (gms::inet_address ip) {
- return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_auth_service), std::ref(_mem_limiter), tsc).then([tserver, port, addr, ip, keepalive] {
+ return tserver->start(std::ref(_db), std::ref(_qp), std::ref(_ss), std::ref(_auth_service), std::ref(_mem_limiter), tsc).then([tserver, port, addr, ip, keepalive] {
// #293 - do not stop anything
//engine().at_exit([tserver] {
// return tserver->stop();
diff --git a/thrift/controller.hh b/thrift/controller.hh
--- a/thrift/controller.hh
+++ b/thrift/controller.hh
@@ -32,6 +32,7 @@ class thrift_server;
class database;
namespace auth { class service; }
namespace cql3 { class query_processor; }
+namespace service { class storage_service; }

class thrift_controller {
std::unique_ptr<distributed<thrift_server>> _server;
@@ -42,12 +43,13 @@ class thrift_controller {
sharded<auth::service>& _auth_service;
sharded<cql3::query_processor>& _qp;
sharded<service::memory_limiter>& _mem_limiter;
+ sharded<service::storage_service>& _ss;

future<> do_start_server();
future<> do_stop_server();

public:
- thrift_controller(distributed<database>&, sharded<auth::service>&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&);
+ thrift_controller(distributed<database>&, sharded<auth::service>&, sharded<cql3::query_processor>&, sharded<service::memory_limiter>&, sharded<service::storage_service>& ss);
future<> start_server();
future<> stop_server();
future<> stop();
diff --git a/thrift/handler.cc b/thrift/handler.cc
--- a/thrift/handler.cc
+++ b/thrift/handler.cc
@@ -203,6 +203,7 @@ enum class query_order { no, yes };
class thrift_handler : public CassandraCobSvIf {
distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor;
+ sharded<service::storage_service>& _ss;
::timeout_config _timeout_config;
service::client_state _client_state;
service::query_state _query_state;
@@ -220,9 +221,10 @@ class thrift_handler : public CassandraCobSvIf {
});
}
public:
- explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
+ explicit thrift_handler(distributed<database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, auth::service& auth_service, ::timeout_config timeout_config, service_permit& current_permit)
: _db(db)
, _query_processor(qp)
+ , _ss(ss)
, _timeout_config(timeout_config)
, _client_state(service::client_state::external_tag{}, auth_service, nullptr, _timeout_config, socket_address(), true)
// FIXME: Handlers are not created per query, but rather per connection, so it makes little sense to store
@@ -717,8 +719,8 @@ class thrift_handler : public CassandraCobSvIf {

void describe_schema_versions(thrift_fn::function<void(std::map<std::string, std::vector<std::string> > const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
service_permit permit = obtain_permit();
- with_cob(std::move(cob), std::move(exn_cob), [] {
- return service::get_local_storage_service().describe_schema_versions().then([](auto&& m) {
+ with_cob(std::move(cob), std::move(exn_cob), [this] {
+ return _ss.local().describe_schema_versions().then([](auto&& m) {
std::map<std::string, std::vector<std::string>> ret;
for (auto&& p : m) {
ret[p.first] = std::vector<std::string>(p.second.begin(), p.second.end());
@@ -758,7 +760,7 @@ class thrift_handler : public CassandraCobSvIf {
throw make_exception<InvalidRequestException>("There is no ring for the keyspace: %s", keyspace);
}

- auto ring = service::get_local_storage_service().describe_ring(keyspace, local);
+ auto ring = _ss.local().describe_ring(keyspace, local);
std::vector<TokenRange> ret;
ret.reserve(ring.size());
std::transform(ring.begin(), ring.end(), std::back_inserter(ret), [](auto&& tr) {
@@ -792,8 +794,8 @@ class thrift_handler : public CassandraCobSvIf {

void describe_token_map(thrift_fn::function<void(std::map<std::string, std::string> const& _return)> cob, thrift_fn::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob) {
service_permit permit = obtain_permit();
- with_cob(std::move(cob), std::move(exn_cob), [] {
- auto m = service::get_local_storage_service().get_token_to_endpoint_map();
+ with_cob(std::move(cob), std::move(exn_cob), [this] {
+ auto m = _ss.local().get_token_to_endpoint_map();
std::map<std::string, std::string> ret;
for (auto&& p : m) {
ret[format("{}", p.first)] = p.second.to_sstring();
@@ -849,7 +851,7 @@ class thrift_handler : public CassandraCobSvIf {
auto tend = end_token.empty() ? dht::maximum_token() : dht::token::from_sstring(sstring(end_token));
range<dht::token> r({{ std::move(tstart), false }}, {{ std::move(tend), true }});
auto cf = sstring(cfName);
- auto splits = service::get_local_storage_service().get_splits(current_keyspace(), cf, std::move(r), keys_per_split);
+ auto splits = _ss.local().get_splits(current_keyspace(), cf, std::move(r), keys_per_split);

std::vector<CfSplit> res;
for (auto&& s : splits) {
@@ -2018,27 +2020,30 @@ class thrift_handler : public CassandraCobSvIf {
class handler_factory : public CassandraCobSvIfFactory {
distributed<database>& _db;
distributed<cql3::query_processor>& _query_processor;
+ sharded<service::storage_service>& _ss;
auth::service& _auth_service;
timeout_config _timeout_config;
service_permit& _current_permit;
public:
explicit handler_factory(distributed<database>& db,
distributed<cql3::query_processor>& qp,
+ sharded<service::storage_service>& ss,
auth::service& auth_service,
::timeout_config timeout_config,
service_permit& current_permit)
- : _db(db), _query_processor(qp), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {}
+ : _db(db), _query_processor(qp), _ss(ss), _auth_service(auth_service), _timeout_config(timeout_config), _current_permit(current_permit) {}
typedef CassandraCobSvIf Handler;
virtual CassandraCobSvIf* getHandler(const ::apache::thrift::TConnectionInfo& connInfo) {
- return new thrift_handler(_db, _query_processor, _auth_service, _timeout_config, _current_permit);
+ return new thrift_handler(_db, _query_processor, _ss, _auth_service, _timeout_config, _current_permit);
}
virtual void releaseHandler(CassandraCobSvIf* handler) {
delete handler;
}
};

std::unique_ptr<CassandraCobSvIfFactory>
-create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service& auth_service,
+create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp,
+ sharded<service::storage_service>& ss, auth::service& auth_service,
::timeout_config timeout_config, service_permit& current_permit) {
- return std::make_unique<handler_factory>(db, qp, auth_service, timeout_config, current_permit);
+ return std::make_unique<handler_factory>(db, qp, ss, auth_service, timeout_config, current_permit);
}
diff --git a/thrift/handler.hh b/thrift/handler.hh
--- a/thrift/handler.hh
+++ b/thrift/handler.hh
@@ -31,7 +31,8 @@

struct timeout_config;
class service_permit;
+namespace service { class storage_service; }

-std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, timeout_config, service_permit& current_permit);
+std::unique_ptr<::cassandra::CassandraCobSvIfFactory> create_handler_factory(distributed<database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, auth::service&, timeout_config, service_permit& current_permit);

#endif /* APPS_SEASTAR_THRIFT_HANDLER_HH_ */
diff --git a/thrift/server.cc b/thrift/server.cc
--- a/thrift/server.cc
+++ b/thrift/server.cc
@@ -68,11 +68,12 @@ class thrift_stats {

thrift_server::thrift_server(distributed<database>& db,
distributed<cql3::query_processor>& qp,
+ sharded<service::storage_service>& ss,
auth::service& auth_service,
service::memory_limiter& ml,
thrift_server_config config)
: _stats(new thrift_stats(*this))
- , _handler_factory(create_handler_factory(db, qp, auth_service, config.timeout_config, _current_permit).release())
+ , _handler_factory(create_handler_factory(db, qp, ss, auth_service, config.timeout_config, _current_permit).release())
, _protocol_factory(new TBinaryProtocolFactoryT<TMemoryBuffer>())
, _processor_factory(new CassandraAsyncProcessorFactory(_handler_factory))
, _memory_available(ml.get_semaphore())
diff --git a/thrift/server.hh b/thrift/server.hh
--- a/thrift/server.hh
+++ b/thrift/server.hh
@@ -76,6 +76,8 @@ namespace auth {
class service;
}

+namespace service { class storage_service; }
+
struct thrift_server_config {
::timeout_config timeout_config;
uint64_t max_request_size;
@@ -127,7 +129,7 @@ private:
boost::intrusive::list<connection> _connections_list;
seastar::gate _stop_gate;
public:
- thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, auth::service&, service::memory_limiter& ml, thrift_server_config config);
+ thrift_server(distributed<database>& db, distributed<cql3::query_processor>& qp, sharded<service::storage_service>& ss, auth::service&, service::memory_limiter& ml, thrift_server_config config);
~thrift_server();
future<> listen(socket_address addr, bool keepalive);
future<> stop();

Commit Bot

unread,
Jul 29, 2021, 1:20:01 PMJul 29
to scylla...@googlegroups.com, Avi Kivity
From: Avi Kivity <a...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: master
Reply all
Reply to author
Forward
0 new messages