about summary refs log tree commit diff
path: root/src/libutil/thread-pool.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil/thread-pool.cc')
-rw-r--r--src/libutil/thread-pool.cc75
1 files changed, 52 insertions, 23 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index 0a3a407240f7..f43dbe0c3715 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -46,11 +46,17 @@ void ThreadPool::enqueue(const work_t & t)
 
 void ThreadPool::process()
 {
+    /* Loop until there are no active work items *and* there either
+       are no queued items or there is an exception. The
+       post-condition is that no new items will become active. */
     while (true) {
         auto state(state_.lock());
-        if (state->exception)
-            std::rethrow_exception(state->exception);
-        if (state->left.empty() && !state->pending) break;
+        if (!state->active) {
+            if (state->exception)
+                std::rethrow_exception(state->exception);
+            if (state->left.empty())
+                break;
+        }
         state.wait(done);
     }
 }
@@ -58,41 +64,64 @@ void ThreadPool::process()
 void ThreadPool::workerEntry()
 {
     bool didWork = false;
+    std::exception_ptr exc;
 
     while (true) {
         work_t w;
         {
             auto state(state_.lock());
+
+            if (didWork) {
+                assert(state->active);
+                state->active--;
+
+                if (exc) {
+
+                    if (!state->exception) {
+                        state->exception = exc;
+                        // Tell the other workers to quit.
+                        state->quit = true;
+                        work.notify_all();
+                    } else {
+                        /* Print the exception, since we can't
+                           propagate it. */
+                        try {
+                            std::rethrow_exception(exc);
+                        } catch (std::exception & e) {
+                            if (!dynamic_cast<Interrupted*>(&e) &&
+                                !dynamic_cast<ThreadPoolShutDown*>(&e))
+                                ignoreException();
+                        } catch (...) {
+                        }
+                    }
+                }
+            }
+
+            /* Wait until a work item is available or another thread
+               had an exception or we're asked to quit. */
             while (true) {
-                if (state->quit || state->exception) return;
-                if (didWork) {
-                    assert(state->pending);
-                    state->pending--;
-                    didWork = false;
+                if (state->quit) {
+                    if (!state->active)
+                        done.notify_one();
+                    return;
                 }
                 if (!state->left.empty()) break;
-                if (!state->pending)
-                    done.notify_all();
+                if (!state->active) {
+                    done.notify_one();
+                    return;
+                }
                 state.wait(work);
             }
-            w = state->left.front();
+
+            w = std::move(state->left.front());
             state->left.pop();
-            state->pending++;
+            state->active++;
         }
 
         try {
             w();
-        } catch (std::exception & e) {
-            auto state(state_.lock());
-            if (state->exception) {
-                if (!dynamic_cast<Interrupted*>(&e) &&
-                    !dynamic_cast<ThreadPoolShutDown*>(&e))
-                    printError(format("error: %s") % e.what());
-            } else {
-                state->exception = std::current_exception();
-                work.notify_all();
-                done.notify_all();
-            }
+        } catch (...) {
+            exc = std::current_exception();
         }
 
         didWork = true;