thread safety of C client

106 views
Skip to first unread message

Philip Howard

unread,
May 9, 2012, 2:14:43 PM5/9/12
to mongodb-user
I have a C client that does the following in a loop:


// Build one document: a generated _id, the text payload, the lookup key
// and a visit counter starting at 0.
bson_init(&doc);
bson_append_new_oid(&doc, "_id");
bson_append_string(&doc, "text", text);
bson_append_int(&doc, "key", key);
bson_append_int(&doc, "visits", 0);
bson_finish(&doc);

// Insert the finished document into NAMESPACE over this thread's connection.
status = mongo_insert(conn, NAMESPACE, &doc);
// NOTE(review): this early return skips the bson_destroy() call below.
if (status != MONGO_OK) return status;

bson_destroy(&doc);

the loop creates N documents, each with a different key. If I have a
single thread create all the docs, it works fine. If I have two
threads, one that creates the first N/2 docs and the other that
creates the second N/2 docs, a small percentage of the docs come up
missing when I try to query them. The status to mongo_insert always
comes up MONGO_OK, so the errors aren't reported while inserting.

Each thread has its own connection to the DB, and the threads are not
sharing any data in my client code.

Any ideas on why multiple threads seem to be breaking this?

Kyle Banker

unread,
May 9, 2012, 2:34:20 PM5/9/12
to mongod...@googlegroups.com
Hi Philip,

Can you please post a complete code sample? The reason for the behavior you're seeing isn't obvious yet.

Kyle

Waitman Gobble

unread,
May 9, 2012, 3:41:17 PM5/9/12
to mongod...@googlegroups.com

> --
> You received this message because you are subscribed to the Google Groups "mongodb-user" group.
> To view this discussion on the web visit https://groups.google.com/d/msg/mongodb-user/-/1XXAol7NJAAJ.
>
> To post to this group, send email to mongod...@googlegroups.com.
> To unsubscribe from this group, send email to mongodb-user...@googlegroups.com.
> For more options, visit this group at http://groups.google.com/group/mongodb-user?hl=en.

i haven't noticed stuff missing, i will run more tests to confirm

... but the order of inserts may vary.

Waitman Gobble
San Jose California USA

Philip Howard

unread,
May 10, 2012, 12:18:52 PM5/10/12
to mongodb-user
Here is the complete source for our test program. When run with
multiple threads (e.g. -t:2), some of the inserts do not seem to make
it to the DB.
-phil

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <pthread.h>
#include <sys/time.h>
#include <sys/timeb.h>

#include <mongo.h>

// Database and collection used by the benchmark.
#define DB "test"
#define COLLECTION "benchmark1"
// Adjacent string literals concatenate, so this expands to "test.benchmark1".
#define NAMESPACE DB "." COLLECTION
// Expands to "test.benchmark1Setup"; not referenced by the code shown here.
#define SETUP_NAMESPACE DB "." COLLECTION "Setup"

// Server host name; defaults to localhost and is overwritten by the -h option.
static char HOST[256] = "127.0.0.1";
#define PORT 27017

// Operation selector stored in args_t.op; the values also index op_name[].
enum {OP_INSERT, OP_READ, OP_UPDATE};

