
Main thread distributes workload to worker threads, then they all rendezvous later


Frederick Virchanza Gotham

Jul 22, 2022, 10:28:42 AM

I have one main thread and eight worker threads.

The main thread is reading in data from a file, and at every 8 kilobytes, it distributes the workload evenly to the eight worker threads, so each worker thread processes 1 kB at a time.

By the time all the worker threads have finished processing their kilobyte, the main thread has prepared another 8 kB workload to distribute. It is very important that the previous workload has been fully processed before the next workload is dished out.

So it's a constant cycle of the main thread distributing the workload, then all of the threads rendezvousing later for the main thread to distribute the next workload.

I don't know if I missed something in Boost, or even in the C++ Standard Library, for an easy way to do this, but anyway I set out today to write this, and the following is what I have.

I would appreciate any tips anyone can give me. If you have a better way of doing this then tell me.

The class I've written is called "Distribute_Workload_And_Rendezvous_Later", and it has five public methods:
(1) Constructor (the argument is how many worker threads)
(2) Distribute_Workload (called from main thread)
(3) Rendezvous (called from main thread)
(4) Worker_Thread_Wait_For_Work (called from a worker thread)
(5) Worker_Thread_Report_Work_Finished (called from a worker thread)

Here's my code:

#include <cassert> // assert
#include <mutex> // mutex, unique_lock
#include <condition_variable> // condition_variable
#include <atomic> // atomic<unsigned>
#include <semaphore> // counting_semaphore

#include <chrono> // chrono::milliseconds
#include <thread> // this_thread::get_id(), this_thread::sleep_for()

class Distribute_Workload_And_Rendezvous_Later final {
private:

# ifndef NDEBUG
std::thread::id const id_main_thread;
# endif

unsigned const how_many_worker_threads;

// A bitmask is used to keep track of which threads have started
// and which have finished. For example, the following two bitmasks
// indicate that the 1st, 3rd and 4th thread have started, but only
// the 3rd has finished:
// bitmask_started = 1101
// bitmask_finished = 0101
std::atomic<unsigned> bitmask_started{0u}, bitmask_finished{0u};

inline bool Are_All_Workers_Finished(void) const noexcept
{
// For example, if we have 5 worker threads, then:
// Step 1: Shift 1 by 5 : 1u << 5u == 0b100000
// Step 2: Subtract 1 : 0b100000 - 1u == 0b11111
//
// We know all threads are finished when all bits are set
assert( bitmask_finished < (1u << how_many_worker_threads) );
return ((1u << how_many_worker_threads) - 1u) == bitmask_finished;
}

std::condition_variable cv_for_main_thread_waiting;
std::mutex mutex_for_cv;
std::counting_semaphore<16u> sem{0u};

public:

Distribute_Workload_And_Rendezvous_Later(unsigned const arg) : how_many_worker_threads(arg)
# ifndef NDEBUG
, id_main_thread(std::this_thread::get_id())
# endif
{
assert(how_many_worker_threads <= 16u); // because of the template
// parameter given to
// std::counting_semaphore
}

void Distribute_Workload(void)
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

// The two 'assert' statements on the next lines make sure of
// two things:
// (1) The threads that have started == The threads that have finished
// (2) Either no threads have finished, or all threads have finished
assert( bitmask_started == bitmask_finished );
assert( (bitmask_started == 0u) || (bitmask_started == ((1u << how_many_worker_threads) - 1u)) );

bitmask_started = 0u;
bitmask_finished = 0u;
sem.release(how_many_worker_threads);
}

void Rendezvous(void)
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

std::unique_lock<std::mutex> lock(mutex_for_cv);

while ( false == Are_All_Workers_Finished() )
{
cv_for_main_thread_waiting.wait(lock);
}
}

void Worker_Thread_Wait_For_Work(unsigned const thread_id)
{
// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

assert( thread_id < how_many_worker_threads );

// The following 'while' loop accommodates the circumstances in
// which one thread would acquire the semaphore two times for the
// same workload. This won't happen if each thread takes
// milliseconds to execute, and if the thread scheduler takes
// only microseconds to start another thread going again.
while ( 0u != (bitmask_started & (1u << thread_id) ) )
{
std::this_thread::sleep_for(std::chrono::milliseconds(1u));
}

sem.acquire();

bitmask_started |= (1u << thread_id);
assert( 0u == (bitmask_finished & (1u << thread_id)) );
}

void Worker_Thread_Report_Work_Finished(unsigned const thread_id)
{
// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

assert( thread_id < how_many_worker_threads );
assert( 0u != (bitmask_started & (1u << thread_id) ) );
assert( 0u == (bitmask_finished & (1u << thread_id) ) );

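bitmask_finished |= (1u << thread_id);

if ( Are_All_Workers_Finished() )
{
// All workers are now finished, so notify main thread. Taking the
// mutex first means the main thread is either not yet checking the
// predicate or already waiting, so the wakeup cannot be lost.
std::lock_guard<std::mutex> lock(mutex_for_cv);
cv_for_main_thread_waiting.notify_one();
}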
}

// Delete the 3 constructors: no parameters, copy-construct, move-construct
Distribute_Workload_And_Rendezvous_Later(void) = delete;
Distribute_Workload_And_Rendezvous_Later(Distribute_Workload_And_Rendezvous_Later const & ) = delete;
Distribute_Workload_And_Rendezvous_Later(Distribute_Workload_And_Rendezvous_Later &&) = delete;

// Delete the 2 assignment operators
Distribute_Workload_And_Rendezvous_Later &operator=(Distribute_Workload_And_Rendezvous_Later const & ) = delete;
Distribute_Workload_And_Rendezvous_Later &operator=(Distribute_Workload_And_Rendezvous_Later &&) = delete;
};

/* ======================= TEST CODE IS BELOW THIS LINE ============= */

#include <iostream> // cout, endl
using std::cout;
using std::endl;

Distribute_Workload_And_Rendezvous_Later sar(3u); // Three worker threads

void Thread_Entry_Point_A(void)
{
for (; /* ever */ ;)
{
sar.Worker_Thread_Wait_For_Work(0u);

cout << "A\n";

sar.Worker_Thread_Report_Work_Finished(0u);
}
}

void Thread_Entry_Point_B(void)
{
for (; /* ever */ ;)
{
sar.Worker_Thread_Wait_For_Work(1u);

cout << "B\n";

sar.Worker_Thread_Report_Work_Finished(1u);
}
}

void Thread_Entry_Point_C(void)
{
for (; /* ever */ ;)
{
sar.Worker_Thread_Wait_For_Work(2u);

cout << "C\n";

sar.Worker_Thread_Report_Work_Finished(2u);
}
}

int main(void)
{
std::thread tA(Thread_Entry_Point_A), tB(Thread_Entry_Point_B), tC(Thread_Entry_Point_C);

for (; /* ever */ ;)
{
/* ====== We start off with just one thread working ========= */
cout << "========================================= START\n";

/* ====== Next we have 4 threads working in parallel ====== */
sar.Distribute_Workload();

cout << "D\n";

/* ====== Next we go back to just one thread working ======== */
sar.Rendezvous();

cout << "========================================= FINISH\n";

std::this_thread::sleep_for( std::chrono::milliseconds(500u) );
}
}

Frederick Virchanza Gotham

Jul 22, 2022, 10:40:14 AM
On Friday, July 22, 2022 at 3:28:42 PM UTC+1, Frederick Virchanza Gotham wrote:

> // A bitmask is used to keep track of which threads have started
> // and which have finished. For example, the following two bitmasks
> // indicate that the 1st, 3rd and 4th thread have started, but only
> // the 3rd has finished:
> // bitmask_started = 1101
> // bitmask_finished = 0101

Typo. That bottom line should be:

bitmask_finished = 0100
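
To make the corrected values concrete, here is a small sketch of the bitmask arithmetic. The helper names (worker_bit, all_workers_mask, mark_finished) are invented for this example and do not appear in the posted class; the static_asserts reproduce the state described in the comment.

```cpp
#include <atomic>

// Mask with one bit set for each of `n` worker threads (n < 32 assumed).
constexpr unsigned all_workers_mask(unsigned const n) { return (1u << n) - 1u; }

// Bit that represents worker number `thread_id` (0-based).
constexpr unsigned worker_bit(unsigned const thread_id) { return 1u << thread_id; }

// The state from the comment: the 1st, 3rd and 4th workers have started,
// but only the 3rd has finished.
constexpr unsigned example_started  = worker_bit(0u) | worker_bit(2u) | worker_bit(3u);
constexpr unsigned example_finished = worker_bit(2u);

static_assert(example_started  == 0b1101u);
static_assert(example_finished == 0b0100u);
static_assert(all_workers_mask(5u) == 0b11111u);

// Sets this worker's "finished" bit and reports whether it was the last one.
// fetch_or returns the value held *before* the OR, so exactly one worker
// sees the transition to "all finished".
inline bool mark_finished(std::atomic<unsigned> &finished,
                          unsigned const thread_id, unsigned const n)
{
    unsigned const before = finished.fetch_or(worker_bit(thread_id));
    return (before | worker_bit(thread_id)) == all_workers_mask(n);
}
```

Using the return value of fetch_or, rather than OR-ing and then re-reading the mask, avoids two workers both concluding that they were the last to finish.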

Chris M. Thomasson

Jul 24, 2022, 8:05:05 PM
On 7/22/2022 7:28 AM, Frederick Virchanza Gotham wrote:
>
> I have one main thread and eight worker threads.
>
> The main thread is reading in data from a file, and at every 8 kilobytes, it distributes the workload evenly to the eight worker threads, so each worker thread processes 1 kB at a time.
>
> By the time all the worker threads have finished processing their kilobyte, the main thread has prepared another 8 kB workload to distribute.


It depends on how flexible you want to be. Can worker threads begin to
process a 1 kB chunk of data _before_ the main thread is finished
getting the 8 kB chunk ready, so to speak?

Is the 8kb important, or arbitrary?


> It is very important that the previous workload has been fully processed before the next workload is dished out.

So, the main thread cannot read the next 8kb and prepare it for work
_until_ the previous 8kb has been processed?

[...]

Chris M. Thomasson

Jul 24, 2022, 8:33:56 PM
On 7/22/2022 7:28 AM, Frederick Virchanza Gotham wrote:
>
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

YIKES! This sounds very fishy.

> while ( 0u != (bitmask_started & (1u << thread_id) ) )
> {
> std::this_thread::sleep_for(std::chrono::milliseconds(1u));
> }

^^^^^^^^^^^^^^^^^^^^^^^^

Strange.


>
> sem.acquire();
>
> bitmask_started |= (1u << thread_id);
> assert( 0u == (bitmask_finished & (1u << thread_id)) );
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
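
One possible way to get rid of the sleep-and-retry loop, offered here as an assumption rather than something proposed in the thread, is to number the workloads. Each worker remembers the last generation it processed and blocks until the counter changes, so it cannot pick up the same workload twice. The class Generation_Rendezvous and the function run_generation_demo are hypothetical names for this sketch.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

class Generation_Rendezvous {
    std::mutex m;
    std::condition_variable cv;
    unsigned long generation = 0u; // bumped once per workload
    unsigned remaining = 0u;       // workers still busy with the current workload
    bool stop = false;
    unsigned const workers;
public:
    explicit Generation_Rendezvous(unsigned const n) : workers(n) {}

    void Distribute_Workload()
    {
        { std::lock_guard<std::mutex> lock(m); remaining = workers; ++generation; }
        cv.notify_all();
    }

    void Rendezvous()
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return remaining == 0u; });
    }

    void Finish()
    {
        { std::lock_guard<std::mutex> lock(m); stop = true; ++generation; }
        cv.notify_all();
    }

    // `seen` is the last generation this worker handled (start it at 0).
    // Returns false when the worker should shut down.
    bool Worker_Wait_For_Work(unsigned long &seen)
    {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [&] { return generation != seen; });
        seen = generation;
        return !stop;
    }

    void Worker_Report_Work_Finished()
    {
        bool last;
        { std::lock_guard<std::mutex> lock(m); last = (--remaining == 0u); }
        if (last) cv.notify_all();
    }
};

// Runs `rounds` workloads through `workers` threads; returns items processed.
unsigned run_generation_demo(unsigned const workers, unsigned const rounds)
{
    Generation_Rendezvous r(workers);
    std::vector<unsigned> counts(workers, 0u);
    std::vector<std::thread> pool;
    for (unsigned id = 0u; id != workers; ++id)
        pool.emplace_back([&r, &counts, id] {
            unsigned long seen = 0u;
            while (r.Worker_Wait_For_Work(seen))
            {
                ++counts[id]; // stand-in for the real work
                r.Worker_Report_Work_Finished();
            }
        });
    for (unsigned i = 0u; i != rounds; ++i) { r.Distribute_Workload(); r.Rendezvous(); }
    r.Finish();
    for (std::thread &t : pool) t.join();
    unsigned total = 0u;
    for (unsigned const c : counts) total += c;
    return total;
}
```

All shared state is only touched while holding the mutex, which keeps the reasoning simple at the cost of some contention. One condition variable serves both the main thread and the workers, hence notify_all rather than notify_one.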

Frederick Virchanza Gotham

Jul 25, 2022, 6:30:09 PM
On Monday, July 25, 2022 at 1:05:05 AM UTC+1, Chris M. Thomasson wrote:

> So, the main thread cannot read the next 8kb and prepare it for work
> _until_ the previous 8kb has been processed?


That's correct. I'm doing encryption, and so the previous block must be finished so that I have an IV for the next block (for example when implementing CBC (Cipher Block Chaining) encryption).
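
To illustrate the dependency, here is a toy sketch of CBC chaining. The "cipher" toy_encrypt_block is a made-up stand-in that is not secure in any way, and the 8-byte block size is arbitrary; the only point is that each ciphertext block feeds into the next one, which is why a block cannot be started before its predecessor is finished.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <vector>

using Block = std::array<std::uint8_t, 8>;

// Stand-in for a real block cipher: NOT secure, purely illustrative.
inline Block toy_encrypt_block(Block b, std::uint8_t const key)
{
    for (std::uint8_t &byte : b) byte = static_cast<std::uint8_t>((byte ^ key) + 1u);
    return b;
}

// CBC: C[i] = E(P[i] XOR C[i-1]), with C[-1] = IV.
inline std::vector<Block> toy_cbc_encrypt(std::vector<Block> const &plain,
                                          Block iv, std::uint8_t const key)
{
    std::vector<Block> cipher;
    cipher.reserve(plain.size());
    for (Block const &p : plain)
    {
        Block x;
        for (std::size_t i = 0u; i != x.size(); ++i)
            x[i] = static_cast<std::uint8_t>(p[i] ^ iv[i]);
        iv = toy_encrypt_block(x, key); // this ciphertext is the next block's IV
        cipher.push_back(iv);
    }
    return cipher;
}
```

With two identical plaintext blocks the two ciphertext blocks still differ, because the second one is mixed with the first ciphertext before being encrypted.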

I've taken my "Rendezvous" class and I've implemented it 4 different ways:
(1) Polling atomics
(2) Using one mutex and one condition variable
(3) Using 32 semaphores (two per thread)
(4) Using 16 mutexes and 16 condition variables (one of each per thread)

You can see it up on Github here:

https://github.com/healytpk/rendezvous

and also I've copy-pasted all the code here:

#include <mutex> // mutex, unique_lock
#include <condition_variable> // condition_variable

class Gate {
private:

std::mutex m;
std::condition_variable cv;
bool is_gate_open = false;

public:

void open(void)
{
m.lock();
is_gate_open = true;
m.unlock();
cv.notify_one();
}

void close(void)
{
m.lock();
is_gate_open = false;
m.unlock();
cv.notify_one();
}

void wait_for_open(void)
{
std::unique_lock<std::mutex> lock(m);
while ( false == is_gate_open ) cv.wait(lock);
}

void wait_for_open_and_then_immediately_close_without_notification(void)
{
std::unique_lock<std::mutex> lock(m);
while ( false == is_gate_open ) cv.wait(lock);

// Close the gate while still holding the lock, so that no other
// thread can observe or change the state in between.
is_gate_open = false;
}

void wait_for_close(void)
{
std::unique_lock<std::mutex> lock(m);
while ( is_gate_open ) cv.wait(lock);
}
};

/*
Four Modes
==========
0 = (undefined)
1 = poll atomics
2 = one mutex, one condition variable
3 = two binary_semaphores per working thread
4 = one Gate per thread (i.e. one mutex and one condition variable per thread)
*/

#ifdef RENDEZVOUS_DEBUG
static bool constexpr debug_rendezvous = true;
#else
static bool constexpr debug_rendezvous = false;
#endif

#include <cassert> // assert
#include <chrono> // chrono::milliseconds
#include <thread> // this_thread::get_id(), this_thread::sleep_for()

class IRendezvous {
public:

virtual char const *Which_Derived_Class(void) const noexcept = 0;

protected:

unsigned const how_many_worker_threads;

#ifndef NDEBUG
std::thread::id const id_main_thread;
#endif

IRendezvous(unsigned const arg) noexcept
: how_many_worker_threads(arg)
#ifndef NDEBUG
, id_main_thread(std::this_thread::get_id())
#endif
{
assert( (how_many_worker_threads >= 2u)
&& (how_many_worker_threads <= 16u) );
}

public:

// Three methods to be invoked by Main Thread
virtual void Distribute_Workload(void) = 0;
virtual void Rendezvous(void) = 0;
virtual void Finish(void) = 0;

// Two methods to be invoked by worker threads
virtual bool Worker_Thread_Wait_For_Work(unsigned thread_id) = 0;
virtual void Worker_Thread_Report_Work_Finished(unsigned thread_id) = 0;

// Delete the 3 constructors: no parameters, copy-construct, move-construct
IRendezvous(void) = delete;
IRendezvous(IRendezvous const & ) = delete;
IRendezvous(IRendezvous &&) = delete;

// Delete the 2 assignment operators
IRendezvous &operator=(IRendezvous const & ) = delete;
IRendezvous &operator=(IRendezvous &&) = delete;
};

#include <string>

// The following function is just for debugging
static inline std::string MakeStr(char const *const a, unsigned const i, char const *const b)
{
std::string retval = a;
retval += std::to_string(i);
retval += b;
return retval;
}

#include <cstddef> // size_t
#include <mutex> // mutex, unique_lock
#include <condition_variable> // condition_variable
#include <semaphore> // counting_semaphore, binary_semaphore
#include <atomic> // atomic<>

#include <iostream> // REMOVE THIS -- Just for debugging
#include <string> // REMOVE THIS -- Just for debugging

class Rendezvous_Poll_Atomic final : public IRendezvous {
public:

char const *Which_Derived_Class(void) const noexcept override { return "Rendezvous_Poll_Atomic"; }

private:

std::atomic<bool> should_finish{ false };

// A bitmask is used to keep track of which threads have started
// and which have finished. For example, the following two bitmasks
// indicate that the 1st, 3rd and 4th thread have started, but only
// the 3rd has finished:
// bitmask_started = 1101
// bitmask_finished = 0100
std::atomic<unsigned> bitmask_started { static_cast<unsigned>(-1) },
bitmask_finished{ static_cast<unsigned>(-1) };

inline bool Are_All_Workers_Finished(void) const noexcept
{
// For example, if we have 5 worker threads, then:
// Step 1: Shift 1 by 5 : 1u << 5u == 0b100000
// Step 2: Subtract 1 : 0b100000 - 1u == 0b11111
//
// We know all threads are finished when all bits are set
assert( bitmask_finished < (1u << how_many_worker_threads) );

return ((1u << how_many_worker_threads) - 1u) == bitmask_finished;
}

public:

Rendezvous_Poll_Atomic(unsigned const arg) noexcept : IRendezvous(arg) {}

void Distribute_Workload(void) override
{
if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Distribute_Workload\n";

// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

// The two 'assert' statements on the next lines make sure of
// two things:
// (1) The threads that have started == The threads that have finished
// (2) Either no threads have finished, or all threads have finished
assert( bitmask_started == bitmask_finished );
assert( (bitmask_started == static_cast<unsigned>(-1))
|| (bitmask_started == 0u)
|| (bitmask_started == ((1u << how_many_worker_threads) - 1u)) );

bitmask_finished = 0u;
bitmask_started = 0u; // This is the line that starts the spinners going again
}

void Rendezvous(void) override
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

while ( false == Are_All_Workers_Finished() );

if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Rendezvous\n";
}

void Finish(void) override
{
if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Finish (this will invoke Distribute_Workload)\n";

// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

should_finish = true;
Distribute_Workload();
}

bool Worker_Thread_Wait_For_Work(unsigned const thread_id) override
{
// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

assert( thread_id < how_many_worker_threads );

while ( 0u != (bitmask_started & (1u << thread_id)) );

if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, should_finish ? ": Shutting down\n" : ": Received work\n");

assert( 0u == (bitmask_started & (1u << thread_id) ) );

bitmask_started |= (1u << thread_id);
assert( 0u == (bitmask_finished & (1u << thread_id)) );

return false == should_finish;
}

void Worker_Thread_Report_Work_Finished(unsigned const thread_id) override
{
if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, ": Reporting its own work done\n");

// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

assert( thread_id < how_many_worker_threads );
assert( 0u != (bitmask_started & (1u << thread_id) ) );
assert( 0u == (bitmask_finished & (1u << thread_id) ) );

if ( (bitmask_finished | (1u << thread_id)) == ((1u << how_many_worker_threads) - 1u) )
{
if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, ": - - - REPORTING ALL WORK DONE - - -\n");
}

bitmask_finished |= (1u << thread_id); // For the last thread to finish, this will unspin the main thread
}

// Delete the 3 constructors: no parameters, copy-construct, move-construct
Rendezvous_Poll_Atomic(void) = delete;
Rendezvous_Poll_Atomic(Rendezvous_Poll_Atomic const & ) = delete;
Rendezvous_Poll_Atomic(Rendezvous_Poll_Atomic &&) = delete;

// Delete the 2 assignment operators
Rendezvous_Poll_Atomic &operator=(Rendezvous_Poll_Atomic const & ) = delete;
Rendezvous_Poll_Atomic &operator=(Rendezvous_Poll_Atomic &&) = delete;
};

#include "gate.hpp" // Gate

class Rendezvous_One_Condition_Variable final : public IRendezvous {
public:

char const *Which_Derived_Class(void) const noexcept override { return "Rendezvous_One_Condition_Variable"; }

private:

std::atomic<bool> should_finish{ false };

// A bitmask is used to keep track of which threads have started
// and which have finished. For example, the following two bitmasks
// indicate that the 1st, 3rd and 4th thread have started, but only
// the 3rd has finished:
// bitmask_started = 1101
// bitmask_finished = 0100
std::atomic<unsigned> bitmask_started { static_cast<unsigned>(-1) },
bitmask_finished{ static_cast<unsigned>(-1) };

Gate gate;
std::counting_semaphore<16u> sem{0u};

public:

Rendezvous_One_Condition_Variable(unsigned const arg) noexcept : IRendezvous(arg) {}

void Distribute_Workload(void) override
{
if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Distribute_Workload\n";

// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

// The two 'assert' statements on the next lines make sure of
// two things:
// (1) The threads that have started == The threads that have finished
// (2) Either no threads have finished, or all threads have finished
assert( bitmask_started == bitmask_finished );
assert( (bitmask_started == static_cast<unsigned>(-1))
|| (bitmask_started == 0u)
|| (bitmask_started == ((1u << how_many_worker_threads) - 1u)) );

bitmask_finished = 0u;
bitmask_started = 0u; // This is the line that starts the spinners going again

sem.release(how_many_worker_threads);
}

void Rendezvous(void) override
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

gate.wait_for_open_and_then_immediately_close_without_notification();

if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Rendezvous\n";
}

void Finish(void) override
{
if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Finish (this will invoke Distribute_Workload)\n";

// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

should_finish = true;
Distribute_Workload();
}

bool Worker_Thread_Wait_For_Work(unsigned const thread_id) override
{
// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

assert( thread_id < how_many_worker_threads );

for (; /* ever */ ;)
{
//std::cerr << "Acquiring semaphore...\n";
//std::this_thread::sleep_for(std::chrono::milliseconds(5u));
sem.acquire();
//std::cerr << "Semaphore acquired...\n";

// The following line accommodates the circumstances in
// which one thread would acquire the semaphore two times for the
// same workload. This won't happen if each thread takes
// milliseconds to execute, and if the thread scheduler takes
// only microseconds to start another thread going again.
if ( 0u == (bitmask_started & (1u << thread_id) ) ) break;

sem.release();

if constexpr ( debug_rendezvous )
{
std::cerr << MakeStr("= = = = = = = = = = = = The same worker thread (", thread_id,
") acquired a semaphore more"
" than once for the same workload. Now releasing"
" and sleeping for 1 millisecond = = = = = = = = = = = =\n");

std::this_thread::sleep_for(std::chrono::milliseconds(1u));
}
}

if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, should_finish ? ": Shutting down\n" : ": Received work\n");

assert( 0u == (bitmask_started & (1u << thread_id) ) );

bitmask_started |= (1u << thread_id);
assert( 0u == (bitmask_finished & (1u << thread_id)) );

return false == should_finish;
}

void Worker_Thread_Report_Work_Finished(unsigned const thread_id) override
{
if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, ": Reporting its own work done\n");

// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

assert( thread_id < how_many_worker_threads );
assert( 0u != (bitmask_started & (1u << thread_id) ) );
assert( 0u == (bitmask_finished & (1u << thread_id) ) );

if ( (bitmask_finished | (1u << thread_id)) == ((1u << how_many_worker_threads) - 1u) )
{
if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, ": - - - REPORTING ALL WORK DONE - - -\n");
}

unsigned const tmp = bitmask_finished.fetch_or(1u << thread_id);

if ( tmp == ( ((1u << how_many_worker_threads) - 1u) - (1u << thread_id) ) )
{
gate.open();
}
}

// Delete the 3 constructors: no parameters, copy-construct, move-construct
Rendezvous_One_Condition_Variable(void) = delete;
Rendezvous_One_Condition_Variable(Rendezvous_One_Condition_Variable const & ) = delete;
Rendezvous_One_Condition_Variable(Rendezvous_One_Condition_Variable &&) = delete;

// Delete the 2 assignment operators
Rendezvous_One_Condition_Variable &operator=(Rendezvous_One_Condition_Variable const & ) = delete;
Rendezvous_One_Condition_Variable &operator=(Rendezvous_One_Condition_Variable &&) = delete;
};

class Rendezvous_32_Semaphores final : public IRendezvous {
public:

char const *Which_Derived_Class(void) const noexcept override { return "Rendezvous_32_Semaphores"; }

private:

std::atomic<bool> should_finish{ false };

std::binary_semaphore sems_start [16u],
sems_finish[16u];

typedef std::binary_semaphore bs;

public:

Rendezvous_32_Semaphores(unsigned const arg) noexcept
: IRendezvous(arg),
sems_start {bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u)},
sems_finish{bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u),bs(0u)}
{
/* Nothing to do in here */
}

void Distribute_Workload(void) override
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Distribute_Workload\n";

for ( unsigned i = 0u; i != how_many_worker_threads; ++i )
{
sems_start[i].release();
}
}

void Rendezvous(void) override
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

for ( unsigned i = 0u; i != how_many_worker_threads; ++i )
{
sems_finish[i].acquire();
}

if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Rendezvous\n";
}

void Finish(void) override
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Finish (this will invoke Distribute_Workload)\n";

should_finish = true;
Distribute_Workload();
}

bool Worker_Thread_Wait_For_Work(unsigned const thread_id) override
{
// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

assert( thread_id < how_many_worker_threads );

sems_start[thread_id].acquire();

if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, should_finish ? ": Shutting down\n" : ": Received work\n");

return false == should_finish;
}

void Worker_Thread_Report_Work_Finished(unsigned const thread_id) override
{
// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, ": Reporting its own work done\n");

sems_finish[thread_id].release();
}

// Delete the 3 constructors: no parameters, copy-construct, move-construct
Rendezvous_32_Semaphores(void) = delete;
Rendezvous_32_Semaphores(Rendezvous_32_Semaphores const & ) = delete;
Rendezvous_32_Semaphores(Rendezvous_32_Semaphores &&) = delete;

// Delete the 2 assignment operators
Rendezvous_32_Semaphores &operator=(Rendezvous_32_Semaphores const & ) = delete;
Rendezvous_32_Semaphores &operator=(Rendezvous_32_Semaphores &&) = delete;
};

class Rendezvous_16_Gates final : public IRendezvous {
public:

char const *Which_Derived_Class(void) const noexcept override { return "Rendezvous_16_Gates"; }

private:

std::atomic<bool> should_finish{ false };

Gate gates[16u];

typedef std::binary_semaphore bs;

public:

Rendezvous_16_Gates(unsigned const arg) noexcept : IRendezvous(arg) {}

void Distribute_Workload(void) override
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Distribute_Workload\n";

for ( unsigned i = 0u; i != how_many_worker_threads; ++i )
{
gates[i].open();
}
}

void Rendezvous(void) override
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

for ( unsigned i = 0u; i != how_many_worker_threads; ++i )
{
gates[i].wait_for_close();
}

if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Rendezvous\n";
}

void Finish(void) override
{
// The 'assert' on the next line makes sure that this method is
// only invoked from the main thread.
assert( std::this_thread::get_id() == id_main_thread );

if constexpr ( debug_rendezvous ) std::cerr << "Main thread: Finish (this will invoke Distribute_Workload)\n";

should_finish = true;
Distribute_Workload();
}

bool Worker_Thread_Wait_For_Work(unsigned const thread_id) override
{
// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

assert( thread_id < how_many_worker_threads );

gates[thread_id].wait_for_open();

if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, should_finish ? ": Shutting down\n" : ": Received work\n");

return false == should_finish;
}

void Worker_Thread_Report_Work_Finished(unsigned const thread_id) override
{
// The 'assert' on the next line makes sure that this method is
// NOT invoked from the main thread.
assert( std::this_thread::get_id() != id_main_thread );

if constexpr ( debug_rendezvous ) std::cerr << MakeStr("Worker thread ", thread_id, ": Reporting its own work done\n");

gates[thread_id].close();
}

// Delete the 3 constructors: no parameters, copy-construct, move-construct
Rendezvous_16_Gates(void) = delete;
Rendezvous_16_Gates(Rendezvous_16_Gates const & ) = delete;
Rendezvous_16_Gates(Rendezvous_16_Gates &&) = delete;

// Delete the 2 assignment operators
Rendezvous_16_Gates &operator=(Rendezvous_16_Gates const & ) = delete;
Rendezvous_16_Gates &operator=(Rendezvous_16_Gates &&) = delete;
};

#include <type_traits> // conditional

template<unsigned mode>
using Rendezvous =
std::conditional_t<
1u == mode,
Rendezvous_Poll_Atomic,
std::conditional_t<2u == mode,
Rendezvous_One_Condition_Variable,
std::conditional_t<3u == mode, Rendezvous_32_Semaphores,Rendezvous_16_Gates> > >;
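
Note that the nested std::conditional_t chain above maps mode 0 (listed as undefined) and any mode above 4 to Rendezvous_16_Gates without complaint. If rejecting such values at compile time is preferred, one option is a small selector template. This is only a sketch: the four *_Stub types are hypothetical stand-ins for the real classes, and Select_Rendezvous and Checked_Rendezvous are invented names.

```cpp
#include <type_traits>

// Hypothetical stand-ins for the four Rendezvous_* classes.
struct Poll_Atomic_Stub {};
struct One_CV_Stub {};
struct Semaphores_Stub {};
struct Gates_Stub {};

// Primary template: only instantiated for modes without a specialization,
// in which case the static_assert reports the mistake.
template<unsigned mode>
struct Select_Rendezvous {
    static_assert(mode >= 1u && mode <= 4u, "mode must be 1, 2, 3 or 4");
};

template<> struct Select_Rendezvous<1u> { using type = Poll_Atomic_Stub; };
template<> struct Select_Rendezvous<2u> { using type = One_CV_Stub; };
template<> struct Select_Rendezvous<3u> { using type = Semaphores_Stub; };
template<> struct Select_Rendezvous<4u> { using type = Gates_Stub; };

template<unsigned mode>
using Checked_Rendezvous = typename Select_Rendezvous<mode>::type;

static_assert(std::is_same_v<Checked_Rendezvous<1u>, Poll_Atomic_Stub>);
static_assert(std::is_same_v<Checked_Rendezvous<4u>, Gates_Stub>);
// Checked_Rendezvous<0u> or Checked_Rendezvous<5u> would fail to compile.
```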

Chris M. Thomasson

Jul 25, 2022, 6:40:50 PM
On 7/25/2022 3:30 PM, Frederick Virchanza Gotham wrote:
> On Monday, July 25, 2022 at 1:05:05 AM UTC+1, Chris M. Thomasson wrote:
>
>> So, the main thread cannot read the next 8kb and prepare it for work
>> _until_ the previous 8kb has been processed?
>
>
> That's correct. I'm doing encryption, and so the previous block must be finished so that I have an IV for the next block (for example when implementing CBC (Cipher Block Chaining) encryption).

How are you parallelizing a single block? Well, I guess I need to see
your cipher algorithm. Fwiw, my experimental HMAC cipher has the same
issue where a previous hash digest must be computed before the next:

http://funwithfractals.atspace.cc/ct_cipher

http://fractallife247.com/test/hmac_cipher/ver_0_0_0_1?ct_hmac_cipher=d2779ad54504797fd10ed10e3c347d9f8f35b15b14c60a9fe03c748b6573c67a90a5579efa48647f26758fe6bb5944b6f2d5f6ef49249a44d63339da630dcad0bff62a7df1745354861a168838503c64cf13f131e70b37139b17f6a843fc

A message encrypted with the default key, for everyone to see... ;^)

>
> I've taken my "Rendezvous" class and I've implemented it 4 different ways:
> (1) Polling atomics
> (2) Using one mutex and one condition variable
> (3) Using 32 semaphores (two per thread)
> (4) Using 16 mutexes and 16 condition variables (one of each per thread)
>
> You can see it up on Github here:
>
> https://github.com/healytpk/rendezvous
>
> and also I've copy-pasted all the code here:
> [...]

How are you assigning work to a worker thread? A range in a buffer?

Frederick Virchanza Gotham

Jul 27, 2022, 6:51:47 AM
On Monday, July 25, 2022 at 11:40:50 PM UTC+1, Chris M. Thomasson wrote:

> How are you parallelizing a single block?


I read 384 bytes in from the files, into a big buffer like this:

char buf[6][64];

Each of the six worker threads accesses the array like this:

char (&data)[64] = buf[thread_id];
Encrypt_Data_In_Place(data);

So the main thread populates the global array, "buf", sets the workers to work, waits for them all to finish, and then loads another 384 bytes into "buf".

You get me?

(In a previous post I wrote 8kb with 8 threads but I was just picking nice round numbers. Really I have 6 threads each processing 64 bytes at a time -- yeah I should probably increase that massively).
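
For readers who want to try the layout, here is a minimal self-contained sketch of one batch. Encrypt_Data_In_Place is replaced by a hypothetical stand-in that XORs each byte with 0x55, and the threads are started and joined per batch purely to keep the example short; the real program keeps its workers alive and uses the Rendezvous class instead of join.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

constexpr std::size_t worker_count = 6u;
constexpr std::size_t chunk_size   = 64u;

char buf[worker_count][chunk_size]; // 384 bytes, one row per worker

// Hypothetical stand-in for the real Encrypt_Data_In_Place.
void Encrypt_Data_In_Place(char (&data)[chunk_size])
{
    for (char &c : data) c = static_cast<char>(c ^ 0x55);
}

// Processes one 384-byte batch: each worker touches only its own row.
void process_batch()
{
    std::vector<std::thread> pool;
    for (std::size_t id = 0u; id != worker_count; ++id)
        pool.emplace_back([id] {
            char (&data)[chunk_size] = buf[id];
            Encrypt_Data_In_Place(data);
        });
    for (std::thread &t : pool) t.join(); // simplest possible rendezvous
}
```

Since no two threads write to the same row, the rows themselves need no locking; only the hand-over between the main thread and the workers has to be synchronised.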

Paavo Helde

Jul 27, 2022, 7:47:16 AM
If you are doing things like this, you should also turn attention to
cache lines and false sharing. See e.g.
std::hardware_destructive_interference_size
("https://en.cppreference.com/w/cpp/thread/hardware_destructive_interference_size")

64 bytes separation is probably just fine (for the current hardware,
that is).
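
A sketch of how the per-thread chunks could be padded to a cache line follows. The struct name Chunk and the constant line_size are made up for this example, and the 64-byte fallback is an assumption for standard libraries that do not provide the constant.

```cpp
#include <cstddef>
#include <new> // std::hardware_destructive_interference_size

#ifdef __cpp_lib_hardware_interference_size
constexpr std::size_t line_size = std::hardware_destructive_interference_size;
#else
constexpr std::size_t line_size = 64u; // assumed fallback, not guaranteed
#endif

// Each worker's 64 bytes start on their own cache line; sizeof(Chunk) is
// rounded up to a multiple of the alignment, so neighbours never share a line.
struct alignas(line_size) Chunk {
    char data[64];
};

Chunk chunks[6]; // one chunk per worker thread

static_assert(alignof(Chunk) == line_size);
static_assert(sizeof(Chunk) % line_size == 0u);
```

On a platform that reports a larger value than 64, each Chunk simply carries padding after its data.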

Scott Lurndal

Jul 27, 2022, 10:02:45 AM
There is hardware out in the real world with 128-byte cache lines. Primarily
in the DPU arena.

Chris M. Thomasson

Jul 27, 2022, 7:18:09 PM
On 7/27/2022 3:51 AM, Frederick Virchanza Gotham wrote:
> On Monday, July 25, 2022 at 11:40:50 PM UTC+1, Chris M. Thomasson wrote:
>
>> How are you parallelizing a single block?
>
>
> I read 384 bytes in from the files, into a big buffer like this:
>
> char buf[6][64];
>
> Each of the six worker threads accesses the array like this:
>
> char (&data)[64] = buf[thread_id];
> Encrypt_Data_In_Place(data);
>
> So the main thread populates the global array, "buf", sets the workers to work, waits for them all to finish, and then loads another 384 bytes into "buf".

Okay, so you must be creating your own thread_id's. Humm... I am not
exactly sure if threads are going to help you here. One thread can
probably process such a small buffer all by itself faster than the
synchronization costs of actually distributing the buffer. Have you
compared the two? How much faster does the multi-threaded parallel
version go?


> You get me?

I think so. I thought about how to parallelize my non-parallelizable
cipher algorithm. Basically, it's not really worth it. I would have to
be able to compute a SHA2 hash in parallel. The block sizes are small.
SHA2-384/256/512 is just too small a workload to multi-thread, well,
afaict. I am not sure how to do it. A hash digest n+1 needs to wait
until hash digest n is completed. So, no threading for me! It just
doesn't make sense. Humm...


> (In a previous post I wrote 8kb with 8 threads but I was just picking nice round numbers. Really I have 6 threads each processing 64 bytes at a time -- yeah I should probably increase that massively).

Big time! Are you reading in a ciphertext/plaintext as a file, or a stream?

Frederick Virchanza Gotham

unread,
Jul 28, 2022, 10:25:49 AM7/28/22
to
On Thursday, July 28, 2022 at 12:18:09 AM UTC+1, Chris M. Thomasson wrote:

> Big time! Are you reading in a ciphertext/plaintext as a file, or a stream?

My program reads from stdin and writes to stdout. So if you want to encrypt a file, you do this:

./prog < plain_text.txt > cipher_text.txt
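
A rough sketch of what such a stdin-to-stdout loop might look like, reading 384 bytes (6 x 64) per pass. The function name pump and the transform callback are hypothetical stand-ins; the real program would hand each block to the worker threads instead of calling a single callback:

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <functional>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>

constexpr std::size_t kBlock = 6 * 64;  // 384 bytes, as described above

// Hypothetical helper: copies `in` to `out` one block at a time, applying
// `transform` in place to each block. Returns the number of bytes processed.
std::size_t pump(std::istream& in, std::ostream& out,
                 const std::function<void(char*, std::size_t)>& transform)
{
    std::array<char, kBlock> buf{};
    std::size_t total = 0;
    for (;;) {
        in.read(buf.data(), static_cast<std::streamsize>(buf.size()));
        const auto got = static_cast<std::size_t>(in.gcount());
        if (got == 0) break;             // nothing left (or the stream failed)
        transform(buf.data(), got);      // the last block may be shorter than kBlock
        out.write(buf.data(), static_cast<std::streamsize>(got));
        total += got;
    }
    return total;
}
```

In the real program the call would be something like pump(std::cin, std::cout, ...); a block cipher would also have to decide how to pad a short final block, which this sketch leaves to the callback.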

Chris M. Thomasson

unread,
Jul 28, 2022, 3:30:50 PM7/28/22
to
I thought so. However, I am still not exactly sure how threading is
going to help you here... The main thread can read in a very large
buffer from the stream. Then, it can try to parallelize on a per block?
If these blocks are small, I am not sure how threading can help.

Are you sure a single high priority thread cannot process a block faster
than six low priority threads? For some reason, your numbers seem to be
arbitrary. Does your encryption impose a limit on the block size? For
instance, my cipher relies on the size of a digest produced from a hash
algorithm. The hash algorithm is part of the secret key itself...

Chris M. Thomasson

unread,
Jul 28, 2022, 3:32:50 PM7/28/22
to
On 7/28/2022 12:30 PM, Chris M. Thomasson wrote:
> On 7/28/2022 7:25 AM, Frederick Virchanza Gotham wrote:
>> On Thursday, July 28, 2022 at 12:18:09 AM UTC+1, Chris M. Thomasson
>> wrote:
>>
>>> Big time! Are you reading in a ciphertext/plaintext as a file, or a
>>> stream?
>>
>> My program reads from stdin and writes to stdout. So if you want to
>> encrypt a file, you do this:
>>
>> ./prog < plain_text.txt > cipher_text.txt
>>
>
> I thought so. However, I am still not exactly sure how threading is
> going to help you here... The main thread can read in a very large
> buffer from the stream. Then, it can try to parallelize on a per block?
> If these blocks are small, I am not sure how threading can help.

Per block, with the god damn clause that block/digest n+1 cannot be
processed, until n is processed... That's a killer for parallel
operations...

[...]

Öö Tiib

unread,
Jul 29, 2022, 7:47:58 AM7/29/22
to
Yes, parallel operations make sense only when the whole
workload can be split into tasks that are independent of each other. If
that is not the case then it often turns into waste. One type of waste
is most workers sitting idle while they wait for dependencies; the other
is workers eagerly doing (and then cancelling) tasks that might turn out
to be useful for the overall goal, only for it to emerge that they just
warmed the room pointlessly. That is complex to follow and inefficient,
but it kind of resembles the behaviour of some human collectives. :D

Chris M. Thomasson

unread,
Aug 5, 2022, 2:46:28 AM8/5/22
to
Indeed. Nice write up. Actually, I have a need to multi thread one of my
vector fields. It's ripe for parallelization as multiple field lines can
be plotted at the same time. This is _not_ the embarrassingly parallel
version that can be found here:

https://www.shadertoy.com/view/MdcyRs

This idea will multi-thread another way of plotting the field,
segment-by-segment. Or just points along a field line. Basically, to
plot a line, take a point A, find its vector field normal, travel down
this normal from A a short distance, call it E for epsilon, to create a
new point B. Draw a line from A to B, set A to B, repeat... That's it.
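
The stepping loop just described could be sketched roughly as below. This reads "vector field normal" as the normalized field direction at A, which is an assumption; Vec2 and trace_field_line are made-up names, and the field itself is passed in as a callback:

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <functional>
#include <vector>

struct Vec2 { double x, y; };

// Hypothetical helper: starting at `a`, takes up to `steps` steps of length
// `eps` along the normalized field direction. Consecutive points in the
// result are the A->B segments to draw.
std::vector<Vec2> trace_field_line(Vec2 a, double eps, std::size_t steps,
                                   const std::function<Vec2(Vec2)>& field)
{
    std::vector<Vec2> pts{a};
    for (std::size_t i = 0; i < steps; ++i) {
        const Vec2 v = field(a);
        const double len = std::hypot(v.x, v.y);
        if (len == 0.0) break;                       // field vanishes here, stop
        const Vec2 b{a.x + eps * v.x / len, a.y + eps * v.y / len};
        pts.push_back(b);                            // segment A -> B
        a = b;                                       // set A to B, repeat
    }
    return pts;
}
```

Each call is independent of every other call, so one field line per task is the natural unit of work to hand to a thread; the only shared state is whatever canvas the segments get drawn on.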

So, threads plotting lines can possibly interfere with one another wrt
points plotted. So, I need to create a new canvas that has a nice cache
aligned setup. Have some ideas. This deserves a new thread. Will be all
in C++.

Here is an online version of my field, does it work for you? Click
around within the image:

http://fractallife247.com/test/vfield

James Lothian

unread,
Aug 9, 2022, 3:00:42 PM8/9/22
to
Frederick Virchanza Gotham wrote:
>
> I have one main thread and eight worker threads.
>
> The main thread is reading in data from a file, and at every 8 kilobytes, it distributes the workload evenly to the eight worker threads, so each worker thread processes 1 kB at a time.
>
> By the time all the worker threads have finished processing their kilobyte, the main thread has prepared another 8 kB workload to distribute. It is very important that the previous workload has been fully processed before the next workload is dished out.
>
> So it's a constant cycle of the main thread distributing the workload, then all of the threads rendezvousing later for the main thread to distribute the next workload.
>

Off the top of my head, this sounds like a job for a semaphore. The
main thread hands off the workloads to the N worker threads. It then
decrements a semaphore N times. Meanwhile, each worker increments the
semaphore when it's finished its workload. When the main thread has
successfully decremented the semaphore N times, it knows the workers
have finished their current workloads.

James

Chris M. Thomasson

unread,
Aug 9, 2022, 3:29:50 PM8/9/22
to
That would work for sure.

Chris M. Thomasson

unread,
Aug 9, 2022, 3:37:21 PM8/9/22
to
Think of an atomic counter S:

N worker threads
S = 0

main_thread: sets counter S to N+1

main_thread: distributes work to N worker threads

main_thread: decrements S by one, if result is _not_ zero, it waits on a
slow bin_sema.


worker_thread: decrements S by one, if the result _is_ zero, it signals
the slow bin_sema.


This can be improved on for sure.

Chris M. Thomasson

unread,
Aug 11, 2022, 2:13:03 AM8/11/22
to
Ahhh.... The quick and easy way. I create N worker threads, each with
its own copy of the vector field, and its own file to plot to. This
eliminates any worker threads possibly fighting over the same pixel.
After the main workload is completed by the worker threads, the main
thread assembles the final rendering by superimposing the workers' render
results. Should work.
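
A small sketch of that plan, with two assumptions labeled up front: the per-worker output is an in-memory canvas rather than a file, and "superimposing" is taken to mean a per-pixel maximum. Canvas, render_superimposed and the plot callback are hypothetical names:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

using Canvas = std::vector<std::uint8_t>;   // one grayscale value per pixel

// Hypothetical helper: every worker plots into its own private canvas, then
// the calling thread merges the canvases once all workers have joined.
Canvas render_superimposed(std::size_t n_workers, std::size_t n_pixels,
                           const std::function<void(std::size_t, Canvas&)>& plot)
{
    std::vector<Canvas> canvases(n_workers, Canvas(n_pixels, 0));
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < n_workers; ++i)
        workers.emplace_back([&, i] { plot(i, canvases[i]); });   // no shared pixels
    for (auto& t : workers) t.join();

    Canvas out(n_pixels, 0);
    for (const Canvas& c : canvases)
        for (std::size_t p = 0; p < n_pixels; ++p)
            out[p] = std::max(out[p], c[p]);   // assumption: superimpose = per-pixel max
    return out;
}
```

The cost is N times the memory of a single canvas plus one merge pass at the end, in exchange for needing no locking at all while plotting.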
