[Boost-users] [Asio] Socket Read/Write Thread-Safety

2,715 views
Skip to first unread message

Timothy Liang

unread,
Jan 28, 2011, 12:48:31 AM1/28/11
to boost...@lists.boost.org
Hi, I'm implementing a full-duplex protocol over TCP/IP using Boost.Asio.
Multiple threads will be calling 'io_service::run_one()', so I have to
serialize my completion handlers. Since I need to be reading and writing on
a socket at the same time, can I use the 'boost::asio::async_read(...)' and
'boost::asio::async_write(...)' functions? The documentation states that
these are implemented in terms of calls to the stream's async_read_some and
async_write_some functions, and the 'basic_stream_socket' class isn't
thread-safe.

Thanks,
Timothy


_______________________________________________
Boost-users mailing list
Boost...@lists.boost.org
http://lists.boost.org/mailman/listinfo.cgi/boost-users

Igor R

unread,
Jan 29, 2011, 4:44:37 PM1/29/11
to boost...@lists.boost.org
> Hi, I'm implementing a full-duplex protocol over TCP/IP using Boost.Asio.
> Multiple threads will be calling 'io_service::run_one()', so I have to
> serialize my completion handlers.  Since I need to be reading and writing on
> a socket at the same time, can I use the 'boost::asio::async_read(...)' and
> 'boost::asio::async_write(...)' functions?  The documentation states that
> these are implemented in terms of calls to the stream's async_read_some and
> async_write_some functions, and the 'basic_stream_socket' class isn't
> thread-safe.

Your statements are correct, but I'm not sure I got the question. If
you ask whether you may call async_read and async_write simultaneously
on the same socket, then the answer is yes, of course.

Dean Michael Berris

unread,
Jan 30, 2011, 2:35:07 AM1/30/11
to boost...@lists.boost.org
On Fri, Jan 28, 2011 at 1:48 PM, Timothy Liang <timot...@msn.com> wrote:
> Hi, I'm implementing a full-duplex protocol over TCP/IP using Boost.Asio.
> Multiple threads will be calling 'io_service::run_one()', so I have to
> serialize my completion handlers.  Since I need to be reading and writing on
> a socket at the same time, can I use the 'boost::asio::async_read(...)' and
> 'boost::asio::async_write(...)' functions?  The documentation states that
> these are implemented in terms of calls to the stream's async_read_some and
> async_write_some functions, and the 'basic_stream_socket' class isn't
> thread-safe.
>

I think you want multiple threads calling io_service::run instead of run_one.

To serialize your connection-related completion handlers you'd want to
wrap them in a Boost.Asio strand.

HTH

--
Dean Michael Berris
about.me/deanberris

Timothy Liang

unread,
Jan 29, 2011, 10:49:09 PM1/29/11
to boost...@lists.boost.org
> Your statements are correct, but I'm not sure I got the question. If you
> ask whether you may call async_read and async_write simultaneously on the
> same socket, then the answer is yes, of course.

The 'boost::asio::async_read(...)' and 'boost::asio::async_write(...)'
functions might call async_read_some and async_write_some concurrently in
some scenarios, as they do not synchronize accesses to the stream object
that I'm passing to them as a reference. A look in their implementations
confirms this: if the stream's async_read_some or async_write_some didn't
satisfy the completion condition, it tries again, using the same reference
to the stream object. With multiple threads calling io_service::run_one(),
the result can be one thread calling async_read_some while another is
calling async_write_some at the same time. This seems to prevent me from
using async_read and async_write at all, so I'm wondering if the stream has
some undocumented thread-safety guarantees, or if there's a proper way to
achieve the same effect.

Timothy Liang

unread,
Jan 31, 2011, 1:55:11 AM1/31/11
to boost...@lists.boost.org
> I think you want multiple threads calling io_service::run instead of
> run_one.

I need to test for a program interruption signal, so I can't use
io_service::run.

> To serialize your connection-related completion handlers you'd want to
> wrap them in a Boost.Asio strand.

I'm not concerned about my own completion handlers. The handlers in Asio's
composed operations is the problem. And I have no way to wrap those in
strands.

Igor R

unread,
Jan 31, 2011, 2:40:03 AM1/31/11
to boost...@lists.boost.org
> I'm not concerned about my own completion handlers.  The handlers in Asio's
> composed operations is the problem.  And I have no way to wrap those in
> strands.

<<In the case of composed asynchronous operations, such as
async_read() or async_read_until(), if a completion handler goes
through a strand, then all intermediate handlers should also go
through the same strand. This is needed to ensure thread safe access
for any objects that are shared between the caller and the composed
operation (in the case of async_read() it's the socket, which the
caller can close() to cancel the operation). This is done by having
hook functions for all intermediate handlers which forward the calls
to the customisable hook associated with the final handler>>
http://www.boost.org/doc/libs/1_45_0/doc/html/boost_asio/overview/core/strands.html

