about summary refs log tree commit diff
path: root/third_party/nix/src/libutil/thread-pool.hh
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/nix/src/libutil/thread-pool.hh')
-rw-r--r--third_party/nix/src/libutil/thread-pool.hh140
1 files changed, 140 insertions, 0 deletions
diff --git a/third_party/nix/src/libutil/thread-pool.hh b/third_party/nix/src/libutil/thread-pool.hh
new file mode 100644
index 0000000000..0efc4c1bfc
--- /dev/null
+++ b/third_party/nix/src/libutil/thread-pool.hh
@@ -0,0 +1,140 @@
+#pragma once
+
+#include <atomic>
+#include <functional>
+#include <map>
+#include <queue>
+#include <thread>
+
+#include "libutil/sync.hh"
+#include "libutil/util.hh"
+
+namespace nix {
+
+MakeError(ThreadPoolShutDown, Error);
+
+/* A simple thread pool that executes a queue of work items
+   (lambdas). */
+class ThreadPool {
+ public:
+  explicit ThreadPool(size_t maxThreads = 0);
+
+  ~ThreadPool();
+
+  // FIXME: use std::packaged_task?
+  typedef std::function<void()> work_t;
+
+  /* Enqueue a function to be executed by the thread pool. */
+  void enqueue(const work_t& t);
+
+  /* Execute work items until the queue is empty. Note that work
+     items are allowed to add new items to the queue; this is
+     handled correctly. Queue processing stops prematurely if any
+     work item throws an exception. This exception is propagated to
+     the calling thread. If multiple work items throw an exception
+     concurrently, only one item is propagated; the others are
+     printed on stderr and otherwise ignored. */
+  void process();
+
+ private:
+  size_t maxThreads;
+
+  struct State {
+    std::queue<work_t> pending;
+    size_t active = 0;
+    std::exception_ptr exception;
+    std::vector<std::thread> workers;
+    bool draining = false;
+  };
+
+  std::atomic_bool quit{false};
+
+  Sync<State> state_;
+
+  std::condition_variable work;
+
+  void doWork(bool mainThread);
+
+  void shutdown();
+};
+
+/* Process in parallel a set of items of type T that have a partial
+   ordering between them. Thus, any item is only processed after all
+   its dependencies have been processed. */
+template <typename T>
+void processGraph(ThreadPool& pool, const std::set<T>& nodes,
+                  std::function<std::set<T>(const T&)> getEdges,
+                  std::function<void(const T&)> processNode) {
+  struct Graph {
+    std::set<T> left;
+    std::map<T, std::set<T>> refs, rrefs;
+  };
+
+  Sync<Graph> graph_(Graph{nodes, {}, {}});
+
+  std::function<void(const T&)> worker;
+
+  worker = [&](const T& node) {
+    {
+      auto graph(graph_.lock());
+      auto i = graph->refs.find(node);
+      if (i == graph->refs.end()) {
+        goto getRefs;
+      }
+      goto doWork;
+    }
+
+  getRefs : {
+    auto refs = getEdges(node);
+    refs.erase(node);
+
+    {
+      auto graph(graph_.lock());
+      for (auto& ref : refs) {
+        if (graph->left.count(ref)) {
+          graph->refs[node].insert(ref);
+          graph->rrefs[ref].insert(node);
+        }
+      }
+      if (graph->refs[node].empty()) {
+        goto doWork;
+      }
+    }
+  }
+
+    return;
+
+  doWork:
+    processNode(node);
+
+    /* Enqueue work for all nodes that were waiting on this one
+       and have no unprocessed dependencies. */
+    {
+      auto graph(graph_.lock());
+      for (auto& rref : graph->rrefs[node]) {
+        auto& refs(graph->refs[rref]);
+        auto i = refs.find(node);
+        assert(i != refs.end());
+        refs.erase(i);
+        if (refs.empty()) {
+          pool.enqueue(std::bind(worker, rref));
+        }
+      }
+      graph->left.erase(node);
+      graph->refs.erase(node);
+      graph->rrefs.erase(node);
+    }
+  };
+
+  for (auto& node : nodes) {
+    pool.enqueue(std::bind(worker, std::ref(node)));
+  }
+
+  pool.process();
+
+  if (!graph_.lock()->left.empty()) {
+    throw Error("graph processing incomplete (cyclic reference?)");
+  }
+}
+
+}  // namespace nix