The _listeners list on migration_manager class and the corresponding
notify_xxx helpers have nothing to do with the its instances, they
are just transport for notification delivery.
At the same time some services need the migration manager to be alive
at their stop time to unregister from it, while the manager itself
may need them for its needs.
The proposal is to move the migration notifier into a complete separate
sharded "service". This service doesn't need anything, so it's started
first and stopped last.
While it's not effectively a "migration" notifier, we inherited the name
from Cassandra and renaming it will "scramble neurons in the old-timers'
brains but will make it easier for newcomers" as Avi says.
service/migration_listener.hh | 33 +++++++++++++++++++++++++++++++++
service/migration_manager.hh | 31 ++++++++++++-------------------
database.cc | 2 +-
db/schema_tables.cc | 22 +++++++++++-----------
main.cc | 11 +++++++++--
service/migration_manager.cc | 31 +++++++++++++++----------------
tests/cql_test_env.cc | 6 +++++-
7 files changed, 86 insertions(+), 50 deletions(-)
diff --git a/service/migration_listener.hh b/service/migration_listener.hh
index 945a0b937..fba35bf60 100644
--- a/service/migration_listener.hh
+++ b/service/migration_listener.hh
@@ -42,6 +42,14 @@
#pragma once
#include <seastar/core/sstring.hh>
+#include <seastar/core/shared_ptr.hh>
+
+class keyspace_metadata;
+class view_ptr;
+class user_type_impl;
+using user_type = seastar::shared_ptr<const user_type_impl>;
+class schema;
+using schema_ptr = seastar::lw_shared_ptr<const schema>;
namespace service {
@@ -95,4 +103,29 @@ class migration_listener::only_view_notifications : public migration_listener {
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) { }
};
+class migration_notifier {
+private:
+ std::vector<migration_listener*> _listeners;
+
+public:
+ /// Register a migration listener on current shard.
+ void register_listener(migration_listener* listener);
+
+ /// Unregister a migration listener on current shard.
+ void unregister_listener(migration_listener* listener);
+
+ future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
+ future<> create_column_family(const schema_ptr& cfm);
+ future<> create_user_type(const user_type& type);
+ future<> create_view(const view_ptr& view);
+ future<> update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
+ future<> update_column_family(const schema_ptr& cfm, bool columns_changed);
+ future<> update_user_type(const user_type& type);
+ future<> update_view(const view_ptr& view, bool columns_changed);
+ future<> drop_keyspace(const sstring& ks_name);
+ future<> drop_column_family(const schema_ptr& cfm);
+ future<> drop_user_type(const user_type& type);
+ future<> drop_view(const view_ptr& view);
+};
+
}
diff --git a/service/migration_manager.hh b/service/migration_manager.hh
index 1fc68c3d6..ad6c4a60c 100644
--- a/service/migration_manager.hh
+++ b/service/migration_manager.hh
@@ -55,19 +55,25 @@
namespace service {
class migration_manager : public seastar::async_sharded_service<migration_manager> {
- std::vector<migration_listener*> _listeners;
+private:
+ migration_notifier& _notify; /* XXX -- temporary */
+
std::unordered_map<netw::msg_addr, serialized_action, netw::msg_addr::hash> _schema_pulls;
std::vector<gms::feature::listener_registration> _feature_listeners;
seastar::gate _background_tasks;
static const std::chrono::milliseconds migration_delay;
public:
- migration_manager();
+ explicit migration_manager(migration_notifier&);
- /// Register a migration listener on current shard.
- void register_listener(migration_listener* listener);
+ inline void register_listener(migration_listener* listener) {
+ _notify.register_listener(listener);
+ }
- /// Unregister a migration listener on current shard.
- void unregister_listener(migration_listener* listener);
+ inline void unregister_listener(migration_listener* listener) {
+ _notify.unregister_listener(listener);
+ }
+
+ inline migration_notifier& notify() { return _notify; }
future<> schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state);
@@ -90,19 +96,6 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
// Deprecated. The canonical mutation should be used instead.
future<> merge_schema_from(netw::msg_addr src, const std::vector<frozen_mutation>& mutations);
- future<> notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
- future<> notify_create_column_family(const schema_ptr& cfm);
- future<> notify_create_user_type(const user_type& type);
- future<> notify_create_view(const view_ptr& view);
- future<> notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
- future<> notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
- future<> notify_update_user_type(const user_type& type);
- future<> notify_update_view(const view_ptr& view, bool columns_changed);
- future<> notify_drop_keyspace(const sstring& ks_name);
- future<> notify_drop_column_family(const schema_ptr& cfm);
- future<> notify_drop_user_type(const user_type& type);
- future<> notify_drop_view(const view_ptr& view);
-
bool should_pull_schema_from(const gms::inet_address& endpoint);
bool has_compatible_schema_tables_version(const gms::inet_address& endpoint);
diff --git a/database.cc b/database.cc
index 9909e3b68..d5c1f58c9 100644
--- a/database.cc
+++ b/database.cc
@@ -689,7 +689,7 @@ future<> database::update_keyspace(const sstring& name) {
auto new_ksm = ::make_lw_shared<keyspace_metadata>(tmp_ksm->name(), tmp_ksm->strategy_name(), tmp_ksm->strategy_options(), tmp_ksm->durable_writes(),
boost::copy_range<std::vector<schema_ptr>>(ks.metadata()->cf_meta_data() | boost::adaptors::map_values), ks.metadata()->user_types());
ks.update_from(std::move(new_ksm));
- return service::get_local_migration_manager().notify_update_keyspace(ks.metadata());
+ return service::get_local_migration_manager().notify().update_keyspace(ks.metadata());
});
}
diff --git a/db/schema_tables.cc b/db/schema_tables.cc
index 7e6ec65c7..6f3fd527d 100644
--- a/db/schema_tables.cc
+++ b/db/schema_tables.cc
@@ -912,7 +912,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
return do_for_each(keyspaces_to_drop, [&db] (auto keyspace_to_drop) {
db.drop_keyspace(keyspace_to_drop);
- return service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
+ return service::get_local_migration_manager().notify().drop_keyspace(keyspace_to_drop);
});
}).get0();
});
@@ -955,7 +955,7 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
return do_for_each(created, [&db](auto&& val) {
auto ksm = create_keyspace_from_schema_partition(val);
return db.create_keyspace(ksm).then([ksm] {
- return service::get_local_migration_manager().notify_create_keyspace(ksm);
+ return service::get_local_migration_manager().notify().create_keyspace(ksm);
});
}).then([&altered, &db]() {
return do_for_each(altered, [&db](auto& name) {
@@ -1055,14 +1055,14 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
when_all(notifications.begin(), notifications.end()).get();
};
// View drops are notified first, because a table can only be dropped if its views are already deleted
- notify(views_diff.dropped, [&] (auto&& dt) { return mm.notify_drop_view(view_ptr(dt.schema)); });
- notify(tables_diff.dropped, [&] (auto&& dt) { return mm.notify_drop_column_family(dt.schema); });
+ notify(views_diff.dropped, [&] (auto&& dt) { return mm.notify().drop_view(view_ptr(dt.schema)); });
+ notify(tables_diff.dropped, [&] (auto&& dt) { return mm.notify().drop_column_family(dt.schema); });
// Table creations are notified first, in case a view is created right after the table
- notify(tables_diff.created, [&] (auto&& gs) { return mm.notify_create_column_family(gs); });
- notify(views_diff.created, [&] (auto&& gs) { return mm.notify_create_view(view_ptr(gs)); });
+ notify(tables_diff.created, [&] (auto&& gs) { return mm.notify().create_column_family(gs); });
+ notify(views_diff.created, [&] (auto&& gs) { return mm.notify().create_view(view_ptr(gs)); });
// Table altering is notified first, in case new base columns appear
- notify(tables_diff.altered, [&] (auto&& gs) { return mm.notify_update_column_family(gs, *it++); });
- notify(views_diff.altered, [&] (auto&& gs) { return mm.notify_update_view(view_ptr(gs), *it++); });
+ notify(tables_diff.altered, [&] (auto&& gs) { return mm.notify().update_column_family(gs, *it++); });
+ notify(views_diff.altered, [&] (auto&& gs) { return mm.notify().update_view(view_ptr(gs), *it++); });
});
}).get();
}
@@ -1204,11 +1204,11 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
return seastar::async([&] {
for (auto&& user_type : create_types(db, diff.created)) {
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
- service::get_local_migration_manager().notify_create_user_type(user_type).get();
+ service::get_local_migration_manager().notify().create_user_type(user_type).get();
}
for (auto&& user_type : create_types(db, diff.altered)) {
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
- service::get_local_migration_manager().notify_update_user_type(user_type).get();
+ service::get_local_migration_manager().notify().update_user_type(user_type).get();
}
});
}).get();
@@ -1218,7 +1218,7 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
return do_with(create_types(db, rows), [&db] (auto &dropped) {
return do_for_each(dropped, [&db](auto& user_type) {
db.find_keyspace(user_type->_keyspace).remove_user_type(user_type);
- return service::get_local_migration_manager().notify_drop_user_type(user_type);
+ return service::get_local_migration_manager().notify().drop_user_type(user_type);
});
});
}).get();
diff --git a/main.cc b/main.cc
index bda3c5802..78cf1cda7 100644
--- a/main.cc
+++ b/main.cc
@@ -496,6 +496,7 @@ int main(int ac, char** av) {
print_starting_message(ac, av, parsed_opts);
+ sharded<service::migration_notifier> mm_notifier;
distributed<database> db;
seastar::sharded<service::cache_hitrate_calculator> cf_cache_hitrate_calculator;
debug::db = &db;
@@ -532,7 +533,7 @@ int main(int ac, char** av) {
tcp_syncookies_sanity();
- return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs,
+ return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &mm_notifier, &ctx, &opts, &dirs,
&prometheus_server, &cf_cache_hitrate_calculator, &feature_service] {
try {
::stop_signal stop_signal; // we can move this earlier to support SIGINT during initialization
@@ -682,6 +683,12 @@ int main(int ac, char** av) {
});
set_abort_on_internal_error(cfg->abort_on_internal_error());
+ supervisor::notify("starting mm_notifier");
+ mm_notifier.start().get();
+ auto stop_mm_notifier = defer_verbose_shutdown("mm_notifier", [ &mm_notifier ] {
+ mm_notifier.stop().get();
+ });
+
supervisor::notify("creating tracing");
tracing::backend_registry tracing_backend_registry;
tracing::register_tracing_keyspace_backend(tracing_backend_registry);
@@ -859,7 +866,7 @@ int main(int ac, char** av) {
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
supervisor::notify("starting migration manager");
- mm.start().get();
+ mm.start(std::ref(mm_notifier)).get();
auto stop_migration_manager = defer_verbose_shutdown("migration manager", [&mm] {
mm.stop().get();
});
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index 09401cffd..ce46f6494 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -63,8 +63,7 @@ using namespace std::chrono_literals;
const std::chrono::milliseconds migration_manager::migration_delay = 60000ms;
-migration_manager::migration_manager()
- : _listeners{}
+migration_manager::migration_manager(migration_notifier& notifier) : _notify(notifier)
{
}
@@ -159,12 +158,12 @@ future<> migration_manager::uninit_messaging_service()
);
}
-void migration_manager::register_listener(migration_listener* listener)
+void migration_notifier::register_listener(migration_listener* listener)
{
_listeners.emplace_back(listener);
}
-void migration_manager::unregister_listener(migration_listener* listener)
+void migration_notifier::unregister_listener(migration_listener* listener)
{
_listeners.erase(std::remove(_listeners.begin(), _listeners.end(), listener), _listeners.end());
}
@@ -352,7 +351,7 @@ bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoin
&& !gms::get_local_gossiper().is_gossip_only_member(endpoint);
}
-future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
+future<> migration_notifier::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
@@ -365,7 +364,7 @@ future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_
});
}
-future<> migration_manager::notify_create_column_family(const schema_ptr& cfm) {
+future<> migration_notifier::create_column_family(const schema_ptr& cfm) {
return seastar::async([this, cfm] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
@@ -379,7 +378,7 @@ future<> migration_manager::notify_create_column_family(const schema_ptr& cfm) {
});
}
-future<> migration_manager::notify_create_user_type(const user_type& type) {
+future<> migration_notifier::create_user_type(const user_type& type) {
return seastar::async([this, type] {
auto&& ks_name = type->_keyspace;
auto&& type_name = type->get_name_as_string();
@@ -393,7 +392,7 @@ future<> migration_manager::notify_create_user_type(const user_type& type) {
});
}
-future<> migration_manager::notify_create_view(const view_ptr& view) {
+future<> migration_notifier::create_view(const view_ptr& view) {
return seastar::async([this, view] {
auto&& ks_name = view->ks_name();
auto&& view_name = view->cf_name();
@@ -421,7 +420,7 @@ public void notifyCreateAggregate(UDAggregate udf)
}
#endif
-future<> migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
+future<> migration_notifier::update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
@@ -434,7 +433,7 @@ future<> migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_
});
}
-future<> migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed) {
+future<> migration_notifier::update_column_family(const schema_ptr& cfm, bool columns_changed) {
return seastar::async([this, cfm, columns_changed] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
@@ -448,7 +447,7 @@ future<> migration_manager::notify_update_column_family(const schema_ptr& cfm, b
});
}
-future<> migration_manager::notify_update_user_type(const user_type& type) {
+future<> migration_notifier::update_user_type(const user_type& type) {
return seastar::async([this, type] {
auto&& ks_name = type->_keyspace;
auto&& type_name = type->get_name_as_string();
@@ -462,7 +461,7 @@ future<> migration_manager::notify_update_user_type(const user_type& type) {
});
}
-future<> migration_manager::notify_update_view(const view_ptr& view, bool columns_changed) {
+future<> migration_notifier::update_view(const view_ptr& view, bool columns_changed) {
return seastar::async([this, view, columns_changed] {
auto&& ks_name = view->ks_name();
auto&& view_name = view->cf_name();
@@ -490,7 +489,7 @@ public void notifyUpdateAggregate(UDAggregate udf)
}
#endif
-future<> migration_manager::notify_drop_keyspace(const sstring& ks_name) {
+future<> migration_notifier::drop_keyspace(const sstring& ks_name) {
return seastar::async([this, ks_name] {
for (auto&& listener : _listeners) {
try {
@@ -502,7 +501,7 @@ future<> migration_manager::notify_drop_keyspace(const sstring& ks_name) {
});
}
-future<> migration_manager::notify_drop_column_family(const schema_ptr& cfm) {
+future<> migration_notifier::drop_column_family(const schema_ptr& cfm) {
return seastar::async([this, cfm] {
auto&& cf_name = cfm->cf_name();
auto&& ks_name = cfm->ks_name();
@@ -516,7 +515,7 @@ future<> migration_manager::notify_drop_column_family(const schema_ptr& cfm) {
});
}
-future<> migration_manager::notify_drop_user_type(const user_type& type) {
+future<> migration_notifier::drop_user_type(const user_type& type) {
return seastar::async([this, type] {
auto&& ks_name = type->_keyspace;
auto&& type_name = type->get_name_as_string();
@@ -530,7 +529,7 @@ future<> migration_manager::notify_drop_user_type(const user_type& type) {
});
}
-future<> migration_manager::notify_drop_view(const view_ptr& view) {
+future<> migration_notifier::drop_view(const view_ptr& view) {
return seastar::async([this, view] {
auto&& ks_name = view->ks_name();
auto&& view_name = view->cf_name();
diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc
index 826a22d73..4d8255f0e 100644
--- a/tests/cql_test_env.cc
+++ b/tests/cql_test_env.cc
@@ -375,6 +375,10 @@ class single_node_cql_env : public cql_test_env {
create_directories((cfg->view_hints_directory() + "/" + std::to_string(i)).c_str());
}
+ sharded<service::migration_notifier> mm_notif;
+ mm_notif.start().get();
+ auto stop_mm_notify = defer([&mm_notif] { mm_notif.stop().get(); });
+
set_abort_on_internal_error(true);
const gms::inet_address listen("127.0.0.1");
auto& ms = netw::get_messaging_service();
@@ -433,7 +437,7 @@ class single_node_cql_env : public cql_test_env {
proxy.start(std::ref(*db), spcfg, std::ref(b)).get();
auto stop_proxy = defer([&proxy] { proxy.stop().get(); });
- mm.start().get();
+ mm.start(std::ref(mm_notif)).get();
auto stop_mm = defer([&mm] { mm.stop().get(); });
auto& qp = cql3::get_query_processor();
--
2.20.1