diff options
author | Eelco Dolstra <edolstra@gmail.com> | 2017-09-08T13·31+0200 |
---|---|---|
committer | Eelco Dolstra <edolstra@gmail.com> | 2017-09-08T13·31+0200 |
commit | b7376edf06e3836394503e16ec14af23c7615f22 (patch) | |
tree | 1ba2185c659f832628580e98073792e02c13be00 /src/libutil | |
parent | 6a888ec29a6724f916f96508b3a94a86c643c18e (diff) |
ThreadPool: On exception, interrupt the other worker threads
Diffstat (limited to 'src/libutil')
-rw-r--r-- | src/libutil/thread-pool.cc | 10 | ||||
-rw-r--r-- | src/libutil/thread-pool.hh | 4 | ||||
-rw-r--r-- | src/libutil/util.cc | 2 | ||||
-rw-r--r-- | src/libutil/util.hh | 5 |
4 files changed, 14 insertions, 7 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index f43dbe0c3715..ce126d36db8e 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -21,7 +21,7 @@ ThreadPool::~ThreadPool() std::vector<std::thread> workers; { auto state(state_.lock()); - state->quit = true; + quit = true; std::swap(workers, state->workers); } @@ -36,7 +36,7 @@ ThreadPool::~ThreadPool() void ThreadPool::enqueue(const work_t & t) { auto state(state_.lock()); - if (state->quit) + if (quit) throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down"); state->left.push(t); if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads) @@ -63,6 +63,8 @@ void ThreadPool::process() void ThreadPool::workerEntry() { + interruptCheck = [&]() { return (bool) quit; }; + bool didWork = false; std::exception_ptr exc; @@ -80,7 +82,7 @@ void ThreadPool::workerEntry() if (!state->exception) { state->exception = exc; // Tell the other workers to quit. - state->quit = true; + quit = true; work.notify_all(); } else { /* Print the exception, since we can't @@ -100,7 +102,7 @@ void ThreadPool::workerEntry() /* Wait until a work item is available or another thread had an exception or we're asked to quit. */ while (true) { - if (state->quit) { + if (quit) { if (!state->active) done.notify_one(); return; diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index 835dfb4b83a6..06a097ab5ea7 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -7,6 +7,7 @@ #include <functional> #include <thread> #include <map> +#include <atomic> namespace nix { @@ -47,9 +48,10 @@ private: size_t active = 0; std::exception_ptr exception; std::vector<std::thread> workers; - bool quit = false; }; + std::atomic_bool quit{false}; + Sync<State> state_; std::condition_variable work, done; diff --git a/src/libutil/util.cc b/src/libutil/util.cc index 605e89ce27ff..3c98a61f9e50 100644 --- a/src/libutil/util.cc +++ b/src/libutil/util.cc @@ -1002,6 +1002,7 @@ void closeOnExec(int fd) bool _isInterrupted = false; static thread_local bool interruptThrown = false; +thread_local std::function<bool()> interruptCheck; void setInterruptThrown() { @@ -1020,7 +1021,6 @@ void _interrupted() } - ////////////////////////////////////////////////////////////////////// diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 30e3c0df1c1a..6a66576e96ce 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -273,13 +273,16 @@ void closeOnExec(int fd); extern bool _isInterrupted; +extern thread_local std::function<bool()> interruptCheck; + void setInterruptThrown(); void _interrupted(); void inline checkInterrupt() { - if (_isInterrupted) _interrupted(); + if (_isInterrupted || (interruptCheck && interruptCheck())) + _interrupted(); } MakeError(Interrupted, BaseError) |