How to add new connections to an already active uv_loop

Darren Smith

Dec 28, 2015, 3:37:56 PM
to libuv
Hi,

I cannot figure out how to add a new uv_tcp_t to an already active uv loop that is currently blocked on I/O.

In detail:

My uv event loop is run with UV_RUN_DEFAULT. It manages the I/O for a single TCP connection, which was actively established as follows:

  uv_connect_t connect_req;
  struct sockaddr_in addr;
  uv_ip4_addr("127.0.0.1", 40001, &addr);
  uv_tcp_init(uv_default_loop(), &tcp_handle_1);
  uv_tcp_connect(&connect_req,
                 &tcp_handle_1,
                 (const struct sockaddr*) &addr,
                 connect_cb);
  uv_run(uv_default_loop(), UV_RUN_DEFAULT);


In connect_cb(), reading is enabled:

static void connect_cb(uv_connect_t* req, int status)
{
  uv_read_start(req->handle, alloc_buffer, on_read);
}


Nothing surprising so far, and it all works; this is a single actively established socket that can read from the other side.

Next, slightly later and in a separate thread, I want to create a new uv_tcp_t handle and add it to the loop, so I do the same (changing only the handle variable):

  uv_connect_t connect_req;
  struct sockaddr_in addr;
  uv_ip4_addr("127.0.0.1", 40002, &addr);
  uv_tcp_init(uv_default_loop(), &tcp_handle_2);
  uv_tcp_connect(&connect_req,
                 &tcp_handle_2,
                 (const struct sockaddr*) &addr,
                 connect_cb);


Now the problem I encounter is that, while I do see a connection made to 127.0.0.1:40002, this second stream (tcp_handle_2) does not read from the socket. More precisely, it does read, but only after some bytes have arrived on the first connection.

I think the problem is that the uv event loop is blocked waiting for I/O, so when tcp_handle_2 is added, the loop does not immediately call connect_cb() to set up the stream reading. The loop remains blocked until bytes appear on tcp_handle_1, at which point connect_cb() for tcp_handle_2 is called and reads can take place.

So what's the best approach to resolve this? I would like a single uv event loop to manage many actively established connections, but I have not found a way to interrupt an I/O-blocked uv loop so that it can deliver the callbacks associated with setting up a new connection. One possible approach would be to use an idle handle; however, when I try that, CPU use rises to 100%. Another approach might be to use something like socketpair to manually interrupt the uv loop, but I've not yet found that in libuv.

Help appreciated. Thanks,

Darren

Ben Noordhuis

Dec 28, 2015, 3:45:58 PM
to li...@googlegroups.com
The short answer is that you can't do that. The event loop in libuv
is not thread-safe (with the sole exception of uv_async_send()) and
this is not expected to change anytime soon; maybe in libuv v2.0.

If you have a file descriptor that you want to wrap in uv_tcp_t, you
can try the following:

1. Create a uv_async_t handle in your main thread.

2. Start the worker thread.

3. Call uv_run() in the main thread.

4. In the worker thread, put the file descriptor in a thread-safe
data structure (e.g. one guarded by a uv_mutex_t) and call
uv_async_send().

5. In the main thread, in your uv_async_cb callback, pop off the
file descriptor and create a new uv_tcp_t out of it.

Note that uv_async_send() can and will coalesce calls so don't expect
one callback for every call to uv_async_send().
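
The coalescing note above can be illustrated with a small model. This is a minimal sketch using only the C++ standard library, not libuv code: CoalescingWakeup and count_callbacks are hypothetical names, and the class imitates just one property of an async handle, namely that several sends issued before the loop gets a turn may produce a single callback.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for an async handle. NOT libuv: it models only the
// property that sends arriving before dispatch are merged into one callback.
class CoalescingWakeup {
public:
    // Safe to call from any thread, in the spirit of uv_async_send().
    void send() {
        std::lock_guard<std::mutex> guard(mutex_);
        pending_ = true;  // a repeated send before dispatch changes nothing
    }

    // Called by the "loop" thread. Runs the callback at most once per call,
    // however many sends happened since the previous dispatch.
    template <typename Callback>
    bool dispatch(Callback&& callback) {
        {
            std::lock_guard<std::mutex> guard(mutex_);
            if (!pending_) {
                return false;
            }
            pending_ = false;  // cleared before the callback runs
        }
        callback();
        return true;
    }

private:
    std::mutex mutex_;
    bool pending_ = false;
};

// Issues `sends` wakeups, then lets the "loop" dispatch until it is idle.
// Returns how many callbacks actually ran.
int count_callbacks(int sends) {
    assert(sends >= 0);
    CoalescingWakeup wakeup;
    for (int i = 0; i < sends; ++i) {
        wakeup.send();
    }
    int callbacks = 0;
    while (wakeup.dispatch([&callbacks] { ++callbacks; })) {
    }
    return callbacks;
}
```

In this model, count_callbacks(3) runs the callback once, not three times, which is why the callback cannot assume one invocation per queued item.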

Darren Smith

Dec 28, 2015, 6:43:54 PM
to libuv
Thanks for the quick answer, Ben.

Yes, this looks like exactly what I need.  uv_async_send -- Wake up the event loop and call the async handle’s callback.

Regarding callback coalescing, I think it shouldn't be an issue since I'll use a mutex & message queue, and I'll call uv_async_send only after the message queue has been prepared.  So I don't think the following has a race condition, because async_cb should be triggered even if a uv_async_send is delivered while async_cb is concurrently active:

static uv_async_t async;

// concurrently called by many worker threads, & inc. UV Loop thread.
void push_event(Event evt)
{
  {
    unique_lock<mutex> guard( queue_mutex );
    my_queue.push_back( evt );
  }
  uv_async_send( &async );
}

// called by UV Loop
void async_cb(uv_async_t* handle)
{
  vector<Event> new_events;
  {
    unique_lock<mutex> guard( queue_mutex );
    new_events.swap( my_queue );
  }

  // ... for each Event in new_events, do uv_tcp_init & uv_tcp_connect
}


Ben Noordhuis

Dec 29, 2015, 2:51:26 AM
to li...@googlegroups.com
On Tue, Dec 29, 2015 at 12:34 AM, Darren Smith <darr...@gmail.com> wrote:
> Thanks for quick answer Ben.
>
> Yes, this looks like exactly what I need. uv_async_send -- Wakeup the event
> loop and call the async handle’s callback.
>
> Regarding callback coalescing, I think it shouldn't be an issue since I'll
> use a mutex & message queue, and I'll call uv_async_send only after the
> message queue has been prepared. So I don't think the following has a race
> condition:
>
> static uv_async_t async;
>
> // concurrently called by many worker threads, & inc. UV Loop thread.
> void push_event(Event evt)
> {
>   {
>     unique_lock<mutex> guard( queue_mutex );
>     my_queue.push_back( evt );
>   }
>   uv_async_send( &async );
> }
>
>
> // called by UV Loop
> void async_cb(uv_async_t* handle)
> {
>   vector<Event> new_events;
>   {
>     unique_lock<mutex> guard( queue_mutex );
>     new_events.swap( my_queue );
>   }
>
>   // ... for each Event in new_events, do uv_tcp_init & uv_tcp_connect
> }

Yes, that should work. The key is to clear the whole queue instead of
popping just the first element.
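
To make the point about clearing the whole queue concrete, here is a minimal sketch in standard C++ with no libuv calls. The names EventQueue, Event (with a single port field) and stranded_after_one_callback are hypothetical, since the thread never shows the real Event type. It assumes the worst case permitted by coalescing: several events are queued, but the callback runs only once.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical payload; the Event type used in the thread is not shown.
struct Event {
    int port;
};

class EventQueue {
public:
    // Producer side: may be called from any thread.
    void push(Event evt) {
        std::lock_guard<std::mutex> guard(mutex_);
        events_.push_back(evt);
    }

    // Pop-one strategy: takes a single event per callback.
    bool pop_one(Event& out) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (events_.empty()) {
            return false;
        }
        out = events_.front();
        events_.pop_front();
        return true;
    }

    // Drain-all strategy: swap the whole queue out while holding the lock,
    // so the events can be processed afterwards without holding it.
    std::deque<Event> drain_all() {
        std::deque<Event> drained;
        std::lock_guard<std::mutex> guard(mutex_);
        drained.swap(events_);
        return drained;
    }

    std::size_t size() {
        std::lock_guard<std::mutex> guard(mutex_);
        return events_.size();
    }

private:
    std::mutex mutex_;
    std::deque<Event> events_;
};

// Queues `pushes` events, runs ONE simulated callback (as if all the
// wakeups had been coalesced), and returns how many events are left behind.
std::size_t stranded_after_one_callback(bool drain, int pushes) {
    assert(pushes >= 0);
    EventQueue queue;
    for (int i = 0; i < pushes; ++i) {
        queue.push(Event{40002 + i});
    }
    if (drain) {
        std::deque<Event> batch = queue.drain_all();
        (void)batch;  // a real callback would set up one connection per event
    } else {
        Event evt{};
        queue.pop_one(evt);
    }
    return queue.size();
}
```

With three queued events and a single callback, the pop-one variant leaves two events waiting for a wakeup that may never come, while the drain-all variant leaves none.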