uv_write callback not called in user thread

546 views
Skip to first unread message

Sarav Sandhu

unread,
Nov 26, 2013, 2:11:00 AM11/26/13
to li...@googlegroups.com
Hi,

I'm trying to make a HTTP server and I'm seeing some strange behaviour while running binary :-

Case 1 :
Code :

#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
#include <errno.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

typedef struct
{
int mtype;
void *client;
void *buffer;
}stIPC;
int qid;

uv_stream_t *temp_client;

void on_new_connection(uv_stream_t *server, int status);
uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size);
void on_client_read(uv_stream_t *client, ssize_t nread, uv_buf_t buf);
void on_client_write(uv_write_t *req, int status);

void on_new_connection(uv_stream_t *server, int status) {
if (status == -1) {
fprintf(stderr, "error on_new_connection");
return;
}

uv_tcp_t *client;
client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(uv_default_loop(), client);

int result = uv_accept(server, (uv_stream_t*) client);

if (result == 0) { // success
uv_read_start((uv_stream_t*) client, alloc_buffer, on_client_read);
} else {
printf( "Issue in accept, closing client\n"); 
uv_close((uv_handle_t*) client, NULL);
}
}

uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size) {
return uv_buf_init((char*) malloc( 200 ), 200 );
}

void on_client_read(uv_stream_t *_client, ssize_t nread, uv_buf_t buf) {
if (nread == -1) {
fprintf(stderr, "error on_client_read");
uv_close((uv_handle_t*) _client, NULL);
return;
}
else
{
//printf("3rd party sent data : [%s]\n", (char*) buf.base);

uv_read_stop( _client );

///This is test code

char *msg;
msg = (char*) malloc( 200 );
bzero( msg, sizeof(msg) );
strcpy( msg, "This is alien world\n" );

uv_buf_t buf_1 = uv_buf_init( msg, sizeof(msg));
buf_1.len = strlen(msg);
buf_1.base = msg;

temp_client = _client;
uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
req->data = (void*) buf_1.base;
//sleep(1);
//printf("Gonna write data \n");
uv_write(req, (uv_stream_t*) temp_client, &buf_1, 1, on_client_write);
}
}

void recv_from_gw()
{

stIPC recvObj;
bzero( &recvObj, sizeof( stIPC ) );

/*uv_work_t reciever_work;
 reciever_work.data = (void*) &recvObj;
 uv_queue_work( uv_default_loop(), &reciever_work, first, second );*/
while(0) // Do not run this part - while 0
{
                          //this section is disabled
}
}

}

void on_client_write(uv_write_t *req, int status) {
if (status == -1) {
fprintf(stderr, "error on_client_write\n");
uv_close((uv_handle_t*) req->handle, NULL);
printf( "closing.written status = %d: \n", status);
return;
}

uv_close((uv_handle_t*) req->handle, NULL);
//printf("Data sent to 3rd party [%s]\n", (char*) req->data);
char *buffer = (char*) req->data;
//free(req);
//free(buffer);
}

void recv_from_3rd_party()
{
uv_tcp_t server;
uv_tcp_init(uv_default_loop(), &server);
struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000);
uv_tcp_bind(&server, bind_addr);
printf("In listen Thread\n");
int r = uv_listen((uv_stream_t*) &server, 111, on_new_connection);
if (r) {
fprintf(stderr, "error uv_listen");
}

}


void main(void) {
int key = 7876;
qid = msgget( key, IPC_CREAT | 0666 );
if ( qid == -1 )
{
printf ( "msgget err, %d\n", errno);
exit(0);
}
uv_tcp_t server;
uv_tcp_init(uv_default_loop(), &server);
struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000);
uv_tcp_bind(&server, bind_addr);

int r = uv_listen((uv_stream_t*) &server, 200, on_new_connection);
if (r) {
fprintf(stderr, "error uv_listen");
}

