[PATCH seastar v1] alien: prepare for multi-instance use

37 views
Skip to first unread message

Avi Kivity

<avi@scylladb.com>
unread,
Jun 3, 2021, 1:35:43 PM6/3/21
to seastar-dev@googlegroups.com
As a continuation for e6463df8a ("smp: allow having multiple instances
of the smp class"), alien is extended to support multiple smp instances.
This means several independent Seastar shard-sets running in the same
process, to allow single-process testing of clusters.

To do this, the alien::smp class which was just a container of some
static functions and variables is renamed to alien::instance, and
its functions and variables are made into non-static members.

The alien::run_on() and alien::submit_to() functions are modified to
accept an alien::instance parameter, so they know which Seastar instance
to target (the old signatures are retained but deprecated; they will target
the first Seastar intance created).

To obtain the alien::instance object, one can call app_template::alien().
To construct multiple Seastar instances, one can create several app_template
instances and run them.

Note: qs_deleter was made into a non-member class. As a private member
it was not std::is_default_constructible, which made the unique_ptr
constructor using it not accessible.
---
include/seastar/core/alien.hh | 76 +++++++++++++++++++++++-----
include/seastar/core/app-template.hh | 13 +++++
include/seastar/core/reactor.hh | 4 +-
include/seastar/core/smp.hh | 8 +++
src/core/alien.cc | 14 ++---
src/core/app-template.cc | 10 +++-
src/core/reactor.cc | 17 ++++---
tests/unit/alien_test.cc | 6 +--
8 files changed, 114 insertions(+), 34 deletions(-)

diff --git a/include/seastar/core/alien.hh b/include/seastar/core/alien.hh
index 43510d4ea1..055b9bd794 100644
--- a/include/seastar/core/alien.hh
+++ b/include/seastar/core/alien.hh
@@ -95,39 +95,72 @@ class message_queue {
}
size_t process_incoming();
bool pure_poll_rx() const;
};

