about summary refs log tree commit diff
path: root/third_party/nix/src/libutil/thread-pool.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/nix/src/libutil/thread-pool.cc')
-rw-r--r--third_party/nix/src/libutil/thread-pool.cc156
1 files changed, 156 insertions, 0 deletions
diff --git a/third_party/nix/src/libutil/thread-pool.cc b/third_party/nix/src/libutil/thread-pool.cc
new file mode 100644
index 000000000000..857ee91f87d0
--- /dev/null
+++ b/third_party/nix/src/libutil/thread-pool.cc
@@ -0,0 +1,156 @@
+#include "thread-pool.hh"
+#include "affinity.hh"
+
+namespace nix {
+
+ThreadPool::ThreadPool(size_t _maxThreads)
+    : maxThreads(_maxThreads)
+{
+    restoreAffinity(); // FIXME
+
+    if (!maxThreads) {
+        maxThreads = std::thread::hardware_concurrency();
+        if (!maxThreads) maxThreads = 1;
+    }
+
+    debug("starting pool of %d threads", maxThreads - 1);
+}
+
+ThreadPool::~ThreadPool()
+{
+    shutdown();
+}
+
+void ThreadPool::shutdown()
+{
+    std::vector<std::thread> workers;
+    {
+        auto state(state_.lock());
+        quit = true;
+        std::swap(workers, state->workers);
+    }
+
+    if (workers.empty()) return;
+
+    debug("reaping %d worker threads", workers.size());
+
+    work.notify_all();
+
+    for (auto & thr : workers)
+        thr.join();
+}
+
+void ThreadPool::enqueue(const work_t & t)
+{
+    auto state(state_.lock());
+    if (quit)
+        throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
+    state->pending.push(t);
+    /* Note: process() also executes items, so count it as a worker. */
+    if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads)
+        state->workers.emplace_back(&ThreadPool::doWork, this, false);
+    work.notify_one();
+}
+
+void ThreadPool::process()
+{
+    state_.lock()->draining = true;
+
+    /* Do work until no more work is pending or active. */
+    try {
+        doWork(true);
+
+        auto state(state_.lock());
+
+        assert(quit);
+
+        if (state->exception)
+            std::rethrow_exception(state->exception);
+
+    } catch (...) {
+        /* In the exceptional case, some workers may still be
+           active. They may be referencing the stack frame of the
+           caller. So wait for them to finish. (~ThreadPool also does
+           this, but it might be destroyed after objects referenced by
+           the work item lambdas.) */
+        shutdown();
+        throw;
+    }
+}
+
+void ThreadPool::doWork(bool mainThread)
+{
+    if (!mainThread)
+        interruptCheck = [&]() { return (bool) quit; };
+
+    bool didWork = false;
+    std::exception_ptr exc;
+
+    while (true) {
+        work_t w;
+        {
+            auto state(state_.lock());
+
+            if (didWork) {
+                assert(state->active);
+                state->active--;
+
+                if (exc) {
+
+                    if (!state->exception) {
+                        state->exception = exc;
+                        // Tell the other workers to quit.
+                        quit = true;
+                        work.notify_all();
+                    } else {
+                        /* Print the exception, since we can't
+                           propagate it. */
+                        try {
+                            std::rethrow_exception(exc);
+                        } catch (std::exception & e) {
+                            if (!dynamic_cast<Interrupted*>(&e) &&
+                                !dynamic_cast<ThreadPoolShutDown*>(&e))
+                                ignoreException();
+                        } catch (...) {
+                        }
+                    }
+                }
+            }
+
+            /* Wait until a work item is available or we're asked to
+               quit. */
+            while (true) {
+                if (quit) return;
+
+                if (!state->pending.empty()) break;
+
+                /* If there are no active or pending items, and the
+                   main thread is running process(), then no new items
+                   can be added. So exit. */
+                if (!state->active && state->draining) {
+                    quit = true;
+                    work.notify_all();
+                    return;
+                }
+
+                state.wait(work);
+            }
+
+            w = std::move(state->pending.front());
+            state->pending.pop();
+            state->active++;
+        }
+
+        try {
+            w();
+        } catch (...) {
+            exc = std::current_exception();
+        }
+
+        didWork = true;
+    }
+}
+
+}
+
+