From 90da34e421607ad6c40f3dea08709ae89db7a7e1 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Wed, 28 Jun 2017 16:38:02 +0200 Subject: processGraph(): Call getEdges in parallel --- src/libutil/thread-pool.hh | 75 +++++++++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 28 deletions(-) (limited to 'src/libutil/thread-pool.hh') diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index b64dc52d473e..361a9d33a732 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -70,50 +70,69 @@ void processGraph( struct Graph { std::set left; std::map> refs, rrefs; - std::function wrap; }; - ref> graph_ = make_ref>(); + Sync graph_(Graph{nodes, {}, {}}); - auto wrapWork = [&pool, graph_, processNode](const T & node) { + std::function worker; + + worker = [&](const T & node) { + + { + auto graph(graph_.lock()); + auto i = graph->refs.find(node); + if (i == graph->refs.end()) + goto getRefs; + goto doWork; + } + + getRefs: + { + auto refs = getEdges(node); + refs.erase(node); + + { + auto graph(graph_.lock()); + for (auto & ref : refs) + if (graph->left.count(ref)) { + graph->refs[node].insert(ref); + graph->rrefs[ref].insert(node); + } + if (graph->refs[node].empty()) + goto doWork; + } + } + + return; + + doWork: processNode(node); - /* Enqueue work for all nodes that were waiting on this one. */ + /* Enqueue work for all nodes that were waiting on this one + and have no unprocessed dependencies. */ { - auto graph(graph_->lock()); - graph->left.erase(node); + auto graph(graph_.lock()); for (auto & rref : graph->rrefs[node]) { auto & refs(graph->refs[rref]); auto i = refs.find(node); assert(i != refs.end()); refs.erase(i); if (refs.empty()) - pool.enqueue(std::bind(graph->wrap, rref)); + pool.enqueue(std::bind(worker, rref)); } + graph->left.erase(node); + graph->refs.erase(node); + graph->rrefs.erase(node); } }; - { - auto graph(graph_->lock()); - graph->left = nodes; - graph->wrap = wrapWork; - } - - /* Build the dependency graph; enqueue all nodes with no - dependencies. */ - for (auto & node : nodes) { - auto refs = getEdges(node); - { - auto graph(graph_->lock()); - for (auto & ref : refs) - if (ref != node && graph->left.count(ref)) { - graph->refs[node].insert(ref); - graph->rrefs[ref].insert(node); - } - if (graph->refs[node].empty()) - pool.enqueue(std::bind(graph->wrap, node)); - } - } + for (auto & node : nodes) + pool.enqueue(std::bind(worker, std::ref(node))); + + pool.process(); + + if (!graph_.lock()->left.empty()) + throw Error("graph processing incomplete (cyclic reference?)"); } } -- cgit 1.4.1