From 166ac63924c2b0d710759176788d91ca83bd3a06 Mon Sep 17 00:00:00 2001 From: Paul Davis Date: Mon, 10 May 2021 21:40:31 -0600 Subject: [PATCH] waveview: redesign thread exit strategy The previous design had a race condition. When WaveViewThreads::stop_threads() was called, it would first acquire the mutex, then set _quit, then call condition.broadcast(). But worker threads would check _quit without holding the mutex. It was therefore for a thread to be delayed in its own lock acquisition by the ::stop_threads() caller, then end up back in cond.wait() AFTER the cond.broadcast() was done. Such a thread would sleep forever and never wake up. This new design removes WaveViewDrawRequestQueue, which was a clean encapsulation of the queueing aspects of WaveViewThreads, but unfortunately made correct mutex acquisition and condition signalling/waiting needlessly complex. THe mutex, condition variable and actual queue were moved into WaveViewThreads, and all worker threads execute a method of the class which gives the appropriate code easy access to the mutex and condition var, which must always be used together. --- libs/waveview/wave_view_private.cc | 152 +++++++++------------ libs/waveview/waveview/wave_view.h | 2 +- libs/waveview/waveview/wave_view_private.h | 38 ++---- 3 files changed, 79 insertions(+), 113 deletions(-) diff --git a/libs/waveview/wave_view_private.cc b/libs/waveview/wave_view_private.cc index 77a4ae8ba6..e853384dee 100644 --- a/libs/waveview/wave_view_private.cc +++ b/libs/waveview/wave_view_private.cc @@ -254,79 +254,13 @@ WaveViewCache::set_image_cache_threshold (uint64_t sz) /*-------------------------------------------------*/ -WaveViewDrawRequest::WaveViewDrawRequest () -{ - g_atomic_int_set (&_stop, 0); -} - -WaveViewDrawRequest::~WaveViewDrawRequest () -{ - -} - -void -WaveViewDrawRequestQueue::enqueue (boost::shared_ptr& request) -{ - Glib::Threads::Mutex::Lock lm (_queue_mutex); - - _queue.push_back (request); - _cond.broadcast (); -} - -void -WaveViewDrawRequestQueue::wake_up () -{ - boost::shared_ptr null_ptr; - // hack!?...wake up the drawing thread - enqueue (null_ptr); -} - -boost::shared_ptr -WaveViewDrawRequestQueue::dequeue (bool block) -{ - if (block) { - _queue_mutex.lock(); - } else { - if (!_queue_mutex.trylock()) { - return boost::shared_ptr(); - } - } - - // _queue_mutex is always held at this point - - if (_queue.empty()) { - if (block) { - _cond.wait (_queue_mutex); - } else { - _queue_mutex.unlock(); - return boost::shared_ptr(); - } - } - - boost::shared_ptr req; - - if (!_queue.empty()) { - req = _queue.front (); - _queue.pop_front (); - } else { - // Queue empty, returning empty DrawRequest - } - - _queue_mutex.unlock(); - - return req; -} - -/*-------------------------------------------------*/ - WaveViewThreads::WaveViewThreads () { - + g_atomic_int_set (&_quit, 0); } WaveViewThreads::~WaveViewThreads () { - } uint32_t WaveViewThreads::init_count = 0; @@ -358,21 +292,44 @@ void WaveViewThreads::enqueue_draw_request (boost::shared_ptr& request) { assert (instance); - instance->_request_queue.enqueue (request); + instance->_enqueue_draw_request (request); +} + +void +WaveViewThreads::_enqueue_draw_request (boost::shared_ptr& request) +{ + Glib::Threads::Mutex::Lock lm (_queue_mutex); + _queue.push_back (request); + /* wake one (random) thread */ + _cond.signal (); } boost::shared_ptr WaveViewThreads::dequeue_draw_request () { assert (instance); - return instance->_request_queue.dequeue (true); + return instance->_dequeue_draw_request (); } -void -WaveViewThreads::wake_up () +boost::shared_ptr +WaveViewThreads::_dequeue_draw_request () { - assert (instance); - return instance->_request_queue.wake_up (); + /* _queue_mutex must be held at this point */ + + assert (!_queue_mutex.trylock()); + + if (_queue.empty()) { + _cond.wait (_queue_mutex); + } + + boost::shared_ptr req; + + if (!_queue.empty()) { + req = _queue.front (); + _queue.pop_front (); + } + + return req; } void @@ -400,21 +357,42 @@ WaveViewThreads::stop_threads () { assert (_threads.size()); + { + Glib::Threads::Mutex::Lock lm (_queue_mutex); + g_atomic_int_set (&_quit, 1); + _cond.broadcast (); + } + + /* Deleting the WaveViewThread objects will force them to join() with + * their underlying (p)threads, and thus cleanup. The threads will + * all be woken by the condition broadcast above. + */ + _threads.clear (); } +/*-------------------------------------------------*/ +WaveViewDrawRequest::WaveViewDrawRequest () +{ + g_atomic_int_set (&_stop, 0); +} + +WaveViewDrawRequest::~WaveViewDrawRequest () +{ + +} + /*-------------------------------------------------*/ WaveViewDrawingThread::WaveViewDrawingThread () : _thread(0) { - g_atomic_int_set (&_quit, 0); start (); } WaveViewDrawingThread::~WaveViewDrawingThread () { - quit (); + _thread->join (); } void @@ -422,33 +400,35 @@ WaveViewDrawingThread::start () { assert (!_thread); - _thread = Glib::Threads::Thread::create (sigc::mem_fun (*this, &WaveViewDrawingThread::run)); + _thread = Glib::Threads::Thread::create (sigc::ptr_fun (&WaveViewThreads::thread_proc)); } void -WaveViewDrawingThread::quit () +WaveViewThreads::thread_proc () { - assert (_thread); - - g_atomic_int_set (&_quit, 1); - WaveViewThreads::wake_up (); - _thread->join(); - _thread = 0; + assert (instance); + instance->_thread_proc (); } void -WaveViewDrawingThread::run () +WaveViewThreads::_thread_proc () { pthread_set_name ("WaveViewDrawing"); + while (true) { + _queue_mutex.lock (); + if (g_atomic_int_get (&_quit)) { + _queue_mutex.unlock (); break; } // block until a request is available. boost::shared_ptr req = WaveViewThreads::dequeue_draw_request (); + _queue_mutex.unlock (); + if (req && !req->stopped()) { try { WaveView::process_draw_request (req); @@ -456,10 +436,8 @@ WaveViewDrawingThread::run () /* just in case it was set before the exception, whatever it was */ req->image->cairo_image.clear (); } - } else { - // null or stopped Request, processing skipped } } } -} +} /* namespace */ diff --git a/libs/waveview/waveview/wave_view.h b/libs/waveview/waveview/wave_view.h index bca9007c3a..42181902d9 100644 --- a/libs/waveview/waveview/wave_view.h +++ b/libs/waveview/waveview/wave_view.h @@ -159,7 +159,7 @@ private: private: friend class WaveViewThreadClient; - friend class WaveViewDrawingThread; + friend class WaveViewThreads; boost::shared_ptr _region; diff --git a/libs/waveview/waveview/wave_view_private.h b/libs/waveview/waveview/wave_view_private.h index 0b9336ff99..38db71807f 100644 --- a/libs/waveview/waveview/wave_view_private.h +++ b/libs/waveview/waveview/wave_view_private.h @@ -287,26 +287,6 @@ private: bool full () { return image_cache_size > _image_cache_threshold; } }; -class WaveViewDrawRequestQueue -{ -public: - - void enqueue (boost::shared_ptr&); - - // @return valid request or null if non-blocking or no request is available - boost::shared_ptr dequeue (bool block); - - void wake_up (); - -private: - - mutable Glib::Threads::Mutex _queue_mutex; - Glib::Threads::Cond _cond; - - typedef std::deque > DrawRequestQueueType; - DrawRequestQueueType _queue; -}; - class WaveViewDrawingThread { public: @@ -315,12 +295,10 @@ public: private: void start (); - void quit (); void run (); private: Glib::Threads::Thread* _thread; - GATOMIC_QUAL gint _quit; }; class WaveViewThreads { @@ -339,10 +317,13 @@ public: private: friend class WaveViewDrawingThread; - static void wake_up (); - // will block until a request is available static boost::shared_ptr dequeue_draw_request (); + static void thread_proc (); + + boost::shared_ptr _dequeue_draw_request (); + void _enqueue_draw_request (boost::shared_ptr&); + void _thread_proc (); void start_threads (); void stop_threads (); @@ -354,8 +335,15 @@ private: // TODO use std::unique_ptr when possible typedef std::vector > WaveViewThreadList; + GATOMIC_QUAL gint _quit; WaveViewThreadList _threads; - WaveViewDrawRequestQueue _request_queue; + + + mutable Glib::Threads::Mutex _queue_mutex; + Glib::Threads::Cond _cond; + + typedef std::deque > DrawRequestQueueType; + DrawRequestQueueType _queue; };