about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/libutil/thread-pool.cc130
-rw-r--r--src/libutil/thread-pool.hh13
2 files changed, 84 insertions, 59 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
index 743038b588a7..32363ecf0098 100644
--- a/src/libutil/thread-pool.cc
+++ b/src/libutil/thread-pool.cc
@@ -1,79 +1,99 @@
 #include "thread-pool.hh"
+#include "affinity.hh"
 
 namespace nix {
 
-ThreadPool::ThreadPool(size_t _nrThreads)
-    : nrThreads(_nrThreads)
+ThreadPool::ThreadPool(size_t _maxThreads)
+    : maxThreads(_maxThreads)
 {
-    if (!nrThreads) {
-        nrThreads = std::thread::hardware_concurrency();
-        if (!nrThreads) nrThreads = 1;
+    restoreAffinity(); // FIXME
+
+    if (!maxThreads) {
+        maxThreads = std::thread::hardware_concurrency();
+        if (!maxThreads) maxThreads = 1;
     }
+
+    debug(format("starting pool of %d threads") % maxThreads);
+}
+
+ThreadPool::~ThreadPool()
+{
+    std::vector<std::thread> workers;
+    {
+        auto state(state_.lock());
+        state->quit = true;
+        std::swap(workers, state->workers);
+    }
+
+    debug(format("reaping %d worker threads") % workers.size());
+
+    work.notify_all();
+
+    for (auto & thr : workers)
+        thr.join();
 }
 
 void ThreadPool::enqueue(const work_t & t)
 {
-    auto state_(state.lock());
-    state_->left.push(t);
-    wakeup.notify_one();
+    auto state(state_.lock());
+    assert(!state->quit);
+    state->left.push(t);
+    if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
+        state->workers.emplace_back(&ThreadPool::workerEntry, this);
+    work.notify_one();
 }
 
 void ThreadPool::process()
 {
-    printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads);
-
-    std::vector<std::thread> workers;
+    while (true) {
+        auto state(state_.lock());
+        if (state->exception)
+            std::rethrow_exception(state->exception);
+        if (state->left.empty() && !state->pending) break;
+        state.wait(done);
+    }
+}
 
-    for (size_t n = 0; n < nrThreads; n++)
-        workers.push_back(std::thread([&]() {
-            bool first = true;
+void ThreadPool::workerEntry()
+{
+    bool didWork = false;
 
+    while (true) {
+        work_t w;
+        {
+            auto state(state_.lock());
             while (true) {
-                work_t work;
-                {
-                    auto state_(state.lock());
-                    if (state_->exception) return;
-                    if (!first) {
-                        assert(state_->pending);
-                        state_->pending--;
-                    }
-                    first = false;
-                    while (state_->left.empty()) {
-                        if (!state_->pending) {
-                            wakeup.notify_all();
-                            return;
-                        }
-                        if (state_->exception) return;
-                        state_.wait(wakeup);
-                    }
-                    work = state_->left.front();
-                    state_->left.pop();
-                    state_->pending++;
-                }
-
-                try {
-                    work();
-                } catch (std::exception & e) {
-                    auto state_(state.lock());
-                    if (state_->exception) {
-                        if (!dynamic_cast<Interrupted*>(&e))
-                            printMsg(lvlError, format("error: %s") % e.what());
-                    } else {
-                        state_->exception = std::current_exception();
-                        wakeup.notify_all();
-                    }
+                if (state->quit || state->exception) return;
+                if (didWork) {
+                    assert(state->pending);
+                    state->pending--;
+                    didWork = false;
                 }
+                if (!state->left.empty()) break;
+                if (!state->pending)
+                    done.notify_all();
+                state.wait(work);
             }
+            w = state->left.front();
+            state->left.pop();
+            state->pending++;
+        }
 
-        }));
-
-    for (auto & thr : workers)
-        thr.join();
+        try {
+            w();
+        } catch (std::exception & e) {
+            auto state(state_.lock());
+            if (state->exception) {
+                if (!dynamic_cast<Interrupted*>(&e))
+                    printMsg(lvlError, format("error: %s") % e.what());
+            } else {
+                state->exception = std::current_exception();
+                work.notify_all();
+                done.notify_all();
+            }
+        }
 
-    {
-        auto state_(state.lock());
-        if (state_->exception)
-            std::rethrow_exception(state_->exception);
+        didWork = true;
     }
 }
 
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index 77641d88ba4e..939bcf1ef93b 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -15,7 +15,9 @@ class ThreadPool
 {
 public:
 
-    ThreadPool(size_t nrThreads = 0);
+    ThreadPool(size_t maxThreads = 0);
+
+    ~ThreadPool();
 
     // FIXME: use std::packaged_task?
     typedef std::function<void()> work_t;
@@ -34,19 +36,22 @@ public:
 
 private:
 
-    size_t nrThreads;
+    size_t maxThreads;
 
     struct State
     {
         std::queue<work_t> left;
         size_t pending = 0;
         std::exception_ptr exception;
+        std::vector<std::thread> workers;
+        bool quit = false;
     };
 
-    Sync<State> state;
+    Sync<State> state_;
 
-    std::condition_variable wakeup;
+    std::condition_variable work, done;
 
+    void workerEntry();
 };
 
 }