As a continuation of e6463df8a ("smp: allow having multiple instances
of the smp class"), alien is extended to support multiple smp instances.
This means several independent Seastar shard-sets running in the same
process, to allow single-process testing of clusters.
To do this, the alien::smp class which was just a container of some
static functions and variables is renamed to alien::instance, and
its functions and variables are made into non-static members.
The alien::run_on() and alien::submit_to() functions are modified to
accept an alien::instance parameter, so they know which Seastar instance
to target (the old signatures are retained but deprecated; they will target
the first Seastar instance created).
To obtain the alien::instance object, one can call app_template::alien().
To construct multiple Seastar instances, one can create several app_template
instances and run them.
Note: qs_deleter was made into a non-member class. As a private member
it was not std::is_default_constructible, which made the unique_ptr
constructor using it not accessible.
---
include/seastar/core/alien.hh | 76 +++++++++++++++++++++++-----
include/seastar/core/app-template.hh | 13 +++++
include/seastar/core/reactor.hh | 4 +-
include/seastar/core/smp.hh | 8 +++
src/core/alien.cc | 14 ++---
src/core/app-template.cc | 10 +++-
src/core/reactor.cc | 17 ++++---
tests/unit/alien_test.cc | 6 +--
8 files changed, 114 insertions(+), 34 deletions(-)
diff --git a/include/seastar/core/alien.hh b/include/seastar/core/alien.hh
index 43510d4ea1..055b9bd794 100644
--- a/include/seastar/core/alien.hh
+++ b/include/seastar/core/alien.hh
@@ -95,39 +95,72 @@ class message_queue {
}
size_t process_incoming();
bool pure_poll_rx() const;
};
-class smp {
- struct qs_deleter {
- unsigned count;
- qs_deleter(unsigned n = 0) : count(n) {}
- qs_deleter(const qs_deleter& d) : count(d.count) {}
- void operator()(message_queue* qs) const;
- };
- using qs = std::unique_ptr<message_queue[], qs_deleter>;
+namespace internal {
+
+struct qs_deleter {
+ unsigned count;
+ qs_deleter(unsigned n = 0) : count(n) {}
+ qs_deleter(const qs_deleter& d) : count(d.count) {}
+ void operator()(message_queue* qs) const;
+};
+
+}
+
+/// Represents the Seastar system from alien's point of view. In a normal
+/// system, there is just one instance, but for in-process cluster testing
+/// there may be more than one. Functions such as run_on() direct messages to
+/// an (instance, shard) tuple.
+class instance {
+ using qs = std::unique_ptr<message_queue[], internal::qs_deleter>;
public:
static qs create_qs(const std::vector<reactor*>& reactors);
- static qs _qs;
- static bool poll_queues();
- static bool pure_poll_queues();
+ qs _qs;
+ bool poll_queues();
+ bool pure_poll_queues();
};
+namespace internal {
+
+extern instance* default_instance;
+
+}
+
/// Runs a function on a remote shard from an alien thread where engine() is not available.
///
+/// \param instance designates the Seastar instance to process the message
/// \param shard designates the shard to run the function on
/// \param func a callable to run on shard \c t. If \c func is a temporary object,
/// its lifetime will be extended by moving it. If \c func is a reference,
/// the caller must guarantee that it will survive the call.
/// \note the func must not throw and should return \c void. as we cannot identify the
/// alien thread, hence we are not able to post the fulfilled promise to the
/// message queue managed by the shard executing the alien thread which is
/// interested to the return value. Please use \c submit_to() instead, if
/// \c func throws.
template <typename Func>
+void run_on(instance& instance, unsigned shard, Func func) {
+ instance._qs[shard].submit(std::move(func));
+}
+
+/// Runs a function on a remote shard from an alien thread where engine() is not available.
+///
+/// \param shard designates the shard to run the function on
+/// \param func a callable to run on \c shard. If \c func is a temporary object,
+/// its lifetime will be extended by moving it. If \c func is a reference,
+/// the caller must guarantee that it will survive the call.
+/// \note \c func must not throw and should return \c void. Because we cannot
+/// identify the alien thread, we are not able to post the fulfilled promise
+/// to the message queue managed by the shard executing the alien thread that
+/// is interested in the return value. Please use \c submit_to() instead if
+/// \c func throws.
+template <typename Func>
+[[deprecated("Use run_on(instance&, unsigned shard, Func) instead")]]
void run_on(unsigned shard, Func func) {
- smp::_qs[shard].submit(std::move(func));
+ run_on(*internal::default_instance, shard, std::move(func));
}
namespace internal {
template<typename Func>
using return_value_t = typename futurize<std::invoke_result_t<Func>>::value_type;
@@ -155,21 +188,22 @@ struct return_type_of<Func, false> {
template <typename Func> using return_type_t = typename return_type_of<Func>::type;
}
/// Runs a function on a remote shard from an alien thread where engine() is not available.
///
+/// \param instance designates the Seastar instance to process the message
/// \param shard designates the shard to run the function on
/// \param func a callable to run on \c shard. If \c func is a temporary object,
/// its lifetime will be extended by moving it. If \c func is a reference,
/// the caller must guarantee that it will survive the call.
/// \return whatever \c func returns, as a \c std::future<>
/// \note the caller must keep the returned future alive until \c func returns
template<typename Func, typename T = internal::return_type_t<Func>>
-std::future<T> submit_to(unsigned shard, Func func) {
+std::future<T> submit_to(instance& instance, unsigned shard, Func func) {
std::promise<T> pr;
auto fut = pr.get_future();
- run_on(shard, [pr = std::move(pr), func = std::move(func)] () mutable {
+ run_on(instance, shard, [pr = std::move(pr), func = std::move(func)] () mutable {
// std::future returned via std::promise above.
(void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
try {
internal::return_type_of<Func>::set(pr, result.get());
} catch (...) {
@@ -178,7 +212,21 @@ std::future<T> submit_to(unsigned shard, Func func) {
});
});
return fut;
}
+/// Runs a function on a remote shard from an alien thread where engine() is not available.
+///
+/// \param shard designates the shard to run the function on
+/// \param func a callable to run on \c shard. If \c func is a temporary object,
+/// its lifetime will be extended by moving it. If \c func is a reference,
+/// the caller must guarantee that it will survive the call.
+/// \return whatever \c func returns, as a \c std::future<>
+/// \note the caller must keep the returned future alive until \c func returns
+template<typename Func, typename T = internal::return_type_t<Func>>
+[[deprecated("Use submit_to(instance&, unsigned shard, Func) instead.")]]
+std::future<T> submit_to(unsigned shard, Func func) {
+ return submit_to(*internal::default_instance, shard, std::move(func));
+}
+
}
}
diff --git a/include/seastar/core/app-template.hh b/include/seastar/core/app-template.hh
index 6103f5f113..a462b6d17a 100644
--- a/include/seastar/core/app-template.hh
+++ b/include/seastar/core/app-template.hh
@@ -27,10 +27,16 @@
#include <seastar/core/sstring.hh>
#include <chrono>
namespace seastar {
+namespace alien {
+
+class instance;
+
+}
+
class app_template {
public:
struct config {
/// The name of the application.
///
@@ -58,10 +64,12 @@ class app_template {
config() {}
};
using configuration_reader = std::function<void (boost::program_options::variables_map&)>;
private:
+ // unique_ptr to avoid pulling in alien.hh.
+ std::unique_ptr<alien::instance> _alien;
// reactor destruction is asynchronous, so we must let the last reactor
// destroy the smp instance
std::shared_ptr<smp> _smp;
config _cfg;
boost::program_options::options_description _opts;
@@ -78,20 +86,25 @@ class app_template {
const char* help;
int max_count;
};
public:
explicit app_template(config cfg = config());
+ ~app_template();
boost::program_options::options_description& get_options_description();
boost::program_options::options_description& get_conf_file_options_description();
boost::program_options::options_description_easy_init add_options();
void add_positional_options(std::initializer_list<positional_option> options);
boost::program_options::variables_map& configuration();
int run_deprecated(int ac, char ** av, std::function<void ()>&& func);
void set_configuration_reader(configuration_reader conf_reader);
+ /// Obtains an alien::instance object that can be used to send messages
+ /// to Seastar shards from non-Seastar threads.
+ alien::instance& alien() { return *_alien; }
+
// Runs given function and terminates the application when the future it
// returns resolves. The value with which the future resolves will be
// returned by this function.
int run(int ac, char ** av, std::function<future<int> ()>&& func);
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 461a4a7f2f..2b3a46f628 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -99,10 +99,11 @@ namespace seastar {
using shard_id = unsigned;
namespace alien {
class message_queue;
+class instance;
}
class reactor;
inline
size_t iovec_len(const std::vector<iovec>& iov)
{
@@ -243,10 +244,11 @@ class reactor {
};
friend void io_completion::complete_with(ssize_t);
private:
std::shared_ptr<smp> _smp;
+ alien::instance& _alien;
reactor_config _cfg;
file_desc _notify_eventfd;
file_desc _task_quota_timer;
#ifdef HAVE_OSV
reactor_backend_osv _backend;
@@ -457,11 +459,11 @@ class reactor {
do_write_some(pollable_fd_state& fd, const void* buffer, size_t size);
future<size_t>
do_write_some(pollable_fd_state& fd, net::packet& p);
public:
static boost::program_options::options_description get_options_description(reactor_config cfg);
- explicit reactor(std::shared_ptr<smp> smp, unsigned id, reactor_backend_selector rbs, reactor_config cfg);
+ explicit reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg);
reactor(const reactor&) = delete;
~reactor();
void operator=(const reactor&) = delete;
sched_clock::duration uptime() {
diff --git a/include/seastar/core/smp.hh b/include/seastar/core/smp.hh
index 429b435200..630e427907 100644
--- a/include/seastar/core/smp.hh
+++ b/include/seastar/core/smp.hh
@@ -41,10 +41,16 @@ namespace seastar {
using shard_id = unsigned;
class smp_service_group;
class reactor_backend_selector;
+namespace alien {
+
+class instance;
+
+}
+
namespace internal {
unsigned smp_service_group_id(smp_service_group ssg) noexcept;
inline shard_id* this_shard_id_ptr() noexcept {
@@ -282,10 +288,11 @@ class smp_message_queue {
friend class smp;
};
class smp : public std::enable_shared_from_this<smp> {
+ alien::instance& _alien;
std::vector<posix_thread> _threads;
std::vector<std::function<void ()>> _thread_loops; // for dpdk
std::optional<boost::barrier> _all_event_loops_done;
struct qs_deleter {
void operator()(smp_message_queue** qs) const;
@@ -298,10 +305,11 @@ class smp : public std::enable_shared_from_this<smp> {
template <typename Func>
using returns_future = is_future<std::result_of_t<Func()>>;
template <typename Func>
using returns_void = std::is_same<std::result_of_t<Func()>, void>;
public:
+ explicit smp(alien::instance& alien) : _alien(alien) {}
static boost::program_options::options_description get_options_description();
void register_network_stacks();
void configure(boost::program_options::variables_map vm, reactor_config cfg = {});
void cleanup();
void cleanup_cpu();
diff --git a/src/core/alien.cc b/src/core/alien.cc
index 71b9a5a26d..3fd6b5da97 100644
--- a/src/core/alien.cc
+++ b/src/core/alien.cc
@@ -112,34 +112,34 @@ void message_queue::start() {
sm::make_derive("total_sent_messages", [this] { return _sent.value.load(); }, sm::description("Total number of sent messages")),
});
}
-void smp::qs_deleter::operator()(alien::message_queue* qs) const {
+void internal::qs_deleter::operator()(alien::message_queue* qs) const {
for (unsigned i = 0; i < count; i++) {
qs[i].~message_queue();
}
::operator delete[](qs);
}
-smp::qs smp::_qs;
-
-smp::qs smp::create_qs(const std::vector<reactor*>& reactors) {
+instance::qs instance::create_qs(const std::vector<reactor*>& reactors) {
auto queues = reinterpret_cast<alien::message_queue*>(operator new[] (sizeof(alien::message_queue) * reactors.size()));
for (unsigned i = 0; i < reactors.size(); i++) {
new (&queues[i]) alien::message_queue(reactors[i]);
}
- return qs{queues, smp::qs_deleter{static_cast<unsigned>(reactors.size())}};
+ return qs{queues, internal::qs_deleter{static_cast<unsigned>(reactors.size())}};
}
-bool smp::poll_queues() {
+bool instance::poll_queues() {
auto& queue = _qs[this_shard_id()];
return queue.process_incoming() != 0;
}
-bool smp::pure_poll_queues() {
+bool instance::pure_poll_queues() {
auto& queue = _qs[this_shard_id()];
return queue.pure_poll_rx();
}
+instance* internal::default_instance;
+
}
}
diff --git a/src/core/app-template.cc b/src/core/app-template.cc
index 31a51f8bbe..c2fe47b09a 100644
--- a/src/core/app-template.cc
+++ b/src/core/app-template.cc
@@ -19,10 +19,11 @@
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/
#include <seastar/core/app-template.hh>
#include <seastar/core/reactor.hh>
+#include <seastar/core/alien.hh>
#include <seastar/core/scollectd.hh>
#include <seastar/core/metrics_api.hh>
#include <boost/program_options.hpp>
#include <seastar/core/print.hh>
#include <seastar/util/log.hh>
@@ -44,14 +45,19 @@ reactor_config_from_app_config(app_template::config cfg) {
ret.task_quota = cfg.default_task_quota;
return ret;
}
app_template::app_template(app_template::config cfg)
- : _smp(std::make_shared<smp>())
+ : _alien(std::make_unique<alien::instance>())
+ , _smp(std::make_shared<smp>(*_alien))
, _cfg(std::move(cfg))
, _opts(_cfg.name + " options")
, _conf_reader(get_default_configuration_reader()) {
+
+ if (!alien::internal::default_instance) {
+ alien::internal::default_instance = _alien.get();
+ }
_opts.add_options()
("help,h", "show help message")
;
_smp->register_network_stacks();
@@ -62,10 +68,12 @@ app_template::app_template(app_template::config cfg)
_opts_conf_file.add(log_cli::get_options_description());
_opts.add(_opts_conf_file);
}
+app_template::~app_template() = default;
+
app_template::configuration_reader app_template::get_default_configuration_reader() {
return [this] (bpo::variables_map& configuration) {
auto home = std::getenv("HOME");
if (home) {
std::ifstream ifs(std::string(home) + "/.config/seastar/seastar.conf");
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 25038a4e69..6bce0d8bdc 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -910,12 +910,13 @@ struct reactor::task_queue::indirect_compare {
bool operator()(const task_queue* tq1, const task_queue* tq2) const {
return tq1->_vruntime < tq2->_vruntime;
}
};
-reactor::reactor(std::shared_ptr<smp> smp, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
+reactor::reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
: _smp(std::move(smp))
+ , _alien(alien)
, _cfg(cfg)
, _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC))
, _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
, _id(id)
#ifdef HAVE_OSV
@@ -2472,15 +2473,15 @@ class reactor::smp_pollfn final : public reactor::pollfn {
reactor& _r;
public:
smp_pollfn(reactor& r) : _r(r) {}
virtual bool poll() final override {
return (smp::poll_queues() |
- alien::smp::poll_queues());
+ _r._alien.poll_queues());
}
virtual bool pure_poll() final override {
return (smp::pure_poll_queues() ||
- alien::smp::pure_poll_queues());
+ _r._alien.pure_poll_queues());
}
virtual bool try_enter_interrupt_mode() override {
// systemwide_memory_barrier() is very slow if run concurrently,
// so don't go to sleep if it is running now.
_r._sleeping.store(true, std::memory_order_relaxed);
@@ -3443,11 +3444,11 @@ void smp::start_all_queues()
for (unsigned c = 0; c < count; c++) {
if (c != this_shard_id()) {
_qs[c][this_shard_id()].start(c);
}
}
- alien::smp::_qs[this_shard_id()].start();
+ _alien._qs[this_shard_id()].start();
}
#ifdef SEASTAR_HAVE_DPDK
int dpdk_thread_adaptor(void* f)
@@ -3493,11 +3494,11 @@ void smp::allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_co
void *buf;
int r = posix_memalign(&buf, cache_line_size, sizeof(reactor));
assert(r == 0);
local_engine = reinterpret_cast<reactor*>(buf);
*internal::this_shard_id_ptr() = id;
- new (buf) reactor(this->shared_from_this(), id, std::move(rbs), cfg);
+ new (buf) reactor(this->shared_from_this(), _alien, id, std::move(rbs), cfg);
reactor_holder.reset(local_engine);
}
void smp::cleanup() {
smp::_threads = std::vector<posix_thread>();
@@ -3510,12 +3511,12 @@ void smp::cleanup_cpu() {
if (_qs) {
for(unsigned i = 0; i < smp::count; i++) {
_qs[i][cpuid].stop();
}
}
- if (alien::smp::_qs) {
- alien::smp::_qs[cpuid].stop();
+ if (_alien._qs) {
+ _alien._qs[cpuid].stop();
}
}
void smp::create_thread(std::function<void ()> thread_loop) {
if (_using_dpdk) {
@@ -4028,11 +4029,11 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
smp::_qs_owner[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count));
for (unsigned j = 0; j < smp::count; ++j) {
new (&smp::_qs_owner[i][j]) smp_message_queue(reactors[j], reactors[i]);
}
}
- alien::smp::_qs = alien::smp::create_qs(reactors);
+ _alien._qs = alien::instance::create_qs(reactors);
smp_queues_constructed.wait();
start_all_queues();
for (auto& dev_id : disk_config.device_ids()) {
assign_io_queue(0, dev_id);
}
diff --git a/tests/unit/alien_test.cc b/tests/unit/alien_test.cc
index 3aa527a4ae..77eba8d568 100644
--- a/tests/unit/alien_test.cc
+++ b/tests/unit/alien_test.cc
@@ -41,14 +41,15 @@ int main(int argc, char** argv)
{
// we need a protocol that both seastar and alien understand.
// and on which, a seastar future can wait.
int engine_ready_fd = eventfd(0, 0);
auto alien_done = file_desc::eventfd(0, 0);
+ seastar::app_template app;
// use the raw fd, because seastar engine want to take over the fds, if it
// polls on them.
- auto zim = std::async([engine_ready_fd,
+ auto zim = std::async([&app, engine_ready_fd,
alien_done=alien_done.get()] {
eventfd_t result = 0;
// wait until the seastar engine is ready
int r = ::eventfd_read(engine_ready_fd, &result);
if (r < 0) {
@@ -63,11 +64,11 @@ int main(int argc, char** argv)
counts.push_back(alien::submit_to(i, [i] {
return seastar::make_ready_future<int>(i);
}));
}
// std::future<void>
- alien::submit_to(0, [] {
+ alien::submit_to(app.alien(), 0, [] {
return seastar::make_ready_future<>();
}).wait();
int total = 0;
for (auto& count : counts) {
total += count.get();
@@ -75,11 +76,10 @@ int main(int argc, char** argv)
// i am done. dismiss the engine
::eventfd_write(alien_done, ALIEN_DONE);
return total;
});
- seastar::app_template app;
eventfd_t result = 0;
app.run(argc, argv, [&] {
return seastar::now().then([engine_ready_fd] {
// engine ready!
::eventfd_write(engine_ready_fd, ENGINE_READY);
--
2.31.1