[Boost-users] ASIO sockets - 2nd write fails

723 views
Skip to first unread message

bill comment

unread,
May 6, 2011, 9:57:56 AM5/6/11
to boost...@lists.boost.org

I am having a problem writing a TCP socket message using boost:asio async_*

I am new to using boost asio sockets. reading what I could find (this list, samples, other forums) I have added boost::asio socket use to my application.

 

What works is: accept, receive, parse and reply to a message.

After my client sends a message and receives a response it remains connected and starts an async receive.

Several seconds later my server tries to send another message to the client (by calling server::write()). the code runs ok up to connection::handle_writeMessage() where error is: 2719 The file handle supplied is not valid.

 

I thought I read (links below) that using strand::wrap() to queue the async_* calls and io_service::post() to post the initial function call would allow a thread that didnt call io_service::run() to initiate the sending or a message.

http://stackoverflow.com/questions/4078484/using-boost-sockets-do-i-need-only-one-io-service

http://stackoverflow.com/questions/4090567/better-understanding-boosts-chat-client-example

http://groups.google.com/group/boost-list/browse_thread/thread/124a643a06d67fd9

 

did I interpret this incorrectly? Did I code it wrong?

 

Thank you for your time and advice.

 

 

to develop my code I started with the http3 and chat server samples. Changes include:

- server::threads is a class member

- server::run() returns without waiting for threads to stop so I start other threads in my program

- added server::stop() to stop threads.

- server::write() is called from another thread.

 

- connection::do_writeMessage() and connection::handle_writeMessage() are from the chat server sample.

 

A summary of my server code is below. its long, but should be familiar from the samples.

Server.hpp

  std::vector<boost::shared_ptr<boost::thread> > threads;

 

server::server

  : thread_pool_size_(2), // 1 thread for wait, read, reply. 1 for unsolicited writes.

    acceptor_(io_service_),

    new_connection_(new connection(io_service_, request_handler_)),

    request_handler_()

{

async_accept(new_connection_->socket(),

      boost::bind(&server::handle_accept, this,

        boost::asio::placeholders::error));

}

 

void server::run()

{

   // 1 thread for wait, read, reply. 1 for unsolicited writes.

 

  // Create a pool of threads to run all of the io_services.

  //std::vector<boost::shared_ptr<boost::thread> > threads;

  for (std::size_t i = 0; i < thread_pool_size_; ++i)

  {

    boost::shared_ptr<boost::thread> thread(new boost::thread(

          boost::bind(&boost::asio::io_service::run, &io_service_)));

    threads.push_back(thread);

  }

}

 

 

void server::stop()

{

  io_service_.stop();

 

  // Wait for all threads in the pool to exit.

  for (std::size_t i = 0; i < threads.size(); ++i)

  {

    threads[i]->join();

  }

}

 

 

server::handle_accept (const boost::system::error_code& e)

{

  if (!e)

  {

     // read, parse and reply

    new_connection_->start();

 

    new_connection_.reset(new connection(io_service_, request_handler_));

 

    // ready to accept another connection

     cout << "waiting for new accept ..." << endl;

    acceptor_.async_accept(new_connection_->socket(),

        boost::bind(&server::handle_accept, this,

          boost::asio::placeholders::error));

 }

}

 

// public

// called from another thread in my program to send an unsolicited message to my connected client

void server::write(const string& msg)

{

   io_service_.post(boost::bind(&connection::do_writeMessage, new_connection_, msg));

}

 

- - - - - - - - - -

connection::connection(boost::asio::io_service& io_service,

    request_handler& handler)

  : strand_(io_service),

    socket_(io_service),

    request_handler_(handler)

{

   _running = true;

}

 

void connection::start()

{

   buffer_.fill (0);

  socket_.async_read_some(boost::asio::buffer(buffer_),

      strand_.wrap(

        boost::bind(&connection::handle_read, shared_from_this(),

          boost::asio::placeholders::error,

          boost::asio::placeholders::bytes_transferred)

       )

    );

}

 

void connection::handle_read(const boost::system::error_code& e,

    std::size_t bytes_transferred)

{

  if (!e)

  {

     // parse what we have so far. request_parser_.parse() is my code and it works correctly.

     // when successful result==true, request

    boost::tribool result;

    request_.reset();

    boost::tie(result, boost::tuples::ignore) = request_parser_.parse(

        request_, buffer_.data(), buffer_.data() + bytes_transferred);

 

    if (result)

    {

       // we read an entire message

 

       // this is my code to process the message and put fill in the reply_ buffer.

       request_handler_.handle_request(request_, reply_);

 

      // send the reply

      boost::asio::async_write(socket_, reply_.to_buffers(),

          strand_.wrap(

            boost::bind(&connection::handle_writeResponse, shared_from_this(),

              boost::asio::placeholders::error)

              )

              );

 

    }

    else if (!result)

    {

       // there was an error reading the message

      boost::asio::async_write(socket_, reply_.to_buffers(),

          strand_.wrap(

            boost::bind(&connection::handle_writeResponse, shared_from_this(),

              boost::asio::placeholders::error)

              )

              );

    }

    else

    {

       // read some ok. need to read some more.

      socket_.async_read_some(boost::asio::buffer(buffer_),

          strand_.wrap(

            boost::bind(&connection::handle_read, shared_from_this(),

              boost::asio::placeholders::error,

              boost::asio::placeholders::bytes_transferred)

              )

              );

    }

  }

  else

  {

      cout << "connection::handle_read: error: " << e.value() << " " << e.message() << endl;

  }

 

  // If an error occurs then no new asynchronous operations are started. This

  // means that all shared_ptr references to the connection object will

  // disappear and the object will be destroyed automatically after this

  // handler returns. The connection class's destructor closes the socket.

}

 

