Hi Folks:
I implemented a method, which I believe is in line with your previous recommendations, to make a
libuv loop-oriented task perform an operation in response to a uv_async_send() call initiated by
another task. The main steps are as follows.
* The initiating task inserts a message into the input queue of the target task.
(See the send_async_msg() routine below.)
* A proxy routine configured with uv_async_init() executes in the target task.
(See the poll_proxy() and connect_proxy() routines below.)
The target tasks are as follows. The IO_Task() is based on the uv_loop_t Poll_Loop and
reads incoming data with the callback routine specified in the uv_poll_start() call, and
the main() process is based on the uv_loop_t Connect_Loop and accepts incoming connections.
(See the poll_callback() routine below.)
* The proxy routine handles each message on the task input queue and performs
the operation specified by the message type.
* The target task sends a message to the initiating task notifying it that the operation is complete.
(See the async_close_callback() and poll_proxy() routines below.)
The relevant code segments follow and any additional comments or suggestions you have would
be appreciated.
Best Regards,
Paul R.
A) Code from async.c - send_async_msg() and async_close_callback().
#include "os_def.h"
#include "basic_def.h"
#include <uv.h>
#include "sdu.h"
#include <sys/types.h>
#include <sys/socket.h>
//
// PRIVATE marks routines intended to be file-local. It currently expands to
// nothing, so such routines keep external linkage while debugging.
// Use this definition once debugging is complete.
//
// #define PRIVATE static
//
#define PRIVATE
//
// ENTER_MUTEX/EXIT_MUTEX are plain aliases for the libuv mutex lock/unlock calls.
//
#define ENTER_MUTEX uv_mutex_lock
#define EXIT_MUTEX uv_mutex_unlock
//
// ***********************************************************************************************
// *************************************** DATA **************************************************
// ***********************************************************************************************
//
//
// Protects Connect_Task_Input_Q and IO_Task_Input_Q.
// Declared here; the definition lives in another source file.
//
extern uv_mutex_t Async_Mutex;
//
// ***********************************************************************************************
// ************************************* PROTOTYPES *********************************************
// ***********************************************************************************************
//
// send_async_msg() and async_close_callback() are defined below in this file.
// The remaining routines are only declared here; their definitions are not
// part of this excerpt.
//
void send_async_msg(CONN_DESC *, ASYNC_DATA *);
void async_close_callback(uv_handle_t *);
void SEND_SDU(CONN_DESC *, SDU *);
void insert_msg(SDU_FIFO *, SDU *);
SDU *MSG_ALLOC(int, BOOL);
void VFREE(UCHAR *);
#ifdef LIBUV_PLUMBING
//
// Queue a C_ASYNC request for a loop-based task and wake that task's loop.
//
//   cdesc - connection descriptor stored in the request's conn_desc field.
//   parm  - describes the request: the message type, an optional object
//           handle, the destination queue (async_q), and the uv_async_t
//           handle (async_handle) used to wake the target loop.
//
// When parm->object_handle is set, it is stored at the start of the
// message buffer; otherwise the buffer is left untouched.
// The queue insertion is performed while holding Async_Mutex.
//
ROUTINE void send_async_msg(CONN_DESC *cdesc, ASYNC_DATA *parm)
{
    // Request sized to carry one handle pointer as its payload
    // (presumably the first MSG_ALLOC argument is the payload size).
    SDU *request = MSG_ALLOC( sizeof(uv_handle_t *), FALSE );

    request->class = C_ASYNC;
    request->type = parm->type;
    request->conn_desc = (void *) cdesc;

    if(parm->object_handle)
    {
        uv_handle_t **slot = (uv_handle_t **) &request->buffer[0];

        *slot = (uv_handle_t *) parm->object_handle;
    }

    // Publish the request before signalling the target loop.
    ENTER_MUTEX(&Async_Mutex);
    insert_msg(parm->async_q, request);
    EXIT_MUTEX(&Async_Mutex);

    uv_async_send(parm->async_handle);
}
//
// Close callback handed to uv_close() by poll_proxy() and connect_proxy().
//
// The connection descriptor is taken from handle->data, which both callers
// set before calling uv_close(). The handle storage is released with VFREE()
// and a C_NOTIFY / T_DISMANTLE_RSP message is then sent for that connection.
//
ROUTINE void async_close_callback(uv_handle_t *handle)
{
    CONN_DESC *owner;
    SDU *notice;

    // Fetch the descriptor first: the handle is invalid once it is freed.
    owner = (CONN_DESC *) handle->data;
    VFREE((UCHAR *) handle);

    //
    // Send a notification message to the Protocol_Task.
    //
    notice = MSG_ALLOC(0, FALSE);
    notice->class = C_NOTIFY;
    notice->type = T_DISMANTLE_RSP;
    notice->info = 0;
    SEND_SDU(owner, notice);
}
#endif // LIBUV_PLUMBING
B) Code from io_task.c: poll_proxy() and poll_callback()
#ifdef LIBUV_PLUMBING
//
// Executes in the IO_Task() loop in response to a uv_async_send() by another task.
//
// Drains IO_Task_Input_Q (each removal is done under Async_Mutex) and performs
// the operation requested by each message:
//
//   T_POLL_CONFIG_REQ    - allocate a uv_poll_t for cdesc->fd on Poll_Loop, start
//                          polling for UV_READABLE with poll_callback(), and send
//                          a T_POLL_CONFIG_RSP carrying the new handle.
//   T_POLL_DISMANTLE_REQ - stop and close the poll handle carried in the message
//                          buffer; async_close_callback() sends the response.
//
// libuv may coalesce several uv_async_send() calls into one callback, so the
// whole queue is drained on every invocation. Any libuv failure or unknown
// message type is treated as fatal, as elsewhere in this file.
//
//   handle - the uv_async_t that triggered the callback (unused).
//
ROUTINE PRIVATE void poll_proxy(uv_async_t *handle)
{
    CONN_DESC *cdesc;
    SDU *msg, *rsp;
    uv_poll_t *poll_handle;
    uv_handle_t **ppdata;
    int r;

    (void) handle;
    //
    // Avoid clobbering another instance of the routine.
    //
    if(Poll_Proxy_Busy)
        return;
    Poll_Proxy_Busy = TRUE;
    //
    // Handle all messages on the input queue.
    //
    for(;;)
    {
        ENTER_MUTEX(&Async_Mutex);
        msg = remove_msg(&IO_Task_Input_Q);
        EXIT_MUTEX(&Async_Mutex);
        if(!msg)
            break;

        cdesc = (CONN_DESC *) msg->conn_desc;
        switch(msg->type)
        {
        case T_POLL_CONFIG_REQ:
            poll_handle = (uv_poll_t *) VALLOC(sizeof(uv_poll_t));
            if(!poll_handle)
            {
                // Guard in case VALLOC() can return NULL; libuv would
                // otherwise write through a null handle pointer.
                fprintf(stderr, "POLL_PROXY: Poll Handle Allocation Failure\n");
                abort();
            }
            //
            // Add a new poll handle with the specified TCP socket fd to the loop.
            // uv_poll_start() must not be called unless initialization succeeded.
            //
            if((r = uv_poll_init(&Poll_Loop, poll_handle, cdesc->fd)) < 0)
            {
                fprintf(stderr, "POLL_PROXY: Polling Initialization Error %d: %s\n", r, uv_err_name(r));
                abort();
            }
            poll_handle->data = (void *) cdesc;
            //
            // Begin loop service for the handle.
            //
            if((r = uv_poll_start(poll_handle, UV_READABLE, poll_callback)) < 0)
            {
                fprintf(stderr, "POLL_PROXY: Polling Initiation Error %d: %s\n", r, uv_err_name(r));
                abort();
            }
            //
            // Notify the Protocol_Task() that the handle has been added.
            // The response's conn_desc field carries the new poll handle.
            //
            rsp = MSG_ALLOC(0, FALSE);
            rsp->class = C_NOTIFY;
            rsp->type = T_POLL_CONFIG_RSP;
            rsp->info = 0;
            rsp->conn_desc = (void *) poll_handle;
            SEND_SDU(cdesc, rsp);
            break;
        case T_POLL_DISMANTLE_REQ:
            //
            // Remove an existing handle from loop service.
            // The handle pointer was stored at the start of the message buffer.
            //
            ppdata = (uv_handle_t **) &msg->buffer[0];
            poll_handle = (uv_poll_t *) *ppdata;
            if( (r = uv_poll_stop(poll_handle)) )
            {
                // Report on stderr: abort() does not flush stdout, so a
                // buffered message written there could be lost.
                fprintf(stderr, "POLL_PROXY: Poll Stop Error %d: %s\n", r, uv_err_name(r) );
                abort();
            }
            //
            // async_close_callback() will notify the Protocol_Task() when
            // the operation is complete.
            //
            poll_handle->data = (void *) cdesc;
            uv_close((uv_handle_t *) poll_handle, async_close_callback);
            break;
        default:
            fprintf(stderr, "POLL_PROXY: Invalid Message = %d\n", msg->type);
            abort();
        }
        MSG_FREE(msg);
    }
    Poll_Proxy_Busy = FALSE;
    return;
}
//
// This routine is invoked by the poll handle when an I/O read event is detected.
//
//   poll_handle - the uv_poll_t for the connection; its data field holds the
//                 CONN_DESC of the connection.
//   status      - 0 on success; any other value is logged and the event ignored.
//   events      - event mask; only UV_READABLE is acted upon.
//
// Reads from the connection's socket without blocking and passes the data to
// rx_sdu(). The Protocol_Task() is notified with:
//
//   T_FAULT      - rx_sdu() reported RX_OVERFLOW or RX_NOMEMORY.
//   T_DISCONNECT - recv() returned 0 (the peer closed the connection) or failed
//                  with an error other than EAGAIN/EWOULDBLOCK/EINTR.
//
// Once a disconnect is reported, polling on the handle is stopped. A closed or
// failed socket stays readable, so the callback would otherwise be invoked
// again and again. The handle itself is left open for the dismantle request.
//
ROUTINE PRIVATE void poll_callback(uv_poll_t *poll_handle, int status, int events)
{
    CONN_DESC *cdesc;
    SDU *msg;
    BOOL disconnected = FALSE;
    int n;
    int rc;

    if(status != 0)
    {
        fprintf(stderr, "poll_callback: Bad Status %d\n", status);
        return;
    }
    if( !(events & UV_READABLE) )
        return;

    cdesc = (CONN_DESC *) poll_handle->data;
    //
    // Read data from the TCP socket.
    //
    n = recv(cdesc->fd, cdesc->rx_buf, RX_BUF_SIZE, MSG_DONTWAIT);
    if(n > 0)
    {
        //
        // Try to recognize an SDU.
        //
        DECODE_RESULT r = rx_sdu(cdesc, n);

        switch(r)
        {
        case RX_PARTIAL:
        case RX_COMPLETE:
        case RX_EMPTY:
            break;
        case RX_OVERFLOW:
        case RX_NOMEMORY:
            //
            // Notify the Protocol_Task() of the error. There is no choice but
            // to send an abort, reinitialize everything, and release the connection.
            //
            msg = MSG_ALLOC(0, FALSE);
            msg->class = C_NOTIFY;
            msg->type = T_FAULT;
            msg->info = (int) r;
            SEND_SDU(cdesc, msg);
            break;
        default:
            fprintf(stderr, "IO_TASK: BUG - Invalid SDU Recognition Code: %d\n", r);
            abort();
        }
    }
    else if(n == 0)
    {
        //
        // End of stream: the peer performed an orderly shutdown.
        //
        printf("RX DISCONNECT: connection closed by peer !\n");
        fflush(stdout);
        disconnected = TRUE;
    }
    else
    {
        // Save errno before any further library call can overwrite it.
        int err = errno;

        if( !(err == EAGAIN || err == EWOULDBLOCK || err == EINTR) )
        {
            printf("RX DISCONNECT %d: %s !\n", err, strerror(err) );
            fflush(stdout);
            disconnected = TRUE;
        }
    }

    if(disconnected)
    {
        //
        // Stop polling so the dead socket does not keep firing this callback.
        //
        if((rc = uv_poll_stop(poll_handle)) < 0)
            fprintf(stderr, "poll_callback: Poll Stop Error %d: %s\n", rc, uv_err_name(rc));
        //
        // Notify the Protocol_Task() that the socket is disconnected.
        //
        msg = MSG_ALLOC(0, FALSE);
        msg->class = C_NOTIFY;
        msg->type = T_DISCONNECT;
        msg->info = 0;
        SEND_SDU(cdesc, msg);
    }
    return;
}
#endif // LIBUV_PLUMBING
C) Code from main.c: connect_proxy()
//
// Executes in the main() Connect_Loop in response to a uv_async_send() issued on
// behalf of the Protocol_Task().
//
// Drains Connect_Task_Input_Q, removing each message under Async_Mutex.
// The only supported request is T_CONN_DISMANTLE_REQ, which closes the
// uv_tcp_t handle carried at the start of the message buffer; any other
// message type is reported on stderr and the process aborts.
//
//   handle - the uv_async_t that triggered the callback (unused).
//
ROUTINE PRIVATE void connect_proxy(uv_async_t *handle)
{
    SDU *request;

    if(Connect_Proxy_Busy)
        return;
    Connect_Proxy_Busy = TRUE;
    //
    // Handle all messages from the Protocol_Task()
    //
    for(;;)
    {
        ENTER_MUTEX(&Async_Mutex);
        request = remove_msg(&Connect_Task_Input_Q);
        EXIT_MUTEX(&Async_Mutex);
        if(!request)
            break;

        if(request->type == T_CONN_DISMANTLE_REQ)
        {
            //
            // Release a uv_tcp_t connection handle.
            // The protocol task is notified by async_close_callback()
            // when the operation is complete.
            //
            uv_handle_t **slot = (uv_handle_t **) &request->buffer[0];
            uv_tcp_t *tcp = (uv_tcp_t *) *slot;

            // async_close_callback() reads the descriptor from handle->data.
            tcp->data = (void *) (CONN_DESC *) request->conn_desc;
            uv_close((uv_handle_t *) tcp, async_close_callback);
        }
        else
        {
            fprintf(stderr, "CONNECT_PROXY: Invalid Message = %d\n", request->type);
            abort();
        }
        MSG_FREE(request);
    }
    Connect_Proxy_Busy = FALSE;
}
#endif // LIBUV_PLUMBING
D) Code from main.c: Relevant initialization
//
// Initialize the libuv async channel mutex.
// (Async_Mutex guards the Connect_Task_Input_Q and IO_Task_Input_Q queues.)
//
// NOTE(review): the return values of the uv_*_init() and uv_thread_create()
// calls in this section are not checked.
//
uv_mutex_init(&Async_Mutex);
//
// Initialize the Service_Q, used in the Protocol_Task(), and
// its mutex and condition variable.
//
Service_Q = NULL;
uv_mutex_init(&Service_Q_Mutex);
uv_cond_init(&Service_Q_Cond);
//
// Initialize the protocol data mutex.
//
uv_mutex_init(&Protocol_D_Mutex);
//
// Launch the Signal_Task(), DB_Task(), Protocol_Task(), Transmit_Task(), Timer_Task(), and IO_Task().
//
// NOTE(review): these threads are started before Connect_Task_Input_Q,
// Connect_Loop and Connect_Task_Async_Handle are initialized below. If any
// of them can post a request to the connect loop immediately, it would use
// an uninitialized queue/handle - confirm, or move that setup above here.
//
uv_thread_create(&Signal_Task_Handle, Signal_Task, NULL);
uv_thread_create(&DB_Task_Handle, DB_Task, NULL);
uv_thread_create(&Protocol_Task_Handle, Protocol_Task, NULL);
uv_thread_create(&Send_Task_Handle, Transmit_Task, NULL);
uv_thread_create(&Timer_Task_Handle, Timer_Task, NULL);
uv_thread_create(&IO_Task_Handle, IO_Task, NULL);
//
// Initialize the Connect_Loop and its data.
// connect_proxy() is registered as the async callback for the loop.
//
Connect_Accept_Busy = FALSE;
Connect_Proxy_Busy = FALSE;
Connect_Task_Input_Q.head = Connect_Task_Input_Q.tail = NULL;
uv_loop_init(&Connect_Loop);
uv_async_init(&Connect_Loop, &Connect_Task_Async_Handle, connect_proxy);
uv_tcp_init(&Connect_Loop, &listen_handle);