Timothy Liang

unread,
Jan 31, 2011, 6:30:23 AM1/31/11
to boost...@lists.boost.org
> <<In the case of composed asynchronous operations, such as async_read() or
> async_read_until(), if a completion handler goes through a strand, then
> all intermediate handlers should also go through the same strand. This is
> needed to ensure thread safe access for any objects that are shared
> between the caller and the composed operation (in the case of async_read()
> it's the socket, which the caller can close() to cancel the operation).
> This is done by having hook functions for all intermediate handlers which
> forward the calls to the customisable hook associated with the final
> handler>>
> http://www.boost.org/doc/libs/1_45_0/doc/html/boost_asio/overview/core/strands.html

Ah! It also says, "The io_service::strand::wrap() function creates a new
completion handler that defines asio_handler_invoke so that the function
object is executed through the strand." And the composed operations have a
hook on their intermediate handlers that call asio_handler_invoke with the
completion handler's context. So I simply need to pass the wrapped handler
object directly to the composed operations as the completion handler. Cool.

Now I'm beginning to wonder if this will work with my design. I want my
program to run handlers as concurrently as possible. I'm currently using a
single io_service and setting up a thread pool like the HTTP Server 3
example. Since the io_service doesn't know if a handler would be blocked by
a strand, it could needlessly assign an about-to-be-blocked handler to a
free thread. How do I fix that?

Dean Michael Berris

unread,
Jan 31, 2011, 6:39:31 AM1/31/11
to boost...@lists.boost.org

It doesn't do that. It's smart enough to see that it's in a given
strand which may be blocked so it moves on to the next available
scheduled handler. I do it all the time and I haven't seen it be the
bottleneck for anything I've developed with Boost.Asio even in the
early days and leading up to especially now.

Give it a shot and then measure to identify which part of the solution
is causing you problems.

HTH

--
Dean Michael Berris
about.me/deanberris

Timothy Liang

unread,
Jan 31, 2011, 7:30:47 AM1/31/11
to boost...@lists.boost.org
> It doesn't do that. It's smart enough to see that it's in a given strand
> which may be blocked so it moves on to the next available scheduled
> handler. I do it all the time and I haven't seen it be the bottleneck for
> anything I've developed with Boost.Asio even in the early days and leading
> up to especially now.
>
> Give it a shot and then measure to identify which part of the solution is
> causing you problems.

Wow, you're right! Strands clearly do more than they appear to do. Thanks.

Gianni Ambrosio

unread,
Apr 5, 2011, 9:20:06 AM4/5/11
to boost...@lists.boost.org
Hi All,
I'm implementing a tcp socket to be used with a client and server
derived classes. I'm testing the code with boost test but I get the
following error:

"The I/O operation has been aborted because of either a thread exit or
an application request"

Here is the test code.

std::string host = "localhost";
unsigned short port = 20001;

//Server* server = new Server();
//server->acceptAsyncOnThread(port);
Server server;
server.acceptAsyncOnThread(port);

Client client;
TcpSessionPtr connection = client.connect(host, port);

bool accepted = false;
boost::timer timer;
while (!accepted && timer.elapsed() < 1.0)
{
//accepted = server->hasAccepted();
accepted = server.hasAccepted();
}
//delete server;

BOOST_CHECK_EQUAL( accepted, true );

Can anybody please explain me the reason of that error? Moreover, do you
expect that using the commented code (i.e. the Server instance is
created on the heap and deleted before the BOOST_CHECK_EQUAL call) may
solve the problem? Why?

Regards
Gianni

Igor R

unread,
Apr 5, 2011, 9:36:42 AM4/5/11
to boost...@lists.boost.org
> "The I/O operation has been aborted because of either a thread exit or an
> application request"

Perhaps the thread that invoked async. operation exited before the
operation was completed?

Gianni Ambrosio

unread,
Apr 5, 2011, 10:55:45 AM4/5/11
to boost...@lists.boost.org
Il 4/5/2011 3:36 PM, Igor R ha scritto:
>> "The I/O operation has been aborted because of either a thread exit or an
>> application request"
> Perhaps the thread that invoked async. operation exited before the
> operation was completed?

I'm not an expert so I try to explain my scenario.
The server runs an async accept on a thread with basically the following
code:


boost::asio::ip::tcp::endpoint endpoint =
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort);
acceptor.open(endpoint.protocol());

acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
acceptor.bind(endpoint);
acceptor.listen();
TcpSessionPtr connection(new TcpSession(io_service));
acceptor.async_accept(connection->getSocket(),
boost::bind(&BoostTcpSocket::handleAccept, this, connection,
boost::asio::placeholders::error));


