Here is the complete source for our test program. When run with
multiple threads (e.g. -t:2), some of the inserts do not seem to make
it to the DB.
-phil
#include <stdio.h>
#include <stdlib.h>
#include <sys/timeb.h>
#include <pthread.h>
#include <mongo.h>
#define DB "test"
#define COLLECTION "benchmark1"
#define NAMESPACE DB "." COLLECTION
#define SETUP_NAMESPACE DB "." COLLECTION "Setup"
static char HOST[256] = "127.0.0.1";
#define PORT 27017
enum {OP_INSERT, OP_READ, OP_UPDATE};
//******************************************************
unsigned long long usec()
{
unsigned long long usec_value;
struct timeb t;
ftime(&t);
usec_value = ((unsigned long long)t.time)*1000000 +
((unsigned long long)t.millitm)*1000;
return usec_value;
}
//******************************************************
int create_data(mongo *conn, int n_docs, int start_key, int size)
{
bson doc;
char *text;
int key;
int ii,jj;
int status;
unsigned long long start, end;
text = malloc(size+1);
start = usec();
for (key=start_key; key<start_key+n_docs; key++)
{
for (ii=0; ii<size; ii++)
{
text[ii] = 'A' + rand()%26;
}
text[size-1] = 0;
bson_init(&doc);
bson_append_new_oid(&doc, "_id");
bson_append_string(&doc, "text", text);
bson_append_int(&doc, "key", key);
bson_append_int(&doc, "visits", 0);
bson_finish(&doc);
status = mongo_insert(conn, NAMESPACE, &doc);
if (status != MONGO_OK) return status;
//printf("Inserted %05d\n", key);
bson_destroy(&doc);
}
end = usec();
free(text);
printf("Created %d docs in %8.2f msecs\n", n_docs, (end - start)/
1000.0);
return MONGO_OK;
}
//******************************************************
int find_data(mongo *conn, int n_queries, int start_key)
{
bson query;
mongo_cursor cursor;
int ii;
int key;
int found = 0;
unsigned long long start, end;
start = usec();
for (key=start_key; key<start_key+n_queries; key++)
{
bson_init(&query);
bson_append_int(&query, "key", key);
bson_finish(&query);
mongo_cursor_init(&cursor, conn, NAMESPACE);
mongo_cursor_set_query(&cursor, &query);
if (mongo_cursor_next(&cursor) == MONGO_OK) {
found++;
while (mongo_cursor_next(&cursor) == MONGO_OK) {
//bson_print(&cursor.current);
//printf("Found %05d\n", key);
found++;
printf("Duplicate Found %d\n", key);
}
} else {
printf("Not Found %d\n", key);
}
bson_destroy(&query);
mongo_cursor_destroy(&cursor);
}
end = usec();
printf("Found %d documents in %d tries in %8.2f msecs\n",
found, n_queries, (end - start)/1000.0);
if (found == n_queries) return MONGO_OK;
return MONGO_ERROR;
}
//******************************************************
int update_data(mongo *conn, int n_queries, int start_key)
{
/*
bson query, op;
int ii;
int key;
int found = 0;
int status;
unsigned long long start, end;
start = usec();
for (ii=0; ii<n_queries; ii++)
{
if (rand()%100 > local_percent)
key = rand()%(key_max * local_ext / 100);
else
key = rand()%key_max;
bson_init(&query);
bson_append_int(&query, "key", key);
bson_finish(&query);
bson_init(&op);
bson_append_start_object(&op, "$inc");
bson_append_int(&op, "visits", 1);
bson_append_finish_object(&op);
bson_finish(&op);
status = mongo_update(conn, NAMESPACE,
&query, &op, MONGO_UPDATE_BASIC);
if (status == MONGO_OK)
{
found++;
} else {
printf("Update failed: %d\n", conn->err);
}
bson_destroy(&query);
bson_destroy(&op);
}
end = usec();
printf("Updated %d documents in %8.2f msecs\n", found, (end -
start)/1000.0);
*/
return MONGO_ERROR; // not implemented
}
//******************************************************
typedef struct
{
int size;
int n_records;
int n_threads;
int op;
int no_create;
int thread_index;
mongo conn;
pthread_t thread;
} args_t;
//******************************************************
void usage()
{
printf("test1\n"
" [-c:<do create>]\n"
" [-n:<number of records>]\n"
" [-h:<host>]\n"
" [-s:<data size>]\n"
" [-t:<n threads>]\n"
);
exit(1);
}
//******************************************************
void parse_args(int argc, char **argv, args_t *args)
{
int ii;
char arg;
int value;
int index;
char *cvalue;
for (ii=1; ii<argc; ii++)
{
if (argv[ii][0] == '-') {
arg = argv[ii][1];
index = 2;
} else {
arg = argv[ii][0];
index = 1;
}
if (argv[ii][index] == ':') {
cvalue = &argv[ii][index+1];
value = atoi(&argv[ii][index+1]);
} else {
if (argc < ii+1) usage();
ii++;
cvalue = argv[ii];
value = atoi(argv[ii]);
}
switch (arg)
{
case 'c':
args->no_create = value;
break;
case 'h':
strcpy(HOST, cvalue);
break;
case 'n':
args->n_records = value;
break;
case 's':
args->size = value;
break;
case 't':
args->n_threads = value;
break;
default:
usage();
break;
}
}
}
//******************************************************
static char *op_name[] = {"insert", "read", "update"};
void *thread_func(void *argp)
{
args_t *args = (args_t *)argp;
static volatile int n_threads = 0;
static long long start, stop;
int n_running;
unsigned long result;
int start_rec;
int n_records;
// Pause until all threads are started.
// This will generate maximum contention once we start hitting
// the DB.
n_running = __sync_add_and_fetch(&n_threads, 1);
if (n_running == args->n_threads)
{
start = usec();
} else {
while (n_threads != args->n_threads)
{
__asm__ volatile ("pause" :::);
}
}
n_records = args->n_records/args->n_threads;
start_rec = n_records * args->thread_index;
switch (args->op)
{
case OP_INSERT:
result = create_data(&(args->conn), n_records, start_rec,
args->size);
break;
case OP_READ:
result = find_data(&(args->conn), n_records, start_rec);
break;
case OP_UPDATE:
result = update_data(&(args->conn), n_records, start_rec);
break;
}
n_running = __sync_add_and_fetch(&n_threads, -1);
if (n_running == 0)
{
stop = usec();
printf("total time for %s %d %d is %f msecs\n",
op_name[args->op], args->n_threads, args->n_records,
(stop - start)/1000.0);
}
return (void *)result;
}
//******************************************************
static int create_threads(args_t *args, int op)
{
int ii;
int status;
args_t *arg_list;
void *thread_result;
int result = MONGO_OK;
args->op = op;
arg_list = (args_t *)malloc(sizeof(args_t) * args->n_threads);
for (ii=0; ii<args->n_threads; ii++)
{
memcpy(&arg_list[ii], args, sizeof(args_t));
arg_list[ii].thread_index = ii;
status = mongo_connect( &(arg_list[ii].conn), HOST, PORT);
if( status != MONGO_OK ) {
printf("Connection %d failed\n", ii);
exit(1);
}
}
for (ii=0; ii<args->n_threads; ii++)
{
pthread_create(&(arg_list[ii].thread), NULL, thread_func,
&arg_list[ii]);
}
for (ii=0; ii<args->n_threads; ii++)
{
pthread_join(arg_list[ii].thread, &thread_result);
if (thread_result != NULL) result = MONGO_ERROR;
}
free(arg_list);
return result;
}
//******************************************************
int main(int argc, char **argv)
{
bson doc;
mongo conn[1];
bson index;
int status;
args_t args = {64,0,0,0,0,0};
parse_args(argc, argv, &args);
status = mongo_connect( conn, HOST, PORT);
if( status != MONGO_OK ) {
printf("Connection failed\n");
return 1;
}
if (!args.no_create)
{
mongo_cmd_drop_collection( conn, DB, COLLECTION, NULL);
status = create_threads(&args, OP_INSERT);
if (status != MONGO_OK)
{
printf("Insert thread failed\n");
exit(2);
}
bson_init(&index);
bson_append_int(&index, "key", 1);
bson_finish(&index);
status = mongo_create_index(conn, NAMESPACE, &index, 0, NULL);
if (status != MONGO_OK)
{
printf("create index failed %d\n", conn->err);
return 4;
}
bson_destroy(&index);
}
status = create_threads(&args, OP_READ);
if (status != MONGO_OK)
{
printf("Read thread failed\n");
exit(3);
}
//create_threads(&args, OP_UPDATE);
mongo_destroy( conn );
printf("Test PASSED\n");
return 0;
}
On May 9, 12:41 pm, Waitman Gobble <
gobble...@gmail.com> wrote: