waveview: redesign thread exit strategy

The previous design had a race condition. When WaveViewThreads::stop_threads() was called, it would
first acquire the mutex, then set _quit, then call condition.broadcast(). But worker threads would
check _quit without holding the mutex. It was therefore for a thread to be delayed in its
own lock acquisition by the ::stop_threads() caller, then end up back in cond.wait() AFTER
the cond.broadcast() was done. Such a thread would sleep forever and never wake up.

This new design removes WaveViewDrawRequestQueue, which was a clean encapsulation of the
queueing aspects of WaveViewThreads, but unfortunately made correct mutex acquisition
and condition signalling/waiting needlessly complex. THe mutex, condition variable
and actual queue were moved into WaveViewThreads, and all worker threads execute a method
of the class which gives the appropriate code easy access to the mutex and condition var,
which must always be used together.
This commit is contained in:
Paul Davis
2021-05-10 21:40:31 -06:00
parent e64e140809
commit 166ac63924
3 changed files with 79 additions and 113 deletions

View File

@@ -254,79 +254,13 @@ WaveViewCache::set_image_cache_threshold (uint64_t sz)
/*-------------------------------------------------*/
WaveViewDrawRequest::WaveViewDrawRequest ()
{
g_atomic_int_set (&_stop, 0);
}
WaveViewDrawRequest::~WaveViewDrawRequest ()
{
}
void
WaveViewDrawRequestQueue::enqueue (boost::shared_ptr<WaveViewDrawRequest>& request)
{
Glib::Threads::Mutex::Lock lm (_queue_mutex);
_queue.push_back (request);
_cond.broadcast ();
}
void
WaveViewDrawRequestQueue::wake_up ()
{
boost::shared_ptr<WaveViewDrawRequest> null_ptr;
// hack!?...wake up the drawing thread
enqueue (null_ptr);
}
boost::shared_ptr<WaveViewDrawRequest>
WaveViewDrawRequestQueue::dequeue (bool block)
{
if (block) {
_queue_mutex.lock();
} else {
if (!_queue_mutex.trylock()) {
return boost::shared_ptr<WaveViewDrawRequest>();
}
}
// _queue_mutex is always held at this point
if (_queue.empty()) {
if (block) {
_cond.wait (_queue_mutex);
} else {
_queue_mutex.unlock();
return boost::shared_ptr<WaveViewDrawRequest>();
}
}
boost::shared_ptr<WaveViewDrawRequest> req;
if (!_queue.empty()) {
req = _queue.front ();
_queue.pop_front ();
} else {
// Queue empty, returning empty DrawRequest
}
_queue_mutex.unlock();
return req;
}
/*-------------------------------------------------*/
WaveViewThreads::WaveViewThreads ()
{
g_atomic_int_set (&_quit, 0);
}
WaveViewThreads::~WaveViewThreads ()
{
}
uint32_t WaveViewThreads::init_count = 0;
@@ -358,21 +292,44 @@ void
WaveViewThreads::enqueue_draw_request (boost::shared_ptr<WaveViewDrawRequest>& request)
{
assert (instance);
instance->_request_queue.enqueue (request);
instance->_enqueue_draw_request (request);
}
void
WaveViewThreads::_enqueue_draw_request (boost::shared_ptr<WaveViewDrawRequest>& request)
{
Glib::Threads::Mutex::Lock lm (_queue_mutex);
_queue.push_back (request);
/* wake one (random) thread */
_cond.signal ();
}
boost::shared_ptr<WaveViewDrawRequest>
WaveViewThreads::dequeue_draw_request ()
{
assert (instance);
return instance->_request_queue.dequeue (true);
return instance->_dequeue_draw_request ();
}
void
WaveViewThreads::wake_up ()
boost::shared_ptr<WaveViewDrawRequest>
WaveViewThreads::_dequeue_draw_request ()
{
assert (instance);
return instance->_request_queue.wake_up ();
/* _queue_mutex must be held at this point */
assert (!_queue_mutex.trylock());
if (_queue.empty()) {
_cond.wait (_queue_mutex);
}
boost::shared_ptr<WaveViewDrawRequest> req;
if (!_queue.empty()) {
req = _queue.front ();
_queue.pop_front ();
}
return req;
}
void
@@ -400,21 +357,42 @@ WaveViewThreads::stop_threads ()
{
assert (_threads.size());
{
Glib::Threads::Mutex::Lock lm (_queue_mutex);
g_atomic_int_set (&_quit, 1);
_cond.broadcast ();
}
/* Deleting the WaveViewThread objects will force them to join() with
* their underlying (p)threads, and thus cleanup. The threads will
* all be woken by the condition broadcast above.
*/
_threads.clear ();
}
/*-------------------------------------------------*/
WaveViewDrawRequest::WaveViewDrawRequest ()
{
g_atomic_int_set (&_stop, 0);
}
WaveViewDrawRequest::~WaveViewDrawRequest ()
{
}
/*-------------------------------------------------*/
WaveViewDrawingThread::WaveViewDrawingThread ()
: _thread(0)
{
g_atomic_int_set (&_quit, 0);
start ();
}
WaveViewDrawingThread::~WaveViewDrawingThread ()
{
quit ();
_thread->join ();
}
void
@@ -422,33 +400,35 @@ WaveViewDrawingThread::start ()
{
assert (!_thread);
_thread = Glib::Threads::Thread::create (sigc::mem_fun (*this, &WaveViewDrawingThread::run));
_thread = Glib::Threads::Thread::create (sigc::ptr_fun (&WaveViewThreads::thread_proc));
}
void
WaveViewDrawingThread::quit ()
WaveViewThreads::thread_proc ()
{
assert (_thread);
g_atomic_int_set (&_quit, 1);
WaveViewThreads::wake_up ();
_thread->join();
_thread = 0;
assert (instance);
instance->_thread_proc ();
}
void
WaveViewDrawingThread::run ()
WaveViewThreads::_thread_proc ()
{
pthread_set_name ("WaveViewDrawing");
while (true) {
_queue_mutex.lock ();
if (g_atomic_int_get (&_quit)) {
_queue_mutex.unlock ();
break;
}
// block until a request is available.
boost::shared_ptr<WaveViewDrawRequest> req = WaveViewThreads::dequeue_draw_request ();
_queue_mutex.unlock ();
if (req && !req->stopped()) {
try {
WaveView::process_draw_request (req);
@@ -456,10 +436,8 @@ WaveViewDrawingThread::run ()
/* just in case it was set before the exception, whatever it was */
req->image->cairo_image.clear ();
}
} else {
// null or stopped Request, processing skipped
}
}
}
}
} /* namespace */