in the handleAccept() callback a new async accept is called to allow the
server to be able to accept other clients. Here is the code:


TcpSessionPtr connection(new TcpSession(io_service));
acceptor.async_accept(connection->getSocket(),
boost::bind(&BoostTcpSocket::handleAccept, this, connection,
boost::asio::placeholders::error));


So, it sounds reasonable that if the application ends an async accept
operation may be pending. Now, how could I cancel a pending operation?
Is there another way to solve the problem?

Regards
Gianni

Igor R

unread,
Apr 5, 2011, 2:32:19 PM4/5/11
to boost...@lists.boost.org
> The server runs an async accept on a thread with basically the following
> code:
>   boost::asio::ip::tcp::endpoint endpoint =
> boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort);
>   acceptor.open(endpoint.protocol());
>   acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
>   acceptor.bind(endpoint);
>   acceptor.listen();
>   TcpSessionPtr connection(new TcpSession(io_service));
>   acceptor.async_accept(connection->getSocket(),
> boost::bind(&BoostTcpSocket::handleAccept, this, connection,
> boost::asio::placeholders::error));

Do you mean that the above code runs not in io_service::run() thread?
When does this thread exit, just after it executes the above code?


> So, it sounds reasonable that if the application ends an async accept
> operation may be pending. Now, how could I cancel a pending operation? Is
> there another way to solve the problem?

You can close() your acceptor, but its destructor closes it anyway.

Gianni Ambrosio

unread,
Apr 6, 2011, 3:49:41 AM4/6/11
to boost...@lists.boost.org
Il 4/5/2011 8:32 PM, Igor R ha scritto:
>> The server runs an async accept on a thread with basically the following
>> code:
>> boost::asio::ip::tcp::endpoint endpoint =
>> boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), iPort);
>> acceptor.open(endpoint.protocol());
>> acceptor.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
>> acceptor.bind(endpoint);
>> acceptor.listen();
>> TcpSessionPtr connection(new TcpSession(io_service));
>> acceptor.async_accept(connection->getSocket(),
>> boost::bind(&BoostTcpSocket::handleAccept, this, connection,
>> boost::asio::placeholders::error));
> Do you mean that the above code runs not in io_service::run() thread?
> When does this thread exit, just after it executes the above code?
>

No, I'm sorry I forgot one line at the end:

thread = boost::thread(boost::bind(&boost::asio::io_service::run,
&io_service));

thread is a boost::thread member variable of the TcpSocket class I'm
trying to implement.
io_service and acceptor are also member variables of TcpSocket class.

> You can close() your acceptor, but its destructor closes it anyway.

Yes, you are right in fact in the TcpSocket destructor I already call:

if (acceptor.is_open()) acceptor.close();

Regards
Gianni

Igor R

unread,
Apr 6, 2011, 4:03:57 AM4/6/11
to boost...@lists.boost.org
> No, I'm sorry I forgot one line at the end:
>
> thread = boost::thread(boost::bind(&boost::asio::io_service::run,
> &io_service));

Ok, but this line just creates a thread and passes to it
io_service::run() -- it doesn't block your current thread, where you
issued async_accept.
Ensure that the thread where you *call* async_accept() doesn't ends
before the operation is complete.

Gianni Ambrosio

unread,
Apr 6, 2011, 5:20:58 AM4/6/11
to boost...@lists.boost.org
Il 4/6/2011 10:03 AM, Igor R ha scritto:
>> No, I'm sorry I forgot one line at the end:
>>
>> thread = boost::thread(boost::bind(&boost::asio::io_service::run,
>> &io_service));
> Ok, but this line just creates a thread and passes to it
> io_service::run() -- it doesn't block your current thread, where you
> issued async_accept.
> Ensure that the thread where you *call* async_accept() doesn't ends
> before the operation is complete.

Igor, thanks for your patience.

You are right, the piece of code I posted does not block the thread. May
be it's better going back to the test I posted at first:

Server server;
server.acceptAsyncOnThread(port);

Client client;
TcpSessionPtr connection = client.connect(host, port);

bool accepted = false;
boost::timer timer;
while (!accepted && timer.elapsed() < 1.0)
{

accepted = server.accepted();
}

Here the Server class derives from TcpSocket and the
acceptAsyncOnThread() basically calls the code I posted in the prevoius
email including the thread. The server.accepted() returns true if the
callback of the accept_async is called (and it works correctly indeed).
The process is blocked by a loop with a timer (as you can see from the
code). I would be glad if you could find the error in that code.

Regards
Gianni

Reply all
Reply to author
Forward
0 new messages