[Boost-users] [boost::asio] Wait for multiple async_read()'s

336 views
Skip to first unread message

Adam Zegelin

unread,
Jan 21, 2017, 7:09:42 AM1/21/17
to boost...@lists.boost.org
Hi,

I'm writing a boost::asio application that communicates with multiple
streams. I'd like to write data to these streams then wait for
everyone to reply before performing computation on the resulting data.

What is the best way to implement this?

The route I've ventured down involves me calling async_write() on each
stream, with the callback hander executing async_read(). But this
where I'm stuck. What is the best way to execute one final hander once
all read operations have completed?

One solution I've found, though it feels "un-asio" is to pass the
use_future option as the hander for the reads. The result is futures
for each read, which I can then place into a list<future>, and have an
additional io_service.post()'ed handler call get() on these futures.
The only issue with this is that I must call io_service.run() from
more than one thread, since the handler calling get() blocks —
otherwise I can potentially deadlock the whole application.

Another alternative I can think of is that the function that schedules
all the async_writes knows ahead-of-time the number of reads that need
to be completed. Each async_read() handler would put its results into
a shared list and then, if the list's size is equal to the expected
number of results, fires the "completion" handler. While easy enough
to implement, this seems fraught with issues. If there is more than
one thread calling io_service.run(), then access to the shared list
has to be synchronised. Also, it doesn't seem very generic — I have a
number of places where this write n, read n, execute (or is it map,
reduce?) pattern needs to happen, and I'd prefer a solution that was
write one, worked everywhere.

Any help would be appreciated.

- Adam
_______________________________________________
Boost-users mailing list
Boost...@lists.boost.org
http://lists.boost.org/mailman/listinfo.cgi/boost-users

Maarten de Vries

unread,
Jan 21, 2017, 7:25:27 AM1/21/17
to boost...@lists.boost.org
Hey,

On 21 January 2017 at 13:09, Adam Zegelin <ad...@zegelin.com> wrote:
Hi,



Another alternative I can think of is that the function that schedules
all the async_writes knows ahead-of-time the number of reads that need
to be completed. Each async_read() handler would put its results into
a shared list and then, if the list's size is equal to the expected
number of results, fires the "completion" handler. While easy enough
to implement, this seems fraught with issues. If there is more than
one thread calling io_service.run(), then access to the shared list
has to be synchronised. Also, it doesn't seem very generic — I have a
number of places where this write n, read n, execute (or is it map,
reduce?) pattern needs to happen, and I'd prefer a solution that was
write one, worked everywhere.

​I would definitely go with this approach​. It's by far the simplest and to me it seems to match an asynchronous philosophy pretty well.

You can use an io_service::strand [1] to synchronize access to the shared list easily without explicit locking. For best performance you should probably keep the handler running in the strand as short as possible and keep the parts of the read handler that don't access shared state outside of the strand.

[1] http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/reference.html#boost_asio.reference.io_service__strand

​Hope ​this helps,

-- Maarten


Ben Pope

unread,
Jan 21, 2017, 9:11:56 AM1/21/17
to boost...@lists.boost.org
On 21/01/17 12:09, Adam Zegelin wrote:
> Hi,
>
> I'm writing a boost::asio application that communicates with multiple
> streams. I'd like to write data to these streams then wait for
> everyone to reply before performing computation on the resulting data.
>
> What is the best way to implement this?
>
> The route I've ventured down involves me calling async_write() on each
> stream, with the callback hander executing async_read(). But this
> where I'm stuck. What is the best way to execute one final hander once
> all read operations have completed?
>
> One solution I've found, though it feels "un-asio" is to pass the
> use_future option as the hander for the reads. The result is futures
> for each read, which I can then place into a list<future>, and have an
> additional io_service.post()'ed handler call get() on these futures.
> The only issue with this is that I must call io_service.run() from
> more than one thread, since the handler calling get() blocks —
> otherwise I can potentially deadlock the whole application.

This is why futures without continuations aren't that useful; with
continuations, useful helpers can be made:

http://www.boost.org/doc/libs/1_63_0/doc/html/thread/synchronization.html#thread.synchronization.futures.reference.wait_for_all

