Atomic XCHG based Stack, simple for learning...

67 views
Skip to first unread message

Chris M. Thomasson

unread,
Dec 29, 2018, 1:38:50 AM12/29/18
to Scalable Synchronization Algorithms
This experimental algorithm uses only XCHG at the cost of having a consumer wait for the next pointer to be set in a node. However, it allows for real work to be done before any waits are performed. So, the "real" work should provide a "natural backoff" that might minimize the waiting. The algorithm can be improved upon quite a bit. I have some "interesting" ideas for it. Well, here is the code in the form of a Relacy unit test:

Notice the "hand off" comments and the 0xDEADBEEF sentinel; this can be made to work much better...
______________
// Simple Atomic Stack
// For Academic and Experimental things... 
// Beginner, Moderate?
// In Good Ol' Relacy! Nice as always.
//_______________________________________________


//#define RL_DEBUGBREAK_ON_ASSERT
//#define RL_MSVC_OUTPUT
//#define RL_FORCE_SEQ_CST
//#define RL_GC


#include <relacy/relacy_std.hpp>
#include <iostream>


// Simple macro based redirection of the verbose std membars.
// Short aliases for the std::memory_order constants, plus CT_MB(order),
// which issues a standalone std::atomic_thread_fence.
// NOTE(review): CT_MB, CT_MB_ACQ_REL and CT_MB_SEQ_CST are not used in this
// listing; only ACQ, REL and RLX are.
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ_REL std::memory_order_acq_rel
#define CT_MB_SEQ_CST std::memory_order_seq_cst
#define CT_MB(mp_0) std::atomic_thread_fence(mp_0)


// Some global vars directing the show...
// THREADS: number of simulated threads handed to rl::test_suite.
// ITERS:   push/flush rounds each simulated thread performs in thread().
#define THREADS 3
#define ITERS 2


// Experimental Stack
//
// A LIFO in which both push() and flush_try() touch the shared head only
// through a single atomic exchange (XCHG). The price is a short window in
// push(): the new node is already reachable from m_head while its m_next
// still holds the WAITNEXT placeholder, so a consumer that flushed the list
// during that window must wait for the real link in next_wait().
struct ct_ecstack
{
    // Placeholder stored in node::m_next while push() sits between its
    // exchange on m_head and its store of the real successor. The value is
    // odd, so it should never coincide with the address of a real
    // (pointer-aligned) node.
    // NOTE(review): a #define is not scoped by the enclosing struct;
    // WAITNEXT is visible to everything that follows this line.
    #define WAITNEXT ((node*)0xDEADBEEF)

    // One stack element: the atomic link plus a Relacy-tracked payload.
    struct node
    {
        std::atomic<node*> m_next;
        // NOTE(review): m_data is written by the constructor but never read
        // in this test, so Relacy is not checking payload visibility.
        VAR_T(unsigned int) m_data;

        node(unsigned int data) : m_next(nullptr), m_data(data) {}

        // Simulated work: yields to the Relacy scheduler (count
        // 1 + rl::rand(2)) so more interleavings are explored while this
        // node is being consumed.
        void process()
        {
            // simulate some work?
            rl::yield(1 + rl::rand(2), $); // ;^)
        }

        // Return this node's successor, spinning while push() has not yet
        // replaced WAITNEXT with the real link. Returns nullptr for the last
        // node of a flushed list (push() stores the previous head, which can
        // be nullptr).
        node* next_wait()
        {
            node* next = nullptr;

            while ((next = m_next.load(CT_MB_RLX)) == WAITNEXT)
            {
                // Humm, we can actually do other work right here...
                // Hand off...
                rl::yield(1, $);
            }

            return next;
        }
    };


    // Top of the stack; nullptr when empty.
    std::atomic<node*> m_head;

    ct_ecstack() : m_head(nullptr) {}


    // Publish n as the new head.
    // n->m_next is set to WAITNEXT *before* the exchange. Otherwise a
    // consumer that flushes right after the exchange would see the
    // constructor's nullptr and wrongly treat n as the end of the list. The
    // link is then patched to the previous head.
    void push(node* n)
    {
        n->m_next.store(WAITNEXT, CT_MB_RLX);
        node* head = m_head.exchange(n, CT_MB_REL); // release
        n->m_next.store(head, CT_MB_RLX);
    }


