Question about locking the chan variable in busybee


李俊良

May 16, 2016, 11:32:48 PM
to hyperdex-discuss
See the code:

@@ -608,6 +609,7 @@ busybee_st :: set_external_fd(int fd)
 
     if (add_event(fd, EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP) < 0 && errno != EEXIST)
     {
+        chan->unlock();
         return BUSYBEE_POLLFAILED;
     }

Robert Escriva

May 17, 2016, 11:15:16 PM
to hyperdex...@googlegroups.com
Technically, this would make no difference as "lock" and "unlock" are a
NOP for busybee_st (the Single Threaded "client" variant). The
set_external_fd function is only implemented for st.

I've added the unlock function in case anyone ever decides to expand
that function to the mta (Multi-Threaded Accept) variant.

-Robert
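
For readers following the patch: the missing unlock is the usual early-return pitfall, and a scope guard avoids it whether or not locking is a no-op. A minimal sketch, assuming hypothetical stand-in types (noop_lock, counting_lock, set_fd_sketch and the returncode enum are invented here for illustration and are not BusyBee's actual classes):

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-ins for illustration; these are not BusyBee's classes.
struct noop_lock              // single-threaded flavor: locking does nothing
{
    void lock() {}
    void unlock() {}
};

struct counting_lock          // records how many times it is currently held
{
    int held = 0;
    void lock() { ++held; }
    void unlock() { assert(held > 0); --held; }
};

enum returncode { SUCCESS, POLLFAILED };

// Takes the lock, then either fails early or succeeds. The guard releases
// the lock on both paths, so no explicit unlock is needed before a return.
template <typename Lock>
returncode set_fd_sketch(Lock& lock, bool add_event_fails)
{
    std::lock_guard<Lock> guard(lock);

    if (add_event_fails)
    {
        return POLLFAILED;
    }

    return SUCCESS;
}
```

Calling set_fd_sketch with a counting_lock on the failing path leaves its held counter at zero, which is the property the one-line patch restores by hand. With noop_lock the same code compiles and simply does nothing around the body.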

李俊良

May 24, 2016, 4:15:48 AM
to hyperdex-discuss
busybee_st :: set_external_fd(int fd)
{
    channel* chan = &m_channels[fd];
    chan->lock();

In set_external_fd, if fd < 0, indexing m_channels goes out of bounds. It can be fixed like this:
busybee_st :: set_external_fd(int fd)
{
    if (fd < 0 || fd >= m_channels_sz) {
        // std::runtime_error("could not set external fd: " + po6::strerror(errno));
        return BUSYBEE_POLLFAILED;
    }
    channel* chan = &m_channels[fd];
    chan->lock();
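
As an illustration of the range check being proposed here, the sketch below rejects a descriptor when it is negative or when it is past the end of the table; either condition alone is enough to reject. fd_in_range and lookup are hypothetical helpers, and a plain int array stands in for m_channels, so this is not BusyBee code.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: true only when fd is a valid index into a table
// holding table_size entries.
inline bool fd_in_range(int fd, std::size_t table_size)
{
    return fd >= 0 && static_cast<std::size_t>(fd) < table_size;
}

// Hypothetical lookup that refuses out-of-range descriptors instead of
// indexing past the table. Returns nullptr on rejection.
inline int* lookup(int* table, std::size_t table_size, int fd)
{
    if (!fd_in_range(fd, table_size))
    {
        return nullptr;
    }

    assert(table != nullptr);
    return &table[fd];
}
```

The cast to std::size_t happens only after the sign check, so a negative fd is never converted into a huge unsigned index.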


On Wednesday, May 18, 2016 at 11:15:16 AM UTC+8, Robert Escriva wrote:

Robert Escriva

May 24, 2016, 4:17:00 AM
to hyperdex...@googlegroups.com, 李俊良
No, it cannot occur. The set_external_fd function is not available in the mta interface.

The line from the daemon you linked is irrelevant. It's just resetting an auto pointer.

On May 23, 2016 10:37:17 PM EDT, "李俊良" <cna...@gmail.com> wrote:
>For example, in daemon/communication.cc:
> m_busybee.reset(new busybee_mta(&m_daemon->m_gc, &m_busybee_mapper,
>bind_to, m_daemon->m_us.get(), threads));
>
>so the HyperDex daemon uses busybee_mta for communication, and the
>problem can occur there.
>
>On Wednesday, May 18, 2016 at 11:15:16 AM UTC+8, Robert Escriva wrote:
>>
--
Sent from my phone. Please excuse my brevity.

李俊良

May 24, 2016, 5:10:28 AM
to hyperdex-discuss, cna...@gmail.com
But it did happen. Here is a multi-threaded client that issues requests (adapted from the phonebook example):
#include <assert.h>
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <atomic>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
#include <openssl/md5.h>
#include <openssl/sha.h>
#include <hyperdex/client.h>
#include <hyperdex/datastructures.h>

using namespace std;

// Return the hex-encoded MD5 digest of the given bytes.
string hex_md5(const char *md5str, uint64_t md5str_len)
{
    MD5_CTX ctx;
    unsigned char sign[MD5_DIGEST_LENGTH] = {0};
    MD5_Init(&ctx);
    MD5_Update(&ctx, md5str, md5str_len);
    MD5_Final(sign, &ctx);
    char buf[MD5_DIGEST_LENGTH * 2 + 1];

    for (int i = 0; i < MD5_DIGEST_LENGTH; i++) {
        sprintf(buf + i * 2, "%02x", sign[i]);
    }

    buf[MD5_DIGEST_LENGTH * 2] = 0;
    return string(buf);
}

// Return the hex-encoded SHA-1 digest of the given bytes.
string hex_sha1(const char *sha1str, uint64_t sha1str_len)
{
    unsigned char sign[SHA_DIGEST_LENGTH];
    SHA1((const unsigned char *)sha1str, sha1str_len, sign);
    char buf[SHA_DIGEST_LENGTH * 2 + 1];

    for (int i = 0; i < SHA_DIGEST_LENGTH; i++) {
        sprintf(buf + i * 2, "%02x", sign[i]);
    }

    buf[SHA_DIGEST_LENGTH * 2] = 0;
    return string(buf);
}

// Wall-clock time in microseconds.
int64_t current_microsec()
{
    struct timeval t;
    gettimeofday(&t, NULL);
    return (int64_t)(t.tv_sec * 1000000LL + t.tv_usec);
}

// Counters shared by all worker threads.
class QPSStat
{
public:
    int64_t start_time;
    atomic<int64_t> total_count;
};

// Each thread creates its own client, then repeatedly puts and gets one object.
void hput(size_t n, size_t thread_num, QPSStat *s)
{
    struct hyperdex_client *client = NULL;
    struct hyperdex_client_attribute attr[3];
    const struct hyperdex_client_attribute *attrs = NULL;
    size_t attrs_sz = 0;
    int64_t op_id;
    enum hyperdex_client_returncode op_status;
    int64_t loop_id;
    enum hyperdex_client_returncode loop_status;
    size_t i;
    const char *host = "127.0.0.1";
    uint16_t port = 1982;
    const char *ns = "phonebook";
    client = hyperdex_client_create(host, port);

    if (client == NULL) {
        cerr << "could not create client (too many clients?)" << endl;
        return;
    }

    while (true) {
        string strKey = "some key" + to_string(n) + to_string(time(NULL));
        string strFirst = "John" + to_string(n) + to_string(time(NULL));
        strFirst = hex_md5(strFirst.c_str(), strFirst.size());
        string strLast = "Smith" + to_string(n) + to_string(time(NULL));
        strLast = hex_sha1(strLast.c_str(), strLast.size());
        int64_t num = time(NULL) + n;
        const char *key = strKey.c_str();
        attr[0].attr = "first";
        attr[0].value = strFirst.c_str();
        attr[0].value_sz = strlen(attr[0].value);
        attr[0].datatype = HYPERDATATYPE_STRING;
        attr[1].attr = "last";
        attr[1].value = strLast.c_str();
        attr[1].value_sz = strlen(attr[1].value);
        attr[1].datatype = HYPERDATATYPE_STRING;
        attr[2].attr = "phone";
        char buf[sizeof(int64_t)];  // packed 64-bit integer
        hyperdex_ds_pack_int(num, buf);
        attr[2].value = buf;
        attr[2].value_sz = sizeof(buf);
        attr[2].datatype = HYPERDATATYPE_INT64;
        cout << "start put " << key << " ..." << endl;

        op_id = hyperdex_client_put(client, ns, key, strlen(key), attr, 3, &op_status);

        if (op_id < 0) {
            cout << hyperdex_client_error_message(client) << endl;
            return;
        }

        loop_id = hyperdex_client_loop(client, -1, &loop_status);

        if (loop_id < 0) {
            cout << hyperdex_client_error_message(client) << endl;
            return;
        }

        assert(op_id == loop_id);
        cout << "start get " << key << " ..." << endl;
        op_id = hyperdex_client_get(client, ns, key, strlen(key), &op_status, &attrs, &attrs_sz);

        if (op_id < 0) {
            cout << hyperdex_client_error_message(client) << endl;
            return;
        }

        loop_id = hyperdex_client_loop(client, -1, &loop_status);

        if (loop_id < 0) {
            cout << hyperdex_client_error_message(client) << endl;
            return;
        }

        s->total_count.fetch_add(2);
        printf("cost: %" PRId64 "\n", current_microsec());
        int64_t total_cost = current_microsec() - s->start_time;
        printf("total_cost: %" PRId64 " and total_count: %" PRId64 "\n", total_cost, s->total_count.load());
        // Divide in floating point so the rate is not truncated.
        double qps = (double)s->total_count.load() * 1000000.0 / (double)total_cost;
        printf("qps: %.3f\n", qps);

        assert(op_id == loop_id);
        printf("get done\n");

        for (i = 0; i < attrs_sz; ++i) {
            switch (attrs[i].datatype) {
            case HYPERDATATYPE_STRING:
                printf("got attribute \"%s\" = \"%.*s\"\n", attrs[i].attr, (int)attrs[i].value_sz, attrs[i].value);
                break;

            case HYPERDATATYPE_INT64: {
                int64_t value = 0;
                hyperdex_ds_unpack_int(attrs[i].value, attrs[i].value_sz, &value);
                printf("got attribute \"%s\" = \"%" PRId64 "\"\n", attrs[i].attr, value);
                break;
            }

            default:
                printf("got error attribute\n");
            }
        }

        // Free the result of this get before the next iteration overwrites it.
        if (attrs != NULL) {
            hyperdex_client_destroy_attrs(attrs, attrs_sz);
            attrs = NULL;
            attrs_sz = 0;
        }

        if (thread_num == 1) {
            break;
        }
    }

    hyperdex_client_destroy(client);
}

int main(int c, char *v[])
{
    size_t thread_num = (c > 1) ? (size_t)atoi(v[1]) : 1;
    vector<thread> thr(thread_num);
    cout << "Spawning " << thread_num << " threads..." << endl;
    QPSStat s;

    s.start_time = current_microsec();
    s.total_count = 0;
    printf("start time: %" PRId64 "\n", s.start_time);

    for (size_t i = 0; i < thread_num; i++) {
        thr[i] = thread(hput, i, thread_num, &s);
    }

    cout << "Done spawning threads. Now waiting for them to join:" << endl;

    for (size_t i = 0; i < thread_num; i++) {
        thr[i].join();
    }

    cout << "All threads joined!" << endl;
    return 0;
}


Build it like this:
make LDFLAGS="-g -std=c++11 -lhyperdex-client -lcrypto -lssl -pthread " multi_put
and run:
./multi_put 2000 > /dev/null
It soon crashes in set_external_fd, where fd < 0, on this assertion:
assert(chan->state == channel::NOTCONNECTED);

On Tuesday, May 24, 2016 at 4:17:00 PM UTC+8, Robert Escriva wrote:

李俊良

May 24, 2016, 5:17:13 AM
to hyperdex-discuss, cna...@gmail.com
When the assertion failed, I inspected fd in the debugger and found that its value was -1, and po6::strerror(errno) printed "Too many open files".

So I think the fd range check should fix the problem.
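
"Too many open files" is the usual message for EMFILE, the per-process limit on open descriptors. A minimal sketch of reporting that failure at the point where a descriptor is obtained, and of reading the limit with POSIX getrlimit, follows. fd_error and open_file_limit are hypothetical helper names, not part of BusyBee or HyperDex.

```cpp
#include <sys/resource.h>
#include <cassert>
#include <cerrno>
#include <cstring>
#include <string>

// Hypothetical helper: empty string for a usable descriptor, otherwise a
// message built from the errno value saved right after the failed call.
std::string fd_error(int fd, int saved_errno)
{
    if (fd >= 0)
    {
        return std::string();
    }

    assert(saved_errno != 0); // a failed call is expected to have set errno
    return std::string("could not create descriptor: ") + std::strerror(saved_errno);
}

// Soft limit on open descriptors for this process (RLIMIT_NOFILE),
// or 0 if the limit cannot be read.
unsigned long long open_file_limit()
{
    struct rlimit rl;

    if (getrlimit(RLIMIT_NOFILE, &rl) != 0)
    {
        return 0;
    }

    return static_cast<unsigned long long>(rl.rlim_cur);
}
```

With 2000 threads each creating its own client, the process can plausibly exceed the soft limit (often 1024 by default on Linux), which would match the errno seen here; running ulimit -n in the shell shows the current value.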


On Tuesday, May 24, 2016 at 4:17:00 PM UTC+8, Robert Escriva wrote:
No, it cannot occur. The set external fd function is not available in the mta interface.

李俊良

May 24, 2016, 6:04:18 AM
to hyperdex-discuss, cna...@gmail.com
I recommend separating busybee_st and busybee_mta :)


On Tuesday, May 24, 2016 at 4:17:00 PM UTC+8, Robert Escriva wrote:
No, it cannot occur. The set external fd function is not available in the mta interface.

Robert Escriva

May 24, 2016, 11:46:50 AM
to hyperdex...@googlegroups.com, cna...@gmail.com
This is a different bug and one I will look into. The "unlock" has
absolutely no effect in this case. Multiple single-threaded instances
are not the same as a single multi-threaded instance.

-Robert