http://www.boost.org/doc/libs/1_63_0/doc/html/thread/synchronization.html#thread.synchronization.futures.reference.when_all

Good luck.

Ben

TONGARI J

unread,
Jan 21, 2017, 9:32:25 AM1/21/17
to boost...@lists.boost.org
2017-01-21 20:09 GMT+08:00 Adam Zegelin <ad...@zegelin.com>:
Hi,

I'm writing a boost::asio application that communicates with multiple
streams. I'd like to write data to these streams then wait for
everyone to reply before performing computation on the resulting data.

What is the best way to implement this?

The route I've ventured down involves me calling async_write() on each
stream, with the callback hander executing async_read(). But this
where I'm stuck. What is the best way to execute one final hander once
all read operations have completed?

Coroutine may help. Take a simple echo-server for example: https://github.com/jamboree/co2#asio-echo-server
In your case, you just need to keep the tasks in a list and 'await' for them to complete.

Rutger ter Borg

unread,
Jan 22, 2017, 3:10:09 PM1/22/17
to boost...@lists.boost.org
On 21-01-17 13:09, Adam Zegelin wrote:
> Hi,
>
> I'm writing a boost::asio application that communicates with multiple
> streams. I'd like to write data to these streams then wait for
> everyone to reply before performing computation on the resulting data.
>
> What is the best way to implement this?
>
> The route I've ventured down involves me calling async_write() on each
> stream, with the callback hander executing async_read(). But this
> where I'm stuck. What is the best way to execute one final hander once
> all read operations have completed?
>

[snip]

Not sure if it's the best way, but for this typical use case I've
written a combiner, which uses reference counting in a wrapped handler.
A part of this io_object's signature is

combiner_service::wrapped_handler async_combine( const
boost::system::error_code& a1,
const handler& handler );

and the following usage

auto wrapped_handler = some_combiner_.async_combine(
default_error_code, ..handler when all is complete... )
for( ..0 or more items.. ) {
some_io_object.async_do_stuff( ..., wrapped_handler );
}

in the example above, the operation will only finish if the variable
wrapped_handler goes out of scope, and all copies of wrapped_handler
have been called. The wrapped handler maximizes the final error code.

HtH,
Cheers,

Rutger

Gavin Lambert

unread,
Jan 22, 2017, 5:58:44 PM1/22/17
to boost...@lists.boost.org
On 22/01/2017 01:09, Adam Zegelin wrote:
> One solution I've found, though it feels "un-asio" is to pass the
> use_future option as the hander for the reads. The result is futures
> for each read, which I can then place into a list<future>, and have an
> additional io_service.post()'ed handler call get() on these futures.
> The only issue with this is that I must call io_service.run() from
> more than one thread, since the handler calling get() blocks —
> otherwise I can potentially deadlock the whole application.

You can pass a collection of futures to when_all(), which returns a
future that will be completed when all the parameters are completed.

You can then use future.then() to attach a continuation to this future
which will execute when that occurs.

Adam Zegelin

unread,
Jan 24, 2017, 5:47:28 PM1/24/17
to boost...@lists.boost.org
Thanks for everyones suggestions.

On Mon, Jan 23, 2017 at 9:58 AM, Gavin Lambert <gav...@compacsort.com> wrote:

> You can pass a collection of futures to when_all(), which returns a future
> that will be completed when all the parameters are completed.
>
> You can then use future.then() to attach a continuation to this future which
> will execute when that occurs.
>

That's exactly what I was looking for, thank you!

I swear I read the docs for boost::future multiple times, but it looks
like its a fairly recent addition to the library (labeled
experimental).
Google has this tendency to link to older documentation sets (this
always gets me when googling for postgres docs) and I probably Googled
boost::future and ended up reading the docs from 20 years ago!
Hint to boost devs: do what postgres does and have a link to the most
recent doc version at the top of every page.


On Sun, Jan 22, 2017 at 1:31 AM, TONGARI J <tong...@gmail.com> wrote:
> Coroutine may help. Take a simple echo-server for example:
> https://github.com/jamboree/co2#asio-echo-server
> In your case, you just need to keep the tasks in a list and 'await' for them
> to complete.

I looked into co-routines but they seem like too much magic for my taste.
Perhaps in a future project.


