[Boost-users] Boost.Asio -- combining sync read with async read

1,280 views
Skip to first unread message

Sameer Parekh

unread,
Oct 14, 2009, 8:53:58 AM10/14/09
to boost...@lists.boost.org, a-te...@googlegroups.com
Hello everyone! I am writing a client for a protocol which goes something like this:

The client connects to the server, and it submits "orders" -- every time it gets an order it gets back an "ack". However, the server might asynchronously send a "fill" at any time. So I want to be listening for a fill even while the client might be doing other things (listening to another server for one-way data stream. The client then chooses to submit an order or not based on the information it gets in the datastream, or it might choose to submit an order if it receives a fill.)

I thought the way to do it would be to do

async_read_until() and have it call my fill handler when it receives something.

Then the function that submits orders would call

write()

and then block waiting for the ack message using

read_until()

However, when I implement that, the client never seems to notice that a fill message is sent. It receives the acks just fine, and it successfully reads the datafeed coming in from the other tcp connection. I thought maybe the read_until() would cancel the async_read_until() so after I received the ack message I called async_read_until() again, but that didn't work either...

(The server works as it should, I tested it with telnet.)

Is it possible to mix sync/async reads in this way? Can anyone tell if I am doing something wrong? Do I need to do something else to make this work? 

Thanks, I appreciate your help!

Here is the code in question:

(as an aside, is it bad form to call connect() in the constructor? Should I call connect() and setup the async_read_until() in a separate start() method?)


MarketConnectionTCP::MarketConnectionTCP(boost::asio::io_service &io_service_, std::string ip_,
unsigned short agentPort_, unsigned short feedPort_) :
_io_service(io_service_), _agentSocket(_io_service), _feedSocket(_io_service),
_timer(_io_service, boost::posix_time::seconds(1))
{

boost::asio::ip::address ipAddress(boost::asio::ip::address::from_string(
ip_));

tcp::endpoint agentEndpoint(ipAddress, agentPort_);
Logger::stream() << "Trying to connect to agent:" << agentEndpoint
<< std::endl;

_agentSocket.connect(agentEndpoint);
Logger::stream() << "Agent connected:" << _agentSocket.remote_endpoint()
<< std::endl;

boost::asio::ip::tcp::endpoint feedEndpoint(ipAddress, feedPort_);

Logger::stream() << "Trying to connect to feed:" << feedEndpoint
<< std::endl;
_feedSocket.connect(feedEndpoint);

Logger::stream() << "Feed connected:" << _feedSocket.remote_endpoint()
<< std::endl;

boost::asio::async_read_until(_agentSocket, _fillBuf, "\r\n", boost::bind(
&MarketConnectionTCP::handleReadFill, this,
boost::asio::placeholders::error));
Logger::stream() << "Listening on agent connection:"
<< _agentSocket.remote_endpoint() << std::endl;

boost::asio::async_read_until(_feedSocket, _feedBuf, "\r\n", boost::bind(
&MarketConnectionTCP::handleReadFeed, this,
boost::asio::placeholders::error));
Logger::stream() << "Listening on feed connection:"
<< _feedSocket.remote_endpoint() << std::endl;

_timer.async_wait(boost::bind(&MarketConnectionTCP::handleTimer, this,
boost::asio::placeholders::error));
Logger::stream() << "Timer is waiting" << std::endl;

}


OrderID MarketConnectionTCP::sendOrder(const Order &order)
{
boost::asio::streambuf orderBuf;
std::ostream orderStream(&orderBuf);
orderStream << order << "\r\n";

// This blocks until we get an ack with the order id
Logger::stream() << "Sending order to:" << _agentSocket.remote_endpoint()
<< ":" << order << std::endl;
boost::asio::write(_agentSocket, orderBuf);

Logger::stream() << "Waiting for ack from:"
<< _agentSocket.remote_endpoint() << std::endl;

boost::asio::streambuf ackBuf;
std::istream ackStream(&ackBuf);
boost::asio::read_until(_agentSocket, ackBuf, "\r\n");

OrderID id;
ackStream >> id;
Logger::stream() << "Received ack from:" << _agentSocket.remote_endpoint()
<< ":" << id << std::endl;

// listen again
boost::asio::async_read_until(_agentSocket, _fillBuf, "\r\n",
boost::bind(&MarketConnectionTCP::handleReadFill, this,
boost::asio::placeholders::error));
Logger::stream() << "Listening on agent connection:"
<< _agentSocket.remote_endpoint() << std::endl;

return id;
}

void MarketConnectionTCP::handleReadFill(const boost::system::error_code& err)
{
if (!err)
{
std::istream fillStream(&_fillBuf);
Fill newFill;
fillStream >> newFill;

if (newFill.isValid())
{
Logger::stream() << "Fill received on:"
<< _agentSocket.remote_endpoint() << ":" << newFill
<< std::endl;
receiveFill(newFill);
}
else
{
Logger::stream() << "Invalid fill received on:"
<< _agentSocket.remote_endpoint() << std::endl;
}

// listen again
boost::asio::async_read_until(_agentSocket, _fillBuf, "\r\n",
boost::bind(&MarketConnectionTCP::handleReadFill, this,
boost::asio::placeholders::error));
Logger::stream() << "Listening on agent connection:"
<< _agentSocket.remote_endpoint() << std::endl;
}
else
{
delete this;
}
}

