about summary refs log tree commit diff
path: root/src/libutil/thread-pool.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil/thread-pool.cc')
-rw-r--r--src/libutil/thread-pool.cc104
1 files changed, 104 insertions, 0 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
new file mode 100644
index 000000000000..0a3a407240f7
--- /dev/null
+++ b/src/libutil/thread-pool.cc
@@ -0,0 +1,104 @@
+#include "thread-pool.hh"
+#include "affinity.hh"
+
+namespace nix {
+
+ThreadPool::ThreadPool(size_t _maxThreads)
+    : maxThreads(_maxThreads)
+{
+    restoreAffinity(); // FIXME
+
+    if (!maxThreads) {
+        maxThreads = std::thread::hardware_concurrency();
+        if (!maxThreads) maxThreads = 1;
+    }
+
+    debug(format("starting pool of %d threads") % maxThreads);
+}
+
+ThreadPool::~ThreadPool()
+{
+    std::vector<std::thread> workers;
+    {
+        auto state(state_.lock());
+        state->quit = true;
+        std::swap(workers, state->workers);
+    }
+
+    debug(format("reaping %d worker threads") % workers.size());
+
+    work.notify_all();
+
+    for (auto & thr : workers)
+        thr.join();
+}
+
+void ThreadPool::enqueue(const work_t & t)
+{
+    auto state(state_.lock());
+    if (state->quit)
+        throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
+    state->left.push(t);
+    if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads)
+        state->workers.emplace_back(&ThreadPool::workerEntry, this);
+    work.notify_one();
+}
+
+void ThreadPool::process()
+{
+    while (true) {
+        auto state(state_.lock());
+        if (state->exception)
+            std::rethrow_exception(state->exception);
+        if (state->left.empty() && !state->pending) break;
+        state.wait(done);
+    }
+}
+
+void ThreadPool::workerEntry()
+{
+    bool didWork = false;
+
+    while (true) {
+        work_t w;
+        {
+            auto state(state_.lock());
+            while (true) {
+                if (state->quit || state->exception) return;
+                if (didWork) {
+                    assert(state->pending);
+                    state->pending--;
+                    didWork = false;
+                }
+                if (!state->left.empty()) break;
+                if (!state->pending)
+                    done.notify_all();
+                state.wait(work);
+            }
+            w = state->left.front();
+            state->left.pop();
+            state->pending++;
+        }
+
+        try {
+            w();
+        } catch (std::exception & e) {
+            auto state(state_.lock());
+            if (state->exception) {
+                if (!dynamic_cast<Interrupted*>(&e) &&
+                    !dynamic_cast<ThreadPoolShutDown*>(&e))
+                    printError(format("error: %s") % e.what());
+            } else {
+                state->exception = std::current_exception();
+                work.notify_all();
+                done.notify_all();
+            }
+        }
+
+        didWork = true;
+    }
+}
+
+}
+
+