-class smp {
- struct qs_deleter {
- unsigned count;
- qs_deleter(unsigned n = 0) : count(n) {}
- qs_deleter(const qs_deleter& d) : count(d.count) {}
- void operator()(message_queue* qs) const;
- };
- using qs = std::unique_ptr<message_queue[], qs_deleter>;
+namespace internal {
+
+struct qs_deleter {
+ unsigned count;
+ qs_deleter(unsigned n = 0) : count(n) {}
+ qs_deleter(const qs_deleter& d) : count(d.count) {}
+ void operator()(message_queue* qs) const;
+};
+
+}
+
+/// Represents the Seastar system from alien's point of view. In a normal
+/// system, there is just one instance, but for in-process clustering testing
+/// there may be more than one. Function such as run_on() direct messages to
+/// and (instance, shard) tuple.
+class instance {
+ using qs = std::unique_ptr<message_queue[], internal::qs_deleter>;
public:
static qs create_qs(const std::vector<reactor*>& reactors);
- static qs _qs;
- static bool poll_queues();
- static bool pure_poll_queues();
+ qs _qs;
+ bool poll_queues();
+ bool pure_poll_queues();
};

+namespace internal {
+
+extern instance* default_instance;
+
+}
+
/// Runs a function on a remote shard from an alien thread where engine() is not available.
///
+/// \param instance designates the Seastar instance to process the message
/// \param shard designates the shard to run the function on
/// \param func a callable to run on shard \c t. If \c func is a temporary object,
/// its lifetime will be extended by moving it. If \c func is a reference,
/// the caller must guarantee that it will survive the call.
/// \note the func must not throw and should return \c void. as we cannot identify the
/// alien thread, hence we are not able to post the fulfilled promise to the
/// message queue managed by the shard executing the alien thread which is
/// interested to the return value. Please use \c submit_to() instead, if
/// \c func throws.
template <typename Func>
+void run_on(instance& instance, unsigned shard, Func func) {
+ instance._qs[shard].submit(std::move(func));
+}
+
+/// Runs a function on a remote shard from an alien thread where engine() is not available.
+///
+/// \param shard designates the shard to run the function on
+/// \param func a callable to run on shard \c t. If \c func is a temporary object,
+/// its lifetime will be extended by moving it. If \c func is a reference,
+/// the caller must guarantee that it will survive the call.
+/// \note the func must not throw and should return \c void. as we cannot identify the
+/// alien thread, hence we are not able to post the fulfilled promise to the
+/// message queue managed by the shard executing the alien thread which is
+/// interested to the return value. Please use \c submit_to() instead, if
+/// \c func throws.
+template <typename Func>
+[[deprecated("Use run_on(instance&, unsigned shard, Func) instead")]]
void run_on(unsigned shard, Func func) {
- smp::_qs[shard].submit(std::move(func));
+ run_on(*internal::default_instance, shard, std::move(func));
}

namespace internal {
template<typename Func>
using return_value_t = typename futurize<std::invoke_result_t<Func>>::value_type;
@@ -155,21 +188,22 @@ struct return_type_of<Func, false> {
template <typename Func> using return_type_t = typename return_type_of<Func>::type;
}

/// Runs a function on a remote shard from an alien thread where engine() is not available.
///
+/// \param instance designates the Seastar instance to process the message
/// \param shard designates the shard to run the function on
/// \param func a callable to run on \c shard. If \c func is a temporary object,
/// its lifetime will be extended by moving it. If \c func is a reference,
/// the caller must guarantee that it will survive the call.
/// \return whatever \c func returns, as a \c std::future<>
/// \note the caller must keep the returned future alive until \c func returns
template<typename Func, typename T = internal::return_type_t<Func>>
-std::future<T> submit_to(unsigned shard, Func func) {
+std::future<T> submit_to(instance& instance, unsigned shard, Func func) {
std::promise<T> pr;
auto fut = pr.get_future();
- run_on(shard, [pr = std::move(pr), func = std::move(func)] () mutable {
+ run_on(instance, shard, [pr = std::move(pr), func = std::move(func)] () mutable {
// std::future returned via std::promise above.
(void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
try {
internal::return_type_of<Func>::set(pr, result.get());
} catch (...) {
@@ -178,7 +212,21 @@ std::future<T> submit_to(unsigned shard, Func func) {
});
});
return fut;
}

+/// Runs a function on a remote shard from an alien thread where engine() is not available.
+///
+/// \param shard designates the shard to run the function on
+/// \param func a callable to run on \c shard. If \c func is a temporary object,
+/// its lifetime will be extended by moving it. If \c func is a reference,
+/// the caller must guarantee that it will survive the call.
+/// \return whatever \c func returns, as a \c std::future<>
+/// \note the caller must keep the returned future alive until \c func returns
+template<typename Func, typename T = internal::return_type_t<Func>>
+[[deprecated("Use submit_to(instance&, unsigned shard, Func) instead.")]]
+std::future<T> submit_to(unsigned shard, Func func) {
+ return submit_to(*internal::default_instance, shard, std::move(func));
+}
+
}
}
diff --git a/include/seastar/core/app-template.hh b/include/seastar/core/app-template.hh
index 6103f5f113..a462b6d17a 100644
--- a/include/seastar/core/app-template.hh
+++ b/include/seastar/core/app-template.hh
@@ -27,10 +27,16 @@
#include <seastar/core/sstring.hh>
#include <chrono>

namespace seastar {

+namespace alien {
+
+class instance;
+
+}
+
class app_template {
public:
struct config {
/// The name of the application.
///
@@ -58,10 +64,12 @@ class app_template {
config() {}
};

using configuration_reader = std::function<void (boost::program_options::variables_map&)>;
private:
+ // unique_ptr to avoid pulling in alien.hh.
+ std::unique_ptr<alien::instance> _alien;
// reactor destruction is asynchronous, so we must let the last reactor
// destroy the smp instance
std::shared_ptr<smp> _smp;
config _cfg;
boost::program_options::options_description _opts;
@@ -78,20 +86,25 @@ class app_template {
const char* help;
int max_count;
};
public:
explicit app_template(config cfg = config());
+ ~app_template();

boost::program_options::options_description& get_options_description();
boost::program_options::options_description& get_conf_file_options_description();
boost::program_options::options_description_easy_init add_options();
void add_positional_options(std::initializer_list<positional_option> options);
boost::program_options::variables_map& configuration();
int run_deprecated(int ac, char ** av, std::function<void ()>&& func);

void set_configuration_reader(configuration_reader conf_reader);

+ /// Obtains an alien::instance object that can be used to send messages
+ /// to Seastar shards from non-Seastar threads.
+ alien::instance& alien() { return *_alien; }
+
// Runs given function and terminates the application when the future it
// returns resolves. The value with which the future resolves will be
// returned by this function.
int run(int ac, char ** av, std::function<future<int> ()>&& func);

diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
index 461a4a7f2f..2b3a46f628 100644
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -99,10 +99,11 @@ namespace seastar {

using shard_id = unsigned;

namespace alien {
class message_queue;
+class instance;
}
class reactor;
inline
size_t iovec_len(const std::vector<iovec>& iov)
{
@@ -243,10 +244,11 @@ class reactor {
};
friend void io_completion::complete_with(ssize_t);

private:
std::shared_ptr<smp> _smp;
+ alien::instance& _alien;
reactor_config _cfg;
file_desc _notify_eventfd;
file_desc _task_quota_timer;
#ifdef HAVE_OSV
reactor_backend_osv _backend;
@@ -457,11 +459,11 @@ class reactor {
do_write_some(pollable_fd_state& fd, const void* buffer, size_t size);
future<size_t>
do_write_some(pollable_fd_state& fd, net::packet& p);
public:
static boost::program_options::options_description get_options_description(reactor_config cfg);
- explicit reactor(std::shared_ptr<smp> smp, unsigned id, reactor_backend_selector rbs, reactor_config cfg);
+ explicit reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg);
reactor(const reactor&) = delete;
~reactor();
void operator=(const reactor&) = delete;

sched_clock::duration uptime() {
diff --git a/include/seastar/core/smp.hh b/include/seastar/core/smp.hh
index 429b435200..630e427907 100644
--- a/include/seastar/core/smp.hh
+++ b/include/seastar/core/smp.hh
@@ -41,10 +41,16 @@ namespace seastar {
using shard_id = unsigned;

class smp_service_group;
class reactor_backend_selector;

+namespace alien {
+
+class instance;
+
+}
+
namespace internal {

unsigned smp_service_group_id(smp_service_group ssg) noexcept;

inline shard_id* this_shard_id_ptr() noexcept {
@@ -282,10 +288,11 @@ class smp_message_queue {

friend class smp;
};

class smp : public std::enable_shared_from_this<smp> {
+ alien::instance& _alien;
std::vector<posix_thread> _threads;
std::vector<std::function<void ()>> _thread_loops; // for dpdk
std::optional<boost::barrier> _all_event_loops_done;
struct qs_deleter {
void operator()(smp_message_queue** qs) const;
@@ -298,10 +305,11 @@ class smp : public std::enable_shared_from_this<smp> {
template <typename Func>
using returns_future = is_future<std::result_of_t<Func()>>;
template <typename Func>
using returns_void = std::is_same<std::result_of_t<Func()>, void>;
public:
+ explicit smp(alien::instance& alien) : _alien(alien) {}
static boost::program_options::options_description get_options_description();
void register_network_stacks();
void configure(boost::program_options::variables_map vm, reactor_config cfg = {});
void cleanup();
void cleanup_cpu();
diff --git a/src/core/alien.cc b/src/core/alien.cc
index 71b9a5a26d..3fd6b5da97 100644
--- a/src/core/alien.cc
+++ b/src/core/alien.cc
@@ -112,34 +112,34 @@ void message_queue::start() {
sm::make_derive("total_sent_messages", [this] { return _sent.value.load(); }, sm::description("Total number of sent messages")),
});
}


-void smp::qs_deleter::operator()(alien::message_queue* qs) const {
+void internal::qs_deleter::operator()(alien::message_queue* qs) const {
for (unsigned i = 0; i < count; i++) {
qs[i].~message_queue();
}
::operator delete[](qs);
}

-smp::qs smp::_qs;
-
-smp::qs smp::create_qs(const std::vector<reactor*>& reactors) {
+instance::qs instance::create_qs(const std::vector<reactor*>& reactors) {
auto queues = reinterpret_cast<alien::message_queue*>(operator new[] (sizeof(alien::message_queue) * reactors.size()));
for (unsigned i = 0; i < reactors.size(); i++) {
new (&queues[i]) alien::message_queue(reactors[i]);
}
- return qs{queues, smp::qs_deleter{static_cast<unsigned>(reactors.size())}};
+ return qs{queues, internal::qs_deleter{static_cast<unsigned>(reactors.size())}};
}

-bool smp::poll_queues() {
+bool instance::poll_queues() {
auto& queue = _qs[this_shard_id()];
return queue.process_incoming() != 0;
}

-bool smp::pure_poll_queues() {
+bool instance::pure_poll_queues() {
auto& queue = _qs[this_shard_id()];
return queue.pure_poll_rx();
}

+instance* internal::default_instance;
+
}
}
diff --git a/src/core/app-template.cc b/src/core/app-template.cc
index 31a51f8bbe..c2fe47b09a 100644
--- a/src/core/app-template.cc
+++ b/src/core/app-template.cc
@@ -19,10 +19,11 @@
* Copyright (C) 2014 Cloudius Systems, Ltd.
*/

#include <seastar/core/app-template.hh>
#include <seastar/core/reactor.hh>
+#include <seastar/core/alien.hh>
#include <seastar/core/scollectd.hh>
#include <seastar/core/metrics_api.hh>
#include <boost/program_options.hpp>
#include <seastar/core/print.hh>
#include <seastar/util/log.hh>
@@ -44,14 +45,19 @@ reactor_config_from_app_config(app_template::config cfg) {
ret.task_quota = cfg.default_task_quota;
return ret;
}

app_template::app_template(app_template::config cfg)
- : _smp(std::make_shared<smp>())
+ : _alien(std::make_unique<alien::instance>())
+ , _smp(std::make_shared<smp>(*_alien))
, _cfg(std::move(cfg))
, _opts(_cfg.name + " options")
, _conf_reader(get_default_configuration_reader()) {
+
+ if (!alien::internal::default_instance) {
+ alien::internal::default_instance = _alien.get();
+ }
_opts.add_options()
("help,h", "show help message")
;

_smp->register_network_stacks();
@@ -62,10 +68,12 @@ app_template::app_template(app_template::config cfg)
_opts_conf_file.add(log_cli::get_options_description());

_opts.add(_opts_conf_file);
}

+app_template::~app_template() = default;
+
app_template::configuration_reader app_template::get_default_configuration_reader() {
return [this] (bpo::variables_map& configuration) {
auto home = std::getenv("HOME");
if (home) {
std::ifstream ifs(std::string(home) + "/.config/seastar/seastar.conf");
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
index 25038a4e69..6bce0d8bdc 100644
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -910,12 +910,13 @@ struct reactor::task_queue::indirect_compare {
bool operator()(const task_queue* tq1, const task_queue* tq2) const {
return tq1->_vruntime < tq2->_vruntime;
}
};

-reactor::reactor(std::shared_ptr<smp> smp, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
+reactor::reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
: _smp(std::move(smp))
+ , _alien(alien)
, _cfg(cfg)
, _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC))
, _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
, _id(id)
#ifdef HAVE_OSV
@@ -2472,15 +2473,15 @@ class reactor::smp_pollfn final : public reactor::pollfn {
reactor& _r;
public:
smp_pollfn(reactor& r) : _r(r) {}
virtual bool poll() final override {
return (smp::poll_queues() |
- alien::smp::poll_queues());
+ _r._alien.poll_queues());
}
virtual bool pure_poll() final override {
return (smp::pure_poll_queues() ||
- alien::smp::pure_poll_queues());
+ _r._alien.pure_poll_queues());
}
virtual bool try_enter_interrupt_mode() override {
// systemwide_memory_barrier() is very slow if run concurrently,
// so don't go to sleep if it is running now.
_r._sleeping.store(true, std::memory_order_relaxed);
@@ -3443,11 +3444,11 @@ void smp::start_all_queues()
for (unsigned c = 0; c < count; c++) {
if (c != this_shard_id()) {
_qs[c][this_shard_id()].start(c);
}
}
- alien::smp::_qs[this_shard_id()].start();
+ _alien._qs[this_shard_id()].start();
}

#ifdef SEASTAR_HAVE_DPDK

int dpdk_thread_adaptor(void* f)
@@ -3493,11 +3494,11 @@ void smp::allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_co
void *buf;
int r = posix_memalign(&buf, cache_line_size, sizeof(reactor));
assert(r == 0);
local_engine = reinterpret_cast<reactor*>(buf);
*internal::this_shard_id_ptr() = id;
- new (buf) reactor(this->shared_from_this(), id, std::move(rbs), cfg);
+ new (buf) reactor(this->shared_from_this(), _alien, id, std::move(rbs), cfg);
reactor_holder.reset(local_engine);
}

void smp::cleanup() {
smp::_threads = std::vector<posix_thread>();
@@ -3510,12 +3511,12 @@ void smp::cleanup_cpu() {
if (_qs) {
for(unsigned i = 0; i < smp::count; i++) {
_qs[i][cpuid].stop();
}
}
- if (alien::smp::_qs) {
- alien::smp::_qs[cpuid].stop();
+ if (_alien._qs) {
+ _alien._qs[cpuid].stop();
}
}

void smp::create_thread(std::function<void ()> thread_loop) {
if (_using_dpdk) {
@@ -4028,11 +4029,11 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
smp::_qs_owner[i] = reinterpret_cast<smp_message_queue*>(operator new[] (sizeof(smp_message_queue) * smp::count));
for (unsigned j = 0; j < smp::count; ++j) {
new (&smp::_qs_owner[i][j]) smp_message_queue(reactors[j], reactors[i]);
}
}
- alien::smp::_qs = alien::smp::create_qs(reactors);
+ _alien._qs = alien::instance::create_qs(reactors);
smp_queues_constructed.wait();
start_all_queues();
for (auto& dev_id : disk_config.device_ids()) {
assign_io_queue(0, dev_id);
}
diff --git a/tests/unit/alien_test.cc b/tests/unit/alien_test.cc
index 3aa527a4ae..77eba8d568 100644
--- a/tests/unit/alien_test.cc
+++ b/tests/unit/alien_test.cc
@@ -41,14 +41,15 @@ int main(int argc, char** argv)
{
// we need a protocol that both seastar and alien understand.
// and on which, a seastar future can wait.
int engine_ready_fd = eventfd(0, 0);
auto alien_done = file_desc::eventfd(0, 0);
+ seastar::app_template app;

// use the raw fd, because seastar engine want to take over the fds, if it
// polls on them.
- auto zim = std::async([engine_ready_fd,
+ auto zim = std::async([&app, engine_ready_fd,
alien_done=alien_done.get()] {
eventfd_t result = 0;
// wait until the seastar engine is ready
int r = ::eventfd_read(engine_ready_fd, &result);
if (r < 0) {
@@ -63,11 +64,11 @@ int main(int argc, char** argv)
counts.push_back(alien::submit_to(i, [i] {
return seastar::make_ready_future<int>(i);
}));
}
// std::future<void>
- alien::submit_to(0, [] {
+ alien::submit_to(app.alien(), 0, [] {
return seastar::make_ready_future<>();
}).wait();
int total = 0;
for (auto& count : counts) {
total += count.get();
@@ -75,11 +76,10 @@ int main(int argc, char** argv)
// i am done. dismiss the engine
::eventfd_write(alien_done, ALIEN_DONE);
return total;
});

- seastar::app_template app;
eventfd_t result = 0;
app.run(argc, argv, [&] {
return seastar::now().then([engine_ready_fd] {
// engine ready!
::eventfd_write(engine_ready_fd, ENGINE_READY);
--
2.31.1

Avi Kivity

<avi@scylladb.com>
unread,
Jun 3, 2021, 1:38:30 PM6/3/21
to seastar-dev@googlegroups.com, Kefu Chai, Asias He
Kefu, as the author and main user of alien, please review this.

I plan to use this in Scylla too for white-box testing. The idea is to
create a small cluster in a single process, and use alien::submit_to()
to coordinate interesting states in the fake nodes.

Asias, when I say "I plan", I really mean that you will add
gossip/bootstrap/repair unit tests.

Avi Kivity

<avi@scylladb.com>
unread,
Jun 9, 2021, 8:25:08 AM6/9/21
to seastar-dev@googlegroups.com, Kefu Chai, Asias He, Nadav Har'El
Kefu, ping.


Nadav, please review this too.

Kefu Chai

<kchai@redhat.com>
unread,
Jun 9, 2021, 9:45:28 AM6/9/21
to Avi Kivity, seastar-dev, Asias He, Nadav Har'El
hi Avi,

sorry for the late reply. will take a look in this week.

Kefu Chai

<kchai@redhat.com>
unread,
Jun 14, 2021, 11:52:54 AM6/14/21
to Avi Kivity, seastar-dev, Asias He, Nadav Har'El
hi Avi,

thank you. LGTM.

the only thing i was not quite sure about was the name of "instance"
as a type's name. but since it is in the namespace of alien, and it's
quite readable as i read the code where it kicks in, and i cannot
think of a better name. so it's great!

Commit Bot

<bot@cloudius-systems.com>
unread,
Jun 14, 2021, 5:08:50 PM6/14/21
to seastar-dev@googlegroups.com, Avi Kivity
From: Avi Kivity <a...@scylladb.com>
Committer: Nadav Har'El <n...@scylladb.com>
Branch: master

alien: prepare for multi-instance use

As a continuation for e6463df8a ("smp: allow having multiple instances
of the smp class"), alien is extended to support multiple smp instances.
This means several independent Seastar shard-sets running in the same
process, to allow single-process testing of clusters.

To do this, the alien::smp class which was just a container of some
static functions and variables is renamed to alien::instance, and
its functions and variables are made into non-static members.

The alien::run_on() and alien::submit_to() functions are modified to
accept an alien::instance parameter, so they know which Seastar instance
to target (the old signatures are retained but deprecated; they will target
the first Seastar intance created).

To obtain the alien::instance object, one can call app_template::alien().
To construct multiple Seastar instances, one can create several app_template
instances and run them.

Note: qs_deleter was made into a non-member class. As a private member
it was not std::is_default_constructible, which made the unique_ptr
constructor using it not accessible.
Message-Id: <20210603173538...@scylladb.com>

---
diff --git a/include/seastar/core/alien.hh b/include/seastar/core/alien.hh
--- a/include/seastar/core/alien.hh
+++ b/include/seastar/core/alien.hh
@@ -97,23 +97,39 @@ public:
@@ -124,8 +140,25 @@ public:
/// interested to the return value. Please use \c submit_to() instead, if
/// \c func throws.
template <typename Func>
+void run_on(instance& instance, unsigned shard, Func func) {
+ instance._qs[shard].submit(std::move(func));
+}
+
+/// Runs a function on a remote shard from an alien thread where engine() is not available.
+///
+/// \param shard designates the shard to run the function on
+/// \param func a callable to run on shard \c t. If \c func is a temporary object,
+/// its lifetime will be extended by moving it. If \c func is a reference,
+/// the caller must guarantee that it will survive the call.
+/// \note the func must not throw and should return \c void. as we cannot identify the
+/// alien thread, hence we are not able to post the fulfilled promise to the
+/// message queue managed by the shard executing the alien thread which is
+/// interested to the return value. Please use \c submit_to() instead, if
+/// \c func throws.
+template <typename Func>
+[[deprecated("Use run_on(instance&, unsigned shard, Func) instead")]]
void run_on(unsigned shard, Func func) {
- smp::_qs[shard].submit(std::move(func));
+ run_on(*internal::default_instance, shard, std::move(func));
}

namespace internal {
@@ -157,17 +190,18 @@ template <typename Func> using return_type_t = typename return_type_of<Func>::ty

/// Runs a function on a remote shard from an alien thread where engine() is not available.
///
+/// \param instance designates the Seastar instance to process the message
/// \param shard designates the shard to run the function on
/// \param func a callable to run on \c shard. If \c func is a temporary object,
/// its lifetime will be extended by moving it. If \c func is a reference,
/// the caller must guarantee that it will survive the call.
/// \return whatever \c func returns, as a \c std::future<>
/// \note the caller must keep the returned future alive until \c func returns
template<typename Func, typename T = internal::return_type_t<Func>>
-std::future<T> submit_to(unsigned shard, Func func) {
+std::future<T> submit_to(instance& instance, unsigned shard, Func func) {
std::promise<T> pr;
auto fut = pr.get_future();
- run_on(shard, [pr = std::move(pr), func = std::move(func)] () mutable {
+ run_on(instance, shard, [pr = std::move(pr), func = std::move(func)] () mutable {
// std::future returned via std::promise above.
(void)func().then_wrapped([pr = std::move(pr)] (auto&& result) mutable {
try {
@@ -180,5 +214,19 @@ std::future<T> submit_to(unsigned shard, Func func) {
return fut;
}

+/// Runs a function on a remote shard from an alien thread where engine() is not available.
+///
+/// \param shard designates the shard to run the function on
+/// \param func a callable to run on \c shard. If \c func is a temporary object,
+/// its lifetime will be extended by moving it. If \c func is a reference,
+/// the caller must guarantee that it will survive the call.
+/// \return whatever \c func returns, as a \c std::future<>
+/// \note the caller must keep the returned future alive until \c func returns
+template<typename Func, typename T = internal::return_type_t<Func>>
+[[deprecated("Use submit_to(instance&, unsigned shard, Func) instead.")]]
+std::future<T> submit_to(unsigned shard, Func func) {
+ return submit_to(*internal::default_instance, shard, std::move(func));
+}
+
}
}
diff --git a/include/seastar/core/app-template.hh b/include/seastar/core/app-template.hh
--- a/include/seastar/core/app-template.hh
+++ b/include/seastar/core/app-template.hh
@@ -29,6 +29,12 @@

namespace seastar {

+namespace alien {
+
+class instance;
+
+}
+
class app_template {
public:
struct config {
@@ -60,6 +66,8 @@ public:

using configuration_reader = std::function<void (boost::program_options::variables_map&)>;
private:
+ // unique_ptr to avoid pulling in alien.hh.
+ std::unique_ptr<alien::instance> _alien;
// reactor destruction is asynchronous, so we must let the last reactor
// destroy the smp instance
std::shared_ptr<smp> _smp;
@@ -80,6 +88,7 @@ public:
};
public:
explicit app_template(config cfg = config());
+ ~app_template();

boost::program_options::options_description& get_options_description();
boost::program_options::options_description& get_conf_file_options_description();
@@ -90,6 +99,10 @@ public:

void set_configuration_reader(configuration_reader conf_reader);

+ /// Obtains an alien::instance object that can be used to send messages
+ /// to Seastar shards from non-Seastar threads.
+ alien::instance& alien() { return *_alien; }
+
// Runs given function and terminates the application when the future it
// returns resolves. The value with which the future resolves will be
// returned by this function.
diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh
--- a/include/seastar/core/reactor.hh
+++ b/include/seastar/core/reactor.hh
@@ -101,6 +101,7 @@ using shard_id = unsigned;

namespace alien {
class message_queue;
+class instance;
}
class reactor;
inline
@@ -245,6 +246,7 @@ public:

private:
std::shared_ptr<smp> _smp;
+ alien::instance& _alien;
reactor_config _cfg;
file_desc _notify_eventfd;
file_desc _task_quota_timer;
@@ -459,7 +461,7 @@ private:
do_write_some(pollable_fd_state& fd, net::packet& p);
public:
static boost::program_options::options_description get_options_description(reactor_config cfg);
- explicit reactor(std::shared_ptr<smp> smp, unsigned id, reactor_backend_selector rbs, reactor_config cfg);
+ explicit reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg);
reactor(const reactor&) = delete;
~reactor();
void operator=(const reactor&) = delete;
diff --git a/include/seastar/core/smp.hh b/include/seastar/core/smp.hh
--- a/include/seastar/core/smp.hh
+++ b/include/seastar/core/smp.hh
@@ -43,6 +43,12 @@ using shard_id = unsigned;
class smp_service_group;
class reactor_backend_selector;

+namespace alien {
+
+class instance;
+
+}
+
namespace internal {

unsigned smp_service_group_id(smp_service_group ssg) noexcept;
@@ -284,6 +290,7 @@ private:
};

class smp : public std::enable_shared_from_this<smp> {
+ alien::instance& _alien;
std::vector<posix_thread> _threads;
std::vector<std::function<void ()>> _thread_loops; // for dpdk
std::optional<boost::barrier> _all_event_loops_done;
@@ -300,6 +307,7 @@ class smp : public std::enable_shared_from_this<smp> {
template <typename Func>
using returns_void = std::is_same<std::result_of_t<Func()>, void>;
public:
+ explicit smp(alien::instance& alien) : _alien(alien) {}
static boost::program_options::options_description get_options_description();
void register_network_stacks();
void configure(boost::program_options::variables_map vm, reactor_config cfg = {});
diff --git a/src/core/alien.cc b/src/core/alien.cc
--- a/src/core/alien.cc
+++ b/src/core/alien.cc
@@ -114,32 +114,32 @@ void message_queue::start() {
--- a/src/core/app-template.cc
+++ b/src/core/app-template.cc
@@ -21,6 +21,7 @@

#include <seastar/core/app-template.hh>
#include <seastar/core/reactor.hh>
+#include <seastar/core/alien.hh>
#include <seastar/core/scollectd.hh>
#include <seastar/core/metrics_api.hh>
#include <boost/program_options.hpp>
@@ -46,10 +47,15 @@ reactor_config_from_app_config(app_template::config cfg) {
}

app_template::app_template(app_template::config cfg)
- : _smp(std::make_shared<smp>())
+ : _alien(std::make_unique<alien::instance>())
+ , _smp(std::make_shared<smp>(*_alien))
, _cfg(std::move(cfg))
, _opts(_cfg.name + " options")
, _conf_reader(get_default_configuration_reader()) {
+
+ if (!alien::internal::default_instance) {
+ alien::internal::default_instance = _alien.get();
+ }
_opts.add_options()
("help,h", "show help message")
;
@@ -64,6 +70,8 @@ app_template::app_template(app_template::config cfg)
_opts.add(_opts_conf_file);
}

+app_template::~app_template() = default;
+
app_template::configuration_reader app_template::get_default_configuration_reader() {
return [this] (bpo::variables_map& configuration) {
auto home = std::getenv("HOME");
diff --git a/src/core/reactor.cc b/src/core/reactor.cc
--- a/src/core/reactor.cc
+++ b/src/core/reactor.cc
@@ -912,8 +912,9 @@ struct reactor::task_queue::indirect_compare {
}
};

-reactor::reactor(std::shared_ptr<smp> smp, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
+reactor::reactor(std::shared_ptr<smp> smp, alien::instance& alien, unsigned id, reactor_backend_selector rbs, reactor_config cfg)
: _smp(std::move(smp))
+ , _alien(alien)
, _cfg(cfg)
, _notify_eventfd(file_desc::eventfd(0, EFD_CLOEXEC))
, _task_quota_timer(file_desc::timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC))
@@ -2474,11 +2475,11 @@ class reactor::smp_pollfn final : public reactor::pollfn {
smp_pollfn(reactor& r) : _r(r) {}
virtual bool poll() final override {
return (smp::poll_queues() |
- alien::smp::poll_queues());
+ _r._alien.poll_queues());
}
virtual bool pure_poll() final override {
return (smp::pure_poll_queues() ||
- alien::smp::pure_poll_queues());
+ _r._alien.pure_poll_queues());
}
virtual bool try_enter_interrupt_mode() override {
// systemwide_memory_barrier() is very slow if run concurrently,
@@ -3445,7 +3446,7 @@ void smp::start_all_queues()
_qs[c][this_shard_id()].start(c);
}
}
- alien::smp::_qs[this_shard_id()].start();
+ _alien._qs[this_shard_id()].start();
}

#ifdef SEASTAR_HAVE_DPDK
@@ -3495,7 +3496,7 @@ void smp::allocate_reactor(unsigned id, reactor_backend_selector rbs, reactor_co
assert(r == 0);
local_engine = reinterpret_cast<reactor*>(buf);
*internal::this_shard_id_ptr() = id;
- new (buf) reactor(this->shared_from_this(), id, std::move(rbs), cfg);
+ new (buf) reactor(this->shared_from_this(), _alien, id, std::move(rbs), cfg);
reactor_holder.reset(local_engine);
}

@@ -3512,8 +3513,8 @@ void smp::cleanup_cpu() {
_qs[i][cpuid].stop();
}
}
- if (alien::smp::_qs) {
- alien::smp::_qs[cpuid].stop();
+ if (_alien._qs) {
+ _alien._qs[cpuid].stop();
}
}

@@ -4010,7 +4011,7 @@ void smp::configure(boost::program_options::variables_map configuration, reactor
new (&smp::_qs_owner[i][j]) smp_message_queue(reactors[j], reactors[i]);
}
}
- alien::smp::_qs = alien::smp::create_qs(reactors);
+ _alien._qs = alien::instance::create_qs(reactors);
smp_queues_constructed.wait();
start_all_queues();
for (auto& dev_id : disk_config.device_ids()) {
diff --git a/tests/unit/alien_test.cc b/tests/unit/alien_test.cc
--- a/tests/unit/alien_test.cc
+++ b/tests/unit/alien_test.cc
@@ -43,10 +43,11 @@ int main(int argc, char** argv)
// and on which, a seastar future can wait.
int engine_ready_fd = eventfd(0, 0);
auto alien_done = file_desc::eventfd(0, 0);
+ seastar::app_template app;

// use the raw fd, because seastar engine want to take over the fds, if it
// polls on them.
- auto zim = std::async([engine_ready_fd,
+ auto zim = std::async([&app, engine_ready_fd,
alien_done=alien_done.get()] {
eventfd_t result = 0;
// wait until the seastar engine is ready
@@ -65,7 +66,7 @@ int main(int argc, char** argv)
}));
}
// std::future<void>
- alien::submit_to(0, [] {
+ alien::submit_to(app.alien(), 0, [] {
return seastar::make_ready_future<>();
}).wait();
int total = 0;
@@ -77,7 +78,6 @@ int main(int argc, char** argv)

Nadav Har'El

<nyh@scylladb.com>
unread,
Jun 14, 2021, 5:10:06 PM6/14/21
to Avi Kivity, seastar-dev
Thanks. I committed.

Your patch left two deprecated calls in tests/alien_test.cc. For example:

../../tests/unit/alien_test.cc:66:14: warning: ‘std::future<T> seastar::alien::submit_to(unsigned int, Func) [with Func = main(int, char**)::<lambda()>::<lambda()>; T = int]’ is deprecated: Use submit_to(instance&, unsigned shard, Func) instead. [-Wdeprecated-declarations]
   66 |             }));

It would be great if you can fix these warnings in a follow-up patch.

--
Nadav Har'El
n...@scylladb.com


--
You received this message because you are subscribed to the Google Groups "seastar-dev" group.
To unsubscribe from this group and stop receiving emails from it, send an email to seastar-dev...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/seastar-dev/20210603173538.3843447-1-avi%40scylladb.com.

Avi Kivity

<avi@scylladb.com>
unread,
Jun 15, 2021, 3:29:23 AM6/15/21
to Nadav Har'El, seastar-dev

Oops, I planned to fix them up in the patch itself.

Reply all
Reply to author
Forward
0 new messages