[PATCH v2 00/15] Relax migration manager dependencies

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:02:51 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
This set makes the dependencies between the migration manager (mm)
and other services cleaner. In particular, after the set:

- the query processor (qp) no longer needs mm (mm doesn't need qp either)

- the database (db) no longer needs mm, so the mutual dependency
between these two is dropped and only mm -> db is left

- the mm -> ss (storage service) dependency is relaxed. One more
patchset will be needed to remove it completely, which drops one more
mutual dependency and leaves only ss -> mm

- the mm is stopped on drain, but several other services still need
it when they stop, which causes use-after-free problems. In particular,
there is a known bug where the view builder crashes when unregistering
from the mm notifier on stop. Fixed.
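
To illustrate the stop-order problem from the last item, here is a minimal sketch in plain C++ (no Seastar). All names in it (`notifier`, `view_builder_like`, `deferred`, `run_shutdown`) are hypothetical stand-ins, not the Scylla classes. It assumes shutdown is driven by scope-exit callbacks that run in reverse order of declaration, so whatever is started first is stopped last.

```cpp
#include <algorithm>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins; none of these are the Scylla classes.
struct listener {
    virtual ~listener() = default;
};

struct notifier {
    std::vector<listener*> listeners;
    bool stopped = false;
    void register_listener(listener* l) { listeners.push_back(l); }
    // Returns false when called after stop(); this models the use-after-free case.
    bool unregister_listener(listener* l) {
        if (stopped) {
            return false;
        }
        listeners.erase(std::remove(listeners.begin(), listeners.end(), l), listeners.end());
        return true;
    }
    void stop() {
        stopped = true;
        listeners.clear();
    }
};

// A service that registers on construction and unregisters when stopped.
struct view_builder_like : listener {
    notifier& n;
    explicit view_builder_like(notifier& n_) : n(n_) { n.register_listener(this); }
    bool stop() { return n.unregister_listener(this); }
};

// Runs the callback on scope exit (a simplified scope guard).
class deferred {
    std::function<void()> _fn;
public:
    explicit deferred(std::function<void()> fn) : _fn(std::move(fn)) {}
    ~deferred() { _fn(); }
    deferred(const deferred&) = delete;
    deferred& operator=(const deferred&) = delete;
};

// The notifier is started first, so its deferred stop runs last.
std::vector<std::string> run_shutdown() {
    std::vector<std::string> log;
    {
        notifier n;
        deferred stop_n([&] { n.stop(); log.push_back("notifier stopped"); });
        view_builder_like vb(n);
        deferred stop_vb([&] {
            log.push_back(vb.stop() ? "listener unregistered"
                                    : "listener hit a stopped notifier");
        });
    }
    return log;
}
```

With this ordering the listener always unregisters while the notifier is still alive. Stopping the notifier earlier (for example on drain) would make `stop()` on the listener take the failure branch instead.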

Branch: https://github.com/xemul/scylla/commits/br-mm-deps-cleanup-1

v2:
- rebased on recent master
- fixed patch comments
- removed header-to-header inclusion in patch #2
- more careful handling of temporary stuff (patch #2 again)

Pavel Emelyanov (15):
storage_service: Kill initialization helper from init.cc
migration_manager: Split notifier from main class
storage_service: Keep migration_notifier
database: Switch on mnotifier from migration_manager
view_builder: Use migration notifier
auth: Use migration_notifier
query_processor: Use migration_notifier
cql_server: Use migration_notifier in events_notifier
storage_service: Use migration_notifier (and stop worrying)
tests: Switch on migration notifier
migration_manager: Remove dependency on migration_notifier
database: Do not request migration_manager instance for
passive_announce
database: Explicitly pass migration_manager through
init_non_system_keyspace
migration_manager: Get database through storage_proxy
migration_manager: Use in-place value factory

auth/service.hh | 10 +++++----
cql3/query_processor.hh | 5 +++--
database.hh | 8 +++++--
db/schema_tables.hh | 2 +-
db/view/view_builder.hh | 6 +++---
distributed_loader.hh | 3 ++-
init.hh | 6 ------
service/migration_listener.hh | 33 +++++++++++++++++++++++++++++
service/migration_manager.hh | 23 ++-------------------
service/storage_service.hh | 6 ++++--
tests/cql_test_env.hh | 2 ++
transport/server.hh | 7 +++++--
auth/service.cc | 23 +++++++++++----------
cql3/query_processor.cc | 7 ++++---
database.cc | 21 ++++++++++---------
db/schema_tables.cc | 29 +++++++++++++-------------
db/view/view.cc | 14 ++++++-------
distributed_loader.cc | 9 ++++----
init.cc | 15 --------------
main.cc | 23 ++++++++++++++-------
service/migration_manager.cc | 39 +++++++++++++++--------------------
service/storage_service.cc | 24 ++++++++++-----------
tests/cql_test_env.cc | 29 +++++++++++++++++++-------
tests/gossip.cc | 6 +++++-
tests/gossip_test.cc | 8 +++++--
tests/schema_change_test.cc | 6 +++---
tests/test_services.cc | 4 +++-
transport/event_notifier.cc | 8 +++----
transport/server.cc | 4 ++--
29 files changed, 209 insertions(+), 171 deletions(-)

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:02:53 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
The helper just makes further patching more complex, so drop it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
init.hh | 6 ------
init.cc | 15 ---------------
main.cc | 2 +-
3 files changed, 1 insertion(+), 22 deletions(-)

diff --git a/init.hh b/init.hh
index 287002d41..57c9d1f9a 100644
--- a/init.hh
+++ b/init.hh
@@ -47,12 +47,6 @@ extern logging::logger startlog;

class bad_configuration_error : public std::exception {};

-void init_storage_service(sharded<abort_source>& abort_sources,
- distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service,
- sharded<cql3::cql_config>& cql_config,
- sharded<db::system_distributed_keyspace>& sys_dist_ks,
- sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service);
-
struct init_scheduling_config {
scheduling_group streaming;
scheduling_group statement;
diff --git a/init.cc b/init.cc
index 52e3a3c02..0be36ac85 100644
--- a/init.cc
+++ b/init.cc
@@ -32,21 +32,6 @@

logging::logger startlog("init");

-//
-// NOTE: there functions are (temporarily)
-// duplicated in cql_test_env.cc
-// until proper shutdown is done.
-
-void init_storage_service(sharded<abort_source>& abort_source,
- distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service,
- sharded<cql3::cql_config>& cql_config,
- sharded<db::system_distributed_keyspace>& sys_dist_ks,
- sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service, service::storage_service_config config) {
- service::init_storage_service(abort_source, db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, config).get();
- // #293 - do not stop anything
- //engine().at_exit([] { return service::deinit_storage_service(); });
-}
-
void init_ms_fd_gossiper(sharded<gms::gossiper>& gossiper
, sharded<gms::feature_service>& features
, db::config& cfg
diff --git a/main.cc b/main.cc
index 063ccf08b..bda3c5802 100644
--- a/main.cc
+++ b/main.cc
@@ -722,7 +722,7 @@ int main(int ac, char** av) {
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
//FIXME: discarded future
- (void)init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg);
+ (void)service::init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg);
supervisor::notify("starting per-shard database core");

// Note: changed from using a move here, because we want the config object intact.
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:02:55 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
The _listeners list on the migration_manager class and the corresponding
notify_xxx helpers have nothing to do with its instances; they
are just a transport for notification delivery.

At the same time, some services need the migration manager to be alive
at their stop time in order to unregister from it, while the manager
itself may need those services for its own purposes.

The proposal is to move the migration notifier into a completely separate
sharded "service". This service doesn't need anything, so it's started
first and stopped last.

While it's not effectively a "migration" notifier, we inherited the name
from Cassandra and renaming it will "scramble neurons in the old-timers'
brains but will make it easier for newcomers" as Avi says.
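
As a rough illustration of the split, the sketch below shows a notifier that owns nothing but the listener list. It is plain synchronous C++ rather than Seastar code, and the `listener` interface, the failure counter, and the two test listeners are assumptions made for the example. The real methods return `future<>`, and the real `migration_listener` has many more hooks.

```cpp
#include <algorithm>
#include <stdexcept>
#include <string>
#include <vector>

// Simplified listener interface with a single hook (an assumption for brevity).
class listener {
public:
    virtual ~listener() = default;
    virtual void on_create_keyspace(const std::string& ks_name) = 0;
};

// Standalone notifier: it holds only the listener list and depends on nothing else.
class notifier {
    std::vector<listener*> _listeners;
public:
    void register_listener(listener* l) { _listeners.emplace_back(l); }

    void unregister_listener(listener* l) {
        _listeners.erase(std::remove(_listeners.begin(), _listeners.end(), l),
                         _listeners.end());
    }

    // Delivers to every listener and returns how many callbacks threw,
    // so one failing listener does not block the others.
    int create_keyspace(const std::string& ks_name) {
        int failures = 0;
        for (auto* l : _listeners) {
            try {
                l->on_create_keyspace(ks_name);
            } catch (const std::exception&) {
                ++failures;
            }
        }
        return failures;
    }
};

// Records every keyspace name it is notified about.
struct recording_listener : listener {
    std::vector<std::string> seen;
    void on_create_keyspace(const std::string& ks_name) override {
        seen.push_back(ks_name);
    }
};

// Always throws, to exercise the per-listener error isolation.
struct failing_listener : listener {
    void on_create_keyspace(const std::string&) override {
        throw std::runtime_error("boom");
    }
};
```

Because the notifier has no dependencies of its own, any service can hold a reference to it for its whole lifetime without pulling in the migration manager.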

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/migration_listener.hh | 33 +++++++++++++++++++++++++++++++++
service/migration_manager.hh | 31 ++++++++++++-------------------
database.cc | 2 +-
db/schema_tables.cc | 22 +++++++++++-----------
main.cc | 11 +++++++++--
service/migration_manager.cc | 31 +++++++++++++++----------------
tests/cql_test_env.cc | 6 +++++-
7 files changed, 86 insertions(+), 50 deletions(-)

diff --git a/service/migration_listener.hh b/service/migration_listener.hh
index 945a0b937..fba35bf60 100644
--- a/service/migration_listener.hh
+++ b/service/migration_listener.hh
@@ -42,6 +42,14 @@
#pragma once

#include <seastar/core/sstring.hh>
+#include <seastar/core/shared_ptr.hh>
+
+class keyspace_metadata;
+class view_ptr;
+class user_type_impl;
+using user_type = seastar::shared_ptr<const user_type_impl>;
+class schema;
+using schema_ptr = seastar::lw_shared_ptr<const schema>;

namespace service {

@@ -95,4 +103,29 @@ class migration_listener::only_view_notifications : public migration_listener {
virtual void on_drop_aggregate(const sstring& ks_name, const sstring& aggregate_name) { }
};

+class migration_notifier {
+private:
+ std::vector<migration_listener*> _listeners;
+
+public:
+ /// Register a migration listener on current shard.
+ void register_listener(migration_listener* listener);
+
+ /// Unregister a migration listener on current shard.
+ void unregister_listener(migration_listener* listener);
+
+ future<> create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
+ future<> create_column_family(const schema_ptr& cfm);
+ future<> create_user_type(const user_type& type);
+ future<> create_view(const view_ptr& view);
+ future<> update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
+ future<> update_column_family(const schema_ptr& cfm, bool columns_changed);
+ future<> update_user_type(const user_type& type);
+ future<> update_view(const view_ptr& view, bool columns_changed);
+ future<> drop_keyspace(const sstring& ks_name);
+ future<> drop_column_family(const schema_ptr& cfm);
+ future<> drop_user_type(const user_type& type);
+ future<> drop_view(const view_ptr& view);
+};
+
}
diff --git a/service/migration_manager.hh b/service/migration_manager.hh
index 1fc68c3d6..ad6c4a60c 100644
--- a/service/migration_manager.hh
+++ b/service/migration_manager.hh
@@ -55,19 +55,25 @@
namespace service {

class migration_manager : public seastar::async_sharded_service<migration_manager> {
- std::vector<migration_listener*> _listeners;
+private:
+ migration_notifier& _notify; /* XXX -- temporary */
+
std::unordered_map<netw::msg_addr, serialized_action, netw::msg_addr::hash> _schema_pulls;
std::vector<gms::feature::listener_registration> _feature_listeners;
seastar::gate _background_tasks;
static const std::chrono::milliseconds migration_delay;
public:
- migration_manager();
+ explicit migration_manager(migration_notifier&);

- /// Register a migration listener on current shard.
- void register_listener(migration_listener* listener);
+ inline void register_listener(migration_listener* listener) {
+ _notify.register_listener(listener);
+ }

- /// Unregister a migration listener on current shard.
- void unregister_listener(migration_listener* listener);
+ inline void unregister_listener(migration_listener* listener) {
+ _notify.unregister_listener(listener);
+ }
+
+ inline migration_notifier& notify() { return _notify; }

future<> schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state);

@@ -90,19 +96,6 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
// Deprecated. The canonical mutation should be used instead.
future<> merge_schema_from(netw::msg_addr src, const std::vector<frozen_mutation>& mutations);

- future<> notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
- future<> notify_create_column_family(const schema_ptr& cfm);
- future<> notify_create_user_type(const user_type& type);
- future<> notify_create_view(const view_ptr& view);
- future<> notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm);
- future<> notify_update_column_family(const schema_ptr& cfm, bool columns_changed);
- future<> notify_update_user_type(const user_type& type);
- future<> notify_update_view(const view_ptr& view, bool columns_changed);
- future<> notify_drop_keyspace(const sstring& ks_name);
- future<> notify_drop_column_family(const schema_ptr& cfm);
- future<> notify_drop_user_type(const user_type& type);
- future<> notify_drop_view(const view_ptr& view);
-
bool should_pull_schema_from(const gms::inet_address& endpoint);
bool has_compatible_schema_tables_version(const gms::inet_address& endpoint);

diff --git a/database.cc b/database.cc
index 9909e3b68..d5c1f58c9 100644
--- a/database.cc
+++ b/database.cc
@@ -689,7 +689,7 @@ future<> database::update_keyspace(const sstring& name) {
auto new_ksm = ::make_lw_shared<keyspace_metadata>(tmp_ksm->name(), tmp_ksm->strategy_name(), tmp_ksm->strategy_options(), tmp_ksm->durable_writes(),
boost::copy_range<std::vector<schema_ptr>>(ks.metadata()->cf_meta_data() | boost::adaptors::map_values), ks.metadata()->user_types());
ks.update_from(std::move(new_ksm));
- return service::get_local_migration_manager().notify_update_keyspace(ks.metadata());
+ return service::get_local_migration_manager().notify().update_keyspace(ks.metadata());
});
}

diff --git a/db/schema_tables.cc b/db/schema_tables.cc
index 7e6ec65c7..6f3fd527d 100644
--- a/db/schema_tables.cc
+++ b/db/schema_tables.cc
@@ -912,7 +912,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
return do_for_each(keyspaces_to_drop, [&db] (auto keyspace_to_drop) {
db.drop_keyspace(keyspace_to_drop);
- return service::get_local_migration_manager().notify_drop_keyspace(keyspace_to_drop);
+ return service::get_local_migration_manager().notify().drop_keyspace(keyspace_to_drop);
});
}).get0();
});
@@ -955,7 +955,7 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
return do_for_each(created, [&db](auto&& val) {
auto ksm = create_keyspace_from_schema_partition(val);
return db.create_keyspace(ksm).then([ksm] {
- return service::get_local_migration_manager().notify_create_keyspace(ksm);
+ return service::get_local_migration_manager().notify().create_keyspace(ksm);
});
}).then([&altered, &db]() {
return do_for_each(altered, [&db](auto& name) {
@@ -1055,14 +1055,14 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
when_all(notifications.begin(), notifications.end()).get();
};
// View drops are notified first, because a table can only be dropped if its views are already deleted
- notify(views_diff.dropped, [&] (auto&& dt) { return mm.notify_drop_view(view_ptr(dt.schema)); });
- notify(tables_diff.dropped, [&] (auto&& dt) { return mm.notify_drop_column_family(dt.schema); });
+ notify(views_diff.dropped, [&] (auto&& dt) { return mm.notify().drop_view(view_ptr(dt.schema)); });
+ notify(tables_diff.dropped, [&] (auto&& dt) { return mm.notify().drop_column_family(dt.schema); });
// Table creations are notified first, in case a view is created right after the table
- notify(tables_diff.created, [&] (auto&& gs) { return mm.notify_create_column_family(gs); });
- notify(views_diff.created, [&] (auto&& gs) { return mm.notify_create_view(view_ptr(gs)); });
+ notify(tables_diff.created, [&] (auto&& gs) { return mm.notify().create_column_family(gs); });
+ notify(views_diff.created, [&] (auto&& gs) { return mm.notify().create_view(view_ptr(gs)); });
// Table altering is notified first, in case new base columns appear
- notify(tables_diff.altered, [&] (auto&& gs) { return mm.notify_update_column_family(gs, *it++); });
- notify(views_diff.altered, [&] (auto&& gs) { return mm.notify_update_view(view_ptr(gs), *it++); });
+ notify(tables_diff.altered, [&] (auto&& gs) { return mm.notify().update_column_family(gs, *it++); });
+ notify(views_diff.altered, [&] (auto&& gs) { return mm.notify().update_view(view_ptr(gs), *it++); });
});
}).get();
}
@@ -1204,11 +1204,11 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
return seastar::async([&] {
for (auto&& user_type : create_types(db, diff.created)) {
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
- service::get_local_migration_manager().notify_create_user_type(user_type).get();
+ service::get_local_migration_manager().notify().create_user_type(user_type).get();
}
for (auto&& user_type : create_types(db, diff.altered)) {
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
- service::get_local_migration_manager().notify_update_user_type(user_type).get();
+ service::get_local_migration_manager().notify().update_user_type(user_type).get();
}
});
}).get();
@@ -1218,7 +1218,7 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
return do_with(create_types(db, rows), [&db] (auto &dropped) {
return do_for_each(dropped, [&db](auto& user_type) {
db.find_keyspace(user_type->_keyspace).remove_user_type(user_type);
- return service::get_local_migration_manager().notify_drop_user_type(user_type);
+ return service::get_local_migration_manager().notify().drop_user_type(user_type);
});
});
}).get();
diff --git a/main.cc b/main.cc
index bda3c5802..78cf1cda7 100644
--- a/main.cc
+++ b/main.cc
@@ -496,6 +496,7 @@ int main(int ac, char** av) {

print_starting_message(ac, av, parsed_opts);

+ sharded<service::migration_notifier> mm_notifier;
distributed<database> db;
seastar::sharded<service::cache_hitrate_calculator> cf_cache_hitrate_calculator;
debug::db = &db;
@@ -532,7 +533,7 @@ int main(int ac, char** av) {

tcp_syncookies_sanity();

- return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &ctx, &opts, &dirs,
+ return seastar::async([cfg, ext, &db, &qp, &proxy, &mm, &mm_notifier, &ctx, &opts, &dirs,
&prometheus_server, &cf_cache_hitrate_calculator, &feature_service] {
try {
::stop_signal stop_signal; // we can move this earlier to support SIGINT during initialization
@@ -682,6 +683,12 @@ int main(int ac, char** av) {
});
set_abort_on_internal_error(cfg->abort_on_internal_error());

+ supervisor::notify("starting mm_notifier");
+ mm_notifier.start().get();
+ auto stop_mm_notifier = defer_verbose_shutdown("mm_notifier", [ &mm_notifier ] {
+ mm_notifier.stop().get();
+ });
+
supervisor::notify("creating tracing");
tracing::backend_registry tracing_backend_registry;
tracing::register_tracing_keyspace_backend(tracing_backend_registry);
@@ -859,7 +866,7 @@ int main(int ac, char** av) {
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
supervisor::notify("starting migration manager");
- mm.start().get();
+ mm.start(std::ref(mm_notifier)).get();
auto stop_migration_manager = defer_verbose_shutdown("migration manager", [&mm] {
mm.stop().get();
});
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index 09401cffd..ce46f6494 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -63,8 +63,7 @@ using namespace std::chrono_literals;

const std::chrono::milliseconds migration_manager::migration_delay = 60000ms;

-migration_manager::migration_manager()
- : _listeners{}
+migration_manager::migration_manager(migration_notifier& notifier) : _notify(notifier)
{
}

@@ -159,12 +158,12 @@ future<> migration_manager::uninit_messaging_service()
);
}

-void migration_manager::register_listener(migration_listener* listener)
+void migration_notifier::register_listener(migration_listener* listener)
{
_listeners.emplace_back(listener);
}

-void migration_manager::unregister_listener(migration_listener* listener)
+void migration_notifier::unregister_listener(migration_listener* listener)
{
_listeners.erase(std::remove(_listeners.begin(), _listeners.end(), listener), _listeners.end());
}
@@ -352,7 +351,7 @@ bool migration_manager::should_pull_schema_from(const gms::inet_address& endpoin
&& !gms::get_local_gossiper().is_gossip_only_member(endpoint);
}

-future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
+future<> migration_notifier::create_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
@@ -365,7 +364,7 @@ future<> migration_manager::notify_create_keyspace(const lw_shared_ptr<keyspace_
});
}

-future<> migration_manager::notify_create_column_family(const schema_ptr& cfm) {
+future<> migration_notifier::create_column_family(const schema_ptr& cfm) {
return seastar::async([this, cfm] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
@@ -379,7 +378,7 @@ future<> migration_manager::notify_create_column_family(const schema_ptr& cfm) {
});
}

-future<> migration_manager::notify_create_user_type(const user_type& type) {
+future<> migration_notifier::create_user_type(const user_type& type) {
return seastar::async([this, type] {
auto&& ks_name = type->_keyspace;
auto&& type_name = type->get_name_as_string();
@@ -393,7 +392,7 @@ future<> migration_manager::notify_create_user_type(const user_type& type) {
});
}

-future<> migration_manager::notify_create_view(const view_ptr& view) {
+future<> migration_notifier::create_view(const view_ptr& view) {
return seastar::async([this, view] {
auto&& ks_name = view->ks_name();
auto&& view_name = view->cf_name();
@@ -421,7 +420,7 @@ public void notifyCreateAggregate(UDAggregate udf)
}
#endif

-future<> migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
+future<> migration_notifier::update_keyspace(const lw_shared_ptr<keyspace_metadata>& ksm) {
return seastar::async([this, ksm] {
auto&& name = ksm->name();
for (auto&& listener : _listeners) {
@@ -434,7 +433,7 @@ future<> migration_manager::notify_update_keyspace(const lw_shared_ptr<keyspace_
});
}

-future<> migration_manager::notify_update_column_family(const schema_ptr& cfm, bool columns_changed) {
+future<> migration_notifier::update_column_family(const schema_ptr& cfm, bool columns_changed) {
return seastar::async([this, cfm, columns_changed] {
auto&& ks_name = cfm->ks_name();
auto&& cf_name = cfm->cf_name();
@@ -448,7 +447,7 @@ future<> migration_manager::notify_update_column_family(const schema_ptr& cfm, b
});
}

-future<> migration_manager::notify_update_user_type(const user_type& type) {
+future<> migration_notifier::update_user_type(const user_type& type) {
return seastar::async([this, type] {
auto&& ks_name = type->_keyspace;
auto&& type_name = type->get_name_as_string();
@@ -462,7 +461,7 @@ future<> migration_manager::notify_update_user_type(const user_type& type) {
});
}

-future<> migration_manager::notify_update_view(const view_ptr& view, bool columns_changed) {
+future<> migration_notifier::update_view(const view_ptr& view, bool columns_changed) {
return seastar::async([this, view, columns_changed] {
auto&& ks_name = view->ks_name();
auto&& view_name = view->cf_name();
@@ -490,7 +489,7 @@ public void notifyUpdateAggregate(UDAggregate udf)
}
#endif

-future<> migration_manager::notify_drop_keyspace(const sstring& ks_name) {
+future<> migration_notifier::drop_keyspace(const sstring& ks_name) {
return seastar::async([this, ks_name] {
for (auto&& listener : _listeners) {
try {
@@ -502,7 +501,7 @@ future<> migration_manager::notify_drop_keyspace(const sstring& ks_name) {
});
}

-future<> migration_manager::notify_drop_column_family(const schema_ptr& cfm) {
+future<> migration_notifier::drop_column_family(const schema_ptr& cfm) {
return seastar::async([this, cfm] {
auto&& cf_name = cfm->cf_name();
auto&& ks_name = cfm->ks_name();
@@ -516,7 +515,7 @@ future<> migration_manager::notify_drop_column_family(const schema_ptr& cfm) {
});
}

-future<> migration_manager::notify_drop_user_type(const user_type& type) {
+future<> migration_notifier::drop_user_type(const user_type& type) {
return seastar::async([this, type] {
auto&& ks_name = type->_keyspace;
auto&& type_name = type->get_name_as_string();
@@ -530,7 +529,7 @@ future<> migration_manager::notify_drop_user_type(const user_type& type) {
});
}

-future<> migration_manager::notify_drop_view(const view_ptr& view) {
+future<> migration_notifier::drop_view(const view_ptr& view) {
return seastar::async([this, view] {
auto&& ks_name = view->ks_name();
auto&& view_name = view->cf_name();
diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc
index 826a22d73..4d8255f0e 100644
--- a/tests/cql_test_env.cc
+++ b/tests/cql_test_env.cc
@@ -375,6 +375,10 @@ class single_node_cql_env : public cql_test_env {
create_directories((cfg->view_hints_directory() + "/" + std::to_string(i)).c_str());
}

+ sharded<service::migration_notifier> mm_notif;
+ mm_notif.start().get();
+ auto stop_mm_notify = defer([&mm_notif] { mm_notif.stop().get(); });
+
set_abort_on_internal_error(true);
const gms::inet_address listen("127.0.0.1");
auto& ms = netw::get_messaging_service();
@@ -433,7 +437,7 @@ class single_node_cql_env : public cql_test_env {
proxy.start(std::ref(*db), spcfg, std::ref(b)).get();
auto stop_proxy = defer([&proxy] { proxy.stop().get(); });

- mm.start().get();
+ mm.start(std::ref(mm_notif)).get();
auto stop_mm = defer([&mm] { mm.stop().get(); });

auto& qp = cql3::get_query_processor();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:02:59 AM
to seastar-dev@googlegroups.com, Pavel Emelyanov
The storage service will need the migration notifier to initialize its
sub-services. It also registers itself with the notifier.

That said, it's convenient to have the migration notifier on board.
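
A minimal sketch of what "on board" could look like, in plain C++ with hypothetical names (`service_with_notifier`, `sub_service`): the service receives the notifier by reference in its constructor, keeps it as a member, registers itself as a listener, and hands the same reference to whatever it creates later. The `start()`/`stop()` split is an assumption of the sketch, not something this patch adds.

```cpp
#include <algorithm>
#include <vector>

struct listener {
    virtual ~listener() = default;
};

struct notifier {
    std::vector<listener*> listeners;
    void register_listener(listener* l) { listeners.push_back(l); }
    void unregister_listener(listener* l) {
        listeners.erase(std::remove(listeners.begin(), listeners.end(), l),
                        listeners.end());
    }
};

// Hypothetical sub-service that needs the notifier at construction time.
struct sub_service {
    notifier& n;
    explicit sub_service(notifier& n_) : n(n_) {}
};

// Hypothetical service that keeps the notifier reference as a member.
class service_with_notifier : public listener {
    notifier& _mnotifier;
public:
    explicit service_with_notifier(notifier& mn) : _mnotifier(mn) {}

    // Registers this service as a listener.
    void start() { _mnotifier.register_listener(this); }

    // Unregisters; this needs the notifier to outlive the service.
    void stop() { _mnotifier.unregister_listener(this); }

    // Sub-services get the notifier from the member, not from a global getter.
    sub_service make_sub_service() { return sub_service(_mnotifier); }
};
```

Passing the reference explicitly keeps the dependency visible in the constructor signature, which is the same direction the diff below takes for `storage_service`.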

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/storage_service.hh | 6 ++++--
main.cc | 2 +-
service/storage_service.cc | 6 ++++--
tests/cql_test_env.cc | 2 +-
tests/gossip.cc | 6 +++++-
tests/gossip_test.cc | 2 +-
tests/test_services.cc | 4 +++-
7 files changed, 19 insertions(+), 9 deletions(-)

diff --git a/service/storage_service.hh b/service/storage_service.hh
index cde4c706d..43a201a7a 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -151,6 +151,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
gms::gossiper& _gossiper;
sharded<auth::service>& _auth_service;
sharded<cql3::cql_config>& _cql_config;
+ sharded<service::migration_notifier>& _mnotifier;
int _update_jobs{0};
// Note that this is obviously only valid for the current shard. Users of
// this facility should elect a shard to be the coordinator based on any
@@ -171,7 +172,7 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
size_t _service_memory_total;
semaphore _service_memory_limiter;
public:
- storage_service(abort_source& as, distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>&, sharded<cql3::cql_config>& cql_config, sharded<db::system_distributed_keyspace>&, sharded<db::view::view_update_generator>&, gms::feature_service& feature_service, storage_service_config config, /* only for tests */ bool for_testing = false, /* only for tests */ std::set<sstring> disabled_features = {});
+ storage_service(abort_source& as, distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>&, sharded<cql3::cql_config>& cql_config, sharded<service::migration_notifier>& mn, sharded<db::system_distributed_keyspace>&, sharded<db::view::view_update_generator>&, gms::feature_service& feature_service, storage_service_config config, /* only for tests */ bool for_testing = false, /* only for tests */ std::set<sstring> disabled_features = {});
void isolate_on_error();
void isolate_on_commit_error();

@@ -2399,7 +2400,8 @@ class storage_service : public service::migration_listener, public gms::i_endpoi
};

future<> init_storage_service(sharded<abort_source>& abort_sources, distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service,
- sharded<cql3::cql_config>& cql_config, sharded<db::system_distributed_keyspace>& sys_dist_ks,
+ sharded<cql3::cql_config>& cql_config, sharded<service::migration_notifier>& mn,
+ sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service, storage_service_config config);
future<> deinit_storage_service();

diff --git a/main.cc b/main.cc
index 78cf1cda7..a0a1b57f1 100644
--- a/main.cc
+++ b/main.cc
@@ -729,7 +729,7 @@ int main(int ac, char** av) {
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
//FIXME: discarded future
- (void)service::init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg);
+ (void)service::init_storage_service(stop_signal.as_sharded_abort_source(), db, gossiper, auth_service, cql_config, mm_notifier, sys_dist_ks, view_update_generator, feature_service, sscfg);
supervisor::notify("starting per-shard database core");

// Note: changed from using a move here, because we want the config object intact.
diff --git a/service/storage_service.cc b/service/storage_service.cc
index e72f3de3b..39a053ce0 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -140,7 +140,7 @@ int get_generation_number() {
return generation_number;
}

-storage_service::storage_service(abort_source& abort_source, distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>& auth_service, sharded<cql3::cql_config>& cql_config, sharded<db::system_distributed_keyspace>& sys_dist_ks,
+storage_service::storage_service(abort_source& abort_source, distributed<database>& db, gms::gossiper& gossiper, sharded<auth::service>& auth_service, sharded<cql3::cql_config>& cql_config, sharded<service::migration_notifier>& mn, sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator, gms::feature_service& feature_service, storage_service_config config, bool for_testing, std::set<sstring> disabled_features)
: _abort_source(abort_source)
, _feature_service(feature_service)
@@ -148,6 +148,7 @@ storage_service::storage_service(abort_source& abort_source, distributed<databas
, _gossiper(gossiper)
, _auth_service(auth_service)
, _cql_config(cql_config)
+ , _mnotifier(mn)
, _disabled_features(std::move(disabled_features))
, _service_memory_total(config.available_memory / 10)
, _service_memory_limiter(_service_memory_total)
@@ -3417,9 +3418,10 @@ storage_service::view_build_statuses(sstring keyspace, sstring view_name) const

future<> init_storage_service(sharded<abort_source>& abort_source, distributed<database>& db, sharded<gms::gossiper>& gossiper, sharded<auth::service>& auth_service,
sharded<cql3::cql_config>& cql_config,
+ sharded<service::migration_notifier>& mn,
sharded<db::system_distributed_keyspace>& sys_dist_ks,
sharded<db::view::view_update_generator>& view_update_generator, sharded<gms::feature_service>& feature_service, storage_service_config config) {
- return service::get_storage_service().start(std::ref(abort_source), std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), config);
+ return service::get_storage_service().start(std::ref(abort_source), std::ref(db), std::ref(gossiper), std::ref(auth_service), std::ref(cql_config), std::ref(mn), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), config);
}

future<> deinit_storage_service() {
diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc
index 4d8255f0e..a7c2e4bd5 100644
--- a/tests/cql_test_env.cc
+++ b/tests/cql_test_env.cc
@@ -409,7 +409,7 @@ class single_node_cql_env : public cql_test_env {
auto& ss = service::get_storage_service();
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
- ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get();
+ ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(cql_config), std::ref(mm_notif), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get();
auto stop_storage_service = defer([&ss] { ss.stop().get(); });

database_config dbcfg;
diff --git a/tests/gossip.cc b/tests/gossip.cc
index b77bfc51e..51749a6a4 100644
--- a/tests/gossip.cc
+++ b/tests/gossip.cc
@@ -81,12 +81,16 @@ int main(int ac, char ** av) {
sharded<db::system_distributed_keyspace> sys_dist_ks;
sharded<db::view::view_update_generator> view_update_generator;
sharded<abort_source> abort_sources;
+ sharded<service::migration_notifier> mnotif;
+
abort_sources.start().get();
auto stop_abort_source = defer([&] { abort_sources.stop().get(); });
+ mnotif.start().get();
+ auto stop_mnotifier = defer([&] { mnotif.stop().get(); });
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
cql_config.start().get();
- service::init_storage_service(std::ref(abort_sources), db, gms::get_gossiper(), auth_service, cql_config, sys_dist_ks, view_update_generator, feature_service, sscfg).get();
+ service::init_storage_service(std::ref(abort_sources), db, gms::get_gossiper(), auth_service, cql_config, mnotif, sys_dist_ks, view_update_generator, feature_service, sscfg).get();
netw::get_messaging_service().start(listen).get();
auto& server = netw::get_local_messaging_service();
auto port = server.port();
diff --git a/tests/gossip_test.cc b/tests/gossip_test.cc
index b5b9a3334..0fb3db2c1 100644
--- a/tests/gossip_test.cc
+++ b/tests/gossip_test.cc
@@ -76,7 +76,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
cql_config.start().get();
auto stop_cql_config = defer([&] { cql_config.stop().get(); });

- service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(cql_config), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, true).get();
+ service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(cql_config), std::ref(mm_notif), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, true).get();
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });

db.start(std::ref(cfg), dbcfg).get();
diff --git a/tests/test_services.cc b/tests/test_services.cc
index f86aef89f..a95154e49 100644
--- a/tests/test_services.cc
+++ b/tests/test_services.cc
@@ -41,6 +41,7 @@ class storage_service_for_tests::impl {
db::config _cfg;
sharded<auth::service> _auth_service;
sharded<cql3::cql_config> _cql_config;
+ sharded<service::migration_notifier> _mnotif;
sharded<db::system_distributed_keyspace> _sys_dist_ks;
sharded<db::view::view_update_generator> _view_update_generator;
public:
@@ -51,13 +52,14 @@ class storage_service_for_tests::impl {
utils::fb_utilities::set_broadcast_address(gms::inet_address("localhost"));
utils::fb_utilities::set_broadcast_rpc_address(gms::inet_address("localhost"));
_abort_source.start().get();
+ _mnotif.start().get();
_feature_service.start().get();
_gossiper.start(std::ref(_feature_service), std::ref(_cfg)).get();
netw::get_messaging_service().start(gms::inet_address("127.0.0.1"), 7000, false).get();
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
_cql_config.start().get();
- service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_cql_config), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, true).get();
+ service::get_storage_service().start(std::ref(_abort_source), std::ref(_db), std::ref(_gossiper), std::ref(_auth_service), std::ref(_cql_config), std::ref(_mnotif), std::ref(_sys_dist_ks), std::ref(_view_update_generator), std::ref(_feature_service), sscfg, true).get();
service::get_storage_service().invoke_on_all([] (auto& ss) {
ss.enable_all_features();
}).get();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:02:59 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
Do not look up the local migration manager instance to send
notifications. Use the local migration notifier instead, as it is
always alive.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
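Note, not part of the patch: a minimal standalone sketch of the idea,
assuming simplified stand-in types (no Seastar, no futures). The class
and method names mirror the patch, but the bodies, the event strings
and the notify_demo() helper are hypothetical.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Simplified stand-in for service::migration_notifier: it only records
// the events it is asked to deliver (the real one returns futures).
class migration_notifier {
    std::vector<std::string> _events;
public:
    void drop_keyspace(const std::string& ks) { _events.push_back("drop_keyspace:" + ks); }
    void update_keyspace(const std::string& ks) { _events.push_back("update_keyspace:" + ks); }
    const std::vector<std::string>& events() const { return _events; }
};

// Simplified stand-in for database: the notifier is injected through
// the constructor and kept by reference, so no global lookup of the
// migration manager is needed to send a notification.
class database {
    migration_notifier& _mnotifier;
public:
    explicit database(migration_notifier& mn) : _mnotifier(mn) {}
    migration_notifier& notify() const { return _mnotifier; }
    void update_keyspace(const std::string& ks) { _mnotifier.update_keyspace(ks); }
};

// Hypothetical helper: one notification sent from inside the database,
// one sent by an outside caller through db.notify().
inline std::vector<std::string> notify_demo() {
    migration_notifier mn;   // must outlive the database
    database db(mn);
    db.update_keyspace("ks1");
    db.notify().drop_keyspace("ks1");
    return mn.events();
}
```

The only ordering requirement in this sketch is that the notifier is
constructed before the database and destroyed after it.
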
database.hh | 6 +++++-
service/migration_manager.hh | 2 --
database.cc | 5 +++--
db/schema_tables.cc | 25 ++++++++++++-------------
main.cc | 2 +-
tests/cql_test_env.cc | 2 +-
tests/gossip_test.cc | 6 +++++-
7 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/database.hh b/database.hh
index 1d186706a..c96191958 100644
--- a/database.hh
+++ b/database.hh
@@ -102,6 +102,7 @@ class reconcilable_result;

namespace service {
class storage_proxy;
+class migration_notifier;
}

namespace netw {
@@ -1318,10 +1319,13 @@ class database {
friend db::data_listeners;
std::unique_ptr<db::data_listeners> _data_listeners;

+ service::migration_notifier& _mnotifier;
+
bool _supports_infinite_bound_range_deletions = false;

future<> init_commitlog();
public:
+ service::migration_notifier& notify() const { return _mnotifier; }
future<> apply_in_memory(const frozen_mutation& m, schema_ptr m_schema, db::rp_handle&&, db::timeout_clock::time_point timeout);
future<> apply_in_memory(const mutation& m, column_family& cf, db::rp_handle&&, db::timeout_clock::time_point timeout);
private:
@@ -1352,7 +1356,7 @@ class database {
void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }

future<> parse_system_tables(distributed<service::storage_proxy>&);
- database(const db::config&, database_config dbcfg);
+ database(const db::config&, database_config dbcfg, service::migration_notifier& mn);
database(database&&) = delete;
~database();

diff --git a/service/migration_manager.hh b/service/migration_manager.hh
index ad6c4a60c..1e8d8b130 100644
--- a/service/migration_manager.hh
+++ b/service/migration_manager.hh
@@ -73,8 +73,6 @@ class migration_manager : public seastar::async_sharded_service<migration_manage
_notify.unregister_listener(listener);
}

- inline migration_notifier& notify() { return _notify; }
-
future<> schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state);

future<> maybe_schedule_schema_pull(const utils::UUID& their_version, const gms::inet_address& endpoint);
diff --git a/database.cc b/database.cc
index d5c1f58c9..0e0d8817e 100644
--- a/database.cc
+++ b/database.cc
@@ -181,7 +181,7 @@ void keyspace::remove_user_type(const user_type ut) {

utils::UUID database::empty_version = utils::UUID_gen::get_name_UUID(bytes{});

-database::database(const db::config& cfg, database_config dbcfg)
+database::database(const db::config& cfg, database_config dbcfg, service::migration_notifier& mn)
: _stats(make_lw_shared<db_stats>())
, _cl_stats(std::make_unique<cell_locker_stats>())
, _cfg(cfg)
@@ -230,6 +230,7 @@ database::database(const db::config& cfg, database_config dbcfg)
, _system_sstables_manager(std::make_unique<sstables::sstables_manager>(*_nop_large_data_handler))
, _result_memory_limiter(dbcfg.available_memory / 10)
, _data_listeners(std::make_unique<db::data_listeners>(*this))
+ , _mnotifier(mn)
{
local_schema_registry().init(*this); // TODO: we're never unbound.
setup_metrics();
@@ -689,7 +690,7 @@ future<> database::update_keyspace(const sstring& name) {
auto new_ksm = ::make_lw_shared<keyspace_metadata>(tmp_ksm->name(), tmp_ksm->strategy_name(), tmp_ksm->strategy_options(), tmp_ksm->durable_writes(),
boost::copy_range<std::vector<schema_ptr>>(ks.metadata()->cf_meta_data() | boost::adaptors::map_values), ks.metadata()->user_types());
ks.update_from(std::move(new_ksm));
- return service::get_local_migration_manager().notify().update_keyspace(ks.metadata());
+ return _mnotifier.update_keyspace(ks.metadata());
});
}

diff --git a/db/schema_tables.cc b/db/schema_tables.cc
index 6f3fd527d..b32e7e121 100644
--- a/db/schema_tables.cc
+++ b/db/schema_tables.cc
@@ -912,7 +912,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, std:
// it is safe to drop a keyspace only when all nested ColumnFamilies where deleted
return do_for_each(keyspaces_to_drop, [&db] (auto keyspace_to_drop) {
db.drop_keyspace(keyspace_to_drop);
- return service::get_local_migration_manager().notify().drop_keyspace(keyspace_to_drop);
+ return db.notify().drop_keyspace(keyspace_to_drop);
});
}).get0();
});
@@ -954,8 +954,8 @@ future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& p
return proxy.local().get_db().invoke_on_all([&created, &altered] (database& db) {
return do_for_each(created, [&db](auto&& val) {
auto ksm = create_keyspace_from_schema_partition(val);
- return db.create_keyspace(ksm).then([ksm] {
- return service::get_local_migration_manager().notify().create_keyspace(ksm);
+ return db.create_keyspace(ksm).then([&db, ksm] {
+ return db.notify().create_keyspace(ksm);
});
}).then([&altered, &db]() {
return do_for_each(altered, [&db](auto& name) {
@@ -1048,21 +1048,20 @@ static void merge_tables_and_views(distributed<service::storage_proxy>& proxy,
columns_changed.push_back(db.update_column_family(gs));
}

- auto& mm = service::get_local_migration_manager();
auto it = columns_changed.begin();
auto notify = [&] (auto& r, auto&& f) {
auto notifications = r | boost::adaptors::transformed(f);
when_all(notifications.begin(), notifications.end()).get();
};
// View drops are notified first, because a table can only be dropped if its views are already deleted
- notify(views_diff.dropped, [&] (auto&& dt) { return mm.notify().drop_view(view_ptr(dt.schema)); });
- notify(tables_diff.dropped, [&] (auto&& dt) { return mm.notify().drop_column_family(dt.schema); });
+ notify(views_diff.dropped, [&] (auto&& dt) { return db.notify().drop_view(view_ptr(dt.schema)); });
+ notify(tables_diff.dropped, [&] (auto&& dt) { return db.notify().drop_column_family(dt.schema); });
// Table creations are notified first, in case a view is created right after the table
- notify(tables_diff.created, [&] (auto&& gs) { return mm.notify().create_column_family(gs); });
- notify(views_diff.created, [&] (auto&& gs) { return mm.notify().create_view(view_ptr(gs)); });
+ notify(tables_diff.created, [&] (auto&& gs) { return db.notify().create_column_family(gs); });
+ notify(views_diff.created, [&] (auto&& gs) { return db.notify().create_view(view_ptr(gs)); });
// Table altering is notified first, in case new base columns appear
- notify(tables_diff.altered, [&] (auto&& gs) { return mm.notify().update_column_family(gs, *it++); });
- notify(views_diff.altered, [&] (auto&& gs) { return mm.notify().update_view(view_ptr(gs), *it++); });
+ notify(tables_diff.altered, [&] (auto&& gs) { return db.notify().update_column_family(gs, *it++); });
+ notify(views_diff.altered, [&] (auto&& gs) { return db.notify().update_view(view_ptr(gs), *it++); });
});
}).get();
}
@@ -1204,11 +1203,11 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
return seastar::async([&] {
for (auto&& user_type : create_types(db, diff.created)) {
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
- service::get_local_migration_manager().notify().create_user_type(user_type).get();
+ db.notify().create_user_type(user_type).get();
}
for (auto&& user_type : create_types(db, diff.altered)) {
db.find_keyspace(user_type->_keyspace).add_user_type(user_type);
- service::get_local_migration_manager().notify().update_user_type(user_type).get();
+ db.notify().update_user_type(user_type).get();
}
});
}).get();
@@ -1218,7 +1217,7 @@ static std::vector<user_type> create_types(database& db, const std::vector<const
return do_with(create_types(db, rows), [&db] (auto &dropped) {
return do_for_each(dropped, [&db](auto& user_type) {
db.find_keyspace(user_type->_keyspace).remove_user_type(user_type);
- return service::get_local_migration_manager().notify().drop_user_type(user_type);
+ return db.notify().drop_user_type(user_type);
});
});
}).get();
diff --git a/main.cc b/main.cc
index a0a1b57f1..66899b957 100644
--- a/main.cc
+++ b/main.cc
@@ -741,7 +741,7 @@ int main(int ac, char** av) {
dbcfg.memtable_scheduling_group = make_sched_group("memtable", 1000);
dbcfg.memtable_to_cache_scheduling_group = make_sched_group("memtable_to_cache", 200);
dbcfg.available_memory = memory::stats().total_memory();
- db.start(std::ref(*cfg), dbcfg).get();
+ db.start(std::ref(*cfg), dbcfg, std::ref(mm_notifier)).get();
auto stop_database_and_sstables = defer_verbose_shutdown("database", [&db] {
// #293 - do not stop anything - not even db (for real)
//return db.stop();
diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc
index a7c2e4bd5..a4ef8e40b 100644
--- a/tests/cql_test_env.cc
+++ b/tests/cql_test_env.cc
@@ -414,7 +414,7 @@ class single_node_cql_env : public cql_test_env {

database_config dbcfg;
dbcfg.available_memory = memory::stats().total_memory();
- db->start(std::ref(*cfg), dbcfg).get();
+ db->start(std::ref(*cfg), dbcfg, std::ref(mm_notif)).get();
auto stop_db = defer([db] {
db->stop().get();
});
diff --git a/tests/gossip_test.cc b/tests/gossip_test.cc
index 0fb3db2c1..dafd94c30 100644
--- a/tests/gossip_test.cc
+++ b/tests/gossip_test.cc
@@ -48,6 +48,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
distributed<database> db;
database_config dbcfg;
db::config cfg;
+ sharded<service::migration_notifier> mm_notif;
sharded<abort_source> abort_sources;
sharded<auth::service> auth_service;
sharded<db::system_distributed_keyspace> sys_dist_ks;
@@ -55,6 +56,9 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
utils::fb_utilities::set_broadcast_address(gms::inet_address("127.0.0.1"));
sharded<gms::feature_service> feature_service;

+ mm_notif.start().get();
+ auto stop_mm_notif = defer([&mm_notif] { mm_notif.stop().get(); });
+
abort_sources.start().get();
auto stop_abort_sources = defer([&] { abort_sources.stop().get(); });

@@ -79,7 +83,7 @@ SEASTAR_TEST_CASE(test_boot_shutdown){
service::get_storage_service().start(std::ref(abort_sources), std::ref(db), std::ref(gms::get_gossiper()), std::ref(auth_service), std::ref(cql_config), std::ref(mm_notif), std::ref(sys_dist_ks), std::ref(view_update_generator), std::ref(feature_service), sscfg, true).get();
auto stop_ss = defer([&] { service::get_storage_service().stop().get(); });

- db.start(std::ref(cfg), dbcfg).get();
+ db.start(std::ref(cfg), dbcfg, std::ref(mm_notif)).get();
auto stop_db = defer([&] { db.stop().get(); });
auto stop_database_d = defer([&db] {
stop_database(db).get();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:00 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
The migration manager itself is still needed on start to wait
for schema agreement, but the view builder no longer needs to
keep a lifetime reference to it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
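Note, not part of the patch: a rough standalone sketch of the lifetime
split, assuming simplified synchronous stand-ins for the real classes.
The listener base class, listener_count() and the
stop_after_manager_is_gone() helper are hypothetical, and the real
start() waits for schema agreement in a loop instead of checking once.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct listener { virtual ~listener() = default; };

// Stand-in for service::migration_notifier: a plain listener registry.
class migration_notifier {
    std::vector<listener*> _listeners;
public:
    void register_listener(listener* l) { _listeners.push_back(l); }
    void unregister_listener(listener* l) {
        _listeners.erase(std::remove(_listeners.begin(), _listeners.end(), l),
                         _listeners.end());
    }
    std::size_t listener_count() const { return _listeners.size(); }
};

// Stand-in for service::migration_manager; agreement is always reached.
class migration_manager {
public:
    bool have_schema_agreement() const { return true; }
};

class view_builder : public listener {
    migration_notifier& _mnotifier;   // kept for the whole lifetime
public:
    explicit view_builder(migration_notifier& mn) : _mnotifier(mn) {}
    // The manager is only borrowed for the duration of start().
    void start(migration_manager& mm) {
        if (mm.have_schema_agreement()) {
            _mnotifier.register_listener(this);
        }
    }
    void stop() { _mnotifier.unregister_listener(this); }
};

// Hypothetical helper: the manager goes away before the builder stops.
inline std::size_t stop_after_manager_is_gone() {
    migration_notifier mn;
    view_builder vb(mn);
    auto mm = std::make_unique<migration_manager>();
    vb.start(*mm);
    assert(mn.listener_count() == 1);
    mm.reset();   // manager is gone, e.g. stopped on drain
    vb.stop();    // touches only the notifier, so this is still safe
    return mn.listener_count();
}
```

Since stop() only touches the notifier, destroying the manager first
is harmless in this sketch.
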
db/view/view_builder.hh | 6 +++---
db/view/view.cc | 14 +++++++-------
main.cc | 6 ++++--
tests/cql_test_env.cc | 6 ++++--
4 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/db/view/view_builder.hh b/db/view/view_builder.hh
index cdb8b8cb9..041d2c964 100644
--- a/db/view/view_builder.hh
+++ b/db/view/view_builder.hh
@@ -146,7 +146,7 @@ class view_builder final : public service::migration_listener::only_view_notific

database& _db;
db::system_distributed_keyspace& _sys_dist_ks;
- service::migration_manager& _mm;
+ service::migration_notifier& _mnotifier;
base_to_build_step_type _base_to_build_step;
base_to_build_step_type::iterator _current_step = _base_to_build_step.end();
serialized_action _build_step{std::bind(&view_builder::do_build_step, this)};
@@ -174,7 +174,7 @@ class view_builder final : public service::migration_listener::only_view_notific
static constexpr size_t batch_memory_max = 1024*1024;

public:
- view_builder(database&, db::system_distributed_keyspace&, service::migration_manager&);
+ view_builder(database&, db::system_distributed_keyspace&, service::migration_notifier&);
view_builder(view_builder&&) = delete;

/**
@@ -182,7 +182,7 @@ class view_builder final : public service::migration_listener::only_view_notific
* Requires that all views have been loaded from the system tables and are accessible
* through the database, and that the commitlog has been replayed.
*/
- future<> start();
+ future<> start(service::migration_manager&);

/**
* Stops the view building process.
diff --git a/db/view/view.cc b/db/view/view.cc
index 5cf924e2a..d67ae6859 100644
--- a/db/view/view.cc
+++ b/db/view/view.cc
@@ -1142,16 +1142,16 @@ future<> mutate_MV(
return f.finally([fs = std::move(fs)] { });
}

-view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks, service::migration_manager& mm)
+view_builder::view_builder(database& db, db::system_distributed_keyspace& sys_dist_ks, service::migration_notifier& mn)
: _db(db)
, _sys_dist_ks(sys_dist_ks)
- , _mm(mm) {
+ , _mnotifier(mn) {
}

-future<> view_builder::start() {
- _started = seastar::async([this] {
+future<> view_builder::start(service::migration_manager& mm) {
+ _started = seastar::async([this, &mm] {
// Wait for schema agreement even if we're a seed node.
- while (!_mm.have_schema_agreement()) {
+ while (!mm.have_schema_agreement()) {
if (_as.abort_requested()) {
return;
}
@@ -1160,7 +1160,7 @@ future<> view_builder::start() {
auto built = system_keyspace::load_built_views().get0();
auto in_progress = system_keyspace::load_view_build_progress().get0();
calculate_shard_build_step(std::move(built), std::move(in_progress)).get();
- _mm.register_listener(this);
+ _mnotifier.register_listener(this);
_current_step = _base_to_build_step.begin();
// Waited on indirectly in stop().
(void)_build_step.trigger();
@@ -1172,7 +1172,7 @@ future<> view_builder::stop() {
vlogger.info("Stopping view builder");
_as.request_abort();
return _started.finally([this] {
- _mm.unregister_listener(this);
+ _mnotifier.unregister_listener(this);
return _sem.wait().then([this] {
_sem.broken();
return _build_step.join();
diff --git a/main.cc b/main.cc
index 66899b957..7de9c22d5 100644
--- a/main.cc
+++ b/main.cc
@@ -1063,8 +1063,10 @@ int main(int ac, char** av) {
static sharded<db::view::view_builder> view_builder;
if (cfg->view_building()) {
supervisor::notify("starting the view builder");
- view_builder.start(std::ref(db), std::ref(sys_dist_ks), std::ref(mm)).get();
- view_builder.invoke_on_all(&db::view::view_builder::start).get();
+ view_builder.start(std::ref(db), std::ref(sys_dist_ks), std::ref(mm_notifier)).get();
+ view_builder.invoke_on_all([&mm] (db::view::view_builder& vb) {
+ return vb.start(mm.local());
+ }).get();
}

supervisor::notify("starting native transport");
diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc
index a4ef8e40b..bf210c6e6 100644
--- a/tests/cql_test_env.cc
+++ b/tests/cql_test_env.cc
@@ -488,8 +488,10 @@ class single_node_cql_env : public cql_test_env {
});

auto view_builder = ::make_shared<seastar::sharded<db::view::view_builder>>();
- view_builder->start(std::ref(*db), std::ref(sys_dist_ks), std::ref(mm)).get();
- view_builder->invoke_on_all(&db::view::view_builder::start).get();
+ view_builder->start(std::ref(*db), std::ref(sys_dist_ks), std::ref(mm_notif)).get();
+ view_builder->invoke_on_all([&mm] (db::view::view_builder& vb) {
+ return vb.start(mm.local());
+ }).get();
auto stop_view_builder = defer([view_builder] {
view_builder->stop().get();
});
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:02 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
Same as with the view builder. The constructor still needs both,
but only the notifier is kept as a lifetime reference.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
auth/service.hh | 10 ++++++----
auth/service.cc | 21 +++++++++++----------
service/storage_service.cc | 5 ++++-
3 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/auth/service.hh b/auth/service.hh
index c325a1453..e38b66318 100644
--- a/auth/service.hh
+++ b/auth/service.hh
@@ -43,6 +43,7 @@ class query_processor;

namespace service {
class migration_manager;
+class migration_notifier;
class migration_listener;
}

@@ -85,7 +86,7 @@ class service final : public seastar::peering_sharded_service<service> {

cql3::query_processor& _qp;

- ::service::migration_manager& _migration_manager;
+ ::service::migration_notifier& _mnotifier;

std::unique_ptr<authorizer> _authorizer;

@@ -100,7 +101,7 @@ class service final : public seastar::peering_sharded_service<service> {
service(
permissions_cache_config,
cql3::query_processor&,
- ::service::migration_manager&,
+ ::service::migration_notifier&,
std::unique_ptr<authorizer>,
std::unique_ptr<authenticator>,
std::unique_ptr<role_manager>);
@@ -113,10 +114,11 @@ class service final : public seastar::peering_sharded_service<service> {
service(
permissions_cache_config,
cql3::query_processor&,
+ ::service::migration_notifier&,
::service::migration_manager&,
const service_config&);

- future<> start();
+ future<> start(::service::migration_manager&);

future<> stop();

@@ -162,7 +164,7 @@ class service final : public seastar::peering_sharded_service<service> {
private:
future<bool> has_existing_legacy_users() const;

- future<> create_keyspace_if_missing() const;
+ future<> create_keyspace_if_missing(::service::migration_manager& mm) const;
};

future<bool> has_superuser(const service&, const authenticated_user&);
diff --git a/auth/service.cc b/auth/service.cc
index fdd199daf..d3ac3d56a 100644
--- a/auth/service.cc
+++ b/auth/service.cc
@@ -114,14 +114,14 @@ static future<> validate_role_exists(const service& ser, std::string_view role_n
service::service(
permissions_cache_config c,
cql3::query_processor& qp,
- ::service::migration_manager& mm,
+ ::service::migration_notifier& mn,
std::unique_ptr<authorizer> z,
std::unique_ptr<authenticator> a,
std::unique_ptr<role_manager> r)
: _permissions_cache_config(std::move(c))
, _permissions_cache(nullptr)
, _qp(qp)
- , _migration_manager(mm)
+ , _mnotifier(mn)
, _authorizer(std::move(z))
, _authenticator(std::move(a))
, _role_manager(std::move(r))
@@ -141,18 +141,19 @@ service::service(
service::service(
permissions_cache_config c,
cql3::query_processor& qp,
+ ::service::migration_notifier& mn,
::service::migration_manager& mm,
const service_config& sc)
: service(
std::move(c),
qp,
- mm,
+ mn,
create_object<authorizer>(sc.authorizer_java_name, qp, mm),
create_object<authenticator>(sc.authenticator_java_name, qp, mm),
create_object<role_manager>(sc.role_manager_java_name, qp, mm)) {
}

-future<> service::create_keyspace_if_missing() const {
+future<> service::create_keyspace_if_missing(::service::migration_manager& mm) const {
auto& db = _qp.db();

if (!db.has_keyspace(meta::AUTH_KS)) {
@@ -166,15 +167,15 @@ future<> service::create_keyspace_if_missing() const {

// We use min_timestamp so that default keyspace metadata will loose with any manual adjustments.
// See issue #2129.
- return _migration_manager.announce_new_keyspace(ksm, api::min_timestamp, false);
+ return mm.announce_new_keyspace(ksm, api::min_timestamp, false);
}

return make_ready_future<>();
}

-future<> service::start() {
- return once_among_shards([this] {
- return create_keyspace_if_missing();
+future<> service::start(::service::migration_manager& mm) {
+ return once_among_shards([this, &mm] {
+ return create_keyspace_if_missing(mm);
}).then([this] {
return _role_manager->start().then([this] {
return when_all_succeed(_authorizer->start(), _authenticator->start());
@@ -183,7 +184,7 @@ future<> service::start() {
_permissions_cache = std::make_unique<permissions_cache>(_permissions_cache_config, *this, log);
}).then([this] {
return once_among_shards([this] {
- _migration_manager.register_listener(_migration_listener.get());
+ _mnotifier.register_listener(_migration_listener.get());
return make_ready_future<>();
});
});
@@ -192,7 +193,7 @@ future<> service::start() {
future<> service::stop() {
// Only one of the shards has the listener registered, but let's try to
// unregister on each one just to make sure.
- _migration_manager.unregister_listener(_migration_listener.get());
+ _mnotifier.unregister_listener(_migration_listener.get());

return _permissions_cache->stop().then([this] {
return when_all_succeed(_role_manager->stop(), _authorizer->stop(), _authenticator->stop());
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 39a053ce0..2f22002de 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -803,10 +803,13 @@ void storage_service::join_token_ring(int delay) {
_auth_service.start(
permissions_cache_config_from_db_config(_db.local().get_config()),
std::ref(cql3::get_query_processor()),
+ std::ref(_mnotifier),
std::ref(service::get_migration_manager()),
auth_service_config_from_db_config(_db.local().get_config())).get();

- _auth_service.invoke_on_all(&auth::service::start).get();
+ _auth_service.invoke_on_all([] (auth::service& auth) {
+ return auth.start(service::get_local_migration_manager());
+ }).get();

supervisor::notify("starting tracing");
tracing::tracing::start_tracing().get();
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:04 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
This patch breaks one (probably harmless, but still) dependency
loop: query_processor -> migration_manager -> storage_proxy
-> tracing -> query_processor.

The first link is not needed, as the query_processor needs the
migration_manager purely to (un)subscribe to notifications.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
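Note, not part of the patch: the loop can be checked mechanically.
Below is a small standalone sketch that models only the edges named in
the commit message as a directed graph and runs a depth-first cycle
check on it. The graph type and all helper names (has_cycle,
deps_before, deps_after) are hypothetical, and the real services have
more dependencies than the ones listed here.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>
#include <vector>

using graph = std::map<std::string, std::vector<std::string>>;

// Depth-first search. `on_path` holds the nodes of the current DFS
// path, `done` holds nodes already known not to lead into a cycle.
static bool reaches_cycle(const graph& g, const std::string& node,
                          std::set<std::string>& on_path,
                          std::set<std::string>& done) {
    if (on_path.count(node)) {
        return true;    // came back to a node on the current path
    }
    if (done.count(node)) {
        return false;
    }
    on_path.insert(node);
    auto it = g.find(node);
    if (it != g.end()) {
        for (const auto& next : it->second) {
            if (reaches_cycle(g, next, on_path, done)) {
                return true;
            }
        }
    }
    on_path.erase(node);
    done.insert(node);
    return false;
}

inline bool has_cycle(const graph& g) {
    std::set<std::string> on_path, done;
    for (const auto& entry : g) {
        if (reaches_cycle(g, entry.first, on_path, done)) {
            return true;
        }
    }
    return false;
}

// Edges as listed in the commit message.
inline graph deps_before() {
    graph g;
    g["query_processor"] = std::vector<std::string>{"migration_manager"};
    g["migration_manager"] = std::vector<std::string>{"storage_proxy"};
    g["storage_proxy"] = std::vector<std::string>{"tracing"};
    g["tracing"] = std::vector<std::string>{"query_processor"};
    return g;
}

// Same graph with the first link redirected to the notifier.
inline graph deps_after() {
    graph g = deps_before();
    g["query_processor"] = std::vector<std::string>{"migration_notifier"};
    return g;
}
```

In this model the notifier has no outgoing edges, which is why
redirecting the first link is enough to open the loop.
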
cql3/query_processor.hh | 5 +++--
auth/service.cc | 2 +-
cql3/query_processor.cc | 7 ++++---
main.cc | 2 +-
tests/cql_test_env.cc | 2 +-
5 files changed, 10 insertions(+), 8 deletions(-)

diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh
index bb6a83722..825760573 100644
--- a/cql3/query_processor.hh
+++ b/cql3/query_processor.hh
@@ -57,7 +57,7 @@
#include "cql3/untyped_result_set.hh"
#include "exceptions/exceptions.hh"
#include "log.hh"
-#include "service/migration_manager.hh"
+#include "service/migration_listener.hh"
#include "service/query_state.hh"
#include "transport/messages/result_message.hh"

@@ -109,6 +109,7 @@ class query_processor {
std::unique_ptr<migration_subscriber> _migration_subscriber;
service::storage_proxy& _proxy;
database& _db;
+ service::migration_notifier& _mnotifier;

struct stats {
uint64_t prepare_invocations = 0;
@@ -142,7 +143,7 @@ class query_processor {

static ::shared_ptr<statements::raw::parsed_statement> parse_statement(const std::string_view& query);

- query_processor(service::storage_proxy& proxy, database& db, memory_config mcfg);
+ query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, memory_config mcfg);

~query_processor();

diff --git a/auth/service.cc b/auth/service.cc
index d3ac3d56a..287a378b7 100644
--- a/auth/service.cc
+++ b/auth/service.cc
@@ -39,7 +39,7 @@
#include "db/consistency_level_type.hh"
#include "exceptions/exceptions.hh"
#include "log.hh"
-#include "service/migration_listener.hh"
+#include "service/migration_manager.hh"
#include "utils/class_registrator.hh"
#include "database.hh"

diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc
index b03965b46..12ce86ece 100644
--- a/cql3/query_processor.cc
+++ b/cql3/query_processor.cc
@@ -85,10 +85,11 @@ class query_processor::internal_state {
}
};

-query_processor::query_processor(service::storage_proxy& proxy, database& db, query_processor::memory_config mcfg)
+query_processor::query_processor(service::storage_proxy& proxy, database& db, service::migration_notifier& mn, query_processor::memory_config mcfg)
: _migration_subscriber{std::make_unique<migration_subscriber>(this)}
, _proxy(proxy)
, _db(db)
+ , _mnotifier(mn)
, _internal_state(new internal_state())
, _prepared_cache(prep_cache_log, mcfg.prepared_statment_cache_size)
, _authorized_prepared_cache(std::min(std::chrono::milliseconds(_db.get_config().permissions_validity_in_ms()),
@@ -437,14 +438,14 @@ query_processor::query_processor(service::storage_proxy& proxy, database& db, qu

});

- service::get_local_migration_manager().register_listener(_migration_subscriber.get());
+ _mnotifier.register_listener(_migration_subscriber.get());
}

query_processor::~query_processor() {
}

future<> query_processor::stop() {
- service::get_local_migration_manager().unregister_listener(_migration_subscriber.get());
+ _mnotifier.unregister_listener(_migration_subscriber.get());
return _authorized_prepared_cache.stop().finally([this] { return _prepared_cache.stop(); });
}

diff --git a/main.cc b/main.cc
index 7de9c22d5..b3d5ae9c6 100644
--- a/main.cc
+++ b/main.cc
@@ -872,7 +872,7 @@ int main(int ac, char** av) {
});
supervisor::notify("starting query processor");
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
- qp.start(std::ref(proxy), std::ref(db), qp_mcfg).get();
+ qp.start(std::ref(proxy), std::ref(db), std::ref(mm_notifier), qp_mcfg).get();
// #293 - do not stop anything
// engine().at_exit([&qp] { return qp.stop(); });
supervisor::notify("initializing batchlog manager");
diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc
index bf210c6e6..73d264d5d 100644
--- a/tests/cql_test_env.cc
+++ b/tests/cql_test_env.cc
@@ -442,7 +442,7 @@ class single_node_cql_env : public cql_test_env {

auto& qp = cql3::get_query_processor();
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
- qp.start(std::ref(proxy), std::ref(*db), qp_mcfg).get();
+ qp.start(std::ref(proxy), std::ref(*db), std::ref(mm_notif), qp_mcfg).get();
auto stop_qp = defer([&qp] { qp.stop().get(); });

db::batchlog_manager_config bmcfg;
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:05 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
This patch removes an implicit cql_server -> migration_manager
dependency, as the former's event notifier uses the latter
for notifications.

Removing this dependency also breaks a loop:
storage_service -> cql_server -> migration_manager -> storage_service

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
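Note, not part of the patch: a minimal standalone sketch of how the
notifier reference travels from the server constructor down to the
event notifier, which registers itself on construction and unregisters
on destruction. The types are simplified stand-ins;
listener_count() and listeners_during_and_after() are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct migration_listener { virtual ~migration_listener() = default; };

// Stand-in for service::migration_notifier: a plain listener registry.
class migration_notifier {
    std::vector<migration_listener*> _listeners;
public:
    void register_listener(migration_listener* l) { _listeners.push_back(l); }
    void unregister_listener(migration_listener* l) {
        _listeners.erase(std::remove(_listeners.begin(), _listeners.end(), l),
                         _listeners.end());
    }
    std::size_t listener_count() const { return _listeners.size(); }
};

// Registers in the constructor and unregisters in the destructor, both
// through the injected reference rather than a global lookup.
class event_notifier : public migration_listener {
    migration_notifier& _mnotifier;
public:
    explicit event_notifier(migration_notifier& mn) : _mnotifier(mn) {
        _mnotifier.register_listener(this);
    }
    ~event_notifier() override { _mnotifier.unregister_listener(this); }
    event_notifier(const event_notifier&) = delete;
    event_notifier& operator=(const event_notifier&) = delete;
};

// The server only forwards the reference to its event notifier.
class cql_server {
    std::unique_ptr<event_notifier> _notifier;
public:
    explicit cql_server(migration_notifier& mn)
        : _notifier(std::make_unique<event_notifier>(mn)) {}
};

// Hypothetical helper: listener count while a server exists and after
// it has been destroyed.
inline std::pair<std::size_t, std::size_t>
listeners_during_and_after(migration_notifier& mn) {
    std::size_t during = 0;
    {
        cql_server server(mn);
        during = mn.listener_count();
    }
    return {during, mn.listener_count()};
}
```

The server itself never uses the notifier in this sketch, it only
hands the reference over.
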
transport/server.hh | 7 +++++--
service/storage_service.cc | 2 +-
transport/event_notifier.cc | 8 ++++----
transport/server.cc | 4 ++--
4 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/transport/server.hh b/transport/server.hh
index a2cdd9e41..7b788f4db 100644
--- a/transport/server.hh
+++ b/transport/server.hh
@@ -134,7 +134,9 @@ class cql_server : public seastar::peering_sharded_service<cql_server> {
const cql3::cql_config& _cql_config;
public:
cql_server(distributed<cql3::query_processor>& qp, auth::service&,
- const cql3::cql_config& cql_config, cql_server_config config);
+ const cql3::cql_config& cql_config,
+ service::migration_notifier& mn,
+ cql_server_config config);
future<> listen(socket_address addr, std::shared_ptr<seastar::tls::credentials_builder> = {}, bool keepalive = false);
future<> do_accepts(int which, bool keepalive, socket_address server_addr);
future<> stop();
@@ -252,8 +254,9 @@ class cql_server::event_notifier : public service::migration_listener,
std::set<cql_server::connection*> _status_change_listeners;
std::set<cql_server::connection*> _schema_change_listeners;
std::unordered_map<gms::inet_address, event::status_change::status_type> _last_status_change;
+ service::migration_notifier& _mnotifier;
public:
- event_notifier();
+ event_notifier(service::migration_notifier& mn);
~event_notifier();
void register_event(cql_transport::event::event_type et, cql_server::connection* conn);
void unregister_connection(cql_server::connection* conn);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 2f22002de..dec583f4f 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -2286,7 +2286,7 @@ future<> storage_service::start_native_transport() {
cql_server_smp_service_group_config.max_nonlocal_requests = 5000;
cql_server_config.bounce_request_smp_service_group = create_smp_service_group(cql_server_smp_service_group_config).get0();
seastar::net::inet_address ip = gms::inet_address::lookup(addr, family, preferred).get0();
- cserver->start(std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), std::ref(ss._cql_config), cql_server_config).get();
+ cserver->start(std::ref(cql3::get_query_processor()), std::ref(ss._auth_service), std::ref(ss._cql_config), std::ref(ss._mnotifier), cql_server_config).get();
struct listen_cfg {
socket_address addr;
std::shared_ptr<seastar::tls::credentials_builder> cred;
diff --git a/transport/event_notifier.cc b/transport/event_notifier.cc
index 0008d6275..bd144c5f5 100644
--- a/transport/event_notifier.cc
+++ b/transport/event_notifier.cc
@@ -21,7 +21,7 @@

#include "transport/server.hh"
#include <seastar/core/gate.hh>
-#include "service/migration_manager.hh"
+#include "service/migration_listener.hh"
#include "service/storage_service.hh"
#include "transport/response.hh"

@@ -29,16 +29,16 @@ namespace cql_transport {

static logging::logger elogger("event_notifier");

-cql_server::event_notifier::event_notifier()
+cql_server::event_notifier::event_notifier(service::migration_notifier& mn) : _mnotifier(mn)
{
- service::get_local_migration_manager().register_listener(this);
+ _mnotifier.register_listener(this);
service::get_local_storage_service().register_subscriber(this);
}

cql_server::event_notifier::~event_notifier()
{
service::get_local_storage_service().unregister_subscriber(this);
- service::get_local_migration_manager().unregister_listener(this);
+ _mnotifier.unregister_listener(this);
}

void cql_server::event_notifier::register_event(event::event_type et, cql_server::connection* conn)
diff --git a/transport/server.cc b/transport/server.cc
index f3d1af17a..51a3c515a 100644
--- a/transport/server.cc
+++ b/transport/server.cc
@@ -140,12 +140,12 @@ event::event_type parse_event_type(const sstring& value)
}

cql_server::cql_server(distributed<cql3::query_processor>& qp, auth::service& auth_service,
- const cql3::cql_config& cql_config, cql_server_config config)
+ const cql3::cql_config& cql_config, service::migration_notifier& mn, cql_server_config config)
: _query_processor(qp)
, _config(config)
, _max_request_size(config.max_request_size)
, _memory_available(config.get_service_memory_limiter_semaphore())
- , _notifier(std::make_unique<event_notifier>())
+ , _notifier(std::make_unique<event_notifier>(mn))
, _auth_service(auth_service)
, _cql_config(cql_config)
{
--
2.20.1
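
For readers who want the pattern of this patch in isolation, here is a minimal sketch: the notifier is passed in through the constructor, and the listener registers and unregisters itself over its own lifetime. The migration_listener and migration_notifier types below are simplified, hypothetical stand-ins, not the Scylla classes; on_schema_change(), notify() and listener_count() are invented for the example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical, simplified stand-in for service::migration_listener.
struct migration_listener {
    virtual ~migration_listener() = default;
    virtual void on_schema_change() = 0;
};

// Hypothetical, simplified stand-in for service::migration_notifier:
// it only owns the list of listeners and fans events out to them.
class migration_notifier {
    std::vector<migration_listener*> _listeners;
public:
    void register_listener(migration_listener* l) { _listeners.push_back(l); }
    void unregister_listener(migration_listener* l) {
        _listeners.erase(std::remove(_listeners.begin(), _listeners.end(), l),
                         _listeners.end());
    }
    void notify() {
        for (auto* l : _listeners) {
            l->on_schema_change();
        }
    }
    std::size_t listener_count() const { return _listeners.size(); }
};

// The notifier is injected through the constructor, so the class does not
// reach for a global accessor, and registration follows object lifetime.
class event_notifier : public migration_listener {
    migration_notifier& _mnotifier;
    int _events = 0;
public:
    explicit event_notifier(migration_notifier& mn) : _mnotifier(mn) {
        _mnotifier.register_listener(this);
    }
    event_notifier(const event_notifier&) = delete;
    event_notifier& operator=(const event_notifier&) = delete;
    ~event_notifier() override { _mnotifier.unregister_listener(this); }

    void on_schema_change() override { ++_events; }
    int events() const { return _events; }
};

// Returns the number of listeners left once the event_notifier is destroyed.
inline std::size_t demo_event_notifier_lifetime() {
    migration_notifier mn;
    {
        event_notifier en(mn);
        assert(mn.listener_count() == 1);
        mn.notify();
        assert(en.events() == 1);
    }
    return mn.listener_count();
}
```

demo_event_notifier_lifetime() returns 0: the destructor removes the listener from the injected notifier, and nothing in the class needs a migration_manager instance.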

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:07 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
The storage_service needs migration_manager for notifications and
carefully handles the manager's stop process so as not to demolish the
listeners list from under itself. From now on this dependency is
no longer needed (the storage_service still seems to need the
migration_manager for other things, but that is a different story).

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/storage_service.cc | 11 +++--------
1 file changed, 3 insertions(+), 8 deletions(-)

diff --git a/service/storage_service.cc b/service/storage_service.cc
index dec583f4f..f876424f7 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -1473,12 +1473,7 @@ future<> storage_service::drain_on_shutdown() {
}).get();
slogger.info("Drain on shutdown: shutdown commitlog done");

- // NOTE: We currently don't destroy migration_manager nor
- // storage_service in scylla, so when we reach here
- // migration_manager should to be still alive. Be careful, when
- // scylla starts to destroy migration_manager in the shutdown
- // process.
- service::get_local_migration_manager().unregister_listener(&ss);
+ ss._mnotifier.local().unregister_listener(&ss);

slogger.info("Drain on shutdown: done");
});
@@ -1555,9 +1550,9 @@ future<> storage_service::init_server(int delay, bind_messaging_port do_bind) {
#endif
_initialized = true;

- // Register storage_service to migration_manager so we can update
+ // Register storage_service to migration_notifier so we can update
// pending ranges when keyspace is chagned
- service::get_local_migration_manager().register_listener(this);
+ _mnotifier.local().register_listener(this);
#if 0
try
{
--
2.20.1
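
The commit message above relies on the listeners list no longer living inside the manager. A small sketch of why that helps with shutdown ordering, assuming simplified, hypothetical notifier, manager and listener types (the real services are sharded and asynchronous, which this sketch ignores):

```cpp
#include <cassert>
#include <memory>
#include <set>

// Hypothetical stand-in for a service that listens for schema changes.
struct listener {};

// Hypothetical stand-in for the notifier: the only owner of the listeners list.
class notifier {
    std::set<listener*> _listeners;
public:
    void register_listener(listener* l) { _listeners.insert(l); }
    void unregister_listener(listener* l) { _listeners.erase(l); }
    bool has(listener* l) const { return _listeners.count(l) != 0; }
};

// Hypothetical stand-in for the manager: it no longer holds any listeners,
// so stopping or destroying it cannot invalidate the list.
class manager {
public:
    void stop() {}
};

// Destroys the manager first, then unregisters the listener.
// Returns true if the late unregistration worked.
inline bool demo_unregister_after_manager_is_gone() {
    notifier mn;
    auto mm = std::make_unique<manager>();
    listener ss;

    mn.register_listener(&ss);

    mm->stop();
    mm.reset();                  // the manager is gone before the listener leaves

    assert(mn.has(&ss));         // the list is intact, it never lived in the manager
    mn.unregister_listener(&ss); // safe: only the notifier is touched
    return !mn.has(&ss);
}
```

With the list owned by the notifier, the order in which the manager and its former listeners are stopped no longer matters for unregistration, as long as the notifier itself outlives them.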

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:08 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
tests/cql_test_env.hh | 2 ++
tests/cql_test_env.cc | 27 +++++++++++++++++----------
tests/schema_change_test.cc | 6 +++---
3 files changed, 22 insertions(+), 13 deletions(-)

diff --git a/tests/cql_test_env.hh b/tests/cql_test_env.hh
index f578ef66f..3390684ab 100644
--- a/tests/cql_test_env.hh
+++ b/tests/cql_test_env.hh
@@ -123,6 +123,8 @@ class cql_test_env {
virtual db::view::view_builder& local_view_builder() = 0;

virtual db::view::view_update_generator& local_view_update_generator() = 0;
+
+ virtual service::migration_notifier& local_mnotifier() = 0;
};

future<> do_with_cql_env(std::function<future<>(cql_test_env&)> func, cql_test_config = {});
diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc
index 73d264d5d..461300ceb 100644
--- a/tests/cql_test_env.cc
+++ b/tests/cql_test_env.cc
@@ -118,6 +118,7 @@ class single_node_cql_env : public cql_test_env {
::shared_ptr<sharded<auth::service>> _auth_service;
::shared_ptr<sharded<db::view::view_builder>> _view_builder;
::shared_ptr<sharded<db::view::view_update_generator>> _view_update_generator;
+ ::shared_ptr<sharded<service::migration_notifier>> _mnotifier;
private:
struct core_local_state {
service::client_state client_state;
@@ -146,12 +147,14 @@ class single_node_cql_env : public cql_test_env {
::shared_ptr<distributed<database>> db,
::shared_ptr<sharded<auth::service>> auth_service,
::shared_ptr<sharded<db::view::view_builder>> view_builder,
- ::shared_ptr<sharded<db::view::view_update_generator>> view_update_generator)
+ ::shared_ptr<sharded<db::view::view_update_generator>> view_update_generator,
+ ::shared_ptr<sharded<service::migration_notifier>> mnotifier)
: _feature_service(std::move(feature_service))
, _db(db)
, _auth_service(std::move(auth_service))
, _view_builder(std::move(view_builder))
, _view_update_generator(std::move(view_update_generator))
+ , _mnotifier(std::move(mnotifier))
{ }

virtual future<::shared_ptr<cql_transport::messages::result_message>> execute_cql(const sstring& text) override {
@@ -301,6 +304,10 @@ class single_node_cql_env : public cql_test_env {
return _view_update_generator->local();
}

+ virtual service::migration_notifier& local_mnotifier() override {
+ return _mnotifier->local();
+ }
+
future<> start() {
return _core_local.start(std::ref(*_auth_service));
}
@@ -375,9 +382,9 @@ class single_node_cql_env : public cql_test_env {
create_directories((cfg->view_hints_directory() + "/" + std::to_string(i)).c_str());
}

- sharded<service::migration_notifier> mm_notif;
- mm_notif.start().get();
- auto stop_mm_notify = defer([&mm_notif] { mm_notif.stop().get(); });
+ auto mm_notif = ::make_shared<sharded<service::migration_notifier>>();
+ mm_notif->start().get();
+ auto stop_mm_notify = defer([mm_notif] { mm_notif->stop().get(); });

set_abort_on_internal_error(true);
const gms::inet_address listen("127.0.0.1");
@@ -409,12 +416,12 @@ class single_node_cql_env : public cql_test_env {
auto& ss = service::get_storage_service();
service::storage_service_config sscfg;
sscfg.available_memory = memory::stats().total_memory();
- ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(cql_config), std::ref(mm_notif), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get();
+ ss.start(std::ref(abort_sources), std::ref(*db), std::ref(gms::get_gossiper()), std::ref(*auth_service), std::ref(cql_config), std::ref(*mm_notif), std::ref(sys_dist_ks), std::ref(*view_update_generator), std::ref(*feature_service), sscfg, true, cfg_in.disabled_features).get();
auto stop_storage_service = defer([&ss] { ss.stop().get(); });

database_config dbcfg;
dbcfg.available_memory = memory::stats().total_memory();
- db->start(std::ref(*cfg), dbcfg, std::ref(mm_notif)).get();
+ db->start(std::ref(*cfg), dbcfg, std::ref(*mm_notif)).get();
auto stop_db = defer([db] {
db->stop().get();
});
@@ -437,12 +444,12 @@ class single_node_cql_env : public cql_test_env {
proxy.start(std::ref(*db), spcfg, std::ref(b)).get();
auto stop_proxy = defer([&proxy] { proxy.stop().get(); });

- mm.start(std::ref(mm_notif)).get();
+ mm.start().get();
auto stop_mm = defer([&mm] { mm.stop().get(); });

auto& qp = cql3::get_query_processor();
cql3::query_processor::memory_config qp_mcfg = {memory::stats().total_memory() / 256, memory::stats().total_memory() / 2560};
- qp.start(std::ref(proxy), std::ref(*db), std::ref(mm_notif), qp_mcfg).get();
+ qp.start(std::ref(proxy), std::ref(*db), std::ref(*mm_notif), qp_mcfg).get();
auto stop_qp = defer([&qp] { qp.stop().get(); });

db::batchlog_manager_config bmcfg;
@@ -488,7 +495,7 @@ class single_node_cql_env : public cql_test_env {
});

auto view_builder = ::make_shared<seastar::sharded<db::view::view_builder>>();
- view_builder->start(std::ref(*db), std::ref(sys_dist_ks), std::ref(mm_notif)).get();
+ view_builder->start(std::ref(*db), std::ref(sys_dist_ks), std::ref(*mm_notif)).get();
view_builder->invoke_on_all([&mm] (db::view::view_builder& vb) {
return vb.start(mm.local());
}).get();
@@ -511,7 +518,7 @@ class single_node_cql_env : public cql_test_env {
// The default user may already exist if this `cql_test_env` is starting with previously populated data.
}

- single_node_cql_env env(feature_service, db, auth_service, view_builder, view_update_generator);
+ single_node_cql_env env(feature_service, db, auth_service, view_builder, view_update_generator, mm_notif);
env.start().get();
auto stop_env = defer([&env] { env.stop().get(); });

diff --git a/tests/schema_change_test.cc b/tests/schema_change_test.cc
index ecddd2388..314509874 100644
--- a/tests/schema_change_test.cc
+++ b/tests/schema_change_test.cc
@@ -440,7 +440,7 @@ SEASTAR_TEST_CASE(test_nested_type_mutation_in_update) {
// show that we can handle that.
return do_with_cql_env_thread([](cql_test_env& e) {
counting_migration_listener listener;
- service::get_local_migration_manager().register_listener(&listener);
+ e.local_mnotifier().register_listener(&listener);

e.execute_cql("CREATE TYPE foo (foo_k int);").get();
e.execute_cql("CREATE TYPE bar (bar_k frozen<foo>);").get();
@@ -469,8 +469,8 @@ SEASTAR_TEST_CASE(test_notifications) {
return do_with_cql_env([](cql_test_env& e) {
return seastar::async([&] {
counting_migration_listener listener;
- service::get_local_migration_manager().register_listener(&listener);
- auto listener_lease = defer([&listener] { service::get_local_migration_manager().register_listener(&listener); });
+ e.local_mnotifier().register_listener(&listener);
+ auto listener_lease = defer([&e, &listener] { e.local_mnotifier().unregister_listener(&listener); });

e.execute_cql("create keyspace tests with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };").get();

--
2.20.1
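
Tests that register a stack-allocated listener usually pair the registration with a scope guard that unregisters it before the listener is destroyed. A minimal sketch of that shape, assuming a hypothetical scope_guard in place of seastar's defer() and simplified, hypothetical notifier and listener types:

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <utility>

// Hypothetical minimal scope guard: runs the stored callable on destruction.
template <typename Func>
class scope_guard {
    Func _func;
public:
    explicit scope_guard(Func f) : _func(std::move(f)) {}
    scope_guard(const scope_guard&) = delete;
    scope_guard& operator=(const scope_guard&) = delete;
    ~scope_guard() { _func(); }
};

// Hypothetical stand-ins for the listener and the notifier.
struct listener {};

class notifier {
    std::set<listener*> _listeners;
public:
    void register_listener(listener* l) { _listeners.insert(l); }
    void unregister_listener(listener* l) { _listeners.erase(l); }
    std::size_t count() const { return _listeners.size(); }
};

// Returns the number of listeners still registered after the scope ends.
inline std::size_t demo_listener_lease() {
    notifier mn;
    {
        listener l;
        mn.register_listener(&l);

        // The guard is declared after the listener, so it is destroyed first
        // and unregisters the listener while the listener is still alive.
        auto unregister = [&mn, &l] { mn.unregister_listener(&l); };
        scope_guard<decltype(unregister)> lease(unregister);

        assert(mn.count() == 1);
    }
    return mn.count();
}
```

demo_listener_lease() returns 0. If the guard called register_listener() instead, the notifier would keep a pointer to a listener that has already gone out of scope.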

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:10 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
In the 2nd patch the migration_manager kept a reference to the
notifier for simpler patching, but now we can drop it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/migration_manager.hh | 12 +-----------
main.cc | 2 +-
service/migration_manager.cc | 4 ----
3 files changed, 2 insertions(+), 16 deletions(-)

diff --git a/service/migration_manager.hh b/service/migration_manager.hh
index 1e8d8b130..8eb7b0a2e 100644
--- a/service/migration_manager.hh
+++ b/service/migration_manager.hh
@@ -56,22 +56,12 @@ namespace service {

class migration_manager : public seastar::async_sharded_service<migration_manager> {
private:
- migration_notifier& _notify; /* XXX -- temporary */
-
std::unordered_map<netw::msg_addr, serialized_action, netw::msg_addr::hash> _schema_pulls;
std::vector<gms::feature::listener_registration> _feature_listeners;
seastar::gate _background_tasks;
static const std::chrono::milliseconds migration_delay;
public:
- explicit migration_manager(migration_notifier&);
-
- inline void register_listener(migration_listener* listener) {
- _notify.register_listener(listener);
- }
-
- inline void unregister_listener(migration_listener* listener) {
- _notify.unregister_listener(listener);
- }
+ migration_manager(){}

future<> schedule_schema_pull(const gms::inet_address& endpoint, const gms::endpoint_state& state);

diff --git a/main.cc b/main.cc
index b3d5ae9c6..e797b35e3 100644
--- a/main.cc
+++ b/main.cc
@@ -866,7 +866,7 @@ int main(int ac, char** av) {
// #293 - do not stop anything
// engine().at_exit([&proxy] { return proxy.stop(); });
supervisor::notify("starting migration manager");
- mm.start(std::ref(mm_notifier)).get();
+ mm.start().get();
auto stop_migration_manager = defer_verbose_shutdown("migration manager", [&mm] {
mm.stop().get();
});
diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index ce46f6494..9762284dc 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -63,10 +63,6 @@ using namespace std::chrono_literals;

const std::chrono::milliseconds migration_manager::migration_delay = 60000ms;

-migration_manager::migration_manager(migration_notifier& notifier) : _notify(notifier)
-{
-}
-
future<> migration_manager::stop()
{
mlogger.info("stopping migration service");
--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:11 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
The helper in question is static, so there is no need to go through
a migration_manager instance to call it.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
database.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/database.cc b/database.cc
index 0e0d8817e..2656ec431 100644
--- a/database.cc
+++ b/database.cc
@@ -1899,7 +1899,7 @@ future<utils::UUID> update_schema_version(distributed<service::storage_proxy>& p
}

future<> announce_schema_version(utils::UUID schema_version) {
- return service::get_local_migration_manager().passive_announce(schema_version);
+ return service::migration_manager::passive_announce(schema_version);
}

future<> update_schema_version_and_announce(distributed<service::storage_proxy>& proxy, schema_features features)
--
2.20.1
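
The change above leans on a plain C++ rule: a static member function uses no per-instance state, so it can be called through the class name without any live object. A minimal sketch, assuming a hypothetical, synchronous passive_announce() that returns a string (the real one returns a future and talks to the gossiper):

```cpp
#include <cassert>
#include <string>

namespace service {

// Hypothetical, simplified stand-in for service::migration_manager.
class migration_manager {
public:
    // Static: touches no member data, so no instance is required.
    static std::string passive_announce(const std::string& version) {
        assert(!version.empty());
        return "gossip schema version " + version;
    }
};

} // namespace service

// The caller names the class, not an instance, so it does not depend on
// any migration_manager object being alive.
inline std::string announce_schema_version(const std::string& version) {
    return service::migration_manager::passive_announce(version);
}
```

Calling announce_schema_version("v1") works even though no migration_manager object was ever constructed.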

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:14 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
This is the last place where database code needs the migration_manager
instance to be alive, so now the mutual dependency between these two
is gone: only the migration_manager needs the database, not vice
versa.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
database.hh | 2 +-
db/schema_tables.hh | 2 +-
distributed_loader.hh | 3 ++-
database.cc | 14 +++++++-------
db/schema_tables.cc | 4 ++--
distributed_loader.cc | 9 +++++----
main.cc | 2 +-
tests/cql_test_env.cc | 2 +-
8 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/database.hh b/database.hh
index c96191958..e9e524944 100644
--- a/database.hh
+++ b/database.hh
@@ -1355,7 +1355,7 @@ class database {

void set_enable_incremental_backups(bool val) { _enable_incremental_backups = val; }

- future<> parse_system_tables(distributed<service::storage_proxy>&);
+ future<> parse_system_tables(distributed<service::storage_proxy>&, distributed<service::migration_manager>&);
database(const db::config&, database_config dbcfg, service::migration_notifier& mn);
database(database&&) = delete;
~database();
diff --git a/db/schema_tables.hh b/db/schema_tables.hh
index 4a120e753..667d62116 100644
--- a/db/schema_tables.hh
+++ b/db/schema_tables.hh
@@ -216,7 +216,7 @@ std::vector<mutation> make_update_view_mutations(lw_shared_ptr<keyspace_metadata

std::vector<mutation> make_drop_view_mutations(lw_shared_ptr<keyspace_metadata> keyspace, view_ptr view, api::timestamp_type timestamp);

-future<> maybe_update_legacy_secondary_index_mv_schema(database& db, view_ptr v);
+future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manager& mm, database& db, view_ptr v);

sstring serialize_kind(column_kind kind);
column_kind deserialize_kind(sstring kind);
diff --git a/distributed_loader.hh b/distributed_loader.hh
index ecb813bd1..22b3a1e26 100644
--- a/distributed_loader.hh
+++ b/distributed_loader.hh
@@ -50,6 +50,7 @@ class foreign_sstable_open_info;
namespace service {

class storage_proxy;
+class migration_manager;

}

@@ -68,7 +69,7 @@ class distributed_loader {
static future<> populate_keyspace(distributed<database>& db, sstring datadir, sstring ks_name);
static future<> init_system_keyspace(distributed<database>& db);
static future<> ensure_system_table_directories(distributed<database>& db);
- static future<> init_non_system_keyspaces(distributed<database>& db, distributed<service::storage_proxy>& proxy);
+ static future<> init_non_system_keyspaces(distributed<database>& db, distributed<service::storage_proxy>& proxy, distributed<service::migration_manager>& mm);
private:
static future<> cleanup_column_family_temp_sst_dirs(sstring sstdir);
static future<> handle_sstables_pending_delete(sstring pending_deletes_dir);
diff --git a/database.cc b/database.cc
index 2656ec431..90b5b29da 100644
--- a/database.cc
+++ b/database.cc
@@ -596,7 +596,7 @@ do_parse_schema_tables(distributed<service::storage_proxy>& proxy, const sstring
});
}

-future<> database::parse_system_tables(distributed<service::storage_proxy>& proxy) {
+future<> database::parse_system_tables(distributed<service::storage_proxy>& proxy, distributed<service::migration_manager>& mm) {
using namespace db::schema_tables;
return do_parse_schema_tables(proxy, db::schema_tables::KEYSPACES, [this] (schema_result_value_type &v) {
auto ksm = create_keyspace_from_schema_partition(v);
@@ -626,12 +626,12 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
});
});
});
- }).then([&proxy, this] {
- return do_parse_schema_tables(proxy, db::schema_tables::VIEWS, [this, &proxy] (schema_result_value_type &v) {
- return create_views_from_schema_partition(proxy, v.second).then([this] (std::vector<view_ptr> views) {
- return parallel_for_each(views.begin(), views.end(), [this] (auto&& v) {
- return this->add_column_family_and_make_directory(v).then([this, v] {
- return maybe_update_legacy_secondary_index_mv_schema(*this, v);
+ }).then([&proxy, &mm, this] {
+ return do_parse_schema_tables(proxy, db::schema_tables::VIEWS, [this, &proxy, &mm] (schema_result_value_type &v) {
+ return create_views_from_schema_partition(proxy, v.second).then([this, &mm] (std::vector<view_ptr> views) {
+ return parallel_for_each(views.begin(), views.end(), [this, &mm] (auto&& v) {
+ return this->add_column_family_and_make_directory(v).then([this, &mm, v] {
+ return maybe_update_legacy_secondary_index_mv_schema(mm.local(), *this, v);
});
});
});
diff --git a/db/schema_tables.cc b/db/schema_tables.cc
index b32e7e121..ad4997680 100644
--- a/db/schema_tables.cc
+++ b/db/schema_tables.cc
@@ -2814,7 +2814,7 @@ std::vector<sstring> all_table_names(schema_features features) {
boost::adaptors::transformed([] (auto schema) { return schema->cf_name(); }));
}

-future<> maybe_update_legacy_secondary_index_mv_schema(database& db, view_ptr v) {
+future<> maybe_update_legacy_secondary_index_mv_schema(service::migration_manager& mm, database& db, view_ptr v) {
// TODO(sarna): Remove once computed columns are guaranteed to be featured in the whole cluster.
// Legacy format for a secondary index used a hardcoded "token" column, which ensured a proper
// order for indexed queries. This "token" column is now implemented as a computed column,
@@ -2842,7 +2842,7 @@ future<> maybe_update_legacy_secondary_index_mv_schema(database& db, view_ptr v)
if (base_schema->columns_by_name().count(first_view_ck.name()) == 0) {
schema_builder builder{schema_ptr(v)};
builder.mark_column_computed(first_view_ck.name(), std::make_unique<token_column_computation>());
- return service::get_local_migration_manager().announce_view_update(view_ptr(builder.build()), true);
+ return mm.announce_view_update(view_ptr(builder.build()), true);
}
return make_ready_future<>();
}
diff --git a/distributed_loader.cc b/distributed_loader.cc
index f71c643df..2cd13e382 100644
--- a/distributed_loader.cc
+++ b/distributed_loader.cc
@@ -865,10 +865,11 @@ future<> distributed_loader::ensure_system_table_directories(distributed<databas
});
}

-future<> distributed_loader::init_non_system_keyspaces(distributed<database>& db, distributed<service::storage_proxy>& proxy) {
- return seastar::async([&db, &proxy] {
- db.invoke_on_all([&proxy] (database& db) {
- return db.parse_system_tables(proxy);
+future<> distributed_loader::init_non_system_keyspaces(distributed<database>& db,
+ distributed<service::storage_proxy>& proxy, distributed<service::migration_manager>& mm) {
+ return seastar::async([&db, &proxy, &mm] {
+ db.invoke_on_all([&proxy, &mm] (database& db) {
+ return db.parse_system_tables(proxy, mm);
}).get();

const auto& cfg = db.local().get_config();
diff --git a/main.cc b/main.cc
index e797b35e3..7fc615c12 100644
--- a/main.cc
+++ b/main.cc
@@ -899,7 +899,7 @@ int main(int ac, char** av) {
distributed_loader::ensure_system_table_directories(db).get();

supervisor::notify("loading non-system sstables");
- distributed_loader::init_non_system_keyspaces(db, proxy).get();
+ distributed_loader::init_non_system_keyspaces(db, proxy, mm).get();

supervisor::notify("starting view update generator");
view_update_generator.start(std::ref(db), std::ref(proxy)).get();
diff --git a/tests/cql_test_env.cc b/tests/cql_test_env.cc
index 461300ceb..ad650b113 100644
--- a/tests/cql_test_env.cc
+++ b/tests/cql_test_env.cc
@@ -473,7 +473,7 @@ class single_node_cql_env : public cql_test_env {
auto cfm = pair.second;
return ks.make_directory_for_column_family(cfm->cf_name(), cfm->id());
}).get();
- distributed_loader::init_non_system_keyspaces(*db, proxy).get();
+ distributed_loader::init_non_system_keyspaces(*db, proxy, mm).get();
// In main.cc we call db::system_keyspace::setup which calls
// minimal_setup and init_local_cache
db::system_keyspace::minimal_setup(*db, qp);
--
2.20.1
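
The patch above threads the migration_manager through the call chain as an argument instead of fetching it from a global accessor at the leaf. A minimal sketch of that idea, assuming hypothetical, synchronous stand-ins for the database and migration_manager types (the views and legacy_views members and maybe_update_legacy_view() are invented for the example):

```cpp
#include <cassert>
#include <cstddef>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in: records which view updates were announced.
struct migration_manager {
    std::vector<std::string> announced;
    void announce_view_update(const std::string& view) { announced.push_back(view); }
};

// Hypothetical stand-in: knows its views and which of them are legacy.
struct database {
    std::vector<std::string> views;
    std::set<std::string> legacy_views;
};

// Leaf: receives the manager explicitly instead of calling a global getter.
inline void maybe_update_legacy_view(migration_manager& mm, database& db,
                                     const std::string& view) {
    if (db.legacy_views.count(view) != 0) {
        mm.announce_view_update(view);
    }
}

// Middle layer: forwards the dependency it was given.
inline void parse_system_tables(database& db, migration_manager& mm) {
    for (const auto& view : db.views) {
        maybe_update_legacy_view(mm, db, view);
    }
}

// Entry point: the caller decides which manager instance is used.
inline void init_non_system_keyspaces(database& db, migration_manager& mm) {
    parse_system_tables(db, mm);
}

// Returns how many view updates were announced.
inline std::size_t demo_explicit_dependency() {
    migration_manager mm;
    database db;
    db.views.push_back("idx_a");
    db.views.push_back("idx_b");
    db.legacy_views.insert("idx_b");

    init_non_system_keyspaces(db, mm);

    assert(mm.announced.size() == 1 && mm.announced.front() == "idx_b");
    return mm.announced.size();
}
```

The cost is one extra parameter on every function in the chain; the gain is that the database code states its dependency in its signatures, and a test can pass in whatever manager it wants.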

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:16 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
There are several places where migration_manager needs a storage_service
reference to get the database from, thus forming a mutual dependency
between them. This is the simplest case where the migration_manager
link to the storage_service can be cut: the database reference can be
obtained from storage_proxy instead.

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/migration_manager.cc | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index 9762284dc..1c12df333 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -140,7 +140,7 @@ void migration_manager::init_messaging_service()
});
});
ms.register_schema_check([] {
- return make_ready_future<utils::UUID>(service::get_local_storage_service().db().local().get_version());
+ return make_ready_future<utils::UUID>(service::get_local_storage_proxy().get_db().local().get_version());
});
}

--
2.20.1

Pavel Emelyanov

<xemul@scylladb.com>
Dec 17, 2019, 5:03:17 AM12/17/19
to seastar-dev@googlegroups.com, Pavel Emelyanov
The factory is purely stateless, so it makes no difference which
instance of it is used, and we may omit referencing the storage_service
in passive_announce.

This is the 2nd simple migration_manager -> storage_service link to cut
(more to come later).

Signed-off-by: Pavel Emelyanov <xe...@scylladb.com>
---
service/migration_manager.cc | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/service/migration_manager.cc b/service/migration_manager.cc
index 1c12df333..546f4cfab 100644
--- a/service/migration_manager.cc
+++ b/service/migration_manager.cc
@@ -934,9 +934,9 @@ future<> migration_manager::announce(std::vector<mutation> schema) {
*/
future<> migration_manager::passive_announce(utils::UUID version) {
return gms::get_gossiper().invoke_on(0, [version] (auto&& gossiper) {
- auto& ss = service::get_local_storage_service();
+ gms::versioned_value::factory value_factory;
mlogger.debug("Gossiping my schema version {}", version);
- return gossiper.add_local_application_state(gms::application_state::SCHEMA, ss.value_factory.schema(version));
+ return gossiper.add_local_application_state(gms::application_state::SCHEMA, value_factory.schema(version));
});
}

--
2.20.1

Tomasz Grabiec

<tgrabiec@scylladb.com>
Dec 17, 2019, 12:52:48 PM12/17/19
to Pavel Emelyanov, seastar-dev
Wrong mailing list.