pthread_t gwthrd, rdthrd;
pthread_create( &gwthrd, NULL,(void*) &recv_from_gw, NULL );
uv_loop_t* loop = uv_default_loop();
uv_run(loop, UV_RUN_DEFAULT);
}



The above code runs flawlessly and without any errors(though memory free() call is pending).

Case 2:
Code :
#include <stdio.h>
#include <stdlib.h>
#include <uv.h>
#include <errno.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

typedef struct
{
int mtype;
void *client;
void *buffer;
}stIPC;
int qid;

uv_stream_t *temp_client;

void on_new_connection(uv_stream_t *server, int status);
uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size);
void on_client_read(uv_stream_t *client, ssize_t nread, uv_buf_t buf);
void on_client_write(uv_write_t *req, int status);

void on_new_connection(uv_stream_t *server, int status) {
if (status == -1) {
fprintf(stderr, "error on_new_connection");
return;
}

//printf( " gotta new client\n");
uv_tcp_t *client;
client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
uv_tcp_init(uv_default_loop(), client);

int result = uv_accept(server, (uv_stream_t*) client);

if (result == 0) { // success
uv_read_start((uv_stream_t*) client, alloc_buffer, on_client_read);
} else {
printf( "Issue in accept, closing client\n"); 
uv_close((uv_handle_t*) client, NULL);
}
}

uv_buf_t alloc_buffer(uv_handle_t *handle, size_t suggested_size) {
return uv_buf_init((char*) malloc( 200 ), 200 );
}

void on_client_read(uv_stream_t *_client, ssize_t nread, uv_buf_t buf) {
if (nread == -1) {
fprintf(stderr, "error on_client_read");
uv_close((uv_handle_t*) _client, NULL);
return;
}
else
{
//printf("3rd party sent data : [%s]\n", (char*) buf.base);

uv_read_stop( _client );

stIPC sendobj;
sendobj.mtype=111;
sendobj.client =  _client;
sendobj.buffer =  &buf;
int ret = msgsnd( qid, (void*) &sendobj, sizeof(sendobj), IPC_NOWAIT);
if ( -1 == ret )
{
printf ("Msgsnd err : %d\n ", errno);
exit(0);
}
}
}

void recv_from_gw()
{

stIPC recvObj;
bzero( &recvObj, sizeof( stIPC ) );

/*uv_work_t recie=ver_work;
 reciever_work.data = (void*) &recvObj;
 uv_queue_work( uv_default_loop(), &reciever_work, first, second );*/
while(1)
{
int ret=0;
//printf("in msgrcv\n");
//sleep(1);
//usleep(11);
ret = msgrcv( qid, &recvObj, sizeof( recvObj ), 111, 0 );
if( -1 == ret )
{
//usleep(500);
if( ENOMSG != errno )
exit(0);
else
{
printf( "no data from q yet \n");
usleep(1);
}
}
else
{
usleep(111);
printf( "got data from q \n");
uv_stream_t* client = recvObj.client;
char *msg;
msg = (char*) malloc( 200 );
bzero( msg, sizeof(msg) );
strcpy( msg, "This is alien world\n" );

uv_buf_t buf = uv_buf_init( msg, sizeof(msg));
buf.len = strlen(msg);
buf.base = msg;

uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
req->data = (void*) buf.base;
printf("Gonna write data \n");
uv_write(req, (uv_stream_t*) client, &buf, 1, on_client_write);
}
}

}

void on_client_write(uv_write_t *req, int status) {
if (status == -1) {
fprintf(stderr, "error on_client_write\n");
uv_close((uv_handle_t*) req->handle, NULL);
printf( "closing.written status = %d: \n", status);
return;
}

//printf( "closing.written status = %d: \n", status);
uv_close((uv_handle_t*) req->handle, NULL);
//printf("Data sent to 3rd party [%s]\n", (char*) req->data);
char *buffer = (char*) req->data;
//free(req);
//free(buffer);
}