//******************************************************
// Returns the current wall-clock time in microseconds since the Unix epoch,
// or 0 if the clock cannot be read.
//
// Uses gettimeofday() rather than the obsolete ftime(): ftime() only has
// millisecond resolution, so the value it produced was always a multiple
// of 1000 despite the function's name.
unsigned long long usec()
{
    struct timeval now;

    if (gettimeofday(&now, NULL) != 0)
    {
        return 0;
    }

    return ((unsigned long long)now.tv_sec) * 1000000ULL +
           (unsigned long long)now.tv_usec;
}
//******************************************************
// Inserts n_docs documents into NAMESPACE with keys start_key,
// start_key+1, ..., start_key+n_docs-1 and reports the elapsed time.
//
// Each document holds a generated "_id", a random upper-case "text" string,
// the integer "key" and a "visits" counter set to 0.
//
// conn      - connection used for the inserts
// n_docs    - number of documents to insert
// start_key - key of the first document
// size      - payload size; the text holds size-1 random characters plus
//             the terminating NUL (empty string when size is 0)
//
// Returns MONGO_OK on success, the status of the first failing
// mongo_insert(), or MONGO_ERROR when size is negative or the text buffer
// cannot be allocated.
//
// NOTE(review): bson_append_new_oid() is reported (CDRIVER-145) to rely on
// static state that is not thread-safe, so concurrent callers may generate
// colliding _id values; confirm and install thread-safe generators with
// bson_set_oid_fuzz()/bson_set_oid_inc() before running multi-threaded.
int create_data(mongo *conn, int n_docs, int start_key, int size)
{
    bson doc;
    char *text;
    int key;
    int ii;
    int status = MONGO_OK;
    unsigned long long start, end;

    if (size < 0)
    {
        return MONGO_ERROR;
    }

    text = malloc((size_t)size + 1);
    if (text == NULL)
    {
        return MONGO_ERROR;
    }

    start = usec();

    for (key = start_key; key < start_key + n_docs; key++)
    {
        for (ii = 0; ii < size; ii++)
        {
            text[ii] = 'A' + rand() % 26;
        }
        // Terminate on the last generated character, as before; for
        // size == 0 terminate at index 0 instead of writing to text[-1].
        text[size > 0 ? size - 1 : 0] = 0;

        bson_init(&doc);
        bson_append_new_oid(&doc, "_id");
        bson_append_string(&doc, "text", text);
        bson_append_int(&doc, "key", key);
        bson_append_int(&doc, "visits", 0);
        bson_finish(&doc);

        status = mongo_insert(conn, NAMESPACE, &doc);

        // Release the document before checking the status so that a failed
        // insert does not leak it.
        bson_destroy(&doc);

        if (status != MONGO_OK)
        {
            break;
        }
    }

    end = usec();

    free(text);

    if (status != MONGO_OK)
    {
        return status;
    }

    printf("Created %d docs in %8.2f msecs\n", n_docs, (end - start)/
           1000.0);

    return MONGO_OK;
}

