The reference in question is already there, handlers that need
storage service can capture it and use. These handlers are not
yet stopped, but neither is the storage service itself, so the
potentially dangling reference is not being set up here.
api/storage_proxy.cc | 4 +-
api/storage_service.cc | 129 ++++++++++++++++++++---------------------
2 files changed, 66 insertions(+), 67 deletions(-)
diff --git a/api/storage_proxy.cc b/api/storage_proxy.cc
index ec6fa46b07..28345ad40d 100644
--- a/api/storage_proxy.cc
+++ b/api/storage_proxy.cc
@@ -363,8 +363,8 @@ void set_storage_proxy(http_context& ctx, routes& r, sharded<service::storage_se
return sum_stats_storage_proxy(ctx.sp, &service::storage_proxy_stats::stats::read_repair_repaired_background);
});
- sp::get_schema_versions.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().describe_schema_versions().then([] (auto result) {
+ sp::get_schema_versions.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().describe_schema_versions().then([] (auto result) {
std::vector<sp::mapper_list> res;
for (auto e : result) {
sp::mapper_list entry;
diff --git a/api/storage_service.cc b/api/storage_service.cc
index 039286ea9f..52ebed73c1 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -134,12 +134,12 @@ seastar::future<json::json_return_type> run_toppartitions_query(db::toppartition
});
}
-future<json::json_return_type> set_tables_autocompaction(http_context& ctx, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
+future<json::json_return_type> set_tables_autocompaction(http_context& ctx, service::storage_service& ss, const sstring &keyspace, std::vector<sstring> tables, bool enabled) {
if (tables.empty()) {
tables = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
}
- return service::get_local_storage_service().set_tables_autocompaction(keyspace, tables, enabled).then([]{
+ return ss.set_tables_autocompaction(keyspace, tables, enabled).then([]{
return make_ready_future<json::json_return_type>(json_void());
});
}
@@ -318,8 +318,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return ctx.db.local().commitlog()->active_config().commit_log_location;
});
- ss::get_token_endpoint.set(r, [] (std::unique_ptr<request> req) {
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().get_token_to_endpoint_map(), [](const auto& i) {
+ ss::get_token_endpoint.set(r, [&ss] (std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().get_token_to_endpoint_map(), [](const auto& i) {
storage_service_json::mapper val;
val.key = boost::lexical_cast<std::string>(i.first);
val.value = boost::lexical_cast<std::string>(i.second);
@@ -395,15 +395,15 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return container_to_vec(addr);
});
- ss::get_release_version.set(r, [](const_req req) {
- return service::get_local_storage_service().get_release_version();
+ ss::get_release_version.set(r, [&ss](const_req req) {
+ return ss.local().get_release_version();
});
ss::get_scylla_release_version.set(r, [](const_req req) {
return scylla_version();
});
- ss::get_schema_version.set(r, [](const_req req) {
- return service::get_local_storage_service().get_schema_version();
+ ss::get_schema_version.set(r, [&ss](const_req req) {
+ return ss.local().get_schema_version();
});
ss::get_all_data_file_locations.set(r, [&ctx](const_req req) {
@@ -414,10 +414,10 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return ctx.db.local().get_config().saved_caches_directory();
});
- ss::get_range_to_endpoint_map.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::get_range_to_endpoint_map.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
std::vector<ss::maplist_mapper> res;
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().get_range_to_address_map(keyspace),
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().get_range_to_address_map(keyspace),
[](const std::pair<dht::token_range, inet_address_vector_replica_set>& entry){
ss::maplist_mapper m;
if (entry.first.start()) {
@@ -445,13 +445,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(res);
});
- ss::describe_any_ring.set(r, [&ctx](std::unique_ptr<request> req) {
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(""), token_range_endpoints_to_json));
+ ss::describe_any_ring.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().describe_ring(""), token_range_endpoints_to_json));
});
- ss::describe_ring.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::describe_ring.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
- return make_ready_future<json::json_return_type>(stream_range_as_array(service::get_local_storage_service().describe_ring(keyspace), token_range_endpoints_to_json));
+ return make_ready_future<json::json_return_type>(stream_range_as_array(ss.local().describe_ring(keyspace), token_range_endpoints_to_json));
});
ss::get_host_id_map.set(r, [&ctx](const_req req) {
@@ -483,14 +483,14 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
- ss::get_natural_endpoints.set(r, [&ctx](const_req req) {
+ ss::get_natural_endpoints.set(r, [&ctx, &ss](const_req req) {
auto keyspace = validate_keyspace(ctx, req.param);
- return container_to_vec(service::get_local_storage_service().get_natural_endpoints(keyspace, req.get_query_param("cf"),
+ return container_to_vec(ss.local().get_natural_endpoints(keyspace, req.get_query_param("cf"),
req.get_query_param("key")));
});
- ss::cdc_streams_check_and_repair.set(r, [&ctx] (std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_cdc_generation_service().check_and_repair_cdc_streams().then([] {
+ ss::cdc_streams_check_and_repair.set(r, [&ctx, &ss] (std::unique_ptr<request> req) {
+ return ss.local().get_cdc_generation_service().check_and_repair_cdc_streams().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -514,13 +514,13 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
- ss::force_keyspace_cleanup.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::force_keyspace_cleanup.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto column_families = split_cf(req->get_query_param("cf"));
if (column_families.empty()) {
column_families = map_keys(ctx.db.local().find_keyspace(keyspace).metadata().get()->cf_meta_data());
}
- return service::get_local_storage_service().is_cleanup_allowed(keyspace).then([&ctx, keyspace,
+ return ss.local().is_cleanup_allowed(keyspace).then([&ctx, keyspace,
column_families = std::move(column_families)] (bool is_cleanup_allowed) mutable {
if (!is_cleanup_allowed) {
return make_exception_future<json::json_return_type>(
@@ -584,20 +584,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
- ss::decommission.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().decommission().then([] {
+ ss::decommission.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().decommission().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
- ss::move.set(r, [] (std::unique_ptr<request> req) {
+ ss::move.set(r, [&ss] (std::unique_ptr<request> req) {
auto new_token = req->get_query_param("new_token");
- return service::get_local_storage_service().move(new_token).then([] {
+ return ss.local().move(new_token).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
- ss::remove_node.set(r, [](std::unique_ptr<request> req) {
+ ss::remove_node.set(r, [&ss](std::unique_ptr<request> req) {
auto host_id = req->get_query_param("host_id");
std::vector<sstring> ignore_nodes_strs= split(req->get_query_param("ignore_nodes"), ",");
auto ignore_nodes = std::list<gms::inet_address>();
@@ -614,19 +614,19 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
throw std::runtime_error(format("Failed to parse ignore_nodes parameter: ignore_nodes={}, node={}", ignore_nodes_strs, n));
}
}
- return service::get_local_storage_service().removenode(host_id, std::move(ignore_nodes)).then([] {
+ return ss.local().removenode(host_id, std::move(ignore_nodes)).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
- ss::get_removal_status.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_removal_status().then([] (auto status) {
+ ss::get_removal_status.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().get_removal_status().then([] (auto status) {
return make_ready_future<json::json_return_type>(status);
});
});
- ss::force_remove_completion.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().force_remove_completion().then([] {
+ ss::force_remove_completion.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().force_remove_completion().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -650,20 +650,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(res);
});
- ss::get_operation_mode.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_operation_mode().then([] (auto mode) {
+ ss::get_operation_mode.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().get_operation_mode().then([] (auto mode) {
return make_ready_future<json::json_return_type>(mode);
});
});
- ss::is_starting.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_starting().then([] (auto starting) {
+ ss::is_starting.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_starting().then([] (auto starting) {
return make_ready_future<json::json_return_type>(starting);
});
});
- ss::get_drain_progress.set(r, [](std::unique_ptr<request> req) {
- return service::get_storage_service().map_reduce(adder<service::storage_service::drain_progress>(), [] (auto& ss) {
+ ss::get_drain_progress.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.map_reduce(adder<service::storage_service::drain_progress>(), [] (auto& ss) {
return ss.get_drain_progress();
}).then([] (auto&& progress) {
auto progress_str = format("Drained {}/{} ColumnFamilies", progress.remaining_cfs, progress.total_cfs);
@@ -671,8 +671,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
- ss::drain.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().drain().then([] {
+ ss::drain.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().drain().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -703,20 +703,20 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
- ss::stop_gossiping.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().stop_gossiping().then([] {
+ ss::stop_gossiping.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().stop_gossiping().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
- ss::start_gossiping.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().start_gossiping().then([] {
+ ss::start_gossiping.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().start_gossiping().then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
- ss::is_gossip_running.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_gossip_running().then([] (bool running){
+ ss::is_gossip_running.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_gossip_running().then([] (bool running){
return make_ready_future<json::json_return_type>(running);
});
});
@@ -728,8 +728,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});
- ss::is_initialized.set(r, [](std::unique_ptr<request> req) {
- return service::get_local_storage_service().is_initialized().then([] (bool initialized) {
+ ss::is_initialized.set(r, [&ss](std::unique_ptr<request> req) {
+ return ss.local().is_initialized().then([] (bool initialized) {
return make_ready_future<json::json_return_type>(initialized);
});
});
@@ -738,8 +738,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});
- ss::is_joined.set(r, [] (std::unique_ptr<request> req) {
- return make_ready_future<json::json_return_type>(service::get_local_storage_service().is_joined());
+ ss::is_joined.set(r, [&ss] (std::unique_ptr<request> req) {
+ return make_ready_future<json::json_return_type>(ss.local().is_joined());
});
ss::set_stream_throughput_mb_per_sec.set(r, [](std::unique_ptr<request> req) {
@@ -804,9 +804,9 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
- ss::rebuild.set(r, [](std::unique_ptr<request> req) {
+ ss::rebuild.set(r, [&ss](std::unique_ptr<request> req) {
auto source_dc = req->get_query_param("source_dc");
- return service::get_local_storage_service().rebuild(std::move(source_dc)).then([] {
+ return ss.local().rebuild(std::move(source_dc)).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
@@ -831,7 +831,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(json_void());
});
- ss::load_new_ss_tables.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::load_new_ss_tables.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto ks = validate_keyspace(ctx, req->param);
auto cf = req->get_query_param("cf");
auto stream = req->get_query_param("load_and_stream");
@@ -843,7 +843,7 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
// No need to add the keyspace, since all we want is to avoid always sending this to the same
// CPU. Even then I am being overzealous here. This is not something that happens all the time.
auto coordinator = std::hash<sstring>()(cf) % smp::count;
- return service::get_storage_service().invoke_on(coordinator,
+ return ss.invoke_on(coordinator,
[ks = std::move(ks), cf = std::move(cf),
load_and_stream, primary_replica_only] (service::storage_service& s) {
return s.load_new_sstables(ks, cf, load_and_stream, primary_replica_only);
@@ -933,18 +933,18 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
}
});
- ss::enable_auto_compaction.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::enable_auto_compaction.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto tables = split_cf(req->get_query_param("cf"));
- return set_tables_autocompaction(ctx, keyspace, tables, true);
+ return set_tables_autocompaction(ctx, ss.local(), keyspace, tables, true);
});
- ss::disable_auto_compaction.set(r, [&ctx](std::unique_ptr<request> req) {
+ ss::disable_auto_compaction.set(r, [&ctx, &ss](std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto tables = split_cf(req->get_query_param("cf"));
- return set_tables_autocompaction(ctx, keyspace, tables, false);
+ return set_tables_autocompaction(ctx, ss.local(), keyspace, tables, false);
});
ss::deliver_hints.set(r, [](std::unique_ptr<request> req) {
@@ -1012,8 +1012,8 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return get_cf_stats(ctx, &column_family_stats::live_disk_space_used);
});
- ss::get_exceptions.set(r, [](const_req req) {
- return service::get_local_storage_service().get_exception_count();
+ ss::get_exceptions.set(r, [&ss](const_req req) {
+ return ss.local().get_exception_count();
});
ss::get_total_hints_in_progress.set(r, [](std::unique_ptr<request> req) {
@@ -1028,25 +1028,25 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(0);
});
- ss::get_ownership.set(r, [] (std::unique_ptr<request> req) {
- return service::get_local_storage_service().get_ownership().then([] (auto&& ownership) {
+ ss::get_ownership.set(r, [&ss] (std::unique_ptr<request> req) {
+ return ss.local().get_ownership().then([] (auto&& ownership) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
});
- ss::get_effective_ownership.set(r, [&ctx] (std::unique_ptr<request> req) {
+ ss::get_effective_ownership.set(r, [&ctx, &ss] (std::unique_ptr<request> req) {
auto keyspace_name = req->param["keyspace"] == "null" ? "" : validate_keyspace(ctx, req->param);
- return service::get_local_storage_service().effective_ownership(keyspace_name).then([] (auto&& ownership) {
+ return ss.local().effective_ownership(keyspace_name).then([] (auto&& ownership) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(ownership, res));
});
});
- ss::view_build_statuses.set(r, [&ctx] (std::unique_ptr<request> req) {
+ ss::view_build_statuses.set(r, [&ctx, &ss] (std::unique_ptr<request> req) {
auto keyspace = validate_keyspace(ctx, req->param);
auto view = req->param["view"];
- return service::get_local_storage_service().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map<sstring, sstring> status) {
+ return ss.local().view_build_statuses(std::move(keyspace), std::move(view)).then([] (std::unordered_map<sstring, sstring> status) {
std::vector<storage_service_json::mapper> res;
return make_ready_future<json::json_return_type>(map_to_key_value(std::move(status), res));
});
@@ -1177,7 +1177,6 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
});
});
});
-
}
void set_snapshot(http_context& ctx, routes& r, sharded<db::snapshot_ctl>& snap_ctl) {
--
2.20.1