diff options
Diffstat (limited to 'src/libutil/thread-pool.hh')
-rw-r--r-- | src/libutil/thread-pool.hh | 73 |
1 files changed, 69 insertions, 4 deletions
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index 77641d88ba4e..78b63467d62e 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -6,6 +6,7 @@ #include <queue> #include <functional> #include <thread> +#include <map> namespace nix { @@ -15,7 +16,9 @@ class ThreadPool { public: - ThreadPool(size_t nrThreads = 0); + ThreadPool(size_t maxThreads = 0); + + ~ThreadPool(); // FIXME: use std::packaged_task? typedef std::function<void()> work_t; @@ -34,19 +37,81 @@ public: private: - size_t nrThreads; + size_t maxThreads; struct State { std::queue<work_t> left; size_t pending = 0; std::exception_ptr exception; + std::vector<std::thread> workers; + bool quit = false; }; - Sync<State> state; + Sync<State> state_; - std::condition_variable wakeup; + std::condition_variable work, done; + void workerEntry(); }; +/* Process in parallel a set of items of type T that have a partial + ordering between them. Thus, any item is only processed after all + its dependencies have been processed. */ +template<typename T> +void processGraph( + ThreadPool & pool, + const std::set<T> & nodes, + std::function<std::set<T>(const T &)> getEdges, + std::function<void(const T &)> processNode) +{ + struct Graph { + std::set<T> left; + std::map<T, std::set<T>> refs, rrefs; + std::function<void(T)> wrap; + }; + + ref<Sync<Graph>> graph_ = make_ref<Sync<Graph>>(); + + auto wrapWork = [&pool, graph_, processNode](const T & node) { + processNode(node); + + /* Enqueue work for all nodes that were waiting on this one. */ + { + auto graph(graph_->lock()); + graph->left.erase(node); + for (auto & rref : graph->rrefs[node]) { + auto & refs(graph->refs[rref]); + auto i = refs.find(node); + assert(i != refs.end()); + refs.erase(i); + if (refs.empty()) + pool.enqueue(std::bind(graph->wrap, rref)); + } + } + }; + + { + auto graph(graph_->lock()); + graph->left = nodes; + graph->wrap = wrapWork; + } + + /* Build the dependency graph; enqueue all nodes with no + dependencies. */ + for (auto & node : nodes) { + auto refs = getEdges(node); + { + auto graph(graph_->lock()); + for (auto & ref : refs) + if (ref != node && graph->left.count(ref)) { + graph->refs[node].insert(ref); + graph->rrefs[ref].insert(node); + } + if (graph->refs[node].empty()) + pool.enqueue(std::bind(graph->wrap, node)); + } + } +} + } |