diff options
Diffstat (limited to 'third_party/nix/src/libutil/thread-pool.hh')
-rw-r--r-- | third_party/nix/src/libutil/thread-pool.hh | 140 |
1 files changed, 0 insertions, 140 deletions
diff --git a/third_party/nix/src/libutil/thread-pool.hh b/third_party/nix/src/libutil/thread-pool.hh deleted file mode 100644 index 0efc4c1bfc67..000000000000 --- a/third_party/nix/src/libutil/thread-pool.hh +++ /dev/null @@ -1,140 +0,0 @@ -#pragma once - -#include <atomic> -#include <functional> -#include <map> -#include <queue> -#include <thread> - -#include "libutil/sync.hh" -#include "libutil/util.hh" - -namespace nix { - -MakeError(ThreadPoolShutDown, Error); - -/* A simple thread pool that executes a queue of work items - (lambdas). */ -class ThreadPool { - public: - explicit ThreadPool(size_t maxThreads = 0); - - ~ThreadPool(); - - // FIXME: use std::packaged_task? - typedef std::function<void()> work_t; - - /* Enqueue a function to be executed by the thread pool. */ - void enqueue(const work_t& t); - - /* Execute work items until the queue is empty. Note that work - items are allowed to add new items to the queue; this is - handled correctly. Queue processing stops prematurely if any - work item throws an exception. This exception is propagated to - the calling thread. If multiple work items throw an exception - concurrently, only one item is propagated; the others are - printed on stderr and otherwise ignored. */ - void process(); - - private: - size_t maxThreads; - - struct State { - std::queue<work_t> pending; - size_t active = 0; - std::exception_ptr exception; - std::vector<std::thread> workers; - bool draining = false; - }; - - std::atomic_bool quit{false}; - - Sync<State> state_; - - std::condition_variable work; - - void doWork(bool mainThread); - - void shutdown(); -}; - -/* Process in parallel a set of items of type T that have a partial - ordering between them. Thus, any item is only processed after all - its dependencies have been processed. */ -template <typename T> -void processGraph(ThreadPool& pool, const std::set<T>& nodes, - std::function<std::set<T>(const T&)> getEdges, - std::function<void(const T&)> processNode) { - struct Graph { - std::set<T> left; - std::map<T, std::set<T>> refs, rrefs; - }; - - Sync<Graph> graph_(Graph{nodes, {}, {}}); - - std::function<void(const T&)> worker; - - worker = [&](const T& node) { - { - auto graph(graph_.lock()); - auto i = graph->refs.find(node); - if (i == graph->refs.end()) { - goto getRefs; - } - goto doWork; - } - - getRefs : { - auto refs = getEdges(node); - refs.erase(node); - - { - auto graph(graph_.lock()); - for (auto& ref : refs) { - if (graph->left.count(ref)) { - graph->refs[node].insert(ref); - graph->rrefs[ref].insert(node); - } - } - if (graph->refs[node].empty()) { - goto doWork; - } - } - } - - return; - - doWork: - processNode(node); - - /* Enqueue work for all nodes that were waiting on this one - and have no unprocessed dependencies. */ - { - auto graph(graph_.lock()); - for (auto& rref : graph->rrefs[node]) { - auto& refs(graph->refs[rref]); - auto i = refs.find(node); - assert(i != refs.end()); - refs.erase(i); - if (refs.empty()) { - pool.enqueue(std::bind(worker, rref)); - } - } - graph->left.erase(node); - graph->refs.erase(node); - graph->rrefs.erase(node); - } - }; - - for (auto& node : nodes) { - pool.enqueue(std::bind(worker, std::ref(node))); - } - - pool.process(); - - if (!graph_.lock()->left.empty()) { - throw Error("graph processing incomplete (cyclic reference?)"); - } -} - -} // namespace nix |