void connection::handle_writeResponse(const boost::system::error_code& e)

{

   if (!e)

   {

     // test. This message is received by my client.

      do_writeMessage ("hello world");

 

     // ready to read more

     start();

   }

   else

   {

      cout << "connection::handle_writeMessage: error sending to client: " << e.value() << " " << e.message() << endl;

   }

 

  // No new asynchronous operations are started. This means that all shared_ptr

  // references to the connection object will disappear and the object will be

  // destroyed automatically after this handler returns. The connection class's

  // destructor closes the socket.

}

 

 

- - - - - - - - - -

 

// call to this function is posed to io_service from server::write

// public

void connection::do_writeMessage(string msg)

{

  

   bool write_in_progress = !_messagesToWrite.empty();

   _messagesToWrite.push_back(msg);

 

   // if there are any in the queue, handle_writeMessage is already sending them.

   if (write_in_progress)

   {

      cout << "connection::do_writeMessage: write in progress." << endl;

   }

   else

   {

      // start the async write.

      boost::asio::async_write(socket_,

         boost::asio::buffer(_messagesToWrite.front().data(), _messagesToWrite.front().length()),

      strand_.wrap(

         boost::bind(&connection::handle_writeMessage, shared_from_this()/*this*/,

         boost::asio::placeholders::error)

         )

         );

   }

}

 

 

// async write is done. if no error and more messages to write, write them, too.

void connection::handle_writeMessage(const boost::system::error_code& error)

{

   if (!error)

   {

      cout << "connection::handle_writeMessage: message written ok." << endl;

      // remove the message we just wrote

      _messagesToWrite.pop_front();

 

      // any more?

      if (!_messagesToWrite.empty())

      {

         // write the next one and call this function when that'd done.

         boost::asio::async_write(socket_,

            boost::asio::buffer(_messagesToWrite.front().data(), _messagesToWrite.front().length()),

      strand_.wrap(

            boost::bind(&connection::handle_writeMessage, shared_from_this()/*this*/,

            boost::asio::placeholders::error)

            )

            );

      }

      else

      {

         cout << "connection::handle_writeMessage: no more messages" << endl;

      }

   }

   else

   {

      cout << "connection::handle_writeMessage: error: " << error.value() << " " << error.message() << endl;

   }

 

   // let this async chain end.

}

 

- - - - - - - - - -

main()

{

pTCPServer = new http::server3::server (argv[TCPPortNumber]);

if (pTCPServer)

{

pTCPServer->run();

}

 

 

// start thread that will initiate 2nd message

 … … …

}


Igor R

unread,
May 6, 2011, 10:36:13 AM5/6/11
to boost...@lists.boost.org
> Several seconds later my server tries to send another message to the client
> (by calling server::write()). the code runs ok up to
> connection::handle_writeMessage() where error is: 2719 “The file handle
> supplied is not valid”.

FWIW, this usually means that the socket is already closed.
_______________________________________________
Boost-users mailing list
Boost...@lists.boost.org
http://lists.boost.org/mailman/listinfo.cgi/boost-users

Cliff Green

unread,
May 6, 2011, 10:51:33 AM5/6/11
to boost...@lists.boost.org
Hi Bill - without looking at your code in detail (so I'm not sure where the bug is), you might want to consider having a single thread running the io_service, and have the callbacks / completion handlers forward any significant work through a wait queue to a thread pool. This isolates all of the Asio interaction from the code doing the app work, which is going to greatly simplify your thread safety and your Asio related networking code. The networking code then does not have to deal with multiple threads, strands, mutex locks, and OS level socket thread safety, and the app code only has to mutex protect the data that is shared between multiple threads.
 
There's many wait queue examples available, or I can share one I adapted from an Anthony Williams (Boost Thread author) example.
 
Make sure your completion handler lifetimes are correct, which it looks like you've handled with "shared_from_this".
 
Otherwise, as Igor already mentioned, you probably have a socket that is already closed.
 
Cliff
 

肖锋

unread,
May 9, 2011, 3:44:34 AM5/9/11
to boost...@lists.boost.org
'new_connection_' always points to an invalid socket because after a connection is established you always replace it with a new one. 

bill comment

unread,
May 9, 2011, 12:09:04 PM5/9/11
to boost...@lists.boost.org
'new_connection_' always points to an invalid socket because after a
connection is established you always replace it with a new one.

yes. i was suspecting this but didn't fully understand. thanks for confirming. 

my solution was to change my architecture to that of the ChatServer sample where a reference to my Connection object (parallel to their participant/session) is handed to a new Client object  (parallel to their room) so that it will live for the duration of the client app's connection and my server can write several responses.

thanks for the responses. 
Reply all
Reply to author
Forward
0 new messages