diff options
author | Eelco Dolstra <edolstra@gmail.com> | 2017-06-28T14·38+0200 |
---|---|---|
committer | Eelco Dolstra <edolstra@gmail.com> | 2017-07-03T09·38+0200 |
commit | 90da34e421607ad6c40f3dea08709ae89db7a7e1 (patch) | |
tree | f33985d155d93d11fbbb3eb976554ddf1b1695f4 /src | |
parent | 63d6e0ad3f0a93305cebc870dbb3639506727b27 (diff) |
processGraph(): Call getEdges in parallel
Diffstat (limited to 'src')
-rw-r--r-- | src/libutil/thread-pool.hh | 75 |
1 files changed, 47 insertions, 28 deletions
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index b64dc52d473e..361a9d33a732 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -70,50 +70,69 @@ void processGraph( struct Graph { std::set<T> left; std::map<T, std::set<T>> refs, rrefs; - std::function<void(T)> wrap; }; - ref<Sync<Graph>> graph_ = make_ref<Sync<Graph>>(); + Sync<Graph> graph_(Graph{nodes, {}, {}}); - auto wrapWork = [&pool, graph_, processNode](const T & node) { + std::function<void(const T &)> worker; + + worker = [&](const T & node) { + + { + auto graph(graph_.lock()); + auto i = graph->refs.find(node); + if (i == graph->refs.end()) + goto getRefs; + goto doWork; + } + + getRefs: + { + auto refs = getEdges(node); + refs.erase(node); + + { + auto graph(graph_.lock()); + for (auto & ref : refs) + if (graph->left.count(ref)) { + graph->refs[node].insert(ref); + graph->rrefs[ref].insert(node); + } + if (graph->refs[node].empty()) + goto doWork; + } + } + + return; + + doWork: processNode(node); - /* Enqueue work for all nodes that were waiting on this one. */ + /* Enqueue work for all nodes that were waiting on this one + and have no unprocessed dependencies. */ { - auto graph(graph_->lock()); - graph->left.erase(node); + auto graph(graph_.lock()); for (auto & rref : graph->rrefs[node]) { auto & refs(graph->refs[rref]); auto i = refs.find(node); assert(i != refs.end()); refs.erase(i); if (refs.empty()) - pool.enqueue(std::bind(graph->wrap, rref)); + pool.enqueue(std::bind(worker, rref)); } + graph->left.erase(node); + graph->refs.erase(node); + graph->rrefs.erase(node); } }; - { - auto graph(graph_->lock()); - graph->left = nodes; - graph->wrap = wrapWork; - } - - /* Build the dependency graph; enqueue all nodes with no - dependencies. */ - for (auto & node : nodes) { - auto refs = getEdges(node); - { - auto graph(graph_->lock()); - for (auto & ref : refs) - if (ref != node && graph->left.count(ref)) { - graph->refs[node].insert(ref); - graph->rrefs[ref].insert(node); - } - if (graph->refs[node].empty()) - pool.enqueue(std::bind(graph->wrap, node)); - } - } + for (auto & node : nodes) + pool.enqueue(std::bind(worker, std::ref(node))); + + pool.process(); + + if (!graph_.lock()->left.empty()) + throw Error("graph processing incomplete (cyclic reference?)"); } } |