
crude mpmc bounded queue test in IA-32/C...

Chris M. Thomasson

Dec 20, 2011, 10:46:34 PM
This test currently works in MSVC++; however, I am going to port it to GCC
IA-32 pretty soon and test it on Linux/OpenSolaris. Here is the code:

http://pastebin.com/fQBNtmnc


Here are the global settings for the test:
__________________________________________________________
/* Global Settings
__________________________________________________________*/
/* #define CMT_QUEUE_ALGO 1 */
#define DSL_QUEUE_ALGO 1


#define N 4096
#define ITERS 100000000
#define PUSH_POPS 4
#define THREADS 4
typedef long queue_state_t;


typedef char global_settings_static_assert[
N >= (PUSH_POPS * THREADS) ? 1 : -1
];
__________________________________________________________
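
The `global_settings_static_assert' typedef above is the classic pre-C11 compile-time assertion: the array size is 1 when the condition holds and -1 (a compile error) when it does not. A minimal sketch of the same trick wrapped in a macro follows. The `STATIC_ASSERT' macro name, the power-of-two check, and the `static_assert_demo_size' helper are assumptions added for illustration and are not in the original test. As far as I can tell, the capacity check matters because every thread pushes PUSH_POPS items before it pops anything, so a smaller queue could leave all threads stuck in push.

```c
#include <stddef.h>

/* Hypothetical helper macro (not in the original code): declares an
   array typedef whose size is -1, and therefore ill-formed, when the
   condition is false. */
#define STATIC_ASSERT(mp_name, mp_expr) \
    typedef char mp_name[(mp_expr) ? 1 : -1]

#define N 4096
#define PUSH_POPS 4
#define THREADS 4

/* Same condition as the original global_settings_static_assert. */
STATIC_ASSERT(queue_capacity_check, N >= (PUSH_POPS * THREADS));

/* Extra check added for illustration: the queues index cells with
   `& (N - 1)', which only works when N is a power of two. */
STATIC_ASSERT(queue_power_of_two_check, (N & (N - 1)) == 0);

/* Hypothetical demo helper: a passing assertion is a char[1] type. */
size_t
static_assert_demo_size(void)
{
    return sizeof(queue_capacity_check);
}
```

Changing N to, say, 8 while keeping PUSH_POPS and THREADS at 4 should make the first typedef fail to compile.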




Here is the thread function which performs all of the queue work:
__________________________________________________________
void*
test_thread(
void* state
){
unsigned i;
queue_state_t qstate;
unsigned const tid = (unsigned const)state;

pthread_barrier_wait(&g_tbarrier);

for (i = 0; i < ITERS; ++i)
{
unsigned c;

for (c = 0; c < PUSH_POPS; ++c)
{
test_queue_push(g_tqueue, (queue_state_t)tid);

/*printf("test_thread(%u)::pushed(%u)\n", tid, tid);*/
}

for (c = 0; c < PUSH_POPS; ++c)
{
qstate = test_queue_pop(g_tqueue);

/*printf("test_thread(%u)::popped(%u)\n", tid,
(unsigned)qstate);*/
}

}

return NULL;
}
__________________________________________________________



With the current configuration, each of the 4 threads performs 4 pushes
followed by 4 pops, 100,000,000 times. I simply measure the time from when I
start the threads to when I join them. The test reports elapsed time in
milliseconds, so smaller is better. N is set at a modest 4096. Everything is
properly padded and aligned; false sharing is reduced to a bare minimum. The
timing results I get from the Dual Spinlock Queue are:

397,875ms run 1
396,607ms run 2
396,301ms run 3


My algorithm is getting the following times:

187,968ms run 1
185,828ms run 2
186,150ms run 3




My platform is an old P4 3.06 GHz with 512 MB of RAM.


All I can say is that my algorithm is outperforming the dual spinlock queue
for me.
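
To put the run 1 numbers in perspective, here is a small sketch of the arithmetic. It assumes the posted settings (4 threads, 100,000,000 iterations, 4 pushes plus 4 pops per iteration), which works out to 3.2 billion queue operations per run. The function names are made up for this example. By this calculation the dual spinlock queue manages roughly 8 million operations per second and my algorithm roughly 17 million, a speedup of about 2.1x.

```c
#include <stdio.h>

/* Settings copied from the test configuration, as doubles. */
#define ITERS_D     100000000.0
#define PUSH_POPS_D 4.0
#define THREADS_D   4.0

/* Each iteration does PUSH_POPS pushes and PUSH_POPS pops per thread. */
double
total_ops(void)
{
    return THREADS_D * ITERS_D * PUSH_POPS_D * 2.0;
}

/* Convert an elapsed time in milliseconds to operations per second. */
double
ops_per_second(double elapsed_ms)
{
    return total_ops() / (elapsed_ms / 1000.0);
}

/* Ratio of the slower time to the faster time. */
double
speedup(double slow_ms, double fast_ms)
{
    return slow_ms / fast_ms;
}

/* Print a summary for the run 1 timings quoted in this post. */
void
print_summary(void)
{
    printf("dual spinlock: %.0f ops/sec\n", ops_per_second(397875.0));
    printf("cmt queue:     %.0f ops/sec\n", ops_per_second(187968.0));
    printf("speedup:       %.2fx\n", speedup(397875.0, 187968.0));
}
```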



Can anyone else run the test and report some numbers?
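
A note for anyone timing this on Linux or another POSIX system: the test measures with the C `clock()' function, which the C standard defines as processor time. MSVC's implementation reports wall-clock time, but on POSIX systems the value is CPU time summed over all threads, so the numbers may not be directly comparable with mine. Below is a minimal sketch of a wall-clock stop watch built on `clock_gettime(CLOCK_MONOTONIC)'. The `wall_clock_stop_watch' names are hypothetical and are not part of the posted code.

```c
#ifndef _POSIX_C_SOURCE
#  define _POSIX_C_SOURCE 200112L /* expose clock_gettime under strict -std */
#endif

#include <time.h>

/* Hypothetical wall-clock replacement for a clock()-based stop watch. */
struct wall_clock_stop_watch
{
    struct timespec start;
};

void
wall_clock_stop_watch_start(
    struct wall_clock_stop_watch* const self
){
    clock_gettime(CLOCK_MONOTONIC, &self->start);
}

/* Returns the elapsed wall-clock time in milliseconds. */
double
wall_clock_stop_watch_stop(
    struct wall_clock_stop_watch* const self
){
    struct timespec stop;

    clock_gettime(CLOCK_MONOTONIC, &stop);

    return (double)(stop.tv_sec - self->start.tv_sec) * 1000.0
         + (double)(stop.tv_nsec - self->start.tv_nsec) / 1000000.0;
}
```

On older glibc versions this may need `-lrt' at link time.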


Chris M. Thomasson

Dec 20, 2011, 10:57:29 PM
"Chris M. Thomasson" <cri...@charter.net> wrote in message
news:0EcIq.30189$8O1....@newsfe07.iad...
> This test currently works in MSVC++, however I am going to port it to GCC
> IA-32 pretty soon and test it on Linux/OpenSolaris. Here is the code:
>
> http://pastebin.com/fQBNtmnc

BTW, the only reason I am including the Windows headers is for the `Sleep(0);'
call. This can be completely eliminated by using `pthread_yield()' instead.
Here is a version without the Windows headers:

http://pastebin.com/ZVX7dL4g
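
For reference, here is a rough sketch of a yield wrapper that builds either way. The `portable_thread_yield' name is invented for this example. It uses `sched_yield()' on the non-Windows side because that is the call POSIX standardizes, whereas `pthread_yield()' is a non-standard extension that not every platform provides.

```c
#if defined(_WIN32)
#  include <windows.h>
#else
#  include <sched.h>
#endif

/* Hypothetical wrapper: gives up the rest of the current time slice.
   Returns 0 on success. */
int
portable_thread_yield(void)
{
#if defined(_WIN32)
    Sleep(0);              /* yields to another ready thread, if any */
    return 0;
#else
    return sched_yield();  /* POSIX: returns 0 on success */
#endif
}
```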


Chris M. Thomasson

Dec 21, 2011, 4:08:09 AM
"Chris M. Thomasson" <cri...@charter.net> wrote in message
news:0EcIq.30189$8O1....@newsfe07.iad...
> This test currently works in MSVC++, however I am going to port it to GCC
> IA-32 pretty soon and test it on Linux/OpenSolaris. Here is the code:
>
> http://pastebin.com/fQBNtmnc

I noticed something wrong with the padding of the queue cell structure for
the dual spinlock algorithm. It did not affect the correctness of the
algorithm; however, it was not properly padding the structure up to an L2
cache line. Here was the problem code fragment:
_____________________________________________
struct dsl_queue_cell
{
queue_state_t state;
L2_PAD_EX(l2pad2, atomic_uint32_t, 2);
};
_____________________________________________


`dsl_queue_cell::l2pad2' is padded for 2 `atomic_uint32_t's. This is wrong
because there is only one member, of type `queue_state_t', to account for.
The code should be changed to:
_____________________________________________
struct dsl_queue_cell
{
queue_state_t state;
L2_PAD(l2pad1, queue_state_t);
};
_____________________________________________
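
The difference is easy to check with `sizeof'. The sketch below reuses the `L2_PAD_EX' and `L2_PAD' macros from the test, but swaps in `uint32_t' for both `queue_state_t' and `atomic_uint32_t' so the arithmetic is the same on every platform (on IA-32 both are 4 bytes). That substitution and the `bad_cell'/`good_cell' struct names are assumptions for this example. The old layout comes to 4 + (128 - 8) = 124 bytes, and the fixed one to 4 + (128 - 4) = 128 bytes.

```c
#include <stdint.h>

#define L2_CACHE 128

#define L2_PAD_EX(mp_name, mp_type, mp_count) \
    unsigned char mp_name[ \
        L2_CACHE - (sizeof(mp_type) * (mp_count)) \
    ]

#define L2_PAD(mp_name, mp_type) \
    L2_PAD_EX(mp_name, mp_type, 1)

/* Old layout: pads as if two 4-byte members were present. */
struct bad_cell
{
    uint32_t state;
    L2_PAD_EX(l2pad2, uint32_t, 2);
};

/* Fixed layout: pads for the single member actually present. */
struct good_cell
{
    uint32_t state;
    L2_PAD(l2pad1, uint32_t);
};

/* Compile-time check that the fixed cell fills one padded line. */
typedef char good_cell_size_check[
    sizeof(struct good_cell) == L2_CACHE ? 1 : -1
];
```

With 124-byte cells, consecutive array elements drift across the 128-byte boundaries, so neighbouring cells can share a line.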


Here is the update:

http://pastebin.com/kvq8fsyF




It does shave off a couple of milliseconds. The timing results I now get from
the Dual Spinlock Queue are:

387,875ms run 1
389,607ms run 2
387,301ms run 3




Sorry about that stupid mistake! ;^(...







Here is the full code:



/*
Bounded Multi-Producer/Multi-Consumer FIFO Queue
Copyright (C) 2011 Christopher Michael Thomasson

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.


You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
____________________________________________________________________*/




#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <assert.h>
#include <pthread.h>
#include <sched.h>




/* Global Settings
__________________________________________________________*/
#if defined (_MSC_VER)
# pragma warning (disable:4116)
#endif


/* #define CMT_QUEUE_ALGO 1 */
#define DSL_QUEUE_ALGO 1


#define N 4096
#define ITERS 100000000
#define PUSH_POPS 4
#define THREADS 4
typedef long queue_state_t;


typedef char global_settings_static_assert[
N >= (PUSH_POPS * THREADS) ? 1 : -1
];




/* Simple Stop Watch
__________________________________________________________*/
struct c_clock_stop_watch
{
clock_t start;
clock_t stop;
};


void
c_clock_stop_watch_start(
struct c_clock_stop_watch* const self
){
self->start = clock();
}


double
c_clock_stop_watch_stop(
struct c_clock_stop_watch* const self
){
self->stop = clock();
return (((double)self->stop - (double)self->start) / CLOCKS_PER_SEC) *
1000.0;
}





/* Backoff Operations
__________________________________________________________*/
void
thread_backoff(
const char* msg
){
sched_yield();
}




/* Alignment Operations
__________________________________________________________*/
#if ! defined (ALIGN_UINTPTR)
# define ALIGN_UINTPTR size_t
#endif


typedef ALIGN_UINTPTR align_uintptr;


typedef char ALIGN_SASSERT
[
sizeof(align_uintptr) == sizeof(void*) &&
sizeof(align_uintptr) == sizeof(char (*) (double))
? 1 : -1
];


#define ALIGN_UP(mp_ptr, mp_align) \
((void*)( \
(((align_uintptr)(mp_ptr)) + ((mp_align) - 1)) \
& ~(((mp_align) - 1)) \
))




/* IA-32 Atomic Operations
__________________________________________________________*/
typedef unsigned __int32 atomic_uint32_t;


#define ATOMIC_API(mp_ret) \
__declspec(naked) \
mp_ret \
__cdecl


ATOMIC_API(atomic_uint32_t)
ATOMIC_XADD(
atomic_uint32_t volatile* dest,
atomic_uint32_t value
){
_asm
{
MOV ECX, [ESP + 4]
MOV EAX, [ESP + 8]
LOCK XADD [ECX], EAX
RET
}
}


ATOMIC_API(atomic_uint32_t)
ATOMIC_XCHG(
atomic_uint32_t volatile* dest,
atomic_uint32_t value
){
_asm
{
MOV ECX, [ESP + 4]
MOV EAX, [ESP + 8]
XCHG [ECX], EAX
RET
}
}


ATOMIC_API(atomic_uint32_t)
ATOMIC_LOAD(
atomic_uint32_t volatile* dest
){
_asm
{
MOV ECX, [ESP + 4]
MOV EAX, [ECX]
RET
}
}


ATOMIC_API(void)
ATOMIC_STORE(
atomic_uint32_t volatile* dest,
atomic_uint32_t value
){
_asm
{
MOV ECX, [ESP + 4]
MOV EAX, [ESP + 8]
MOV [ECX], EAX
RET
}
}




/* IA-32 Cache Line
__________________________________________________________*/
#define L2_CACHE 128


#define L2_PAD_EX(mp_name, mp_type, mp_count) \
unsigned char mp_name[ \
L2_CACHE - (sizeof(mp_type) * (mp_count)) \
]


#define L2_PAD(mp_name, mp_type) \
L2_PAD_EX(mp_name, mp_type, 1)




/* MPMC Bounded Queue By Chris M. Thomasson
__________________________________________________________*/
struct cmt_queue_cell
{
atomic_uint32_t ver;
queue_state_t state;
L2_PAD(l2pad1, struct { atomic_uint32_t a; queue_state_t b; } );
};


struct cmt_queue
{
atomic_uint32_t head;
L2_PAD(l2pad1, atomic_uint32_t);
atomic_uint32_t tail;
L2_PAD(l2pad2, atomic_uint32_t);
struct cmt_queue_cell cells[N];
};


void
cmt_queue_init(
struct cmt_queue* const self
){
atomic_uint32_t i;

self->head = 0;
self->tail = 0;

for (i = 0; i < N; ++i)
{
self->cells[i].ver = i;
}
}


void
cmt_queue_push(
struct cmt_queue* const self,
queue_state_t qstate
){
atomic_uint32_t ver = ATOMIC_XADD(&self->head, 1);
struct cmt_queue_cell* cell = self->cells + (ver & (N - 1U));

while (ATOMIC_LOAD(&cell->ver) != ver)
{
thread_backoff("cmt_queue_push");
}

cell->state = qstate;

ATOMIC_STORE(&cell->ver, ver + 1);
}


queue_state_t
cmt_queue_pop(
struct cmt_queue* const self
){
atomic_uint32_t ver = ATOMIC_XADD(&self->tail, 1);
struct cmt_queue_cell* cell = self->cells + (ver & (N - 1U));
queue_state_t qstate;

while (ATOMIC_LOAD(&cell->ver) != ver + 1)
{
thread_backoff("cmt_queue_pop");
}

qstate = cell->state;

ATOMIC_STORE(&cell->ver, ver + N);

return qstate;
}




/* MPMC Bounded Queue w/ Dual Spinlocks
__________________________________________________________*/
struct dsl_queue_cell
{
queue_state_t state;
L2_PAD(l2pad1, queue_state_t);
};


struct dsl_queue
{
atomic_uint32_t head_lock;
L2_PAD(l2pad1, atomic_uint32_t);
atomic_uint32_t head;
L2_PAD(l2pad2, atomic_uint32_t);
atomic_uint32_t tail_lock;
L2_PAD(l2pad3, atomic_uint32_t);
atomic_uint32_t tail;
L2_PAD(l2pad4, atomic_uint32_t);
struct dsl_queue_cell cells[N];
};


void
dsl_queue_init(
struct dsl_queue* const self
){
self->head_lock = 0;
self->head = 0;
self->tail_lock = 0;
self->tail = 0;
}


int
dsl_queue_try_push(
struct dsl_queue* const self,
queue_state_t qstate
){
atomic_uint32_t head;
atomic_uint32_t tail;

/* acquire head lock */
while (ATOMIC_XCHG(&self->head_lock, 1))
{
thread_backoff("dsl_queue_try_push");
}

head = ATOMIC_LOAD(&self->head);
tail = ATOMIC_LOAD(&self->tail);

if (tail == ((head + 1) & (N - 1)))
{
/* release head lock */
ATOMIC_STORE(&self->head_lock, 0);

return 0;
}

self->cells[head].state = qstate;
ATOMIC_STORE(&self->head, (head + 1UL) & (N - 1UL));

/* release head lock */
ATOMIC_STORE(&self->head_lock, 0);

return 1;
}


int
dsl_queue_try_pop(
struct dsl_queue* const self,
queue_state_t* qstate
){
atomic_uint32_t head;
atomic_uint32_t tail;

/* acquire tail lock */
while (ATOMIC_XCHG(&self->tail_lock, 1))
{
thread_backoff("dsl_queue_try_pop");
}

head = ATOMIC_LOAD(&self->head);
tail = ATOMIC_LOAD(&self->tail);

if (head == tail)
{
/* release tail lock */
ATOMIC_STORE(&self->tail_lock, 0);

return 0;
}

/* consume the oldest element, which lives at the tail index */
*qstate = self->cells[tail].state;
ATOMIC_STORE(&self->tail, (tail + 1UL) & (N - 1UL));

/* release tail lock */
ATOMIC_STORE(&self->tail_lock, 0);

return 1;
}


void
dsl_queue_push(
struct dsl_queue* const self,
queue_state_t qstate
){
while (! dsl_queue_try_push(self, qstate))
{
thread_backoff("dsl_queue_push");
}
}


queue_state_t
dsl_queue_pop(
struct dsl_queue* const self
){
queue_state_t qstate;

while (! dsl_queue_try_pop(self, &qstate))
{
thread_backoff("dsl_queue_pop");
}

return qstate;
}








/* MPMC Bounded Queue Test Abstraction
__________________________________________________________*/
#if defined (CMT_QUEUE_ALGO)
/* Chris M. Thomasson Algorithm */
# define QUEUE_ALGO_ID "Chris M. Thomasson MPMC Queue Algorithm"
typedef struct cmt_queue test_queue_t;
# define test_queue_init cmt_queue_init
# define test_queue_push cmt_queue_push
# define test_queue_pop cmt_queue_pop
#elif defined (DSL_QUEUE_ALGO)
/* Dual Spinlock Algorithm */
# define QUEUE_ALGO_ID "Dual Spinlock MPMC Queue Algorithm"
typedef struct dsl_queue test_queue_t;
# define test_queue_init dsl_queue_init
# define test_queue_push dsl_queue_push
# define test_queue_pop dsl_queue_pop
#else
# error No Test Macro Defined!
#endif




/* Stop Watch Abstraction
__________________________________________________________*/

/* C `clock()' Stop Watch */
typedef struct c_clock_stop_watch stop_watch_t;
#define stop_watch_start c_clock_stop_watch_start
#define stop_watch_stop c_clock_stop_watch_stop




/* Test Program
__________________________________________________________*/
static test_queue_t* g_tqueue;
static pthread_barrier_t g_tbarrier;


void*
test_thread(
void* state
){
unsigned i;
queue_state_t qstate;
unsigned const tid = (unsigned const)state;

pthread_barrier_wait(&g_tbarrier);

for (i = 0; i < ITERS; ++i)
{
unsigned c;

for (c = 0; c < PUSH_POPS; ++c)
{
test_queue_push(g_tqueue, (queue_state_t)tid);

/*printf("test_thread(%u)::pushed(%u)\n", tid, tid);*/
}

for (c = 0; c < PUSH_POPS; ++c)
{
qstate = test_queue_pop(g_tqueue);

/*printf("test_thread(%u)::popped(%u)\n", tid,
(unsigned)qstate);*/
}

}

return NULL;
}




/* Main Entry */
int
main(void)
{
pthread_t tid[THREADS];
unsigned i = 0;
stop_watch_t stop_watch;
double elapsed_time;

void* raw_buf = malloc(sizeof(test_queue_t) + L2_CACHE);
puts("Testing " QUEUE_ALGO_ID "...");

g_tqueue = ALIGN_UP(raw_buf, L2_CACHE);
test_queue_init(g_tqueue);
pthread_barrier_init(&g_tbarrier, NULL, THREADS);

stop_watch_start(&stop_watch);

for (i = 0; i < THREADS; ++i)
{
pthread_create(tid + i, NULL, test_thread, (void*)i);
}

for (i = 0; i < THREADS; ++i)
{
pthread_join(tid[i], NULL);
}

elapsed_time = stop_watch_stop(&stop_watch);
printf("\nelapsed time: %lf\n", elapsed_time);
pthread_barrier_destroy(&g_tbarrier);

puts("\n\n\n___________________________________________\n"
"The program has completed; hit <ENTER> to exit...");

getchar();

free(raw_buf);

return 0;
}
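
Since the atomic operations above are MSVC-only inline assembly, here is a rough sketch of how the ticket-based queue might look with C11 `<stdatomic.h>' instead. This is not the GCC port mentioned earlier in the thread, only an illustration under some assumptions: the `c11_' names and the tiny `Q_CAPACITY' are made up, the cache line padding is left out to keep it short, and every atomic operation uses the default sequentially consistent ordering (weaker orderings might be enough, but that is not shown or claimed here).

```c
#include <stdatomic.h>
#include <stdint.h>
#include <sched.h>

#define Q_CAPACITY 8u /* hypothetical size; must be a power of two */

struct c11_cell
{
    _Atomic uint32_t ver; /* ticket this cell is currently waiting for */
    long state;
};

struct c11_queue
{
    _Atomic uint32_t head; /* next push ticket */
    _Atomic uint32_t tail; /* next pop ticket */
    struct c11_cell cells[Q_CAPACITY];
};

void
c11_queue_init(struct c11_queue* const self)
{
    uint32_t i;

    atomic_init(&self->head, 0);
    atomic_init(&self->tail, 0);

    for (i = 0; i < Q_CAPACITY; ++i)
    {
        atomic_init(&self->cells[i].ver, i);
    }
}

void
c11_queue_push(struct c11_queue* const self, long value)
{
    /* take a ticket, then wait until the matching cell is free */
    uint32_t ver = atomic_fetch_add(&self->head, 1);
    struct c11_cell* cell = &self->cells[ver & (Q_CAPACITY - 1u)];

    while (atomic_load(&cell->ver) != ver)
    {
        sched_yield();
    }

    cell->state = value;
    atomic_store(&cell->ver, ver + 1); /* publish to the consumer */
}

long
c11_queue_pop(struct c11_queue* const self)
{
    /* take a ticket, then wait until the matching cell is filled */
    uint32_t ver = atomic_fetch_add(&self->tail, 1);
    struct c11_cell* cell = &self->cells[ver & (Q_CAPACITY - 1u)];
    long value;

    while (atomic_load(&cell->ver) != ver + 1)
    {
        sched_yield();
    }

    value = cell->state;
    atomic_store(&cell->ver, ver + Q_CAPACITY); /* free for next lap */

    return value;
}
```

As in the original, push spins while the queue is full and pop spins while it is empty, so a single thread must never pop more than it has pushed.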

