[Interest] Best way to terminate threads using QWaitCondition
Bob Hood
bhood2 at comcast.net
Sat Nov 21 21:12:03 CET 2015
I wanted some advice from the brain trust on the best means of terminating
threads that are synchronizing using QMutex/QWaitCondition. In my working
code, I'm getting error messages that look like:
QWaitCondition: cv destroy failure: Device or resource busy
QWaitCondition: mutex destroy failure: Device or resource busy
when I terminate the program, so I'm guessing there's probably a better way
that I'm missing. The following code demonstrates the mechanism I'm using to
signal the threads to stop. I should note that this demonstration code
doesn't give me the above errors, even though it is stopping the threads in
the same fashion as my production code:
--------------------------
test.h header:
--------------------------
#include <atomic>
#include <QMutex>
#include <QWaitCondition>
#include <QSharedPointer>
#include <QWeakPointer>
struct ThreadData
{
std::atomic<bool> sending;
};
typedef QSharedPointer<ThreadData> ThreadDataPointer;
typedef QWeakPointer<ThreadData> ThreadDataWeakPointer;
class Consumer : public QObject
{
Q_OBJECT
public:
Consumer(ThreadDataWeakPointer thread_data,
QWaitCondition& wait,
QMutex& mutex,
QObject* parent = nullptr);
signals:
void signal_finished();
public slots:
void slot_thread();
protected:
ThreadDataWeakPointer thread_data;
QWaitCondition& consumer_wait;
QMutex& consumer_mutex;
};
class Producer : public QObject
{
Q_OBJECT
public:
Producer(ThreadDataWeakPointer thread_data, QWaitCondition& wait,
QObject* parent = nullptr);
public slots:
void slot_thread();
signals:
void signal_finished();
protected:
ThreadDataWeakPointer thread_data;
QWaitCondition& consumer_wait;
};
--------------------------
test.cpp module:
--------------------------
#include <iostream>
#include <chrono>
#include <thread>
#include <signal.h>
#include <QCoreApplication>
#include <QThread>
#include <QTimer>
#include "test.h"
//-------------------------------------------------------------------
// Consumer class methods
Consumer::Consumer(ThreadDataWeakPointer data,
QWaitCondition& wait,
QMutex& mutex,
QObject* parent)
: QObject(parent),
thread_data(data),
consumer_wait(wait),
consumer_mutex(mutex)
{
}
void Consumer::slot_thread()
{
while(true)
{
// wait for the producer to send us data
consumer_mutex.lock();
consumer_wait.wait(&consumer_mutex);
// invalid thread data is a signal to us to terminate
if(thread_data.isNull())
break;
ThreadDataPointer n_dp;
n_dp = thread_data;
if(!n_dp)
break;
// do something for a few milliseconds
std::this_thread::sleep_for(std::chrono::milliseconds(10));
n_dp->sending = false;
consumer_mutex.unlock();
}
std::cout << "Terminating Consumer\n";
consumer_mutex.unlock(); // release the mutex
emit signal_finished();
}
//-------------------------------------------------------------------
// Producer class methods
Producer::Producer(ThreadDataWeakPointer data, QWaitCondition& wait,
QObject* parent)
: QObject(parent),
thread_data(data),
consumer_wait(wait)
{
}
void Producer::slot_thread()
{
while(true)
{
if(thread_data.isNull())
break;
ThreadDataPointer n_dp;
n_dp = thread_data;
if(!n_dp)
break;
// thread_data remains valid as long as we hold n_dp
// wake the consumer thread
n_dp->sending = true;
consumer_wait.wakeAll();
// wait for the consumer to consume our output
while(n_dp->sending)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::cout << "Terminating Producer\n";
consumer_wait.wakeAll(); // wake consumer so it can detect bad
thread data and terminate
emit signal_finished();
}
ThreadDataPointer thread_data;
QMutex consumer_mutex;
QWaitCondition consumer_wait;
QThread* producer_thread;
Producer* producer_worker;
QThread* consumer_thread;
Consumer* consumer_worker;
void signal_handler(int /*sig*/)
{
thread_data.clear(); // signal threads to terminate
QTimer::singleShot(100, qApp, &QCoreApplication::quit);
}
int main(int argc, char* argv[])
{
QCoreApplication app(argc, argv);
for(int sig : {SIGQUIT, SIGINT, SIGTERM, SIGHUP}) // C++11
signal(sig, signal_handler);
thread_data = ThreadDataPointer(new ThreadData);
thread_data->sending = false;
// create the producer thread
QThread* producer_thread = new QThread;
Producer* producer_worker = new Producer(thread_data.toWeakRef(),
consumer_wait);
producer_worker->moveToThread(producer_thread);
QObject::connect(producer_thread, &QThread::started, producer_worker,
&Producer::slot_thread);
QObject::connect(producer_worker, &Producer::signal_finished,
producer_thread, &QThread::quit);
QObject::connect(producer_worker, &Producer::signal_finished,
producer_worker, &Producer::deleteLater);
QObject::connect(producer_thread, &QThread::finished,
producer_thread, &QThread::deleteLater);
// create the consumer thread
QThread* consumer_thread = new QThread;
consumer_worker = new Consumer(thread_data.toWeakRef(),
consumer_wait, consumer_mutex);
consumer_worker->moveToThread(consumer_thread);
QObject::connect(consumer_thread, &QThread::started, consumer_worker,
&Consumer::slot_thread);
QObject::connect(consumer_worker, &Consumer::signal_finished,
consumer_thread, &QThread::quit);
QObject::connect(consumer_worker, &Consumer::signal_finished,
consumer_worker, &Consumer::deleteLater);
QObject::connect(consumer_thread, &QThread::finished,
consumer_thread, &QThread::deleteLater);
consumer_thread->start();
producer_thread->start();
return app.exec();
}
Thanks for any insights.
More information about the Interest
mailing list