about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
authorEelco Dolstra <edolstra@gmail.com>2017-09-08T12·40+0200
committerEelco Dolstra <edolstra@gmail.com>2017-09-08T12·42+0200
commit8f6b347abd93706e36cf67fc4c53a46c7ad1ad49 (patch)
treee633fe5a3e5526c50c539480f108564efe22fbb5 /src
parenta2740c9ca23e748c3ab8ea61a135594a94c86aae (diff)
ThreadPool: Improve exception handling
In particular, process() won't return as long as there are active
items. This prevents work item lambdas from referring to stack frames
that no longer exist.
Diffstat (limited to 'src')
-rw-r--r--src/libutil/thread-pool.cc75
-rw-r--r--src/libutil/thread-pool.hh2
2 files changed, 53 insertions, 24 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index 0a3a407240f7..f43dbe0c3715 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -46,11 +46,17 @@ void ThreadPool::enqueue(const work_t & t)
 
 void ThreadPool::process()
 {
+    /* Loop until there are no active work items *and* there either
+       are no queued items or there is an exception. The
+       post-condition is that no new items will become active. */
     while (true) {
         auto state(state_.lock());
-        if (state->exception)
-            std::rethrow_exception(state->exception);
-        if (state->left.empty() && !state->pending) break;
+        if (!state->active) {
+            if (state->exception)
+                std::rethrow_exception(state->exception);
+            if (state->left.empty())
+                break;
+        }
         state.wait(done);
     }
 }
@@ -58,41 +64,64 @@ void ThreadPool::process()
 void ThreadPool::workerEntry()
 {
     bool didWork = false;
+    std::exception_ptr exc;
 
     while (true) {
         work_t w;
         {
             auto state(state_.lock());
+
+            if (didWork) {
+                assert(state->active);
+                state->active--;
+
+                if (exc) {
+
+                    if (!state->exception) {
+                        state->exception = exc;
+                        // Tell the other workers to quit.
+                        state->quit = true;
+                        work.notify_all();
+                    } else {
+                        /* Print the exception, since we can't
+                           propagate it. */
+                        try {
+                            std::rethrow_exception(exc);
+                        } catch (std::exception & e) {
+                            if (!dynamic_cast<Interrupted*>(&e) &&
+                                !dynamic_cast<ThreadPoolShutDown*>(&e))
+                                ignoreException();
+                        } catch (...) {
+                        }
+                    }
+                }
+            }
+
+            /* Wait until a work item is available or another thread
+               had an exception or we're asked to quit. */
             while (true) {
-                if (state->quit || state->exception) return;
-                if (didWork) {
-                    assert(state->pending);
-                    state->pending--;
-                    didWork = false;
+                if (state->quit) {
+                    if (!state->active)
+                        done.notify_one();
+                    return;
                 }
                 if (!state->left.empty()) break;
-                if (!state->pending)
-                    done.notify_all();
+                if (!state->active) {
+                    done.notify_one();
+                    return;
+                }
                 state.wait(work);
             }
-            w = state->left.front();
+
+            w = std::move(state->left.front());
             state->left.pop();
-            state->pending++;
+            state->active++;
         }
 
         try {
             w();
-        } catch (std::exception & e) {
-            auto state(state_.lock());
-            if (state->exception) {
-                if (!dynamic_cast<Interrupted*>(&e) &&
-                    !dynamic_cast<ThreadPoolShutDown*>(&e))
-                    printError(format("error: %s") % e.what());
-            } else {
-                state->exception = std::current_exception();
-                work.notify_all();
-                done.notify_all();
-            }
+        } catch (...) {
+            exc = std::current_exception();
         }
 
         didWork = true;
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index 361a9d33a732..835dfb4b83a6 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -44,7 +44,7 @@ private:
     struct State
     {
         std::queue<work_t> left;
-        size_t pending = 0;
+        size_t active = 0;
         std::exception_ptr exception;
         std::vector<std::thread> workers;
         bool quit = false;