View File

@@ -159,7 +159,7 @@ private:
private:
friend class WaveViewThreadClient;
friend class WaveViewDrawingThread;
friend class WaveViewThreads;
boost::shared_ptr<ARDOUR::AudioRegion> _region;

View File

@@ -287,26 +287,6 @@ private:
bool full () { return image_cache_size > _image_cache_threshold; }
};
class WaveViewDrawRequestQueue
{
public:
void enqueue (boost::shared_ptr<WaveViewDrawRequest>&);
// @return valid request or null if non-blocking or no request is available
boost::shared_ptr<WaveViewDrawRequest> dequeue (bool block);
void wake_up ();
private:
mutable Glib::Threads::Mutex _queue_mutex;
Glib::Threads::Cond _cond;
typedef std::deque<boost::shared_ptr<WaveViewDrawRequest> > DrawRequestQueueType;
DrawRequestQueueType _queue;
};
class WaveViewDrawingThread
{
public:
@@ -315,12 +295,10 @@ public:
private:
void start ();
void quit ();
void run ();
private:
Glib::Threads::Thread* _thread;
GATOMIC_QUAL gint _quit;
};
class WaveViewThreads {
@@ -339,10 +317,13 @@ public:
private:
friend class WaveViewDrawingThread;
static void wake_up ();
// will block until a request is available
static boost::shared_ptr<WaveViewDrawRequest> dequeue_draw_request ();
static void thread_proc ();
boost::shared_ptr<WaveViewDrawRequest> _dequeue_draw_request ();
void _enqueue_draw_request (boost::shared_ptr<WaveViewDrawRequest>&);
void _thread_proc ();
void start_threads ();
void stop_threads ();
@@ -354,8 +335,15 @@ private:
// TODO use std::unique_ptr when possible
typedef std::vector<boost::shared_ptr<WaveViewDrawingThread> > WaveViewThreadList;
GATOMIC_QUAL gint _quit;
WaveViewThreadList _threads;
WaveViewDrawRequestQueue _request_queue;
mutable Glib::Threads::Mutex _queue_mutex;
Glib::Threads::Cond _cond;
typedef std::deque<boost::shared_ptr<WaveViewDrawRequest> > DrawRequestQueueType;
DrawRequestQueueType _queue;
};