about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2017-09-08T13·31+0200
committerEelco Dolstra <edolstra@gmail.com>2017-09-08T13·31+0200
commitb7376edf06e3836394503e16ec14af23c7615f22 (patch)
tree1ba2185c659f832628580e98073792e02c13be00
parent6a888ec29a6724f916f96508b3a94a86c643c18e (diff)
ThreadPool: On exception, interrupt the other worker threads
-rw-r--r--src/libutil/thread-pool.cc10
-rw-r--r--src/libutil/thread-pool.hh4
-rw-r--r--src/libutil/util.cc2
-rw-r--r--src/libutil/util.hh5
4 files changed, 14 insertions, 7 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index f43dbe0c37..ce126d36db 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -21,7 +21,7 @@ ThreadPool::~ThreadPool()
     std::vector<std::thread> workers;
     {
         auto state(state_.lock());
-        state->quit = true;
+        quit = true;
         std::swap(workers, state->workers);
     }
 
@@ -36,7 +36,7 @@ ThreadPool::~ThreadPool()
 void ThreadPool::enqueue(const work_t & t)
 {
     auto state(state_.lock());
-    if (state->quit)
+    if (quit)
         throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
     state->left.push(t);
     if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
@@ -63,6 +63,8 @@ void ThreadPool::process()
 
 void ThreadPool::workerEntry()
 {
+    interruptCheck = [&]() { return (bool) quit; };
+
     bool didWork = false;
     std::exception_ptr exc;
 
@@ -80,7 +82,7 @@ void ThreadPool::workerEntry()
                     if (!state->exception) {
                         state->exception = exc;
                         // Tell the other workers to quit.
-                        state->quit = true;
+                        quit = true;
                         work.notify_all();
                     } else {
                         /* Print the exception, since we can't
@@ -100,7 +102,7 @@ void ThreadPool::workerEntry()
             /* Wait until a work item is available or another thread
                had an exception or we're asked to quit. */
             while (true) {
-                if (state->quit) {
+                if (quit) {
                     if (!state->active)
                         done.notify_one();
                     return;
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index 835dfb4b83..06a097ab5e 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -7,6 +7,7 @@
 #include <functional>
 #include <thread>
 #include <map>
+#include <atomic>
 
 namespace nix {
 
@@ -47,9 +48,10 @@ private:
         size_t active = 0;
         std::exception_ptr exception;
         std::vector<std::thread> workers;
-        bool quit = false;
     };
 
+    std::atomic_bool quit{false};
+
     Sync<State> state_;
 
     std::condition_variable work, done;
diff --git a/src/libutil/util.cc b/src/libutil/util.cc
index 605e89ce27..3c98a61f9e 100644
--- a/src/libutil/util.cc
+++ b/src/libutil/util.cc
@@ -1002,6 +1002,7 @@ void closeOnExec(int fd)
 bool _isInterrupted = false;
 
 static thread_local bool interruptThrown = false;
+thread_local std::function<bool()> interruptCheck;
 
 void setInterruptThrown()
 {
@@ -1020,7 +1021,6 @@ void _interrupted()
 }
 
 
-
 //////////////////////////////////////////////////////////////////////
 
 
diff --git a/src/libutil/util.hh b/src/libutil/util.hh
index 30e3c0df1c..6a66576e96 100644
--- a/src/libutil/util.hh
+++ b/src/libutil/util.hh
@@ -273,13 +273,16 @@ void closeOnExec(int fd);
 
 extern bool _isInterrupted;
 
+extern thread_local std::function<bool()> interruptCheck;
+
 void setInterruptThrown();
 
 void _interrupted();
 
 void inline checkInterrupt()
 {
-    if (_isInterrupted) _interrupted();
+    if (_isInterrupted || (interruptCheck && interruptCheck()))
+        _interrupted();
 }
 
 MakeError(Interrupted, BaseError)