diff options
author | Vincent Ambo <tazjin@google.com> | 2020-05-17T15·31+0100 |
---|---|---|
committer | Vincent Ambo <tazjin@google.com> | 2020-05-17T15·31+0100 |
commit | 0f2cf531f705d370321843e5ba9135b2ebdb5d19 (patch) | |
tree | 256feb13963a849ed96e89228fa05454c2a22363 /third_party/nix/src/libutil/thread-pool.hh | |
parent | 65a1aae98ce5a237c9643e639e550c8b0c0be7f1 (diff) |
style(3p/nix): Reformat project in Google C++ style r/740
Reformatted with: fd . -e hh -e cc | xargs clang-format -i
Diffstat (limited to 'third_party/nix/src/libutil/thread-pool.hh')
-rw-r--r-- | third_party/nix/src/libutil/thread-pool.hh | 206 |
1 files changed, 96 insertions, 110 deletions
diff --git a/third_party/nix/src/libutil/thread-pool.hh b/third_party/nix/src/libutil/thread-pool.hh index bb16b639a591..18fd208b1bb4 100644 --- a/third_party/nix/src/libutil/thread-pool.hh +++ b/third_party/nix/src/libutil/thread-pool.hh @@ -1,143 +1,129 @@ #pragma once -#include "sync.hh" -#include "util.hh" - -#include <queue> +#include <atomic> #include <functional> -#include <thread> #include <map> -#include <atomic> +#include <queue> +#include <thread> +#include "sync.hh" +#include "util.hh" namespace nix { MakeError(ThreadPoolShutDown, Error) -/* A simple thread pool that executes a queue of work items - (lambdas). */ -class ThreadPool -{ -public: + /* A simple thread pool that executes a queue of work items + (lambdas). */ + class ThreadPool { + public: + ThreadPool(size_t maxThreads = 0); - ThreadPool(size_t maxThreads = 0); + ~ThreadPool(); - ~ThreadPool(); + // FIXME: use std::packaged_task? + typedef std::function<void()> work_t; - // FIXME: use std::packaged_task? - typedef std::function<void()> work_t; + /* Enqueue a function to be executed by the thread pool. */ + void enqueue(const work_t& t); - /* Enqueue a function to be executed by the thread pool. */ - void enqueue(const work_t & t); + /* Execute work items until the queue is empty. Note that work + items are allowed to add new items to the queue; this is + handled correctly. Queue processing stops prematurely if any + work item throws an exception. This exception is propagated to + the calling thread. If multiple work items throw an exception + concurrently, only one item is propagated; the others are + printed on stderr and otherwise ignored. */ + void process(); - /* Execute work items until the queue is empty. Note that work - items are allowed to add new items to the queue; this is - handled correctly. Queue processing stops prematurely if any - work item throws an exception. This exception is propagated to - the calling thread. If multiple work items throw an exception - concurrently, only one item is propagated; the others are - printed on stderr and otherwise ignored. */ - void process(); + private: + size_t maxThreads; -private: + struct State { + std::queue<work_t> pending; + size_t active = 0; + std::exception_ptr exception; + std::vector<std::thread> workers; + bool draining = false; + }; - size_t maxThreads; + std::atomic_bool quit{false}; - struct State - { - std::queue<work_t> pending; - size_t active = 0; - std::exception_ptr exception; - std::vector<std::thread> workers; - bool draining = false; - }; + Sync<State> state_; - std::atomic_bool quit{false}; + std::condition_variable work; - Sync<State> state_; + void doWork(bool mainThread); - std::condition_variable work; - - void doWork(bool mainThread); - - void shutdown(); + void shutdown(); }; /* Process in parallel a set of items of type T that have a partial ordering between them. Thus, any item is only processed after all its dependencies have been processed. */ -template<typename T> -void processGraph( - ThreadPool & pool, - const std::set<T> & nodes, - std::function<std::set<T>(const T &)> getEdges, - std::function<void(const T &)> processNode) -{ - struct Graph { - std::set<T> left; - std::map<T, std::set<T>> refs, rrefs; - }; - - Sync<Graph> graph_(Graph{nodes, {}, {}}); - - std::function<void(const T &)> worker; - - worker = [&](const T & node) { - - { - auto graph(graph_.lock()); - auto i = graph->refs.find(node); - if (i == graph->refs.end()) - goto getRefs; - goto doWork; - } +template <typename T> +void processGraph(ThreadPool& pool, const std::set<T>& nodes, + std::function<std::set<T>(const T&)> getEdges, + std::function<void(const T&)> processNode) { + struct Graph { + std::set<T> left; + std::map<T, std::set<T>> refs, rrefs; + }; - getRefs: - { - auto refs = getEdges(node); - refs.erase(node); - - { - auto graph(graph_.lock()); - for (auto & ref : refs) - if (graph->left.count(ref)) { - graph->refs[node].insert(ref); - graph->rrefs[ref].insert(node); - } - if (graph->refs[node].empty()) - goto doWork; - } - } + Sync<Graph> graph_(Graph{nodes, {}, {}}); + + std::function<void(const T&)> worker; - return; - - doWork: - processNode(node); - - /* Enqueue work for all nodes that were waiting on this one - and have no unprocessed dependencies. */ - { - auto graph(graph_.lock()); - for (auto & rref : graph->rrefs[node]) { - auto & refs(graph->refs[rref]); - auto i = refs.find(node); - assert(i != refs.end()); - refs.erase(i); - if (refs.empty()) - pool.enqueue(std::bind(worker, rref)); - } - graph->left.erase(node); - graph->refs.erase(node); - graph->rrefs.erase(node); + worker = [&](const T& node) { + { + auto graph(graph_.lock()); + auto i = graph->refs.find(node); + if (i == graph->refs.end()) goto getRefs; + goto doWork; + } + + getRefs : { + auto refs = getEdges(node); + refs.erase(node); + + { + auto graph(graph_.lock()); + for (auto& ref : refs) + if (graph->left.count(ref)) { + graph->refs[node].insert(ref); + graph->rrefs[ref].insert(node); } - }; + if (graph->refs[node].empty()) goto doWork; + } + } - for (auto & node : nodes) - pool.enqueue(std::bind(worker, std::ref(node))); + return; - pool.process(); + doWork: + processNode(node); - if (!graph_.lock()->left.empty()) - throw Error("graph processing incomplete (cyclic reference?)"); + /* Enqueue work for all nodes that were waiting on this one + and have no unprocessed dependencies. */ + { + auto graph(graph_.lock()); + for (auto& rref : graph->rrefs[node]) { + auto& refs(graph->refs[rref]); + auto i = refs.find(node); + assert(i != refs.end()); + refs.erase(i); + if (refs.empty()) pool.enqueue(std::bind(worker, rref)); + } + graph->left.erase(node); + graph->refs.erase(node); + graph->rrefs.erase(node); + } + }; + + for (auto& node : nodes) pool.enqueue(std::bind(worker, std::ref(node))); + + pool.process(); + + if (!graph_.lock()->left.empty()) + throw Error("graph processing incomplete (cyclic reference?)"); } -} +} // namespace nix |