remote procedure never exits


Rosa Lisin

Nov 4, 2015, 3:14:11 PM
to CppWAMP
Hello Emile,
I am experiencing an interesting race condition in my application, which uses cppwamp with crossbar.io as the WAMP router. My application calls a remote procedure within an event handler. If the application receives another event while the remote procedure is executing, and starts executing another event handler (on another thread), the remote procedure call (the call method of wamp::CoroSession) never returns, even though the router sends back the result message. I've set up a TraceHandler to log WAMP traffic, but the problem does not happen with tracing on!
I am wondering if you are aware of that problem.  

Regards,
Rosa 

Emile Cormier

Nov 4, 2015, 3:44:29 PM
to CppWAMP
Rosa,

wamp::Session objects are currently not thread-safe. Only one thread may be invoking a wamp::Session member function at a time. I am considering changing it so that Session objects are more thread-safe, while possibly sacrificing some performance (namely, WAMP message objects are created every time instead of being reused).

I suspect that the TraceHandler is affecting timing in a way that the deadlock no longer manifests itself.

Boost.Asio provides ways to ensure that only one thread accesses an object. For example, you can use io_service::post() to enqueue the work that needs to be done on the wamp::Session object. You might also be able to make use of boost::asio::io_service::strand.
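As a rough, library-free illustration of that idea (the WorkQueue class below is a hypothetical stand-in, not Boost.Asio's io_service), a worker thread can hand work to the thread that owns a non-thread-safe object by posting a handler instead of touching the object directly:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical toy stand-in for io_service: any thread may post(), but
// handlers only execute on the thread(s) that call run().
class WorkQueue
{
public:
    void post(std::function<void()> handler)
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            handlers_.push_back(std::move(handler));
        }
        cv_.notify_one();
    }

    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopped_ = true;
        }
        cv_.notify_all();
    }

    // Runs handlers one at a time, in FIFO order, until stop() is called.
    void run()
    {
        for (;;)
        {
            std::function<void()> handler;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stopped_ || !handlers_.empty(); });
                if (stopped_)
                    return;
                handler = std::move(handlers_.front());
                handlers_.pop_front();
            }
            handler();  // invoked without the lock, so handlers may post()
        }
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> handlers_;
    bool stopped_ = false;
};

// A worker thread posts 1000 increments of a non-thread-safe counter instead
// of modifying it directly. Returns the final count and reports which thread
// actually performed the increments.
int postIncrementsFromWorker(std::thread::id& executor)
{
    WorkQueue queue;
    int counter = 0;  // plain int: only ever touched inside handlers

    std::thread worker([&queue, &counter, &executor]()
    {
        for (int i = 0; i < 1000; ++i)
            queue.post([&counter, &executor]()
            {
                ++counter;
                executor = std::this_thread::get_id();
            });
        queue.post([&queue]() { queue.stop(); });  // FIFO: runs after the increments
    });

    queue.run();  // every handler executes here, on the calling thread
    worker.join();
    return counter;
}
```

Calling postIncrementsFromWorker from main() yields 1000, and the reported executor is the main thread's id, even though every increment was requested by the worker.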

If you showed me some code that reflects what you're trying to do, I might be able to give you some more guidance.

Cheers,
Emile

Rosa Lisin

Nov 4, 2015, 3:51:49 PM
to CppWAMP
Thanks, Emile,
I'll try to come up with a simple program to demonstrate the problem.
Rosa

Rosa Lisin

Nov 6, 2015, 10:48:04 AM
to CppWAMP
Hi Emile,
I am attaching a test application (along with my comments and more explanation) that demonstrates the issue. Please let me know if there is a problem with my approach or if there is an issue in cppwamp.
Thanks, Rosa
rpc_event_issue.tar.gz

Emile Cormier

Nov 6, 2015, 5:06:16 PM
to CppWAMP
Thanks, Rosa. I'll try taking a look this weekend.

Emile Cormier

Nov 8, 2015, 3:46:20 PM
to cpp...@googlegroups.com
I took a look. What's happening is that iosvc.run() is being run in the main thread, while your worker thread is simultaneously invoking operations on the CoroSession. There ends up being two threads simultaneously modifying the same CoroSession object, which puts it in an inconsistent state.

You need to understand that boost::asio::io_service is a work queue that performs work sequentially. io_service::run() is essentially a loop that keeps executing queued handlers (much like repeatedly calling io_service::run_one()) until the work queue runs out of work. Since CoroSession is always reading the next available incoming WAMP message, the io_service queue never becomes empty, and io_service::run() will block until you disconnect the CoroSession (or explicitly call io_service::stop()).
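To make that run() behaviour concrete, here is a single-threaded toy model (ToyService and ToySession are hypothetical names, not the Boost.Asio or CppWAMP implementation). run() executes queued handlers until the queue is empty or stop() is called, and a session that always queues a read of the next message keeps the queue from ever emptying:

```cpp
#include <cassert>
#include <deque>
#include <functional>

// Hypothetical single-threaded model of io_service::run().
class ToyService
{
public:
    void post(std::function<void()> handler) { queue_.push_back(std::move(handler)); }
    void stop() { stopped_ = true; }

    // Executes handlers in FIFO order until the queue runs out of work or
    // stop() is called. Returns the number of handlers executed.
    int run()
    {
        int executed = 0;
        while (!stopped_ && !queue_.empty())
        {
            auto handler = std::move(queue_.front());
            queue_.pop_front();
            handler();
            ++executed;
        }
        return executed;
    }

private:
    std::deque<std::function<void()>> queue_;
    bool stopped_ = false;
};

// Mimics a session that always queues a read of the next incoming message.
// disconnectAfter is a hypothetical stand-in for disconnecting; pass -1 to
// never disconnect.
class ToySession
{
public:
    ToySession(ToyService& svc, int disconnectAfter)
        : svc_(svc), disconnectAfter_(disconnectAfter) {}

    void readNext()
    {
        svc_.post([this]()
        {
            ++messagesRead_;
            if (messagesRead_ != disconnectAfter_)
                readNext();  // keeps the queue non-empty
        });
    }

    int messagesRead() const { return messagesRead_; }

private:
    ToyService& svc_;
    int disconnectAfter_;
    int messagesRead_ = 0;
};
```

With disconnectAfter set to -1, run() would never return on its own; only an explicit stop() ends the loop, in the same way that io_service::stop() ends io_service::run().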

All work performed by the CoroSession happens during iterations of the io_service::run() loop in the main thread. If a second worker thread wants to perform operations on the same CoroSession, that work has to be enqueued to the same io_service object that's servicing the CoroSession. For example:

void doWork(AsioService& iosvc, CoroSession<>::Ptr session);

int main()
{
    AsioService iosvc;
    auto session = CoroSession<>::create( /*...*/ );

    boost::asio::spawn(iosvc, [&](boost::asio::yield_context yield)
    {
        session->connect(yield);
        session->join( /*...*/ );
        session->subscribe( /*....*/ );
    });

    // Spawn worker thread.
    std::thread worker([&iosvc, &session]() {doWork(iosvc, session);});

    // Continually perform CoroSession work on the main thread.
    iosvc.run();

    worker.join();

    return 0;
}

void doWork(AsioService& iosvc, CoroSession<>::Ptr session)
{
    std::this_thread::sleep_for(std::chrono::seconds(3));

    boost::asio::spawn(iosvc, [session](boost::asio::yield_context yield)
    {
        // This only gets executed in the context of the main thread.
        session->call(/*...*/, yield);
    });

    // Do other stuff while above coroutine runs on main thread.
    // ...
}


Emile Cormier

Nov 8, 2015, 4:37:20 PM
to CppWAMP
Sorry, my previous post got accidentally sent before I finished writing. Please re-read my edited post.

What is your motivation for using a second thread anyway? Is it because you're calling RPCs within event handlers? If so, then you don't need to go to the trouble of spawning a second thread. You just need to spawn a new coroutine within the event handler, and call the RPC from there. The trick is to ensure that the io_service and CoroSession objects are made available to the event handler. One such way is to write the event handler as a member function of a class that holds the io_service and CoroSession objects as data members:

class App
{
public:
    // ...

private:
    AsioService iosvc_;
    CoroSession<>::Ptr session_;

    void onEvent(Event evt)
    {
        boost::asio::spawn(iosvc_, [this](boost::asio::yield_context yield)
        {
            session_->call(Rpc("Foo").withArgs(42), yield);
        });
    }
};



Another way is to bind the CoroSession and io_service objects using lambda functions:

void onEvent(AsioService& iosvc, CoroSession<>::Ptr session, Event evt)
{
    boost::asio::spawn(iosvc, [session](boost::asio::yield_context yield)
    {
        session->call(Rpc("Foo").withArgs(42), yield);
    });
}

int main()
{
    AsioService iosvc;
    CoroSession<>::Ptr session = /* ... */;
    // ...
    session->subscribe(
        Topic("foo"),
        [&iosvc, session](Event evt)
        {
            // Forward the event on to the "real" handler, and pass along the
            // AsioService and CoroSession objects so that the handler may
            // spawn a coroutine.
            onEvent(iosvc, session, evt);
        },
        yield);
    // ...
}


Yet another way is shown in this unit test case: https://github.com/ecorm/cppwamp/blob/master/test/wamptest.cpp#L1497

Hope this helps.

Emile Cormier

Nov 8, 2015, 4:39:43 PM
to CppWAMP
You may also find this GitHub issue to be informative: https://github.com/ecorm/cppwamp/issues/85

Emile Cormier

Nov 8, 2015, 4:48:54 PM
to CppWAMP
Sorry, I see that your MyWampSession::RunSession is similar to the doWork function I wrote above. I'll need to study it some more to see how the deadlock happens.

Emile Cormier

Nov 8, 2015, 5:52:54 PM
to CppWAMP
I think I know what's going on. The body of the boost::asio::spawn coroutine in MyWampSession::RunSession is not really being executed by the background thread. It's being executed (in chunks) by whichever thread happens to be calling iosvc.run(). It doesn't matter which thread spawned the coroutine. The coroutine will be executed by whichever thread(s) are calling run() on the io_service associated with the coroutine.

In your case, iosvc.run() is being called in both the main thread and the background thread. You essentially have a 2-thread thread pool running the same io_service.
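The following library-free toy model (a hypothetical WorkQueue class, not Boost.Asio) illustrates the point: handlers are executed by whichever threads call run(), never by the thread that posted them, and with two runner threads either one may pick up the next handler.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <set>
#include <thread>
#include <vector>

// Hypothetical toy io_service stand-in: FIFO handlers, run() until stop().
class WorkQueue
{
public:
    void post(std::function<void()> h)
    {
        { std::lock_guard<std::mutex> lock(m_); q_.push_back(std::move(h)); }
        cv_.notify_one();
    }
    void stop()
    {
        { std::lock_guard<std::mutex> lock(m_); stopped_ = true; }
        cv_.notify_all();
    }
    void run()
    {
        for (;;)
        {
            std::function<void()> h;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return stopped_ || !q_.empty(); });
                if (stopped_) return;
                h = std::move(q_.front());
                q_.pop_front();
            }
            h();  // run without holding the lock
        }
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> q_;
    bool stopped_ = false;
};

// Posts `count` (> 0) handlers from the calling thread, services the queue
// with `runners` threads, and returns the ids of the threads that ran them.
std::set<std::thread::id> executingThreads(int runners, int count)
{
    WorkQueue queue;
    std::mutex idsMutex;
    std::set<std::thread::id> ids;
    std::atomic<int> remaining(count);

    for (int i = 0; i < count; ++i)
    {
        queue.post([&]()
        {
            {
                std::lock_guard<std::mutex> lock(idsMutex);
                ids.insert(std::this_thread::get_id());
            }
            if (--remaining == 0)
                queue.stop();  // the last handler releases every run() call
        });
    }

    std::vector<std::thread> pool;
    for (int i = 0; i < runners; ++i)
        pool.emplace_back([&queue]() { queue.run(); });
    for (auto& t : pool)
        t.join();
    return ids;
}
```

With a single runner every handler lands on the same thread. With two runners the set may contain either or both ids, which is why a suspended coroutine can resume on a different thread than the one it started on.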

One thread will execute a portion of the coroutine until it suspends. Afterwards, it's possible for the other thread to resume the coroutine. In your case, I think it leads to the problem described in this SO answer:

Also, as noted by sehe, undefined behavior may occur if the coroutine is running in one thread, acquires a mutex lock, then suspends, but resumes and runs in a different thread and releases the lock. To avoid this, ideally one should not hold locks while the coroutine suspends. However, if it is necessary, one must guarantee that the coroutine runs within the same thread when it is resumed, such as by only running the io_service from a single thread.

If you eliminated one of the duplicate iosvc.run() calls, and found another way to join your threads, that might alleviate your problem.

Emile Cormier

Nov 8, 2015, 6:26:09 PM
to CppWAMP
On Sunday, November 8, 2015 at 6:52:54 PM UTC-4, Emile Cormier wrote:
If you eliminated one of the duplicate iosvc.run() calls, and found another way to join your threads, that might alleviate your problem.

If you did eliminate one of the iosvc.run() calls, you'd still have a problem with line 43 in MyWampSession.cpp:

if(!m_reqQueue.Wait(2000))continue;  // waits for request


While you are waiting here, you are temporarily suspending the thread that is calling iosvc.run(), which prevents CoroSession from processing pending incoming/outgoing WAMP messages.
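A toy illustration of why blocking inside a handler hurts, using a single-threaded deque as a stand-in for the io_service (the function name messageDelayMs is hypothetical): the handler queued behind a blocking wait cannot run until that wait finishes.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <functional>
#include <thread>

// Measures how long a queued "incoming message" handler waits when the
// handler ahead of it in the same queue blocks for blockMs milliseconds.
long long messageDelayMs(int blockMs)
{
    using Clock = std::chrono::steady_clock;
    std::deque<std::function<void()>> queue;  // toy single-threaded queue
    Clock::time_point handled;

    // Stands in for a blocking wait such as m_reqQueue.Wait(2000).
    queue.push_back([blockMs]()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(blockMs));
    });
    // Stands in for processing the next incoming WAMP message.
    queue.push_back([&handled]() { handled = Clock::now(); });

    const auto start = Clock::now();
    while (!queue.empty())  // the run() loop: one handler at a time
    {
        auto handler = std::move(queue.front());
        queue.pop_front();
        handler();
    }
    return std::chrono::duration_cast<std::chrono::milliseconds>(handled - start).count();
}
```

However long the blocking handler sleeps, the message handler behind it is delayed by at least that much, because the loop executes handlers strictly one after another.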

I think the fundamental problem is that you're mixing old-school multi-threading programming (with mutexes and condvars) with asynchronous programming. Again, I ask what is the motivation for the background thread anyway?

Emile Cormier

Nov 9, 2015, 12:06:02 PM
to CppWAMP
Rosa,

You have indicated via private email that your motivation for a worker thread is because your RPCs need to perform synchronous blocking operations on a database. This is a perfectly valid concern, and is one that might be shared by several other CppWAMP users.

The solution to this problem is to provide an asynchronous service wrapper around the synchronous DB service. Unfortunately, this technique is very poorly covered by the Boost documentation. The basic idea is that the service wrapper has its own private io_service and worker thread. The private io_service acts as a work queue that performs synchronous tasks sequentially on the background thread. When the service wrapper completes an operation on its background thread, it posts a user-defined handler back to the main app's io_service.
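The same pattern can be sketched without Boost or CppWAMP, assuming a hypothetical WorkQueue class in place of io_service and a dummy Database that always answers 42. The blocking query runs on the service's private worker thread, and the completion handler is posted back to the application's queue, so it runs on the application thread:

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

// Hypothetical toy io_service stand-in: FIFO handlers, run() until stop().
class WorkQueue
{
public:
    void post(std::function<void()> h)
    {
        { std::lock_guard<std::mutex> lock(m_); q_.push_back(std::move(h)); }
        cv_.notify_one();
    }
    void stop()
    {
        { std::lock_guard<std::mutex> lock(m_); stopped_ = true; }
        cv_.notify_all();
    }
    void run()
    {
        for (;;)
        {
            std::function<void()> h;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return stopped_ || !q_.empty(); });
                if (stopped_) return;
                h = std::move(q_.front());
                q_.pop_front();
            }
            h();  // run without holding the lock
        }
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::function<void()>> q_;
    bool stopped_ = false;
};

// Hypothetical blocking database API.
struct Database
{
    int query(const std::string&) { return 42; }
};

// Asynchronous wrapper: blocking queries run on a private worker thread and
// completion handlers are posted back to the application's queue.
class DatabaseService
{
public:
    explicit DatabaseService(WorkQueue& appQueue) : appQueue_(appQueue) {}
    ~DatabaseService()
    {
        workQueue_.stop();
        if (thread_.joinable()) thread_.join();
    }
    void start() { thread_ = std::thread([this]() { workQueue_.run(); }); }

    void query(std::string sql, std::function<void(int)> handler)
    {
        workQueue_.post([this, sql, handler]()
        {
            int result = db_.query(sql);  // blocking is fine on the worker
            appQueue_.post([handler, result]() { handler(result); });
        });
    }
private:
    WorkQueue& appQueue_;
    WorkQueue workQueue_;  // private queue, run only by thread_
    std::thread thread_;
    Database db_;
};

// Issues one query; returns whether the completion handler ran on the
// application (calling) thread and stores the answer.
bool queryOnWorkerAnswerOnApp(int& answer)
{
    WorkQueue appQueue;
    const auto appThread = std::this_thread::get_id();
    bool onAppThread = false;
    DatabaseService db(appQueue);  // destroyed (and joined) before appQueue
    db.start();
    db.query("SELECT answer", [&](int result)
    {
        answer = result;
        onAppThread = (std::this_thread::get_id() == appThread);
        appQueue.stop();
    });
    appQueue.run();
    return onAppThread;
}
```

Unlike a service that never shuts down, this sketch stops and joins its worker thread in the destructor, because destroying a joinable std::thread calls std::terminate.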

Boost.Asio covers this in its Services examples. There is also this SO question.

I have written up an example. It has been compiled, but not tested:

using namespace wamp;

// Represents the database API
struct Database
{
    int query(const std::string& sql) {return 42;}
};

class DatabaseService
{
public:
    using QueryHandler = std::function<void (int)>;

    explicit DatabaseService(AsioService& iosvc)
        : iosvc_(iosvc)
    {}

    void start()
    {
        // Spawn database worker thread
        thread_ = std::thread([this]()
        {
            // This keeps the private io_service running even if there's
            // no pending work
            AsioService::work work(workIosvc_);

            // All work to be performed on the database is posted
            // to the workIosvc_ work queue. The actual DB work is executed
            // in the following loop.
            workIosvc_.run();
        });
    }

    void query(std::string sql, QueryHandler handler)
    {
        // Post the actual work to the background io_service.
        // The io_service acts like a queue, so work will be performed
        // on a FIFO basis.
        workIosvc_.post([this, sql, handler]()
        {
            // Use the database API to obtain the result. Blocking calls
            // are okay here because we are executing in the worker thread.
            auto result = db_.query(sql);

            // Post the asynchronous handler to the main app's io_service.
            iosvc_.post(std::bind(handler, result));
        });
    }

private:
    AsioService& iosvc_;         // IO service for main app
    AsioService workIosvc_;      // Private IO service for database work
    std::thread thread_;         // Background thread for database
    Database db_;                // Represents database API
};

class App
{
public:
    App(AsioService& iosvc, ConnectorList connectors)
        : iosvc_(iosvc),
          db_(iosvc),
          session_(CoroSession<>::create(std::move(connectors)))
    {}

    void start()
    {
        // Start the database worker thread before any queries can arrive.
        db_.start();

        boost::asio::spawn(iosvc_, [&](boost::asio::yield_context yield)
        {
            session_->connect(yield);
            session_->join(Realm("myrealm"), yield);

            using namespace std::placeholders;
            session_->enroll(
                Rpc("getUltimateAnswer"),
                std::bind(&App::getUltimateAnswerRpc, this, _1, _2),
                yield);
        });
    }

private:
    Outcome getUltimateAnswerRpc(Invocation inv, std::string question)
    {
        // Issue a query to the asynchronous DB service. The DB service will
        // post our handler to the iosvc_ when the query operation is
        // complete. We then use the Invocation object to return the result
        // back to the WAMP callee.
        db_.query(question, [inv](int answer)
        {
            inv.yield({answer});
        });

        // Indicate to CppWAMP that the RPC result will be sent later via
        // the Invocation object.
        return Outcome::deferred();
    }

    AsioService& iosvc_;
    DatabaseService db_;
    CoroSession<>::Ptr session_;
};

int main()
{
    AsioService iosvc;
    auto tcp = connector<Json>( iosvc, TcpHost("localhost", 12345) );
    App app(iosvc, {tcp});
    app.start();

    iosvc.run();

    return 0;
}

I have not paid any attention to orderly shutdown and destruction. std::enable_shared_from_this can be invaluable for keeping objects temporarily "alive" while operations are still pending. The idiom goes something like this:

auto self = shared_from_this();
asyncOperation(operationArg, [this, self](HandlerArg arg)
{
    // handler code
});

Because the self shared pointer is bound to the lambda passed to the io_service queue, the object issuing the asynchronous operation will be kept alive until the operation completes.
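A minimal standard-C++ sketch of the idiom, with a hypothetical Requester class and a plain deque standing in for the io_service queue. A dummy destructor records when the object actually dies:

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical queue of pending completion handlers (stands in for io_service).
using PendingQueue = std::deque<std::function<void()>>;

class Requester : public std::enable_shared_from_this<Requester>
{
public:
    Requester(PendingQueue& queue, bool& destroyed)
        : queue_(queue), destroyed_(destroyed) {}

    // Dummy destructor that records destruction, for tracing.
    ~Requester() { destroyed_ = true; }

    void asyncOperation()
    {
        auto self = shared_from_this();
        queue_.push_back([this, self]()
        {
            ++completions_;  // safe: self keeps *this alive
        });
    }

private:
    PendingQueue& queue_;
    bool& destroyed_;
    int completions_ = 0;
};

// Returns {destroyed before the handler ran, destroyed after it ran}.
std::pair<bool, bool> demoKeepAlive()
{
    PendingQueue queue;
    bool destroyed = false;
    {
        auto requester = std::make_shared<Requester>(queue, destroyed);
        requester->asyncOperation();
    }  // the last external owner goes away here
    const bool before = destroyed;

    {
        auto handler = std::move(queue.front());
        queue.pop_front();
        handler();
    }  // the handler, and the self pointer it holds, are destroyed here
    return {before, destroyed};
}
```

The object survives its last external owner because the pending handler holds a shared_ptr to it, and it is destroyed only once that handler has run and been released.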

When you're troubleshooting shutdown problems, it's sometimes useful to write dummy destructors that print to the console. That way you can find out whether an object is being destroyed prematurely, and whether it should be kept alive using the shared_from_this technique.

The author of Boost.Asio, Christopher Kohlhoff, has seminar videos on asynchronous programming techniques. I encourage you to watch them.