From: Kamil Braun <
kbr...@scylladb.com>
Committer: Kamil Braun <
kbr...@scylladb.com>
Branch: next
Merge 'Migrate system_auth to raft group0' from Marcin Maliszkiewicz
This patch series makes all auth writes serialized via raft. Reads stay
eventually consistent for performance reasons. To make transition to new
code easier data is stored in a newly created keyspace: system_auth_v2.
Internally the difference is that instead of executing CQL directly for
writes we generate mutations and then announce them via raft group0. Per
commit descriptions provide more implementation details.
Refs
https://github.com/scylladb/scylladb/issues/16970
Fixes
https://github.com/scylladb/scylladb/issues/11157
Closes scylladb/scylladb#16578
* github.com:scylladb/scylladb:
test: extend auth-v2 migration test to catch stale static
test: add auth-v2 migration test
test: add auth-v2 snapshot transfer test
test: auth: add tests for lost quorum and command splitting
test: pylib: disconnect driver before re-connection
test: adjust tests for auth-v2
auth: implement auth-v2 migration
auth: remove static from queries on auth-v2 path
auth: coroutinize functions in password_authenticator
auth: coroutinize functions in standard_role_manager
auth: coroutinize functions in default_authorizer
storage_service: add support for auth-v2 raft snapshots
storage_service: extract getting mutations in raft snapshot to a common function
auth: service: capture string_view by value
alternator: add support for auth-v2
auth: add auth-v2 write paths
auth: add raft_group0_client as dependency
cql3: auth: add a way to create mutations without executing
cql3: run auth DML writes on shard 0 and with raft guard
service: don't loose service_level_controller when bouncing client_state
auth: put system_auth and users consts in legacy namespace
cql3: parametrize keyspace name in auth related statements
auth: parametrize keyspace name in roles metadata helpers
auth: parametrize keyspace name in password_authenticator
auth: parametrize keyspace name in standard_role_manager
auth: remove redundant consts auth::meta::*::qualified_name
auth: parametrize keyspace name in default_authorizer
db: make all system_auth_v2 tables use schema commitlog
db: add system_auth_v2 tables
db: add system_auth_v2 keyspace
---
diff --git a/alternator/auth.cc b/alternator/auth.cc
--- a/alternator/auth.cc
+++ b/alternator/auth.cc
@@ -7,6 +7,7 @@
*/
#include "alternator/error.hh"
+#include "auth/common.hh"
#include "log.hh"
#include <string>
#include <string_view>
@@ -24,8 +25,8 @@ namespace alternator {
static logging::logger alogger("alternator-auth");
-future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::string username) {
- schema_ptr schema = proxy.data_dictionary().find_schema("system_auth", "roles");
+future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::service& as, std::string username) {
+ schema_ptr schema = proxy.data_dictionary().find_schema(auth::get_auth_ks_name(as.query_processor()), "roles");
partition_key pk = partition_key::from_single_value(*schema, utf8_type->decompose(username));
dht::partition_range_vector partition_ranges{dht::partition_range(dht::decorate_key(*schema, pk))};
std::vector<query::clustering_range> bounds{query::clustering_range::make_open_ended_both_sides()};
diff --git a/alternator/auth.hh b/alternator/auth.hh
--- a/alternator/auth.hh
+++ b/alternator/auth.hh
@@ -10,6 +10,7 @@
#include <string>
#include "utils/loading_cache.hh"
+#include "auth/service.hh"
namespace service {
class storage_proxy;
@@ -19,6 +20,6 @@ namespace alternator {
using key_cache = utils::loading_cache<std::string, std::string, 1>;
-future<std::string> get_key_from_roles(service::storage_proxy& proxy, std::string username);
+future<std::string> get_key_from_roles(service::storage_proxy& proxy, auth::service& as, std::string username);
}
diff --git a/alternator/server.cc b/alternator/server.cc
--- a/alternator/server.cc
+++ b/alternator/server.cc
@@ -310,8 +310,8 @@ future<std::string> server::verify_signature(const request& req, const chunked_c
}
}
- auto cache_getter = [&proxy = _proxy] (std::string username) {
- return get_key_from_roles(proxy, std::move(username));
+ auto cache_getter = [&proxy = _proxy, &as = _auth_service] (std::string username) {
+ return get_key_from_roles(proxy, as, std::move(username));
};
return _key_cache.get_ptr(user, cache_getter).then([this, &req, &content,
user = std::move(user),
diff --git a/auth/allow_all_authenticator.cc b/auth/allow_all_authenticator.cc
--- a/auth/allow_all_authenticator.cc
+++ b/auth/allow_all_authenticator.cc
@@ -20,6 +20,7 @@ static const class_registrator<
authenticator,
allow_all_authenticator,
cql3::query_processor&,
+ ::service::raft_group0_client&,
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthenticator");
}
diff --git a/auth/allow_all_authenticator.hh b/auth/allow_all_authenticator.hh
--- a/auth/allow_all_authenticator.hh
+++ b/auth/allow_all_authenticator.hh
@@ -28,7 +28,7 @@ extern const std::string_view allow_all_authenticator_name;
class allow_all_authenticator final : public authenticator {
public:
- allow_all_authenticator(cql3::query_processor&, ::service::migration_manager&) {
+ allow_all_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {
}
virtual future<> start() override {
diff --git a/auth/allow_all_authorizer.cc b/auth/allow_all_authorizer.cc
--- a/auth/allow_all_authorizer.cc
+++ b/auth/allow_all_authorizer.cc
@@ -20,6 +20,7 @@ static const class_registrator<
authorizer,
allow_all_authorizer,
cql3::query_processor&,
+ ::service::raft_group0_client&,
::service::migration_manager&> registration("org.apache.cassandra.auth.AllowAllAuthorizer");
}
diff --git a/auth/allow_all_authorizer.hh b/auth/allow_all_authorizer.hh
--- a/auth/allow_all_authorizer.hh
+++ b/auth/allow_all_authorizer.hh
@@ -16,6 +16,7 @@ class query_processor;
namespace service {
class migration_manager;
+class raft_group0_client;
}
namespace auth {
@@ -24,7 +25,7 @@ extern const std::string_view allow_all_authorizer_name;
class allow_all_authorizer final : public authorizer {
public:
- allow_all_authorizer(cql3::query_processor&, ::service::migration_manager&) {
+ allow_all_authorizer(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {
}
virtual future<> start() override {
diff --git a/auth/certificate_authenticator.cc b/auth/certificate_authenticator.cc
--- a/auth/certificate_authenticator.cc
+++ b/auth/certificate_authenticator.cc
@@ -30,13 +30,14 @@ static const std::string cfg_source_altname = "ALTNAME";
static const class_registrator<auth::authenticator
, auth::certificate_authenticator
, cql3::query_processor&
+ , ::service::raft_group0_client&
, ::service::migration_manager&> cert_auth_reg(CERT_AUTH_NAME);
enum class auth::certificate_authenticator::query_source {
subject, altname
};
-auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::migration_manager&)
+auth::certificate_authenticator::certificate_authenticator(cql3::query_processor& qp, ::service::raft_group0_client&, ::service::migration_manager&)
: _queries([&] {
auto& conf = qp.db().get_config();
auto queries = conf.auth_certificate_role_queries();
diff --git a/auth/certificate_authenticator.hh b/auth/certificate_authenticator.hh
--- a/auth/certificate_authenticator.hh
+++ b/auth/certificate_authenticator.hh
@@ -20,6 +20,7 @@ class query_processor;
namespace service {
class migration_manager;
+class raft_group0_client;
}
namespace auth {
@@ -30,7 +31,7 @@ class certificate_authenticator : public authenticator {
enum class query_source;
std::vector<std::pair<query_source, boost::regex>> _queries;
public:
- certificate_authenticator(cql3::query_processor&, ::service::migration_manager&);
+ certificate_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
~certificate_authenticator();
future<> start() override;
diff --git a/auth/common.cc b/auth/common.cc
--- a/auth/common.cc
+++ b/auth/common.cc
@@ -8,27 +8,50 @@
#include "auth/common.hh"
+#include <optional>
#include <seastar/core/coroutine.hh>
#include <seastar/core/shared_ptr.hh>
+#include <seastar/core/sharded.hh>
+#include "mutation/canonical_mutation.hh"
+#include "schema/schema_fwd.hh"
+#include "timestamp.hh"
#include "utils/exponential_backoff_retry.hh"
#include "cql3/query_processor.hh"
#include "cql3/statements/create_table_statement.hh"
#include "schema/schema_builder.hh"
#include "service/migration_manager.hh"
+#include "service/raft/group0_state_machine.hh"
#include "timeout_config.hh"
+#include "db/config.hh"
+#include "db/system_auth_keyspace.hh"
+#include "utils/error_injection.hh"
namespace auth {
namespace meta {
-constinit const std::string_view AUTH_KS("system_auth");
-constinit const std::string_view USERS_CF("users");
+namespace legacy {
+ constinit const std::string_view AUTH_KS("system_auth");
+ constinit const std::string_view USERS_CF("users");
+} // namespace legacy
constinit const std::string_view AUTH_PACKAGE_NAME("org.apache.cassandra.auth.");
+} // namespace meta
+static logging::logger auth_log("auth");
+
+bool legacy_mode(cql3::query_processor& qp) {
+ return !qp.db().get_config().check_experimental(
+ db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES) ||
+ qp.auth_version <= db::system_auth_keyspace::version_t::v1;
}
-static logging::logger auth_log("auth");
+std::string_view get_auth_ks_name(cql3::query_processor& qp) {
+ if (legacy_mode(qp)) {
+ return meta::legacy::AUTH_KS;
+ }
+ return db::system_auth_keyspace::NAME;
+}
// Func must support being invoked more than once.
future<> do_after_system_ready(seastar::abort_source& as, seastar::noncopyable_function<future<>()> func) {
@@ -55,7 +78,7 @@ static future<> create_metadata_table_if_missing_impl(
auto parsed_statement = cql3::query_processor::parse_statement(cql);
auto& parsed_cf_statement = static_cast<cql3::statements::raw::cf_statement&>(*parsed_statement);
- parsed_cf_statement.prepare_keyspace(meta::AUTH_KS);
+ parsed_cf_statement.prepare_keyspace(meta::legacy::AUTH_KS);
auto statement = static_pointer_cast<cql3::statements::create_table_statement>(
parsed_cf_statement.prepare(db, qp.get_cql_stats())->statement);
@@ -98,4 +121,85 @@ ::service::query_state& internal_distributed_query_state() noexcept {
return qs;
}
+static future<> announce_mutations_with_guard(
+ ::service::raft_group0_client& group0_client,
+ std::vector<canonical_mutation> muts,
+ ::service::group0_guard group0_guard,
+ seastar::abort_source* as) {
+ auto group0_cmd = group0_client.prepare_command(
+ ::service::write_mutations{
+ .mutations{std::move(muts)},
+ },
+ group0_guard,
+ "auth: modify internal data"
+ );
+ return group0_client.add_entry(std::move(group0_cmd), std::move(group0_guard), as);
+}
+
+future<> announce_mutations_with_batching(
+ ::service::raft_group0_client& group0_client,
+ start_operation_func_t start_operation_func,
+ std::function<mutations_generator(api::timestamp_type& t)> gen,
+ seastar::abort_source* as) {
+ // account for command's overhead, it's better to use smaller threshold than constantly bounce off the limit
+ size_t memory_threshold = group0_client.max_command_size() * 0.75;
+ utils::get_local_injector().inject("auth_announce_mutations_command_max_size",
+ [&memory_threshold] {
+ memory_threshold = 1000;
+ });
+
+ size_t memory_usage = 0;
+ std::vector<canonical_mutation> muts;
+
+ // guard has to be taken before we execute code in gen as
+ // it can do read-before-write and we want announce_mutations
+ // operation to be linearizable with other such calls,
+ // for instance if we do select and then delete in gen
+ // we want both to operate on the same data or fail
+ // if someone else modified it in the middle
+ std::optional<::service::group0_guard> group0_guard;
+ group0_guard = co_await start_operation_func(as);
+ auto timestamp = group0_guard->write_timestamp();
+
+ auto g = gen(timestamp);
+ while (auto mut = co_await g()) {
+ muts.push_back(canonical_mutation{*mut});
+ memory_usage += muts.back().representation().size();
+ if (memory_usage >= memory_threshold) {
+ if (!group0_guard) {
+ group0_guard = co_await start_operation_func(as);
+ timestamp = group0_guard->write_timestamp();
+ }
+ co_await announce_mutations_with_guard(group0_client, std::move(muts), std::move(*group0_guard), as);
+ group0_guard = std::nullopt;
+ memory_usage = 0;
+ muts = {};
+ }
+ }
+ if (!muts.empty()) {
+ if (!group0_guard) {
+ group0_guard = co_await start_operation_func(as);
+ timestamp = group0_guard->write_timestamp();
+ }
+ co_await announce_mutations_with_guard(group0_client, std::move(muts), std::move(*group0_guard), as);
+ }
+}
+
+future<> announce_mutations(
+ cql3::query_processor& qp,
+ ::service::raft_group0_client& group0_client,
+ const sstring query_string,
+ std::vector<data_value_or_unset> values,
+ seastar::abort_source* as) {
+ auto group0_guard = co_await group0_client.start_operation(as);
+ auto timestamp = group0_guard.write_timestamp();
+ auto muts = co_await qp.get_mutations_internal(
+ query_string,
+ internal_distributed_query_state(),
+ timestamp,
+ std::move(values));
+ std::vector<canonical_mutation> cmuts = {muts.begin(), muts.end()};
+ co_await announce_mutations_with_guard(group0_client, std::move(cmuts), std::move(group0_guard), as);
+}
+
}
diff --git a/auth/common.hh b/auth/common.hh
--- a/auth/common.hh
+++ b/auth/common.hh
@@ -18,7 +18,9 @@
#include <seastar/core/sstring.hh>
#include <seastar/core/smp.hh>
-#include "seastarx.hh"
+#include "schema/schema_registry.hh"
+#include "types/types.hh"
+#include "service/raft/raft_group0_client.hh"
using namespace std::chrono_literals;
@@ -39,12 +41,22 @@ namespace auth {
namespace meta {
-constexpr std::string_view DEFAULT_SUPERUSER_NAME("cassandra");
+namespace legacy {
extern constinit const std::string_view AUTH_KS;
extern constinit const std::string_view USERS_CF;
+} // namespace legacy
+
+constexpr std::string_view DEFAULT_SUPERUSER_NAME("cassandra");
extern constinit const std::string_view AUTH_PACKAGE_NAME;
-}
+} // namespace meta
+
+// This is a helper to check whether auth-v2 is on.
+bool legacy_mode(cql3::query_processor& qp);
+
+// We have legacy implementation using different keyspace
+// and need to parametrize depending on runtime feature.
+std::string_view get_auth_ks_name(cql3::query_processor& qp);
template <class Task>
future<> once_among_shards(Task&& f) {
@@ -69,4 +81,26 @@ future<> create_metadata_table_if_missing(
///
::service::query_state& internal_distributed_query_state() noexcept;
+// Execute update query via group0 mechanism, mutations will be applied on all nodes.
+// Use this function when need to perform read before write on a single guard or if
+// you have more than one mutation and potentially exceed single command size limit.
+using start_operation_func_t = std::function<future<::service::group0_guard>(abort_source*)>;
+using mutations_generator = coroutine::experimental::generator<mutation>;
+future<> announce_mutations_with_batching(
+ ::service::raft_group0_client& group0_client,
+ // since we can operate also in topology coordinator context where we need stronger
+ // guarantees than start_operation from group0_client gives we allow to inject custom
+ // function here
+ start_operation_func_t start_operation_func,
+ std::function<mutations_generator(api::timestamp_type& t)> gen,
+ seastar::abort_source* as);
+
+// Execute update query via group0 mechanism, mutations will be applied on all nodes.
+future<> announce_mutations(
+ cql3::query_processor& qp,
+ ::service::raft_group0_client& group0_client,
+ const sstring query_string,
+ std::vector<data_value_or_unset> values,
+ seastar::abort_source* as);
+
}
diff --git a/auth/default_authorizer.cc b/auth/default_authorizer.cc
--- a/auth/default_authorizer.cc
+++ b/auth/default_authorizer.cc
@@ -9,6 +9,7 @@
*/
#include "auth/default_authorizer.hh"
+#include "db/system_auth_keyspace.hh"
extern "C" {
#include <crypt.h>
@@ -47,10 +48,12 @@ static const class_registrator<
authorizer,
default_authorizer,
cql3::query_processor&,
+ ::service::raft_group0_client&,
::service::migration_manager&> password_auth_reg("org.apache.cassandra.auth.CassandraAuthorizer");
-default_authorizer::default_authorizer(cql3::query_processor& qp, ::service::migration_manager& mm)
+default_authorizer::default_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: _qp(qp)
+ , _group0_client(g0)
, _migration_manager(mm) {
}
@@ -60,11 +63,11 @@ default_authorizer::~default_authorizer() {
static const sstring legacy_table_name{"permissions"};
bool default_authorizer::legacy_metadata_exists() const {
- return _qp.db().has_schema(meta::AUTH_KS, legacy_table_name);
+ return _qp.db().has_schema(meta::legacy::AUTH_KS, legacy_table_name);
}
-future<bool> default_authorizer::any_granted() const {
- static const sstring query = format("SELECT * FROM {}.{} LIMIT 1", meta::AUTH_KS, PERMISSIONS_CF);
+future<bool> default_authorizer::legacy_any_granted() const {
+ static const sstring query = format("SELECT * FROM {}.{} LIMIT 1", meta::legacy::AUTH_KS, PERMISSIONS_CF);
return _qp.execute_internal(
query,
@@ -77,7 +80,7 @@ future<bool> default_authorizer::any_granted() const {
future<> default_authorizer::migrate_legacy_metadata() {
alogger.info("Starting migration of legacy permissions metadata.");
- static const sstring query = format("SELECT * FROM {}.{}", meta::AUTH_KS, legacy_table_name);
+ static const sstring query = format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, legacy_table_name);
return _qp.execute_internal(
query,
@@ -108,7 +111,7 @@ future<> default_authorizer::start() {
"{} set<text>,"
"PRIMARY KEY({}, {})"
") WITH gc_grace_seconds={}",
- meta::AUTH_KS,
+ meta::legacy::AUTH_KS,
PERMISSIONS_CF,
ROLE_NAME,
RESOURCE_NAME,
@@ -128,7 +131,7 @@ future<> default_authorizer::start() {
_migration_manager.wait_for_schema_agreement(_qp.db().real_database(), db::timeout_clock::time_point::max(), &_as).get();
if (legacy_metadata_exists()) {
- if (!any_granted().get()) {
+ if (!legacy_any_granted().get()) {
migrate_legacy_metadata().get();
return;
}
@@ -149,27 +152,25 @@ future<> default_authorizer::stop() {
future<permission_set>
default_authorizer::authorize(const role_or_anonymous& maybe_role, const resource& r) const {
if (is_anonymous(maybe_role)) {
- return make_ready_future<permission_set>(permissions::NONE);
+ co_return permissions::NONE;
}
- static const sstring query = format("SELECT {} FROM {}.{} WHERE {} = ? AND {} = ?",
+ const sstring query = format("SELECT {} FROM {}.{} WHERE {} = ? AND {} = ?",
PERMISSIONS_NAME,
- meta::AUTH_KS,
+ get_auth_ks_name(_qp),
PERMISSIONS_CF,
ROLE_NAME,
RESOURCE_NAME);
- return _qp.execute_internal(
+ const auto results = co_await _qp.execute_internal(
query,
db::consistency_level::LOCAL_ONE,
{*
maybe_role.name,
r.name()},
- cql3::query_processor::cache_internal::yes).then([](::shared_ptr<cql3::untyped_result_set> results) {
- if (results->empty()) {
- return permissions::NONE;
- }
-
- return permissions::from_strings(results->one().get_set<sstring>(PERMISSIONS_NAME));
- });
+ cql3::query_processor::cache_internal::yes);
+ if (results->empty()) {
+ co_return permissions::NONE;
+ }
+ co_return permissions::from_strings(results->one().get_set<sstring>(PERMISSIONS_NAME));
}
future<>
@@ -178,23 +179,24 @@ default_authorizer::modify(
permission_set set,
const resource& resource,
std::string_view op) {
- return do_with(
- format("UPDATE {}.{} SET {} = {} {} ? WHERE {} = ? AND {} = ?",
- meta::AUTH_KS,
- PERMISSIONS_CF,
- PERMISSIONS_NAME,
- PERMISSIONS_NAME,
- op,
- ROLE_NAME,
- RESOURCE_NAME),
- [this, &role_name, set, &resource](const auto& query) {
- return _qp.execute_internal(
+ const sstring query = format("UPDATE {}.{} SET {} = {} {} ? WHERE {} = ? AND {} = ?",
+ get_auth_ks_name(_qp),
+ PERMISSIONS_CF,
+ PERMISSIONS_NAME,
+ PERMISSIONS_NAME,
+ op,
+ ROLE_NAME,
+ RESOURCE_NAME);
+ if (legacy_mode(_qp)) {
+ co_return co_await _qp.execute_internal(
query,
db::consistency_level::ONE,
internal_distributed_query_state(),
{permissions::to_strings(set), sstring(role_name),
resource.name()},
cql3::query_processor::cache_internal::no).discard_result();
- });
+ }
+ co_return co_await announce_mutations(_qp, _group0_client, query,
+ {permissions::to_strings(set), sstring(role_name),
resource.name()}, &_as);
}
@@ -207,58 +209,57 @@ future<> default_authorizer::revoke(std::string_view role_name, permission_set s
}
future<std::vector<permission_details>> default_authorizer::list_all() const {
- static const sstring query = format("SELECT {}, {}, {} FROM {}.{}",
+ const sstring query = format("SELECT {}, {}, {} FROM {}.{}",
ROLE_NAME,
RESOURCE_NAME,
PERMISSIONS_NAME,
- meta::AUTH_KS,
+ get_auth_ks_name(_qp),
PERMISSIONS_CF);
- return _qp.execute_internal(
+ const auto results = co_await _qp.execute_internal(
query,
db::consistency_level::ONE,
internal_distributed_query_state(),
{},
- cql3::query_processor::cache_internal::yes).then([](::shared_ptr<cql3::untyped_result_set> results) {
- std::vector<permission_details> all_details;
-
- for (const auto& row : *results) {
- if (row.has(PERMISSIONS_NAME)) {
- auto role_name = row.get_as<sstring>(ROLE_NAME);
- auto resource = parse_resource(row.get_as<sstring>(RESOURCE_NAME));
- auto perms = permissions::from_strings(row.get_set<sstring>(PERMISSIONS_NAME));
- all_details.push_back(permission_details{std::move(role_name), std::move(resource), std::move(perms)});
- }
+ cql3::query_processor::cache_internal::yes);
+
+ std::vector<permission_details> all_details;
+ for (const auto& row : *results) {
+ if (row.has(PERMISSIONS_NAME)) {
+ auto role_name = row.get_as<sstring>(ROLE_NAME);
+ auto resource = parse_resource(row.get_as<sstring>(RESOURCE_NAME));
+ auto perms = permissions::from_strings(row.get_set<sstring>(PERMISSIONS_NAME));
+ all_details.push_back(permission_details{std::move(role_name), std::move(resource), std::move(perms)});
}
-
- return all_details;
- });
+ }
+ co_return all_details;
}
future<> default_authorizer::revoke_all(std::string_view role_name) {
- static const sstring query = format("DELETE FROM {}.{} WHERE {} = ?",
- meta::AUTH_KS,
- PERMISSIONS_CF,
- ROLE_NAME);
-
- return _qp.execute_internal(
- query,
- db::consistency_level::ONE,
- internal_distributed_query_state(),
- {sstring(role_name)},
- cql3::query_processor::cache_internal::no).discard_result().handle_exception([role_name](auto ep) {
- try {
- std::rethrow_exception(ep);
- } catch (exceptions::request_execution_exception& e) {
- alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
+ try {
+ const sstring query = format("DELETE FROM {}.{} WHERE {} = ?",
+ get_auth_ks_name(_qp),
+ PERMISSIONS_CF,
+ ROLE_NAME);
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(
+ query,
+ db::consistency_level::ONE,
+ internal_distributed_query_state(),
+ {sstring(role_name)},
+ cql3::query_processor::cache_internal::no).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query, {sstring(role_name)}, &_as);
}
- });
+ } catch (exceptions::request_execution_exception& e) {
+ alogger.warn("CassandraAuthorizer failed to revoke all permissions of {}: {}", role_name, e);
+ }
}
-future<> default_authorizer::revoke_all(const resource& resource) {
+future<> default_authorizer::revoke_all_legacy(const resource& resource) {
static const sstring query = format("SELECT {} FROM {}.{} WHERE {} = ? ALLOW FILTERING",
ROLE_NAME,
- meta::AUTH_KS,
+ get_auth_ks_name(_qp),
PERMISSIONS_CF,
RESOURCE_NAME);
@@ -274,7 +275,7 @@ future<> default_authorizer::revoke_all(const resource& resource) {
res->end(),
[this, res, resource](const cql3::untyped_result_set::row& r) {
static const sstring query = format("DELETE FROM {}.{} WHERE {} = ? AND {} = ?",
- meta::AUTH_KS,
+ get_auth_ks_name(_qp),
PERMISSIONS_CF,
ROLE_NAME,
RESOURCE_NAME);
@@ -300,8 +301,53 @@ future<> default_authorizer::revoke_all(const resource& resource) {
});
}
+future<> default_authorizer::revoke_all(const resource& resource) {
+ if (legacy_mode(_qp)) {
+ co_return co_await revoke_all_legacy(resource);
+ }
+ auto name =
resource.name();
+ try {
+ auto gen = [this, name] (api::timestamp_type& t) -> mutations_generator {
+ const sstring query = format("SELECT {} FROM {}.{} WHERE {} = ? ALLOW FILTERING",
+ ROLE_NAME,
+ get_auth_ks_name(_qp),
+ PERMISSIONS_CF,
+ RESOURCE_NAME);
+ auto res = co_await _qp.execute_internal(
+ query,
+ db::consistency_level::LOCAL_ONE,
+ {name},
+ cql3::query_processor::cache_internal::no);
+ for (const auto& r : *res) {
+ const sstring query = format("DELETE FROM {}.{} WHERE {} = ? AND {} = ?",
+ get_auth_ks_name(_qp),
+ PERMISSIONS_CF,
+ ROLE_NAME,
+ RESOURCE_NAME);
+ auto muts = co_await _qp.get_mutations_internal(
+ query,
+ internal_distributed_query_state(),
+ t,
+ {r.get_as<sstring>(ROLE_NAME), name});
+ if (muts.size() != 1) {
+ on_internal_error(alogger,
+ format("expecting single delete mutation, got {}", muts.size()));
+ }
+ co_yield std::move(muts[0]);
+ }
+ };
+ co_await announce_mutations_with_batching(
+ _group0_client,
+ std::bind_front(&::service::raft_group0_client::start_operation, &_group0_client),
+ std::move(gen),
+ &_as);
+ } catch (exceptions::request_execution_exception& e) {
+ alogger.warn("CassandraAuthorizer failed to revoke all permissions on {}: {}", name, e);
+ }
+}
+
const resource_set& default_authorizer::protected_resources() const {
- static const resource_set resources({ make_data_resource(meta::AUTH_KS, PERMISSIONS_CF) });
+ static const resource_set resources({ make_data_resource(meta::legacy::AUTH_KS, PERMISSIONS_CF) });
return resources;
}
diff --git a/auth/default_authorizer.hh b/auth/default_authorizer.hh
--- a/auth/default_authorizer.hh
+++ b/auth/default_authorizer.hh
@@ -14,6 +14,7 @@
#include "auth/authorizer.hh"
#include "service/migration_manager.hh"
+#include "service/raft/raft_group0_client.hh"
namespace cql3 {
@@ -25,6 +26,7 @@ namespace auth {
class default_authorizer : public authorizer {
cql3::query_processor& _qp;
+ ::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
@@ -33,7 +35,7 @@ class default_authorizer : public authorizer {
future<> _finished{make_ready_future<>()};
public:
- default_authorizer(cql3::query_processor&, ::service::migration_manager&);
+ default_authorizer(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
~default_authorizer();
@@ -60,7 +62,9 @@ public:
private:
bool legacy_metadata_exists() const;
- future<bool> any_granted() const;
+ future<> revoke_all_legacy(const resource&);
+
+ future<bool> legacy_any_granted() const;
future<> migrate_legacy_metadata();
diff --git a/auth/maintenance_socket_role_manager.cc b/auth/maintenance_socket_role_manager.cc
--- a/auth/maintenance_socket_role_manager.cc
+++ b/auth/maintenance_socket_role_manager.cc
@@ -21,6 +21,7 @@ static const class_registrator<
role_manager,
maintenance_socket_role_manager,
cql3::query_processor&,
+ ::service::raft_group0_client&,
::service::migration_manager&> registration(sstring{maintenance_socket_role_manager_name});
diff --git a/auth/maintenance_socket_role_manager.hh b/auth/maintenance_socket_role_manager.hh
--- a/auth/maintenance_socket_role_manager.hh
+++ b/auth/maintenance_socket_role_manager.hh
@@ -18,6 +18,7 @@ class query_processor;
namespace service {
class migration_manager;
+class raft_group0_client;
}
namespace auth {
@@ -28,7 +29,7 @@ extern const std::string_view maintenance_socket_role_manager_name;
// system_auth keyspace, which may be not yet created when the maintenance socket starts listening.
class maintenance_socket_role_manager final : public role_manager {
public:
- maintenance_socket_role_manager(cql3::query_processor&, ::service::migration_manager&) {}
+ maintenance_socket_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&) {}
virtual std::string_view qualified_java_name() const noexcept override;
diff --git a/auth/password_authenticator.cc b/auth/password_authenticator.cc
--- a/auth/password_authenticator.cc
+++ b/auth/password_authenticator.cc
@@ -46,6 +46,7 @@ static const class_registrator<
authenticator,
password_authenticator,
cql3::query_processor&,
+ ::service::raft_group0_client&,
::service::migration_manager&> password_auth_reg("org.apache.cassandra.auth.PasswordAuthenticator");
static thread_local auto rng_for_salt = std::default_random_engine(std::random_device{}());
@@ -61,8 +62,9 @@ std::string password_authenticator::default_superuser(const db::config& cfg) {
password_authenticator::~password_authenticator() {
}
-password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::migration_manager& mm)
+password_authenticator::password_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: _qp(qp)
+ , _group0_client(g0)
, _migration_manager(mm)
, _stopped(make_ready_future<>())
, _superuser(default_superuser(qp.db().get_config()))
@@ -72,23 +74,23 @@ static bool has_salted_hash(const cql3::untyped_result_set_row& row) {
return !row.get_or<sstring>(SALTED_HASH, "").empty();
}
-static const sstring& update_row_query() {
- static const sstring update_row_query = format("UPDATE {} SET {} = ? WHERE {} = ?",
- meta::roles_table::qualified_name,
+sstring password_authenticator::update_row_query() const {
+ return format("UPDATE {}.{} SET {} = ? WHERE {} = ?",
+ get_auth_ks_name(_qp),
+ meta::roles_table::name,
SALTED_HASH,
meta::roles_table::role_col_name);
- return update_row_query;
}
static const sstring legacy_table_name{"credentials"};
bool password_authenticator::legacy_metadata_exists() const {
- return _qp.db().has_schema(meta::AUTH_KS, legacy_table_name);
+ return _qp.db().has_schema(meta::legacy::AUTH_KS, legacy_table_name);
}
future<> password_authenticator::migrate_legacy_metadata() const {
plogger.info("Starting migration of legacy authentication metadata.");
- static const sstring query = format("SELECT * FROM {}.{}", meta::AUTH_KS, legacy_table_name);
+ static const sstring query = format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, legacy_table_name);
return _qp.execute_internal(
query,
@@ -98,9 +100,9 @@ future<> password_authenticator::migrate_legacy_metadata() const {
return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) {
auto username = row.get_as<sstring>("username");
auto salted_hash = row.get_as<sstring>(SALTED_HASH);
-
+ static const auto query = update_row_query();
return _qp.execute_internal(
- update_row_query(),
+ query,
consistency_for_user(username),
internal_distributed_query_state(),
{std::move(salted_hash), username},
@@ -115,24 +117,30 @@ future<> password_authenticator::migrate_legacy_metadata() const {
}
future<> password_authenticator::create_default_if_missing() {
- return default_role_row_satisfies(_qp, &has_salted_hash, _superuser).then([this](bool exists) {
- if (!exists) {
- std::string salted_pwd(get_config_value(_qp.db().get_config().auth_superuser_salted_password(), ""));
- if (salted_pwd.empty()) {
- salted_pwd = passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt);
- }
- return _qp.execute_internal(
- update_row_query(),
- db::consistency_level::QUORUM,
- internal_distributed_query_state(),
- {salted_pwd, _superuser},
- cql3::query_processor::cache_internal::no).then([](auto&&) {
-
plogger.info("Created default superuser authentication record.");
- });
- }
-
- return make_ready_future<>();
- });
+ const auto exists = co_await default_role_row_satisfies(_qp, &has_salted_hash, _superuser);
+ if (exists) {
+ co_return;
+ }
+ std::string salted_pwd(get_config_value(_qp.db().get_config().auth_superuser_salted_password(), ""));
+ if (salted_pwd.empty()) {
+ salted_pwd = passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt);
+ }
+ const auto query = update_row_query();
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(
+ query,
+ db::consistency_level::QUORUM,
+ internal_distributed_query_state(),
+ {salted_pwd, _superuser},
+ cql3::query_processor::cache_internal::no).then([](auto&&) {
+
plogger.info("Created default superuser authentication record.");
+ });
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query,
+ {salted_pwd, _superuser}, &_as).then([]() {
+
plogger.info("Created default superuser authentication record.");
+ });
+ }
}
future<> password_authenticator::start() {
@@ -207,101 +215,112 @@ future<authenticated_user> password_authenticator::authenticate(
throw exceptions::authentication_exception(format("Required key '{}' is missing", PASSWORD_KEY));
}
- auto& username =
credentials.at(USERNAME_KEY);
- auto& password =
credentials.at(PASSWORD_KEY);
+ const sstring username =
credentials.at(USERNAME_KEY);
+ const sstring password =
credentials.at(PASSWORD_KEY);
// Here was a thread local, explicit cache of prepared statement. In normal execution this is
// fine, but since we in testing set up and tear down system over and over, we'd start using
// obsolete prepared statements pretty quickly.
// Rely on query processing caching statements instead, and lets assume
// that a map lookup string->statement is not gonna kill us much.
- return futurize_invoke([this, username, password] {
- static const sstring query = format("SELECT {} FROM {} WHERE {} = ?",
+ const sstring query = format("SELECT {} FROM {}.{} WHERE {} = ?",
SALTED_HASH,
- meta::roles_table::qualified_name,
+ get_auth_ks_name(_qp),
+ meta::roles_table::name,
meta::roles_table::role_col_name);
-
- return _qp.execute_internal(
+ try {
+ const auto res = co_await _qp.execute_internal(
query,
consistency_for_user(username),
internal_distributed_query_state(),
{username},
cql3::query_processor::cache_internal::yes);
- }).then_wrapped([=](future<::shared_ptr<cql3::untyped_result_set>> f) {
- try {
- auto res = f.get();
- auto salted_hash = std::optional<sstring>();
- if (!res->empty()) {
- salted_hash = res->one().get_opt<sstring>(SALTED_HASH);
- }
- if (!salted_hash || !passwords::check(password, *salted_hash)) {
- throw exceptions::authentication_exception("Username and/or password are incorrect");
- }
- return make_ready_future<authenticated_user>(username);
- } catch (std::system_error &) {
- std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
- } catch (exceptions::request_execution_exception& e) {
- std::throw_with_nested(exceptions::authentication_exception(e.what()));
- } catch (exceptions::authentication_exception& e) {
- std::throw_with_nested(e);
- } catch (exceptions::unavailable_exception& e) {
- std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
- } catch (...) {
- std::throw_with_nested(exceptions::authentication_exception("authentication failed"));
+
+ auto salted_hash = std::optional<sstring>();
+ if (!res->empty()) {
+ salted_hash = res->one().get_opt<sstring>(SALTED_HASH);
}
- });
+ if (!salted_hash || !passwords::check(password, *salted_hash)) {
+ throw exceptions::authentication_exception("Username and/or password are incorrect");
+ }
+ co_return username;
+ } catch (std::system_error &) {
+ std::throw_with_nested(exceptions::authentication_exception("Could not verify password"));
+ } catch (exceptions::request_execution_exception& e) {
+ std::throw_with_nested(exceptions::authentication_exception(e.what()));
+ } catch (exceptions::authentication_exception& e) {
+ std::throw_with_nested(e);
+ } catch (exceptions::unavailable_exception& e) {
+ std::throw_with_nested(exceptions::authentication_exception(e.get_message()));
+ } catch (...) {
+ std::throw_with_nested(exceptions::authentication_exception("authentication failed"));
+ }
}
future<> password_authenticator::create(std::string_view role_name, const authentication_options& options) {
if (!options.password) {
- return make_ready_future<>();
+ co_return;
+ }
+ const auto query = update_row_query();
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(
+ query,
+ consistency_for_user(role_name),
+ internal_distributed_query_state(),
+ {passwords::hash(*options.password, rng_for_salt), sstring(role_name)},
+ cql3::query_processor::cache_internal::no).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query,
+ {passwords::hash(*options.password, rng_for_salt), sstring(role_name)}, &_as);
}
-
- return _qp.execute_internal(
- update_row_query(),
- consistency_for_user(role_name),
- internal_distributed_query_state(),
- {passwords::hash(*options.password, rng_for_salt), sstring(role_name)},
- cql3::query_processor::cache_internal::no).discard_result();
}
future<> password_authenticator::alter(std::string_view role_name, const authentication_options& options) {
if (!options.password) {
- return make_ready_future<>();
+ co_return;
}
- static const sstring query = format("UPDATE {} SET {} = ? WHERE {} = ?",
- meta::roles_table::qualified_name,
+ const sstring query = format("UPDATE {}.{} SET {} = ? WHERE {} = ?",
+ get_auth_ks_name(_qp),
+ meta::roles_table::name,
SALTED_HASH,
meta::roles_table::role_col_name);
-
- return _qp.execute_internal(
- query,
- consistency_for_user(role_name),
- internal_distributed_query_state(),
- {passwords::hash(*options.password, rng_for_salt), sstring(role_name)},
- cql3::query_processor::cache_internal::no).discard_result();
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(
+ query,
+ consistency_for_user(role_name),
+ internal_distributed_query_state(),
+ {passwords::hash(*options.password, rng_for_salt), sstring(role_name)},
+ cql3::query_processor::cache_internal::no).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query,
+ {passwords::hash(*options.password, rng_for_salt), sstring(role_name)}, &_as);
+ }
}
future<> password_authenticator::drop(std::string_view name) {
- static const sstring query = format("DELETE {} FROM {} WHERE {} = ?",
+ const sstring query = format("DELETE {} FROM {}.{} WHERE {} = ?",
SALTED_HASH,
- meta::roles_table::qualified_name,
+ get_auth_ks_name(_qp),
+ meta::roles_table::name,
meta::roles_table::role_col_name);
-
- return _qp.execute_internal(
- query, consistency_for_user(name),
- internal_distributed_query_state(),
- {sstring(name)},
- cql3::query_processor::cache_internal::no).discard_result();
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(
+ query, consistency_for_user(name),
+ internal_distributed_query_state(),
+ {sstring(name)},
+ cql3::query_processor::cache_internal::no).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query, {sstring(name)}, &_as);
+ }
}
future<custom_options> password_authenticator::query_custom_options(std::string_view role_name) const {
return make_ready_future<custom_options>();
}
const resource_set& password_authenticator::protected_resources() const {
- static const resource_set resources({make_data_resource(meta::AUTH_KS, meta::roles_table::name)});
+ static const resource_set resources({make_data_resource(meta::legacy::AUTH_KS, meta::roles_table::name)});
return resources;
}
diff --git a/auth/password_authenticator.hh b/auth/password_authenticator.hh
--- a/auth/password_authenticator.hh
+++ b/auth/password_authenticator.hh
@@ -14,6 +14,7 @@
#include "db/consistency_level_type.hh"
#include "auth/authenticator.hh"
+#include "service/raft/raft_group0_client.hh"
namespace db {
class config;
@@ -35,16 +36,17 @@ extern const std::string_view password_authenticator_name;
class password_authenticator : public authenticator {
cql3::query_processor& _qp;
+ ::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
future<> _stopped;
- seastar::abort_source _as;
+ abort_source _as;
std::string _superuser;
public:
static db::consistency_level consistency_for_user(std::string_view role_name);
static std::string default_superuser(const db::config&);
- password_authenticator(cql3::query_processor&, ::service::migration_manager&);
+ password_authenticator(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
~password_authenticator();
@@ -80,6 +82,8 @@ private:
future<> migrate_legacy_metadata() const;
future<> create_default_if_missing();
+
+ sstring update_row_query() const;
};
}
diff --git a/auth/roles-metadata.cc b/auth/roles-metadata.cc
--- a/auth/roles-metadata.cc
+++ b/auth/roles-metadata.cc
@@ -25,31 +25,31 @@ namespace roles_table {
std::string_view creation_query() {
static const sstring instance = fmt::format(
- "CREATE TABLE {} ("
+ "CREATE TABLE {}.{} ("
" {} text PRIMARY KEY,"
" can_login boolean,"
" is_superuser boolean,"
" member_of set<text>,"
" salted_hash text"
")",
- qualified_name,
+ meta::legacy::AUTH_KS,
+ name,
role_col_name);
return instance;
}
-constexpr std::string_view qualified_name("system_auth.roles");
+} // namespace roles_table
-}
-
-}
+} // namespace meta
future<bool> default_role_row_satisfies(
cql3::query_processor& qp,
std::function<bool(const cql3::untyped_result_set_row&)> p,
std::optional<std::string> rolename) {
- static const sstring query = format("SELECT * FROM {} WHERE {} = ?",
- meta::roles_table::qualified_name,
+ const sstring query = format("SELECT * FROM {}.{} WHERE {} = ?",
+ get_auth_ks_name(qp),
+ meta::roles_table::name,
meta::roles_table::role_col_name);
for (auto cl : { db::consistency_level::ONE, db::consistency_level::QUORUM }) {
@@ -69,7 +69,7 @@ future<bool> any_nondefault_role_row_satisfies(
cql3::query_processor& qp,
std::function<bool(const cql3::untyped_result_set_row&)> p,
std::optional<std::string> rolename) {
- static const sstring query = format("SELECT * FROM {}", meta::roles_table::qualified_name);
+ const sstring query = format("SELECT * FROM {}.{}", get_auth_ks_name(qp), meta::roles_table::name);
auto results = co_await qp.execute_internal(query, db::consistency_level::QUORUM
, internal_distributed_query_state(), cql3::query_processor::cache_internal::no
diff --git a/auth/roles-metadata.hh b/auth/roles-metadata.hh
--- a/auth/roles-metadata.hh
+++ b/auth/roles-metadata.hh
@@ -30,8 +30,6 @@ std::string_view creation_query();
constexpr std::string_view name{"roles", 5};
-extern const std::string_view qualified_name;
-
constexpr std::string_view role_col_name{"role", 4};
}
diff --git a/auth/service.cc b/auth/service.cc
--- a/auth/service.cc
+++ b/auth/service.cc
@@ -11,6 +11,8 @@
#include "auth/service.hh"
#include <algorithm>
+#include <boost/algorithm/string/join.hpp>
+#include <chrono>
#include <seastar/core/future-util.hh>
#include <seastar/core/sharded.hh>
@@ -26,11 +28,19 @@
#include "db/config.hh"
#include "db/consistency_level_type.hh"
#include "db/functions/function_name.hh"
+#include "db/system_auth_keyspace.hh"
#include "log.hh"
+#include "schema/schema_fwd.hh"
+#include "seastar/core/future.hh"
#include "service/migration_manager.hh"
+#include "timestamp.hh"
#include "utils/class_registrator.hh"
#include "locator/abstract_replication_strategy.hh"
#include "data_dictionary/keyspace_metadata.hh"
+#include "service/storage_service.hh"
+#include "service_permit.hh"
+
+using namespace std::chrono_literals;
namespace auth {
@@ -147,6 +157,7 @@ service::service(
service::service(
utils::loading_cache_config c,
cql3::query_processor& qp,
+ ::service::raft_group0_client& g0,
::service::migration_notifier& mn,
::service::migration_manager& mm,
const service_config& sc,
@@ -155,34 +166,34 @@ service::service(
std::move(c),
qp,
mn,
- create_object<authorizer>(sc.authorizer_java_name, qp, mm),
- create_object<authenticator>(sc.authenticator_java_name, qp, mm),
- create_object<role_manager>(sc.role_manager_java_name, qp, mm),
+ create_object<authorizer>(sc.authorizer_java_name, qp, g0, mm),
+ create_object<authenticator>(sc.authenticator_java_name, qp, g0, mm),
+ create_object<role_manager>(sc.role_manager_java_name, qp, g0, mm),
used_by_maintenance_socket) {
}
future<> service::create_keyspace_if_missing(::service::migration_manager& mm) const {
assert(this_shard_id() == 0); // once_among_shards makes sure a function is executed on shard 0 only
auto db = _qp.db();
- while (!db.has_keyspace(meta::AUTH_KS)) {
+ while (!db.has_keyspace(meta::legacy::AUTH_KS)) {
auto group0_guard = co_await mm.start_group0_operation();
auto ts = group0_guard.write_timestamp();
- if (!db.has_keyspace(meta::AUTH_KS)) {
+ if (!db.has_keyspace(meta::legacy::AUTH_KS)) {
locator::replication_strategy_config_options opts{{"replication_factor", "1"}};
auto ksm = data_dictionary::keyspace_metadata::new_keyspace(
- meta::AUTH_KS,
+ meta::legacy::AUTH_KS,
"org.apache.cassandra.locator.SimpleStrategy",
opts,
std::nullopt);
try {
co_return co_await mm.announce(::service::prepare_new_keyspace_announcement(db.real_database(), ksm, ts),
- std::move(group0_guard), format("auth_service: create {} keyspace", meta::AUTH_KS));
+ std::move(group0_guard), format("auth_service: create {} keyspace", meta::legacy::AUTH_KS));
} catch (::service::group0_concurrent_modification&) {
-
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::AUTH_KS);
+
log.info("Concurrent operation is detected while creating {} keyspace, retrying.", meta::legacy::AUTH_KS);
}
}
}
@@ -241,18 +252,18 @@ void service::reset_authorization_cache() {
}
future<bool> service::has_existing_legacy_users() const {
- if (!_qp.db().has_schema(meta::AUTH_KS, meta::USERS_CF)) {
+ if (!_qp.db().has_schema(meta::legacy::AUTH_KS, meta::legacy::USERS_CF)) {
return make_ready_future<bool>(false);
}
static const sstring default_user_query = format("SELECT * FROM {}.{} WHERE {} = ?",
- meta::AUTH_KS,
- meta::USERS_CF,
+ meta::legacy::AUTH_KS,
+ meta::legacy::USERS_CF,
meta::user_name_col_name);
static const sstring all_users_query = format("SELECT * FROM {}.{} LIMIT 1",
- meta::AUTH_KS,
- meta::USERS_CF);
+ meta::legacy::AUTH_KS,
+ meta::legacy::USERS_CF);
// This logic is borrowed directly from Apache Cassandra. By first checking for the presence of the default user, we
// can potentially avoid doing a range query with a high consistency level.
@@ -480,7 +491,7 @@ future<> create_role(
options,
ser.underlying_authenticator().supported_options()).then([&ser, name, &options] {
return ser.underlying_authenticator().create(name, options);
- }).handle_exception([&ser, &name](std::exception_ptr ep) {
+ }).handle_exception([&ser, name](std::exception_ptr ep) {
// Roll-back.
return ser.underlying_role_manager().drop(name).then([ep = std::move(ep)] {
std::rethrow_exception(ep);
@@ -626,4 +637,75 @@ future<std::vector<permission_details>> list_filtered_permissions(
});
}
+future<> migrate_to_auth_v2(cql3::query_processor& qp, ::service::raft_group0_client& g0, start_operation_func_t start_operation_func, abort_source& as) {
+ // FIXME: if this function fails it may leave partial data in the new tables
+ // that should be cleared
+ auto gen = [&qp] (api::timestamp_type& ts) -> mutations_generator {
+ for (const auto& cf_name : std::vector<sstring>{
+ "roles", "role_members", "role_attributes", "role_permissions"}) {
+ schema_ptr schema;
+ try {
+ schema = qp.db().find_schema(meta::legacy::AUTH_KS, cf_name);
+ } catch (const data_dictionary::no_such_column_family&) {
+ continue; // some tables might not have been created if they were not used
+ }
+
+ // use longer than usual timeout as we scan the whole table
+ // but not infinite or very long as we want to fail reasonably fast
+ const auto t = 5min;
+ const timeout_config tc{t, t, t, t, t, t, t};
+ ::service::client_state cs(::service::client_state::internal_tag{}, tc);
+ ::service::query_state qs(cs, empty_service_permit());
+
+ auto rows = co_await qp.execute_internal(
+ format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, cf_name),
+ db::consistency_level::ALL,
+ qs,
+ {},
+ cql3::query_processor::cache_internal::no);
+ if (rows->empty()) {
+ continue;
+ }
+ std::vector<sstring> col_names;
+ for (const auto& col : schema->all_columns()) {
+ col_names.push_back(col.name_as_cql_string());
+ }
+ auto col_names_str = boost::algorithm::join(col_names, ", ");
+ sstring val_binders_str = "?";
+ for (size_t i = 1; i < col_names.size(); ++i) {
+ val_binders_str += ", ?";
+ }
+ for (const auto& row : *rows) {
+ std::vector<data_value_or_unset> values;
+ for (const auto& col : schema->all_columns()) {
+ if (row.has(col.name_as_text())) {
+ values.push_back(
+ col.type->deserialize(row.get_blob(col.name_as_text())));
+ } else {
+ values.push_back(unset_value{});
+ }
+ }
+ auto muts = co_await qp.get_mutations_internal(
+ format("INSERT INTO {}.{} ({}) VALUES ({})",
+ db::system_auth_keyspace::NAME,
+ cf_name,
+ col_names_str,
+ val_binders_str),
+ internal_distributed_query_state(),
+ ts,
+ std::move(values));
+ if (muts.size() != 1) {
+ on_internal_error(log,
+ format("expecting single insert mutation, got {}", muts.size()));
+ }
+ co_yield std::move(muts[0]);
+ }
+ }
+ };
+ co_await announce_mutations_with_batching(g0,
+ start_operation_func,
+ std::move(gen),
+ &as);
+}
+
}
diff --git a/auth/service.hh b/auth/service.hh
--- a/auth/service.hh
+++ b/auth/service.hh
@@ -22,7 +22,9 @@
#include "auth/permission.hh"
#include "auth/permissions_cache.hh"
#include "auth/role_manager.hh"
+#include "auth/common.hh"
#include "seastarx.hh"
+#include "service/raft/raft_group0_client.hh"
#include "utils/observable.hh"
#include "utils/serialized_action.hh"
#include "db/config.hh"
@@ -115,6 +117,7 @@ public:
service(
utils::loading_cache_config,
cql3::query_processor&,
+ ::service::raft_group0_client&,
::service::migration_notifier&,
::service::migration_manager&,
const service_config&,
@@ -167,7 +170,7 @@ public:
return *_role_manager;
}
- const cql3::query_processor& query_processor() const noexcept {
+ cql3::query_processor& query_processor() const noexcept {
return _qp;
}
@@ -312,4 +315,7 @@ future<std::vector<permission_details>> list_filtered_permissions(
std::optional<std::string_view> role_name,
const std::optional<std::pair<resource, recursive_permissions>>& resource_filter);
+// Migrates data from old keyspace to new one which supports linearizable writes via raft.
+future<> migrate_to_auth_v2(cql3::query_processor& qp, ::service::raft_group0_client& g0, start_operation_func_t start_operation_func, abort_source& as);
+
}
diff --git a/auth/standard_role_manager.cc b/auth/standard_role_manager.cc
--- a/auth/standard_role_manager.cc
+++ b/auth/standard_role_manager.cc
@@ -37,26 +37,22 @@ namespace meta {
namespace role_members_table {
constexpr std::string_view name{"role_members" , 12};
-constexpr std::string_view qualified_name("system_auth.role_members");
}
namespace role_attributes_table {
constexpr std::string_view name{"role_attributes", 15};
-static std::string_view qualified_name() noexcept {
- static const sstring instance = format("{}.{}", AUTH_KS, name);
- return instance;
-}
static std::string_view creation_query() noexcept {
static const sstring instance = format(
- "CREATE TABLE {} ("
+ "CREATE TABLE {}.{} ("
" role text,"
" name text,"
" value text,"
" PRIMARY KEY(role, name)"
")",
- qualified_name());
+ meta::legacy::AUTH_KS,
+ name);
return instance;
}
@@ -69,6 +65,7 @@ static const class_registrator<
role_manager,
standard_role_manager,
cql3::query_processor&,
+ ::service::raft_group0_client&,
::service::migration_manager&> registration("org.apache.cassandra.auth.CassandraRoleManager");
struct record final {
@@ -87,31 +84,29 @@ static db::consistency_level consistency_for_role(std::string_view role_name) no
}
static future<std::optional<record>> find_record(cql3::query_processor& qp, std::string_view role_name) {
- static const sstring query = format("SELECT * FROM {} WHERE {} = ?",
- meta::roles_table::qualified_name,
+ const sstring query = format("SELECT * FROM {}.{} WHERE {} = ?",
+ get_auth_ks_name(qp),
+ meta::roles_table::name,
meta::roles_table::role_col_name);
- return qp.execute_internal(
+ const auto results = co_await qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_query_state(),
{sstring(role_name)},
- cql3::query_processor::cache_internal::yes).then([](::shared_ptr<cql3::untyped_result_set> results) {
- if (results->empty()) {
- return std::optional<record>();
- }
-
- const cql3::untyped_result_set_row& row = results->one();
+ cql3::query_processor::cache_internal::yes);
+ if (results->empty()) {
+ co_return std::optional<record>();
+ }
- return std::make_optional(
- record{
- row.get_as<sstring>(sstring(meta::roles_table::role_col_name)),
- row.get_or<bool>("is_superuser", false),
- row.get_or<bool>("can_login", false),
- (row.has("member_of")
- ? row.get_set<sstring>("member_of")
- : role_set())});
- });
+ const cql3::untyped_result_set_row& row = results->one();
+ co_return std::make_optional(record{
+ row.get_as<sstring>(sstring(meta::roles_table::role_col_name)),
+ row.get_or<bool>("is_superuser", false),
+ row.get_or<bool>("can_login", false),
+ (row.has("member_of")
+ ? row.get_set<sstring>("member_of")
+ : role_set())});
}
static future<record> require_record(cql3::query_processor& qp, std::string_view role_name) {
@@ -128,8 +123,9 @@ static bool has_can_login(const cql3::untyped_result_set_row& row) {
return row.has("can_login") && !(boolean_type->deserialize(row.get_blob("can_login")).is_null());
}
-standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::migration_manager& mm)
+standard_role_manager::standard_role_manager(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
: _qp(qp)
+ , _group0_client(g0)
, _migration_manager(mm)
, _stopped(make_ready_future<>())
, _superuser(password_authenticator::default_superuser(qp.db().get_config()))
@@ -141,20 +137,21 @@ std::string_view standard_role_manager::qualified_java_name() const noexcept {
const resource_set& standard_role_manager::protected_resources() const {
static const resource_set resources({
- make_data_resource(meta::AUTH_KS, meta::roles_table::name),
- make_data_resource(meta::AUTH_KS, meta::role_members_table::name)});
+ make_data_resource(meta::legacy::AUTH_KS, meta::roles_table::name),
+ make_data_resource(meta::legacy::AUTH_KS, meta::role_members_table::name)});
return resources;
}
future<> standard_role_manager::create_metadata_tables_if_missing() const {
static const sstring create_role_members_query = fmt::format(
- "CREATE TABLE {} ("
+ "CREATE TABLE {}.{} ("
" role text,"
" member text,"
" PRIMARY KEY (role, member)"
")",
- meta::role_members_table::qualified_name);
+ meta::legacy::AUTH_KS,
+ meta::role_members_table::name);
return when_all_succeed(
@@ -176,39 +173,41 @@ future<> standard_role_manager::create_metadata_tables_if_missing() const {
}
future<> standard_role_manager::create_default_role_if_missing() {
- return default_role_row_satisfies(_qp, &has_can_login, _superuser).then([this](bool exists) {
- if (!exists) {
- static const sstring query = format("INSERT INTO {} ({}, is_superuser, can_login) VALUES (?, true, true)",
- meta::roles_table::qualified_name,
- meta::roles_table::role_col_name);
-
- return _qp.execute_internal(
+ try {
+ const auto exists = co_await default_role_row_satisfies(_qp, &has_can_login, _superuser);
+ if (exists) {
+ co_return;
+ }
+ const sstring query = format("INSERT INTO {}.{} ({}, is_superuser, can_login) VALUES (?, true, true)",
+ get_auth_ks_name(_qp),
+ meta::roles_table::name,
+ meta::roles_table::role_col_name);
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_query_state(),
{_superuser},
- cql3::query_processor::cache_internal::no).then([this](auto&&) {
-
log.info("Created default superuser role '{}'.", _superuser);
- return make_ready_future<>();
- });
+ cql3::query_processor::cache_internal::no).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query, {_superuser}, &_as);
}
-
- return make_ready_future<>();
- }).handle_exception_type([](const exceptions::unavailable_exception& e) {
+
log.info("Created default superuser role '{}'.", _superuser);
+ } catch(const exceptions::unavailable_exception& e) {
log.warn("Skipped default role setup: some nodes were not ready; will retry");
- return make_exception_future<>(e);
- });
+ throw e;
+ }
}
static const sstring legacy_table_name{"users"};
bool standard_role_manager::legacy_metadata_exists() {
- return _qp.db().has_schema(meta::AUTH_KS, legacy_table_name);
+ return _qp.db().has_schema(meta::legacy::AUTH_KS, legacy_table_name);
}
future<> standard_role_manager::migrate_legacy_metadata() {
log.info("Starting migration of legacy user metadata.");
- static const sstring query = format("SELECT * FROM {}.{}", meta::AUTH_KS, legacy_table_name);
+ static const sstring query = format("SELECT * FROM {}.{}", meta::legacy::AUTH_KS, legacy_table_name);
return _qp.execute_internal(
query,
@@ -268,16 +267,20 @@ future<> standard_role_manager::stop() {
}
future<> standard_role_manager::create_or_replace(std::string_view role_name, const role_config& c) {
- static const sstring query = format("INSERT INTO {} ({}, is_superuser, can_login) VALUES (?, ?, ?)",
- meta::roles_table::qualified_name,
+ const sstring query = format("INSERT INTO {}.{} ({}, is_superuser, can_login) VALUES (?, ?, ?)",
+ get_auth_ks_name(_qp),
+ meta::roles_table::name,
meta::roles_table::role_col_name);
-
- return _qp.execute_internal(
- query,
- consistency_for_role(role_name),
- internal_distributed_query_state(),
- {sstring(role_name), c.is_superuser, c.can_login},
- cql3::query_processor::cache_internal::yes).discard_result();
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(
+ query,
+ consistency_for_role(role_name),
+ internal_distributed_query_state(),
+ {sstring(role_name), c.is_superuser, c.can_login},
+ cql3::query_processor::cache_internal::yes).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query, {sstring(role_name), c.is_superuser, c.can_login}, &_as);
+ }
}
future<>
@@ -311,138 +314,163 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat
if (!u.is_superuser && !u.can_login) {
return make_ready_future<>();
}
-
- return _qp.execute_internal(
- format("UPDATE {} SET {} WHERE {} = ?",
- meta::roles_table::qualified_name,
- build_column_assignments(u),
- meta::roles_table::role_col_name),
- consistency_for_role(role_name),
- internal_distributed_query_state(),
- {sstring(role_name)},
- cql3::query_processor::cache_internal::no).discard_result();
- });
-}
-
-future<> standard_role_manager::drop(std::string_view role_name) {
- return this->exists(role_name).then([this, role_name](bool role_exists) {
- if (!role_exists) {
- throw nonexistant_role(role_name);
- }
-
- // First, revoke this role from all roles that are members of it.
- const auto revoke_from_members = [this, role_name] {
- static const sstring query = format("SELECT member FROM {} WHERE role = ?",
- meta::role_members_table::qualified_name);
-
+ const sstring query = format("UPDATE {}.{} SET {} WHERE {} = ?",
+ get_auth_ks_name(_qp),
+ meta::roles_table::name,
+ build_column_assignments(u),
+ meta::roles_table::role_col_name);
+ if (legacy_mode(_qp)) {
return _qp.execute_internal(
- query,
+ std::move(query),
consistency_for_role(role_name),
internal_distributed_query_state(),
{sstring(role_name)},
- cql3::query_processor::cache_internal::no).then([this, role_name](::shared_ptr<cql3::untyped_result_set> members) {
- return parallel_for_each(
- members->begin(),
- members->end(),
- [this, role_name](const cql3::untyped_result_set_row& member_row) {
- const sstring member = member_row.template get_as<sstring>("member");
- return this->modify_membership(member, role_name, membership_change::remove);
- }).finally([members] {});
- });
- };
-
- // In parallel, revoke all roles that this role is members of.
- const auto revoke_members_of = [this, grantee = role_name] {
- return this->query_granted(
- grantee,
- recursive_role_query::no).then([this, grantee](role_set granted_roles) {
- return do_with(
- std::move(granted_roles),
- [this, grantee](const role_set& granted_roles) {
- return parallel_for_each(
- granted_roles.begin(),
- granted_roles.end(),
- [this, grantee](const sstring& role_name) {
- return this->modify_membership(grantee, role_name, membership_change::remove);
- });
- });
- });
- };
-
- // Delete all attributes for that role
- const auto remove_attributes_of = [this, role_name] {
- static const sstring query = format("DELETE FROM {} WHERE role = ?", meta::role_attributes_table::qualified_name());
- return _qp.execute_internal(query, {sstring(role_name)}, cql3::query_processor::cache_internal::yes).discard_result();
- };
+ cql3::query_processor::cache_internal::no).discard_result();
+ } else {
+ return announce_mutations(_qp, _group0_client, std::move(query), {sstring(role_name)}, &_as);
+ }
+ });
+}
- // Finally, delete the role itself.
- auto delete_role = [this, role_name] {
- static const sstring query = format("DELETE FROM {} WHERE {} = ?",
- meta::roles_table::qualified_name,
- meta::roles_table::role_col_name);
+future<> standard_role_manager::drop(std::string_view role_name) {
+ if (!co_await this->exists(role_name)) {
+ throw nonexistant_role(role_name);
+ }
+ // First, revoke this role from all roles that are members of it.
+ const auto revoke_from_members = [this, role_name] () -> future<> {
+ const sstring query = format("SELECT member FROM {}.{} WHERE role = ?",
+ get_auth_ks_name(_qp),
+ meta::role_members_table::name);
+ const auto members = co_await _qp.execute_internal(
+ query,
+ consistency_for_role(role_name),
+ internal_distributed_query_state(),
+ {sstring(role_name)},
+ cql3::query_processor::cache_internal::no);
+ co_await parallel_for_each(
+ members->begin(),
+ members->end(),
+ [this, role_name] (const cql3::untyped_result_set_row& member_row) -> future<> {
+ const sstring member = member_row.template get_as<sstring>("member");
+ co_await this->modify_membership(member, role_name, membership_change::remove);
+ }
+ );
+ };
+ // In parallel, revoke all roles that this role is members of.
+ const auto revoke_members_of = [this, grantee = role_name] () -> future<> {
+ const role_set granted_roles = co_await this->query_granted(
+ grantee,
+ recursive_role_query::no);
+ co_await parallel_for_each(
+ granted_roles.begin(),
+ granted_roles.end(),
+ [this, grantee](const sstring& role_name) {
+ return this->modify_membership(grantee, role_name, membership_change::remove);
+ });
+ };
+ // Delete all attributes for that role
+ const auto remove_attributes_of = [this, role_name] () -> future<> {
+ const sstring query = format("DELETE FROM {}.{} WHERE role = ?",
+ get_auth_ks_name(_qp),
+ meta::role_attributes_table::name);
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(query, {sstring(role_name)},
+ cql3::query_processor::cache_internal::yes).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query, {sstring(role_name)}, &_as);
+ }
+ };
+ // Finally, delete the role itself.
+ const auto delete_role = [this, role_name] () -> future<> {
+ const sstring query = format("DELETE FROM {}.{} WHERE {} = ?",
+ get_auth_ks_name(_qp),
+ meta::roles_table::name,
+ meta::roles_table::role_col_name);
- return _qp.execute_internal(
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(
query,
consistency_for_role(role_name),
internal_distributed_query_state(),
{sstring(role_name)},
cql3::query_processor::cache_internal::no).discard_result();
- };
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query, {sstring(role_name)}, &_as);
+ }
+ };
- return when_all_succeed(revoke_from_members(), revoke_members_of(),
- remove_attributes_of()).then_unpack([delete_role = std::move(delete_role)] {
- return delete_role();
- });
- });
+ co_await when_all_succeed(revoke_from_members, revoke_members_of, remove_attributes_of);
+ co_await delete_role();
}
future<>
standard_role_manager::modify_membership(
std::string_view grantee_name,
std::string_view role_name,
membership_change ch) {
+ // FIXME: in auth-v2 mode callers of this function should use a single guard
+ // to achieve consistent data across read and write, but the structure of calls
+ // is too complex to make such a refactor a quick fix.
-
- const auto modify_roles = [this, role_name, grantee_name, ch] {
+ const auto modify_roles = [this, role_name, grantee_name, ch] () -> future<> {
const auto query = format(
- "UPDATE {} SET member_of = member_of {} ? WHERE {} = ?",
- meta::roles_table::qualified_name,
+ "UPDATE {}.{} SET member_of = member_of {} ? WHERE {} = ?",
+ get_auth_ks_name(_qp),
+ meta::roles_table::name,
(ch == membership_change::add ? '+' : '-'),
meta::roles_table::role_col_name);
-
- return _qp.execute_internal(
- query,
- consistency_for_role(grantee_name),
- internal_distributed_query_state(),
- {role_set{sstring(role_name)}, sstring(grantee_name)},
- cql3::query_processor::cache_internal::no).discard_result();
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(
+ query,
+ consistency_for_role(grantee_name),
+ internal_distributed_query_state(),
+ {role_set{sstring(role_name)}, sstring(grantee_name)},
+ cql3::query_processor::cache_internal::no).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, std::move(query),
+ {role_set{sstring(role_name)}, sstring(grantee_name)}, &_as);
+ }
};
- const auto modify_role_members = [this, role_name, grantee_name, ch] {
+ const auto modify_role_members = [this, role_name, grantee_name, ch] () -> future<> {
switch (ch) {
- case membership_change::add:
- return _qp.execute_internal(
- format("INSERT INTO {} (role, member) VALUES (?, ?)",
- meta::role_members_table::qualified_name),
- consistency_for_role(role_name),
- internal_distributed_query_state(),
- {sstring(role_name), sstring(grantee_name)},
- cql3::query_processor::cache_internal::no).discard_result();
-
- case membership_change::remove:
- return _qp.execute_internal(
- format("DELETE FROM {} WHERE role = ? AND member = ?",
- meta::role_members_table::qualified_name),
- consistency_for_role(role_name),
- internal_distributed_query_state(),
- {sstring(role_name), sstring(grantee_name)},
- cql3::query_processor::cache_internal::no).discard_result();
- }
+ case membership_change::add: {
+ const sstring insert_query = format("INSERT INTO {}.{} (role, member) VALUES (?, ?)",
+ get_auth_ks_name(_qp),
+ meta::role_members_table::name);
+ if (legacy_mode(_qp)) {
+ co_return co_await _qp.execute_internal(
+ insert_query,
+ consistency_for_role(role_name),
+ internal_distributed_query_state(),
+ {sstring(role_name), sstring(grantee_name)},
+ cql3::query_processor::cache_internal::no).discard_result();
+ } else {
+ co_return co_await announce_mutations(_qp, _group0_client, insert_query,
+ {sstring(role_name), sstring(grantee_name)}, &_as);
+ }
+ }
- return make_ready_future<>();
+ case membership_change::remove: {
+ const sstring delete_query = format("DELETE FROM {}.{} WHERE role = ? AND member = ?",
+ get_auth_ks_name(_qp),
+ meta::role_members_table::name);
+ if (legacy_mode(_qp)) {
+ co_return co_await _qp.execute_internal(
+ delete_query,
+ consistency_for_role(role_name),
+ internal_distributed_query_state(),
+ {sstring(role_name), sstring(grantee_name)},
+ cql3::query_processor::cache_internal::no).discard_result();
+ } else {
+ co_return co_await announce_mutations(_qp, _group0_client, delete_query,
+ {sstring(role_name), sstring(grantee_name)}, &_as);
+ }
+ }
+ }
};
- return when_all_succeed(modify_roles(), modify_role_members).discard_result();
+ co_await when_all_succeed(modify_roles, modify_role_members).discard_result();
}
future<>
@@ -528,30 +556,29 @@ future<role_set> standard_role_manager::query_granted(std::string_view grantee_n
}
future<role_set> standard_role_manager::query_all() {
- static const sstring query = format("SELECT {} FROM {}",
+ const sstring query = format("SELECT {} FROM {}.{}",
meta::roles_table::role_col_name,
- meta::roles_table::qualified_name);
+ get_auth_ks_name(_qp),
+ meta::roles_table::name);
// To avoid many copies of a view.
static const auto role_col_name_string = sstring(meta::roles_table::role_col_name);
- return _qp.execute_internal(
+ const auto results = co_await _qp.execute_internal(
query,
db::consistency_level::QUORUM,
internal_distributed_query_state(),
- cql3::query_processor::cache_internal::yes).then([](::shared_ptr<cql3::untyped_result_set> results) {
- role_set roles;
-
- std::transform(
- results->begin(),
- results->end(),
- std::inserter(roles, roles.begin()),
- [](const cql3::untyped_result_set_row& row) {
- return row.get_as<sstring>(role_col_name_string);
- });
+ cql3::query_processor::cache_internal::yes);
- return roles;
- });
+ role_set roles;
+ std::transform(
+ results->begin(),
+ results->end(),
+ std::inserter(roles, roles.begin()),
+ [] (const cql3::untyped_result_set_row& row) {
+ return row.get_as<sstring>(role_col_name_string);}
+ );
+ co_return roles;
}
future<bool> standard_role_manager::exists(std::string_view role_name) {
@@ -573,14 +600,15 @@ future<bool> standard_role_manager::can_login(std::string_view role_name) {
}
future<std::optional<sstring>> standard_role_manager::get_attribute(std::string_view role_name, std::string_view attribute_name) {
- static const sstring query = format("SELECT name, value FROM {} WHERE role = ? AND name = ?", meta::role_attributes_table::qualified_name());
- return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes).then([] (shared_ptr<cql3::untyped_result_set> result_set) {
- if (!result_set->empty()) {
- const cql3::untyped_result_set_row &row = result_set->one();
- return std::optional<sstring>(row.get_as<sstring>("value"));
- }
- return std::optional<sstring>{};
- });
+ const sstring query = format("SELECT name, value FROM {}.{} WHERE role = ? AND name = ?",
+ get_auth_ks_name(_qp),
+ meta::role_attributes_table::name);
+ const auto result_set = co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes);
+ if (!result_set->empty()) {
+ const cql3::untyped_result_set_row &row = result_set->one();
+ co_return std::optional<sstring>(row.get_as<sstring>("value"));
+ }
+ co_return std::optional<sstring>{};
}
future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_all (std::string_view attribute_name) {
@@ -600,28 +628,32 @@ future<role_manager::attribute_vals> standard_role_manager::query_attribute_for_
}
future<> standard_role_manager::set_attribute(std::string_view role_name, std::string_view attribute_name, std::string_view attribute_value) {
- static const sstring query = format("INSERT INTO {} (role, name, value) VALUES (?, ?, ?)", meta::role_attributes_table::qualified_name());
- return do_with(sstring(role_name), sstring(attribute_name), sstring(attribute_value), [this] (sstring& role_name, sstring &attribute_name,
- sstring &attribute_value) {
- return exists(role_name).then([&role_name, &attribute_name, &attribute_value, this] (bool role_exists) {
- if (!role_exists) {
- throw auth::nonexistant_role(role_name);
- }
- return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}, cql3::query_processor::cache_internal::yes).discard_result();
- });
- });
-
+ if (!co_await exists(role_name)) {
+ throw auth::nonexistant_role(role_name);
+ }
+ const sstring query = format("INSERT INTO {}.{} (role, name, value) VALUES (?, ?, ?)",
+ get_auth_ks_name(_qp),
+ meta::role_attributes_table::name);
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}, cql3::query_processor::cache_internal::yes).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query,
+ {sstring(role_name), sstring(attribute_name), sstring(attribute_value)}, &_as);
+ }
}
future<> standard_role_manager::remove_attribute(std::string_view role_name, std::string_view attribute_name) {
- static const sstring query = format("DELETE FROM {} WHERE role = ? AND name = ?", meta::role_attributes_table::qualified_name());
- return do_with(sstring(role_name), sstring(attribute_name), [this] (sstring& role_name, sstring &attribute_name) {
- return exists(role_name).then([&role_name, &attribute_name, this] (bool role_exists) {
- if (!role_exists) {
- throw auth::nonexistant_role(role_name);
- }
- return _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes).discard_result();
- });
- });
+ if (!co_await exists(role_name)) {
+ throw auth::nonexistant_role(role_name);
+ }
+ const sstring query = format("DELETE FROM {}.{} WHERE role = ? AND name = ?",
+ get_auth_ks_name(_qp),
+ meta::role_attributes_table::name);
+ if (legacy_mode(_qp)) {
+ co_await _qp.execute_internal(query, {sstring(role_name), sstring(attribute_name)}, cql3::query_processor::cache_internal::yes).discard_result();
+ } else {
+ co_await announce_mutations(_qp, _group0_client, query,
+ {sstring(role_name), sstring(attribute_name)}, &_as);
+ }
}
}
diff --git a/auth/standard_role_manager.hh b/auth/standard_role_manager.hh
--- a/auth/standard_role_manager.hh
+++ b/auth/standard_role_manager.hh
@@ -17,6 +17,7 @@
#include <seastar/core/sstring.hh>
#include "seastarx.hh"
+#include "service/raft/raft_group0_client.hh"
namespace cql3 {
class query_processor;
@@ -30,13 +31,14 @@ namespace auth {
class standard_role_manager final : public role_manager {
cql3::query_processor& _qp;
+ ::service::raft_group0_client& _group0_client;
::service::migration_manager& _migration_manager;
future<> _stopped;
- seastar::abort_source _as;
+ abort_source _as;
std::string _superuser;
public:
- standard_role_manager(cql3::query_processor&, ::service::migration_manager&);
+ standard_role_manager(cql3::query_processor&, ::service::raft_group0_client&, ::service::migration_manager&);
virtual std::string_view qualified_java_name() const noexcept override;
diff --git a/auth/transitional.cc b/auth/transitional.cc
--- a/auth/transitional.cc
+++ b/auth/transitional.cc
@@ -14,6 +14,7 @@
#include "auth/default_authorizer.hh"
#include "auth/password_authenticator.hh"
#include "auth/permission.hh"
+#include "service/raft/raft_group0_client.hh"
#include "utils/class_registrator.hh"
namespace auth {
@@ -36,8 +37,8 @@ class transitional_authenticator : public authenticator {
public:
static const sstring PASSWORD_AUTHENTICATOR_NAME;
- transitional_authenticator(cql3::query_processor& qp, ::service::migration_manager& mm)
- : transitional_authenticator(std::make_unique<password_authenticator>(qp, mm)) {
+ transitional_authenticator(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
+ : transitional_authenticator(std::make_unique<password_authenticator>(qp, g0, mm)) {
}
transitional_authenticator(std::unique_ptr<authenticator> a)
: _authenticator(std::move(a)) {
@@ -152,8 +153,8 @@ class transitional_authorizer : public authorizer {
std::unique_ptr<authorizer> _authorizer;
public:
- transitional_authorizer(cql3::query_processor& qp, ::service::migration_manager& mm)
- : transitional_authorizer(std::make_unique<default_authorizer>(qp, mm)) {
+ transitional_authorizer(cql3::query_processor& qp, ::service::raft_group0_client& g0, ::service::migration_manager& mm)
+ : transitional_authorizer(std::make_unique<default_authorizer>(qp, g0, mm)) {
}
transitional_authorizer(std::unique_ptr<authorizer> a)
: _authorizer(std::move(a)) {
@@ -221,10 +222,12 @@ static const class_registrator<
auth::authenticator,
auth::transitional_authenticator,
cql3::query_processor&,
+ ::service::raft_group0_client&,
::service::migration_manager&> transitional_authenticator_reg(auth::PACKAGE_NAME + "TransitionalAuthenticator");
static const class_registrator<
auth::authorizer,
auth::transitional_authorizer,
cql3::query_processor&,
+ ::service::raft_group0_client&,
::service::migration_manager&> transitional_authorizer_reg(auth::PACKAGE_NAME + "TransitionalAuthorizer");
diff --git a/configure.py b/configure.py
--- a/configure.py
+++ b/configure.py
@@ -1011,6 +1011,7 @@ def find_ninja():
'cql3/result_set.cc',
'cql3/prepare_context.cc',
'db/consistency_level.cc',
+ 'db/system_auth_keyspace.cc',
'db/system_keyspace.cc',
'db/virtual_table.cc',
'db/virtual_tables.cc',
diff --git a/cql3/cql_statement.hh b/cql3/cql_statement.hh
--- a/cql3/cql_statement.hh
+++ b/cql3/cql_statement.hh
@@ -49,8 +49,10 @@ public:
// CQL statement text
seastar::sstring raw_cql_statement;
- // True for statements that needs guard to be taken before the execution
- bool needs_guard = false;
+ // Returns true for statements that needs guard to be taken before the execution
+ virtual bool needs_guard(query_processor& qp) const {
+ return false;
+ }
explicit cql_statement(timeout_config_selector timeout_selector) : _timeout_config_selector(timeout_selector) {}
diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc
--- a/cql3/query_processor.cc
+++ b/cql3/query_processor.cc
@@ -547,7 +547,7 @@ template<typename... Args>
future<::shared_ptr<result_message>>
query_processor::execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options,
future<::shared_ptr<result_message>>(query_processor::*fn)(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>, Args...), Args... args) {
- if (!statement->needs_guard) {
+ if (!statement->needs_guard(*this)) {
return (this->*fn)(query_state, std::move(statement), options, std::nullopt, std::forward<Args>(args)...);
}
static auto exec = [fn] (query_processor& qp, Args... args, service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options, std::optional<service::group0_guard> guard) {
@@ -761,7 +761,7 @@ std::pair<std::reference_wrapper<struct query_processor::remote>, gate::holder>
query_options query_processor::make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p,
- const data_value_list& values,
+ const std::vector<data_value_or_unset>& values,
db::consistency_level cl,
int32_t page_size) const {
if (p->bound_names.size() != values.size()) {
@@ -918,6 +918,28 @@ query_processor::execute_internal(
}
}
+future<std::vector<mutation>> query_processor::get_mutations_internal(
+ const sstring query_string,
+ service::query_state& query_state,
+ api::timestamp_type timestamp,
+ std::vector<data_value_or_unset> values) {
+ log.trace("get_mutations_internal: \"{}\" ({})", query_string, fmt::join(values, ", "));
+ auto stmt = prepare_internal(query_string);
+ auto mod_stmt = dynamic_pointer_cast<cql3::statements::modification_statement>(stmt->statement);
+ if (!mod_stmt) {
+ on_internal_error(log, "Only modification statement is supported in get_mutations_internal");
+ }
+ auto opts = make_internal_options(stmt, values, db::consistency_level::LOCAL_ONE);
+ auto json_cache = mod_stmt->maybe_prepare_json_cache(opts);
+ auto keys = mod_stmt->build_partition_keys(opts, json_cache);
+ // timeout only applies when modification requires read
+ auto timeout = db::timeout_clock::now() + query_state.get_client_state().get_timeout_config().read_timeout;
+ if (mod_stmt->requires_read()) {
+ on_internal_error(log, "Read-modified-write queries forbidden in get_mutations_internal");
+ }
+ co_return co_await mod_stmt->get_mutations(*this, opts, timeout, true, timestamp, query_state, json_cache, std::move(keys));
+}
+
future<::shared_ptr<untyped_result_set>>
query_processor::execute_with_params(
statements::prepared_statement::checked_weak_ptr p,
diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh
--- a/cql3/query_processor.hh
+++ b/cql3/query_processor.hh
@@ -23,13 +23,15 @@
#include "cql3/cql_statement.hh"
#include "exceptions/exceptions.hh"
#include "service/migration_listener.hh"
+#include "timestamp.hh"
#include "transport/messages/result_message.hh"
#include "service/client_state.hh"
#include "service/broadcast_tables/experimental/query_result.hh"
#include "utils/observable.hh"
#include "lang/wasm.hh"
#include "service/raft/raft_group0_client.hh"
#include "types/types.hh"
+#include "db/system_auth_keyspace.hh"
namespace service {
@@ -174,6 +176,8 @@ public:
wasm::manager& wasm() { return _wasm; }
+ db::system_auth_keyspace::version_t auth_version = db::system_auth_keyspace::version_t::v1;
+
statements::prepared_statement::checked_weak_ptr get_prepared(const std::optional<auth::authenticated_user>& user, const prepared_cache_key_type& key) {
if (user) {
auto vp = _authorized_prepared_cache.find(*user, key);
@@ -371,6 +375,18 @@ public:
execute_internal(const sstring& query_string, cache_internal cache) {
return execute_internal(query_string, db::consistency_level::ONE, {}, cache);
}
+
+ // Obtains mutations from query. For internal usage, most notable
+ // use-case is generating data for group0 announce(). Note that this
+ // function enables putting multiple CQL queries into a single raft command
+ // and vice versa, split mutations from one query into separate commands.
+ // It supports write-only queries, read-modified-writes not supported.
+ future<std::vector<mutation>> get_mutations_internal(
+ const sstring query_string,
+ service::query_state& query_state,
+ api::timestamp_type timestamp,
+ std::vector<data_value_or_unset> values);
+
future<::shared_ptr<untyped_result_set>> execute_with_params(
statements::prepared_statement::checked_weak_ptr p,
db::consistency_level,
@@ -451,7 +467,7 @@ private:
query_options make_internal_options(
const statements::prepared_statement::checked_weak_ptr& p,
- const data_value_list& values,
+ const std::vector<data_value_or_unset>& values,
db::consistency_level,
int32_t page_size = -1) const;
diff --git a/cql3/statements/alter_role_statement.hh b/cql3/statements/alter_role_statement.hh
--- a/cql3/statements/alter_role_statement.hh
+++ b/cql3/statements/alter_role_statement.hh
@@ -22,7 +22,7 @@ class query_processor;
namespace statements {
-class alter_role_statement final : public authentication_statement {
+class alter_role_statement final : public authentication_altering_statement {
sstring _role;
role_options _options;
diff --git a/cql3/statements/attach_service_level_statement.cc b/cql3/statements/attach_service_level_statement.cc
--- a/cql3/statements/attach_service_level_statement.cc
+++ b/cql3/statements/attach_service_level_statement.cc
@@ -13,6 +13,8 @@
#include "transport/messages/result_message.hh"
#include "service/client_state.hh"
#include "service/query_state.hh"
+#include "cql3/query_processor.hh"
+#include "auth/common.hh"
namespace cql3 {
@@ -22,6 +24,10 @@ attach_service_level_statement::attach_service_level_statement(sstring service_l
_service_level(service_level), _role_name(role_name) {
}
+bool attach_service_level_statement::needs_guard(query_processor& qp) const {
+ return !auth::legacy_mode(qp);
+}
+
std::unique_ptr<cql3::statements::prepared_statement>
cql3::statements::attach_service_level_statement::prepare(
data_dictionary::database db, cql_stats &stats) {
diff --git a/cql3/statements/attach_service_level_statement.hh b/cql3/statements/attach_service_level_statement.hh
--- a/cql3/statements/attach_service_level_statement.hh
+++ b/cql3/statements/attach_service_level_statement.hh
@@ -21,6 +21,7 @@ class attach_service_level_statement final : public service_level_statement {
public:
attach_service_level_statement(sstring service_level, sstring role_name);
+ virtual bool needs_guard(query_processor& qp) const override;
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>>
diff --git a/cql3/statements/authentication_statement.cc b/cql3/statements/authentication_statement.cc
--- a/cql3/statements/authentication_statement.cc
+++ b/cql3/statements/authentication_statement.cc
@@ -10,6 +10,8 @@
#include "authentication_statement.hh"
#include "transport/messages/result_message.hh"
+#include "cql3/query_processor.hh"
+#include "auth/common.hh"
uint32_t cql3::statements::authentication_statement::get_bound_terms() const {
return 0;
@@ -22,3 +24,7 @@ bool cql3::statements::authentication_statement::depends_on(std::string_view ks_
future<> cql3::statements::authentication_statement::check_access(query_processor& qp, const service::client_state& state) const {
return make_ready_future<>();
}
+
+bool cql3::statements::authentication_altering_statement::needs_guard(query_processor& qp) const {
+ return !auth::legacy_mode(qp);
+}
diff --git a/cql3/statements/authentication_statement.hh b/cql3/statements/authentication_statement.hh
--- a/cql3/statements/authentication_statement.hh
+++ b/cql3/statements/authentication_statement.hh
@@ -28,6 +28,11 @@ public:
future<> check_access(query_processor& qp, const service::client_state& state) const override;
};
+class authentication_altering_statement : public authentication_statement {
+public:
+ virtual bool needs_guard(query_processor& qp) const override;
+};
+
}
}
diff --git a/cql3/statements/authorization_statement.cc b/cql3/statements/authorization_statement.cc
--- a/cql3/statements/authorization_statement.cc
+++ b/cql3/statements/authorization_statement.cc
@@ -17,6 +17,7 @@
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
#include "db/cql_type_parser.hh"
+#include "auth/common.hh"
uint32_t cql3::statements::authorization_statement::get_bound_terms() const {
return 0;
@@ -75,3 +76,7 @@ void cql3::statements::authorization_statement::maybe_correct_resource(auth::res
}
}
+bool cql3::statements::authorization_altering_statement::needs_guard(
+ query_processor& qp) const {
+ return !auth::legacy_mode(qp);
+};
diff --git a/cql3/statements/authorization_statement.hh b/cql3/statements/authorization_statement.hh
--- a/cql3/statements/authorization_statement.hh
+++ b/cql3/statements/authorization_statement.hh
@@ -35,6 +35,11 @@ protected:
static void maybe_correct_resource(auth::resource&, const service::client_state&, query_processor&);
};
+class authorization_altering_statement : public authorization_statement {
+public:
+ virtual bool needs_guard(query_processor& qp) const override;
+};
+
}
}
diff --git a/cql3/statements/create_role_statement.hh b/cql3/statements/create_role_statement.hh
--- a/cql3/statements/create_role_statement.hh
+++ b/cql3/statements/create_role_statement.hh
@@ -22,7 +22,7 @@ class query_processor;
namespace statements {
-class create_role_statement final : public authentication_statement {
+class create_role_statement final : public authentication_altering_statement {
sstring _role;
role_options _options;
diff --git a/cql3/statements/detach_service_level_statement.cc b/cql3/statements/detach_service_level_statement.cc
--- a/cql3/statements/detach_service_level_statement.cc
+++ b/cql3/statements/detach_service_level_statement.cc
@@ -11,6 +11,8 @@
#include "transport/messages/result_message.hh"
#include "service/client_state.hh"
#include "service/query_state.hh"
+#include "cql3/query_processor.hh"
+#include "auth/common.hh"
namespace cql3 {
@@ -20,6 +22,10 @@ detach_service_level_statement::detach_service_level_statement(sstring role_name
_role_name(role_name) {
}
+bool detach_service_level_statement::needs_guard(query_processor& qp) const {
+ return !auth::legacy_mode(qp);
+}
+
std::unique_ptr<cql3::statements::prepared_statement>
cql3::statements::detach_service_level_statement::prepare(
data_dictionary::database db, cql_stats &stats) {
diff --git a/cql3/statements/detach_service_level_statement.hh b/cql3/statements/detach_service_level_statement.hh
--- a/cql3/statements/detach_service_level_statement.hh
+++ b/cql3/statements/detach_service_level_statement.hh
@@ -20,6 +20,7 @@ class detach_service_level_statement final : public service_level_statement {
public:
detach_service_level_statement(sstring role_name);
std::unique_ptr<cql3::statements::prepared_statement> prepare(data_dictionary::database db, cql_stats &stats) override;
+ virtual bool needs_guard(query_processor& qp) const override;
virtual future<> check_access(query_processor& qp, const service::client_state&) const override;
virtual future<::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor&, service::query_state&, const query_options&, std::optional<service::group0_guard> guard) const override;
diff --git a/cql3/statements/drop_role_statement.hh b/cql3/statements/drop_role_statement.hh
--- a/cql3/statements/drop_role_statement.hh
+++ b/cql3/statements/drop_role_statement.hh
@@ -22,7 +22,7 @@ class query_processor;
namespace statements {
-class drop_role_statement final : public authentication_statement {
+class drop_role_statement final : public authentication_altering_statement {
sstring _role;
bool _if_exists;
diff --git a/cql3/statements/grant_role_statement.hh b/cql3/statements/grant_role_statement.hh
--- a/cql3/statements/grant_role_statement.hh
+++ b/cql3/statements/grant_role_statement.hh
@@ -22,7 +22,7 @@ class query_processor;
namespace statements {
-class grant_role_statement final : public authorization_statement {
+class grant_role_statement final : public authorization_altering_statement {
sstring _role;
sstring _grantee;
diff --git a/cql3/statements/list_permissions_statement.cc b/cql3/statements/list_permissions_statement.cc
--- a/cql3/statements/list_permissions_statement.cc
+++ b/cql3/statements/list_permissions_statement.cc
@@ -80,15 +80,15 @@ cql3::statements::list_permissions_statement::execute(
service::query_state& state,
const query_options& options,
std::optional<service::group0_guard> guard) const {
- static auto make_column = [](sstring name) {
+ auto make_column = [auth_ks = auth::get_auth_ks_name(qp)](sstring name) {
return make_lw_shared<column_specification>(
- auth::meta::AUTH_KS,
+ auth_ks,
"permissions",
::make_shared<column_identifier>(std::move(name), true),
utf8_type);
};
- static thread_local const std::vector<lw_shared_ptr<column_specification>> metadata({
+ std::vector<lw_shared_ptr<column_specification>> metadata({
make_column("role"), make_column("username"), make_column("resource"), make_column("permission")
});
@@ -105,15 +105,15 @@ cql3::statements::list_permissions_statement::execute(
const auto& as = *state.get_client_state().get_auth_service();
- return do_with(make_resource_filter(), [this, &as](const auto& resource_filter) {
+ return do_with(make_resource_filter(), [this, &as, metadata = std::move(metadata)](const auto& resource_filter) mutable {
return auth::list_filtered_permissions(
as,
_permissions,
_role_name,
- resource_filter).then([](std::vector<auth::permission_details> all_details) {
+ resource_filter).then([metadata = std::move(metadata)](std::vector<auth::permission_details> all_details) mutable {
std::sort(all_details.begin(), all_details.end());
- auto rs = std::make_unique<result_set>(metadata);
+ auto rs = std::make_unique<result_set>(std::move(metadata));
for (const auto& pd : all_details) {
const std::vector<sstring> sorted_permission_names = [&pd] {
diff --git a/cql3/statements/list_users_statement.cc b/cql3/statements/list_users_statement.cc
--- a/cql3/statements/list_users_statement.cc
+++ b/cql3/statements/list_users_statement.cc
@@ -29,23 +29,23 @@ future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::list_users_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
static const sstring virtual_table_name("users");
- static const auto make_column_spec = [](const sstring& name, const ::shared_ptr<const abstract_type>& ty) {
+ const auto make_column_spec = [auth_ks = auth::get_auth_ks_name(qp)](const sstring& name, const ::shared_ptr<const abstract_type>& ty) {
return make_lw_shared<column_specification>(
- auth::meta::AUTH_KS,
+ auth_ks,
virtual_table_name,
::make_shared<column_identifier>(name, true),
ty);
};
- static thread_local const auto metadata = ::make_shared<cql3::metadata>(
+ auto metadata = ::make_shared<cql3::metadata>(
std::vector<lw_shared_ptr<column_specification>>{
make_column_spec("name", utf8_type),
make_column_spec("super", boolean_type)});
- static const auto make_results = [](const auth::service& as, std::unordered_set<sstring>&& roles) {
+ auto make_results = [metadata = std::move(metadata)](const auth::service& as, std::unordered_set<sstring>&& roles) mutable {
using cql_transport::messages::result_message;
- auto results = std::make_unique<result_set>(metadata);
+ auto results = std::make_unique<result_set>(std::move(metadata));
std::vector<sstring> sorted_roles(roles.cbegin(), roles.cend());
std::sort(sorted_roles.begin(), sorted_roles.end());
@@ -73,14 +73,14 @@ cql3::statements::list_users_statement::execute(query_processor& qp, service::qu
const auto& cs = state.get_client_state();
const auto& as = *cs.get_auth_service();
- return auth::has_superuser(as, *cs.user()).then([&cs, &as](bool has_superuser) {
+ return auth::has_superuser(as, *cs.user()).then([&cs, &as, make_results = std::move(make_results)](bool has_superuser) mutable {
if (has_superuser) {
- return as.underlying_role_manager().query_all().then([&as](std::unordered_set<sstring> roles) {
+ return as.underlying_role_manager().query_all().then([&as, make_results = std::move(make_results)](std::unordered_set<sstring> roles) mutable {
return make_results(as, std::move(roles));
});
}
- return auth::get_roles(as, *cs.user()).then([&as](std::unordered_set<sstring> granted_roles) {
+ return auth::get_roles(as, *cs.user()).then([&as, make_results = std::move(make_results)](std::unordered_set<sstring> granted_roles) mutable {
return make_results(as, std::move(granted_roles));
});
});
diff --git a/cql3/statements/permission_altering_statement.hh b/cql3/statements/permission_altering_statement.hh
--- a/cql3/statements/permission_altering_statement.hh
+++ b/cql3/statements/permission_altering_statement.hh
@@ -20,7 +20,7 @@ class role_name;
namespace statements {
-class permission_altering_statement : public authorization_statement {
+class permission_altering_statement : public authorization_altering_statement {
protected:
auth::permission_set _permissions;
mutable auth::resource _resource;
diff --git a/cql3/statements/revoke_role_statement.hh b/cql3/statements/revoke_role_statement.hh
--- a/cql3/statements/revoke_role_statement.hh
+++ b/cql3/statements/revoke_role_statement.hh
@@ -22,7 +22,7 @@ class query_processor;
namespace statements {
-class revoke_role_statement final : public authorization_statement {
+class revoke_role_statement final : public authorization_altering_statement {
sstring _role;
sstring _revokee;
diff --git a/cql3/statements/role-management-statements.cc b/cql3/statements/role-management-statements.cc
--- a/cql3/statements/role-management-statements.cc
+++ b/cql3/statements/role-management-statements.cc
@@ -285,31 +285,31 @@ future<> list_roles_statement::check_access(query_processor& qp, const service::
}
future<result_message_ptr>
-list_roles_statement::execute(query_processor&, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
+list_roles_statement::execute(query_processor& qp, service::query_state& state, const query_options&, std::optional<service::group0_guard> guard) const {
static const sstring virtual_table_name("roles");
- static const auto make_column_spec = [](const sstring& name, const ::shared_ptr<const abstract_type>& ty) {
+ const auto make_column_spec = [auth_ks = auth::get_auth_ks_name(qp)](const sstring& name, const ::shared_ptr<const abstract_type>& ty) {
return make_lw_shared<column_specification>(
- auth::meta::AUTH_KS,
+ auth_ks,
virtual_table_name,
::make_shared<column_identifier>(name, true),
ty);
};
static const thread_local auto custom_options_type = map_type_impl::get_instance(utf8_type, utf8_type, true);
- static const thread_local auto metadata = ::make_shared<cql3::metadata>(
+ auto metadata = ::make_shared<cql3::metadata>(
std::vector<lw_shared_ptr<column_specification>>{
make_column_spec("role", utf8_type),
make_column_spec("super", boolean_type),
make_column_spec("login", boolean_type),
make_column_spec("options", custom_options_type)});
- static const auto make_results = [](
+ auto make_results = [metadata = std::move(metadata)](
auth::role_manager& rm,
const auth::authenticator& a,
- auth::role_set&& roles) -> future<result_message_ptr> {
- auto results = std::make_unique<result_set>(metadata);
+ auth::role_set&& roles) mutable -> future<result_message_ptr> {
+ auto results = std::make_unique<result_set>(std::move(metadata));
if (roles.empty()) {
return make_ready_future<result_message_ptr>(
@@ -352,7 +352,7 @@ list_roles_statement::execute(query_processor&, service::query_state& state, con
const auto& cs = state.get_client_state();
const auto& as = *cs.get_auth_service();
- return auth::has_superuser(as, *cs.user()).then([this, &cs, &as](bool super) {
+ return auth::has_superuser(as, *cs.user()).then([this, &cs, &as, make_results = std::move(make_results)](bool super) mutable {
auto& rm = as.underlying_role_manager();
const auto& a = as.underlying_authenticator();
const auto query_mode = _recursive ? auth::recursive_role_query::yes : auth::recursive_role_query::no;
@@ -362,20 +362,20 @@ list_roles_statement::execute(query_processor&, service::query_state& state, con
// only the roles granted to them.
return cs.check_has_permission({
auth::permission::DESCRIBE,
- auth::root_role_resource()}).then([&cs, &rm, &a, query_mode](bool has_describe) {
+ auth::root_role_resource()}).then([&cs, &rm, &a, query_mode, make_results = std::move(make_results)](bool has_describe) mutable {
if (has_describe) {
- return rm.query_all().then([&rm, &a](auto&& roles) {
+ return rm.query_all().then([&rm, &a, make_results = std::move(make_results)](auto&& roles) mutable {
return make_results(rm, a, std::move(roles));
});
}
- return rm.query_granted(*cs.user()->name, query_mode).then([&rm, &a](auth::role_set roles) {
+ return rm.query_granted(*cs.user()->name, query_mode).then([&rm, &a, make_results = std::move(make_results)](auth::role_set roles) mutable {
return make_results(rm, a, std::move(roles));
});
});
}
- return rm.query_granted(*_grantee, query_mode).then([&rm, &a](auth::role_set roles) {
+ return rm.query_granted(*_grantee, query_mode).then([&rm, &a, make_results = std::move(make_results)](auth::role_set roles) mutable {
return make_results(rm, a, std::move(roles));
});
}).handle_exception_type([](const auth::nonexistant_role& e) {
diff --git a/cql3/statements/schema_altering_statement.cc b/cql3/statements/schema_altering_statement.cc
--- a/cql3/statements/schema_altering_statement.cc
+++ b/cql3/statements/schema_altering_statement.cc
@@ -21,17 +21,17 @@ namespace statements {
schema_altering_statement::schema_altering_statement(timeout_config_selector timeout_selector)
: cf_statement(cf_name())
, cql_statement_no_metadata(timeout_selector)
- , _is_column_family_level{false}
-{
- needs_guard = true;
+ , _is_column_family_level{false} {
}
schema_altering_statement::schema_altering_statement(cf_name name, timeout_config_selector timeout_selector)
: cf_statement{std::move(name)}
, cql_statement_no_metadata(timeout_selector)
- , _is_column_family_level{true}
-{
- needs_guard = true;
+ , _is_column_family_level{true} {
+}
+
+bool schema_altering_statement::needs_guard(query_processor& qp) const {
+ return true;
}
future<> schema_altering_statement::grant_permissions_to_creator(const service::client_state&) const {
diff --git a/cql3/statements/schema_altering_statement.hh b/cql3/statements/schema_altering_statement.hh
--- a/cql3/statements/schema_altering_statement.hh
+++ b/cql3/statements/schema_altering_statement.hh
@@ -42,6 +42,8 @@ protected:
schema_altering_statement(cf_name name, timeout_config_selector timeout_selector = &timeout_config::other_timeout);
+ virtual bool needs_guard(query_processor& qp) const override;
+
/**
* When a new data_dictionary::database object (keyspace, table) is created, the creator needs to be granted all applicable
* permissions on it.
diff --git a/db/CMakeLists.txt b/db/CMakeLists.txt
--- a/db/CMakeLists.txt
+++ b/db/CMakeLists.txt
@@ -2,6 +2,7 @@ add_library(db STATIC)
target_sources(db
PRIVATE
consistency_level.cc
+ system_auth_keyspace.cc
system_keyspace.cc
virtual_table.cc
virtual_tables.cc
diff --git a/db/schema_tables.cc b/db/schema_tables.cc
--- a/db/schema_tables.cc
+++ b/db/schema_tables.cc
@@ -14,6 +14,7 @@
#include "gms/feature_service.hh"
#include "partition_slice_builder.hh"
#include "dht/i_partitioner.hh"
+#include "system_auth_keyspace.hh"
#include "system_keyspace.hh"
#include "query-result-set.hh"
#include "query-result-writer.hh"
@@ -229,6 +230,10 @@ future<> save_system_schema(cql3::query_processor& qp) {
co_await save_system_schema_to_keyspace(qp, schema_tables::NAME);
// #2514 - make sure "system" is written to system_schema.keyspaces.
co_await save_system_schema_to_keyspace(qp, system_keyspace::NAME);
+ if (qp.db().get_config().check_experimental(
+ db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES)) {
+ co_await save_system_schema_to_keyspace(qp, system_auth_keyspace::NAME);
+ }
}
namespace v3 {
diff --git a/db/system_auth_keyspace.cc b/db/system_auth_keyspace.cc
--- a/db/system_auth_keyspace.cc
+++ b/db/system_auth_keyspace.cc
@@ -0,0 +1,141 @@
+/*
+ * Modified by ScyllaDB
+ * Copyright (C) 2024-present ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
+ */
+
+#include "system_auth_keyspace.hh"
+#include "system_keyspace.hh"
+#include "db/schema_tables.hh"
+#include "schema/schema_builder.hh"
+#include "types/set.hh"
+
+namespace db {
+
+// all system auth tables use schema commitlog
+namespace {
+ const auto set_use_schema_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
+ if (ks_name == system_auth_keyspace::NAME) {
+ props.enable_schema_commitlog();
+ }
+ });
+} // anonymous namespace
+
+namespace system_auth_keyspace {
+
+// use the same gc setting as system_schema tables
+using days = std::chrono::duration<int, std::ratio<24 * 3600>>;
+// FIXME: in some cases time-based gc may cause data resurrection,
+// for more info see
https://github.com/scylladb/scylladb/issues/15607
+static constexpr auto auth_gc_grace = std::chrono::duration_cast<std::chrono::seconds>(days(7)).count();
+
+schema_ptr roles() {
+ static thread_local auto schema = [] {
+ schema_builder builder(generate_legacy_id(NAME, ROLES), NAME, ROLES,
+ // partition key
+ {{"role", utf8_type}},
+ // clustering key
+ {},
+ // regular columns
+ {
+ {"can_login", boolean_type},
+ {"is_superuser", boolean_type},
+ {"member_of", set_type_impl::get_instance(utf8_type, true)},
+ {"salted_hash", utf8_type}
+ },
+ // static columns
+ {},
+ // regular column name type
+ utf8_type,
+ // comment
+ "roles for authentication and RBAC"
+ );
+ builder.set_gc_grace_seconds(auth_gc_grace);
+ builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
+ return builder.build();
+ }();
+ return schema;
+}
+
+schema_ptr role_members() {
+ static thread_local auto schema = [] {
+ schema_builder builder(generate_legacy_id(NAME, ROLE_MEMBERS), NAME, ROLE_MEMBERS,
+ // partition key
+ {{"role", utf8_type}},
+ // clustering key
+ {{"member", utf8_type}},
+ // regular columns
+ {},
+ // static columns
+ {},
+ // regular column name type
+ utf8_type,
+ // comment
+ "joins users and their granted roles in RBAC"
+ );
+ builder.set_gc_grace_seconds(auth_gc_grace);
+ builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
+ return builder.build();
+ }();
+ return schema;
+}
+
+schema_ptr role_attributes() {
+ static thread_local auto schema = [] {
+ schema_builder builder(generate_legacy_id(NAME, ROLE_ATTRIBUTES), NAME, ROLE_ATTRIBUTES,
+ // partition key
+ {{"role", utf8_type}},
+ // clustering key
+ {{"name", utf8_type}},
+ // regular columns
+ {
+ {"value", utf8_type}
+ },
+ // static columns
+ {},
+ // regular column name type
+ utf8_type,
+ // comment
+ "role permissions in RBAC"
+ );
+ builder.set_gc_grace_seconds(auth_gc_grace);
+ builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
+ return builder.build();
+ }();
+ return schema;
+}
+
+schema_ptr role_permissions() {
+ static thread_local auto schema = [] {
+ schema_builder builder(generate_legacy_id(NAME, ROLE_PERMISSIONS), NAME, ROLE_PERMISSIONS,
+ // partition key
+ {{"role", utf8_type}},
+ // clustering key
+ {{"resource", utf8_type}},
+ // regular columns
+ {
+ {"permissions", set_type_impl::get_instance(utf8_type, true)}
+ },
+ // static columns
+ {},
+ // regular column name type
+ utf8_type,
+ // comment
+ "role permissions for CassandraAuthorizer"
+ );
+ builder.set_gc_grace_seconds(auth_gc_grace);
+ builder.with_version(system_keyspace::generate_schema_version(builder.uuid()));
+ return builder.build();
+ }();
+ return schema;
+}
+
+std::vector<schema_ptr> all_tables() {
+ return {roles(), role_members(), role_attributes(), role_permissions()};
+}
+
+} // namespace system_auth_keyspace
+} // namespace db
diff --git a/db/system_auth_keyspace.hh b/db/system_auth_keyspace.hh
--- a/db/system_auth_keyspace.hh
+++ b/db/system_auth_keyspace.hh
@@ -0,0 +1,38 @@
+/*
+ * Modified by ScyllaDB
+ * Copyright (C) 2024-present ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
+ */
+
+#pragma once
+
+#include "schema/schema_fwd.hh"
+#include <vector>
+
+namespace db {
+
+namespace system_auth_keyspace {
+ enum class version_t: int64_t {
+ v1 = 1,
+ v2 = 2,
+ };
+ static constexpr auto NAME = "system_auth_v2";
+ // tables
+ static constexpr auto ROLES = "roles";
+ static constexpr auto ROLE_MEMBERS = "role_members";
+ static constexpr auto ROLE_ATTRIBUTES = "role_attributes";
+ static constexpr auto ROLE_PERMISSIONS = "role_permissions";
+
+
+ schema_ptr roles();
+ schema_ptr role_members();
+ schema_ptr role_attributes();
+ schema_ptr role_permissions();
+
+ std::vector<schema_ptr> all_tables();
+}; // namespace system_auth_keyspace
+
+} // namespace db
diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -16,6 +16,7 @@
#include <seastar/coroutine/parallel_for_each.hh>
#include "system_keyspace.hh"
#include "cql3/untyped_result_set.hh"
+#include "db/system_auth_keyspace.hh"
#include "thrift/server.hh"
#include "cql3/query_processor.hh"
#include "partition_slice_builder.hh"
@@ -2112,6 +2113,11 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
r.insert(r.end(), {topology(), cdc_generations_v3(), topology_requests()});
}
+ if (cfg.check_experimental(db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES)) {
+ auto auth_tables = db::system_auth_keyspace::all_tables();
+ std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));
+ }
+
if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
r.insert(r.end(), {broadcast_kv_store()});
}
@@ -2154,10 +2160,15 @@ void system_keyspace::mark_writable() {
}
}
+future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
+system_keyspace::query_mutations(distributed<replica::database>& db, schema_ptr schema) {
+ return replica::query_mutations(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout);
+}
+
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
system_keyspace::query_mutations(distributed<replica::database>& db, const sstring& ks_name, const sstring& cf_name) {
schema_ptr schema = db.local().find_schema(ks_name, cf_name);
- return replica::query_mutations(db, schema, query::full_partition_range, schema->full_slice(), db::no_timeout);
+ return query_mutations(db, schema);
}
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh
--- a/db/system_keyspace.hh
+++ b/db/system_keyspace.hh
@@ -312,6 +312,10 @@ public:
/// overloads
+ future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
+ static query_mutations(distributed<replica::database>& db,
+ schema_ptr schema);
+
future<foreign_ptr<lw_shared_ptr<reconcilable_result>>>
static query_mutations(distributed<replica::database>& db,
const sstring& ks_name,
@@ -605,6 +609,9 @@ public:
const replica::database& local_db() const noexcept {
return _db;
}
+ cql3::query_processor& query_processor() const noexcept {
+ return _qp;
+ }
}; // class system_keyspace
} // namespace db
diff --git a/idl/storage_service.idl.hh b/idl/storage_service.idl.hh
--- a/idl/storage_service.idl.hh
+++ b/idl/storage_service.idl.hh
@@ -59,8 +59,17 @@ struct raft_topology_snapshot {
struct raft_topology_pull_params {};
+struct raft_snapshot {
+ utils::chunked_vector<canonical_mutation> mutations;
+};
+
+struct raft_snapshot_pull_params {
+ std::vector<table_id> tables;
+};
+
verb raft_topology_cmd (raft::server_id dst_id, raft::term_t term, uint64_t cmd_index, service::raft_topology_cmd) -> service::raft_topology_cmd_result;
verb [[cancellable]] raft_pull_topology_snapshot (raft::server_id dst_id, service::raft_topology_pull_params) -> service::raft_topology_snapshot;
+verb [[cancellable]] raft_pull_snapshot (raft::server_id dst_id, service::raft_snapshot_pull_params) -> service::raft_snapshot;
verb [[cancellable]] tablet_stream_data (raft::server_id dst_id, locator::global_tablet_id);
verb [[cancellable]] tablet_cleanup (raft::server_id dst_id, locator::global_tablet_id);
verb [[cancellable]] table_load_stats (raft::server_id dst_id) -> locator::load_stats;
diff --git a/main.cc b/main.cc
--- a/main.cc
+++ b/main.cc
@@ -1715,7 +1715,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
maintenance_auth_config.authenticator_java_name = sstring{auth::allow_all_authenticator_name};
maintenance_auth_config.role_manager_java_name = sstring{auth::maintenance_socket_role_manager_name};
- maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes).get();
+ maintenance_auth_service.start(perm_cache_config, std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), maintenance_auth_config, maintenance_socket_enabled::yes).get();
scheduling_group_key_config maintenance_cql_sg_stats_cfg =
make_scheduling_group_key_config<cql_transport::cql_sg_stats>(maintenance_socket_enabled::yes);
@@ -1850,7 +1850,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
auth_config.authenticator_java_name = qualified_authenticator_name;
auth_config.role_manager_java_name = qualified_role_manager_name;
- auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no).get();
+ auth_service.start(std::move(perm_cache_config), std::ref(qp), std::ref(group0_client), std::ref(mm_notifier), std::ref(mm), auth_config, maintenance_socket_enabled::no).get();
std::any stop_auth_service;
start_auth_service(auth_service, stop_auth_service, "auth service");
diff --git a/message/messaging_service.cc b/message/messaging_service.cc
--- a/message/messaging_service.cc
+++ b/message/messaging_service.cc
@@ -376,7 +376,7 @@ void messaging_service::do_start_listen() {
_server[1] = listen(broadcast_address, rpc::streaming_domain_type(0x66BB));
}
}
-
+
if (!_server_tls[0] && _cfg.ssl_port) {
auto listen = [&] (const gms::inet_address& a, rpc::streaming_domain_type sdomain) {
so.filter_connection = {};
@@ -645,6 +645,7 @@ static constexpr unsigned do_get_rpc_client_idx(messaging_verb verb) {
case messaging_verb::RAFT_MODIFY_CONFIG:
case messaging_verb::DIRECT_FD_PING:
case messaging_verb::RAFT_PULL_TOPOLOGY_SNAPSHOT:
+ case messaging_verb::RAFT_PULL_SNAPSHOT:
return 2;
case messaging_verb::MUTATION_DONE:
case messaging_verb::MUTATION_FAILED:
diff --git a/message/messaging_service.hh b/message/messaging_service.hh
--- a/message/messaging_service.hh
+++ b/message/messaging_service.hh
@@ -194,7 +194,8 @@ enum class messaging_verb : int32_t {
STREAM_BLOB = 71,
TABLE_LOAD_STATS = 72,
JOIN_NODE_QUERY = 73,
- LAST = 74,
+ RAFT_PULL_SNAPSHOT = 74,
+ LAST = 75,
};
} // namespace netw
diff --git a/replica/database.cc b/replica/database.cc
--- a/replica/database.cc
+++ b/replica/database.cc
@@ -11,6 +11,7 @@
#include "utils/lister.hh"
#include "replica/database.hh"
#include <seastar/core/future-util.hh>
+#include "db/system_auth_keyspace.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/commitlog/commitlog.hh"
@@ -805,8 +806,11 @@ future<> database::drop_keyspace_on_all_shards(sharded<database>& sharded_db, co
}
static bool is_system_table(const schema& s) {
- return s.ks_name() == db::system_keyspace::NAME || s.ks_name() == db::system_distributed_keyspace::NAME
- || s.ks_name() == db::system_distributed_keyspace::NAME_EVERYWHERE;
+ auto& k = s.ks_name();
+ return k == db::system_keyspace::NAME ||
+ k == db::system_auth_keyspace::NAME ||
+ k == db::system_distributed_keyspace::NAME ||
+ k == db::system_distributed_keyspace::NAME_EVERYWHERE;
}
void database::init_schema_commitlog() {
diff --git a/replica/distributed_loader.cc b/replica/distributed_loader.cc
--- a/replica/distributed_loader.cc
+++ b/replica/distributed_loader.cc
@@ -16,6 +16,7 @@
#include "replica/global_table_ptr.hh"
#include "db/config.hh"
#include "db/extensions.hh"
+#include "db/system_auth_keyspace.hh"
#include "db/system_keyspace.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/schema_tables.hh"
@@ -34,7 +35,7 @@
extern logging::logger dblog;
static const std::unordered_set<std::string_view> system_keyspaces = {
- db::system_keyspace::NAME, db::schema_tables::NAME
+ db::system_keyspace::NAME, db::system_auth_keyspace::NAME, db::schema_tables::NAME,
};
// Not super nice. Adding statefulness to the file.
@@ -58,8 +59,9 @@ static const std::unordered_set<std::string_view> internal_keyspaces = {
db::system_distributed_keyspace::NAME,
db::system_distributed_keyspace::NAME_EVERYWHERE,
db::system_keyspace::NAME,
+ db::system_auth_keyspace::NAME,
db::schema_tables::NAME,
- auth::meta::AUTH_KS,
+ auth::meta::legacy::AUTH_KS,
tracing::trace_keyspace_helper::KEYSPACE_NAME
};
diff --git a/scylla-gdb.py b/scylla-gdb.py
--- a/scylla-gdb.py
+++ b/scylla-gdb.py
@@ -1803,7 +1803,7 @@ def __getitem__(self, item):
return self.ptr[item]
def is_system(self):
- return self.ks_name in ["system", "system_schema", "system_distributed", "system_traces", "system_auth", "audit"]
+ return self.ks_name in ["system", "system_schema", "system_distributed", "system_traces", "system_auth", "system_auth_v2", "audit"]
class scylla_active_sstables(gdb.Command):
diff --git a/service/client_state.cc b/service/client_state.cc
--- a/service/client_state.cc
+++ b/service/client_state.cc
@@ -130,14 +130,14 @@ future<> service::client_state::has_access(const sstring& ks, auth::command_desc
}
//
- // we want to disallow dropping any contents of TRACING_KS and disallow dropping the `auth::meta::AUTH_KS`
+ // we want to disallow dropping any contents of TRACING_KS and disallow dropping the `auth::meta::legacy::AUTH_KS`
// keyspace.
//
const bool dropping_anything_in_tracing = (name == tracing::trace_keyspace_helper::KEYSPACE_NAME)
&& (cmd.permission == auth::permission::DROP);
- const bool dropping_auth_keyspace = (cmd.resource == auth::make_data_resource(auth::meta::AUTH_KS))
+ const bool dropping_auth_keyspace = (cmd.resource == auth::make_data_resource(auth::meta::legacy::AUTH_KS))
&& (cmd.permission == auth::permission::DROP);
if (dropping_anything_in_tracing || dropping_auth_keyspace) {
diff --git a/service/client_state.hh b/service/client_state.hh
--- a/service/client_state.hh
+++ b/service/client_state.hh
@@ -50,22 +50,29 @@ public:
private:
const client_state* _cs;
seastar::sharded<auth::service>* _auth_service;
- client_state_for_another_shard(const client_state* cs, seastar::sharded<auth::service>* auth_service) : _cs(cs), _auth_service(auth_service) {}
+ seastar::sharded<qos::service_level_controller>* _sl_controller;
+ client_state_for_another_shard(const client_state* cs,
+ seastar::sharded<auth::service>* auth_service,
+ seastar::sharded<qos::service_level_controller>* sl_controller)
+ : _cs(cs), _auth_service(auth_service), _sl_controller(sl_controller) {}
friend client_state;
public:
client_state get() const {
- return client_state(_cs, _auth_service);
+ return client_state(_cs, _auth_service, _sl_controller);
}
};
private:
- client_state(const client_state* cs, seastar::sharded<auth::service>* auth_service)
+ client_state(const client_state* cs,
+ seastar::sharded<auth::service>* auth_service,
+ seastar::sharded<qos::service_level_controller>* sl_controller)
: _keyspace(cs->_keyspace)
, _user(cs->_user)
, _auth_state(cs->_auth_state)
, _is_internal(cs->_is_internal)
, _is_thrift(cs->_is_thrift)
, _remote_address(cs->_remote_address)
, _auth_service(auth_service ? &auth_service->local() : nullptr)
+ , _sl_controller(sl_controller ? &sl_controller->local() : nullptr)
, _default_timeout_config(cs->_default_timeout_config)
, _timeout_config(cs->_timeout_config)
, _enabled_protocol_extensions(cs->_enabled_protocol_extensions)
@@ -172,7 +179,7 @@ public:
gms::inet_address get_client_address() const {
return gms::inet_address(_remote_address);
}
-
+
::in_port_t get_client_port() const {
return _remote_address.port();
}
@@ -377,7 +384,9 @@ public:
}
client_state_for_another_shard move_to_other_shard() {
- return client_state_for_another_shard(this, _auth_service ? &_auth_service->container() : nullptr);
+ return client_state_for_another_shard(this,
+ _auth_service ? &_auth_service->container() : nullptr,
+ _sl_controller ? &_sl_controller->container() : nullptr);
}
#if 0
diff --git a/service/qos/service_level_controller.cc b/service/qos/service_level_controller.cc
--- a/service/qos/service_level_controller.cc
+++ b/service/qos/service_level_controller.cc
@@ -340,7 +340,7 @@ future<> service_level_controller::drop_distributed_service_level(sstring name,
return role_manager.query_attribute_for_all("service_level").then( [&role_manager, name] (auth::role_manager::attribute_vals attributes) {
return parallel_for_each(attributes.begin(), attributes.end(), [&role_manager, name] (auto&& attr) {
if (attr.second == name) {
- return role_manager.remove_attribute(attr.first, "service_level");
+ return do_with(attr.first, [&role_manager] (const sstring& role_name) {return role_manager.remove_attribute(role_name, "service_level");});
} else {
return make_ready_future();
}
diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc
--- a/service/raft/group0_state_machine.cc
+++ b/service/raft/group0_state_machine.cc
@@ -6,6 +6,7 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include "service/raft/group0_state_machine.hh"
+#include "db/system_auth_keyspace.hh"
#include "mutation/atomic_cell.hh"
#include "cql3/selection/selection.hh"
#include "dht/i_partitioner.hh"
@@ -83,6 +84,14 @@ static mutation extract_history_mutation(std::vector<canonical_mutation>& muts,
return res;
}
+static future<> mutate_locally(utils::chunked_vector<canonical_mutation> muts, storage_proxy& sp) {
+ auto db = sp.data_dictionary();
+ co_await max_concurrent_for_each(muts, 128, [&sp, &db] (const canonical_mutation& cmut) -> future<> {
+ auto schema = db.find_schema(cmut.column_family_id());
+ return sp.mutate_locally(cmut.to_mutation(schema), nullptr, db::commitlog::force_sync::yes);
+ });
+}
+
static bool should_flush_system_topology_after_applying(const mutation& mut, const data_dictionary::database db) {
if (!db.has_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY)) {
return false;
@@ -257,8 +266,16 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
}
std::optional<service::raft_topology_snapshot> topology_snp;
+ std::optional<service::raft_snapshot> auth_snp;
if (_topology_change_enabled) {
topology_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_topology_snapshot(&_mm._messaging, addr, as, from_id, service::raft_topology_pull_params{});
+
+ std::vector<table_id> tables;
+ for (const auto& schema : db::system_auth_keyspace::all_tables()) {
+ tables.push_back(schema->id());
+ }
+ auth_snp = co_await ser::storage_service_rpc_verbs::send_raft_pull_snapshot(
+ &_mm._messaging, addr, as, from_id, service::raft_snapshot_pull_params{std::move(tables)});
}
auto history_mut = extract_history_mutation(*cm, _sp.data_dictionary());
@@ -275,6 +292,10 @@ future<> group0_state_machine::transfer_snapshot(raft::server_id from_id, raft::
co_await _sp.get_db().local().flush(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
}
+ if (auth_snp) {
+ co_await mutate_locally(std::move(auth_snp->mutations), _sp);
+ }
+
co_await _sp.mutate_locally({std::move(history_mut)}, nullptr);
} catch (const abort_requested_exception&) {
throw raft::request_aborted();
diff --git a/service/raft/raft_group0_client.cc b/service/raft/raft_group0_client.cc
--- a/service/raft/raft_group0_client.cc
+++ b/service/raft/raft_group0_client.cc
@@ -294,7 +294,7 @@ future<group0_guard> raft_group0_client::start_operation(seastar::abort_source*
}
template<typename Command>
-requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change>
+requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change> || std::same_as<Command, write_mutations>
group0_command raft_group0_client::prepare_command(Command change, group0_guard& guard, std::string_view description) {
group0_command group0_cmd {
.change{std::move(change)},
@@ -337,6 +337,10 @@ raft_group0_client::raft_group0_client(service::raft_group_registry& raft_gr, db
: _raft_gr(raft_gr), _sys_ks(sys_ks), _maintenance_mode(maintenance_mode) {
}
+size_t raft_group0_client::max_command_size() const {
+ return _raft_gr.group0().max_command_size();
+}
+
future<> raft_group0_client::init() {
auto value = [] (std::optional<sstring> s) {
if (!s || *s == "use_pre_raft_procedures") {
@@ -487,6 +491,7 @@ void raft_group0_client::set_query_result(utils::UUID query_id, service::broadca
template group0_command raft_group0_client::prepare_command(schema_change change, group0_guard& guard, std::string_view description);
template group0_command raft_group0_client::prepare_command(topology_change change, group0_guard& guard, std::string_view description);
+template group0_command raft_group0_client::prepare_command(write_mutations change, group0_guard& guard, std::string_view description);
template group0_command raft_group0_client::prepare_command(broadcast_table_query change, std::string_view description);
template group0_command raft_group0_client::prepare_command(write_mutations change, std::string_view description);
diff --git a/service/raft/raft_group0_client.hh b/service/raft/raft_group0_client.hh
--- a/service/raft/raft_group0_client.hh
+++ b/service/raft/raft_group0_client.hh
@@ -137,8 +137,10 @@ public:
requires std::same_as<Command, broadcast_table_query> || std::same_as<Command, write_mutations>
group0_command prepare_command(Command change, std::string_view description);
template<typename Command>
- requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change>
+ requires std::same_as<Command, schema_change> || std::same_as<Command, topology_change> || std::same_as<Command, write_mutations>
group0_command prepare_command(Command change, group0_guard& guard, std::string_view description);
+ // Checks maximum allowed serialized command size, server rejects bigger commands with command_is_too_big_error exception
+ size_t max_command_size() const;
// Returns the current group 0 upgrade state.
//
diff --git a/service/storage_service.cc b/service/storage_service.cc
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -11,6 +11,7 @@
#include "storage_service.hh"
#include "compaction/task_manager_module.hh"
+#include "db/system_auth_keyspace.hh"
#include "gc_clock.hh"
#include "raft/raft.hh"
#include "service/topology_guard.hh"
@@ -88,6 +89,7 @@
#include "node_ops/node_ops_ctl.hh"
#include "service/topology_mutation.hh"
#include "service/topology_coordinator.hh"
+#include "cql3/query_processor.hh"
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/classification.hpp>
@@ -596,6 +598,12 @@ future<> storage_service::topology_state_load() {
co_return;
}
+ co_await _qp.container().invoke_on_all([] (cql3::query_processor& qp) {
+ // auth-v2 gets enabled when consistent topology changes are enabled
+ // (see topology::upgrade_state_type::done above) as we use the same migration procedure
+ qp.auth_version = db::system_auth_keyspace::version_t::v2;
+ });
+
co_await _feature_service.container().invoke_on_all([&] (gms::feature_service& fs) {
return fs.enable(boost::copy_range<std::set<std::string_view>>(_topology_state_machine._topology.enabled_features));
});
@@ -6068,6 +6076,22 @@ future<join_node_response_result> storage_service::join_node_response_handler(jo
}
}
+future<std::vector<canonical_mutation>> storage_service::get_system_mutations(schema_ptr schema) {
+ std::vector<canonical_mutation> result;
+ auto rs = co_await db::system_keyspace::query_mutations(_db, schema);
+ result.reserve(rs->partitions().size());
+ boost::range::transform(
+ rs->partitions(), std::back_inserter(result), [schema] (const partition& p) {
+ return canonical_mutation{p.mut().unfreeze(schema)};
+ });
+ co_return result;
+}
+
+future<std::vector<canonical_mutation>> storage_service::get_system_mutations(const sstring& ks_name, const sstring& cf_name) {
+ auto s = _db.local().find_schema(ks_name, cf_name);
+ return get_system_mutations(s);
+}
+
node_state storage_service::get_node_state(locator::host_id id) {
if (this_shard_id() != 0) {
on_internal_error(rtlogger, "cannot access node state on non zero shard");
@@ -6118,14 +6142,7 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled)
// FIXME: make it an rwlock, here we only need to lock for reads,
// might be useful if multiple nodes are trying to pull concurrently.
auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
- auto rs = co_await db::system_keyspace::query_mutations(
- ss._db, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
- auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
- topology_mutations.reserve(rs->partitions().size());
- boost::range::transform(
- rs->partitions(), std::back_inserter(topology_mutations), [s] (const partition& p) {
- return canonical_mutation{p.mut().unfreeze(s)};
- });
+ topology_mutations = co_await ss.get_system_mutations(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
}
std::vector<canonical_mutation> cdc_generation_mutations;
@@ -6140,29 +6157,15 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled)
// (garbage-collected with time) instead of just the last one, and load all of them.
// Alternatively, a node would wait for some time before switching to normal state.
auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
- auto rs = co_await db::system_keyspace::query_mutations(
- ss._db, db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
- auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
- cdc_generation_mutations.reserve(rs->partitions().size());
- boost::range::transform(
- rs->partitions(), std::back_inserter(cdc_generation_mutations), [s] (const partition& p) {
- return canonical_mutation{p.mut().unfreeze(s)};
- });
+ cdc_generation_mutations = co_await ss.get_system_mutations(db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3);
}
std::vector<canonical_mutation> topology_requests_mutations;
{
// FIXME: make it an rwlock, here we only need to lock for reads,
// might be useful if multiple nodes are trying to pull concurrently.
auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
- auto rs = co_await db::system_keyspace::query_mutations(
- ss._db, db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
- auto s = ss._db.local().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
- topology_requests_mutations.reserve(rs->partitions().size());
- boost::range::transform(
- rs->partitions(), std::back_inserter(topology_requests_mutations), [s] (const partition& p) {
- return canonical_mutation{p.mut().unfreeze(s)};
- });
+ topology_requests_mutations = co_await ss.get_system_mutations(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
}
co_return raft_topology_snapshot{
@@ -6172,6 +6175,24 @@ void storage_service::init_messaging_service(bool raft_topology_change_enabled)
};
});
});
+ ser::storage_service_rpc_verbs::register_raft_pull_snapshot(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, raft_snapshot_pull_params params) {
+ return handle_raft_rpc(dst_id, [params = std::move(params)] (storage_service& ss) -> future<raft_snapshot> {
+ utils::chunked_vector<canonical_mutation> mutations;
+ // FIXME: make it an rwlock, here we only need to lock for reads,
+ // might be useful if multiple nodes are trying to pull concurrently.
+ auto read_apply_mutex_holder = co_await ss._group0->client().hold_read_apply_mutex();
+ for (const auto& table : params.tables) {
+ auto schema = ss._db.local().find_schema(table);
+ auto muts = co_await ss.get_system_mutations(schema);
+ mutations.reserve(mutations.size() + muts.size());
+ std::move(muts.begin(), muts.end(), std::back_inserter(mutations));
+ }
+
+ co_return raft_snapshot{
+ .mutations = std::move(mutations),
+ };
+ });
+ });
ser::storage_service_rpc_verbs::register_tablet_stream_data(&_messaging.local(), [handle_raft_rpc] (raft::server_id dst_id, locator::global_tablet_id tablet) {
return handle_raft_rpc(dst_id, [tablet] (auto& ss) {
return ss.stream_tablet(tablet);
diff --git a/service/storage_service.hh b/service/storage_service.hh
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -14,6 +14,7 @@
#include <seastar/core/shared_future.hh>
#include "gms/endpoint_state.hh"
#include "gms/i_endpoint_state_change_subscriber.hh"
+#include "schema/schema_fwd.hh"
#include "service/endpoint_lifecycle_subscriber.hh"
#include "service/topology_guard.hh"
#include "locator/abstract_replication_strategy.hh"
@@ -855,6 +856,8 @@ public:
future<> start_maintenance_mode();
private:
+ future<std::vector<canonical_mutation>> get_system_mutations(schema_ptr schema);
+ future<std::vector<canonical_mutation>> get_system_mutations(const sstring& ks_name, const sstring& cf_name);
// Synchronizes the local node state (token_metadata, system.peers/system.local tables,
// gossiper) to align it with the other raft topology nodes.
// Optional target_node can be provided to restrict the synchronization to the specified node.
diff --git a/service/topology_coordinator.cc b/service/topology_coordinator.cc
--- a/service/topology_coordinator.cc
+++ b/service/topology_coordinator.cc
@@ -14,6 +14,7 @@
#include <seastar/core/sharded.hh>
#include <seastar/util/noncopyable_function.hh>
+#include "auth/service.hh"
#include "cdc/generation.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/system_keyspace.hh"
@@ -2321,9 +2322,13 @@ future<> topology_coordinator::build_coordinator_state(group0_guard guard) {
rtlogger.info("waiting for all nodes to finish upgrade to raft schema");
release_guard(std::move(guard));
co_await _group0.wait_for_all_nodes_to_finish_upgrade(_as);
- guard = co_await start_operation();
+
+
rtlogger.info("migrating system_auth keyspace data");
+ co_await auth::migrate_to_auth_v2(_sys_ks.query_processor(), _group0.client(),
+ [this] (abort_source*) { return start_operation();}, _as);
rtlogger.info("building initial raft topology state and CDC generation");
+ guard = co_await start_operation();
auto get_application_state = [&] (locator::host_id host_id, gms::inet_address ep, const gms::application_state_map& epmap, gms::application_state app_state) -> sstring {
const auto it = epmap.find(app_state);
diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh
--- a/service/topology_state_machine.hh
+++ b/service/topology_state_machine.hh
@@ -213,6 +213,15 @@ struct raft_topology_snapshot {
struct raft_topology_pull_params {
};
+struct raft_snapshot {
+ // FIXME: handle this with rpc streaming instead as we can't guarantee size bounds.
+ utils::chunked_vector<canonical_mutation> mutations;
+};
+
+struct raft_snapshot_pull_params {
+ std::vector<table_id> tables;
+};
+
// State machine that is responsible for topology change
struct topology_state_machine {
using topology_type = topology;
diff --git a/test/alternator/run b/test/alternator/run
--- a/test/alternator/run
+++ b/test/alternator/run
@@ -101,7 +101,7 @@ run.wait_for_services(pid, [
# test. Currently this can only be done through CQL, which is why above we
# needed to make sure CQL is available.
cluster = run.get_cql_cluster(ip)
-cluster.connect().execute("INSERT INTO system_auth.roles (role, salted_hash) VALUES ('alternator', 'secret_pass')")
+cluster.connect().execute("INSERT INTO system_auth_v2.roles (role, salted_hash) VALUES ('alternator', 'secret_pass')")
cluster.shutdown()
# Finally run pytest:
diff --git a/test/auth_cluster/suite.yaml b/test/auth_cluster/suite.yaml
--- a/test/auth_cluster/suite.yaml
+++ b/test/auth_cluster/suite.yaml
@@ -2,3 +2,10 @@ type: Topology
pool_size: 2
cluster:
initial_size: 0
+skip_in_release:
+ - test_auth_v2_migration
+extra_scylla_config_options:
+ authenticator: PasswordAuthenticator
+ authorizer: CassandraAuthorizer
+ enable_user_defined_functions: False
+ experimental_features: ['consistent-topology-changes']
diff --git a/test/auth_cluster/test_auth_no_quorum.py b/test/auth_cluster/test_auth_no_quorum.py
--- a/test/auth_cluster/test_auth_no_quorum.py
+++ b/test/auth_cluster/test_auth_no_quorum.py
@@ -0,0 +1,100 @@
+#
+# Copyright (C) 2024-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+
+import asyncio
+import time
+from test.pylib.internal_types import ServerInfo
+from test.pylib.manager_client import ManagerClient
+import pytest
+from cassandra.auth import PlainTextAuthProvider
+from test.pylib.util import read_barrier, unique_name, wait_for_cql_and_get_hosts
+
+
+"""
+Tests how cluster behaves when lost quorum. Ideally for operations with CL=1 live part of the
+cluster should still work but that's guaranteed only if auth data is replicated everywhere.
+"""
+...@pytest.mark.asyncio
+async def test_auth_no_quorum(manager: ManagerClient) -> None:
+ config = {
+ # disable auth cache
+ 'permissions_validity_in_ms': 0,
+ 'permissions_update_interval_in_ms': 0,
+ }
+ servers = await manager.servers_add(3, config=config)
+
+ cql = manager.get_cql()
+ hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
+ await manager.servers_see_each_other(servers)
+
+ # create users, this is done so that previous auth implementation (with RF=1) fails this test
+ # otherwise it could happen that all users are luckily placed on a single node
+ roles = ["r" + unique_name() for _ in range(10)]
+ for role in roles:
+ # if not exists due to
https://github.com/scylladb/python-driver/issues/296
+ await cql.run_async(f"CREATE ROLE IF NOT EXISTS {role} WITH PASSWORD = '{role}' AND LOGIN = true")
+
+ # auth reads are eventually consistent so we need to sync all nodes
+ await asyncio.gather(*(read_barrier(cql, host) for host in hosts))
+
+ # check if users are replicated everywhere
+ for role in roles:
+ for server in servers:
+ await manager.driver_connect(server=server,
+ auth_provider=PlainTextAuthProvider(username=role, password=role))
+ # lost quorum
+ await asyncio.gather(*(
+ manager.server_stop_gracefully(srv.server_id) for srv in servers[0:2]))
+ alive_server = servers[2]
+ # can still login on remaining node - whole auth data is local
+ roles.append('cassandra') # include default admin role
+ for role in roles:
+ await manager.driver_connect(server=alive_server,
+ auth_provider=PlainTextAuthProvider(username=role, password=role))
+ names = [row.role for row in await manager.get_cql().run_async(f"LIST ROLES", execution_profile='whitelist')]
+ assert set(names) == set(roles)
+
+
+async def trigger_snapshot(manager: ManagerClient, server: ServerInfo) -> None:
+ cql = manager.get_cql()
+ host = cql.cluster.metadata.get_host(server.ip_addr)
+ group0_id = (await cql.run_async(
+ "select value from system.scylla_local where key = 'raft_group0_id'",
+ host=host))[0].value
+ await
manager.api.client.post(f"/raft/trigger_snapshot/{group0_id}", host=server.ip_addr)
+
+
+"""
+Tests raft snapshot transfer of auth data.
+"""
+...@pytest.mark.asyncio
+async def test_auth_raft_snapshot_transfer(manager: ManagerClient) -> None:
+ servers = await manager.servers_add(1)
+
+ cql = manager.get_cql()
+ await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
+ await manager.servers_see_each_other(servers)
+
+ roles = ["ro" + unique_name() for _ in range(10)]
+ for role in roles:
+ # if not exists due to
https://github.com/scylladb/python-driver/issues/296
+ await cql.run_async(f"CREATE ROLE IF NOT EXISTS {role}")
+
+ await trigger_snapshot(manager, servers[0])
+
+ # on startup node should receive snapshot
+ snapshot_receiving_server = await manager.server_add()
+ servers.append(snapshot_receiving_server)
+ await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
+ await manager.servers_see_each_other(servers)
+
+ # simulate lost quorum but snapshot with data should have been transferred already
+ await manager.server_stop_gracefully(servers[0].server_id)
+
+ await manager.driver_connect(server=snapshot_receiving_server)
+ cql = manager.get_cql()
+ names = [row.role for row in await cql.run_async(f"LIST ROLES", execution_profile='whitelist')]
+ assert(set(names) == set(['cassandra'] + roles))
diff --git a/test/auth_cluster/test_auth_raft_command_split.py b/test/auth_cluster/test_auth_raft_command_split.py
--- a/test/auth_cluster/test_auth_raft_command_split.py
+++ b/test/auth_cluster/test_auth_raft_command_split.py
@@ -0,0 +1,56 @@
+#
+# Copyright (C) 2024-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+
+import asyncio
+import time
+from test.pylib.manager_client import ManagerClient
+import pytest
+from test.pylib.rest_client import inject_error, inject_error_one_shot
+from test.pylib.util import read_barrier, unique_name, wait_for_cql_and_get_hosts
+
+
+"""
+Tests case when bigger auth operation is split into multiple raft commands.
+"""
+...@pytest.mark.asyncio
+async def test_auth_raft_command_split(manager: ManagerClient) -> None:
+ servers = await manager.servers_add(3)
+
+ cql = manager.get_cql()
+ hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
+ await manager.servers_see_each_other(servers)
+
+ initial_perms = await cql.run_async("SELECT * FROM system_auth_v2.role_permissions")
+
+ shared_role = "shared_role_" + unique_name()
+ await cql.run_async(f"CREATE ROLE IF NOT EXISTS {shared_role}")
+
+ users = ["user_" + unique_name() for _ in range(30)]
+ for user in users:
+ # if not exists due to
https://github.com/scylladb/python-driver/issues/296
+ await cql.run_async(f"CREATE ROLE IF NOT EXISTS {user}")
+ await cql.run_async(f"GRANT ALL ON ROLE {shared_role} TO {user}")
+
+ # this will trigger cascade of deletes which should be packed
+ # into raft commands in a way that none exceeds max_command_size
+ await manager.driver_connect(server=servers[0])
+ cql = manager.get_cql()
+ async with inject_error(manager.api, servers[0].ip_addr,
+ 'auth_announce_mutations_command_max_size'):
+ await cql.run_async(f"DROP ROLE IF EXISTS {shared_role}", execution_profile='whitelist')
+
+ # auth reads are eventually consistent so we need to sync all nodes
+ await asyncio.gather(*(read_barrier(cql, host) for host in hosts))
+
+ # confirm that deleted shared_role is not attached to any other role
+ assert await cql.run_async(f"SELECT * FROM system_auth_v2.role_permissions WHERE resource = 'role/{shared_role}' ALLOW FILTERING") == []
+
+ # cleanup
+ for user in users:
+ await cql.run_async(f"DROP ROLE IF EXISTS {user}")
+ await asyncio.gather(*(read_barrier(cql, host) for host in hosts))
+ current_perms = await cql.run_async("SELECT * FROM system_auth_v2.role_permissions")
+ assert initial_perms == current_perms
diff --git a/test/auth_cluster/test_auth_v2_migration.py b/test/auth_cluster/test_auth_v2_migration.py
--- a/test/auth_cluster/test_auth_v2_migration.py
+++ b/test/auth_cluster/test_auth_v2_migration.py
@@ -0,0 +1,165 @@
+#
+# Copyright (C) 2024-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+
+import asyncio
+import logging
+import pytest
+import time
+
+from test.pylib.manager_client import ManagerClient
+from test.pylib.util import read_barrier, wait_for_cql_and_get_hosts
+from test.topology.util import wait_until_topology_upgrade_finishes
+from cassandra.cluster import ConsistencyLevel
+
+
+def auth_data():
+ return [
+ {
+ "statement": "INSERT INTO system_auth.roles (role, can_login, is_superuser, member_of, salted_hash) VALUES (?, ?, ?, ?, ?)",
+ "rows": [
+ ("cassandra", False, True, None, None),
+ ("user 1", True, False, frozenset({'users'}), "salt1?"),
+ ("user 2", True, False, frozenset({'users'}), "salt2#"),
+ ("users", False, False, None, None),
+ ]
+ },
+ {
+ "statement": "INSERT INTO system_auth.role_members (role, member) VALUES (?, ?)",
+ "rows": [
+ ("users", "user 1"),
+ ("users", "user 2"),
+ ]
+ },
+ {
+ "statement": "INSERT INTO system_auth.role_attributes (role, name, value) VALUES (?, ?, ?)",
+ "rows": [
+ ("users", "service_level", "sl:fefe"),
+ ]
+ },
+ ]
+
+
+async def populate_test_data(manager: ManagerClient, data):
+ cql = manager.get_cql()
+ for d in data:
+ stmt = cql.prepare(d["statement"])
+ stmt.consistency_level = ConsistencyLevel.ALL
+ await asyncio.gather(*(
+ cql.run_async(stmt.bind(row_data)) for row_data in d["rows"]))
+
+
+async def populate_auth_v1_data(manager: ManagerClient):
+ await populate_test_data(manager, auth_data())
+ # test also absence of deleted data
+ await populate_test_data(manager, [
+ {
+ "statement": "INSERT INTO system_auth.roles (role, can_login, is_superuser, member_of, salted_hash) VALUES (?, ?, ?, ?, ?)",
+ "rows": [
+ ("deleted_user", True, False, None, "fefe"),
+ ]
+ },
+ {
+ "statement": "DELETE FROM system_auth.roles WHERE role = ?",
+ "rows": [
+ ("deleted_user",),
+ ]
+ },
+ ])
+
+
+async def warmup_v1_static_values(manager: ManagerClient, hosts):
+ # auth-v1 was using statics to cache internal queries
+ # in auth-v2 those statics were removed but we want to
+ # verify that it was effective so trigger here internal
+ # call to potentially populate static storage and we'll
+ # verify later that after migration properly formed query
+ # executes (query has to change because keyspace name changes)
+ cql = manager.get_cql()
+ await asyncio.gather(*(cql.run_async("LIST ROLES", host=host) for host in hosts))
+
+
+async def check_auth_v2_data_migration(manager: ManagerClient, hosts):
+ cql = manager.get_cql()
+ # auth reads are eventually consistent so we need to make sure hosts are up-to-date
+ assert hosts
+ await asyncio.gather(*(read_barrier(cql, host) for host in hosts))
+
+ data = auth_data()
+
+ roles = set()
+ for row in await cql.run_async("SELECT * FROM system_auth_v2.roles"):
+ member_of = frozenset(row.member_of) if row.member_of else None
+ roles.add((row.role, row.can_login, row.is_superuser, member_of, row.salted_hash))
+ assert roles == set(data[0]["rows"])
+
+ role_members = set()
+ for row in await cql.run_async("SELECT * FROM system_auth_v2.role_members"):
+ role_members.add((row.role, row.member))
+ assert role_members == set(data[1]["rows"])
+
+ role_attributes = set()
+ for row in await cql.run_async("SELECT * FROM system_auth_v2.role_attributes"):
+ role_attributes.add((row.role,
row.name, row.value))
+ assert role_attributes == set(data[2]["rows"])
+
+
+async def check_auth_v2_works(manager: ManagerClient, hosts):
+ cql = manager.get_cql()
+ roles = set()
+ for row in await cql.run_async("LIST ROLES"):
+ roles.add(row.role)
+ assert roles == set(["cassandra", "user 1", "user 2", "users"])
+
+ user1_roles = await cql.run_async("LIST ROLES OF 'user 1'")
+ assert len(user1_roles) == 2
+ assert set([user1_roles[0].role, user1_roles[1].role]) == set(["users", "user 1"])
+
+ await cql.run_async("CREATE ROLE IF NOT EXISTS user_after_migration")
+ await asyncio.gather(*(read_barrier(cql, host) for host in hosts))
+ # see warmup_v1_static_values for background about checks below
+ # check if it was added to a new table
+ assert len(await cql.run_async("SELECT role FROM system_auth_v2.roles WHERE role = 'user_after_migration'")) == 1
+ # check whether list roles statement sees it also via new table (on all nodes)
+ await asyncio.gather(*(cql.run_async("LIST ROLES OF user_after_migration", host=host) for host in hosts))
+ await cql.run_async("DROP ROLE user_after_migration")
+
+
+...@pytest.mark.asyncio
+async def test_auth_v2_migration(request, manager: ManagerClient):
+ # First, force the first node to start in legacy mode due to the error injection
+ cfg = {'error_injections_at_startup': ['force_gossip_based_join']}
+
+ servers = [await manager.server_add(config=cfg)]
+ # Disable injections for the subsequent nodes - they should fall back to
+ # using gossiper-based node operations
+ del cfg['error_injections_at_startup']
+
+ servers += [await manager.server_add(config=cfg) for _ in range(2)]
+ cql = manager.cql
+ assert(cql)
+
+
logging.info("Waiting until driver connects to every server")
+ hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
+
+
logging.info("Checking the upgrade state on all nodes")
+ for host in hosts:
+ status = await manager.api.raft_topology_upgrade_status(host.address)
+ assert status == "not_upgraded"
+
+ await populate_auth_v1_data(manager)
+ await warmup_v1_static_values(manager, hosts)
+
+
logging.info("Triggering upgrade to raft topology")
+ await manager.api.upgrade_to_raft_topology(hosts[0].address)
+
+
logging.info("Waiting until upgrade finishes")
+ await asyncio.gather(*(wait_until_topology_upgrade_finishes(manager, h.address, time.time() + 60) for h in hosts))
+
+
logging.info("Checking migrated data in system_auth_v2")
+ await check_auth_v2_data_migration(manager, hosts)
+
+
logging.info("Checking auth statements after migration")
+ await check_auth_v2_works(manager, hosts)
diff --git a/test/auth_cluster/test_password_login_message.py b/test/auth_cluster/test_password_login_message.py
--- a/test/auth_cluster/test_password_login_message.py
+++ b/test/auth_cluster/test_password_login_message.py
@@ -1,49 +0,0 @@
-#
-# Copyright (C) 2023-present ScyllaDB
-#
-# SPDX-License-Identifier: AGPL-3.0-or-later
-#
-
-from test.pylib.manager_client import ManagerClient
-import pytest
-import logging
-from cassandra.cluster import NoHostAvailable
-
-logger = logging.getLogger(__name__)
-
-"""
-Tests the error message after half of the cluster goes down.
-
-This test reproduces ScyllaDB OSS issue #2339. We create a
-new cluster with 2 nodes and then stop one of them. We then
-try to connect to the cluster and expect to get the error
-message:
-"Cannot achieve consistency level for cl QUORUM. Requires 1, alive 0"
-"""
-...@pytest.mark.asyncio
-async def test_login_message_after_half_of_the_cluster_is_down(manager: ManagerClient) -> None:
- """Tests the error message after half of the cluster goes down"""
- config = {
- 'failure_detector_timeout_in_ms': 2000,
- 'authenticator': 'PasswordAuthenticator',
- 'authorizer': 'CassandraAuthorizer',
- 'permissions_validity_in_ms': 0,
- 'permissions_update_interval_in_ms': 0,
- }
-
- servers = await manager.servers_add(2, config=config)
- cql = manager.get_cql()
- # system_auth is ALTERed to make the test stable, because by default
- # system_auth has SimpleStrategy with RF=1, but if RF=1, we cannot make sure
- # which node has the replica, and we have to kill one node
- cql.execute(f"ALTER KEYSPACE system_auth WITH replication = {{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}}")
- await manager.server_stop_gracefully(servers[1].server_id)
- try:
- """This is expected to fail"""
- await manager.driver_connect(server=servers[0])
- except NoHostAvailable as e:
- message = str([v for k, v in e.args[1].items()][0])
- expected_msg = "Cannot achieve consistency level for cl QUORUM. Requires 2, alive 1"
- assert expected_msg in message, f"Expected message: '{expected_msg}', got: '{message}'"
- else:
- pytest.fail("Expected NoHostAvailable exception")
diff --git a/test/boost/auth_test.cc b/test/boost/auth_test.cc
--- a/test/boost/auth_test.cc
+++ b/test/boost/auth_test.cc
@@ -32,6 +32,18 @@
#include "utils/fmt-compat.hh"
+cql_test_config auth_on(bool with_authorizer = true) {
+ cql_test_config cfg;
+ if (with_authorizer) {
+ cfg.db_config->authorizer("CassandraAuthorizer");
+ }
+ cfg.db_config->authenticator("PasswordAuthenticator");
+ cfg.db_config->experimental_features({
+ db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES,
+ });
+ return cfg;
+}
+
SEASTAR_TEST_CASE(test_default_authenticator) {
return do_with_cql_env([](cql_test_env& env) {
auto& a = env.local_auth_service().underlying_authenticator();
@@ -42,15 +54,12 @@ SEASTAR_TEST_CASE(test_default_authenticator) {
}
SEASTAR_TEST_CASE(test_password_authenticator_attributes) {
- auto cfg = make_shared<db::config>();
- cfg->authenticator(sstring(auth::password_authenticator_name));
-
return do_with_cql_env([](cql_test_env& env) {
auto& a = env.local_auth_service().underlying_authenticator();
BOOST_REQUIRE(a.require_authentication());
BOOST_REQUIRE_EQUAL(a.qualified_java_name(), auth::password_authenticator_name);
return make_ready_future();
- }, cfg);
+ }, auth_on(false));
}
static future<auth::authenticated_user>
@@ -83,9 +92,6 @@ future<> require_throws(seastar::future<Args...> fut) {
}
SEASTAR_TEST_CASE(test_password_authenticator_operations) {
- auto cfg = make_shared<db::config>();
- cfg->authenticator(sstring(auth::password_authenticator_name));
-
/**
* Not using seastar::async due to apparent ASan bug.
* Enjoy the slightly less readable code.
@@ -152,62 +158,45 @@ SEASTAR_TEST_CASE(test_password_authenticator_operations) {
return require_throws<exceptions::authentication_exception>(authenticate(env, username, password));
});
});
- }, cfg);
+ }, auth_on(false));
}
namespace {
/// Asserts that table is protected from alterations that can brick a node.
void require_table_protected(cql_test_env& env, const char* table) {
- using exception_predicate::message_contains;
+ using exception_predicate::message_matches;
using unauth = exceptions::unauthorized_exception;
const auto q = [&] (const char* stmt) { return env.execute_cql(fmt::format(fmt::runtime(stmt), table)).get(); };
+ const char* pattern = ".*(is protected)|(is not user-modifiable).*";
BOOST_TEST_INFO(table);
- BOOST_REQUIRE_EXCEPTION(q("ALTER TABLE {} ALTER role TYPE blob"), unauth, message_contains("is protected"));
- BOOST_REQUIRE_EXCEPTION(q("ALTER TABLE {} RENAME role TO user"), unauth, message_contains("is protected"));
- BOOST_REQUIRE_EXCEPTION(q("ALTER TABLE {} DROP role"), unauth, message_contains("is protected"));
- BOOST_REQUIRE_EXCEPTION(q("DROP TABLE {}"), unauth, message_contains("is protected"));
-}
-
-cql_test_config auth_on() {
- cql_test_config cfg;
- cfg.db_config->authorizer("CassandraAuthorizer");
- cfg.db_config->authenticator("PasswordAuthenticator");
- return cfg;
+ BOOST_REQUIRE_EXCEPTION(q("ALTER TABLE {} ALTER role TYPE blob"), unauth, message_matches(pattern));
+ BOOST_REQUIRE_EXCEPTION(q("ALTER TABLE {} RENAME role TO user"), unauth, message_matches(pattern));
+ BOOST_REQUIRE_EXCEPTION(q("ALTER TABLE {} DROP role"), unauth, message_matches(pattern));
+ BOOST_REQUIRE_EXCEPTION(q("DROP TABLE {}"), unauth, message_matches(pattern));
}
} // anonymous namespace
SEASTAR_TEST_CASE(roles_table_is_protected) {
return do_with_cql_env_thread([] (cql_test_env& env) {
- require_table_protected(env, "system_auth.roles");
+ require_table_protected(env, "system_auth_v2.roles");
}, auth_on());
}
SEASTAR_TEST_CASE(role_members_table_is_protected) {
return do_with_cql_env_thread([] (cql_test_env& env) {
- require_table_protected(env, "system_auth.role_members");
+ require_table_protected(env, "system_auth_v2.role_members");
}, auth_on());
}
SEASTAR_TEST_CASE(role_permissions_table_is_protected) {
return do_with_cql_env_thread([] (cql_test_env& env) {
- require_table_protected(env, "system_auth.role_permissions");
- }, auth_on());
-}
-
-SEASTAR_TEST_CASE(alter_opts_on_system_auth_tables) {
- return do_with_cql_env_thread([] (cql_test_env& env) {
- cquery_nofail(env, "ALTER TABLE system_auth.roles WITH speculative_retry = 'NONE'");
- cquery_nofail(env, "ALTER TABLE system_auth.role_members WITH gc_grace_seconds = 123");
- cquery_nofail(env, "ALTER TABLE system_auth.role_permissions WITH min_index_interval = 456");
+ require_table_protected(env, "system_auth_v2.role_permissions");
}, auth_on());
}
SEASTAR_TEST_CASE(test_alter_with_timeouts) {
- auto cfg = make_shared<db::config>();
- cfg->authenticator(sstring(auth::password_authenticator_name));
-
return do_with_cql_env_thread([] (cql_test_env& e) {
auth::role_config config {
.can_login = true,
@@ -303,13 +292,10 @@ SEASTAR_TEST_CASE(test_alter_with_timeouts) {
cquery_nofail(e, "SELECT * FROM t where id = 1 BYPASS CACHE");
cquery_nofail(e, "SELECT * FROM t BYPASS CACHE");
cquery_nofail(e, "INSERT INTO t (id, v) VALUES (1,2)");
- }, cfg);
+ }, auth_on(false));
}
SEASTAR_TEST_CASE(test_alter_with_workload_type) {
- auto cfg = make_shared<db::config>();
- cfg->authenticator(sstring(auth::password_authenticator_name));
-
return do_with_cql_env_thread([] (cql_test_env& e) {
auth::role_config config {
.can_login = true,
@@ -369,5 +355,5 @@ SEASTAR_TEST_CASE(test_alter_with_workload_type) {
authenticate(e, "user4", "pass").get();
e.refresh_client_state().get();
BOOST_REQUIRE_EQUAL(e.local_client_state().get_workload_type(), service::client_state::workload_type::interactive);
- }, cfg);
+ }, auth_on(false));
}
diff --git a/test/boost/role_manager_test.cc b/test/boost/role_manager_test.cc
--- a/test/boost/role_manager_test.cc
+++ b/test/boost/role_manager_test.cc
@@ -18,7 +18,7 @@ auto make_manager(cql_test_env& env) {
std::default_delete<auth::standard_role_manager>()(m);
};
return std::unique_ptr<auth::standard_role_manager, decltype(stop_role_manager)>(
- new auth::standard_role_manager(env.local_qp(), env.migration_manager().local()),
+ new auth::standard_role_manager(env.local_qp(), env.get_raft_group0_client(), env.migration_manager().local()),
std::move(stop_role_manager));
}
diff --git a/test/cql-pytest/test_permissions.py b/test/cql-pytest/test_permissions.py
--- a/test/cql-pytest/test_permissions.py
+++ b/test/cql-pytest/test_permissions.py
@@ -151,7 +151,7 @@ def test_udf_permissions_serialization(cql):
for permission in permissions:
cql.execute(f"GRANT {permission} ON {resource} TO {user}")
- permissions = {row.resource: row.permissions for row in cql.execute(f"SELECT * FROM system_auth.role_permissions")}
+ permissions = {row.resource: row.permissions for row in cql.execute(f"SELECT * FROM system_auth_v2.role_permissions")}
assert permissions['functions'] == set(['ALTER', 'AUTHORIZE', 'CREATE', 'DROP', 'EXECUTE'])
assert permissions[f'functions/{keyspace}'] == set(['ALTER', 'AUTHORIZE', 'CREATE', 'DROP', 'EXECUTE'])
assert permissions[f'functions/{keyspace}/{div_fun}[org.apache.cassandra.db.marshal.LongType^org.apache.cassandra.db.marshal.Int32Type]'] == set(['ALTER', 'AUTHORIZE', 'DROP', 'EXECUTE'])
diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc
--- a/test/lib/cql_test_env.cc
+++ b/test/lib/cql_test_env.cc
@@ -923,7 +923,7 @@ class single_node_cql_env : public cql_test_env {
auth_config.authenticator_java_name = qualified_authenticator_name;
auth_config.role_manager_java_name = qualified_role_manager_name;
- _auth_service.start(perm_cache_config, std::ref(_qp), std::ref(_mnotifier), std::ref(_mm), auth_config, maintenance_socket_enabled::no).get();
+ _auth_service.start(perm_cache_config, std::ref(_qp), std::ref(group0_client), std::ref(_mnotifier), std::ref(_mm), auth_config, maintenance_socket_enabled::no).get();
_auth_service.invoke_on_all([this] (auth::service& auth) {
return auth.start(_mm.local());
}).get();
diff --git a/test/pylib/manager_client.py b/test/pylib/manager_client.py
--- a/test/pylib/manager_client.py
+++ b/test/pylib/manager_client.py
@@ -21,6 +21,7 @@
from test.pylib.scylla_cluster import ReplaceConfig, ScyllaServer
from cassandra.cluster import Session as CassandraSession # type: ignore # pylint: disable=no-name-in-module
from cassandra.cluster import Cluster as CassandraCluster # type: ignore # pylint: disable=no-name-in-module
+from cassandra.auth import AuthProvider
import aiohttp
import asyncio
@@ -56,12 +57,15 @@ async def stop(self):
self.driver_close()
await self.client.shutdown()
- async def driver_connect(self, server: Optional[ServerInfo] = None) -> None:
+ async def driver_connect(self, server: Optional[ServerInfo] = None, auth_provider: Optional[AuthProvider] = None) -> None:
"""Connect to cluster"""
targets = [server] if server else await self.running_servers()
servers = [s_info.rpc_address for s_info in targets]
+ # avoids leaking connections if driver wasn't closed before
+ self.driver_close()
logger.debug("driver connecting to %s", servers)
- self.ccluster = self.con_gen(servers, self.port, self.use_ssl, self.auth_provider)
+ self.ccluster = self.con_gen(servers, self.port, self.use_ssl,
+ auth_provider if auth_provider else self.auth_provider)
self.cql = self.ccluster.connect()
def driver_close(self) -> None:
diff --git a/tools/schema_loader.cc b/tools/schema_loader.cc
--- a/tools/schema_loader.cc
+++ b/tools/schema_loader.cc
@@ -28,6 +28,7 @@
#include "db/large_data_handler.hh"
#include "db/system_distributed_keyspace.hh"
#include "db/schema_tables.hh"
+#include "db/system_auth_keyspace.hh"
#include "db/system_keyspace.hh"
#include "partition_slice_builder.hh"
#include "readers/combined.hh"
@@ -674,12 +675,15 @@ future<schema_ptr> load_one_schema_from_file(const db::config& dbcfg, std::files
}
schema_ptr load_system_schema(const db::config& cfg, std::string_view keyspace, std::string_view table) {
- const std::unordered_map<std::string_view, std::vector<schema_ptr>> schemas{
+ std::unordered_map<std::string_view, std::vector<schema_ptr>> schemas{
{db::schema_tables::NAME, db::schema_tables::all_tables(db::schema_features::full())},
{db::system_keyspace::NAME, db::system_keyspace::all_tables(cfg)},
{db::system_distributed_keyspace::NAME, db::system_distributed_keyspace::all_distributed_tables()},
{db::system_distributed_keyspace::NAME_EVERYWHERE, db::system_distributed_keyspace::all_everywhere_tables()},
};
+ if (cfg.check_experimental(db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES)) {
+ schemas[db::system_auth_keyspace::NAME] = db::system_auth_keyspace::all_tables();
+ }
auto ks_it = schemas.find(keyspace);
if (ks_it == schemas.end()) {
throw std::invalid_argument(fmt::format("unknown system keyspace: {}", keyspace));
diff --git a/tools/schema_loader.hh b/tools/schema_loader.hh
--- a/tools/schema_loader.hh
+++ b/tools/schema_loader.hh
@@ -45,6 +45,7 @@ future<schema_ptr> load_one_schema_from_file(const db::config& dbcfg, std::files
/// Note that only schemas from builtin system tables are supported, i.e.,
/// from the following keyspaces:
/// * system
+/// * system_auth_v2
/// * system_schema
/// * system_distributed
/// * system_distributed_everywhere