//******************************************************
// Looks up every key in [start_key, start_key + n_queries) in NAMESPACE
// and reports how many documents were found and how long it took.
//
// A key with no document prints "Not Found"; every document beyond the
// first for the same key prints "Duplicate Found".
//
// Returns MONGO_OK when the number of documents found equals n_queries,
// MONGO_ERROR otherwise.
int find_data(mongo *conn, int n_queries, int start_key)
{
    const int end_key = start_key + n_queries;
    unsigned long long t_begin, t_end;
    int total_found = 0;
    int key;

    t_begin = usec();

    for (key = start_key; key < end_key; key++)
    {
        bson query;
        mongo_cursor cursor;
        int matches = 0;

        bson_init(&query);
        bson_append_int(&query, "key", key);
        bson_finish(&query);

        mongo_cursor_init(&cursor, conn, NAMESPACE);
        mongo_cursor_set_query(&cursor, &query);

        // Drain the cursor, counting matches; anything past the first
        // match is a duplicate of this key.
        while (mongo_cursor_next(&cursor) == MONGO_OK)
        {
            matches++;
            if (matches > 1)
            {
                printf("Duplicate Found %d\n", key);
            }
        }

        if (matches == 0)
        {
            printf("Not Found %d\n", key);
        }
        total_found += matches;

        bson_destroy(&query);
        mongo_cursor_destroy(&cursor);
    }

    t_end = usec();

    printf("Found %d documents in %d tries in %8.2f msecs\n",
           total_found, n_queries, (t_end - t_begin)/1000.0);

    return (total_found == n_queries) ? MONGO_OK : MONGO_ERROR;
}
//******************************************************
// Placeholder for the update benchmark: always returns MONGO_ERROR and
// ignores all of its parameters.
//
// The commented-out body is an unfinished draft that would increment the
// "visits" field of randomly chosen keys. It refers to local_percent,
// key_max and local_ext, none of which are declared in the code shown here.
int update_data(mongo *conn, int n_queries, int start_key)
{
/*
bson query, op;
int ii;
int key;
int found = 0;
int status;
unsigned long long start, end;

start = usec();

for (ii=0; ii<n_queries; ii++)
{
if (rand()%100 > local_percent)
key = rand()%(key_max * local_ext / 100);
else
key = rand()%key_max;

bson_init(&query);
bson_append_int(&query, "key", key);
bson_finish(&query);

bson_init(&op);
bson_append_start_object(&op, "$inc");
bson_append_int(&op, "visits", 1);
bson_append_finish_object(&op);
bson_finish(&op);

status = mongo_update(conn, NAMESPACE,
&query, &op, MONGO_UPDATE_BASIC);
if (status == MONGO_OK)
{
found++;
} else {
printf("Update failed: %d\n", conn->err);
}

bson_destroy(&query);
bson_destroy(&op);
}

end = usec();

printf("Updated %d documents in %8.2f msecs\n", found, (end -
start)/1000.0);
*/
return MONGO_ERROR; // not implemented
}
//******************************************************
// Command-line settings plus per-thread state. create_threads() copies
// one of these for every worker thread and hands it to thread_func().
// Field order matters: main() initializes this struct positionally.
typedef struct
{
int size;          // -s: payload size passed to create_data()
int n_records;     // -n: total record count, split evenly across threads
int n_threads;     // -t: number of worker threads
int op;            // OP_INSERT / OP_READ / OP_UPDATE, set by create_threads()
int no_create;     // -c: when non-zero, main() skips the drop/insert/index phase
int thread_index;  // 0-based worker index; selects the worker's key range
mongo conn;        // connection opened by create_threads() for this worker
pthread_t thread;  // handle filled in by pthread_create()
} args_t;
//******************************************************
// Prints the command-line synopsis to stdout and terminates the process
// with exit status 1; never returns.
//
// The -c option is stored in args_t.no_create and a non-zero value skips
// the create phase, so it is described as "skip create" (the text used to
// say "do create", the opposite of what the flag does).
void usage(void)
{
    printf("test1\n"
           " [-c:<skip create>]\n"
           " [-n:<number of records>]\n"
           " [-h:<host>]\n"
           " [-s:<data size>]\n"
           " [-t:<n threads>]\n"
           );
    exit(1);
}
//******************************************************
// Parses the command line into *args and the global HOST buffer.
//
// Each option is a single letter with an optional leading '-'. Its value
// either follows a ':' in the same argument ("-n:100") or is the next
// argument ("-n 100"). Recognized letters:
//   c -> args->no_create    n -> args->n_records    s -> args->size
//   t -> args->n_threads    h -> HOST
//
// Calls usage(), which exits, for an unknown letter, an argument with no
// option letter, a missing value, or a host name that does not fit in HOST.
void parse_args(int argc, char **argv, args_t *args)
{
    int ii;

    for (ii = 1; ii < argc; ii++)
    {
        const char *opt = argv[ii];
        const char *cvalue;
        char arg;
        int index;
        int value;
        int written;

        if (opt[0] == '-') {
            arg = opt[1];
            index = 2;
        } else {
            arg = opt[0];
            index = 1;
        }

        // "" and "-" carry no option letter; stop here so opt[index] is
        // never read past the string terminator.
        if (arg == '\0') usage();

        if (opt[index] == ':') {
            cvalue = &opt[index + 1];
        } else {
            // The value must be the next argument. The previous check
            // (argc < ii+1) could never be true, so a trailing option with
            // no value ended up calling atoi(NULL).
            if (ii + 1 >= argc) usage();
            ii++;
            cvalue = argv[ii];
        }
        value = atoi(cvalue);

        switch (arg)
        {
        case 'c':
            args->no_create = value;
            break;
        case 'h':
            // Bounded copy: reject host names that would overflow HOST.
            written = snprintf(HOST, sizeof(HOST), "%s", cvalue);
            if (written < 0 || (size_t)written >= sizeof(HOST)) usage();
            break;
        case 'n':
            args->n_records = value;
            break;
        case 's':
            args->size = value;
            break;
        case 't':
            args->n_threads = value;
            break;
        default:
            usage();
            break;
        }
    }
}
//******************************************************
static char *op_name[] = {"insert", "read", "update"};

