
Threading Review/Feedback

Christopher Pisz

Sep 1, 2015, 4:07:38 PM

I've created a new minimal compilable example of what I want to achieve
in production code. Can you guys take a look and see if I broke any
rules, created deadlock situations, did things right? I am new to
std::thread and std::condition_variable, not to mention C++11 vs C++98
in general.

One of my concerns is whether I should do anything in the destructor of
my ThreadedOngoingTask, and whether I've created any potential deadlock
through my use of std::condition_variable::wait_for with the shared
variable g_stop.

Sorry, this code is long. Threading is complicated! This really is the
most minimal compilable example I could come up with.

___
main.cpp


// Project Includes
#include "ThreadedOngoingTask.h"

// Shared Library
#include "Logger.h" // Replace with any threadsafe output

// Standard Library
#include <algorithm> // std::any_of
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include <signal.h>

//--------------------------------------------------------------------------------------------------
// TODO - Only global for concept testing, later we'll wrap all this in a class
std::vector<ThreadedOngoingTask::Shared_Ptr> g_workerObjects; // Tasks wrapped in a class that represent each child thread

std::mutex g_threadsExitedMutex;                     // For locks to read/write the following two variables
std::vector<std::shared_ptr<bool> > g_threadsExited; // Bool that gets set to true by a child thread immediately before the child exits
std::condition_variable g_threadsExitedCV;           // Condition variable that gets notified immediately before a child thread exits

std::mutex g_stopMutex;           // For locks to read/write the following two variables
bool g_stop;                      // Flag that stops child threads
std::condition_variable g_stopCV; // Condition variable that gets notified when child threads should exit

//--------------------------------------------------------------------------------------------------
void SignalHandler(int signal)
{
    // Signal worker threads to exit
    const std::string msg("Halt signal caught. Exiting all threads...");
    Shared::Logger::getInstance()->logExecutionEvent(msg, __FILE__, __LINE__, Shared::DateTime(),
        Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);

    std::unique_lock<std::mutex> lock(g_stopMutex);
    g_stop = true;
    g_stopCV.notify_all();
}

//--------------------------------------------------------------------------------------------------
int main()
{
    // Register a handler for the Ctrl-C signal (SIGINT)
    signal(SIGINT, SignalHandler);

    // Create the workers and start their threads
    for(size_t index = 0; index < 10; ++index)
    {
        std::shared_ptr<bool> threadExited(new bool(false));

        ThreadedOngoingTask::Shared_Ptr worker(new ThreadedOngoingTask(g_threadsExitedMutex,
                                                                       threadExited,
                                                                       g_threadsExitedCV,
                                                                       g_stopMutex,
                                                                       g_stop,
                                                                       g_stopCV));
        ThreadedOngoingTask::Start(*worker);

        g_threadsExited.push_back(threadExited);
        g_workerObjects.push_back(worker);
    }

    // Wait for all child threads to exit
    std::unique_lock<std::mutex> lock(g_threadsExitedMutex);
    while(std::any_of(g_threadsExited.begin(), g_threadsExited.end(),
                      [](std::shared_ptr<bool> threadExited){ return !(*threadExited); }))
    {
        g_threadsExitedCV.wait(lock);
    }

    // Clean up (even though these go away at exit..for debugging)
    g_threadsExited.clear();
    g_workerObjects.clear();

    // Done
    return 0;
}

____
ThreadedOngoingTask.h

#pragma once

// Standard Includes
#include <atomic>
#include <condition_variable>
#include <cstddef>    // size_t
#include <functional> // std::function
#include <memory>
#include <mutex>
#include <thread>

//--------------------------------------------------------------------------------------------------
class ThreadedOngoingTask
{
public:

    typedef std::shared_ptr<ThreadedOngoingTask> Shared_Ptr;

    static void Start(ThreadedOngoingTask & ThreadedOngoingTask);

    ThreadedOngoingTask(std::mutex & threadsExitedMutex,
                        std::shared_ptr<bool> threadExited,
                        std::condition_variable & threadsExitedCV,
                        std::mutex & stopMutex,
                        bool & stop,
                        std::condition_variable & stopCV);

    ThreadedOngoingTask(const ThreadedOngoingTask & rhs) = delete;
    ThreadedOngoingTask & operator = (const ThreadedOngoingTask & rhs) = delete;
    ~ThreadedOngoingTask();

protected:

    void ThreadProcedure();

    static size_t m_count;
    size_t m_id;
    std::thread m_thread;                    // Worker thread this object will run on
    std::function<void()> m_threadProcedure; // Function object that is the main loop for the thread

    std::mutex & m_threadsExitedMutex;           // Owned by parent thread. For locks to read/write the following two variables
    std::shared_ptr<bool> m_threadExited;        // Owned by parent thread. Bool that gets set to true by a child thread immediately before the child exits
    std::condition_variable & m_threadsExitedCV; // Owned by parent thread. Condition variable that gets notified immediately before a child thread exits
    std::mutex & m_stopMutex;                    // Owned by parent thread. For locks to read/write the following two variables
    bool & m_stop;                               // Owned by parent thread. Flag that stops child threads
    std::condition_variable & m_stopCV;          // Owned by parent thread. Condition variable that gets notified when child threads should exit
};

___
ThreadedOngoingTask.cpp

#include "ThreadedOngoingTask.h"

// Shared Includes
#include "Logger.h"

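// Standard Includes
#include <chrono>     // std::chrono::seconds
#include <functional> // std::bind
#include <iostream>
#include <sstream>    // std::ostringstream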

//--------------------------------------------------------------------------------------------------
size_t ThreadedOngoingTask::m_count = 0;

//--------------------------------------------------------------------------------------------------
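void ThreadedOngoingTask::Start(ThreadedOngoingTask & ThreadedOngoingTask)
{
    // Start the thread. Move-assign instead of swap(): std::thread::swap takes a
    // non-const lvalue reference, so standard C++ rejects a temporary there
    // (some compilers accept it only as an extension).
    ThreadedOngoingTask.m_thread = std::thread(ThreadedOngoingTask.m_threadProcedure);
    ThreadedOngoingTask.m_thread.detach();
}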

//--------------------------------------------------------------------------------------------------
ThreadedOngoingTask::ThreadedOngoingTask(std::mutex & threadsExitedMutex,
                                         std::shared_ptr<bool> threadExited,
                                         std::condition_variable & threadsExitedCV,
                                         std::mutex & stopMutex,
                                         bool & stop,
                                         std::condition_variable & stopCV)
    :
    m_id                (m_count++),
    m_thread            (),
    // Derived classes should override this behavior by binding their own thread procedure in their constructor
    m_threadProcedure   (std::bind(&ThreadedOngoingTask::ThreadProcedure, this)),
    m_threadsExitedMutex(threadsExitedMutex),
    m_threadExited      (threadExited),
    m_threadsExitedCV   (threadsExitedCV),
    m_stopMutex         (stopMutex),
    m_stop              (stop),
    m_stopCV            (stopCV)
{
    std::ostringstream msg;
    msg << "ThreadedOngoingTask #" << m_id << " constructor has been called.";
    Shared::Logger::getInstance()->logExecutionEvent(msg.str(), __FILE__, __LINE__, Shared::DateTime(),
        Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
}

//--------------------------------------------------------------------------------------------------
ThreadedOngoingTask::~ThreadedOngoingTask()
{
    std::ostringstream msg;
    msg << "ThreadedOngoingTask #" << m_id << " destructor has been called.";
    Shared::Logger::getInstance()->logExecutionEvent(msg.str(), __FILE__, __LINE__, Shared::DateTime(),
        Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);

    // Note - std::thread has no mechanism for killing the thread.
    //        I suppose we are in undefined behavior here if the object is destroyed before the thread exits.
    //        I don't know what I should do. I don't really want to flag all threads to exit by playing with m_stop.
    //        Do I just hope we never get here before it exited and that all exceptions are caught in ThreadProcedure?
}

//--------------------------------------------------------------------------------------------------
void ThreadedOngoingTask::ThreadProcedure()
{
    std::ostringstream msg;
    msg << "ThreadedOngoingTask #" << m_id << " thread procedure has been called.";
    Shared::Logger::getInstance()->logExecutionEvent(msg.str(), __FILE__, __LINE__, Shared::DateTime(),
        Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);

    // Do not allow exceptions to escape the thread
    try
    {
        // Loop forever until signalled to exit
        std::unique_lock<std::mutex> lock(m_stopMutex);

        while(!m_stopCV.wait_for(lock, std::chrono::seconds(10), [this](){return m_stop;}) )
        {
            std::ostringstream msg;
            msg << "ThreadedOngoingTask #" << m_id << " tick...";
            Shared::Logger::getInstance()->logExecutionEvent(msg.str(), __FILE__, __LINE__, Shared::DateTime(),
                Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
        }
    }
    catch(...)
    {
        std::ostringstream msg;
        msg << "An unhandled exception occurred in thread of ThreadedOngoingTask #" << m_id << ". Stopping thread.";
        Shared::Logger::getInstance()->logExecutionEvent(msg.str(), __FILE__, __LINE__, Shared::DateTime(),
            Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
    }

    msg.str("");
    msg.clear();
    msg << "ThreadedOngoingTask #" << m_id << " stopped. Exiting thread.";
    Shared::Logger::getInstance()->logExecutionEvent(msg.str(), __FILE__, __LINE__, Shared::DateTime(),
        Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);

    // Notify parent the thread has exited
    std::unique_lock<std::mutex> lock(m_threadsExitedMutex);
    *m_threadExited = true;
    std::notify_all_at_thread_exit(m_threadsExitedCV, std::move(lock));
}



--
I have chosen to troll filter/ignore all subthreads containing the
words: "Rick C. Hodgins", "Flibble", and "Islam"
So, I won't be able to see or respond to any such messages
---
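
One possible answer to the destructor question in the post above, sketched under assumptions: instead of detaching, the thread stays joinable and the destructor requests a stop and then joins. The class JoiningTask, its per-object stop flag, and the helper RunBriefly are hypothetical names and are not part of the posted code, which shares a single stop flag across all workers.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical worker (not part of the posted code): it owns its stop flag,
// and its destructor requests a stop and then joins the thread.
class JoiningTask
{
public:
    JoiningTask() : m_stop(false), m_ticks(0), m_thread(&JoiningTask::Run, this) {}

    JoiningTask(const JoiningTask &) = delete;
    JoiningTask & operator=(const JoiningTask &) = delete;

    ~JoiningTask()
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_stop = true;
        }
        m_cv.notify_all();
        if (m_thread.joinable())
        {
            m_thread.join(); // Blocks until Run() has returned
        }
    }

    int Ticks()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_ticks;
    }

private:
    void Run()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        // wait_for releases the mutex while waiting and returns the predicate's value
        while (!m_cv.wait_for(lock, std::chrono::milliseconds(10), [this] { return m_stop; }))
        {
            ++m_ticks; // Timed out without a stop request: one "tick" of work
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_stop;
    int m_ticks;
    std::thread m_thread; // Declared last so the members above exist before the thread starts
};

// Creates a worker, lets it run briefly, and destroys it while it is still running.
int RunBriefly()
{
    int ticks = 0;
    {
        JoiningTask task;
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        ticks = task.Ticks();
    } // ~JoiningTask requests the stop and joins here
    assert(ticks >= 0);
    return ticks;
}
```

With this shape the object cannot be destroyed while its thread is still using it, at the cost of the destructor blocking until the thread procedure returns. Whether that trade-off fits the production code is a design decision this sketch does not settle.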

Paavo Helde

Sep 1, 2015, 5:02:15 PM
Christopher Pisz <nos...@notanaddress.com> wrote in
news:ms50eb$sro$1...@dont-email.me:

>
> I've created a new minimal compilable example of what I want to
> achieve in production code. Can you guys take a look and see if I
> broke any rules, created deadlock situations, did things right? I am
> new to std::thread and std::condition_variable, not to mention C++11
> vs C++98 in general.

One thing caught my eye: in a POSIX multithreaded program, about the only
sane way to handle signals is to block signals in all threads, create a
separate thread for signal handling, and call sigwait() in that thread. Your
code has an old-fashioned signal handler, in which only very limited
functionality may be used; I am pretty sure your signal handler oversteps
that.

hth
Paavo
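
The sigwait() approach described above might look roughly like the following. This is a POSIX-only sketch, so it does not apply to a Windows build. The function name WaitForSigintDemo is hypothetical, and the kill() call merely stands in for a user pressing Ctrl-C.

```cpp
#include <cassert>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include <thread>

// Hypothetical demo: block SIGINT, then receive it synchronously with
// sigwait() in a dedicated thread instead of in an asynchronous handler.
int WaitForSigintDemo()
{
    sigset_t set;
    sigset_t oldMask;
    sigemptyset(&set);
    sigaddset(&set, SIGINT);

    // Block SIGINT in this thread. Threads created afterwards inherit the mask,
    // so this should happen before any worker thread is started.
    int rc = pthread_sigmask(SIG_BLOCK, &set, &oldMask);
    assert(rc == 0);
    (void)rc;

    int caught = 0;
    std::thread waiter([&set, &caught] {
        int sig = 0;
        // Ordinary thread context: mutexes, condition variables and logging
        // would all be allowed here once sigwait() returns.
        if (sigwait(&set, &sig) == 0)
        {
            caught = sig;
        }
    });

    // Stand-in for Ctrl-C: send SIGINT to the whole process. It stays pending
    // because every thread blocks it, until sigwait() picks it up.
    kill(getpid(), SIGINT);

    waiter.join();

    // Restore the previous mask; the signal has already been consumed.
    pthread_sigmask(SIG_SETMASK, &oldMask, nullptr);
    return caught;
}
```

In a real program the waiting thread would set the stop flag under its mutex and notify the condition variable after sigwait() returns, which is safe there because it is not running inside a signal handler.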

Christopher Pisz

Sep 1, 2015, 6:28:06 PM
I don't seem to have a sigwait function available, as I am on Windows
and am trying to use standard C++.

I am not too worried about the signal handler anyway though, as it is
only going to be in my debug version, which runs as a console app. In
production, this will normally be part of a Windows Service and will
wait for the service stop notification.
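
For a debug console build that sticks to standard C++, one commonly used pattern is to have the handler do nothing but set a volatile std::sig_atomic_t flag, and to let an ordinary thread poll that flag and perform the locking and notification. The sketch below assumes that pattern. The names OnSignal, g_signalCaught and RunUntilSignalled are hypothetical, and std::raise stands in for the user pressing Ctrl-C.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <mutex>
#include <thread>

// The only object the handler touches.
volatile std::sig_atomic_t g_signalCaught = 0;

extern "C" void OnSignal(int)
{
    g_signalCaught = 1; // No locks, no logging, no allocation in the handler
}

// Hypothetical demo: the main thread polls the flag and then forwards the
// stop request to a worker through a mutex and condition variable.
bool RunUntilSignalled()
{
    std::mutex stopMutex;
    std::condition_variable stopCV;
    bool stop = false;
    bool workerSawStop = false;

    g_signalCaught = 0;
    auto previous = std::signal(SIGINT, OnSignal);
    assert(previous != SIG_ERR);

    std::thread worker([&] {
        std::unique_lock<std::mutex> lock(stopMutex);
        stopCV.wait(lock, [&] { return stop; });
        workerSawStop = true;
    });

    std::raise(SIGINT); // Stand-in for Ctrl-C; runs OnSignal in this thread

    // Poll on an ordinary thread, outside of signal-handler context.
    while (g_signalCaught == 0)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }

    {
        std::lock_guard<std::mutex> lock(stopMutex);
        stop = true;
    }
    stopCV.notify_all();
    worker.join();

    std::signal(SIGINT, previous); // Put the earlier handler back
    return workerSawStop;
}
```

The polling interval adds a small delay before the workers are told to stop, which is usually acceptable for a debug console app.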

Gerhard Wolf

Sep 2, 2015, 1:30:17 AM
On 01.09.2015 at 22:07, Christopher Pisz wrote:
>
> I've created a new minimal compilable example of what I want to achieve
> in production code.
At least the Logger class header is missing.

Christopher J. Pisz

Sep 2, 2015, 2:17:55 AM
There was a comment next to the first #include of Logger.h that says to
replace it with any thread-safe output mechanism. I don't want to
include all that irrelevant code; it would just be distracting.