From b7376edf06e3836394503e16ec14af23c7615f22 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Fri, 8 Sep 2017 15:31:24 +0200 Subject: ThreadPool: On exception, interrupt the other worker threads --- src/libutil/thread-pool.cc | 10 ++++++---- src/libutil/thread-pool.hh | 4 +++- src/libutil/util.cc | 2 +- src/libutil/util.hh | 5 ++++- 4 files changed, 14 insertions(+), 7 deletions(-) (limited to 'src/libutil') diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index f43dbe0c3715..ce126d36db8e 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -21,7 +21,7 @@ ThreadPool::~ThreadPool() std::vector workers; { auto state(state_.lock()); - state->quit = true; + quit = true; std::swap(workers, state->workers); } @@ -36,7 +36,7 @@ ThreadPool::~ThreadPool() void ThreadPool::enqueue(const work_t & t) { auto state(state_.lock()); - if (state->quit) + if (quit) throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down"); state->left.push(t); if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads) @@ -63,6 +63,8 @@ void ThreadPool::process() void ThreadPool::workerEntry() { + interruptCheck = [&]() { return (bool) quit; }; + bool didWork = false; std::exception_ptr exc; @@ -80,7 +82,7 @@ void ThreadPool::workerEntry() if (!state->exception) { state->exception = exc; // Tell the other workers to quit. - state->quit = true; + quit = true; work.notify_all(); } else { /* Print the exception, since we can't @@ -100,7 +102,7 @@ void ThreadPool::workerEntry() /* Wait until a work item is available or another thread had an exception or we're asked to quit. */ while (true) { - if (state->quit) { + if (quit) { if (!state->active) done.notify_one(); return; diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index 835dfb4b83a6..06a097ab5ea7 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -7,6 +7,7 @@ #include #include #include +#include namespace nix { @@ -47,9 +48,10 @@ private: size_t active = 0; std::exception_ptr exception; std::vector workers; - bool quit = false; }; + std::atomic_bool quit{false}; + Sync state_; std::condition_variable work, done; diff --git a/src/libutil/util.cc b/src/libutil/util.cc index 605e89ce27ff..3c98a61f9e50 100644 --- a/src/libutil/util.cc +++ b/src/libutil/util.cc @@ -1002,6 +1002,7 @@ void closeOnExec(int fd) bool _isInterrupted = false; static thread_local bool interruptThrown = false; +thread_local std::function interruptCheck; void setInterruptThrown() { @@ -1020,7 +1021,6 @@ void _interrupted() } - ////////////////////////////////////////////////////////////////////// diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 30e3c0df1c1a..6a66576e96ce 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -273,13 +273,16 @@ void closeOnExec(int fd); extern bool _isInterrupted; +extern thread_local std::function interruptCheck; + void setInterruptThrown(); void _interrupted(); void inline checkInterrupt() { - if (_isInterrupted) _interrupted(); + if (_isInterrupted || (interruptCheck && interruptCheck())) + _interrupted(); } MakeError(Interrupted, BaseError) -- cgit 1.4.1