Scheduling groups (per shard) are comparable to threads in the sence that at
any given moment only one task can run per scheduling group (per shard).
One of the basic utilities for per thread management is pthread specific
data support through the pthread_getspecific.
This patch introduces similar mechanism inspired by the aformentiond
utility.
It allows for creating keys that contains objects or primitives. Once a
key configuration is set and a key is generated, an alocation will occure
for every new scheduling group that will be created and the constructor will be
called (if exists).
The data is always created and destroyed in the owning scheduling group's context.
The api inself contains two versions:
1. A global one that supports getting a specified value for the current sg or a specified
one.
2. An addition to the sg object that can get the specific value for the sg.
Two more API functions that were implemented are:
1. map_reduce over the values from all of the scheduling groups.
2. reduce over the values from all of the scheduling groups.
Testing:
New unit tests were added to test all of the above api.
Testing: Unit tests in dev mode and debug mode.
Signed-off-by: Eliran Sinvani <
elir...@scylladb.com>
Message-Id: <
20190909071018.3...@scylladb.com>
---
Changes from V5:
The previous version was reverted due to unused result
warning as an error which turned out to be a real bug.
This version adds using the result to wait for all shards
to destroy the scheduling group.
include/seastar/core/reactor.hh | 41 +++-
include/seastar/core/scheduling.hh | 170 ++++++++++++++++
include/seastar/core/scheduling_specific.hh | 124 ++++++++++++
src/core/reactor.cc | 98 ++++++++-
tests/unit/CMakeLists.txt | 3 +
tests/unit/scheduling_group_test.cc | 213 ++++++++++++++++++++
6 files changed, 643 insertions(+), 6 deletions(-)
create mode 100644 include/seastar/core/scheduling_specific.hh
create mode 100644 tests/unit/scheduling_group_test.cc
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index b4ba64a8..a07e4c54 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -528,6 +528,12 @@ class reactor {
uint64_t _tasks_processed = 0;
circular_buffer<std::unique_ptr<task>> _q;
sstring _name;
+ /**
+ * This array holds pointers to the scheduling group specific
+ * data. The pointer is not use as is but is cast to a reference
+ * to the appropriate type that is actually pointed to.
+ */
+ std::vector<void*> _scheduling_group_specific_vals;
int64_t to_vruntime(sched_clock::duration runtime) const;
void set_shares(float shares);
struct indirect_compare;
@@ -538,6 +544,7 @@ class reactor {
void register_stats();
};
boost::container::static_vector<std::unique_ptr<task_queue>, max_scheduling_groups()> _task_queues;
+ std::vector<scheduling_group_key_config> _scheduling_group_key_configs;
int64_t _last_vruntime = 0;
task_queue_list _active_task_queues;
task_queue_list _activating_task_queues;
@@ -643,8 +650,20 @@ class reactor {
void insert_activating_task_queues();
void account_runtime(task_queue& tq, sched_clock::duration runtime);
void account_idle(sched_clock::duration idletime);
- void init_scheduling_group(scheduling_group sg, sstring name, float shares);
- void destroy_scheduling_group(scheduling_group sg);
+ void allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key);
+ future<> init_scheduling_group(scheduling_group sg, sstring name, float shares);
+ future<> init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg);
+ future<> destroy_scheduling_group(scheduling_group sg);
+ [[noreturn]] void no_such_scheduling_group(scheduling_group sg);
+ void* get_scheduling_group_specific_value(scheduling_group sg, scheduling_group_key key) {
+ if (!_task_queues[sg._id]) {
+ no_such_scheduling_group(sg);
+ }
+ return _task_queues[sg._id]->_scheduling_group_specific_vals[
key.id()];
+ }
+ void* get_scheduling_group_specific_value(scheduling_group_key key) {
+ return get_scheduling_group_specific_value(*internal::current_scheduling_group_ptr(), key);
+ }
uint64_t tasks_processed() const;
uint64_t min_vruntime() const;
void request_preemption();
@@ -871,6 +890,24 @@ class reactor {
friend future<scheduling_group> create_scheduling_group(sstring name, float shares);
friend future<> seastar::destroy_scheduling_group(scheduling_group);
friend future<> seastar::rename_scheduling_group(scheduling_group sg, sstring new_name);
+ friend future<scheduling_group_key> scheduling_group_key_create(scheduling_group_key_config cfg);
+
+ template<typename T>
+ friend T& scheduling_group_get_specific(scheduling_group sg, scheduling_group_key key);
+ template<typename T>
+ friend T& scheduling_group_get_specific(scheduling_group_key key);
+ template<typename SpecificValType, typename Mapper, typename Reducer, typename Initial>
+ GCC6_CONCEPT( requires requires(SpecificValType specific_val, Mapper mapper, Reducer reducer, Initial initial) {
+ {reducer(initial, mapper(specific_val))} -> Initial;
+ })
+ friend future<typename function_traits<Reducer>::return_type>
+ map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer, Initial initial_val, scheduling_group_key key);
+ template<typename SpecificValType, typename Reducer, typename Initial>
+ GCC6_CONCEPT( requires requires(SpecificValType specific_val, Reducer reducer, Initial initial) {
+ {reducer(initial, specific_val)} -> Initial;
+ })
+ friend future<typename function_traits<Reducer>::return_type>
+ reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, scheduling_group_key key);
public:
bool wait_and_process(int timeout = 0, const sigset_t* active_sigmask = nullptr);
future<> readable(pollable_fd_state& fd);
diff --git a/include/seastar/core/scheduling.hh b/include/seastar/core/scheduling.hh
index 3868943f..cb2d9c61 100644
--- a/include/seastar/core/scheduling.hh
+++ b/include/seastar/core/scheduling.hh
@@ -21,7 +21,10 @@
#pragma once
+#include <typeindex>
#include <seastar/core/sstring.hh>
+#include <seastar/core/function_traits.hh>
+#include <seastar/util/gcc6-concepts.hh>
/// \file
@@ -80,6 +83,147 @@ future<> destroy_scheduling_group(scheduling_group sg);
/// \return a future that is ready when the scheduling group has been renamed
future<> rename_scheduling_group(scheduling_group sg, sstring new_name);
+
+/**
+ * Represents a configuration for a specific scheduling group value,
+ * it contains all that is needed to maintain a scheduling group specific
+ * value when it needs to be created, due to, for example, a new \ref scheduling
+ * group being created.
+ *
+ * @note is is recomended to use @ref make_scheduling_group_key_config in order to
+ * create and configure this syructure. The only reason that one might want to not use
+ * this method is because of a need for specific intervention in the construction or
+ * destruction of the value. Even then, it is recommended to first create the configuration
+ * with @ref make_scheduling_group_key_config and only the change it.
+ *
+ */
+struct scheduling_group_key_config {
+ /**
+ * Constructs a default configuration
+ */
+ scheduling_group_key_config() :
+ scheduling_group_key_config(typeid(void)) {}
+ /**
+ * Creates a configuration that is made for a specific type.
+ * It does not contain the right alignment and allocation sizes
+ * neither the correct construction or destruction logic, but only
+ * the indication for the intended type which is used in debug mode
+ * to make sure that the correct type is reffered to when accessing
+ * the value.
+ * @param type_info - the type information class (create with typeid(T)).
+ */
+ scheduling_group_key_config(const std::type_info& type_info) :
+ type_index(type_info) {}
+ /// The allocation size for the value (usually: sizeof(T))
+ size_t allocation_size;
+ /// The required alignment of the value (usually: alignof(T))
+ size_t alignment;
+ /// Holds the type information for debug mode runtime validation
+ std::type_index type_index;
+ /// A function that will be called for each newly allocated value
+ std::function<void (void*)> constructor;
+ /// A function that will be called for each element that is about
+ /// to be dealocated.
+ std::function<void (void*)> destructor;
+
+};
+
+
+/**
+ * A class that is intended to encapsulate the scheduling group specific
+ * key and "hide" it implementation concerns and details.
+ *
+ * @note this object can be copied accross shards and scheduling groups.
+ */
+class scheduling_group_key {
+public:
+ /// The only user allowed operation on a key is copying.
+ scheduling_group_key(const scheduling_group_key&) = default;
+private:
+ scheduling_group_key(unsigned long id) :
+ _id(id) {}
+ unsigned long _id;
+ unsigned long id() const {
+ return _id;
+ }
+ friend class reactor;
+ friend future<scheduling_group_key> scheduling_group_key_create(scheduling_group_key_config cfg);
+ template<typename T>
+ friend T& scheduling_group_get_specific(scheduling_group sg, scheduling_group_key key);
+ template<typename T>
+ friend T& scheduling_group_get_specific(scheduling_group_key key);
+};
+
+namespace internal {
+/**
+ * @brief A function in the spirit of Cpp17 apply, but specifically for constructors.
+ * This function is used in order to preserve support in Cpp14.
+
+ * @tparam ConstructorType - the constructor type or in other words the type to be constructed
+ * @tparam Tuple - T params tuple type (should be deduced)
+ * @tparam size_t...Idx - a sequence of indexes in order to access the typpels members in compile time.
+ * (should be deduced)
+ *
+ * @param pre_alocated_mem - a pointer to the pre allocated memory chunk that will hold the
+ * the initialized object.
+ * @param args - A tupple that holds the prarameters for the constructor
+ * @param idx_seq - An index sequence that will be used to access the members of the tuple in compile
+ * time.
+ *
+ * @note this function was not intended to be called by users and it is only a utility function
+ * for suporting \ref make_scheduling_group_key_config
+ */
+template<typename ConstructorType, typename Tuple, size_t...Idx>
+void apply_constructor(void* pre_alocated_mem, Tuple args, std::index_sequence<Idx...> idx_seq) {
+ new (pre_alocated_mem) ConstructorType(std::get<Idx>(args)...);
+}
+}
+
+/**
+ * A template function that builds a scheduling group specific value configuration.
+ * This configuration is used by the infrastructure to allocate memory for the values
+ * and initialize or deinitialize them when they are created or destroyed.
+ *
+ * @tparam T - the type for the newly created value.
+ * @tparam ...ConstructorArgs - the types for the constructor parameters (should be deduced)
+ * @param args - The parameters for the constructor.
+ * @return a fully initialized \ref scheduling_group_key_config object.
+ */
+template <typename T, typename... ConstructorArgs>
+scheduling_group_key_config
+make_scheduling_group_key_config(ConstructorArgs... args) {
+ scheduling_group_key_config sgkc(typeid(T));
+ sgkc.allocation_size = sizeof(T);
+ sgkc.alignment = alignof(T);
+ sgkc.constructor = [args = std::make_tuple(args...)] (void* p) {
+ internal::apply_constructor<T>(p, args, std::make_index_sequence<sizeof...(ConstructorArgs)>());
+ };
+ sgkc.destructor = [] (void* p) {
+ static_cast<T*>(p)->~T();
+ };
+ return sgkc;
+}
+
+/**
+ * Returns a future that holds a scheduling key and resolves when this key can be used
+ * to access the scheduling group specific value it represents.
+ * @param cfg - A \ref scheduling_group_key_config object (by recomendation: initialized with
+ * \ref make_scheduling_group_key_config )
+ * @return A future containing \ref scheduling_group_key for the newly created specific value.
+ */
+future<scheduling_group_key> scheduling_group_key_create(scheduling_group_key_config cfg);
+
+/**
+ * Returnes a reference to the given scheduling group specific value
+ * @tparam T - the type of the scheduling specific type (cannot be deduced)
+ * @param sg - the scheduling group which it's specific value to retrieve
+ * @param key - the key of the value to retrieve.
+ * @return A reference to the scheduling specific value.
+ */
+template<typename T>
+T& scheduling_group_get_specific(scheduling_group sg, scheduling_group_key key);
+
+
/// \brief Identifies function calls that are accounted as a group
///
/// A `scheduling_group` is a tag that can be used to mark a function call.
@@ -96,6 +240,16 @@ class scheduling_group {
bool operator==(scheduling_group x) const { return _id == x._id; }
bool operator!=(scheduling_group x) const { return _id != x._id; }
bool is_main() const { return _id == 0; }
+ template<typename T>
+ /**
+ * Returnes a reference to this scheduling group specific value
+ * @tparam T - the type of the scheduling specific type (cannot be deduced)
+ * @param key - the key of the value to retrieve.
+ * @return A reference to this scheduling specific value.
+ */
+ T& get_specific(scheduling_group_key key) {
+ return scheduling_group_get_specific<T>(*this, key);
+ }
/// Adjusts the number of shares allotted to the group.
///
/// Dynamically adjust the number of shares allotted to the group, increasing or
@@ -115,6 +269,22 @@ class scheduling_group {
friend class reactor;
friend unsigned internal::scheduling_group_index(scheduling_group sg);
friend scheduling_group internal::scheduling_group_from_index(unsigned index);
+
+ template<typename SpecificValType, typename Mapper, typename Reducer, typename Initial>
+ GCC6_CONCEPT( requires requires(SpecificValType specific_val, Mapper mapper, Reducer reducer, Initial initial) {
+ {reducer(initial, mapper(specific_val))} -> Initial;
+ })
+ friend future<typename function_traits<Reducer>::return_type>
+ map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer, Initial initial_val, scheduling_group_key key);
+
+ template<typename SpecificValType, typename Reducer, typename Initial>
+ GCC6_CONCEPT( requires requires(SpecificValType specific_val, Reducer reducer, Initial initial) {
+ {reducer(initial, specific_val)} -> Initial;
+ })
+ friend future<typename function_traits<Reducer>::return_type>
+ reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, scheduling_group_key key);
+
+
};
/// \cond internal
diff --git a/include/seastar/core/scheduling_specific.hh b/include/seastar/core/scheduling_specific.hh
new file mode 100644
index 00000000..813182fe
--- /dev/null
+++ b/include/seastar/core/scheduling_specific.hh
@@ -0,0 +1,124 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2019 Scylla DB Ltd
+ */
+
+#include <boost/range/adaptor/filtered.hpp>
+#include <seastar/core/reactor.hh>
+
+#pragma once
+
+namespace seastar {
+
+/**
+ * Returns a reference to the given scheduling group specific data.
+ * @param sg - The scheduling group which it's data needs to be accessed
+ * @param key - The scheduling group key that for the data to access
+ * @return A reference of type T& to the data.
+ *
+ * @note The parameter T has to be given since there is no way to deduce it.
+ */
+template<typename T>
+T& scheduling_group_get_specific(scheduling_group sg, scheduling_group_key key) {
+#ifdef SEASTAR_DEBUG
+ assert(std::type_index(typeid(T)) == engine()._scheduling_group_key_configs[
key.id()].type_index);
+#endif
+ return *reinterpret_cast<T*>(engine().get_scheduling_group_specific_value(sg, key));
+}
+
+/**
+ * Returns a reference to the current specific data.
+ * @param key - The scheduling group key that for the data to access
+ * @return A reference of type T& to the data.
+ *
+ * @note The parameter T has to be given since there is no way to deduce it.
+ */
+template<typename T>
+T& scheduling_group_get_specific(scheduling_group_key key) {
+#ifdef SEASTAR_DEBUG
+ assert(std::type_index(typeid(T)) == engine()._scheduling_group_key_configs[
key.id()].type_index);
+#endif
+ return *reinterpret_cast<T*>(engine().get_scheduling_group_specific_value(key));
+}
+
+/**
+ * A map reduce over all values of a specific scheduling group data.
+ * @param mapper - A functor SomeType(SpecificValType&) or SomeType(SpecificValType) that maps
+ * the specific data to a value of any type.
+ * @param reducer - A functor of of type ConvetibleToInitial(Initial, MapperReurnType) that reduces
+ * a value of type Initial and of the mapper return type to a value of type convertible to Initial.
+ * @param initial_val - the initial value to pass in the first call to the reducer.
+ * @param key - the key to the specific data that the mapper should act upon.
+ * @return A future that resolves when the result of the map reduce is ready.
+ * @note The type of SpecificValType must be given because there is no way to deduce it in a *consistent*
+ * manner.
+ * @note Theoretically the parameter type of Mapper can be deduced to be the type (function_traits<Mapper>::arg<0>)
+ * but then there is a danger when the Mapper accepts a parameter type T where SpecificValType is convertible to
+ * SpecificValType.
+ */
+template<typename SpecificValType, typename Mapper, typename Reducer, typename Initial>
+GCC6_CONCEPT( requires requires(SpecificValType specific_val, Mapper mapper, Reducer reducer, Initial initial) {
+ {reducer(initial, mapper(specific_val))} -> Initial;
+})
+future<typename function_traits<Reducer>::return_type>
+map_reduce_scheduling_group_specific(Mapper mapper, Reducer reducer,
+ Initial initial_val, scheduling_group_key key) {
+ auto wrapped_mapper = [key, mapper] (std::unique_ptr<reactor::task_queue>& tq) {
+ return make_ready_future<typename function_traits<Mapper>::return_type>
+ (mapper(scheduling_group(tq->_id).get_specific<SpecificValType>(key)));
+ };
+ auto queue_exists = [] (std::unique_ptr<reactor::task_queue>& tq) {
+ return bool(tq);
+ };
+
+ return map_reduce(engine()._task_queues|boost::adaptors::filtered(queue_exists),
+ wrapped_mapper, std::move(initial_val), reducer);
+}
+
+/**
+ * A reduce over all values of a specific scheduling group data.
+ * @param reducer - A functor of of type ConvetibleToInitial(Initial, SpecificValType) that reduces
+ * a value of type Initial and of the sg specific data type to a value of type convertible to Initial.
+ * @param initial_val - the initial value to pass in the first call to the reducer.
+ * @param key - the key to the specific data that the mapper should act upon.
+ * @return A future that resolves when the result of the reduce is ready.
+ * * @note The type of SpecificValType must be given because there is no way to deduce it in a *consistent*
+ * manner.
+ * @note Theoretically the parameter type of Reducer can be deduced to be the type (function_traits<Reducer>::arg<0>)
+ * but then there is a danger when the Reducer accepts a parameter type T where SpecificValType is convertible to
+ * SpecificValType.
+ */
+template<typename SpecificValType, typename Reducer, typename Initial>
+GCC6_CONCEPT( requires requires(SpecificValType specific_val, Reducer reducer, Initial initial) {
+ {reducer(initial, specific_val)} -> Initial;
+})
+future<typename function_traits<Reducer>::return_type>
+reduce_scheduling_group_specific(Reducer reducer, Initial initial_val, scheduling_group_key key) {
+ auto mapper = [key] (std::unique_ptr<reactor::task_queue>& tq) {
+ return make_ready_future<SpecificValType>(scheduling_group(tq->_id).get_specific<SpecificValType>(key));
+ };
+ auto queue_exists = [] (std::unique_ptr<reactor::task_queue>& tq) {
+ return bool(tq);
+ };
+
+ return map_reduce(engine()._task_queues|boost::adaptors::filtered(queue_exists),
+ mapper, std::move(initial_val), reducer);
+}
+
+}
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 3ad03926..2bb8ee87 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -1464,6 +1464,23 @@ reactor::~reactor() {
eraser(_expired_lowres_timers);
eraser(_expired_manual_timers);
io_destroy(_io_context);
+ for (auto&& tq : _task_queues) {
+ if (tq) {
+ // The following line will preserve the convention that constructor and destructor functions
+ // for the per sg values are called in the context of the containing scheduling group.
+ *internal::current_scheduling_group_ptr() = scheduling_group(tq->_id);
+ for (size_t key : boost::irange<size_t>(0, _scheduling_group_key_configs.size())) {
+ void* val = tq->_scheduling_group_specific_vals[key];
+ if (val) {
+ if (_scheduling_group_key_configs[key].destructor) {
+ _scheduling_group_key_configs[key].destructor(val);
+ }
+ free(val);
+ tq->_scheduling_group_specific_vals[key] = nullptr;
+ }
+ }
+ }
+ }
}
bool reactor::wait_and_process(int timeout, const sigset_t* active_sigmask) {
@@ -6124,6 +6141,7 @@ std::chrono::nanoseconds reactor::total_steal_time() {
}
static std::atomic<unsigned long> s_used_scheduling_group_ids_bitmap{3}; // 0=main, 1=atexit
+static std::atomic<unsigned long> s_next_scheduling_group_specific_key{0};
static
unsigned
@@ -6142,6 +6160,12 @@ allocate_scheduling_group_id() {
return i;
}
+static
+unsigned long
+allocate_scheduling_group_specific_key() {
+ return s_next_scheduling_group_specific_key.fetch_add(1, std::memory_order_relaxed);
+}
+
static
void
deallocate_scheduling_group_id(unsigned id) {
@@ -6149,14 +6173,70 @@ deallocate_scheduling_group_id(unsigned id) {
}
void
+reactor::allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key) {
+ std::unique_ptr<task_queue>& tq = _task_queues[sg._id];
+ tq->_scheduling_group_specific_vals.resize(std::max<size_t>(tq->_scheduling_group_specific_vals.size(),
key.id()+1));
+ tq->_scheduling_group_specific_vals[
key.id()] =
+ aligned_alloc(_scheduling_group_key_configs[
key.id()].alignment,
+ _scheduling_group_key_configs[
key.id()].allocation_size);
+ if (!tq->_scheduling_group_specific_vals[
key.id()]) {
+ std::abort();
+ }
+ if (_scheduling_group_key_configs[
key.id()].constructor) {
+ _scheduling_group_key_configs[
key.id()].constructor(tq->_scheduling_group_specific_vals[
key.id()]);
+ }
+}
+
+future<>
reactor::init_scheduling_group(seastar::scheduling_group sg, sstring name, float shares) {
_task_queues.resize(std::max<size_t>(_task_queues.size(), sg._id + 1));
_task_queues[sg._id] = std::make_unique<task_queue>(sg._id, name, shares);
+ unsigned long num_keys = s_next_scheduling_group_specific_key.load(std::memory_order_relaxed);
+
+ return with_scheduling_group(sg, [this, num_keys, sg] () {
+ for (unsigned long key_id = 0; key_id < num_keys; key_id++) {
+ allocate_scheduling_group_specific_data(sg, scheduling_group_key(key_id));
+ }
+ });
}
-void
+future<>
+reactor::init_new_scheduling_group_key(scheduling_group_key key, scheduling_group_key_config cfg) {
+ _scheduling_group_key_configs.resize(std::max<size_t>(_scheduling_group_key_configs.size(),
key.id() + 1));
+ _scheduling_group_key_configs[
key.id()] = cfg;
+ return parallel_for_each(_task_queues, [this, cfg, key] (std::unique_ptr<task_queue>& tq) {
+ if (tq) {
+ scheduling_group sg = scheduling_group(tq->_id);
+ return with_scheduling_group(sg, [this, key, sg] () {
+ allocate_scheduling_group_specific_data(sg, key);
+ });
+ }
+ return make_ready_future();
+ });
+}
+
+future<>
reactor::destroy_scheduling_group(scheduling_group sg) {
- _task_queues[sg._id].reset();
+ return with_scheduling_group(sg, [this, sg] () {
+ for (unsigned long key_id = 0; key_id < _scheduling_group_key_configs.size(); key_id++) {
+ void* val = _task_queues[sg._id]->_scheduling_group_specific_vals[key_id];
+ if (val) {
+ if (_scheduling_group_key_configs[key_id].destructor) {
+ _scheduling_group_key_configs[key_id].destructor(val);
+ }
+ free(val);
+ _task_queues[sg._id]->_scheduling_group_specific_vals[key_id] = nullptr;
+ }
+ }
+ }).then( [this, sg] () {
+ _task_queues[sg._id].reset();
+ });
+
+}
+
+void
+reactor::no_such_scheduling_group(scheduling_group sg) {
+ throw std::invalid_argument(format("The scheduling group does not exist ({})", sg._id));
}
const sstring&
@@ -6175,12 +6255,22 @@ create_scheduling_group(sstring name, float shares) {
assert(id < max_scheduling_groups());
auto sg = scheduling_group(id);
return smp::invoke_on_all([sg, name, shares] {
- engine().init_scheduling_group(sg, name, shares);
+ return engine().init_scheduling_group(sg, name, shares);
}).then([sg] {
return make_ready_future<scheduling_group>(sg);
});
}
+future<scheduling_group_key>
+scheduling_group_key_create(scheduling_group_key_config cfg) {
+ scheduling_group_key key = allocate_scheduling_group_specific_key();
+ return smp::invoke_on_all([key, cfg] {
+ return engine().init_new_scheduling_group_key(key, cfg);
+ }).then([key] {
+ return make_ready_future<scheduling_group_key>(key);
+ });
+}
+
future<>
rename_priority_class(io_priority_class pc, sstring new_name) {
return reactor::rename_priority_class(pc, new_name);
@@ -6195,7 +6285,7 @@ destroy_scheduling_group(scheduling_group sg) {
throw_with_backtrace<std::runtime_error>("Attempt to destroy the current scheduling group");
}
return smp::invoke_on_all([sg] {
- engine().destroy_scheduling_group(sg);
+ return engine().destroy_scheduling_group(sg);
}).then([sg] {
deallocate_scheduling_group_id(sg._id);
});
diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt
index dc12eecc..67be5eed 100644
--- a/tests/unit/CMakeLists.txt
+++ b/tests/unit/CMakeLists.txt
@@ -377,6 +377,9 @@ seastar_add_test (stall_detector
seastar_add_test (thread
SOURCES thread_test.cc)
+seastar_add_test (scheduling_group
+ SOURCES scheduling_group_test.cc)
+
seastar_add_app_test (thread_context_switch
SOURCES thread_context_switch_test.cc)
diff --git a/tests/unit/scheduling_group_test.cc b/tests/unit/scheduling_group_test.cc
new file mode 100644
index 00000000..0d411557
--- /dev/null
+++ b/tests/unit/scheduling_group_test.cc
@@ -0,0 +1,213 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2017 ScyllaDB Ltd.
+ */
+
+#include <algorithm>
+#include <vector>
+#include <chrono>
+
+#include <seastar/core/thread.hh>
+#include <seastar/testing/test_case.hh>
+#include <seastar/testing/thread_test_case.hh>
+#include <seastar/testing/test_runner.hh>
+#include <seastar/core/execution_stage.hh>
+#include <seastar/core/sleep.hh>
+#include <seastar/core/print.hh>
+#include <seastar/core/scheduling_specific.hh>
+
+using namespace std::chrono_literals;
+
+using namespace seastar;
+
+/**
+ * Test setting primitive and object as a value after all groups are created
+ */
+SEASTAR_THREAD_TEST_CASE(sg_specific_values_define_after_sg_create) {
+ using ivec = std::vector<int>;
+ const int num_scheduling_groups = 4;
+ std::vector<scheduling_group> sgs;
+ for (int i = 0; i < num_scheduling_groups; i++) {
+ sgs.push_back(create_scheduling_group(format("sg{}", i).c_str(), 100).get0());
+ }
+
+ const auto destroy_scheduling_groups = defer([&sgs] () {
+ for (scheduling_group sg : sgs) {
+ destroy_scheduling_group(sg).get();
+ }
+ });
+ scheduling_group_key_config key1_conf = make_scheduling_group_key_config<int>();
+ scheduling_group_key key1 = scheduling_group_key_create(key1_conf).get0();
+
+ scheduling_group_key_config key2_conf = make_scheduling_group_key_config<ivec>();
+ scheduling_group_key key2 = scheduling_group_key_create(key2_conf).get0();
+
+ smp::invoke_on_all([key1, key2, &sgs] () {
+ int factor = engine().cpu_id() + 1;
+ for (int i=0; i < num_scheduling_groups; i++) {
+ sgs[i].get_specific<int>(key1) = (i + 1) * factor;
+ sgs[i].get_specific<ivec>(key2).push_back((i + 1) * factor);
+ }
+
+ for (int i=0; i < num_scheduling_groups; i++) {
+ BOOST_REQUIRE_EQUAL(sgs[i].get_specific<int>(key1) = (i + 1) * factor, (i + 1) * factor);
+ BOOST_REQUIRE_EQUAL(sgs[i].get_specific<ivec>(key2)[0], (i + 1) * factor);
+ }
+
+ }).get();
+
+ smp::invoke_on_all([key1, key2] () {
+ return reduce_scheduling_group_specific<int>(std::plus<int>(), int(0), key1).then([] (int sum) {
+ int factor = engine().cpu_id() + 1;
+ int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2;
+ BOOST_REQUIRE_EQUAL(expected_sum, sum);
+ }). then([key1, key2] {
+ auto ivec_to_int = [] (ivec& v) {
+ return v.size() ? v[0] : 0;
+ };
+
+ return map_reduce_scheduling_group_specific<ivec>(ivec_to_int, std::plus<int>(), int(0), key2).then([] (int sum) {
+ int factor = engine().cpu_id() + 1;
+ int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2;
+ BOOST_REQUIRE_EQUAL(expected_sum, sum);
+ });
+
+ });
+ }).get();
+
+
+}
+
+/**
+ * Test setting primitive and object as a value before all groups are created
+ */
+SEASTAR_THREAD_TEST_CASE(sg_specific_values_define_before_sg_create) {
+ using ivec = std::vector<int>;
+ const int num_scheduling_groups = 4;
+ std::vector<scheduling_group> sgs;
+ const auto destroy_scheduling_groups = defer([&sgs] () {
+ for (scheduling_group sg : sgs) {
+ destroy_scheduling_group(sg).get();
+ }
+ });
+ scheduling_group_key_config key1_conf = make_scheduling_group_key_config<int>();
+ scheduling_group_key key1 = scheduling_group_key_create(key1_conf).get0();
+
+ scheduling_group_key_config key2_conf = make_scheduling_group_key_config<ivec>();
+ scheduling_group_key key2 = scheduling_group_key_create(key2_conf).get0();
+
+ for (int i = 0; i < num_scheduling_groups; i++) {
+ sgs.push_back(create_scheduling_group(format("sg{}", i).c_str(), 100).get0());
+ }
+
+ smp::invoke_on_all([key1, key2, &sgs] () {
+ int factor = engine().cpu_id() + 1;
+ for (int i=0; i < num_scheduling_groups; i++) {
+ sgs[i].get_specific<int>(key1) = (i + 1) * factor;
+ sgs[i].get_specific<ivec>(key2).push_back((i + 1) * factor);
+ }
+
+ for (int i=0; i < num_scheduling_groups; i++) {
+ BOOST_REQUIRE_EQUAL(sgs[i].get_specific<int>(key1) = (i + 1) * factor, (i + 1) * factor);
+ BOOST_REQUIRE_EQUAL(sgs[i].get_specific<ivec>(key2)[0], (i + 1) * factor);
+ }
+
+ }).get();
+
+ smp::invoke_on_all([key1, key2] () {
+ return reduce_scheduling_group_specific<int>(std::plus<int>(), int(0), key1).then([] (int sum) {
+ int factor = engine().cpu_id() + 1;
+ int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2;
+ BOOST_REQUIRE_EQUAL(expected_sum, sum);
+ }). then([key1, key2] {
+ auto ivec_to_int = [] (ivec& v) {
+ return v.size() ? v[0] : 0;
+ };
+
+ return map_reduce_scheduling_group_specific<ivec>(ivec_to_int, std::plus<int>(), int(0), key2).then([] (int sum) {
+ int factor = engine().cpu_id() + 1;
+ int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2;
+ BOOST_REQUIRE_EQUAL(expected_sum, sum);
+ });
+
+ });
+ }).get();
+
+}
+
+/**
+ * Test setting primitive and an object as a value before some groups are created
+ * and after some of the groups are created.
+ */
+SEASTAR_THREAD_TEST_CASE(sg_specific_values_define_before_and_after_sg_create) {
+ using ivec = std::vector<int>;
+ const int num_scheduling_groups = 4;
+ std::vector<scheduling_group> sgs;
+ const auto destroy_scheduling_groups = defer([&sgs] () {
+ for (scheduling_group sg : sgs) {
+ destroy_scheduling_group(sg).get();
+ }
+ });
+
+ for (int i = 0; i < num_scheduling_groups/2; i++) {
+ sgs.push_back(create_scheduling_group(format("sg{}", i).c_str(), 100).get0());
+ }
+ scheduling_group_key_config key1_conf = make_scheduling_group_key_config<int>();
+ scheduling_group_key key1 = scheduling_group_key_create(key1_conf).get0();
+
+ scheduling_group_key_config key2_conf = make_scheduling_group_key_config<ivec>();
+ scheduling_group_key key2 = scheduling_group_key_create(key2_conf).get0();
+
+ for (int i = num_scheduling_groups/2; i < num_scheduling_groups; i++) {
+ sgs.push_back(create_scheduling_group(format("sg{}", i).c_str(), 100).get0());
+ }
+
+ smp::invoke_on_all([key1, key2, &sgs] () {
+ int factor = engine().cpu_id() + 1;
+ for (int i=0; i < num_scheduling_groups; i++) {
+ sgs[i].get_specific<int>(key1) = (i + 1) * factor;
+ sgs[i].get_specific<ivec>(key2).push_back((i + 1) * factor);
+ }
+
+ for (int i=0; i < num_scheduling_groups; i++) {
+ BOOST_REQUIRE_EQUAL(sgs[i].get_specific<int>(key1) = (i + 1) * factor, (i + 1) * factor);
+ BOOST_REQUIRE_EQUAL(sgs[i].get_specific<ivec>(key2)[0], (i + 1) * factor);
+ }
+
+ }).get();
+
+ smp::invoke_on_all([key1, key2] () {
+ return reduce_scheduling_group_specific<int>(std::plus<int>(), int(0), key1).then([] (int sum) {
+ int factor = engine().cpu_id() + 1;
+ int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2;
+ BOOST_REQUIRE_EQUAL(expected_sum, sum);
+ }). then([key1, key2] {
+ auto ivec_to_int = [] (ivec& v) {
+ return v.size() ? v[0] : 0;
+ };
+
+ return map_reduce_scheduling_group_specific<ivec>(ivec_to_int, std::plus<int>(), int(0), key2).then([] (int sum) {
+ int factor = engine().cpu_id() + 1;
+ int expected_sum = ((1 + num_scheduling_groups)*num_scheduling_groups) * factor /2;
+ BOOST_REQUIRE_EQUAL(expected_sum, sum);
+ });
+
+ });
+ }).get();
+}
--
2.20.1