void *thread_func(void *argp)
{
args_t *args = (args_t *)argp;
static volatile int n_threads = 0;
static long long start, stop;
int n_running;
unsigned long result;
int start_rec;
int n_records;

// Pause until all threads are started.
// This will generate maximum contention once we start hitting
// the DB.
n_running = __sync_add_and_fetch(&n_threads, 1);

if (n_running == args->n_threads)
{
start = usec();
} else {
while (n_threads != args->n_threads)
{
__asm__ volatile ("pause" :::);
}
}

n_records = args->n_records/args->n_threads;
start_rec = n_records * args->thread_index;
switch (args->op)
{
case OP_INSERT:
result = create_data(&(args->conn), n_records, start_rec,
args->size);
break;
case OP_READ:
result = find_data(&(args->conn), n_records, start_rec);
break;
case OP_UPDATE:
result = update_data(&(args->conn), n_records, start_rec);
break;
}

n_running = __sync_add_and_fetch(&n_threads, -1);

if (n_running == 0)
{
stop = usec();
printf("total time for %s %d %d is %f msecs\n",
op_name[args->op], args->n_threads, args->n_records,
(stop - start)/1000.0);
}
return (void *)result;
}
//******************************************************
// Runs operation op (OP_INSERT / OP_READ / OP_UPDATE) on args->n_threads
// worker threads and waits for all of them to finish.
//
// Every worker gets a private copy of *args with its own thread_index and
// its own server connection, which is closed again once the worker has
// been joined. args->op is set to op as a side effect.
//
// Returns MONGO_OK when every worker returned NULL, MONGO_ERROR when any
// worker failed, could not be joined, or the argument list could not be
// allocated. Exits the process if a connection or thread cannot be
// created: the workers' start barrier waits for all n_threads workers, so
// a partial set could never make progress.
static int create_threads(args_t *args, int op)
{
    int ii;
    int status;
    args_t *arg_list;
    void *thread_result;
    int result = MONGO_OK;

    args->op = op;

    // Nothing to run; same result as before for a non-positive count.
    if (args->n_threads <= 0)
    {
        return MONGO_OK;
    }

    arg_list = calloc((size_t)args->n_threads, sizeof *arg_list);
    if (arg_list == NULL)
    {
        printf("Out of memory for %d threads\n", args->n_threads);
        return MONGO_ERROR;
    }

    for (ii = 0; ii < args->n_threads; ii++)
    {
        arg_list[ii] = *args;
        arg_list[ii].thread_index = ii;

        status = mongo_connect( &(arg_list[ii].conn), HOST, PORT);

        if( status != MONGO_OK ) {
            printf("Connection %d failed\n", ii);
            exit(1);
        }
    }

    for (ii = 0; ii < args->n_threads; ii++)
    {
        status = pthread_create(&(arg_list[ii].thread), NULL, thread_func,
                                &arg_list[ii]);
        if (status != 0)
        {
            printf("Thread %d failed to start\n", ii);
            exit(1);
        }
    }

    for (ii = 0; ii < args->n_threads; ii++)
    {
        thread_result = NULL;
        if (pthread_join(arg_list[ii].thread, &thread_result) != 0 ||
            thread_result != NULL)
        {
            result = MONGO_ERROR;
        }
    }

    // All workers are done; close the per-thread connections.
    for (ii = 0; ii < args->n_threads; ii++)
    {
        mongo_destroy( &(arg_list[ii].conn) );
    }

    free(arg_list);

    return result;
}
//******************************************************
int main(int argc, char **argv)
{
bson doc;
mongo conn[1];
bson index;
int status;
args_t args = {64,0,0,0,0,0};

parse_args(argc, argv, &args);

status = mongo_connect( conn, HOST, PORT);

if( status != MONGO_OK ) {
printf("Connection failed\n");
return 1;
}

if (!args.no_create)
{
mongo_cmd_drop_collection( conn, DB, COLLECTION, NULL);

status = create_threads(&args, OP_INSERT);
if (status != MONGO_OK)
{
printf("Insert thread failed\n");
exit(2);
}

bson_init(&index);
bson_append_int(&index, "key", 1);
bson_finish(&index);
status = mongo_create_index(conn, NAMESPACE, &index, 0, NULL);
if (status != MONGO_OK)
{
printf("create index failed %d\n", conn->err);
return 4;
}
bson_destroy(&index);
}

status = create_threads(&args, OP_READ);
if (status != MONGO_OK)
{
printf("Read thread failed\n");
exit(3);
}

//create_threads(&args, OP_UPDATE);

mongo_destroy( conn );

printf("Test PASSED\n");

return 0;
}

On May 9, 12:41 pm, Waitman Gobble <gobble...@gmail.com> wrote:

Gary Murakami

unread,
Aug 8, 2012, 10:06:30 AM8/8/12
to mongod...@googlegroups.com
bson_append_new_oid calls bson_oid_gen, which uses static variables and is not thread-safe, so the OIDs generated by multiple threads can collide. This is a known issue; please see https://jira.mongodb.org/browse/CDRIVER-145. To generate OIDs without collisions, you should supply thread-safe functions via bson_set_oid_fuzz and/or bson_set_oid_inc; please see:


http://api.mongodb.org/c/current/api/bson_8h.html#a1a0cfc578a3c067a4ffbe5d3f12386b9
http://api.mongodb.org/c/current/api/bson_8h.html#aeb6eee3f9ee6a8f3df0b11198a451455





Hope that this helps.  Please let us know if this addresses your issue.


On Wednesday, May 9, 2012 2:14:43 PM UTC-4, Philip Howard wrote:
Reply all
Reply to author
Forward
0 new messages