From 0f2cf531f705d370321843e5ba9135b2ebdb5d19 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Sun, 17 May 2020 16:31:57 +0100 Subject: style(3p/nix): Reformat project in Google C++ style Reformatted with: fd . -e hh -e cc | xargs clang-format -i --- third_party/nix/src/libutil/thread-pool.hh | 206 ++++++++++++++--------------- 1 file changed, 96 insertions(+), 110 deletions(-) (limited to 'third_party/nix/src/libutil/thread-pool.hh') diff --git a/third_party/nix/src/libutil/thread-pool.hh b/third_party/nix/src/libutil/thread-pool.hh index bb16b639a591..18fd208b1bb4 100644 --- a/third_party/nix/src/libutil/thread-pool.hh +++ b/third_party/nix/src/libutil/thread-pool.hh @@ -1,143 +1,129 @@ #pragma once -#include "sync.hh" -#include "util.hh" - -#include +#include #include -#include #include -#include +#include +#include +#include "sync.hh" +#include "util.hh" namespace nix { MakeError(ThreadPoolShutDown, Error) -/* A simple thread pool that executes a queue of work items - (lambdas). */ -class ThreadPool -{ -public: + /* A simple thread pool that executes a queue of work items + (lambdas). */ + class ThreadPool { + public: + ThreadPool(size_t maxThreads = 0); - ThreadPool(size_t maxThreads = 0); + ~ThreadPool(); - ~ThreadPool(); + // FIXME: use std::packaged_task? + typedef std::function work_t; - // FIXME: use std::packaged_task? - typedef std::function work_t; + /* Enqueue a function to be executed by the thread pool. */ + void enqueue(const work_t& t); - /* Enqueue a function to be executed by the thread pool. */ - void enqueue(const work_t & t); + /* Execute work items until the queue is empty. Note that work + items are allowed to add new items to the queue; this is + handled correctly. Queue processing stops prematurely if any + work item throws an exception. This exception is propagated to + the calling thread. If multiple work items throw an exception + concurrently, only one item is propagated; the others are + printed on stderr and otherwise ignored. */ + void process(); - /* Execute work items until the queue is empty. Note that work - items are allowed to add new items to the queue; this is - handled correctly. Queue processing stops prematurely if any - work item throws an exception. This exception is propagated to - the calling thread. If multiple work items throw an exception - concurrently, only one item is propagated; the others are - printed on stderr and otherwise ignored. */ - void process(); + private: + size_t maxThreads; -private: + struct State { + std::queue pending; + size_t active = 0; + std::exception_ptr exception; + std::vector workers; + bool draining = false; + }; - size_t maxThreads; + std::atomic_bool quit{false}; - struct State - { - std::queue pending; - size_t active = 0; - std::exception_ptr exception; - std::vector workers; - bool draining = false; - }; + Sync state_; - std::atomic_bool quit{false}; + std::condition_variable work; - Sync state_; + void doWork(bool mainThread); - std::condition_variable work; - - void doWork(bool mainThread); - - void shutdown(); + void shutdown(); }; /* Process in parallel a set of items of type T that have a partial ordering between them. Thus, any item is only processed after all its dependencies have been processed. */ -template -void processGraph( - ThreadPool & pool, - const std::set & nodes, - std::function(const T &)> getEdges, - std::function processNode) -{ - struct Graph { - std::set left; - std::map> refs, rrefs; - }; - - Sync graph_(Graph{nodes, {}, {}}); - - std::function worker; - - worker = [&](const T & node) { - - { - auto graph(graph_.lock()); - auto i = graph->refs.find(node); - if (i == graph->refs.end()) - goto getRefs; - goto doWork; - } +template +void processGraph(ThreadPool& pool, const std::set& nodes, + std::function(const T&)> getEdges, + std::function processNode) { + struct Graph { + std::set left; + std::map> refs, rrefs; + }; - getRefs: - { - auto refs = getEdges(node); - refs.erase(node); - - { - auto graph(graph_.lock()); - for (auto & ref : refs) - if (graph->left.count(ref)) { - graph->refs[node].insert(ref); - graph->rrefs[ref].insert(node); - } - if (graph->refs[node].empty()) - goto doWork; - } - } + Sync graph_(Graph{nodes, {}, {}}); + + std::function worker; - return; - - doWork: - processNode(node); - - /* Enqueue work for all nodes that were waiting on this one - and have no unprocessed dependencies. */ - { - auto graph(graph_.lock()); - for (auto & rref : graph->rrefs[node]) { - auto & refs(graph->refs[rref]); - auto i = refs.find(node); - assert(i != refs.end()); - refs.erase(i); - if (refs.empty()) - pool.enqueue(std::bind(worker, rref)); - } - graph->left.erase(node); - graph->refs.erase(node); - graph->rrefs.erase(node); + worker = [&](const T& node) { + { + auto graph(graph_.lock()); + auto i = graph->refs.find(node); + if (i == graph->refs.end()) goto getRefs; + goto doWork; + } + + getRefs : { + auto refs = getEdges(node); + refs.erase(node); + + { + auto graph(graph_.lock()); + for (auto& ref : refs) + if (graph->left.count(ref)) { + graph->refs[node].insert(ref); + graph->rrefs[ref].insert(node); } - }; + if (graph->refs[node].empty()) goto doWork; + } + } - for (auto & node : nodes) - pool.enqueue(std::bind(worker, std::ref(node))); + return; - pool.process(); + doWork: + processNode(node); - if (!graph_.lock()->left.empty()) - throw Error("graph processing incomplete (cyclic reference?)"); + /* Enqueue work for all nodes that were waiting on this one + and have no unprocessed dependencies. */ + { + auto graph(graph_.lock()); + for (auto& rref : graph->rrefs[node]) { + auto& refs(graph->refs[rref]); + auto i = refs.find(node); + assert(i != refs.end()); + refs.erase(i); + if (refs.empty()) pool.enqueue(std::bind(worker, rref)); + } + graph->left.erase(node); + graph->refs.erase(node); + graph->rrefs.erase(node); + } + }; + + for (auto& node : nodes) pool.enqueue(std::bind(worker, std::ref(node))); + + pool.process(); + + if (!graph_.lock()->left.empty()) + throw Error("graph processing incomplete (cyclic reference?)"); } -} +} // namespace nix -- cgit 1.4.1