[Interest] Best way to terminate threads using QWaitCondition

Bob Hood bhood2 at comcast.net
Sat Nov 21 21:12:03 CET 2015


I wanted some advice from the brain trust on the best means of terminating 
threads that are synchronizing using QMutex/QWaitCondition.  In my working 
code, I'm getting error messages that look like:

     QWaitCondition: cv destroy failure: Device or resource busy
     QWaitCondition: mutex destroy failure: Device or resource busy

when I terminate the program, so I'm guessing there's probably a better way 
that I'm missing.  The following code demonstrates the mechanism I'm using to 
signal the threads to stop.  I should note that this demonstration code 
doesn't give me the above errors, even though it is stopping the threads in 
the same fashion as my production code:

--------------------------
test.h header:
--------------------------

     #include <atomic>

     #include <QMutex>
     #include <QWaitCondition>
     #include <QSharedPointer>
     #include <QWeakPointer>

     struct ThreadData
     {
         std::atomic<bool>    sending;
     };

     typedef QSharedPointer<ThreadData> ThreadDataPointer;
     typedef QWeakPointer<ThreadData> ThreadDataWeakPointer;

     class Consumer : public QObject
     {
         Q_OBJECT

     public:
         Consumer(ThreadDataWeakPointer thread_data,
                 QWaitCondition& wait,
                 QMutex& mutex,
                 QObject* parent = nullptr);

     signals:
         void        signal_finished();

     public slots:
         void        slot_thread();

     protected:
         ThreadDataWeakPointer    thread_data;
         QWaitCondition&            consumer_wait;
         QMutex&                    consumer_mutex;
     };

     class Producer : public QObject
     {
         Q_OBJECT

     public:
         Producer(ThreadDataWeakPointer thread_data, QWaitCondition& wait, 
QObject* parent = nullptr);

     public slots:
         void        slot_thread();

     signals:
         void        signal_finished();

     protected:
         ThreadDataWeakPointer    thread_data;
         QWaitCondition&            consumer_wait;
     };

--------------------------
test.cpp module:
--------------------------

     #include <iostream>
     #include <chrono>
     #include <thread>

     #include <signal.h>

     #include <QCoreApplication>
     #include <QThread>
     #include <QTimer>

     #include "test.h"

//-------------------------------------------------------------------
     // Consumer class methods

     Consumer::Consumer(ThreadDataWeakPointer data,
                         QWaitCondition& wait,
                         QMutex& mutex,
                         QObject* parent)
         : QObject(parent),
         thread_data(data),
         consumer_wait(wait),
         consumer_mutex(mutex)
     {
     }

     void Consumer::slot_thread()
     {
         while(true)
         {
             // wait for the producer to send us data
             consumer_mutex.lock();
             consumer_wait.wait(&consumer_mutex);

             // invalid thread data is a signal to us to terminate

             if(thread_data.isNull())
                 break;

             ThreadDataPointer n_dp;
             n_dp = thread_data;
             if(!n_dp)
                 break;

             // do something for a few milliseconds
std::this_thread::sleep_for(std::chrono::milliseconds(10));

             n_dp->sending = false;
             consumer_mutex.unlock();
         }

         std::cout << "Terminating Consumer\n";

         consumer_mutex.unlock();    // release the mutex
         emit signal_finished();
     }

//-------------------------------------------------------------------
     // Producer class methods

     Producer::Producer(ThreadDataWeakPointer data, QWaitCondition& wait, 
QObject* parent)
         : QObject(parent),
         thread_data(data),
         consumer_wait(wait)
     {
     }

     void Producer::slot_thread()
     {
         while(true)
         {
             if(thread_data.isNull())
                 break;

             ThreadDataPointer n_dp;
             n_dp = thread_data;
             if(!n_dp)
                 break;

             // thread_data remains valid as long as we hold n_dp

             // wake the consumer thread
             n_dp->sending = true;
             consumer_wait.wakeAll();

             // wait for the consumer to consume our output
             while(n_dp->sending)
std::this_thread::sleep_for(std::chrono::milliseconds(1));
         }

         std::cout << "Terminating Producer\n";

         consumer_wait.wakeAll();    // wake consumer so it can detect bad 
thread data and terminate
         emit signal_finished();
     }

     ThreadDataPointer thread_data;
     QMutex            consumer_mutex;
     QWaitCondition    consumer_wait;

     QThread*  producer_thread;
     Producer* producer_worker;
     QThread*  consumer_thread;
     Consumer* consumer_worker;

     void signal_handler(int /*sig*/)
     {
         thread_data.clear();        // signal threads to terminate
         QTimer::singleShot(100, qApp, &QCoreApplication::quit);
     }

     int main(int argc, char* argv[])
     {
         QCoreApplication app(argc, argv);

         for(int sig : {SIGQUIT, SIGINT, SIGTERM, SIGHUP})   // C++11
             signal(sig, signal_handler);

         thread_data = ThreadDataPointer(new ThreadData);
         thread_data->sending = false;

         // create the producer thread

         QThread* producer_thread = new QThread;
         Producer* producer_worker = new Producer(thread_data.toWeakRef(), 
consumer_wait);

         producer_worker->moveToThread(producer_thread);

         QObject::connect(producer_thread, &QThread::started, producer_worker, 
&Producer::slot_thread);
         QObject::connect(producer_worker, &Producer::signal_finished, 
producer_thread, &QThread::quit);
         QObject::connect(producer_worker, &Producer::signal_finished, 
producer_worker, &Producer::deleteLater);
         QObject::connect(producer_thread, &QThread::finished, 
producer_thread, &QThread::deleteLater);

         // create the consumer thread

         QThread* consumer_thread = new QThread;
         consumer_worker = new Consumer(thread_data.toWeakRef(), 
consumer_wait, consumer_mutex);

         consumer_worker->moveToThread(consumer_thread);

         QObject::connect(consumer_thread, &QThread::started, consumer_worker, 
&Consumer::slot_thread);
         QObject::connect(consumer_worker, &Consumer::signal_finished, 
consumer_thread, &QThread::quit);
         QObject::connect(consumer_worker, &Consumer::signal_finished, 
consumer_worker, &Consumer::deleteLater);
         QObject::connect(consumer_thread, &QThread::finished, 
consumer_thread, &QThread::deleteLater);

         consumer_thread->start();
         producer_thread->start();

         return app.exec();
     }

Thanks for any insights.




More information about the Interest mailing list