Christopher Pisz
unread,Sep 1, 2015, 4:07:38 PM9/1/15You do not have permission to delete messages in this group
Either email addresses are anonymous for this group or you need the view member email addresses permission to view the original message
to
I've created a new minimal compilable example of what I want to achieve
in production code. Can you guys take a look and see if I broke any
rules, created deadlock situations, did things right? I am new to
std::thread and std::condition_variable, not to mention C++11 vs C++98
in general.
One of my concerns in whether or not I should do anything in the
destructor of my ThreadedOnGoingTask and whether or not I've created any
potential deadlock by my use of std::condition_variable::wait_for with
shared variable g_stop.
Sorry, this code is long. Threading is complicated! This really is the
the most minimal compilable example I could come up with.
___
main.cpp
// Project Includes
#include "ThreadedOngoingTask.h"
// Shared Library
#include "Logger.h" // Replace with any threadsafe output
// Standard Library
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <string>
#include <thread>
#include <vector>
#include <signal.h>
//--------------------------------------------------------------------------------------------------
// TODO - Only global for concept testing, later we'll wrap all this in
a class
std::vector<ThreadedOngoingTask::Shared_Ptr> g_workerObjects;
// Tasks wrapped in a class that represent each child thread
std::mutex g_threadsExitedMutex; // For
locks to read/write the following two variables
std::vector<std::shared_ptr<bool> > g_threadsExited; // Bool
that gets set to true by a child thread immediatly before the child exits
std::condition_variable g_threadsExitedCV; //
Condition Variable that gets notified immediatly before a child thread exits
std::mutex g_stopMutex; // For
locks to read/write the following two variables
bool g_stop; // Flag
stops child threads
std::condition_variable g_stopCV; //
Condition Variable that gets notified when child threads should exit
//--------------------------------------------------------------------------------------------------
void SignalHandler(int signal)
{
// Signal worker thread to exit
const std::string msg("Halt signal caught. Exiting all threads...");
Shared::Logger::getInstance()->logExecutionEvent(msg, __FILE__,
__LINE__, Shared::DateTime(),
Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
std::unique_lock<std::mutex> lock(g_stopMutex);
g_stop = true;
g_stopCV.notify_all();
}
//--------------------------------------------------------------------------------------------------
int main()
{
// Register a handler for the ctrl-z signal
signal(SIGINT, SignalHandler);
// Create the workers and start thier threads
for(size_t index = 0; index < 10; ++index)
{
std::shared_ptr<bool> threadExited(new bool(false));
ThreadedOngoingTask::Shared_Ptr worker(new
ThreadedOngoingTask(g_threadsExitedMutex,
threadExited,
g_threadsExitedCV,
g_stopMutex,
g_stop,
g_stopCV));
ThreadedOngoingTask::Start(*worker);
g_threadsExited.push_back(threadExited);
g_workerObjects.push_back(worker);
}
// Wait for all child threads to exit
std::unique_lock<std::mutex> lock(g_threadsExitedMutex);
while(std::any_of(g_threadsExited.begin(), g_threadsExited.end(),
[](std::shared_ptr<bool> threadExited){ return !(*threadExited); }))
{
g_threadsExitedCV.wait(lock);
}
// Clean up (even though these go away at exit..for debugging)
g_threadsExited.clear();
g_workerObjects.clear();
// Done
return 0;
}
____
ThreadedOngoingTask.h
#pragma once
// Standard Includes
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
//--------------------------------------------------------------------------------------------------
class ThreadedOngoingTask
{
public:
typedef std::shared_ptr<ThreadedOngoingTask> Shared_Ptr;
static void Start(ThreadedOngoingTask & ThreadedOngoingTask);
ThreadedOngoingTask(std::mutex & threadsExitedMutex,
std::shared_ptr<bool> threadExited,
std::condition_variable & threadsExitedCV,
std::mutex & stopMutex,
bool & stop,
std::condition_variable & stopCV);
ThreadedOngoingTask(const ThreadedOngoingTask & rhs) = delete;
ThreadedOngoingTask & operator = (const ThreadedOngoingTask & rhs)
= delete;
~ThreadedOngoingTask();
protected:
void ThreadProcedure();
static size_t m_count;
size_t m_id;
std::thread m_thread; // Worker thread
this object will run on
std::function<void()> m_threadProcedure; // Function
object that is the main loop for the thread
std::mutex & m_threadsExitedMutex; // Owned by
parent thread. For locks to read/write the following two variables
std::shared_ptr<bool> m_threadExited; // Owned by
parent thread. Bool that gets set to true by a child thread immediatly
before the child exits
std::condition_variable & m_threadsExitedCV; // Owned by
parent thread. Condition Variable that gets notified immediatly before a
child thread exits
std::mutex & m_stopMutex; // Owned by
parent thread. For locks to read/write the following two variables
bool & m_stop; // Owned by
parent thread. Flag stops child threads
std::condition_variable & m_stopCV; // Condition
Variable that gets notified when child threads should exit
};
___
ThreadedOngoingTask.cpp
#include "ThreadedOngoingTask.h"
// Shared Includes
#include "Logger.h"
// Standard Includes
#include <iostream>
//--------------------------------------------------------------------------------------------------
size_t ThreadedOngoingTask::m_count = 0;
//--------------------------------------------------------------------------------------------------
void ThreadedOngoingTask::Start(ThreadedOngoingTask & ThreadedOngoingTask)
{
// Start the thread
ThreadedOngoingTask.m_thread.swap(std::thread(ThreadedOngoingTask.m_threadProcedure));
ThreadedOngoingTask.m_thread.detach();
}
//--------------------------------------------------------------------------------------------------
ThreadedOngoingTask::ThreadedOngoingTask(std::mutex & threadsExitedMutex,
std::shared_ptr<bool>
threadExited,
std::condition_variable &
threadsExitedCV,
std::mutex & stopMutex,
bool & stop,
std::condition_variable & stopCV)
:
m_id (m_count++),
m_thread (),
// Derived classes should override this behavior by binding their
own thread procedure in their constructor
m_threadProcedure
(std::bind(&ThreadedOngoingTask::ThreadProcedure, this)),
m_threadsExitedMutex(threadsExitedMutex),
m_threadExited (threadExited),
m_threadsExitedCV (threadsExitedCV),
m_stopMutex (stopMutex),
m_stop (stop),
m_stopCV (stopCV)
{
std::ostringstream msg;
msg << "ThreadedOngoingTask #" << m_id << " constructor has been
called.";
Shared::Logger::getInstance()->logExecutionEvent(msg.str(),
__FILE__, __LINE__, Shared::DateTime(),
Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
}
//--------------------------------------------------------------------------------------------------
ThreadedOngoingTask::~ThreadedOngoingTask()
{
std::ostringstream msg;
msg << "ThreadedOngoingTask #" << m_id << " deconstructor has been
called.";
Shared::Logger::getInstance()->logExecutionEvent(msg.str(),
__FILE__, __LINE__, Shared::DateTime(),
Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
// Note - std::thread has no mechanism of killing the thread
// I suppose we are in UDB here if the object is destroyed
before the thread exits
// I don't know what I should do. I don't really want to
flag all threads to exit by playing with the m_stop
// Do I just hope we never get here before it exited and
that all exceptions are caught in ThreadProcedure?
}
//--------------------------------------------------------------------------------------------------
void ThreadedOngoingTask::ThreadProcedure()
{
std::ostringstream msg;
msg << "ThreadedOngoingTask #" << m_id << " thread procedure has
been called.";
Shared::Logger::getInstance()->logExecutionEvent(msg.str(),
__FILE__, __LINE__, Shared::DateTime(),
Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
// Do not allow exceptions to escape the thread
try
{
// Loop forever until signalled to exit
std::unique_lock<std::mutex> lock(m_stopMutex);
while(!m_stopCV.wait_for(lock, std::chrono::seconds(10),
[this](){return m_stop;}) )
{
std::ostringstream msg;
msg << "ThreadedOngoingTask #" << m_id << " tick...";
Shared::Logger::getInstance()->logExecutionEvent(msg.str(),
__FILE__, __LINE__, Shared::DateTime(),
Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
}
}
catch(...)
{
std::ostringstream msg;
msg << "An unhandled exception occured in thread of
ThreadedOngoingTask #" << m_id << ". Stopping thread.";
Shared::Logger::getInstance()->logExecutionEvent(msg.str(),
__FILE__, __LINE__, Shared::DateTime(),
Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
}
msg.str("");
msg.clear();
msg << "ThreadedOngoingTask #" << m_id << " stopped. Exiting thread.";
Shared::Logger::getInstance()->logExecutionEvent(msg.str(),
__FILE__, __LINE__, Shared::DateTime(),
Shared::Logger::LoggingLevel::LOG_LEVEL_DEBUG);
// Notify parent the thread has exited
std::unique_lock<std::mutex> lock(m_threadsExitedMutex);
*m_threadExited = true;
std::notify_all_at_thread_exit(m_threadsExitedCV, std::move(lock));
}
--
I have chosen to troll filter/ignore all subthreads containing the
words: "Rick C. Hodgins", "Flibble", and "Islam"
So, I won't be able to see or respond to any such messages
---