#include <boost/asio.hpp>
#include <condition_variable>
#include <mutex>
#include <string>

// NB: some other thread must be running the socket's io_context, or the
// handlers below will never be invoked (see the usage sketch that follows).
std::string timed_read_line(boost::asio::streambuf& buffer, boost::asio::ip::tcp::socket& sock)
{
    namespace asio = boost::asio;
    using boost::system::error_code;

    std::condition_variable cv;
    std::mutex mut;
    error_code op_error;
    bool done = false;

    auto get_lock = [&] { return std::unique_lock<std::mutex>(mut); };

    asio::deadline_timer timer(sock.get_io_context(), boost::posix_time::seconds(3));

    auto read_handler = [&](error_code ec, std::size_t /*bytes*/)
    {
        auto lock = get_lock();
        if (not done)
        {
            done = true;
            op_error = ec;
            timer.cancel();             // stop the deadline firing later
        }
        lock.unlock();
        cv.notify_one();
    };
    auto timer_handler = [&](error_code ec)
    {
        auto lock = get_lock();
        if (not done and not ec)        // ignore the cancelled-timer case
        {
            done = true;
            op_error = asio::error::timed_out;
            error_code sink;
            sock.cancel(sink);          // abort the pending read
        }
        lock.unlock();
        cv.notify_one();                // without this, a timeout left the caller blocked forever
    };

    asio::async_read_until(sock, buffer, '\n', read_handler);
    timer.async_wait(timer_handler);

    auto lock = get_lock();
    cv.wait(lock, [&] { return done; });
    // Caveat, picked up later in the thread: the *other* handler may still be
    // pending at this point and would then run against destroyed locals; the
    // refined version further down waits for both completions.
    if (op_error)
        throw boost::system::system_error(op_error);

    std::istream is(std::addressof(buffer));
    std::string result;
    std::getline(is, result);
    return result;
}
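For completeness, a hedged sketch of how this might be driven. The background thread running the io_context is an assumption (the original post doesn't show it), and the endpoint is a placeholder:

#include <boost/asio.hpp>
#include <iostream>
#include <thread>

int main()
{
    namespace asio = boost::asio;
    asio::io_context io;
    auto guard = asio::make_work_guard(io);   // keep run() alive between operations
    std::thread runner([&] { io.run(); });    // handlers execute on this thread

    asio::ip::tcp::socket sock(io);
    sock.connect({asio::ip::make_address("127.0.0.1"), 7777}); // placeholder peer

    asio::streambuf buffer;
    try {
        std::cout << timed_read_line(buffer, sock) << "\n";
    } catch (boost::system::system_error const& e) {
        std::cerr << "read failed: " << e.what() << "\n";
    }

    guard.reset();     // let run() return once outstanding work drains
    runner.join();
}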
On 19 March 2018 at 10:33 t...@quarendon.net wrote:

> Attempting to understand the implementation, it feels like this could be
> made to work. Interestingly, I notice that the basic_socket_streambuf class
> (in 1.67 at least), in accordance with the N4656 draft specification, DOES
> have support for timeouts. It implements the overflow and underflow calls
> using lower-level asio calls, somewhat shadowing the implementation of
> socket_ops::sync_recv, but, crucially, not attempting an initial blocking
> read, and instead passing a timeout to the "poll_read" call:
>
>     // Wait for socket to become ready.
>     if (detail::socket_ops::poll_read(socket().native_handle(), 0, timeout(), ec_) < 0)

This would seem to suggest that, fundamentally, reading with a timeout can be made to work, as it works fine here.
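Since basic_socket_streambuf is what backs ip::tcp::iostream, that built-in timeout can be exercised directly. A minimal sketch, assuming Boost 1.66 or later; the host and port are placeholders:

#include <boost/asio.hpp>
#include <chrono>
#include <string>

// Reads one line, giving up after three seconds; relies on the
// basic_socket_streambuf timeout support described above.
std::string read_line_via_iostream()
{
    boost::asio::ip::tcp::iostream stream;
    stream.expires_after(std::chrono::seconds(3)); // applies to connect and reads
    stream.connect("example.com", "8080");         // placeholder endpoint
    std::string line;
    if (!std::getline(stream, line))
        throw boost::system::system_error(stream.error());
    return line;
}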
#include <boost/asio.hpp>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <unistd.h>   // dup()

std::string read_line_with_timeout(boost::asio::ip::tcp::socket& sock, boost::asio::streambuf& buf)
{
    namespace asio = boost::asio;

    // these statics could of course be encapsulated into a service object
    static asio::io_context executor;
    static asio::io_context::work work(executor);
    static std::thread mythread{[&] { executor.run(); }};

    // dup() the native handle so the temporary socket owns an independent
    // descriptor: closing it on exit cannot disturb the caller's socket
    auto temp_socket = asio::generic::stream_protocol::socket(executor,
                                                              sock.local_endpoint().protocol(),
                                                              dup(sock.native_handle()));
    auto timer = asio::deadline_timer(executor, boost::posix_time::milliseconds(3000));

    std::condition_variable cv;
    std::mutex m;
    int done_count = 0;      // handlers that have fully completed
    bool settled = false;    // has a non-aborted completion claimed the result?
    boost::system::error_code err;

    auto get_lock = [&] { return std::unique_lock<std::mutex>(m); };
    auto aborted = [](boost::system::error_code const& ec)
    { return ec == boost::asio::error::operation_aborted; };

    auto common_handler = [&](auto ec)
    {
        auto lock = get_lock();
        if (not aborted(ec) and not settled)
        {
            settled = true;   // first real completion wins
            err = ec;
            boost::system::error_code sink;
            temp_socket.cancel(sink);
            timer.cancel(sink);
        }
        // count *every* completion, aborted or not, so the wait below only
        // returns once neither handler can touch these locals any more
        ++done_count;
        lock.unlock();
        cv.notify_one();
    };

    async_read_until(temp_socket, buf, '\n', [&](auto ec, auto&&...) { common_handler(ec); });
    timer.async_wait([&](auto ec) { common_handler(ec ? ec : asio::error::timed_out); });

    auto lock = get_lock();
    cv.wait(lock, [&] { return done_count == 2; });

    if (err) throw boost::system::system_error(err);

    std::istream is(&buf);
    std::string result;
    std::getline(is, result);
    return result;
}
Don't mix async and sync. Once you go async, you have to go async "all
the way down".
This means that your async handlers must never make blocking calls
themselves.
If your sync and async operations are both on the same socket, then you
must convert them all to async -- if you don't like the async-callback
code style that results, look into the coroutine style instead, which
looks more like sync code while still behaving like async code.
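As an illustration of that coroutine style, a minimal sketch using boost::asio::spawn and yield_context (this needs Boost.Coroutine; run_session is an illustrative name, not from the original thread):

#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;

// Reads one line inside a stackful coroutine: async_read_until suspends the
// coroutine instead of blocking a thread, and yield[ec] delivers the
// completion as a plain error_code.
void run_session(asio::io_context& io, asio::ip::tcp::socket sock)
{
    auto s = std::make_shared<asio::ip::tcp::socket>(std::move(sock));
    asio::spawn(io, [s](asio::yield_context yield) {
        asio::streambuf buf;
        boost::system::error_code ec;
        asio::async_read_until(*s, buf, '\n', yield[ec]);   // looks sync, isn't
        if (!ec)
        {
            std::istream is(&buf);
            std::string line;
            std::getline(is, line);
            std::cout << line << "\n";
        }
    });
}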
If your sync calls operate on different objects and you can't convert
those blocking calls to async calls (eg. they're calling some library
API that doesn't provide async), then you should make a "sync worker
thread" that has its own separate io_service, and have your async
workers post jobs to this service, then post back completions to the
original io_service once the task is done.
It's up to you how many of these sync worker threads to create, ranging
from one global one (easy but will make everything wait for everyone
else's blocking operations), through to a small threadpool, through to
one per connection (also easy but risks thread explosion). There's no
One True Answer™, it will depend on your application's expected workload
and connection count.
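A minimal sketch of that hand-off; start_blocking_job and blocking_api_call are illustrative names, and the worker io_context is assumed to have its own thread(s) running it:

#include <boost/asio.hpp>
#include <functional>
#include <string>

namespace asio = boost::asio;

// Stand-in for a library call that blocks and has no async variant.
std::string blocking_api_call() { return "result"; }

// Run the blocking call on the worker context, then post the completion
// back to the main context so all "real" handlers stay on one service.
void start_blocking_job(asio::io_context& main_io,
                        asio::io_context& worker_io,
                        std::function<void(std::string)> on_done)
{
    asio::post(worker_io, [&main_io, on_done = std::move(on_done)]() mutable {
        std::string result = blocking_api_call();           // blocks the worker only
        asio::post(main_io, [on_done = std::move(on_done), result]() mutable {
            on_done(result);                                // back on main_io
        });
    });
}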
> You can use a deadline_timer and cancel the synchronous read if it
> trips. It's a bit fugly though and is subject to races. It's much
> nicer to use async.
The point is, though, that I don't think this works; it is what I started with.
It works fine on Windows, but on Linux calling "cancel" or "close" on a socket doesn't
cancel a synchronous read call. That is what started me down this whole route.
// Demonstrates that an *asynchronous* read can be cancelled cleanly: the
// server arms a null_buffers read, cancels it from a timer after one second,
// and only then writes, releasing the client.
#include <cstdlib>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>      // fork()
#include <iostream>
#include <string>
#include <boost/asio.hpp>

using namespace std::literals;
namespace asio = boost::asio;
using protocol = asio::ip::tcp;

void server(protocol::acceptor& acceptor, int child_pid)
{
    acceptor.listen();
    auto& executor = acceptor.get_io_context();
    auto sock = protocol::socket(executor);
    auto timer = asio::system_timer(executor);
    acceptor.accept(sock);

    // we expect operation_aborted here, delivered by the timer's cancel()
    auto read_handler = [](auto ec, auto...)
    {
        if (ec)
            std::cerr << "read handler error: " << ec.message() << "\n";
        else
            std::cerr << "strange - we expected an error\n";
    };
    auto timer_handler = [&](auto ec)
    {
        if (not ec)
            sock.cancel();
    };

    // null_buffers: wait for readability without consuming any data
    sock.async_read_some(asio::null_buffers(), read_handler);
    timer.expires_after(1s);
    timer.async_wait(timer_handler);
    executor.run();

    // unblock the client, which is still waiting for its socket to
    // become readable
    auto data = "foo"s;
    sock.write_some(asio::buffer(data));

    int status = 0;
    waitpid(child_pid, &status, 0);
}

void client(asio::io_context& executor, protocol::endpoint server_endpoint)
{
    protocol::socket sock(executor);
    sock.connect(server_endpoint);
    // completes once the server finally writes
    auto on_read = [](auto, auto) {};
    sock.async_read_some(asio::null_buffers(), on_read);
    executor.run();
}

int main()
{
    auto executor = asio::io_context();
    auto acceptor = protocol::acceptor(executor);
    acceptor.open(protocol::v4());
    acceptor.bind(protocol::endpoint(protocol::v4(), 0));
    auto server_endpoint = acceptor.local_endpoint();

    executor.notify_fork(asio::io_context::fork_prepare);
    int child_pid = fork();
    if (child_pid < 0)
    {
        std::cerr << "fork failed" << std::endl;
        std::exit(100);
    }
    else if (child_pid > 0)
    {
        executor.notify_fork(asio::io_context::fork_parent);
        server(acceptor, child_pid);
    }
    else
    {
        executor.notify_fork(asio::io_context::fork_child);
        client(executor, server_endpoint);
    }
    return 0;
}
You should probably not use boost to handle signals. The man page for signalfd (https://linux.die.net/man/2/signalfd) has this remark:

> Normally, the set of signals to be received via the file descriptor should be blocked using sigprocmask(2), to prevent the signals being handled according to their default dispositions.

So I would expect that boost does exactly this, and that is why the synchronous read doesn't get interrupted. (If boost didn't block the signal, the behaviour would be the "default disposition", i.e. termination, as you have observed.)
So try the following: install a signal handler for SIGALRM (or any other signal) and do NOT wait for it using boost. Have the handler just return, and see whether the blocking recv gets interrupted. However, it's not that simple: signals and threads don't play nicely together, and a signal will be delivered to an arbitrary thread that hasn't blocked it.
So you should have a variable holding the thread id of the thread running your io_context::run, and from within the signal handler:
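A minimal sketch of the pattern being described, assuming the intent is to forward a stray SIGALRM to the reader thread with pthread_kill (reader_tid, on_alarm and install_alarm_handler are illustrative names):

#include <atomic>
#include <csignal>
#include <pthread.h>

// Illustrative: id of the thread blocked in the synchronous read / run().
std::atomic<pthread_t> reader_tid;

extern "C" void on_alarm(int sig)
{
    // If the kernel delivered the signal to the wrong thread, forward it;
    // pthread_kill is async-signal-safe per POSIX.
    if (!pthread_equal(pthread_self(), reader_tid.load()))
        pthread_kill(reader_tid.load(), sig);
    // Otherwise just return: the interrupted recv() fails with EINTR.
}

void install_alarm_handler()
{
    struct sigaction sa{};
    sa.sa_handler = on_alarm;   // deliberately no SA_RESTART, so reads are interrupted
    sigemptyset(&sa.sa_mask);
    sigaction(SIGALRM, &sa, nullptr);
}

Whether boost's synchronous read then surfaces the EINTR, rather than quietly retrying, is exactly what the previous paragraph suggests testing.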