On Sat, Jan 21, 2017 at 11:24 PM, Maarten de Vries <maa...@de-vri.es> wrote:
> I would definitely go with this approach. It's by far the simplest and to me
> it seems to match an asynchronous philosophy pretty well.

I got this to work, but it seems like a lot of work at every "join" point.
Not to mention it combines the concerns of the upstream and downstream
read-handlers — upstream has to know about downstream.

Regards,
Adam

Gavin Lambert

unread,
Jan 24, 2017, 9:17:45 PM1/24/17
to boost...@lists.boost.org
On 25/01/2017 11:47, Adam Zegelin wrote:
> I swear I read the docs for boost::future multiple times, but it looks
> like its a fairly recent addition to the library (labeled
> experimental).
> Google has this tendency to link to older documentation sets (this
> always gets me when googling for postgres docs) and I probably Googled
> boost::future and ended up reading the docs from 20 years ago!
> Hint to boost devs: do what postgres does and have a link to the most
> recent doc version at the top of every page.

FWIW, the top Google hit for "boost future" links to the 1.55 docs
(which do not have when_all), but there is indeed a big yellow box at
the top of the page which links to the latest version.

Perhaps it didn't look enough like a hyperlink so you mentally skipped
it. :)

Or perhaps instead of entering the page at the top you entered at one of
the anchors further down? Perhaps that's an argument to style the
header so that it stays stuck at the top of the window, even when
scrolled down. (It also wouldn't be the first time that I've wished I
didn't have to scroll to get to the navigation arrows.) Although that
might annoy people on mobile browsers.

Adam Zegelin

unread,
Jan 26, 2017, 10:41:12 PM1/26/17
to boost...@lists.boost.org
Next problem :)

asio's use_future returns a std::future. when_all() expects boost::futures.
I can't find an obvious way to convert between the two and/or get
asio's use_future to return a boost::future.

My understanding is that std::futures are pretty much a re-namespaced
boost::future (aka, boost's implementation was standardised).

My compiler (clang, Apple LLVM version 8.0.0 (clang-800.0.42.1),
bundled with Xcode 8.2.1) doesn't come with experimental/future (see
http://en.cppreference.com/w/cpp/experimental/when_all)

Besides re-implementing all the "magic" in asio/impl/use_future.hpp,
is there a way to mix these future types?



On Wed, Jan 25, 2017 at 1:16 PM, Gavin Lambert <gav...@compacsort.com> wrote:

> FWIW, the top Google hit for "boost future" links to the 1.55 docs (which do
> not have when_all), but there is indeed a big yellow box at the top of the
> page which links to the latest version.
>
> Perhaps it didn't look enough like a hyperlink so you mentally skipped it.
> :)

Now that you've told me that its there I can't miss it. Perhaps I just
mentally skipped it before as it doesn't look much like a link and is
inline with the header.

Chris Glover

unread,
Jan 28, 2017, 12:09:42 PM1/28/17
to boost...@lists.boost.org
On Thu, 26 Jan 2017 at 22:41 Adam Zegelin <ad...@zegelin.com> wrote:
Next problem :)

asio's use_future returns a std::future. when_all() expects boost::futures.
I can't find an obvious way to convert between the two and/or get
asio's use_future to return a boost::future.


ASIO is set up to solve this -- you can write use_boost_future in the same way that asio::use_future is written, but make it return a boost::future instead.

You should be able to copy and paste the ASIO implementation of use_future, and change the details to use boost::future instead.

A similar example can be found here, which adapts a custom future implementation into Chris' std::executors reference implementation. This is same author as ASIO and so it uses a similar technique. The code may even work as is with just a change of the namespace from std::experimental to boost::asio.


Good Luck!

-- chris

Marat Abrarov

unread,
Jan 30, 2017, 3:48:08 PM1/30/17
to boost...@lists.boost.org
Hi Adam,

> I'd like to write data to these streams then wait for everyone to reply
before performing computation on the resulting data.

What about Chris's coinvoke
(http://blog.think-async.com/2008/10/asynchronous-forkjoin-using-asio.html)?
Could it help for you case?

Regards,
Marat Abrarov.
Reply all
Reply to author
Forward
0 new messages