void recv_from_3rd_party()
{
uv_tcp_t server;
uv_tcp_init(uv_default_loop(), &server);
struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000);
uv_tcp_bind(&server, bind_addr);
printf("In listen Thread\n");
int r = uv_listen((uv_stream_t*) &server, 111, on_new_connection);
if (r) {
fprintf(stderr, "error uv_listen");
//return 1;
}

}


void main(void) {
printf( "in sarav \n");
//buffer_size = 200;
int key = 7876;
qid = msgget( key, IPC_CREAT | 0666 );
if ( qid == -1 )
{
printf ( "msgget err, %d\n", errno);
exit(0);
}
uv_tcp_t server;
uv_tcp_init(uv_default_loop(), &server);
struct sockaddr_in bind_addr = uv_ip4_addr("0.0.0.0", 7000);
uv_tcp_bind(&server, bind_addr);

int r = uv_listen((uv_stream_t*) &server, 200, on_new_connection);
if (r) {
fprintf(stderr, "error uv_listen");
//return 1;
}

pthread_t gwthrd, rdthrd;
pthread_create( &gwthrd, NULL,(void*) &recv_from_gw, NULL );
uv_loop_t* loop = uv_default_loop();
//uv_run(uv_default_loop(), UV_RUN_DEFAULT);
uv_run(loop, UV_RUN_DEFAULT);
//return 0;
}


Now this is the code that is causing weird behavior.  When we run serial connection(one connection at a time), the previous uv_write's callback is not called until new connection is not initiated. The same, when happening in parallel connections, throws assert at line "    uv__req_unregister(stream->loop, req);" in file src/unix/stream.c, line number 877.

Please suggest the solution to this problem.

OS : Fedora Linux 17
gcc : gcc version 4.7.2 20120921 (Red Hat 4.7.2-2) (GCC) 
libuv version 0.11.1
code Compilation command : gcc -O0 -g server.c -o server -I../../libuv/include/ ../../libuv/libuv.a -lrt -ldl -lm -pthread

Saúl Ibarra Corretgé

unread,
Nov 26, 2013, 3:22:21 AM11/26/13
to li...@googlegroups.com
Hi,

In your case 2 examole you are calling uv_write in the recv_from_gw
function, which runs in a different thread. The only thread safe
function in libuv is uv_async_send, calling uv_write from a different
thread is a no go.


Cheers,

On 11/26/13 8:11 AM, Sarav Sandhu wrote:
> Hi,
>
> I'm trying to make a HTTP server and I'm seeing some strange behaviour
> while running binary :-
>
> *_Case 1 :_*
> _Code :
> _
> _*Case 2:*_
> _Code :_
> --
> You received this message because you are subscribed to the Google
> Groups "libuv" group.
> To unsubscribe from this group and stop receiving emails from it, send
> an email to libuv+un...@googlegroups.com.
> To post to this group, send email to li...@googlegroups.com.
> Visit this group at http://groups.google.com/group/libuv.
> For more options, visit https://groups.google.com/groups/opt_out.


--
Sa�l Ibarra Corretg�
http://bettercallsaghul.com

Sarav Sandhu

unread,
Nov 26, 2013, 5:26:50 AM11/26/13
to li...@googlegroups.com
Well I did know that function but I've heard that "libuv may also combine multiple calls to uv_async_send and invoke your callback only once. The only guarantee that libuv makes is – The callback function is called at least once after the call to uv_async_send." at this link : http://nikhilm.github.io/uvbook/threads.html#inter-thread-communication

Now the example supplied with the book mentioned on the link has a global uv_async_t  object and the uv_async_send calls are made repetitively using this global object. I want to know if the problem mentioned above will go away if the object is locally declared in user thread. Book Example Code: https://github.com/nikhilm/uvbook/blob/master/code/progress/main.c 

