LIBUV: Releasing Handles & epoll Oriented Reads


pa...@rcom-software.com

Dec 20, 2020, 6:47:07 AM
to libuv
Hi Folks:

My Libuv based Server performs all its functions correctly except for TCP connection termination.

Each TCP connection has a uv_tcp_t connection handle and a uv_poll_t poll handle, whose allocation
and operation are explained below.  When the Protocol_Task() thread needs to terminate
a connection, it must stop polling, terminate the TCP socket connection, and deallocate
the handles.

NOTE: I am using the GitHub distribution from the following link on Ubuntu Linux version 15.04.

    https://github.com/nikhilm/uvbook

I have tried the following two approaches.

1) Just use uv_poll_stop() to terminate polling and uv_close() to terminate the TCP connection.

2) Use uv_poll_stop() to terminate polling and then use uv_queue_work() and uv_async_send() to
   wake up the Connect_Loop, in the main() process described below, so it can terminate the
   TCP connection, by proxy, with uv_close().

In both cases the following problem occurs. The callback routine supplied to uv_close()
does not execute until another incoming TCP connection occurs, and in most cases,
the Poll_Loop, in the IO_Task() described below, stops invoking its callback routine--
poll_callback(). In case 2, a crash almost always ensues. (I probably am not using
uv_async_send() correctly.)

Do I have a fundamental misunderstanding of how Libuv works or am I doing something wrong ?

Also, I strongly suspect using Linux recv() to read data is not optimal when epoll() is
being used. My understanding is that there is a way to pass buffers to epoll() such that
data will automatically be inserted in them when a UV_READABLE event occurs. Do you have
any advice about this ?
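
For reference: epoll() itself does not take buffers. The closest equivalent in libuv is the
stream read API, where uv_read_start() first asks you to allocate a buffer and then hands it
back with the bytes already read into it. A minimal sketch of that approach follows; the
connection handle, its setup, and the protocol-decoder hook are assumed for illustration and
are not taken from the code in this thread.

#include <stdlib.h>
#include <uv.h>

static void alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
    //
    // libuv calls this before every read to obtain a buffer.
    //
    (void) handle;
    buf->base = malloc(suggested_size);
    buf->len = suggested_size;
}

static void read_cb(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
    if(nread > 0)
      {
        //
        // nread bytes are already in buf->base; hand them to the protocol
        // decoder here (the equivalent of rx_sdu() later in this thread).
        //
      }
    else
    if(nread < 0)
      {
        //
        // UV_EOF or a read error: stop reading and close the handle.
        //
        uv_read_stop(stream);
        uv_close((uv_handle_t *) stream, NULL);
      }

    free(buf->base);
}

//
// After uv_accept() succeeds, on the loop thread:
//
//     uv_read_start((uv_stream_t *) conn_handle, alloc_cb, read_cb);
//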

An overview of my Server and the relevant code follow.

Best Regards,

Paul Romero

Multi-Connection TCP Server Functional Architecture Overview
-----------------------------------------------------------------------------------------
There is a connection descriptor for each incoming TCP connection which contains all data
needed to manage the connection and perform the relevant functions.

When the main() process detects an incoming TCP connection, it sends a notification message to the
IO_Trigger_Task(). The IO_Trigger_Task() then sets up epoll() monitoring of incoming TCP data
for that connection.

Subsequently, the IO_Task() invokes poll_callback() when incoming data is available, reads a chunk
of data, and sends a protocol message to the Protocol_Task() when a complete protocol message is
recognized.

The Timer_Task() sends an expiration notification message to the Protocol_Task() when a protocol
timer expires.

The Protocol_Task() sends messages to the Send_Op_Task() for transmission across the network.
It spawns a DB Operation Task to perform slow data base operations and the DB Operation Task
notifies the Protocol_Task() when the operation is complete and then terminates.

Loops of type uv_loop_t
-----------------------
* Connect_Loop
* Poll_Loop
* Timer_Loop

Tasks: All Libuv thread tasks run concurrently and are launched by main() at startup time.
------------------------------------------------------------------------------------------
* main(): A Linux process that runs the Connect_Loop to detect incoming TCP connections.
  The make_incoming_connection() callback routine accepts incoming connections and
  allocates a uv_tcp_t handle on a per connection basis

* IO_Trigger_Task(): A Libuv thread that sets up epoll() plumbing for the IO_Task()
  when an incoming TCP connection occurs. It allocates a uv_poll_t handle, on a per
  connection basis, and calls uv_poll_start() to initiate epoll() operation with the
  Poll_Loop in the IO_Task(). It configures the handle to detect UV_READABLE events and
  handles them with the poll_callback() routine.  However, it does not run the Poll_Loop.
  (Basically, this task just sets up plumbing.)

* IO_Task(): A Libuv thread that runs the Poll_Loop to handle incoming TCP data, on a per
  connection basis. The poll_callback() routine executes and uses normal Linux recv() to read
  chunks of data, in non-blocking mode, when a UV_READABLE event occurs.

* Timer_Task(): A Libuv thread that runs the Timer_Loop to handle ticks, and whose main
  function is to detect protocol timer expiration. The tick duration is configured with
  uv_timer_init() and uv_timer_start(), and ticks are handled by the timer_callback()
  routine. (A minimal timer sketch follows the task list.)

* Protocol_Task(): A Libuv thread that handles protocol messages sent to it by the following tasks
  on a per connection basis: IO_Task(), Timer_Task(), and DB Operation Tasks. DB Operation Libuv thread tasks
  are spawned by the Protocol_Task() to perform slow database operations and send a notification message
  to the Protocol_Task() upon completion of the operation.

* Send_Op_Task(): A Libuv thread that transmits all network bound messages with normal
  Linux send() on a per connection basis.
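
The timer sketch referenced in the Timer_Task() item above is roughly the following; the
loop, handle, and tick-duration names are illustrative assumptions, not code from this Server.

#include <uv.h>

#define TICK_MS 100                      /* assumed tick duration */

static void tick_cb(uv_timer_t *timer)
{
    //
    // Runs on the timer loop's thread every TICK_MS milliseconds; this is
    // where protocol timer expiration would be checked.
    //
    (void) timer;
}

void Timer_Task_sketch(void *arg)
{
    uv_loop_t loop;
    uv_timer_t ticker;

    (void) arg;
    uv_loop_init(&loop);
    uv_timer_init(&loop, &ticker);
    //
    // First callback after TICK_MS milliseconds, then repeat every TICK_MS.
    //
    uv_timer_start(&ticker, tick_cb, TICK_MS, TICK_MS);

    uv_run(&loop, UV_RUN_DEFAULT);       /* blocks while the timer is active */
}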


Approach 1 Code
-------------
ROUTINE void close_callback(uv_handle_t *handle)
{

    free(handle);
    return;
}

ROUTINE void RELEASE_CONNECTION(CONN_DESC *cdesc)
{
    if(N_Sockets > 0)
        N_Sockets--;

    if(cdesc->poll_handle)
      {
        uv_poll_stop(cdesc->poll_handle);
        free((void *) cdesc->poll_handle);
      }

    if(cdesc->conn_handle)
      {
        struct linger spec;

        spec.l_onoff = TRUE;
        spec.l_linger = 0;
        setsockopt(cdesc->fd, SOL_SOCKET, SO_LINGER, &spec, sizeof(spec) );

        uv_close((uv_handle_t *) cdesc->conn_handle, close_callback);
      }

ENTER_MUTEX(&Service_Q_Mutex);
    DELETE_CONN(cdesc);
    cdesc->fd = -1;
    flush_msg(&cdesc->task_input_q);
EXIT_MUTEX(&Service_Q_Mutex);

    return;
}

Approach 2 Code
-----------------
ROUTINE void close_callback(uv_handle_t *handle)
{
    free(handle);
    return;
}

typedef struct close_template {
uv_handle_t    *handle;
void        (*callback) (uv_handle_t *);
} CLOSE_TEMPLATE;

ROUTINE void close_proxy(uv_work_t *data)
{
    CLOSE_TEMPLATE *cparam = (CLOSE_TEMPLATE *) data->data;

    uv_close(cparam->handle, cparam->callback);
    return;
}


extern uv_loop_t Connect_Loop;
static CLOSE_TEMPLATE close_data;

ROUTINE void RELEASE_CONNECTION(CONN_DESC *cdesc)
{
    uv_work_t wreq;
    uv_async_t as_handle;
    struct linger spec;

    if(N_Sockets > 0)
        N_Sockets--;

    //
    // Stop this. TBD: Might need to do this via proxy in the IO_Task() Poll_Loop.
    //
    uv_poll_stop(cdesc->poll_handle);

    uv_async_init(&Connect_Loop, &as_handle, NULL);

    close_data.handle = (uv_handle_t *) cdesc->conn_handle;
    close_data.callback = close_callback;
    //
    // Call uv_close() in the close_proxy()
    //
    wreq.data = (void *) &close_data;
    uv_queue_work(&Connect_Loop, &wreq, close_proxy, NULL);

    spec.l_onoff = TRUE;
    spec.l_linger = 0;
    setsockopt(cdesc->fd, SOL_SOCKET, SO_LINGER, &spec, sizeof(spec) );

    uv_async_send(&as_handle);
    uv_close((uv_handle_t *) &as_handle, NULL);

    free(cdesc->poll_handle);

ENTER_MUTEX(&Service_Q_Mutex);
    DELETE_CONN(cdesc);
    cdesc->fd = -1;
    flush_msg(&cdesc->task_input_q);
EXIT_MUTEX(&Service_Q_Mutex);

    return;
}
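
A note on Approach 2: uv_queue_work() runs its work callback on a libuv thread-pool thread,
not on the thread running the Connect_Loop, so the uv_close() inside close_proxy() is still
being made off the loop thread. The usual shape for this is a long-lived uv_async_t,
initialized once on the loop thread (before uv_run()), whose callback runs on the loop thread
and performs the uv_close() there. A minimal sketch, with illustrative names and a single
pending handle instead of a real queue:

#include <stdlib.h>
#include <uv.h>

static uv_async_t Close_Async;        /* uv_async_init() once, before uv_run()    */
static uv_mutex_t Close_Mutex;        /* uv_mutex_init() once at startup          */
static uv_handle_t *Pending_Close;    /* guarded by Close_Mutex                   */

static void close_cb(uv_handle_t *handle)
{
    free(handle);                     /* safe: the handle is fully closed now */
}

static void close_async_cb(uv_async_t *async)
{
    uv_handle_t *victim;

    (void) async;
    uv_mutex_lock(&Close_Mutex);
    victim = Pending_Close;
    Pending_Close = NULL;
    uv_mutex_unlock(&Close_Mutex);

    if(victim != NULL)
        uv_close(victim, close_cb);   /* now running on the loop thread */
}

//
// Callable from the Protocol_Task() (or any other) thread.
//
void request_close(uv_handle_t *handle)
{
    uv_mutex_lock(&Close_Mutex);
    Pending_Close = handle;           /* a real server would queue several */
    uv_mutex_unlock(&Close_Mutex);

    uv_async_send(&Close_Async);      /* thread-safe wakeup of the loop */
}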

pa...@rcom-software.com

Dec 20, 2020, 1:13:34 PM
to libuv
Hi Folks:

I made some progress on the problem but it is definitely not solved. The updated code
and more diagnostic code are included in the message.


NOTE: I am using the GitHub distribution from the following link on Ubuntu Linux version 15.04.

    https://github.com/nikhilm/uvbook

The Libuv software package looks like version 1.3.0.

I have had to take extraordinary measures to make connection release reliable.
The relevant code is included near the end of this message and the extraordinary
measures are in the CLOSE_KLUDGE sections. The difficulty arises because the
Libuv loops are not used in the Protocol_Task(), yet it must effect operations
on those loops to release handles. It would be nice if Libuv included an API
for releasing handles reliably which could be called from any task.

Connection release still fails about 15% of the time in which case a crash occurs
and the following diagnostic is displayed.

    pexd: src/unix/core.c:210: uv__finish_close: Assertion `!(handle->flags & UV_CLOSED)' failed.

More diagnostic information follows.  Do you know what causes this crash ?

Best Regards,

Paul Romero


Crash Diagnostics
-----------------
The crash occurs while uv_run() is executing in the IO_Task() in network_io.c, according to the following
GDB stack trace.
GBD stack trace.

#0  0x00007f281754c267 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:55
#1  0x00007f281754deca in __GI_abort () at abort.c:89
#2  0x00007f281754503d in __assert_fail_base (fmt=0x7f28176a7028 "%s%s%s:%u: %s%sAssertion `%s' failed.\n%n",
    assertion=assertion@entry=0x41e093 "!(handle->flags & UV_CLOSED)", file=file@entry=0x41e068 "src/unix/core.c",
    line=line@entry=210, function=function@entry=0x41e2b0 <__PRETTY_FUNCTION__.9522> "uv__finish_close") at assert.c:92
#3  0x00007f28175450f2 in __GI___assert_fail (assertion=assertion@entry=0x41e093 "!(handle->flags & UV_CLOSED)",
    file=file@entry=0x41e068 "src/unix/core.c", line=line@entry=210,
    function=function@entry=0x41e2b0 <__PRETTY_FUNCTION__.9522> "uv__finish_close") at assert.c:101
#4  0x000000000040c967 in uv__finish_close (handle=<optimized out>) at src/unix/core.c:210
#5  uv__run_closing_handles (loop=0x638080 <Poll_Loop>) at src/unix/core.c:259
#6  uv_run (loop=0x638080 <Poll_Loop>, mode=UV_RUN_DEFAULT) at src/unix/core.c:326
#7  0x0000000000404962 in IO_Task (arg=0x0) at network_io.c:226
#8  0x0000000000412ad7 in uv__thread_start (arg=<optimized out>) at src/unix/thread.c:49
#9  0x00007f2817bf06aa in start_thread (arg=0x7f2816d15700) at pthread_create.c:333
#10 0x00007f281761deed in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109

However, the GDB thread information indicates that RELEASE_CONNECTION(), in protocol.c, is executing
in the Protocol_Task() when the crash occurs.

  Id   Target Id         Frame
  6    Thread 0x7f2817516700 (LWP 3424) syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
  5    Thread 0x7f2816514700 (LWP 3426) pthread_cond_wait@@GLIBC_2.3.2 ()
    at ../sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S:185
  4    Thread 0x7f2818003700 (LWP 3423) syscall () at ../sysdeps/unix/sysv/linux/x86_64/syscall.S:38
  3    Thread 0x7f2815512700 (LWP 3428) pthread_cond_wait@@GLIBC_2.3.2 ()
    at ../sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S:185
  2    Thread 0x7f2815d13700 (LWP 3427) 0x0000000000404500 in RELEASE_CONNECTION (cdesc=0x6384c0 <Conn_Desc_Table>)
    at protocol.c:357
* 1    Thread 0x7f2816d15700 (LWP 3425) 0x00007f281754c267 in __GI_raise (sig=sig@entry=6)
    at ../sysdeps/unix/sysv/linux/raise.c:55

Line 357 of protocol.c is as follows.

        while(WaitClose[cdesc->index]);

WaitClose[] is modified in only two cases and only in the Protocol_Task().

1) It is initialized to a handle address in RELEASE_CONNECTION() in the Protocol_Task().
2) It is cleared in the uv_close() callback routine close_callback().
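
One alternative to spinning on WaitClose[] is a per-connection semaphore: the close callback
(which runs on the loop thread) posts it, and the releasing thread blocks on it. A minimal
sketch, assuming a close_sem field that the real CONN_DESC does not have:

#include <stdlib.h>
#include <uv.h>

typedef struct {                /* stand-in for the real CONN_DESC */
    uv_sem_t close_sem;         /* uv_sem_init(&close_sem, 0) at connection setup */
    /* ... other per-connection fields ... */
} CONN_DESC_SKETCH;

static void close_callback_sketch(uv_handle_t *handle)
{
    //
    // handle->data is assumed to have been pointed at the connection
    // descriptor before uv_close() was requested.
    //
    CONN_DESC_SKETCH *cdesc = (CONN_DESC_SKETCH *) handle->data;

    free(handle);
    uv_sem_post(&cdesc->close_sem);      /* runs on the loop thread */
}

//
// In the releasing thread, after arranging (via uv_async_send()) for the loop
// thread to call uv_close(handle, close_callback_sketch):
//
//     uv_sem_wait(&cdesc->close_sem);   /* blocks until the callback has run */
//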


Code
-----

#define CLOSE_KLUDGE

extern uv_loop_t Poll_Loop;
extern uv_loop_t Connect_Loop;

#ifdef CLOSE_KLUDGE
uv_handle_t *WaitClose[MAX_CONN_DESC] = { NULL };
#endif // CLOSE_KLUDGE

ROUTINE void close_callback(uv_handle_t *handle)
{
    int k;

    free(handle);

#ifdef CLOSE_KLUDGE
    //
    // Determine if the handle is being closed.
    //
    for(k = 0; k < MAX_CONN_DESC; k++)
      {
        if(WaitClose[k] == handle)
          {
            //
            // Closure is complete.
            //
            WaitClose[k] = NULL;
            break;
          }
      }
#endif // CLOSE_KLUDGE


    return;
}

ROUTINE void RELEASE_CONNECTION(CONN_DESC *cdesc)
{
    uv_async_t as_handle;
    struct linger spec;

    if(N_Sockets > 0)
        N_Sockets--;
    //
    // This causes immediate socket disconnection when it is closed.
    //

    spec.l_onoff = TRUE;
    spec.l_linger = 0;
    setsockopt(cdesc->fd, SOL_SOCKET, SO_LINGER, &spec, sizeof(spec) );

    if(cdesc->poll_handle)
      {
#ifdef CLOSE_KLUDGE
        WaitClose[cdesc->index] = (uv_handle_t *) cdesc->poll_handle;
#endif // CLOSE_KLUDGE
        //
        // Deactivate and release the poll handle.
        // You have to stop the Poll_Loop to deactivate and deallocate the poll handle.
        //
        uv_stop(&Poll_Loop);

        uv_poll_stop(cdesc->poll_handle);
        uv_close((uv_handle_t *) cdesc->poll_handle, close_callback);
        //
        // Wake up the Poll_Loop in the IO_Task()
        //
        uv_async_init(&Poll_Loop, &as_handle, NULL);

        uv_async_send(&as_handle);
        uv_close((uv_handle_t *) &as_handle, NULL);
#ifdef CLOSE_KLUDGE
        //
        // Wait for the handle to be closed and deallocated.
        //
        while(WaitClose[cdesc->index]);
#endif // CLOSE_KLUDGE
      }

    if(cdesc->conn_handle)
      {
#ifdef CLOSE_KLUDGE
        WaitClose[cdesc->index] = (uv_handle_t *) cdesc->conn_handle;
#endif // CLOSE_KLUDGE
        //
        // Close and deallocate the connect handle in order to close the socket connection.
        // You have to wake up the Connect_Loop for the close_callback()
        // routine to execute.
        //

        uv_close((uv_handle_t *) cdesc->conn_handle, close_callback);
        //
        // Wake up the Connect_Loop in the main() process.
        //
        uv_async_init(&Connect_Loop, &as_handle, NULL);

        uv_async_send(&as_handle);
        uv_close((uv_handle_t *) &as_handle, NULL);
#ifdef CLOSE_KLUDGE
        //
        // Wait for the handle and socket connection to be released and closed.
        //
        while(WaitClose[cdesc->index]);
#endif // CLOSE_KLUDGE

      }


ENTER_MUTEX(&Service_Q_Mutex);
    DELETE_CONN(cdesc);
    cdesc->fd = -1;
    flush_msg(&cdesc->task_input_q);
EXIT_MUTEX(&Service_Q_Mutex);

    return;
}

pa...@rcom-software.com

Dec 20, 2020, 4:23:52 PM
to libuv
Hi Folks:

With limited testing, the problem stops happening if you give uv_run() in the IO_Task()
a chance to finish its pending work. As an interim measure I do this by making the
Protocol_Task() yield the CPU after calling uv_stop() and uv_poll_stop(), as follows, in
the RELEASE_CONNECTION() routine. This appears to cause the IO_Task() to be scheduled and run,
but I am not at all convinced this is a reliable technique.


        //
        // Deactivate and release the poll handle.
        // You have to stop the Poll_Loop to deactivate and deallocate the poll handle.
        //
        uv_stop(&Poll_Loop);

        uv_poll_stop(cdesc->poll_handle);
#ifdef CLOSE_KLUDGE2
        //
        // Try to let uv_run() in the IO_Task() finish pending work by yielding the CPU.
        //
        for(k = 0; k < 10; k++) pthread_yield();
#endif // CLOSE_KLUDGE2

        uv_close((uv_handle_t *) cdesc->poll_handle, close_callback);


Best Regards,

Paul R.

pa...@rcom-software.com

Dec 21, 2020, 8:39:09 PM
to libuv
Hi Folks:

I think I see the heart of my problem. Everything appears to work correctly
when you establish an incoming TCP connection and release it and the related
Libuv handles--the uv_tcp_t connection handle and the uv_poll_t poll handle.
(I revised the release code to do things the right way.)

The comments about coalescing of uv_async_send() calls in the documentation are somewhat misleading.
They should indicate that calls with the same handle are synchronous. Also, I suspect that
uv_async_send() is not reentrant.

When you attempt another incoming connection the following things occur.

Notice in 2.3, below, that uv_poll_start() executes without being called. This
doesn't make sense to me--at least on the surface. Can you think of a possible
reason this occurs ?

1) The connection is successfully established--with uv_accept(), and the
  socket descriptor fd is the same as was used in the previous connection,
  in the main() process. (Occurs with uv_loop_t Session_Loop.)

    conn_handle = (uv_tcp_t *) malloc(sizeof(uv_tcp_t));
    if(conn_handle == NULL)
      {
        fprintf(stderr, "MAIN: No Connect Handle Memory\n");
        abort();
      }

    uv_tcp_init(&Connect_Loop, conn_handle);
    if(uv_accept(listen_handle, (uv_stream_t *) conn_handle) == 0)
      {
        uv_os_fd_t fd;

        uv_fileno((const uv_handle_t*) conn_handle, &fd);
      }

2.1)  A poll handle is successfully allocated in the IO_Trigger_Task() thread.
  (No loop involved.)

        poll_handle = (uv_poll_t *) malloc(sizeof(uv_poll_t));
        if(poll_handle == NULL)
          {
            fprintf(stderr, "IO_TRIGGER_TASK: No Poll HAndle Memory\n");
            abort();
          }

        uv_poll_init(&Poll_Loop, poll_handle, pm->info);
        if((r = uv_poll_start(poll_handle, UV_READABLE, poll_callback)) < 0)
          {
            fprintf(stderr, "IO_TRIGGER_TASK: Polling Initiation Error %d: %s\n", r, uv_err_name(r));
            abort();
          }

2.2)  uv_poll_start() is invoked via a call.

        uv_poll_init(&Poll_Loop, poll_handle, pm->info);
        if((r = uv_poll_start(poll_handle, UV_READABLE, poll_callback)) < 0)
          {
            fprintf(stderr, "IO_TRIGGER_TASK: Polling Initiation Error %d: %s\n", r, uv_err_name(r));
            abort();
          }

2.3) uv_poll_start() executes again without being called !

This is what you see in GDB, which is very strange since I know there is only
one instance of the IO_Trigger_Task() running, and uv_poll_start() was not called
a second time because the line before line 212 didn't execute a second time.

Breakpoint 1, IO_Trigger_Task (arg=0x0) at network_io.c:212
212            if((r = uv_poll_start(poll_handle, UV_READABLE, poll_callback)) < 0)
(gdb) bt
#0  IO_Trigger_Task (arg=0x0) at network_io.c:212
#1  0x0000000000413017 in uv__thread_start (arg=<optimized out>) at src/unix/thread.c:49
#2  0x00007ffff7bc26aa in start_thread (arg=0x7ffff64e6700) at pthread_create.c:333
#3  0x00007ffff75efeed in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb) s
uv_poll_start (handle=0x7fffec0008c0, pevents=1, poll_cb=0x404599 <poll_callback>) at src/unix/poll.c:89
89      assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0);
(gdb) s
86    int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
(gdb) bt
#0  uv_poll_start (handle=0x7fffec0008c0, pevents=1, poll_cb=0x404599 <poll_callback>) at src/unix/poll.c:86
#1  0x00000000004048bb in IO_Trigger_Task (arg=0x0) at network_io.c:212
#2  0x0000000000413017 in uv__thread_start (arg=<optimized out>) at src/unix/thread.c:49
#3  0x00007ffff7bc26aa in start_thread (arg=0x7ffff64e6700) at pthread_create.c:333
#4  0x00007ffff75efeed in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109
(gdb)

This is the relevant code of the IO_Trigger_Task() thread.


    for(;;)
      {
        //
        // Wait for a message from the main() process.
        //
        pm  = WAIT_IO_Trigger_MSG();

        poll_handle = (uv_poll_t *) malloc(sizeof(uv_poll_t));
        if(poll_handle == NULL)
          {
            fprintf(stderr, "IO_TRIGGER_TASK: No Poll HAndle Memory\n");
            abort();
          }

        uv_poll_init(&Poll_Loop, poll_handle, pm->info);
        //
        // Start epoll() monitoring of the connection.
        //
        if((r = uv_poll_start(poll_handle, UV_READABLE, poll_callback)) < 0)
          {
            fprintf(stderr, "IO_TRIGGER_TASK: Polling Initiation Error %d: %s\n", r, uv_err_name(r));
            abort();
          }

        MSG_FREE(pm);
      }



2.4) The polling callback function never executes.

NOTE: The polling loop, Poll_Loop, of type uv_loop_t is already running and was started,
in the IO_Task() thread, at startup time as follows.

    uv_loop_init(&Poll_Loop);
    for(;;)
      {
        r = uv_run(&Poll_Loop, UV_RUN_DEFAULT);
        if(r)
            fprintf(stderr, "IO_TASK: Run Error %d\n", r);
      }


This is the sequence of operations used to free the first connection.

1) Release the uv_poll_t poll handle in the IO_Task() from the Protocol_Task()


    //
    // This causes immediate socket disconnection when it is closed.
    //
    spec.l_onoff = TRUE;
    spec.l_linger = 0;
    setsockopt(cdesc->fd, SOL_SOCKET, SO_LINGER, &spec, sizeof(spec) );
    //
    // poll_release_proxy() executes in the IO_Task() and releases the poll handle.
    //
    uv_async_init(&Poll_Loop, &cdesc->async_handle, poll_release_proxy);
    cdesc->async_handle.data = (void *) cdesc;
    uv_async_send(&cdesc->async_handle);

1.1) Wait for the poll handle to be freed and then release the async. handle.

    uv_close((uv_handle_t *) &cdesc->async_handle, NULL);

2) Release the uv_tcp_t connect handle in the main() process from the Protocol_Task()

    uv_async_init(&Connect_Loop, &cdesc->async_handle, conn_release_proxy);
    cdesc->async_handle.data = (void *) cdesc;
    uv_async_send(&cdesc->async_handle);

2.1) Wait for the connect handle to be freed and then release the async. handle.

    uv_close((uv_handle_t *) &cdesc->async_handle, NULL);

3) Do protocol bookkeeping.

This is the code of the proxy callback routines and the close callback routine.

//
// This routine executes asynchronously and frees a handle.
// It is invoked in the following two cases.
//
// * When the IO_Task() invokes poll_release_proxy().
//
// * When the main process invokes conn_release_proxy().
//
ROUTINE void close_callback(uv_handle_t *handle)
{
    SDU *msg;

    CONN_DESC *cdesc = (CONN_DESC *) handle->data;

    free(handle);

ENTER_MUTEX(&Service_Q_Mutex);
    //
    // Set the state correctly and validate the state.
    //
    switch(cdesc->release_state)
    {
    case RS_POLL_HANDLE_PEND:
        cdesc->release_state = RS_POLL_HANDLE_FREE;
        break;
    case RS_CONN_HANDLE_PEND:
        cdesc->release_state = RS_CONN_HANDLE_FREE;
        break;
    default:
        fprintf(stderr, "CLOSE_PROXY - BUG: Invalid Release State = %d\n", cdesc->release_state);
        abort();
    }
EXIT_MUTEX(&Service_Q_Mutex);

    //
    // Send a notification message to the Protocol_Task.
    //
    msg = MSG_ALLOC(0, FALSE);
    msg->class = C_NOTIFY;
    msg->type = T_HANDLE_FREE;
    msg->info = 0;

    SEND_SDU(cdesc, msg);

    return;
}


//
// This routine is invoked by the IO_Task() in response to an async. wakeup by the Protocol_Task()
// during TCP connection termination. It releases the resources used by the Poll_Loop.
//
ROUTINE void poll_release_proxy(uv_async_t *async_handle)
{
    CONN_DESC *cdesc = (CONN_DESC *) async_handle->data;

    //
    // Stop polling operations before closing the handle.
    //
    uv_poll_stop(cdesc->poll_handle);
    cdesc->poll_handle->data = (void *) cdesc;

    uv_close((uv_handle_t *) cdesc->poll_handle, close_callback);

    return;
}

//
// This routine is invoked by the main process in response to an async. wakeup by the Protocol_Task()
// during TCP connection termination. It releases the resources used by the Connect_Loop.
//
ROUTINE void conn_release_proxy(uv_async_t *async_handle)
{
    CONN_DESC *cdesc = (CONN_DESC *) async_handle->data;

    cdesc->conn_handle->data = (void *) cdesc;

    uv_close((uv_handle_t *) cdesc->conn_handle, close_callback);

    return;
}

Best Regards,

Paul R.

pa...@rcom-software.com

Dec 22, 2020, 11:56:08 AM
to libuv
Hi Folks:

This is what happens at a lower level. Do you have insight about the cause ?
The relevant code segments and the GDB stack traces follow below.

The main() process and IO_Task() are executing concurrently.

NOTE: The IO_Trigger_Task() has been eliminated and the uv_poll_start() call is invoked in the IO_Task()
by a proxy routine and an async. wakeup in the main() process. The wakeup is done in the make_incoming_connection()
callback routine by the following code segment.


    poll_handle->data = (void *) cdesc;
    //
    // Start epoll() monitoring of the connection. The poll_start_proxy()
    // routine executes in the IO_Task().
    //
    uv_async_init(&Poll_Loop, &cdesc->async_handle, poll_start_proxy);

    cdesc->async_handle.data = (void *) cdesc;
    uv_async_send(&cdesc->async_handle);


* The main() process is executing uv__finish_close() which is invoked via uv_run() and uv__run_closing_handles().
  It crashes in the call to uv__finish_close()

  This is the code from unix/core.c: Line 210 is the first assert() statement.
------------------------------------------------------------------------------
static void uv__finish_close(uv_handle_t* handle) {
  /* Note: while the handle is in the UV_CLOSING state now, it's still possible
   * for it to be active in the sense that uv__is_active() returns true.
   * A good example is when the user calls uv_shutdown(), immediately followed
   * by uv_close(). The handle is considered active at this point because the
   * completion of the shutdown req is still pending.
   */
#ifndef SUPRESS
  assert(handle->flags & UV_CLOSING);
  assert(!(handle->flags & UV_CLOSED));
#endif /* SUPRESS */
  handle->flags |= UV_CLOSED;

* The IO_Task() thread is executing uv_poll_start(), which is called from poll_start_proxy() in
  response to an async. request from the main() process. (uv__async_event() makes the call to
  poll_start_proxy().)

This is the code that executes in the IO_Task() from network_io.c
-----------------------------------------------------------------
//
// Executes in IO_Task
//
ROUTINE void poll_start_proxy(uv_async_t *async_handle)
{
    int r;


    CONN_DESC *cdesc = (CONN_DESC *) async_handle->data;

    uv_poll_init(&Poll_Loop, cdesc->poll_handle, cdesc->fd);
    if((r = uv_poll_start(cdesc->poll_handle, UV_READABLE, poll_callback)) < 0)
      {
        fprintf(stderr, "PROXY: IO_TASK - Polling Initiation Error %d: %s\n", r, uv_err_name(r));
        abort();
      }
    uv_close( (uv_handle_t *) async_handle, NULL);

    return;
}

This is the code from unix/poll.c: Line 92 is the call to uv__poll_stop()
--------------------------------------------------------------------------

int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
  int events;


  assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0);
  assert(!(handle->flags & (UV_CLOSING | UV_CLOSED)));

  uv__poll_stop(handle);

  if (pevents == 0)
    return 0;

  events = 0;
  if (pevents & UV_READABLE)
    events |= UV__POLLIN;
  if (pevents & UV_WRITABLE)
    events |= UV__POLLOUT;

  uv__io_start(handle->loop, &handle->io_watcher, events);
  uv__handle_start(handle);
  handle->poll_cb = poll_cb;

  return 0;
}



main() Stack Trace
--------------------
#0  0x00007ffff751e267 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:55
#1  0x00007ffff751feca in __GI_abort () at abort.c:89
#2  0x00007ffff751703d in __assert_fail_base (fmt=0x7ffff7679028 "%s%s%s:%u: %s%sAssertion `%s' failed.\n%n",
    assertion=assertion@entry=0x41e678 "handle->flags & UV_CLOSING", file=file@entry=0x41e668 "src/unix/core.c",
    line=line@entry=210, function=function@entry=0x41e8b0 <__PRETTY_FUNCTION__.9522> "uv__finish_close")
    at assert.c:92
#3  0x00007ffff75170f2 in __GI___assert_fail (assertion=assertion@entry=0x41e678 "handle->flags & UV_CLOSING",
    file=file@entry=0x41e668 "src/unix/core.c", line=line@entry=210,
    function=function@entry=0x41e8b0 <__PRETTY_FUNCTION__.9522> "uv__finish_close") at assert.c:101
#4  0x000000000040cd60 in uv__finish_close (handle=<optimized out>) at src/unix/core.c:210
#5  uv__run_closing_handles (loop=0x830680 <Connect_Loop>) at src/unix/core.c:261
#6  uv_run (loop=0x830680 <Connect_Loop>, mode=UV_RUN_DEFAULT) at src/unix/core.c:328
#7  0x0000000000403e0f in main () at main.c:184

IO_Task() Stack Trace
-----------------------
#0  uv__poll_stop (handle=0x831bc0) at src/unix/poll.c:73
#1  0x000000000040e93f in uv_poll_start (handle=0x831bc0, pevents=1, poll_cb=0x404576 <poll_callback>)
    at src/unix/poll.c:92
#2  0x00000000004047d4 in poll_start_proxy (async_handle=0x639468 <Conn_Desc_Table+40>) at network_io.c:146
#3  0x000000000040c1dd in uv__async_event (loop=0x639000 <Poll_Loop>, w=<optimized out>, nevents=<optimized out>)
    at src/unix/async.c:89
#4  0x000000000040c29e in uv__async_io (loop=0x639000 <Poll_Loop>, w=0x6391c8 <Poll_Loop+456>,
    events=<optimized out>) at src/unix/async.c:160
#5  0x000000000041590d in uv__io_poll (loop=loop@entry=0x639000 <Poll_Loop>, timeout=-1) at src/unix/linux-core.c:319
#6  0x000000000040cac7 in uv_run (loop=0x639000 <Poll_Loop>, mode=UV_RUN_DEFAULT) at src/unix/core.c:326
#7  0x000000000040484e in IO_Task (arg=0x0) at network_io.c:171
#8  0x0000000000412eb7 in uv__thread_start (arg=<optimized out>) at src/unix/thread.c:49
#9  0x00007ffff7bc26aa in start_thread (arg=0x7ffff6ce7700) at pthread_create.c:333
#10 0x00007ffff75efeed in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109


Best Regards,

Paul R.

Jameson Nash

Dec 22, 2020, 1:59:05 PM
to libuv
Hi Paul,

This list isn't really likely to give you free consulting work. My brief skimming suggested the following tips as possibly relevant:
 - libuv is an async library first-and-foremost. You must adapt your mental model to use callback-managed state machines (inverted control flow). Learning Javascript can be helpful here for the mental model aspects.
 - libuv is not thread-safe. There are some specific operations defined for concurrent program integration, but you're responsible for the details.
 - uv_async_t is not a queue, it is a condition object. You are responsible for providing your own queue.

-jameson
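
A minimal sketch of the last tip above -- pairing uv_async_send() with a caller-provided,
mutex-protected list that the async callback drains (the drain loop matters because several
sends may be coalesced into one callback). All names here are illustrative:

#include <stdlib.h>
#include <uv.h>

typedef struct work_item {
    struct work_item *next;
    void (*fn)(void *arg);       /* what the loop thread should do */
    void *arg;
} work_item;

static uv_async_t Wakeup;        /* uv_async_init(&loop, &Wakeup, drain_cb) once, on the loop thread */
static uv_mutex_t Q_Lock;        /* uv_mutex_init(&Q_Lock) once at startup */
static work_item *Q_Head;        /* guarded by Q_Lock */

static void drain_cb(uv_async_t *async)
{
    work_item *item;

    (void) async;
    for(;;)
      {
        uv_mutex_lock(&Q_Lock);
        item = Q_Head;
        if(item != NULL)
            Q_Head = item->next;
        uv_mutex_unlock(&Q_Lock);

        if(item == NULL)
            break;
        item->fn(item->arg);     /* runs on the loop thread */
        free(item);
      }
}

//
// Callable from any thread.
//
void post_to_loop(void (*fn)(void *), void *arg)
{
    work_item *item = malloc(sizeof(*item));

    item->fn = fn;
    item->arg = arg;

    uv_mutex_lock(&Q_Lock);
    item->next = Q_Head;         /* LIFO for brevity; a real queue keeps FIFO order */
    Q_Head = item;
    uv_mutex_unlock(&Q_Lock);

    uv_async_send(&Wakeup);      /* thread-safe; may coalesce with other sends */
}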


Paul Romero

Dec 22, 2020, 5:45:19 PM
to li...@googlegroups.com
Hi Jameson:

I appreciate your suggestions and realize Libuv is callback-based and asynchronously oriented.

This whole problem revolves around releasing a uv_tcp_t connection handle,
and of course, its associated TCP socket. I have verified that uv_close() and
its callback function execute in the right thread task when it is called.

The Libuv documentation states that uv_async_send() is reentrant but it
also states that calls may be "coalesced". My, perhaps incorrect, understanding
is that uv_async_send() is synchronous when it is called with the same async. handle.
That implies that calls to uv_async_send() with different handles execute independently.
(I.e. With separate data.)

Here is what I am trying to do.  The task Protocol_Task() is the only thread task that
can determine when a TCP socket should be released. When it determines that is
the case, it uses uv_async_send() to have the main process release the socket
on its behalf because the main process is a different thread task. This is more
or less equivalent to sending a message to the main process.

Also, my initial assumption was that releasing TCP sockets is a common operation
and there would be some API to facilitate the operation.

I don't see how the loop in the main process can invoke a callback to do this
since it doesn't know when the connection needs to be released.

I will attach the files I consider relevant in case you are curious.

Best Regards,

Paul R.

-- 


Paul Romero
-----------
RCOM Communications Software
EMAIL: pa...@rcom-software.com
PHONE: (510)482-2769




main.c
network_io.c
protocol.c
rx_fsm.c

Paul Romero

Dec 23, 2020, 7:53:25 PM
to li...@googlegroups.com
Hi Folks:

After taking several wrong paths I isolated the problem.

It occurs when uv_close() is called as follows to close the async. handle
used to initiate an async. operation with uv_async_send().


     uv_close((uv_handle_t *) &cdesc->async_handle, NULL);

Now if another async. operation is initiated with the same handle,
after calling uv_async_init(), it triggers uv__finish_close() to be
invoked in both the main() process and IO_Task() thread concurrently.
Then the following familiar crash occurs at the assert().

#0  0x00007f256fdf0267 in __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:55
#1  0x00007f256fdf1eca in __GI_abort () at abort.c:89
#2  0x00007f256fde903d in __assert_fail_base (fmt=0x7f256ff4b028 "%s%s%s:%u: %s%sAssertion `%s' failed.\n%n",
    assertion=assertion@entry=0x4261e0 "handle->flags & UV_HANDLE_CLOSING",
    file=file@entry=0x426115 "src/unix/core.c", line=line@entry=254,
    function=function@entry=0x426480 <__PRETTY_FUNCTION__.10077> "uv__finish_close") at assert.c:92
#3  0x00007f256fde90f2 in __GI___assert_fail (assertion=assertion@entry=0x4261e0 "handle->flags & UV_HANDLE_CLOSING",
    file=file@entry=0x426115 "src/unix/core.c", line=line@entry=254,
    function=function@entry=0x426480 <__PRETTY_FUNCTION__.10077> "uv__finish_close") at assert.c:101
#4  0x0000000000410149 in uv__finish_close (handle=<optimized out>) at src/unix/core.c:254
#5  uv__run_closing_handles (loop=0x839880 <Connect_Loop>) at src/unix/core.c:317
#6  uv_run (loop=0x839880 <Connect_Loop>, mode=UV_RUN_DEFAULT) at src/unix/core.c:395
#7  0x0000000000404b5f in main () at main.c:188

There are two obvious possible causes.

1) The async. operation is not really complete.
2) A bug in Libuv.

This problem stops occurring if you don't call uv_close(). This is not a problem in my
application, since cdesc->async_handle is not dynamically allocated, but I am not sure
if the same is true for the Libuv software. (My application is a Server that handles large numbers
of concurrent TCP connections and runs as a Linux Daemon.)

The Libuv documentation states that a uv_async_t handle remains activated until uv_close()
is called. Are there negative Libuv software consequences if an async. handle is left
activated ? My guess is no.

If this is a Libuv problem, perhaps it would be better to deal with it at the
documentation level rather than the code level.

It appears that coalescing of uv_async_send() calls is still an issue.

Best Regards,

Paul R.

PS: The reason I am considering using Libuv is that it provides a convenient epoll() wrapper
and epoll() allows very quick data reception without user space file descriptor polling or
the like.



Jameson Nash

Dec 23, 2020, 9:55:02 PM
to libuv
So you called uv_async_init after destroying the object with uv_close? That sounds like a classic use-after-free client bug, not a libuv issue

> coalescing of uv_async_send

Again, it's not a queue, you're responsible for getting that from another library, not a libuv issue

Paul Romero

Dec 24, 2020, 12:47:13 AM
to li...@googlegroups.com
Hi Jameson:

Thank you.  For clarification I am not allocating or freeing cdesc->async_handle. (It is statically allocated
at compile time.) However, I don't fully understand what uv_close() does when it is called with a NULL
callback routine. It may be the case that it destroys some hidden data associated with the handle. Is
that true ?

Best Regards,

Paul R.

Iñaki Baz Castillo

Dec 24, 2020, 12:55:11 AM
to li...@googlegroups.com
1. You call uv_close() once you NO LONGER need the handle.

2. After 1, you don't attempt to use the handle again. If you do it, this is your fault.

3. Eventually you get the uv_close_cb called. You free the handle if it was dynamically allocated.

It looks like you think that calling uv_close() with uv_close_cb=NULL changes bullet 2 above. Not true.
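
For reference, that lifecycle in code form is roughly the following sketch (illustrative
names; dynamically allocated handle):

#include <stdlib.h>
#include <uv.h>

static void on_closed(uv_handle_t *handle)
{
    free(handle);                    /* 3. only now may the memory be reused */
}

void lifecycle_sketch(uv_loop_t *loop)
{
    uv_async_t *async = malloc(sizeof(*async));

    uv_async_init(loop, async, NULL);             /* handle is now active */

    /* ... uv_async_send(async) from other threads while it is needed ... */

    uv_close((uv_handle_t *) async, on_closed);   /* 1. no longer needed */
    //
    // 2. *async must not be touched again after this point, even if the close
    //    callback had been NULL; on_closed() runs later, from uv_run() on the
    //    loop's own thread.
    //
}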

Paul Romero

Dec 24, 2020, 4:02:23 AM
to li...@googlegroups.com
Hi Inaki:

Thank you. You answered my question completely.

Best Regards,

Paul R.

Paul Romero

Dec 25, 2020, 9:00:46 PM
to li...@googlegroups.com
Hi Inaki and Jameson:

A .tar file containing my prototype Server code is attached to this note. Under moderate testing,
with 5 concurrent TCP connections, the Server behaves correctly. Look at the following items
in the code to understand the details of how it uses async. operations.

* BOOL Async_Busy: A global variable protected by a Mutex.

* uv_mutex_t Async_Mutex: A Mutex used in async. operations.

* cdesc->async_handle: A per connection statically allocated per connection uv_async_t handle.

* cdesc->release_state: A per connection state variable used to manage async. operations performed
  for the purpose of releasing Libuv resources during TCP socket connection release.

Note that LIBUV_RELEASE is defined and identifies the relevant code sections, and the following
routines are essential to async. operations.

* ASYNC_WAIT(): Waits for an async. operation to complete.

* ASYNC_BUSY(): Indicates whether an async. operation is in progress or not.

* poll_start_proxy(): Starts polling for a specific uv_poll_t handle with uv_poll_start().

* start_conn_release(): Stops polling of a specific uv_poll_t handle and releases it,
  and terminates a TCP connection on a specific uv_tcp_t handle and releases it.

* close_callback(): Informs another task that a handle has been released.

What I need to do is have one task, A, prompt another task, B, to perform an operation
on an object owned by task B. The description below describes how my Server does so.
You indicated that uv_async_send() is not a queuing mechanism but suggested there is
a safe way to queue data for a loop oriented task. I don't know how Libuv queues work
for such a task and how another task can convey "messages" to a loop oriented task.
Perhaps there is a section of the documentation that describes such a mechanism
but I have not found it yet.

Could you point me in the right direction ?  I think the procedure described below
is unsafe. Even though uv_async_send() is reentrant the fact that you need to use
uv_async_init(), which is not reentrant, to start an async. operation effectively
makes it non-reentrant. Also, I think if you can prevent uv_async_init() from being
called while an async. operation is in progress you can circumvent the problem.

This is a logical description of what occurs in my Server. For purposes of simplicity,
assume we have only the following two tasks, each of which could be a thread task or the
Linux main() procedure.

Task A: A task which can receive messages via a condition variable and input queue with uv_cond_wait().

Task B: A loop oriented task that is executing uv_run()

Then we perform the following operations in order to make task B perform the desired operation.

1) Task A sends a message to task B.

  Currently this is done with uv_async_send() after the proxy routine to
  execute in task B has been specified with uv_async_init().

2) Task B performs an operation on a object it owns.

  The proxy routine executes in task B and performs the operation.

3) Task B informs Task A the operation is complete.

  Upon completion of the operation Task B sends a notification message to task A
  with a conventional queue insertion and uv_cond_signal().


Best Regards,

Paul R.



code.tar

Paul Romero

Dec 29, 2020, 3:39:18 PM
to li...@googlegroups.com
Hi Folks:

A version of ASYNC_WAIT() with the right logic follows. As I'm sure you guessed,
after moderately heavy testing with the old incorrect version, the predictable
consequences ensued.

My basic strategy for managing the uv_async_send() "channel" is to treat it
as a global resource. The Async_Busy variable is used to guarantee that only one
task at a time can have an async. operation in progress on the channel.


//
// Wait for the async. channel to become available.
//
ROUTINE void ASYNC_WAIT()
{
    struct timespec tsec;
    BOOL done, wait;

    //
    // 20 Milliseconds.
    //
    tsec.tv_sec = 0;
    tsec.tv_nsec = 20000000;        // 20 MS

    done = FALSE;
    while(done != TRUE)
      {
        wait = FALSE;
ENTER_MUTEX(&Async_Mutex);
        if(Async_Busy)
            wait = TRUE;
        else
          {
            Async_Busy = TRUE;
            done = TRUE;
          }
EXIT_MUTEX(&Async_Mutex);
        if(wait)
          {
            nanosleep(&tsec, NULL);

          }
      }

    return;
}
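
A sketch of the same busy/available gate built on uv_mutex_t and uv_cond_t, so waiters block
instead of polling in 20 ms steps. ASYNC_RELEASE() is an assumed companion routine that the
completion path would call when the channel becomes free again:

#include <uv.h>

static uv_mutex_t Async_Mutex;      /* uv_mutex_init() at startup */
static uv_cond_t  Async_Cond;       /* uv_cond_init() at startup  */
static int        Async_Busy = 0;   /* guarded by Async_Mutex     */

void ASYNC_WAIT(void)
{
    uv_mutex_lock(&Async_Mutex);
    while(Async_Busy)                            /* loop: wakeups can be spurious */
        uv_cond_wait(&Async_Cond, &Async_Mutex);
    Async_Busy = 1;                              /* channel acquired */
    uv_mutex_unlock(&Async_Mutex);
}

void ASYNC_RELEASE(void)
{
    uv_mutex_lock(&Async_Mutex);
    Async_Busy = 0;
    uv_cond_signal(&Async_Cond);                 /* wake one waiter */
    uv_mutex_unlock(&Async_Mutex);
}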


Best Regards,

Paul R.







Iñaki Baz Castillo

Dec 29, 2020, 4:26:12 PM
to li...@googlegroups.com
Hi, Paul. As a general rule, please don't address specific people when asking questions on a mailing list. There are many users here who can help.

pa...@rcom-software.com

Jan 2, 2021, 12:13:46 PM
to libuv
Hi Folks:

I implemented a method, which I believe is in line with your previous recommendations, to make a
Libuv loop oriented task perform an operation in response to an uv_async_send() call initiated by
another task.  The main steps are as follows.

* The initiating task inserts a message into the input queue of the target task.
  (See the send_async_msg() routine below.)

* A proxy routine configured with uv_async_init() executes in the target task.
  (See the poll_proxy() and connect_proxy() routines below.)

  The target tasks are as follows. The IO_Task() is based on the uv_loop_t Poll_Loop and
  reads incoming data with the callback routine specified in the uv_poll_start() call, and
  the main() process is based on the uv_loop_t Connect_Loop and accepts incoming connections.
  (See the poll_callback() routine below.)

* The proxy routine handles each message on the task input queue and performs
  the operation specified by the message type.

* The target task sends a message to the initiating task notifying it the operation is complete.
  (See the async_close_callback() and poll_proxy() routines below.)

The relevant code segments follow and any additional comments or suggestions you have would
be appreciated.

Best Regards,

Paul R.

A) Code from async.c - send_async_msg() and async_close_callback().

#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "sdu.h"

#include <sys/types.h>
#include <sys/socket.h>

//
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE

#define ENTER_MUTEX    uv_mutex_lock
#define EXIT_MUTEX    uv_mutex_unlock


//
// ***********************************************************************************************
// *************************************** DATA **************************************************
// ***********************************************************************************************
//

//
// Protects Connect_Task_Input_Q and IO_Task_Input_Q.
//
extern uv_mutex_t Async_Mutex;

//
// ***********************************************************************************************
// ************************************* PROTOTYPES  *********************************************
// ***********************************************************************************************
//

void send_async_msg(CONN_DESC *, ASYNC_DATA *);
void async_close_callback(uv_handle_t *);

void SEND_SDU(CONN_DESC *, SDU *);

void insert_msg(SDU_FIFO *, SDU *);

SDU *MSG_ALLOC(int, BOOL);
void VFREE(UCHAR *);

#ifdef LIBUV_PLUMBING

//
// Send a message to the target loop task.
//
ROUTINE void send_async_msg(CONN_DESC *cdesc, ASYNC_DATA *parm)
{
    SDU *msg;
    uv_handle_t **ppdata;

    msg = MSG_ALLOC( sizeof(uv_handle_t *), FALSE );

    msg->class = C_ASYNC;
    msg->type = parm->type;
    msg->conn_desc = (void *) cdesc;

    if(parm->object_handle)
       {
        ppdata = (uv_handle_t **) &msg->buffer[0];
        *ppdata = (uv_handle_t *) parm->object_handle;
      }

ENTER_MUTEX(&Async_Mutex);
    insert_msg(parm->async_q, msg);
EXIT_MUTEX(&Async_Mutex);

    uv_async_send(parm->async_handle);

    return;
}

ROUTINE void async_close_callback(uv_handle_t *handle)

{
    SDU *msg;

    CONN_DESC *cdesc = (CONN_DESC *) handle->data;

    VFREE((UCHAR *) handle);


    //
    // Send a notification message to the Protocol_Task.
    //
    msg = MSG_ALLOC(0, FALSE);
    msg->class = C_NOTIFY;
    msg->type = T_DISMANTLE_RSP;

    msg->info = 0;

    SEND_SDU(cdesc, msg);

    return;
}

#endif // LIBUV_PLUMBING

B) Code from io_task.c: poll_proxy() and poll_callback()


#ifdef LIBUV_PLUMBING

//
// Executes in the IO_Task() poll in response to a uv_async_send() by another task.
//
ROUTINE PRIVATE void poll_proxy(uv_async_t *handle)
{
    CONN_DESC *cdesc;
    SDU *msg, *rsp;
    uv_poll_t *poll_handle;
    uv_handle_t **ppdata;
    int r;
    BOOL done;

    //
    // Avoid clobbering another instance of the routine.
    //
    if(Poll_Proxy_Busy)
        return;

    Poll_Proxy_Busy = TRUE;
    //
    // Handle all messages on the input queue.
    //
    done = FALSE;
    while(done != TRUE)
      {
ENTER_MUTEX(&Async_Mutex);
        msg = remove_msg(&IO_Task_Input_Q);
EXIT_MUTEX(&Async_Mutex);

        if(msg)
          {
            cdesc = (CONN_DESC *) msg->conn_desc;
            switch(msg->type)
            {
            case T_POLL_CONFIG_REQ:
                poll_handle = (uv_poll_t *) VALLOC(sizeof(uv_poll_t));
                //
                // Add a new poll handle with the specified TCP socket fd to the loop.
                //
                uv_poll_init(&Poll_Loop, poll_handle, cdesc->fd);

                poll_handle->data = (void *) cdesc;
                //
                // Begin loop service for the handle.
                //
                if((r = uv_poll_start(poll_handle, UV_READABLE, poll_callback)) < 0)
                  {
                    fprintf(stderr, "POLL_PROXY:  Polling Initiation Error %d: %s\n", r, uv_err_name(r));
                    abort();
                  }
                //
                // Notify the Protocol_Task() that the handle has been added.
                //
                rsp = MSG_ALLOC(0, FALSE);
                rsp->class = C_NOTIFY;
                rsp->type = T_POLL_CONFIG_RSP;
                rsp->info = 0;
                rsp->conn_desc = (void *) poll_handle;

                SEND_SDU(cdesc, rsp);

                break;
            case T_POLL_DISMANTLE_REQ:
                //
                // Remove an existing handle from loop service.
                //
                ppdata = (uv_handle_t **) &msg->buffer[0];
                poll_handle = (uv_poll_t *) *ppdata;

                if( (r = uv_poll_stop(poll_handle)) )
                  {
                    printf("POLL_PROXY: Poll Stop Error %d: %s\n", r, uv_err_name(r) );
                    abort();
                  }
                //
                // async_close_callback() will notify the Protocol_Task() when
                // the operation is complete.
                //

                poll_handle->data = (void *) cdesc;
                uv_close((uv_handle_t *) poll_handle, async_close_callback);

                break;
            default:
                fprintf(stderr, "POLL_PROXY: Invalid Message = %d\n", msg->type);
                abort();

            }

            MSG_FREE(msg);
          }
        else
            done = TRUE;
      }

    Poll_Proxy_Busy = FALSE;

    return;
}

//
// This routine is invoked when epoll() detects an I/O read event.
//
ROUTINE PRIVATE void poll_callback(uv_poll_t *poll_handle, int status, int events)
{
    if(status == 0)
      {
        if(events & UV_READABLE)
           {
            int n;
            SDU *sdu;

            CONN_DESC *cdesc = (CONN_DESC *)  poll_handle->data;
            //
            // Read data from the TCP socket.
            //
            n = recv(cdesc->fd, cdesc->rx_buf, RX_BUF_SIZE, MSG_DONTWAIT);
            if(n > 0)
               {
                DECODE_RESULT r;
                //
                // Try to recognize an SDU.
                //
                r = rx_sdu(cdesc, n);
                switch(r)
                {
                case RX_PARTIAL:
                case RX_COMPLETE:
                case RX_EMPTY:
                    break;
                case RX_OVERFLOW:
                case RX_NOMEMORY:
                    //
                    // Notify the Protocol_Task() of the error. There is no choice but
                    // to send an abort, reinitialize everything, and release the connection.
                    //
                    // fprintf(stderr, "IO_TASK: Fatal SDU Reception Error %d\n", r);
                    //
                    sdu = MSG_ALLOC(0, FALSE);
                    sdu->class = C_NOTIFY;
                    sdu->type = T_FAULT;
                    sdu->info = (int) r;

                    SEND_SDU(cdesc, sdu);
                    break;
                default:
                    fprintf(stderr, "IO_TASK: BUG - Invalid SDU Recognition Code: %d\n", r);
                    abort();
                }

               }
            else
            if(n < 0)
              {
                n = 0;
                if( !(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) )
                  {
                    //
                    // Notify the Protocol_Task() that the socket is disconnected.
                    //
                    SDU *msg;

printf("RX DISCONNECT %d: %s !\n", errno, strerror(errno) );
fflush(stdout);

                    msg = MSG_ALLOC(0, FALSE);
                    msg->class = C_NOTIFY;
                    msg->type = T_DISCONNECT;

                    msg->info = 0;

                    SEND_SDU(cdesc, msg);
                  }
              }
           }
      }
    else
      {
        fprintf(stderr, "poll_callback: Bad Status %d\n", status);

      }

    return;
}

#endif // LIBUV_PLUMBING

C) Code from main.c: connect_proxy()

//
// Executes in the main() Connect_Loop in response to a call to uv_async_send() by the Protocol_Task().
//
ROUTINE PRIVATE void connect_proxy(uv_async_t *handle)
{
    CONN_DESC *cdesc;
    SDU *msg;
    uv_tcp_t *conn_handle;
    uv_handle_t **ppdata;
    BOOL done;

    if(Connect_Proxy_Busy)
        return;

    Connect_Proxy_Busy = TRUE;

    //
    // Handle all messages from the Protocol_Task()
    //

    done = FALSE;
    while(done != TRUE)
      {
ENTER_MUTEX(&Async_Mutex);
        msg = remove_msg(&Connect_Task_Input_Q);
EXIT_MUTEX(&Async_Mutex);

        if(msg)
          {
            cdesc = (CONN_DESC *) msg->conn_desc;
            switch(msg->type)
            {
            case T_CONN_DISMANTLE_REQ:
                //
                // Release a uv_tcp_t connection handle.
                // The protocol task is notified by async_close_callback()
                // when the operation is complete.
                //
                ppdata = (uv_handle_t **) &msg->buffer[0];
                conn_handle = (uv_tcp_t *) *ppdata;

               
                conn_handle->data = (void *) cdesc;
                uv_close((uv_handle_t *) conn_handle, async_close_callback);
                break;
            default:
                fprintf(stderr, "CONNECT_PROXY: Invalid Message = %d\n", msg->type);
                abort();
            }

            MSG_FREE(msg);
          }
        else
            done = TRUE;
      }

    Connect_Proxy_Busy = FALSE;

    return;
}


#endif // LIBUV_PLUMBING

D) Code from main.c: Relevant initialization

    //
    //  Initialize the Libuv async. channel Mutex.
    //
    uv_mutex_init(&Async_Mutex);
    //
    // Initialize the Service_Q, used in the Protocol_Task() and
    // its Mutex and condition variable.
    //
    Service_Q = NULL;
    uv_mutex_init(&Service_Q_Mutex);
    uv_cond_init(&Service_Q_Cond);
    //
    // Initialize the protocol data Mutex.
    //
    uv_mutex_init(&Protocol_D_Mutex);
    //
    // Launch the Signal_Task(), Timer_Task(), IO_Task(), DB_Task(), Protocol_Task(), and Transmit_Task().
    //
    uv_thread_create(&Signal_Task_Handle, Signal_Task, NULL);
    uv_thread_create(&DB_Task_Handle, DB_Task, NULL);
    uv_thread_create(&Protocol_Task_Handle, Protocol_Task, NULL);
    uv_thread_create(&Send_Task_Handle, Transmit_Task, NULL);
    uv_thread_create(&Timer_Task_Handle, Timer_Task, NULL);
    uv_thread_create(&IO_Task_Handle, IO_Task, NULL);
    //
    // Initialize the Connect_Loop and its data.
    //
    Connect_Accept_Busy = FALSE;
    Connect_Proxy_Busy = FALSE;

    Connect_Task_Input_Q.head = Connect_Task_Input_Q.tail = NULL;

    uv_loop_init(&Connect_Loop);
    uv_async_init(&Connect_Loop, &Connect_Task_Async_Handle, connect_proxy);

    uv_tcp_init(&Connect_Loop, &listen_handle);