    // Detach the entire list in one exchange; returns nullptr if it was
    // empty. The acquire pairs with the release exchange in push().
    node* flush_try()
    {
        return m_head.exchange(nullptr, CT_MB_ACQ); // acquire
    }
};




// Relacy Stack Test...
//
// Each of the THREADS simulated threads runs thread(tidx). Every thread
// interleaves pushes with flush-and-consume passes on one shared ct_ecstack,
// so nodes are routinely freed by a thread other than the one that
// allocated them.
struct ct_ecstack_test
    : rl::test_suite<ct_ecstack_test, THREADS>
{
    // NOTE(review): despite the g_ prefix this is a per-test member, not a
    // global.
    ct_ecstack g_ecstack;

    // Flush the whole stack, then, for each node in the detached list:
    // process it, wait for its successor link, and delete it.
    void process()
    {
        ct_ecstack::node* n = g_ecstack.flush_try(); // flush all

        while (n)
        {
            // Process n first, acts like a backoff for the next wait
            // Hand off some other nodes? Future note...
            n->process();

            // Wait for the next pointer, or hand off?
            ct_ecstack::node* next = n->next_wait();

            // Destroy
            delete n;

            // Loop on...
            n = next;
        }
    }

    // Per-thread body. Each of ITERS rounds pushes three nodes, drains the
    // stack, then pushes one more node. The final process() drains whatever
    // is on the stack when the loop ends.
    void thread(unsigned int tidx)
    {
        for (unsigned int i = 0; i < ITERS; ++i)
        {
            g_ecstack.push(new ct_ecstack::node(tidx));
            g_ecstack.push(new ct_ecstack::node(tidx));
            g_ecstack.push(new ct_ecstack::node(tidx));

            process();

            g_ecstack.push(new ct_ecstack::node(tidx));
        }

        process();
    }
};



// Test away... Or fly? Humm...
int main()
{
    {
        rl::test_params p;

        p.iteration_count = 10000000;
        //p.execution_depth_limit = 33333;
        //p.search_type = rl::sched_bound;
        //p.search_type = rl::fair_full_search_scheduler_type;
        //p.search_type = rl::fair_context_bound_scheduler_type;

        rl::simulate<ct_ecstack_test>(p);
    }

    return 0;
}
______________


Any thoughts? This should be fairly basic in nature.

Chris M. Thomasson

unread,
Jan 4, 2019, 9:38:06 PM1/4/19
to Scalable Synchronization Algorithms
On Friday, December 28, 2018 at 10:38:50 PM UTC-8, Chris M. Thomasson wrote:
This experimental algorithm uses only XCHG at the cost of having a consumer wait for the next pointer to be set in a node. However, it allows for real work to be done before any waits are performed. So, the "real" work should provide a "natural backoff" that might minimize the waiting. The algorithm can be improved upon quite a bit. I have some "interesting" ideas for it. Well, here is the code in the form of a Relacy unit test:


FWIW, I finally found the time to code this experimental implementation up in C++11. Here is the code:

https://pastebin.com/raw/j41cPT9S
(raw text, C++ code, no ads...)
____________________________________
// Simple XCHG based Atomic Stack
// By: Chris M. Thomasson


#include <iostream>
#include <atomic>
#include <mutex>
#include <thread>
#include <functional>
#include <cassert>


// sorry about the macros...
// THREADS: number of worker threads main() spawns.
// ITERS:   produce-three/consume rounds each worker runs (see ct_worker).
#define THREADS 42
#define ITERS 100000


// Short aliases for the memory orders used below.
#define CT_MB_RLX std::memory_order_relaxed
#define CT_MB_ACQ std::memory_order_acquire
#define CT_MB_REL std::memory_order_release


// HACK! Humm...
// Sentinel stored in ct_work::m_next while ct_estack::push() sits between
// its exchange on the head and its store of the real successor. The value
// is odd, so it should never equal the address of a real (pointer-aligned)
// ct_work. Because this is a macro, it may be defined before ct_work is
// declared; it is only expanded at points where ct_work is known.
#define CT_WAIT ((ct_work*)(0xDEADBEEFU))



// Net count of live ct_work allocations. ct_produce adds one per node and
// ct_consume subtracts one per delete. main() reports a leak if the count
// is non-zero after all workers have joined.
static std::atomic<unsigned long> g_allocs{0};

