about summary refs log tree commit diff
path: root/src/libutil/thread-pool.hh
diff options
context:
space:
mode:
Diffstat (limited to 'src/libutil/thread-pool.hh')
-rw-r--r--src/libutil/thread-pool.hh75
1 files changed, 47 insertions, 28 deletions
diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh
index b64dc52d473e..361a9d33a732 100644
--- a/src/libutil/thread-pool.hh
+++ b/src/libutil/thread-pool.hh
@@ -70,50 +70,69 @@ void processGraph(
     struct Graph {
         std::set<T> left;
         std::map<T, std::set<T>> refs, rrefs;
-        std::function<void(T)> wrap;
     };
 
-    ref<Sync<Graph>> graph_ = make_ref<Sync<Graph>>();
+    Sync<Graph> graph_(Graph{nodes, {}, {}});
 
-    auto wrapWork = [&pool, graph_, processNode](const T & node) {
+    std::function<void(const T &)> worker;
+
+    worker = [&](const T & node) {
+
+        {
+            auto graph(graph_.lock());
+            auto i = graph->refs.find(node);
+            if (i == graph->refs.end())
+                goto getRefs;
+            goto doWork;
+        }
+
+    getRefs:
+        {
+            auto refs = getEdges(node);
+            refs.erase(node);
+
+            {
+                auto graph(graph_.lock());
+                for (auto & ref : refs)
+                    if (graph->left.count(ref)) {
+                        graph->refs[node].insert(ref);
+                        graph->rrefs[ref].insert(node);
+                    }
+                if (graph->refs[node].empty())
+                    goto doWork;
+            }
+        }
+
+        return;
+
+    doWork:
         processNode(node);
 
-        /* Enqueue work for all nodes that were waiting on this one. */
+        /* Enqueue work for all nodes that were waiting on this one
+           and have no unprocessed dependencies. */
         {
-            auto graph(graph_->lock());
-            graph->left.erase(node);
+            auto graph(graph_.lock());
             for (auto & rref : graph->rrefs[node]) {
                 auto & refs(graph->refs[rref]);
                 auto i = refs.find(node);
                 assert(i != refs.end());
                 refs.erase(i);
                 if (refs.empty())
-                    pool.enqueue(std::bind(graph->wrap, rref));
+                    pool.enqueue(std::bind(worker, rref));
             }
+            graph->left.erase(node);
+            graph->refs.erase(node);
+            graph->rrefs.erase(node);
         }
     };
 
-    {
-        auto graph(graph_->lock());
-        graph->left = nodes;
-        graph->wrap = wrapWork;
-    }
-
-    /* Build the dependency graph; enqueue all nodes with no
-       dependencies. */
-    for (auto & node : nodes) {
-        auto refs = getEdges(node);
-        {
-            auto graph(graph_->lock());
-            for (auto & ref : refs)
-                if (ref != node && graph->left.count(ref)) {
-                    graph->refs[node].insert(ref);
-                    graph->rrefs[ref].insert(node);
-                }
-            if (graph->refs[node].empty())
-                pool.enqueue(std::bind(graph->wrap, node));
-        }
-    }
+    for (auto & node : nodes)
+        pool.enqueue(std::bind(worker, std::ref(node)));
+
+    pool.process();
+
+    if (!graph_.lock()->left.empty())
+        throw Error("graph processing incomplete (cyclic reference?)");
 }
 
 }