Regards,
Sarav
Sa�l Ibarra Corretg�
http://bettercallsaghul.com

Saúl Ibarra Corretgé

unread,
Nov 26, 2013, 5:42:51 AM11/26/13
to li...@googlegroups.com
On 11/26/13 11:26 AM, Sarav Sandhu wrote:
> Well I did know that function but I've heard that "libuv may also
> combine multiple calls to uv_async_send and invoke your callback only
> once. The only guarantee that libuv makes is � The callback function is
> called /at least once/ after the call to uv_async_send." at this link :
> http://nikhilm.github.io/uvbook/threads.html#inter-thread-communication
>
> Now the example supplied with the book mentioned on the link has a
> global uv_async_t object and the uv_async_send calls are
> made repetitively using this global object. I want to know if the
> problem mentioned above will go away if the object is locally declared
> in user thread. Book Example Code:
> https://github.com/nikhilm/uvbook/blob/master/code/progress/main.c
>

It's not really safe to attach something to async->data and then use it
in the callback, libuv could coalesce the calls and then the pointer
would be overwritten before you get a chance to read it.

Sarav Sandhu

unread,
Nov 26, 2013, 6:29:17 AM11/26/13
to li...@googlegroups.com
Ok, then what exactly is the solution to this problem. What I need is that there must be 1 uv_write() call corresponding to the data received in msgrcv() call.

Regards,
Sarav

Saúl Ibarra Corretgé

unread,
Nov 26, 2013, 12:29:54 PM11/26/13
to li...@googlegroups.com
On 11/26/13 12:29 PM, Sarav Sandhu wrote:
> Ok, then what exactly is the solution to this problem. What I need is
> that there must be 1 uv_write() call corresponding to the data received
> in msgrcv() call.
>

You could put the data you want to write on a queue and use an async
handle which starts an idle handle in its callback, and in the idle
handle's callback you consume the queue and do the actual uv_write.

Sarav Sandhu

unread,
Dec 3, 2013, 12:56:24 AM12/3/13
to li...@googlegroups.com
Hi Saul,

I couldn't understand why you have mentioned to call the idle callback inside async callback. Why can't we just call the async callback and consume the POSIX Message queue in it directly and do uv_write ?

On Tuesday, November 26, 2013 10:59:54 PM UTC+5:30, Saúl Ibarra Corretgé wrote:
On 11/26/13 12:29 PM, Sarav Sandhu wrote:
> Ok, then what exactly is the solution to this problem. What I need is
> that there must be 1 uv_write() call corresponding to the data received
> in msgrcv() call.
>

You could put the data you want to write on a queue and use an async
handle which starts an idle handle in its callback, and in the idle
handle's callback you consume the queue and do the actual uv_write.

--

Saúl Ibarra Corretgé

unread,
Dec 3, 2013, 3:52:44 AM12/3/13
to li...@googlegroups.com
On 12/3/13 6:56 AM, Sarav Sandhu wrote:
> Hi Saul,
>
> I couldn't understand why you have mentioned to call the idle callback
> inside async callback. Why can't we just call the async callback and
> consume the POSIX Message queue in it directly and do uv_write ?
>

Hi Sarav,

There are probably many ways to do things :-) The one I described has
worked for me very well. I use it from Python, so things are kind of
easier there :-)

The idea is:

1. From whatever thread, put your buffer into a thread safe queue
2. Call uv_async_send
3. In the async callback, start an idle handle, whose only duty is to
process the queue
4. When the idle handle callback is executed, it will process the queue
and stop

The reason why I use the idle handle is because the async handle can
coalesce the calls to send, so new items could be added to the queue
while I'm processing some of them. In my particular implementation, I
calculate the number of items in the queue when the idle callback is
called and only process those. If at the end of the callback the queue
contains more items, the handle is not stopped, they will be processed
in the next tick.

YMMV, of course.
Reply all
Reply to author
Forward
0 new messages