about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--mk/tests.mk4
-rw-r--r--release-common.nix4
-rw-r--r--src/libutil/thread-pool.cc81
-rw-r--r--src/libutil/thread-pool.hh9
4 files changed, 61 insertions, 37 deletions
diff --git a/mk/tests.mk b/mk/tests.mk
index d18b910584f4..1138857c3c16 100644
--- a/mk/tests.mk
+++ b/mk/tests.mk
@@ -26,9 +26,9 @@ installcheck:
 	  printf "running test $$i..."; \
 	  log="$$(cd $$(dirname $$i) && $(tests-environment) $$(basename $$i) 2>&1)"; \
 	  if [ $$? -eq 0 ]; then \
-	    echo "[$${green}PASS$$normal]"; \
+	    echo " [$${green}PASS$$normal]"; \
 	  else \
-	    echo "[$${red}FAIL$$normal]"; \
+	    echo " [$${red}FAIL$$normal]"; \
 	    echo "$$log" | sed 's/^/    /'; \
 	    failed=$$((failed + 1)); \
 	  fi; \
diff --git a/release-common.nix b/release-common.nix
index c64fc619df6d..4553118e1f56 100644
--- a/release-common.nix
+++ b/release-common.nix
@@ -7,8 +7,8 @@ rec {
     enableMinimal = true;
     extraConfig = ''
       CONFIG_ASH y
-      CONFIG_ASH_BUILTIN_ECHO y
-      CONFIG_ASH_BUILTIN_TEST y
+      CONFIG_ASH_ECHO y
+      CONFIG_ASH_TEST y
       CONFIG_ASH_OPTIMIZE_FOR_SIZE y
     '';
   };
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index ce126d36db8e..857ee91f87d0 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -13,11 +13,16 @@ ThreadPool::ThreadPool(size_t _maxThreads)
         if (!maxThreads) maxThreads = 1;
     }
 
-    debug(format("starting pool of %d threads") % maxThreads);
+    debug("starting pool of %d threads", maxThreads - 1);
 }
 
 ThreadPool::~ThreadPool()
 {
+    shutdown();
+}
+
+void ThreadPool::shutdown()
+{
     std::vector<std::thread> workers;
     {
         auto state(state_.lock());
@@ -25,7 +30,9 @@ ThreadPool::~ThreadPool()
         std::swap(workers, state->workers);
     }
 
-    debug(format("reaping %d worker threads") % workers.size());
+    if (workers.empty()) return;
+
+    debug("reaping %d worker threads", workers.size());
 
     work.notify_all();
 
@@ -38,32 +45,43 @@ void ThreadPool::enqueue(const work_t & t)
     auto state(state_.lock());
     if (quit)
         throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
-    state->left.push(t);
-    if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
-        state->workers.emplace_back(&ThreadPool::workerEntry, this);
+    state->pending.push(t);
+    /* Note: process() also executes items, so count it as a worker. */
+    if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads)
+        state->workers.emplace_back(&ThreadPool::doWork, this, false);
     work.notify_one();
 }
 
 void ThreadPool::process()
 {
-    /* Loop until there are no active work items *and* there either
-       are no queued items or there is an exception. The
-       post-condition is that no new items will become active. */
-    while (true) {
+    state_.lock()->draining = true;
+
+    /* Do work until no more work is pending or active. */
+    try {
+        doWork(true);
+
         auto state(state_.lock());
-        if (!state->active) {
-            if (state->exception)
-                std::rethrow_exception(state->exception);
-            if (state->left.empty())
-                break;
-        }
-        state.wait(done);
+
+        assert(quit);
+
+        if (state->exception)
+            std::rethrow_exception(state->exception);
+
+    } catch (...) {
+        /* In the exceptional case, some workers may still be
+           active. They may be referencing the stack frame of the
+           caller. So wait for them to finish. (~ThreadPool also does
+           this, but it might be destroyed after objects referenced by
+           the work item lambdas.) */
+        shutdown();
+        throw;
     }
 }
 
-void ThreadPool::workerEntry()
+void ThreadPool::doWork(bool mainThread)
 {
-    interruptCheck = [&]() { return (bool) quit; };
+    if (!mainThread)
+        interruptCheck = [&]() { return (bool) quit; };
 
     bool didWork = false;
     std::exception_ptr exc;
@@ -99,24 +117,27 @@ void ThreadPool::workerEntry()
                 }
             }
 
-            /* Wait until a work item is available or another thread
-               had an exception or we're asked to quit. */
+            /* Wait until a work item is available or we're asked to
+               quit. */
             while (true) {
-                if (quit) {
-                    if (!state->active)
-                        done.notify_one();
-                    return;
-                }
-                if (!state->left.empty()) break;
-                if (!state->active) {
-                    done.notify_one();
+                if (quit) return;
+
+                if (!state->pending.empty()) break;
+
+                /* If there are no active or pending items, and the
+                   main thread is running process(), then no new items
+                   can be added. So exit. */
+                if (!state->active && state->draining) {
+                    quit = true;
+                    work.notify_all();
                     return;
                 }
+
                 state.wait(work);
             }
 
-            w = std::move(state->left.front());
-            state->left.pop();
+            w = std::move(state->pending.front());
+            state->pending.pop();
             state->active++;
         }
 
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index 06a097ab5ea7..bb16b639a591 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -44,19 +44,22 @@ private:
 
     struct State
     {
-        std::queue<work_t> left;
+        std::queue<work_t> pending;
         size_t active = 0;
         std::exception_ptr exception;
         std::vector<std::thread> workers;
+        bool draining = false;
     };
 
     std::atomic_bool quit{false};
 
     Sync<State> state_;
 
-    std::condition_variable work, done;
+    std::condition_variable work;
 
-    void workerEntry();
+    void doWork(bool mainThread);
+
+    void shutdown();
 };
 
 /* Process in parallel a set of items of type T that have a partial