// Serializes std::cout output from ct_work::process().
static std::mutex g_cout_mtx;


// A work node
struct ct_work
{
    std::atomic<ct_work*> m_next;
    std::thread::id m_data;
    ct_work(std::thread::id data) : m_next(nullptr), m_data(data) {}


    void process()
    {
        // Simulate just a tiny little work?
        g_cout_mtx.lock();
            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();

                std::thread::id local = std::this_thread::get_id();

                if (m_data == local)
                {
                   // std::cout << "processing local = " << m_data <<
                   //     " from " << std::this_thread::get_id() << "\n";
                }

                else
                {
                    std::cout << "processing foreign = " << m_data <<
                        " from " << std::this_thread::get_id() << "\n";
                }

            std::this_thread::yield();
            std::this_thread::yield();
            std::this_thread::yield();
        g_cout_mtx.unlock();
    }


    ct_work* get_next() const
    {
        ct_work* w = nullptr;

        while ((w = m_next.load(CT_MB_RLX)) == CT_WAIT)
        {
            // we can spin, or even do other work right here...
            std::this_thread::yield();
        }

        return w;
    }
};



// Easy Stack, only uses XCHG
struct ct_estack
{
    std::atomic<ct_work*> m_head;
    ct_estack() : m_head(nullptr) {}


    void push(ct_work* n)
    {
        n->m_next.store(CT_WAIT, CT_MB_RLX);
        ct_work* head = m_head.exchange(n, CT_MB_REL); // release
        n->m_next.store(head, CT_MB_RLX);
    }


    ct_work* flush_try()
    {
        return m_head.exchange(nullptr, CT_MB_ACQ); // acquire
    }
};



// Consume an Easy Stack...
void ct_consume(
    ct_estack& estack
) {
    ct_work* w = estack.flush_try();

    while (w)
    {
        // Process FIRST!
        w->process();

        // Now, we can gain the next pointer.
        ct_work* next = w->get_next();

        // Okay, we can delete the work
        delete w;
        g_allocs.fetch_sub(1, CT_MB_RLX); // dec

        w = next;
    }
}



// Our shared state
struct ct_shared
{
    ct_estack m_estack;
};



// Produce some work...
void ct_produce(
    ct_estack& estack
) {
    ct_work* w = new ct_work(std::this_thread::get_id());
    g_allocs.fetch_add(1, CT_MB_RLX); // inc
    estack.push(w);
}


// Do some work...
//
// Each round pushes three nodes, yields once, then drains the shared stack,
// possibly consuming nodes pushed by other workers. After the loop, one
// final drain takes whatever is still on the stack at that moment.
void ct_worker(ct_shared& shared)
{
    ct_estack& stack = shared.m_estack;

    for (unsigned int lap = 0; lap != ITERS; ++lap)
    {
        for (int burst = 0; burst < 3; ++burst)
        {
            ct_produce(stack);
        }

        std::this_thread::yield(); // little spice...

        ct_consume(stack);
    }

    ct_consume(stack);
}



int main(void)
{
    {
        ct_shared shared;
        std::thread threads[THREADS];

        for (unsigned long i = 0; i < THREADS; ++i)
        {
            threads[i] = std::thread(ct_worker, std::ref(shared));
        }

        for (unsigned long i = 0; i < THREADS; ++i)
        {
            threads[i].join();
        }
    }

    if (g_allocs.load(CT_MB_RLX) != 0)
    {
        std::cout << "\n\nLEAKED!!!!\n";
    }

    std::cout << "\n\nFIN!\n";

    return 0;
}
____________________________________

Can anybody get this sucker to compile and run to completion? If so, can you show me some of the output? With GCC I am getting output like this:
____________________________________
processing foreign = 33 from 32
processing foreign = 33 from 32
processing foreign = 33 from 32
processing foreign = 41 from 26
processing foreign = 41 from 26
[...]
____________________________________


And output like the following with MSVC, where the thread ids are larger:
____________________________________
processing foreign = 15644 from 9844
processing foreign = 15644 from 9844
processing foreign = 18092 from 19964
processing foreign = 18092 from 19964
____________________________________


Any thoughts? 
Reply all
Reply to author
Forward
0 new messages