Igor R

unread,
Oct 14, 2009, 10:02:45 AM10/14/09
to boost...@lists.boost.org
async_read_until() and have it call my fill handler when it receives something.

Then the function that submits orders would call

write()

and then block waiting for the ack message using

read_until()

However, when I implement that, the client never seems to notice that a fill message is sent. It receives the acks just fine, and it successfully reads the datafeed coming in from the other tcp connection. I thought maybe the read_until() would cancel the async_read_until() so after I received the ack message I called async_read_until() again, but that didn't work either...

(The server works as it should, I tested it with telnet.)

Is it possible to mix sync/async reads in this way? Can anyone tell if I am doing something wrong? Do I need to do something else to make this work? 
 
It's ok to use both sync. and async. calls, but you should not call boost::asio::async_read_until() and/or boost::asio::async_read() until previous read is complete (for the same socket/buffer). Besides, pay attention that reading "until" might read more data than you expect.

Sameer Parekh

unread,
Oct 14, 2009, 2:56:50 PM10/14/09
to boost...@lists.boost.org
>
> No, it will read anything available in the socket and put it in the
> buffer.
> It can be 1 line, 2 lines or 3.25 lines. What I mean is that you
> have to be
> aware of the fact that the buffer might contain more data than you
> "requested" from read_until.

Aha, thanks Igor. So my handler needs to be able to deal with a buffer
that contains more than one line. That's not so bad, I can just run
the processing code for each line until the buffer is empty, that
should be fine.

However, what concerns me is if the handler receives a buffer that has
just 2.5 lines in it. All the messages from the server are going to be
one line long. So if the server sends three messages in succession but
say there is some network hiccup partway through the 3rd line, then my
client will only receive 2.5 lines, and the handler will get called
with the partial line? Then the handler has to cache the partial line
so that when the rest of the line comes through it can paste them
together again? That seems hard...

thanks,
-s
_______________________________________________
Boost-users mailing list
Boost...@lists.boost.org
http://lists.boost.org/mailman/listinfo.cgi/boost-users

Igor R

unread,
Oct 14, 2009, 3:06:11 PM10/14/09
to boost...@lists.boost.org
However, what concerns me is if the handler receives a buffer that has just 2.5 lines in it. All the messages from the server are going to be one line long. So if the server sends three messages in succession but say there is some network hiccup partway through the 3rd line, then my client will only receive 2.5 lines, and the handler will get called with the partial line? Then the handler has to cache the partial line so that when the rest of the line comes through it can paste them together again? That seems hard...
 
Right. The most simple way to accomplish this is to use asio::streambuf as a buffer. When you process the input data, you call consume() function only for the amout you've really processed, and the rest of the data remains unused in the streambuf - until the next time you get something from the server (look at the asio examples with streambuf).

Sameer Parekh

unread,
Oct 14, 2009, 3:40:16 PM10/14/09
to boost...@lists.boost.org, a-te...@googlegroups.com

> Right. The most simple way to accomplish this is to use
> asio::streambuf as a
> buffer. When you process the input data, you call consume() function
> only
> for the amout you've really processed, and the rest of the data
> remains
> unused in the streambuf - until the next time you get something from
> the
> server (look at the asio examples with streambuf).

Yes, I'm using the streambuf, and then constructing an istream from
the streambuf, and then calling getline on the istream. (The chat
client/server was the primary example that I used to build my app.)

If I have 2.5 lines in the buffer then the first two calls to getline
will give me the first two lines, but won't the third call to getline
get me
the half line, and consume that half line from the streambuf? Do I
need to process the streambuf in a more manual fashion rather than
calling getline on the istream?

Also to make sure I understand it how it works with the streambuf,
that I am understanding you correctly: if I was just processing one
line in the async_handler, and the first time the server sends me 3
lines, then the handler is called and will process the first line
sent. Then the server sends another 3 lines, the async handler is
called and the client will process the 2nd line that it received, not
the 4th line, right?

Thanks for your help,
-Sameer

Igor R

unread,
Oct 14, 2009, 4:03:54 PM10/14/09
to boost...@lists.boost.org
> If I have 2.5 lines in the buffer then the first two calls to getline will give me the first two lines, but won't the third call to getline get me the half line, and consume that half line from the streambuf? Do I need to process the streambuf in a more manual fashion rather than calling getline on the istream?

If this is the case in your application (i.e. the server might send few lines in a bunch), then I think yes, you have to inspect the sterambuf manually.
Actually, this is the reason I never used read_until - in most cases it doesn't make your code simpler. The one case where it does is when your protocol has a text header and a body (like in HTTP).



> if I was just processing one line in the async_handler, and the first time the server sends me 3 lines, then the handler is called and will process the first line sent. Then the server sends another 3 lines, the async handler is called and the client will process the 2nd line that it received, not the 4th line, right?

Right, the data is added to the "get" area of the streambuf, you don't loose anything. But if your server would not send anything more, there would not be any trigger to continue processing the remained data.
Reply all
Reply to author
Forward
0 new messages