about summary refs log tree commit diff
path: root/src/libutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil')
-rw-r--r--src/libutil/args.hh2
-rw-r--r--src/libutil/thread-pool.cc81
-rw-r--r--src/libutil/thread-pool.hh9
3 files changed, 58 insertions, 34 deletions
diff --git a/src/libutil/args.hh b/src/libutil/args.hh
index 401bebbabc08..37e31825ab37 100644
--- a/src/libutil/args.hh
+++ b/src/libutil/args.hh
@@ -90,7 +90,7 @@ public:
 
         template<class T>
         FlagMaker & set(T * dest, const T & val) {
-            flag->arity = 1;
+            flag->arity = 0;
             flag->handler = [=](Strings ss) { *dest = val; };
             return *this;
         };
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index ce126d36db8e..857ee91f87d0 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -13,11 +13,16 @@ ThreadPool::ThreadPool(size_t _maxThreads)
         if (!maxThreads) maxThreads = 1;
     }
 
-    debug(format("starting pool of %d threads") % maxThreads);
+    debug("starting pool of %d threads", maxThreads - 1);
 }
 
 ThreadPool::~ThreadPool()
 {
+    shutdown();
+}
+
+void ThreadPool::shutdown()
+{
     std::vector<std::thread> workers;
     {
         auto state(state_.lock());
@@ -25,7 +30,9 @@ ThreadPool::~ThreadPool()
         std::swap(workers, state->workers);
     }
 
-    debug(format("reaping %d worker threads") % workers.size());
+    if (workers.empty()) return;
+
+    debug("reaping %d worker threads", workers.size());
 
     work.notify_all();
 
@@ -38,32 +45,43 @@ void ThreadPool::enqueue(const work_t & t)
     auto state(state_.lock());
     if (quit)
         throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
-    state->left.push(t);
-    if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
-        state->workers.emplace_back(&ThreadPool::workerEntry, this);
+    state->pending.push(t);
+    /* Note: process() also executes items, so count it as a worker. */
+    if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads)
+        state->workers.emplace_back(&ThreadPool::doWork, this, false);
     work.notify_one();
 }
 
 void ThreadPool::process()
 {
-    /* Loop until there are no active work items *and* there either
-       are no queued items or there is an exception. The
-       post-condition is that no new items will become active. */
-    while (true) {
+    state_.lock()->draining = true;
+
+    /* Do work until no more work is pending or active. */
+    try {
+        doWork(true);
+
         auto state(state_.lock());
-        if (!state->active) {
-            if (state->exception)
-                std::rethrow_exception(state->exception);
-            if (state->left.empty())
-                break;
-        }
-        state.wait(done);
+
+        assert(quit);
+
+        if (state->exception)
+            std::rethrow_exception(state->exception);
+
+    } catch (...) {
+        /* In the exceptional case, some workers may still be
+           active. They may be referencing the stack frame of the
+           caller. So wait for them to finish. (~ThreadPool also does
+           this, but it might be destroyed after objects referenced by
+           the work item lambdas.) */
+        shutdown();
+        throw;
     }
 }
 
-void ThreadPool::workerEntry()
+void ThreadPool::doWork(bool mainThread)
 {
-    interruptCheck = [&]() { return (bool) quit; };
+    if (!mainThread)
+        interruptCheck = [&]() { return (bool) quit; };
 
     bool didWork = false;
     std::exception_ptr exc;
@@ -99,24 +117,27 @@ void ThreadPool::workerEntry()
                 }
             }
 
-            /* Wait until a work item is available or another thread
-               had an exception or we're asked to quit. */
+            /* Wait until a work item is available or we're asked to
+               quit. */
             while (true) {
-                if (quit) {
-                    if (!state->active)
-                        done.notify_one();
-                    return;
-                }
-                if (!state->left.empty()) break;
-                if (!state->active) {
-                    done.notify_one();
+                if (quit) return;
+
+                if (!state->pending.empty()) break;
+
+                /* If there are no active or pending items, and the
+                   main thread is running process(), then no new items
+                   can be added. So exit. */
+                if (!state->active && state->draining) {
+                    quit = true;
+                    work.notify_all();
                     return;
                 }
+
                 state.wait(work);
             }
 
-            w = std::move(state->left.front());
-            state->left.pop();
+            w = std::move(state->pending.front());
+            state->pending.pop();
             state->active++;
         }
 
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index 06a097ab5ea7..bb16b639a591 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -44,19 +44,22 @@ private:
 
     struct State
     {
-        std::queue<work_t> left;
+        std::queue<work_t> pending;
         size_t active = 0;
         std::exception_ptr exception;
         std::vector<std::thread> workers;
+        bool draining = false;
     };
 
     std::atomic_bool quit{false};
 
     Sync<State> state_;
 
-    std::condition_variable work, done;
+    std::condition_variable work;
 
-    void workerEntry();
+    void doWork(bool mainThread);
+
+    void shutdown();
 };
 
 /* Process in parallel a set of items of type T that have a partial