about summary refs log tree commit diff
path: root/src/libutil/thread-pool.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil/thread-pool.cc')
-rw-r--r--src/libutil/thread-pool.cc82
1 files changed, 82 insertions, 0 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc
new file mode 100644
index 000000000000..743038b588a7
--- /dev/null
+++ b/src/libutil/thread-pool.cc
@@ -0,0 +1,82 @@
+#include "thread-pool.hh"
+
+namespace nix {
+
+ThreadPool::ThreadPool(size_t _nrThreads)
+    : nrThreads(_nrThreads)
+{
+    if (!nrThreads) {
+        nrThreads = std::thread::hardware_concurrency();
+        if (!nrThreads) nrThreads = 1;
+    }
+}
+
+void ThreadPool::enqueue(const work_t & t)
+{
+    auto state_(state.lock());
+    state_->left.push(t);
+    wakeup.notify_one();
+}
+
+void ThreadPool::process()
+{
+    printMsg(lvlDebug, format("starting pool of %d threads") % nrThreads);
+
+    std::vector<std::thread> workers;
+
+    for (size_t n = 0; n < nrThreads; n++)
+        workers.push_back(std::thread([&]() {
+            bool first = true;
+
+            while (true) {
+                work_t work;
+                {
+                    auto state_(state.lock());
+                    if (state_->exception) return;
+                    if (!first) {
+                        assert(state_->pending);
+                        state_->pending--;
+                    }
+                    first = false;
+                    while (state_->left.empty()) {
+                        if (!state_->pending) {
+                            wakeup.notify_all();
+                            return;
+                        }
+                        if (state_->exception) return;
+                        state_.wait(wakeup);
+                    }
+                    work = state_->left.front();
+                    state_->left.pop();
+                    state_->pending++;
+                }
+
+                try {
+                    work();
+                } catch (std::exception & e) {
+                    auto state_(state.lock());
+                    if (state_->exception) {
+                        if (!dynamic_cast<Interrupted*>(&e))
+                            printMsg(lvlError, format("error: %s") % e.what());
+                    } else {
+                        state_->exception = std::current_exception();
+                        wakeup.notify_all();
+                    }
+                }
+            }
+
+        }));
+
+    for (auto & thr : workers)
+        thr.join();
+
+    {
+        auto state_(state.lock());
+        if (state_->exception)
+            std::rethrow_exception(state_->exception);
+    }
+}
+
+}
+
+