#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <pthread.h>
#include <assert.h>
#include <sys/time.h>
#include <hyperdex/client.h>
#include <hyperdex/datastructures.h>
#include <iostream>
#include <sstream>
#include <utility>
#include <thread>
#include <chrono>
#include <functional>
#include <atomic>
#include <future>
#include <mutex>
#include <condition_variable>
#include <openssl/md5.h>
#include <openssl/sha.h>
using namespace std;
string hex_md5(const char *md5str, uint64_t md5str_len)
{
MD5_CTX ctx;
unsigned char sign[MD5_DIGEST_LENGTH] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
MD5_Init(&ctx);
MD5_Update(&ctx, md5str, md5str_len);
MD5_Final(sign, &ctx);
char buf[MD5_DIGEST_LENGTH * 2 + 1];
for (int i = 0; i < MD5_DIGEST_LENGTH; i++) {
sprintf (buf + i * 2, "%02x", sign[i]);
}
buf[MD5_DIGEST_LENGTH * 2] = 0;
return string(buf);
}
string hex_sha1(const char *sha1str, uint64_t sha1str_len)
{
SHA_CTX ctx;
unsigned char sign[SHA_DIGEST_LENGTH];
SHA1((unsigned char *)sha1str, sha1str_len, sign);
char buf[SHA_DIGEST_LENGTH * 2 + 1];
for (int i = 0; i < SHA_DIGEST_LENGTH; i++) {
sprintf (buf + i * 2, "%02x", sign[i]);
}
buf[SHA_DIGEST_LENGTH * 2] = 0;
return string(buf);
}
int64_t current_microsec()
{
struct timeval t;
gettimeofday(&t, NULL);
return (int64_t)(t.tv_sec * 1000000LL + t.tv_usec);
}
class QPSStat
{
public:
int64_t start_time;
atomic<int64_t> total_count;
};
void hput(size_t n, size_t thread_num, QPSStat *s)
{
struct hyperdex_client *client = NULL;
struct hyperdex_client_attribute attr[3];
const struct hyperdex_client_attribute *attrs = NULL;
size_t attrs_sz = 0;
int64_t op_id;
enum hyperdex_client_returncode op_status;
int64_t loop_id;
enum hyperdex_client_returncode loop_status;
size_t i;
const char *host = "127.0.0.1";
uint16_t port = 1982;
const char *ns = "phonebook";
client = hyperdex_client_create(host, port);
if (client == NULL) {
cerr << "client is too much..." << endl;
return;
}
while (true) {
string strKey = "some key" + to_string(n) + to_string(time(NULL));
string strFirst = "John" + to_string(n) + to_string(time(NULL));
strFirst = hex_md5(strFirst.c_str(), strFirst.size());
string strLast = "Smith" + to_string(n) + to_string(time(NULL));
strLast = hex_sha1(strLast.c_str(), strLast.size());
int64_t num = time(NULL) + n;
const char *key = strKey.c_str();
attr[0].attr = "first";
attr[0].value = strFirst.c_str();
attr[0].value_sz = strlen(attr[0].value);
attr[0].datatype = HYPERDATATYPE_STRING;
attr[1].attr = "last";
attr[1].value = strLast.c_str();
attr[1].value_sz = strlen(attr[1].value);
attr[1].datatype = HYPERDATATYPE_STRING;
attr[2].attr = "phone";
char *buf = (char *)malloc(sizeof(int64_t));
hyperdex_ds_pack_int(num, buf);
attr[2].value = buf;
attr[2].value_sz = 8;
attr[2].datatype = HYPERDATATYPE_INT64;
cout << "start put " << key << " ..." << endl;
struct hyperdex_client_attribute *p = &attr[0];
op_id = hyperdex_client_put(client, ns, key, strlen(key), &attr[0], 3, &op_status);
free(buf);
if (op_id < 0) {
cout << hyperdex_client_error_message(client) << endl;
return;
}
loop_id = hyperdex_client_loop(client, -1, &loop_status);
if (loop_id < 0) {
cout << hyperdex_client_error_message(client) << endl;
return;
}
assert(op_id == loop_id);
cout << "start get " << key << " ..." << endl;
op_id = hyperdex_client_get(client, ns, key, strlen(key), &op_status, &attrs, &attrs_sz);
if (op_id < 0) {
cout << hyperdex_client_error_message(client) << endl;
return;
}
loop_id = hyperdex_client_loop(client, -1, &loop_status);
if (loop_id < 0) {
cout << hyperdex_client_error_message(client) << endl;
return;
}
s->total_count.fetch_add(2);
printf("cost: %ld\n", current_microsec());
int64_t total_cost = current_microsec() - s->start_time;
printf("total_cost: %ld and total_count: %ld\n", total_cost, s->total_count.load());
double qps = (s->total_count.load() * 1000000L)/total_cost;
printf("qps: %.3f\n", qps);
assert(op_id == loop_id);
printf("get done\n");
for (i = 0; i < attrs_sz; ++i) {
switch (attrs[i].datatype) {
case HYPERDATATYPE_STRING:
printf("got attribute \"%s\" = \"%.*s\"\n", attrs[i].attr, attrs[i].value_sz, attrs[i].value);
break;
case HYPERDATATYPE_INT64: {
int64_t num = 0;
hyperdex_ds_unpack_int(attrs[i].value, attrs[i].value_sz, &num);
printf("got attribute \"%s\" = \"%d\"\n", attrs[i].attr, num);
}
break;
default:
printf("got error attribute\n");
}
}
if (thread_num == 1) {
break;
}
}
hyperdex_client_destroy_attrs(attrs, attrs_sz);
hyperdex_client_destroy(client);
}
int main(int c, char *v[])
{
size_t thread_num;
thread_num = v[1] ? atoi(v[1]) : 1;
thread thr[thread_num];
cout << "Spawning " << thread_num << " threads..." << endl;
QPSStat s;
s.start_time = current_microsec();
s.total_count = 0;
printf("start time: %ld\n", s.start_time);
for (size_t i = 0; i < thread_num; i ++) {
thr[i] = thread(hput, i, thread_num, &s);
}
cout << "Done spawning threads. Now waiting for them to join:" << endl;
for (size_t i = 0; i < thread_num; i ++) {
thr[i].join();
}
cout << "All threads joined!" << endl;
return 0;
}