[PATCH 1/1] coroutines: add parallel_for_each


Benny Halevy

<bhalevy@scylladb.com>
Feb 23, 2022, 5:19:22 AM
to seastar-dev@googlegroups.com, Benny Halevy
Signed-off-by: Benny Halevy <bha...@scylladb.com>
---
demos/coroutines_demo.cc | 11 +-
doc/tutorial.md | 17 +++
.../seastar/coroutine/parallel_for_each.hh | 126 ++++++++++++++++++
tests/unit/coroutines_test.cc | 27 ++++
4 files changed, 180 insertions(+), 1 deletion(-)
create mode 100644 include/seastar/coroutine/parallel_for_each.hh

diff --git a/demos/coroutines_demo.cc b/demos/coroutines_demo.cc
index 58c881c9..06e1ab47 100644
--- a/demos/coroutines_demo.cc
+++ b/demos/coroutines_demo.cc
@@ -38,12 +38,14 @@ int main(int argc, char** argv) {
#include <seastar/core/sleep.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/loop.hh>
+#include <seastar/core/sstring.hh>
+#include <seastar/coroutine/parallel_for_each.hh>

int main(int argc, char** argv) {
seastar::app_template app;
app.run(argc, argv, [] () -> seastar::future<> {
std::cout << "this is a completely useless program\nplease stand by...\n";
- auto f = seastar::parallel_for_each(std::vector<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, [] (int i) -> seastar::future<> {
+ auto f = seastar::coroutine::parallel_for_each(std::vector<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, [] (int i) -> seastar::future<> {
co_await seastar::sleep(std::chrono::seconds(i));
std::cout << i << "\n";
});
@@ -55,6 +57,13 @@ int main(int argc, char** argv) {
co_await out.flush();
co_await out.close();

+ bool all_exist = true;
+ std::vector<seastar::sstring> filenames = { "useless_file.txt", "non_existing" };
+ co_await seastar::coroutine::parallel_for_each(filenames, [&all_exist] (const seastar::sstring& name) -> seastar::future<> {
+ all_exist &= co_await seastar::file_exists(name);
+ });
+ std::cout << (all_exist ? "" : "not ") << "all files exist" << std::endl;
+
co_await std::move(f);
std::cout << "done\n";
});
diff --git a/doc/tutorial.md b/doc/tutorial.md
index b9f27731..0d458fbb 100644
--- a/doc/tutorial.md
+++ b/doc/tutorial.md
@@ -440,6 +440,23 @@ Here, two read() calls are launched concurrently. The coroutine is paused until

Note that `all` waits for all of its sub-computations, even if some throw an exception. If an exception is thrown, it is propagated to the calling coroutine.

+The `seastar::coroutine::parallel_for_each` class template allows a coroutine to fork into several concurrently executing function invocations (or Seastar fibers, see below) over a range of elements and join again when they complete. Consider this example:
+
+```cpp
+#include <seastar/core/coroutines.hh>
+#include <seastar/coroutine/parallel_for_each.hh>
+
+seastar::future<bool> all_exist(std::vector<sstring> filenames) {
+ bool res = true;
+ co_await seastar::coroutine::parallel_for_each(filenames, [&res] (const seastar::sstring& name) -> seastar::future<> {
+ res &= co_await seastar::file_exists(name);
+ });
+ co_return res;
+}
+```
+
+Here, the lambda function passed to parallel_for_each is launched concurrently for each element in the filenames vector. The coroutine is paused until all calls complete.
+
## Breaking up long running computations

Seastar is generally used for I/O, and coroutines usually launch I/O operations and consume their results, with little computation in between. But occasionally a long running computation is needed, and this risks preventing the reactor from performing I/O and scheduling other tasks.
diff --git a/include/seastar/coroutine/parallel_for_each.hh b/include/seastar/coroutine/parallel_for_each.hh
new file mode 100644
index 00000000..41ad83cf
--- /dev/null
+++ b/include/seastar/coroutine/parallel_for_each.hh
@@ -0,0 +1,126 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2022-present ScyllaDB
+ */
+
+#pragma once
+
+#include <concepts>
+#include <type_traits>
+#include <tuple>
+#include <seastar/core/coroutine.hh>
+
+namespace seastar::coroutine {
+
+/// Invoke a function on all elements in a range in parallel and wait for all futures to complete in a coroutine.
+///
+/// `parallel_for_each` can be used to launch a function concurrently
+/// on all elements in a given range and wait for all of them to complete.
+/// Waiting is performed by `co_await` and returns a future.
+///
+/// If one or more of the function invocations resolves to an exception
+/// then one of the exceptions is re-thrown.
+/// All of the futures are waited for, even in the case of exceptions.
+///
+/// Example
+///
+/// ```
+/// future<int> sum_of_squares(std::vector<int> v) {
+/// int sum = 0;
+/// co_await parallel_for_each(v, [&sum] (int& x) {
+/// sum += x * x;
+/// });
+/// co_return sum;
+/// }
+/// ```
+class [[nodiscard("must co_await a parallel_for_each() object")]] parallel_for_each {
+ using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<void>;
+
+ std::vector<future<>> _futures;
+ std::vector<future<>>::iterator _next;
+ std::exception_ptr _ex;
+ coroutine_handle_t _when_ready;
+
+ // Return true iff all futures were consumed
+ bool consume_next() noexcept {
+ while (_next != _futures.end()) {
+ auto& fut = *_next;
+ if (!fut.available()) {
+ return false;
+ }
+ if (fut.failed()) {
+ _ex = fut.get_exception();
+ }
+ ++_next;
+ }
+ return true;
+ }
+
+public:
+ template <typename Iterator, typename Sentinel, typename Func>
+ requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
+ && requires (Func f, Iterator it) { { futurize_invoke(f, *it) } -> std::same_as<future<>>; }
+ explicit parallel_for_each(Iterator begin, Sentinel end, Func&& func) {
+ _futures.reserve(std::distance(begin, end));
+ for (; begin != end; ++begin) {
+ _futures.push_back(futurize_invoke(func, *begin));
+ }
+ _next = _futures.begin();
+ consume_next();
+ }
+
+ template <typename Container, typename Func>
+ explicit parallel_for_each(Container c, Func&& func)
+ : parallel_for_each(std::begin(c), std::end(c), std::forward<Func>(func))
+ { }
+
+ bool await_ready() const {
+ return _next == _futures.end();
+ }
+
+ template<typename T>
+ void await_suspend(SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<T> h) {
+ _when_ready = h;
+ run_and_dispose();
+ }
+
+ void await_resume() {
+ if (_ex) {
+ std::rethrow_exception(std::move(_ex));
+ }
+ }
+
+ void run_and_dispose() noexcept {
+ if (consume_next()) {
+ _when_ready.resume();
+ // No need to delete, this is allocated on the coroutine frame
+ } else {
+ auto cur = _next;
+ _next++;
+ *cur = cur->then_wrapped([this] (future<> f) {
+ if (f.failed()) {
+ _ex = f.get_exception();
+ }
+ run_and_dispose();
+ });
+ }
+ }
+};
+
+}
\ No newline at end of file
diff --git a/tests/unit/coroutines_test.cc b/tests/unit/coroutines_test.cc
index 23de9ac4..d1a637ed 100644
--- a/tests/unit/coroutines_test.cc
+++ b/tests/unit/coroutines_test.cc
@@ -20,6 +20,7 @@
*/

#include <exception>
+#include <numeric>

#include <seastar/core/future-util.hh>
#include <seastar/testing/test_case.hh>
@@ -41,6 +42,7 @@ SEASTAR_TEST_CASE(test_coroutines_not_compiled_in) {
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/switch_to.hh>
+#include <seastar/coroutine/parallel_for_each.hh>

namespace {

@@ -414,4 +416,29 @@ SEASTAR_TEST_CASE(generator)

#endif

+SEASTAR_TEST_CASE(test_parallel_for_each) {
+ std::array<int, 3> values = { 3, 1, 4 };
+ int sum_of_squares = 0;
+
+ int expected = std::accumulate(values.begin(), values.end(), 0, [] (int sum, int x) {
+ return sum + x * x;
+ });
+
+ // Test all-ready futures
+ co_await coroutine::parallel_for_each(values, [&sum_of_squares] (int x) {
+ sum_of_squares += x * x;
+ });
+ BOOST_REQUIRE_EQUAL(sum_of_squares, expected); // the test will hang if it doesn't work.
+
+ // Test non-ready futures
+ sum_of_squares = 0;
+ co_await coroutine::parallel_for_each(values, [&sum_of_squares] (int x) -> future<> {
+ if (x > 1) {
+ co_await sleep(std::chrono::milliseconds(x));
+ }
+ sum_of_squares += x * x;
+ });
+ BOOST_REQUIRE_EQUAL(sum_of_squares, expected); // the test will hang if it doesn't work.
+}
+
#endif
--
2.34.1

Avi Kivity

<avi@scylladb.com>
Feb 23, 2022, 7:36:09 AM
to Benny Halevy, seastar-dev@googlegroups.com
On 2/23/22 12:19, Benny Halevy wrote:


<no explanation>
Since this is C++20 only, let's see if we can use std::ranges::range.
There may be problems with clang.


Better to use named concepts like std::invocable rather than open-coding
them.
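
For illustration, a named-concept constraint along these lines could look like the following rough sketch (standalone, with `apply_to_range` as a placeholder name; later revisions of this patch adopt essentially this form for the range-based constructor):

```cpp
#include <concepts>
#include <ranges>

// Sketch only: constrain on std::ranges::range and std::invocable instead of
// open-coding the requirements in a requires-expression.
template <std::ranges::range Range, typename Func>
requires std::invocable<Func, std::ranges::range_value_t<Range>>
void apply_to_range(const Range& range, Func func) {
    for (const auto& element : range) {
        func(element);
    }
}
```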


> + _futures.reserve(std::distance(begin, end));
> + for (; begin != end; ++begin) {
> + _futures.push_back(futurize_invoke(func, *begin));
> + }
> + _next = _futures.begin();
> + consume_next();
> + }
> +
> + template <typename Container, typename Func>
> + explicit parallel_for_each(Container c, Func&& func)
> + : parallel_for_each(std::begin(c), std::end(c), std::forward<Func>(func))
> + { }


Let's merge this with the other constructor, the user can create a range
if they have an iterator/sentinel pair. The natural way is to use a range.
We should avoid this continuation!


Instead of a vector of futures, allocate a vector of something derived
from continuation_base, that has a pointer to the coroutine. It can call
consume_next and wake the coroutine if it's done.
Need to test: exceptions, empty range, different completion orders.


Gleb Natapov

<gleb@scylladb.com>
Feb 23, 2022, 7:41:24 AM
to Avi Kivity, Benny Halevy, seastar-dev@googlegroups.com
On Wed, Feb 23, 2022 at 02:36:05PM +0200, Avi Kivity wrote:
> On 2/23/22 12:19, Benny Halevy wrote:
>
>
> <no explanation>
>
And what is the benefit over the existing one?

--
Gleb.

Benny Halevy

<bhalevy@scylladb.com>
Feb 23, 2022, 8:09:09 AM
to Avi Kivity, seastar-dev@googlegroups.com
On Wed, 2022-02-23 at 14:36 +0200, Avi Kivity wrote:
> On 2/23/22 12:19, Benny Halevy wrote:
>
>
> <no explanation>

yes, sorry, will add.
ok, will try.

>
>
> Better to use named concepts like std::invocable rather than open-coding
> them.

I tried using std::invocable but it got hairy with an array<int, N>
since compilation failed on Iterator::value_type, claiming that Iterator was an `int *`.
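
For reference, a minimal standalone illustration of that failure mode (not from the patch): a raw pointer iterator has no nested `value_type`, so `typename Iterator::value_type` is ill-formed for it, while the iterator-traits based spelling used in the later revision also works for the `int *` iterator that `std::array<int, N>` can produce.

```cpp
#include <iterator>
#include <type_traits>

// `int*` has no nested value_type, so `typename Iterator::value_type` is
// ill-formed for it; the traits-based forms handle pointers as well.
static_assert(std::is_same_v<typename std::iterator_traits<int*>::value_type, int>);
static_assert(std::is_same_v<std::iter_value_t<int*>, int>);
```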

>
>
> > +        _futures.reserve(std::distance(begin, end));
> > +        for (; begin != end; ++begin) {
> > +            _futures.push_back(futurize_invoke(func, *begin));
> > +        }
> > +        _next = _futures.begin();
> > +        consume_next();
> > +    }
> > +
> > +    template <typename Container, typename Func>
> > +    explicit parallel_for_each(Container c, Func&& func)
> > +            : parallel_for_each(std::begin(c), std::end(c), std::forward<Func>(func))
> > +    { }
>
>
> Let's merge this with the other constructor, the user can create a range
> if they have an iterator/sentinel pair. The natural way is to use a range.

Ok, makes sense.
Hmm, it will probably waste more memory, but do it once.
Will take a stab at it.
True, true, and the second case above generates a completion order different from the natural one.
I can add randomness to test a different completion order every time, based on the random seed.

Benny Halevy

<bhalevy@scylladb.com>
Feb 27, 2022, 1:32:16 PM
to seastar-dev@googlegroups.com, Benny Halevy, Avi Kivity, Gleb Natapov
This patch adds support for coroutine::parallel_for_each
that can be used in coroutines instead of
co_await seastar::parallel_for_each.

This implementation minimizes memory allocation
by deferring the allocation of a vector of futures
to wait on, till any of the futures is unavailable,
similar to the legacy parallel_for_each implementation.

Also it waits on the unavailable futures from back to front,
similar to the legacy implementation, to minimize
the number of callbacks required.

The advantage over the legacy implementation is
that the parallel_for_each object itself is a
seastar task that is used to wait on unavailable
futures with no need to allocate a parallel_for_each_state.

This is possible since coroutine::parallel_for_each
is defined as [[nodiscard]] and the caller must co_await
for it, making sure it remains valid until all the futures
it's waiting on are resolved.

Test: coroutines_test(debug, release)
w/ clang++ 12.0.1, g++ 11.2.1, c++20

Signed-off-by: Benny Halevy <bha...@scylladb.com>
---
demos/coroutines_demo.cc | 11 +-
doc/tutorial.md | 17 ++
.../seastar/coroutine/parallel_for_each.hh | 165 ++++++++++++++++++
tests/unit/coroutines_test.cc | 97 ++++++++++
4 files changed, 289 insertions(+), 1 deletion(-)
create mode 100644 include/seastar/coroutine/parallel_for_each.hh

In v2:
- Improved commit log message
- Use std::ranges::range
- Use parallel_for_each object as continuation
- Allocate _futures only if there's an unavailable future
- Wait for _futures in reverse order
- Tested with both g++ and clang++
- Added unit test for empty range, subrange, and exception throwing.
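
For reference, the intended call-site shape, as a minimal sketch only (`process_all` is an illustrative name; the demo and tutorial changes in this patch carry the real examples): the awaitable is created and co_await'ed in one expression, so it lives on the coroutine frame until every per-element future has resolved.

```cpp
#include <chrono>
#include <vector>

#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>
#include <seastar/coroutine/parallel_for_each.hh>

// Sketch: launch one fiber per element and join them all at the co_await.
seastar::future<> process_all(std::vector<int> ids) {
    co_await seastar::coroutine::parallel_for_each(ids, [] (int id) -> seastar::future<> {
        co_await seastar::sleep(std::chrono::milliseconds(id)); // stand-in for per-element work
    });
}
```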
index 00000000..6e811a83
--- /dev/null
+++ b/include/seastar/coroutine/parallel_for_each.hh
@@ -0,0 +1,165 @@
+/*
+ * This file is open source software, licensed to you under the terms
+ * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
+ * distributed with this work for additional information regarding copyright
+ * ownership. You may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/*
+ * Copyright (C) 2022-present ScyllaDB
+ */
+
+#pragma once
+
+#include <ranges>
+
+template <typename Func>
+class [[nodiscard("must co_await a parallel_for_each() object")]] parallel_for_each final : public continuation_base<> {
+ using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<void>;
+
+ Func _func;
+ std::vector<future<>> _futures;
+ std::exception_ptr _ex;
+ coroutine_handle_t _when_ready;
+ task* _waiting_task = nullptr;
+
+ // Consume futures in reverse order.
+ // Since futures at the front are expected
+ // to become ready before futures at the back,
+ // it is less likely that we will have to
+ // wait on them once the futures at the back
+ // have become available.
+ //
+ // Return true iff all futures were consumed.
+ bool consume_next() noexcept {
+ while (!_futures.empty()) {
+ auto& fut = _futures.back();
+ if (!fut.available()) {
+ return false;
+ }
+ if (fut.failed()) {
+ _ex = fut.get_exception();
+ }
+ _futures.pop_back();
+ }
+ return true;
+ }
+
+ void set_callback() noexcept {
+ auto fut = std::move(_futures.back());
+ _futures.pop_back();
+ // To reuse `this` as continuation_base<>
+ // we must reset _state, to allow setting
+ // it again.
+ this->_state.~future_state();
+ new (&this->_state) future_state();
+ seastar::internal::set_callback(fut, this);
+ }
+
+ void resume_or_set_callback() noexcept {
+ if (__builtin_expect(consume_next(), false)) {
+ _when_ready.resume();
+ } else {
+ set_callback();
+ }
+ }
+
+public:
+ // clang 13.0.1 doesn't support subrange
+ // so also provide an Iterator/Sentinel-based constructor.
+ // See https://github.com/llvm/llvm-project/issues/46091
+ template <typename Iterator, typename Sentinel>
+ requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
+ explicit parallel_for_each(Iterator begin, Sentinel end, Func&& func)
+ : _func(std::move(func))
+ {
+ for (auto it = begin; it != end; ++it) {
+ auto fut = futurize_invoke(_func, *it);
+ if (fut.available()) {
+ if (fut.failed()) {
+ _ex = fut.get_exception();
+ }
+ continue;
+ }
+ if (_futures.empty()) {
+ _futures.reserve(std::distance(it, end));
+ }
+ _futures.push_back(std::move(fut));
+ }
+ }
+
+ template <std::ranges::range Range>
+ requires std::invocable<Func, std::ranges::range_value_t<Range>>
+ explicit parallel_for_each(const Range& range, Func&& func)
+ : parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<Func>(func))
+ { }
+
+ bool await_ready() const {
+ if (_futures.empty()) {
+ await_resume();
+ return true;
+ }
+ return false;
+ }
+
+ template<typename T>
+ void await_suspend(SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<T> h) {
+ _when_ready = h;
+ _waiting_task = &h.promise();
+ resume_or_set_callback();
+ }
+
+ void await_resume() const {
+ if (_ex) {
+ std::rethrow_exception(_ex);
+ }
+ }
+
+ virtual void run_and_dispose() noexcept override {
+ if (__builtin_expect(this->_state.failed(), false)) {
+ _ex = std::move(this->_state).get_exception();
+ }
+ resume_or_set_callback();
+ }
+
+ virtual task* waiting_task() noexcept override {
+ return _waiting_task;
+ }
+};
+
+}
\ No newline at end of file
diff --git a/tests/unit/coroutines_test.cc b/tests/unit/coroutines_test.cc
index 23de9ac4..5fb4e3df 100644
--- a/tests/unit/coroutines_test.cc
+++ b/tests/unit/coroutines_test.cc
@@ -20,11 +20,13 @@
*/

#include <exception>
+#include <numeric>

#include <seastar/core/future-util.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/later.hh>
+#include <seastar/testing/random.hh>

using namespace seastar;
using namespace std::chrono_literals;
@@ -41,6 +43,7 @@ SEASTAR_TEST_CASE(test_coroutines_not_compiled_in) {
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/switch_to.hh>
+#include <seastar/coroutine/parallel_for_each.hh>

namespace {

@@ -414,4 +417,98 @@ SEASTAR_TEST_CASE(generator)

#endif

+SEASTAR_TEST_CASE(test_parallel_for_each_empty) {
+ std::vector<int> values;
+ int count = 0;
+
+ co_await coroutine::parallel_for_each(values, [&] (int x) {
+ ++count;
+ });
+ BOOST_REQUIRE_EQUAL(count, 0); // the test will hang if it doesn't work.
+}
+
+SEASTAR_TEST_CASE(test_parallel_for_each_exception) {
+ std::array<int, 5> values = { 10, 2, 1, 4, 8 };
+ int count = 0;
+ auto& eng = testing::local_random_engine;
+ auto dist = std::uniform_int_distribution<unsigned>();
+ int throw_at = dist(eng) % values.size();
+
+ BOOST_TEST_MESSAGE(fmt::format("Will throw at value #{}/{}", throw_at, values.size()));
+
+ auto f0 = coroutine::parallel_for_each(values, [&] (int x) {
+ if (count++ == throw_at) {
+ BOOST_TEST_MESSAGE("throw");
+ throw std::runtime_error("test");
+ }
+ });
+ // An exception thrown by the functor must be propagated
+ BOOST_REQUIRE_THROW(co_await std::move(f0), std::runtime_error);
+ // Functor must be called on all values, even if there's an exception
+ BOOST_REQUIRE_EQUAL(count, values.size());
+
+ count = 0;
+ throw_at = dist(eng) % values.size();
+ BOOST_TEST_MESSAGE(fmt::format("Will throw at value #{}/{}", throw_at, values.size()));
+
+ auto f1 = coroutine::parallel_for_each(values, [&] (int x) -> future<> {
+ co_await sleep(std::chrono::milliseconds(x));
+ if (count++ == throw_at) {
+ throw std::runtime_error("test");
+ }
+ });
+ BOOST_REQUIRE_THROW(co_await std::move(f1), std::runtime_error);
+ BOOST_REQUIRE_EQUAL(count, values.size());
+}
+
+SEASTAR_TEST_CASE(test_parallel_for_each) {
+ std::vector<int> values = { 3, 1, 4 };
+ int sum_of_squares = 0;
+
+ int expected = std::accumulate(values.begin(), values.end(), 0, [] (int sum, int x) {
+ return sum + x * x;
+ });
+
+ // Test all-ready futures
+ co_await coroutine::parallel_for_each(values, [&sum_of_squares] (int x) {
+ sum_of_squares += x * x;
+ });
+ BOOST_REQUIRE_EQUAL(sum_of_squares, expected);
+
+ // Test non-ready futures
+ sum_of_squares = 0;
+ co_await coroutine::parallel_for_each(values, [&sum_of_squares] (int x) -> future<> {
+ if (x > 1) {
+ co_await sleep(std::chrono::milliseconds(x));
+ }
+ sum_of_squares += x * x;
+ });
+ BOOST_REQUIRE_EQUAL(sum_of_squares, expected);
+
+ // Test legacy subrange
+ sum_of_squares = 0;
+ co_await coroutine::parallel_for_each(values.begin(), values.end() - 1, [&sum_of_squares] (int x) -> future<> {
+ if (x > 1) {
+ co_await sleep(std::chrono::milliseconds(x));
+ }
+ sum_of_squares += x * x;
+ });
+ BOOST_REQUIRE_EQUAL(sum_of_squares, 10);
+
+ // clang 13.0.1 doesn't support subrange
+ // so also provide an Iterator/Sentinel-based constructor.
+ // See https://github.com/llvm/llvm-project/issues/46091
+#ifndef __clang__
+ // Test std::ranges::subrange
+ sum_of_squares = 0;
+ co_await coroutine::parallel_for_each(std::ranges::subrange(values.begin(), values.end() - 1), [&sum_of_squares] (int x) -> future<> {
+ if (x > 1) {
+ co_await sleep(std::chrono::milliseconds(x));
+ }
+ sum_of_squares += x * x;
+ });
+ BOOST_REQUIRE_EQUAL(sum_of_squares, 10);
+#endif

Benny Halevy

<bhalevy@scylladb.com>
Mar 2, 2022, 5:37:10 AM
to seastar-dev@googlegroups.com, Avi Kivity, Gleb Natapov
ping

Benny Halevy

<bhalevy@scylladb.com>
Mar 13, 2022, 11:14:52 AM
to seastar-dev@googlegroups.com, Avi Kivity, Gleb Natapov
ping^

Avi Kivity

<avi@scylladb.com>
Mar 16, 2022, 8:58:22 AM
to Benny Halevy, seastar-dev@googlegroups.com, Gleb Natapov
Needs constraint. Since the constraint depends on the iterators, maybe
it should be in a deduction guide.


> +class [[nodiscard("must co_await an parallel_for_each() object")]] parallel_for_each final : public continuation_base<> {


It will be nicer to use private inheritance.


> + using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<void>;
> +
> + Func _func;
> + std::vector<future<>> _futures;


I (and Nadav) hate to bring boost into it, but boost::small_vector can
save an allocation here.
Cleaner to use operator=().


> + seastar::internal::set_callback(fut, this);


(set_callback should consume its input since it will be unusable after
it runs)


> + }
> +
> + void resume_or_set_callback() noexcept {
> + if (__builtin_expect(consume_next(), false)) {


The __builtin_expect is wrong. It will return true in at least 1/n
calls, often more, and n can be small.


> + _when_ready.resume();
> + } else {
> + set_callback();
> + }
> + }
> +
> +public:
> + // clang 13.0.1 doesn't support subrange
> + // so provide also a Iterator/Sentinel based constructor.
> + // See https://github.com/llvm/llvm-project/issues/46091
> + template <typename Iterator, typename Sentinel>
> + requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)


Constraint for Func.


> + explicit parallel_for_each(Iterator begin, Sentinel end, Func&& func)
> + : _func(std::move(func))
> + {
> + for (auto it = begin; it != end; ++it) {
> + auto fut = futurize_invoke(_func, *it);
> + if (fut.available()) {
> + if (fut.failed()) {
> + _ex = fut.get_exception();
> + }
> + continue;
> + }
> + if (_futures.empty()) {
> + _futures.reserve(std::distance(it, end));
> + }


This can fail. We can't handle it, so must abort.


std::distance() should not be used on input iterators, they are single pass.


> + _futures.push_back(std::move(fut));
> + }
> + }
> +
> + template <std::ranges::range Range>
> + requires std::invocable<Func, std::ranges::range_value_t<Range>>
> + explicit parallel_for_each(const Range& range, Func&& func)
> + : parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<Func>(func))
> + { }
> +
> + bool await_ready() const {
> + if (_futures.empty()) {
> + await_resume();


I don't know if throwing from await_ready is legal. Better to throw from
the constructor.


> + return true;
> + }
> + return false;
> + }
> +
> + template<typename T>
> + void await_suspend(SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<T> h) {
> + _when_ready = h;
> + _waiting_task = &h.promise();


Can we compute this in waiting_task() and remove the member?


> + resume_or_set_callback();
> + }
> +
> + void await_resume() const {
> + if (_ex) {


[[unlikely]]


> + std::rethrow_exception(_ex);


std::move


> + }
> + }
> +
> + virtual void run_and_dispose() noexcept override {
> + if (__builtin_expect(this->_state.failed(), false)) {


failed() is already annotated.
It will be nice to add tests that demonstrate the lower bounds on
allocations and task counts.


> #endif

Benny Halevy

<bhalevy@scylladb.com>
Mar 16, 2022, 2:36:45 PM
to Avi Kivity, seastar-dev@googlegroups.com, Gleb Natapov
Can you please point at a good example to borrow from?
I tried the following
requires (Func func, Iterator it) { { futurize_invoke(func, *it) } -> std::same_as<future<>>; }

but it triggers g++ 11.2.1 ICE:


../../include/seastar/coroutine/parallel_for_each.hh:108:9: internal compiler error: Segmentation fault
108 | requires (Func func, Iterator it) { { futurize_invoke(func, *it) } -> std::same_as<future<>>; }
| ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

>
>
> > +class [[nodiscard("must co_await an parallel_for_each() object")]] parallel_for_each final : public continuation_base<> {
>
>
> It will be nicer to use private inheritance.

sure, will do

>
>
> > +    using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<void>;
> > +
> > +    Func _func;
> > +    std::vector<future<>> _futures;
>
>
> I (and Nadav) hate to bring boost into it, but boost::small_vector can
> save an allocation here.

No problem, I wonder what should be the preallocated size.
Would boost::container::small_vector<future<>, 1> cover the most common case?
Will do.

>
>
> > +        seastar::internal::set_callback(fut, this);
>
>
> (set_callback should consume its input since it will be unusable after
> it runs)

I'm not sure I understand which input it needs to consume
and whether this should be done in this patch or as a follow up.

>
>
> > +    }
> > +
> > +    void resume_or_set_callback() noexcept {
> > +        if (__builtin_expect(consume_next(), false)) {
>
>
> The __builtin_expect is wrong. It will return true in an least 1/n
> calls, often more, and n can be small.

okay, will remove it.

>
>
> > +            _when_ready.resume();
> > +        } else {
> > +            set_callback();
> > +        }
> > +    }
> > +
> > +public:
> > +    // clang 13.0.1 doesn't support subrange
> > +    // so provide also a Iterator/Sentinel based constructor.
> > +    // See https://github.com/llvm/llvm-project/issues/46091
> > +    template <typename Iterator, typename Sentinel>
> > +    requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
>
>
> Constraint for Func.

See my answer above.

>
>
> > +    explicit parallel_for_each(Iterator begin, Sentinel end, Func&& func)
> > +        : _func(std::move(func))
> > +    {
> > +        for (auto it = begin; it != end; ++it) {
> > +            auto fut = futurize_invoke(_func, *it);
> > +            if (fut.available()) {
> > +                if (fut.failed()) {
> > +                    _ex = fut.get_exception();
> > +                }
> > +                continue;
> > +            }
> > +            if (_futures.empty()) {
> > +                _futures.reserve(std::distance(it, end));
> > +            }
>
>
> This can fail. We can't handle it, so must abort.

good point (so is the push_back below)

>
>
> std::distance() should not be used on input iterators, they are single pass.

I'll use seastar::internal::iterator_range_estimate_vector_capacity then,
similar to seastar::parallel_for_each

>
>
> > +            _futures.push_back(std::move(fut));
> > +        }
> > +    }
> > +
> > +    template <std::ranges::range Range>
> > +    requires std::invocable<Func, std::ranges::range_value_t<Range>>
> > +    explicit parallel_for_each(const Range& range, Func&& func)
> > +        : parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<Func>(func))
> > +    { }
> > +
> > +    bool await_ready() const {
> > +        if (_futures.empty()) {
> > +            await_resume();
>
>
> I don't know if throwing from await_ready is legal. Better to throw from
> the constructor.

It's actually tested by test_parallel_for_each_exception
so it works both with gcc and clang.

I think it'd be easier on the user to propagate
the error via co_await so it can be handled by a single
path rather than having to handle both paths - exceptions
from the constructor on synchronous errors, vs.
exceptions from co_await if any function created a continuation.

I'll mark the constructor noexcept to make it clearer
(and failing to allocate the futures vector will terminate)
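
As a small sketch of the single-path handling described above (illustrative only, `check_all` is a made-up name): whether the functor throws synchronously or one of its deferred futures resolves to an exception, the error surfaces at the same co_await, so one try/catch covers both cases.

```cpp
#include <stdexcept>
#include <vector>

#include <seastar/core/coroutine.hh>
#include <seastar/core/seastar.hh>
#include <seastar/core/sstring.hh>
#include <seastar/coroutine/parallel_for_each.hh>

seastar::future<> check_all(std::vector<seastar::sstring> names) {
    try {
        co_await seastar::coroutine::parallel_for_each(names, [] (const seastar::sstring& name) -> seastar::future<> {
            if (name.empty()) {
                throw std::invalid_argument("empty name");  // synchronous failure
            }
            co_await seastar::file_exists(name);            // possibly deferred failure
        });
    } catch (...) {
        // one handling path for both kinds of failure
    }
}
```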

>
>
> > +            return true;
> > +        }
> > +        return false;
> > +    }
> > +
> > +    template<typename T>
> > +    void await_suspend(SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<T> h) {
> > +        _when_ready = h;
> > +        _waiting_task = &h.promise();
>
>
> Can we compute this in waiting_task() and remove the member?

I wish. It's problematic since we can't deduce the <T> type of _when_ready,
so we have to declare it as coroutine_handle<void>,
and coroutine_handle<void> doesn't have a promise().

>
>
> > +        resume_or_set_callback();
> > +    }
> > +
> > +    void await_resume() const {
> > +        if (_ex) {
>
>
> [[unlikely]]
>
>
> > +            std::rethrow_exception(_ex);
>
>
> std::move

Sure, will do both

>
>
> > +        }
> > +    }
> > +
> > +    virtual void run_and_dispose() noexcept override {
> > +        if (__builtin_expect(this->_state.failed(), false)) {
>
>
> failed() is already annotated.

right, will get rid of __builtin_expect here too

Benny Halevy

<bhalevy@scylladb.com>
Mar 16, 2022, 2:54:29 PM
to seastar-dev@googlegroups.com, Benny Halevy, Avi Kivity, Gleb Natapov
This patch adds support for coroutine::parallel_for_each
that can be used in coroutines instead of
co_await seastar::parallel_for_each.

This implementation minimizes memory allocation
by deferring the allocation of a vector of futures
to wait on, till any of the futures is unavailable,
similar to the legacy parallel_for_each implementation.

Also it waits on the unavailable futures from back to front,
similar to the legacy implementation, to minimize
the number of callbacks required.

The advantage over the legacy implementation is
that the parallel_for_each object itself is a
seastar task that is used to wait on unavailable
futures with no need to allocate a parallel_for_each_state.

This is possible since coroutine::parallel_for_each
is defined as [[nodiscard]] and the caller must co_await
for it, making sure it remains valid until all the futures
it's waiting on are resolved.

Signed-off-by: Benny Halevy <bha...@scylladb.com>
---
demos/coroutines_demo.cc | 11 +-
doc/tutorial.md | 17 ++
.../seastar/coroutine/parallel_for_each.hh | 174 ++++++++++++++++++
tests/unit/coroutines_test.cc | 97 ++++++++++
4 files changed, 298 insertions(+), 1 deletion(-)
create mode 100644 include/seastar/coroutine/parallel_for_each.hh

in v3:
- added Func constraints
- privately inherits continuation_base<>
- use boost::small_vector<future<>, 1> for the _futures vector
- use operator= to reset task->_state
- removed __builtin_expect calls
- make constructors noexcept
- and added scoped_critical_alloc_section for futures vector
allocation
- used internal::iterator_range_estimate_vector_capacity to
estimate the iterator distance only when possible.
- add [[unlikely]] for the await_resume error path.
- and std::move(_ex)
index 00000000..fb27de03
--- /dev/null
+++ b/include/seastar/coroutine/parallel_for_each.hh
@@ -0,0 +1,174 @@
+#include <boost/container/small_vector.hpp>
+
+#include <seastar/core/loop.hh>
+// constraints for Func are defined at the parallel_for_each constructor
+class [[nodiscard("must co_await a parallel_for_each() object")]] parallel_for_each final : continuation_base<> {
+ using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<void>;
+
+ Func _func;
+ boost::container::small_vector<future<>, 1> _futures;
+ this->_state = {};
+ seastar::internal::set_callback(fut, reinterpret_cast<continuation_base<>*>(this));
+ }
+
+ void resume_or_set_callback() noexcept {
+ if (consume_next()) {
+ _when_ready.resume();
+ } else {
+ set_callback();
+ }
+ }
+
+public:
+ // clang 13.0.1 doesn't support subrange
+ // so also provide an Iterator/Sentinel-based constructor.
+ // See https://github.com/llvm/llvm-project/issues/46091
+ template <typename Iterator, typename Sentinel>
+ requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
+ && std::same_as<future<>, futurize_t<std::invoke_result_t<Func, typename std::iterator_traits<Iterator>::value_type>>>
+ explicit parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept
+ : _func(std::move(func))
+ {
+ for (auto it = begin; it != end; ++it) {
+ auto fut = futurize_invoke(_func, *it);
+ if (fut.available()) {
+ if (fut.failed()) {
+ _ex = fut.get_exception();
+ }
+ } else {
+ memory::scoped_critical_alloc_section _;
+ if (_futures.empty()) {
+ using itraits = std::iterator_traits<Iterator>;
+ if constexpr (seastar::internal::has_iterator_category<Iterator>::value) {
+ auto n = seastar::internal::iterator_range_estimate_vector_capacity(it, end, typename itraits::iterator_category{});
+ _futures.reserve(n);
+ }
+ }
+ _futures.push_back(std::move(fut));
+ }
+ }
+ }
+
+ template <std::ranges::range Range>
+ requires std::invocable<Func, std::ranges::range_value_t<Range>>
+ explicit parallel_for_each(const Range& range, Func&& func) noexcept
+ : parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<Func>(func))
+ { }
+
+ bool await_ready() const {
+ if (_futures.empty()) {
+ await_resume();
+ return true;
+ }
+ return false;
+ }
+
+ template<typename T>
+ void await_suspend(SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<T> h) {
+ _when_ready = h;
+ _waiting_task = &h.promise();
+ resume_or_set_callback();
+ }
+
+ void await_resume() const {
+ if (_ex) [[unlikely]] {
+ std::rethrow_exception(std::move(_ex));
+ }
+ }
+
+ virtual void run_and_dispose() noexcept override {
+ if (this->_state.failed()) {
+ _ex = std::move(this->_state).get_exception();
+ }
+ resume_or_set_callback();
+ }
+
+ virtual task* waiting_task() noexcept override {
+ return _waiting_task;
+ }
+};
+
+}
diff --git a/tests/unit/coroutines_test.cc b/tests/unit/coroutines_test.cc
index 4a9a2566..73297b26 100644
--- a/tests/unit/coroutines_test.cc
+++ b/tests/unit/coroutines_test.cc
@@ -20,12 +20,14 @@
*/

#include <exception>
+#include <numeric>

#include <seastar/core/future-util.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/later.hh>
#include <seastar/core/thread.hh>
+#include <seastar/testing/random.hh>

using namespace seastar;
using namespace std::chrono_literals;
@@ -42,6 +44,7 @@ SEASTAR_TEST_CASE(test_coroutines_not_compiled_in) {
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/switch_to.hh>
+#include <seastar/coroutine/parallel_for_each.hh>

namespace {

@@ -466,4 +469,98 @@ SEASTAR_TEST_CASE(generator)
#endif
--
2.34.1

Avi Kivity

<avi@scylladb.com>
Mar 17, 2022, 5:58:00 AM
to Benny Halevy, seastar-dev@googlegroups.com, Gleb Natapov
Please check trunk, and minimize and report.


I can't give any recommendation based on this fragment.


>>
>>> +class [[nodiscard("must co_await an parallel_for_each() object")]] parallel_for_each final : public continuation_base<> {
>>
>> It will be nicer to use private inheritance.
> sure, will do
>
>>
>>> +    using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<void>;
>>> +
>>> +    Func _func;
>>> +    std::vector<future<>> _futures;
>>
>> I (and Nadav) hate to bring boost into it, but boost::small_vector can
>> save an allocation here.
> No problem, I wonder what should be the preallocated size.
> Would boost::container::small_vector<future<>, 1> most common case?


future<> is small enough. I'd go to 5. This gets embedded in the
coroutine frame, so not free, but better a larger allocation than an
extra allocation.
The future, in a follow up.
That doesn't mean it is legal (the compiler can erroneously allow
illegal constructs).


> I think it'd be easier on the user to propagate
> the error via co_await so it can be handled by a single
> path rather than having to handle both paths - exceptions
> from the constructor on synchronous errors, vs.
> exceptions from co_await if any function created a continuation.


Isn't it exactly the same? try { co_await parallel_for_each() } should
work for both.


>
> I'll mark the constructor noexcept to make it clearer
> (and failing to allocate the futures vector will terminate)
>
>>
>>> +            return true;
>>> +        }
>>> +        return false;
>>> +    }
>>> +
>>> +    template<typename T>
>>> +    void await_suspend(SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<T> h) {
>>> +        _when_ready = h;
>>> +        _waiting_task = &h.promise();
>>
>> Can we compute this in waiting_task() and remove the member?
> I wish. It's problematic since we can't deduce the <T> type of _when_ready
> therefore we have to declare it as coroutine_handle<void>
> and then coroutine_handle<void> doesn't have a promise().


I think it's possible to do it, but not worth the effort.


We can define await_transform() in the promise object that wraps the
p_f_e with a wrapper that depends on the promise object type. It's not
worthwhile here, but a technique to keep in mind.
^^


>>>   #endif
>

Benny Halevy

<bhalevy@scylladb.com>
Mar 17, 2022, 9:56:03 AM
to Avi Kivity, seastar-dev@googlegroups.com, Gleb Natapov
I'm not able to reproduce this on godbolt.org for some reason.
(nor with older gcc version)

See https://godbolt.org/z/zjnn4avjP

>
>
> I can't give any recommendation based on this fragment.
>
>
> > >
> > > > +class [[nodiscard("must co_await an parallel_for_each() object")]] parallel_for_each final : public continuation_base<> {
> > >
> > > It will be nicer to use private inheritance.
> > sure, will do
> >
> > >
> > > > +    using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<void>;
> > > > +
> > > > +    Func _func;
> > > > +    std::vector<future<>> _futures;
> > >
> > > I (and Nadav) hate to bring boost into it, but boost::small_vector can
> > > save an allocation here.
> > No problem, I wonder what should be the preallocated size.
> > Would boost::container::small_vector<future<>, 1> most common case?
>
>
> future<> is small enough. I'd go to 5. This gets embedded in the
> coroutine frame, so not free, but better a larger allocation than an
> extra allocation.

okay, will send v4
Opened https://github.com/scylladb/seastar/issues/1027
yes, but the following won't:

auto f = coroutine::parallel_for_each();

co_await std::move(f);

Benny Halevy

<bhalevy@scylladb.com>
Mar 17, 2022, 11:31:51 AM
to seastar-dev@googlegroups.com, Benny Halevy, Avi Kivity
This patch adds support for coroutine::parallel_for_each
that can be used in coroutines instead of
co_await seastar::parallel_for_each.

This implementation minimizes memory allocation
by deferring the allocation of a vector of futures
to wait on, till any of the futures is unavailable,
similar to the legacy parallel_for_each implementation.

Also it waits on the unavailable futures from back to front,
similar to the legacy implementation, to minimize
the number of callbacks required.

The advantage over the legacy implementation is
that the parallel_for_each object itself is a
seastar task that is used to wait on unavailable
futures with no need to allocate a parallel_for_each_state.

This is possible since coroutine::parallel_for_each
is defined as [[nodiscard]] and the caller must co_await
for it, making sure it remains valid until all the futures
it's waiting on are resolved.

Signed-off-by: Benny Halevy <bha...@scylladb.com>
---
demos/coroutines_demo.cc | 11 +-
doc/tutorial.md | 17 ++
.../seastar/coroutine/parallel_for_each.hh | 173 ++++++++++++++++++
tests/unit/coroutines_test.cc | 97 ++++++++++
4 files changed, 297 insertions(+), 1 deletion(-)
create mode 100644 include/seastar/coroutine/parallel_for_each.hh

In v4:
- increased boost::small_vector<future<>, 5>
- do not throw from await_ready
- defer to slow (await_suspend) path instead

in v3:
- added Func constraints
- privately inherits continuation_base<>
- use boost::small_vector<future<>, 1> for the _futures vector
- use operator= to reset task->_state
- removed __builtin_expect calls
- make constructors noexcept
- and added scoped_critical_alloc_section for futures vector
allocation
- used internal::iterator_range_estimate_vector_capacity to
estimate the iterator distance only when possible.
- add [[unlikely]] for the await_resume error path.
- and std::move(_ex)

index 00000000..e11fe436
--- /dev/null
+++ b/include/seastar/coroutine/parallel_for_each.hh
@@ -0,0 +1,173 @@
+#include <boost/container/small_vector.hpp>
+
+#include <seastar/core/loop.hh>
+// constraints for Func are defined at the parallel_for_each constructor
+class [[nodiscard("must co_await a parallel_for_each() object")]] parallel_for_each final : continuation_base<> {
+ using coroutine_handle_t = SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<void>;
+
+ Func _func;
+ boost::container::small_vector<future<>, 5> _futures;
+ this->_state = {};
+ seastar::internal::set_callback(fut, reinterpret_cast<continuation_base<>*>(this));
+ }
+
+ void resume_or_set_callback() noexcept {
+ if (consume_next()) {
+ _when_ready.resume();
+ } else {
+ set_callback();
+ }
+ }
+
+public:
+ // clang 13.0.1 doesn't support subrange
+ // so also provide an Iterator/Sentinel-based constructor.
+ // See https://github.com/llvm/llvm-project/issues/46091
+ template <typename Iterator, typename Sentinel>
+ requires (std::same_as<Sentinel, Iterator> || std::sentinel_for<Sentinel, Iterator>)
+ && std::same_as<future<>, futurize_t<std::invoke_result_t<Func, typename std::iterator_traits<Iterator>::value_type>>>
+ explicit parallel_for_each(Iterator begin, Sentinel end, Func&& func) noexcept
+ : _func(std::move(func))
+ {
+ for (auto it = begin; it != end; ++it) {
+ auto fut = futurize_invoke(_func, *it);
+ if (fut.available()) {
+ if (fut.failed()) {
+ _ex = fut.get_exception();
+ }
+ } else {
+ memory::scoped_critical_alloc_section _;
+ if (_futures.empty()) {
+ using itraits = std::iterator_traits<Iterator>;
+ if constexpr (seastar::internal::has_iterator_category<Iterator>::value) {
+ auto n = seastar::internal::iterator_range_estimate_vector_capacity(it, end, typename itraits::iterator_category{});
+ _futures.reserve(n);
+ }
+ }
+ _futures.push_back(std::move(fut));
+ }
+ }
+ }
+
+ template <std::ranges::range Range>
+ requires std::invocable<Func, std::ranges::range_value_t<Range>>
+ explicit parallel_for_each(const Range& range, Func&& func) noexcept
+ : parallel_for_each(std::ranges::begin(range), std::ranges::end(range), std::forward<Func>(func))
+ { }
+
+ bool await_ready() const noexcept {
+ if (_futures.empty()) {
+ return !_ex;
+ }
+ return false;
+ }
+
+ template<typename T>
+ void await_suspend(SEASTAR_INTERNAL_COROUTINE_NAMESPACE::coroutine_handle<T> h) {
+ _when_ready = h;
+ _waiting_task = &h.promise();
+ resume_or_set_callback();
+ }
+
+ void await_resume() const {
+ if (_ex) [[unlikely]] {
+ std::rethrow_exception(std::move(_ex));
+ }
+ }
+
+ virtual void run_and_dispose() noexcept override {
+ if (this->_state.failed()) {
+ _ex = std::move(this->_state).get_exception();
+ }
+ resume_or_set_callback();
+ }
+
+ virtual task* waiting_task() noexcept override {
+ return _waiting_task;
+ }
+};
+
+}
diff --git a/tests/unit/coroutines_test.cc b/tests/unit/coroutines_test.cc
index 4a9a2566..73297b26 100644
--- a/tests/unit/coroutines_test.cc
+++ b/tests/unit/coroutines_test.cc
@@ -20,12 +20,14 @@
*/

#include <exception>
+#include <numeric>

#include <seastar/core/future-util.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/core/sleep.hh>
#include <seastar/util/later.hh>
#include <seastar/core/thread.hh>
+#include <seastar/testing/random.hh>

using namespace seastar;
using namespace std::chrono_literals;
@@ -42,6 +44,7 @@ SEASTAR_TEST_CASE(test_coroutines_not_compiled_in) {
#include <seastar/coroutine/all.hh>
#include <seastar/coroutine/maybe_yield.hh>
#include <seastar/coroutine/switch_to.hh>
+#include <seastar/coroutine/parallel_for_each.hh>

namespace {

@@ -466,4 +469,98 @@ SEASTAR_TEST_CASE(generator)
#endif
--
2.34.1

Commit Bot

<bot@cloudius-systems.com>
Mar 17, 2022, 11:59:18 AM
to seastar-dev@googlegroups.com, Benny Halevy
From: Benny Halevy <bha...@scylladb.com>
Committer: Avi Kivity <a...@scylladb.com>
Branch: master

coroutines: add parallel_for_each

This patch adds support for coroutine::parallel_for_each
that can be used in coroutines instead of
co_await seastar::parallel_for_each.

This implementation minimizes memory allocation
by deferring the allocation of a vector of futures
to wait on, till any of the futures is unavailable,
similar to the legacy parallel_for_each implementation.

Also it waits on the unavailable futures from back to front,
similar to the legacy implementation, to minimize
the number of callbacks required.

The advantage over the legacy implementation is
that the parallel_for_each object itself is a
seastar task that is used to wait on unavailable
futures with no need to allocate a parallel_for_each_state.

This is possible since coroutine::parallel_for_each
is defined as [[nodiscard]] and the caller must co_await
for it, making sure it remains valid until all the futures
it's waiting on are resolved.

Signed-off-by: Benny Halevy <bha...@scylladb.com>
Message-Id: <20220317153144....@scylladb.com>

---
diff --git a/demos/coroutines_demo.cc b/demos/coroutines_demo.cc
--- a/doc/tutorial.md
+++ b/doc/tutorial.md
@@ -440,6 +440,23 @@ Here, two read() calls are launched concurrently. The coroutine is paused until

Note that `all` waits for all of its sub-computations, even if some throw an exception. If an exception is thrown, it is propagated to the calling coroutine.

+The `seastar::coroutine::parallel_for_each` class template allows a coroutine to fork into several concurrently executing function invocations (or Seastar fibers, see below) over a range of elements and join again when they complete. Consider this example:
+
+```cpp
+#include <seastar/core/coroutines.hh>
+#include <seastar/coroutine/parallel_for_each.hh>
+
+seastar::future<bool> all_exist(std::vector<sstring> filenames) {
+ bool res = true;
+ co_await seastar::coroutine::parallel_for_each(filenames, [&res] (const seastar::sstring& name) -> seastar::future<> {
+ res &= co_await seastar::file_exists(name);
+ });
+ co_return res;
+}
+```
+
+Here, the lambda function passed to parallel_for_each is launched concurrently for each element in the filenames vector. The coroutine is paused until all calls complete.
+
## Breaking up long running computations

Seastar is generally used for I/O, and coroutines usually launch I/O operations and consume their results, with little computation in between. But occasionally a long running computation is needed, and this risks preventing the reactor from performing I/O and scheduling other tasks.
diff --git a/include/seastar/coroutine/parallel_for_each.hh b/include/seastar/coroutine/parallel_for_each.hh
--- a/include/seastar/coroutine/parallel_for_each.hh

Avi Kivity

<avi@scylladb.com>
Mar 20, 2022, 12:27:01 PM
to Benny Halevy, seastar-dev@googlegroups.com, Gleb Natapov

On 17/03/2022 15.55, Benny Halevy wrote:
>
>>> I think it'd be easier on the user to propagate
>>> the error via co_await so it can be handled by a single
>>> path rather than having to handle both paths - exceptions
>>> from the constructor on synchronous errors, vs.
>>> exceptions from co_await if any function created a continuation.
>>
>> Isn't it exactly the same? try { co_await parallel_for_each() } should
>> work for both.
> yes, but the following won't:


Why not?


>
> auto f = coroutine::parallel_for_each();


f is not a future.


> co_await std::move(f);
>

Benny Halevy

<bhalevy@scylladb.com>
Mar 20, 2022, 3:32:13 PM
to Avi Kivity, seastar-dev, Gleb Natapov
true, but still parallel_for_each would throw here from its constructor, vs. throwing below from operator co_await.



>     co_await std::move(f);

Maybe this is not the best example.

If we ever want to get rid of exceptions as much as possible and wrap exceptions in some
result<T> wrapper that could be implicitly converted to future<T>, then I would like

auto res = co_await coroutine::parallel_for_each

To pass the exception to res rather than throwing and co_return co_await coroutine::parallel_for_each to convey the error to the returned future without throwing at all.

>

Avi Kivity

<avi@scylladb.com>
Mar 21, 2022, 8:06:54 AM
to Benny Halevy, seastar-dev, Gleb Natapov


On 20/03/2022 21.31, Benny Halevy wrote:


On Sun, Mar 20, 2022, 18:27 Avi Kivity <a...@scylladb.com> wrote:

On 17/03/2022 15.55, Benny Halevy wrote:
>
>>> I think it'd be easier on the user to propagate
>>> the error via co_await so it can be handled by a single
>>> path rather than having to handle both paths - exceptions
>>> from the constructor on synchronous errors, vs.
>>> exceptions from co_await if any function created a continuation.
>>
>> Isn't it exactly the same? try { co_await parallel_for_each() } should
>> work for both.
> yes, but the following won't:


Why not?


>
>     auto f = coroutine::parallel_for_each();


f is not a future.

true, but still parallel_for_each would throw here from its constructor, vs. throwing below from operator co_await.


Why does that matter?





>     co_await std::move(f);

Maybe this is not the best example.

If we ever want to get rid of exceptions as much as possible and wrap exceptions in some
result<T> wrapper that could be implicitly converted to future<T>, then I would like

auto res = co_await coroutine::parallel_for_each

To pass the exception to res rather than throwing and co_return co_await coroutine::parallel_for_each to convey the error to the returned future without throwing at all.


It's not possible with coroutines.



>

Niek Bouman

<niekbouman@gmail.com>
Jun 21, 2022, 9:59:24 AM
to seastar-dev
Dear Benny, (and/or others)

Would similar benefits (i.e. saving some allocations) be achievable with a coroutine::-version of "seastar::max_concurrent_for_each"?
If so, are you planning to add it?

I know you accept patches ;-) but having looked at the implementation of coroutine::parallel_for_each, I concluded that I unfortunately do not understand enough of seastar's internals to be able to contribute such a coroutine::max_concurrent_for_each implementation...

kind regards,
Niek




Avi Kivity

<avi@scylladb.com>
Jun 21, 2022, 2:34:24 PM
to Niek Bouman, seastar-dev


On 21/06/2022 16.59, Niek Bouman wrote:
Dear Benny, (and/or others)

Would similar benefits (i.e. saving some allocations) be achievable with a coroutine::-version of "seastar::max_concurrent_for_each"?
If so, are you planning to add it?


Probably greater, I haven't looked at max_concurrent_for_each but it probably has more housekeeping continuations.


I know you accept patches ;-) but having looked at the implementation of coroutine::parallel_for_each, I concluded that I unfortunately do not understand enough of seastar's internals to be able to contribute such a coroutine::max_concurrent_for_each implementation...


The list is happy to answer questions.


kind regards,
Niek





Benny Halevy

<bhalevy@scylladb.com>
Jun 22, 2022, 2:46:19 AM
to Avi Kivity, Niek Bouman, seastar-dev
On Tue, 2022-06-21 at 21:34 +0300, 'Avi Kivity' via seastar-dev wrote:
>
> On 21/06/2022 16.59, Niek Bouman wrote:
>  
> >  
> > Dear Benny, (and/or others)
> >
> > Would similar benefits (i.e. saving some allocations) be achievable with a coroutine::-version of "seastar::max_concurrent_for_each"?

Maybe, I didn't look into it.

> > If so, are you planning to add it?
> >

Not at the moment, until need arises.
If you have a use case for it, feel invited to send a patch for review.

>
> Probably greater, I haven't looked at max_concurrent_for_each but it probably has more housekeeping continuations.
>
>  
> > I know you accept patches ;-) but having looked at the implementation of coroutine::parallel_for_each, I concluded that I unfortunately do not understand enough of seastar's internals to be able
> > to contribute such a coroutine::max_concurrent_for_each implementation...
> >
>
> The list is happy to answer questions.

Agreed.

>
>  
> > kind regards,
> > Niek
> >
> >
> >
> >  

Niek Bouman

<niekbouman@gmail.com>
Jun 24, 2022, 9:24:31 AM
to seastar-dev
> > I know you accept patches ;-) but having looked at the implementation of coroutine::parallel_for_each, I concluded that I unfortunately do not understand enough of seastar's internals to be able
> > to contribute such a coroutine::max_concurrent_for_each implementation...
> >
>
> The list is happy to answer questions.

Agreed.

Ok, good to know ;-). I'm currently working on other stuff; will try to look at this at some later point in time.

best regards,
Niek

 