Groups keyboard shortcuts have been updated
Dismiss
See shortcuts

Is there a better way to synchronize the chain of callables

54 views
Skip to first unread message

Leonid Manieiev

<leonid.manieiev@gmail.com>
unread,
Jul 6, 2024, 6:03:37 AM7/6/24
to seastar-dev
I'd like to have a facility that:
  1. synchronizes a bunch of tasks (call it chain)
  2. allows to cancel chain at any point of execution after current task is done
  3. allows to add a task from within another task 
  4. ensures that tasks won't execute after the facility is destroyed
To accomplish the first two items of the requirements list, I wrote this:

class TaskManagerSync : public seastar::gate
{
public:
using exception_handler_t = std::move_only_function<void(std::exception_ptr)>;

explicit TaskManagerSync(exception_handler_t eh)
: m_exceptionHandler{std::move(eh)}
{
if (!m_exceptionHandler) { throw std::invalid_argument{"Empty exception handler is not allowed"}; }
}

template <class FUNC, class... ARGs>
void add(FUNC&& func, ARGs&&... args) noexcept
{
m_done = m_done.then([func = std::forward<FUNC>(func), ...args = std::forward<ARGs>(args), this] mutable {
if (try_enter())
{
return seastar::futurize_invoke(std::forward<FUNC>(func), std::forward<ARGs>(args)...)
.discard_result()
.handle_exception(std::ref(m_exceptionHandler))
.finally([this] { leave(); });
}

return seastar::make_ready_future();
});
}

private:
seastar::future<> m_done{seastar::make_ready_future()};
exception_handler_t m_exceptionHandler;
};

Works fine until someone tries to add() a task from within another task:

static auto task2 = [] -> seastar::future<> {
co_await seastar::sleep(10ms);
};

static auto task1 = [](TaskManagerSync* tms) {
tms->add(task2);
return seastar::make_ready_future();
};

TaskManagerSync tms([](auto) {});
tms.add(task1, &tms);
co_await tms.close();

TaskManagerSync crashes. My best guess is that is has something to do with the promise reattachment that happens during m_done = m_done.then(...)

To fix that issue, we can chain seastar::yield before chaining the task:

template <class FUNC, class... ARGs>
void add(FUNC&& func, ARGs&&... args) noexcept
{
m_done = m_done.then([] {
return sestar::yield(); // fixes the crash
}).then([func = std::forward<FUNC>(func), ...args = std::forward<ARGs>(args), this] mutable {
if (try_enter())
{
return seastar::futurize_invoke(std::forward<FUNC>(func), std::forward<ARGs>(args)...)
.discard_result()
.handle_exception(std::ref(m_exceptionHandler))
.finally([this] { leave(); });
}

return seastar::make_ready_future();
});
}

Now we have 3 items done from the list. But with yield in place here comes the 4th item. Task may execute even after TaskManagerSync is destroyed:

TaskManagerSync tms([](auto) {});
co_await tms.close();
// from this point new tasks shall be discarded
tms.add([] {
assert(false && "Task should never have been executed");
return seastar::make_ready_future();
});
// give time to tms to discard the task
co_await seastar::yield();

So here is what happens:
  1. TaskManagerSync gets created
  2. gate gets closed
  3. add(...) gets called
  4. m_done.then(...) gets suspended
  5. seastar::yield from the test gets suspended
  6. seastar::yield from the add() gets suspended
  7. TaskManagerSync gets destroyed
  8. add() starts executing 2nd then()
  9. try_enter() is called with destroyed gate, so _stopped is actually false hence try_enter() is true
  10. futurize_invoke(...) executes the task that shouldn't have been executed
This can be fixed by replacing seastar::yield() from the test with seastar::sleep(). But in actual code user won't be aware of that problem. It's only required from the user to co_await on close() before destroying TaskManagerSync.

I'd like to ask:
  1. do you consider TaskManagerSync a good solution?
  2. how would you write this facility (trying to make it lightweight)?
  3. what are your thoughts on the post destruction task execution?

Avi Kivity

<avi@scylladb.com>
unread,
Jul 11, 2024, 7:24:47 AM7/11/24
to Leonid Manieiev, seastar-dev
On Sat, 2024-07-06 at 03:03 -0700, Leonid Manieiev wrote:
I'd like to have a facility that:
  1. synchronizes a bunch of tasks (call it chain)

The Seastar term is "fiber".

  1. allows to cancel chain at any point of execution after current task is done

There is seastar::abort_source. Each continuation must check it manually, since automatic cancellation can prevent cleanup actions (like file::close()).


  1. allows to add a task from within another task 

You can use some container to hold the tasks, and a condition_variable to communicate among fibers.


  1. ensures that tasks won't execute after the facility is destroyed
To accomplish the first two items of the requirements list, I wrote this:

class TaskManagerSync : public seastar::gate
{
public:
using exception_handler_t = std::move_only_function<void(std::exception_ptr)>;

explicit TaskManagerSync(exception_handler_t eh)
: m_exceptionHandler{std::move(eh)}
{
if (!m_exceptionHandler) { throw std::invalid_argument{"Empty exception handler is not allowed"}; }
}

template <class FUNC, class... ARGs>
void add(FUNC&& func, ARGs&&... args) noexcept
{
m_done = m_done.then([func = std::forward<FUNC>(func), ...args = std::forward<ARGs>(args), this] mutable {
if (try_enter())
{
return seastar::futurize_invoke(std::forward<FUNC>(func), std::forward<ARGs>(args)...)
.discard_result()
.handle_exception(std::ref(m_exceptionHandler))
.finally([this] { leave(); });
}

return seastar::make_ready_future();
});
}

private:
seastar::future<> m_done{seastar::make_ready_future()};
exception_handler_t m_exceptionHandler;
};



I recommend rewriting using coroutines. Your quality of life will improve.
add() then has to check the gate in the same continuation that it performs the addition.


  1. m_done.then(...) gets suspended
  2. seastar::yield from the test gets suspended
  3. seastar::yield from the add() gets suspended
  4. TaskManagerSync gets destroyed
  5. add() starts executing 2nd then()
  6. try_enter() is called with destroyed gate, so _stopped is actually false hence try_enter() is true
  7. futurize_invoke(...) executes the task that shouldn't have been executed
This can be fixed by replacing seastar::yield() from the test with seastar::sleep(). But in actual code user won't be aware of that problem. It's only required from the user to co_await on close() before destroying TaskManagerSync.

I'd like to ask:
  1. do you consider TaskManagerSync a good solution?
  2. how would you write this facility (trying to make it lightweight)?
  3. what are your thoughts on the post destruction task execution?

These are common things that were coded many times, each time differently.

My preferred solution is to have a coroutine that runs an execution loop and is waited for during closing. Outsiders communicate with the coroutine by setting variable (like a request to stop), adding items to collections (say tasks) etc. 

Sorry for not giving concrete comments on your code, I just dislike reviewing continuation-style code these days, it takes too much effort to understand it.
Reply all
Reply to author
Forward
0 new messages