I'd like to have a facility that:
- synchronizes a bunch of tasks (call it a chain)
- allows cancelling the chain at any point of execution, once the current task is done
- allows adding a task from within another task
- ensures that tasks won't execute after the facility is destroyed
To accomplish the first two items of the requirements list, I wrote this:
#include <seastar/core/future.hh>
#include <seastar/core/gate.hh>

#include <functional>
#include <stdexcept>
#include <utility>

class TaskManagerSync : public seastar::gate
{
public:
    using exception_handler_t = std::move_only_function<void(std::exception_ptr)>;

    explicit TaskManagerSync(exception_handler_t eh)
        : m_exceptionHandler{std::move(eh)}
    {
        if (!m_exceptionHandler) { throw std::invalid_argument{"Empty exception handler is not allowed"}; }
    }

    template <class FUNC, class... ARGs>
    void add(FUNC&& func, ARGs&&... args) noexcept
    {
        // Serialize tasks by chaining each new one onto the previous future.
        m_done = m_done.then([func = std::forward<FUNC>(func), ...args = std::forward<ARGs>(args), this] mutable {
            // The gate rejects the task once close() has been called.
            if (try_enter())
            {
                return seastar::futurize_invoke(std::forward<FUNC>(func), std::forward<ARGs>(args)...)
                    .discard_result()
                    .handle_exception(std::ref(m_exceptionHandler))
                    .finally([this] { leave(); });
            }
            return seastar::make_ready_future();
        });
    }

private:
    seastar::future<> m_done{seastar::make_ready_future()};
    exception_handler_t m_exceptionHandler;
};
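For context, here is the happy-path usage I have in mind (a self-contained sketch; the app_template wrapper and the task bodies are placeholders of mine, not part of the facility):

#include <seastar/core/app-template.hh>
#include <seastar/core/coroutine.hh>

int main(int argc, char** argv)
{
    seastar::app_template app;
    return app.run(argc, argv, [] -> seastar::future<> {
        TaskManagerSync tms([](std::exception_ptr) { /* report the failed task */ });
        tms.add([] { return seastar::make_ready_future(); });
        tms.add([](int answer) { return seastar::make_ready_future(); }, 42);
        co_await tms.close(); // waits for entered tasks; tasks added afterwards are rejected
    });
}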
This works fine until someone tries to add() a task from within another task:
#include <seastar/core/coroutine.hh>
#include <seastar/core/sleep.hh>

using namespace std::chrono_literals;

static auto task2 = [] -> seastar::future<> {
    co_await seastar::sleep(10ms);
};

static auto task1 = [](TaskManagerSync* tms) {
    tms->add(task2); // reentrant add(): called from inside a running task
    return seastar::make_ready_future();
};

TaskManagerSync tms([](auto) {});
tms.add(task1, &tms);
co_await tms.close();
TaskManagerSync crashes. My best guess is that it has something to do with the promise reattachment that happens during m_done = m_done.then(...). To work around the issue, we can chain a seastar::yield() before chaining the task:
template <class FUNC, class... ARGs>
void add(FUNC&& func, ARGs&&... args) noexcept
{
    m_done = m_done.then([] {
        return seastar::yield(); // fixes the crash; declared in <seastar/core/later.hh>
    }).then([func = std::forward<FUNC>(func), ...args = std::forward<ARGs>(args), this] mutable {
        if (try_enter())
        {
            return seastar::futurize_invoke(std::forward<FUNC>(func), std::forward<ARGs>(args)...)
                .discard_result()
                .handle_exception(std::ref(m_exceptionHandler))
                .finally([this] { leave(); });
        }
        return seastar::make_ready_future();
    });
}
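With the yield in place, the reentrant reproduction from above completes instead of crashing. Here it is wrapped up for reference (reentrant_repro is just a name I made up for the wrapper coroutine):

seastar::future<> reentrant_repro()
{
    TaskManagerSync tms([](auto) {});
    tms.add([](TaskManagerSync* t) {
        // add() from within a running task, as in the crashing scenario
        t->add([] -> seastar::future<> { co_await seastar::sleep(10ms); });
        return seastar::make_ready_future();
    }, &tms);
    co_await tms.close(); // no longer crashes
}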
Now three items from the list are done. But with the yield in place, the 4th item becomes a problem: a task may execute even after TaskManagerSync is destroyed:
#include <cassert>

TaskManagerSync tms([](auto) {});
co_await tms.close();
// from this point on, new tasks shall be discarded
tms.add([] {
    assert(false && "Task should never have been executed");
    return seastar::make_ready_future();
});
// give tms time to discard the task
co_await seastar::yield();
So here is what happens:
- TaskManagerSync gets created
- the gate gets closed
- add(...) gets called
- m_done.then(...) gets suspended
- the seastar::yield() from the test gets suspended
- the seastar::yield() from add() gets suspended
- TaskManagerSync gets destroyed
- the second then() continuation from add() starts executing
- try_enter() is called on the already-destroyed gate; its _stopped member happens to read as false, so try_enter() returns true
- futurize_invoke(...) executes the task that should never have run
In the test this can be avoided by replacing seastar::yield() with seastar::sleep(), but in real code the user won't be aware of the problem: all the user is required to do is co_await close() before destroying TaskManagerSync.
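For completeness, the only direction I see so far (a sketch under my assumptions, not a vetted fix) is to reject tasks synchronously once the gate is closed, and to drain the pending chain as part of shutdown so that no continuation capturing this can outlive the object. gate::is_closed() reports whether close() has been called; coroutine support comes from <seastar/core/coroutine.hh>:

template <class FUNC, class... ARGs>
void add(FUNC&& func, ARGs&&... args) noexcept
{
    if (is_closed()) { return; } // reject synchronously: nothing gets chained after close()
    m_done = m_done.then([] {
        return seastar::yield();
    }).then([func = std::forward<FUNC>(func), ...args = std::forward<ARGs>(args), this] mutable {
        if (try_enter())
        {
            return seastar::futurize_invoke(std::forward<FUNC>(func), std::forward<ARGs>(args)...)
                .discard_result()
                .handle_exception(std::ref(m_exceptionHandler))
                .finally([this] { leave(); });
        }
        return seastar::make_ready_future();
    });
}

// To be co_awaited instead of close() alone: once the gate is closed,
// drain whatever is still queued on m_done before the object goes away.
seastar::future<> stop()
{
    co_await close();
    co_await std::exchange(m_done, seastar::make_ready_future());
}

Even with that sketch in hand, I'm not confident about the overall design.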
I'd like to ask:
- do you consider TaskManagerSync a good solution?
- how would you write this facility (while keeping it lightweight)?
- what are your thoughts on the post-destruction task execution?