about summary refs log tree commit diff
path: root/third_party/nix/src/libutil/thread-pool.cc
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/nix/src/libutil/thread-pool.cc')
-rw-r--r--third_party/nix/src/libutil/thread-pool.cc231
1 files changed, 109 insertions, 122 deletions
diff --git a/third_party/nix/src/libutil/thread-pool.cc b/third_party/nix/src/libutil/thread-pool.cc
index 857ee91f87..5b5be92653 100644
--- a/third_party/nix/src/libutil/thread-pool.cc
+++ b/third_party/nix/src/libutil/thread-pool.cc
@@ -3,154 +3,141 @@
 
 namespace nix {
 
-ThreadPool::ThreadPool(size_t _maxThreads)
-    : maxThreads(_maxThreads)
-{
-    restoreAffinity(); // FIXME
-
-    if (!maxThreads) {
-        maxThreads = std::thread::hardware_concurrency();
-        if (!maxThreads) maxThreads = 1;
-    }
+ThreadPool::ThreadPool(size_t _maxThreads) : maxThreads(_maxThreads) {
+  restoreAffinity();  // FIXME
 
-    debug("starting pool of %d threads", maxThreads - 1);
-}
+  if (!maxThreads) {
+    maxThreads = std::thread::hardware_concurrency();
+    if (!maxThreads) maxThreads = 1;
+  }
 
-ThreadPool::~ThreadPool()
-{
-    shutdown();
+  debug("starting pool of %d threads", maxThreads - 1);
 }
 
-void ThreadPool::shutdown()
-{
-    std::vector<std::thread> workers;
-    {
-        auto state(state_.lock());
-        quit = true;
-        std::swap(workers, state->workers);
-    }
+ThreadPool::~ThreadPool() { shutdown(); }
+
+void ThreadPool::shutdown() {
+  std::vector<std::thread> workers;
+  {
+    auto state(state_.lock());
+    quit = true;
+    std::swap(workers, state->workers);
+  }
 
-    if (workers.empty()) return;
+  if (workers.empty()) return;
 
-    debug("reaping %d worker threads", workers.size());
+  debug("reaping %d worker threads", workers.size());
 
-    work.notify_all();
+  work.notify_all();
 
-    for (auto & thr : workers)
-        thr.join();
+  for (auto& thr : workers) thr.join();
 }
 
-void ThreadPool::enqueue(const work_t & t)
-{
-    auto state(state_.lock());
-    if (quit)
-        throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down");
-    state->pending.push(t);
-    /* Note: process() also executes items, so count it as a worker. */
-    if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads)
-        state->workers.emplace_back(&ThreadPool::doWork, this, false);
-    work.notify_one();
+void ThreadPool::enqueue(const work_t& t) {
+  auto state(state_.lock());
+  if (quit)
+    throw ThreadPoolShutDown(
+        "cannot enqueue a work item while the thread pool is shutting down");
+  state->pending.push(t);
+  /* Note: process() also executes items, so count it as a worker. */
+  if (state->pending.size() > state->workers.size() + 1 &&
+      state->workers.size() + 1 < maxThreads)
+    state->workers.emplace_back(&ThreadPool::doWork, this, false);
+  work.notify_one();
 }
 
-void ThreadPool::process()
-{
-    state_.lock()->draining = true;
+void ThreadPool::process() {
+  state_.lock()->draining = true;
 
-    /* Do work until no more work is pending or active. */
-    try {
-        doWork(true);
+  /* Do work until no more work is pending or active. */
+  try {
+    doWork(true);
 
-        auto state(state_.lock());
+    auto state(state_.lock());
 
-        assert(quit);
+    assert(quit);
 
-        if (state->exception)
-            std::rethrow_exception(state->exception);
+    if (state->exception) std::rethrow_exception(state->exception);
 
-    } catch (...) {
-        /* In the exceptional case, some workers may still be
-           active. They may be referencing the stack frame of the
-           caller. So wait for them to finish. (~ThreadPool also does
-           this, but it might be destroyed after objects referenced by
-           the work item lambdas.) */
-        shutdown();
-        throw;
-    }
+  } catch (...) {
+    /* In the exceptional case, some workers may still be
+       active. They may be referencing the stack frame of the
+       caller. So wait for them to finish. (~ThreadPool also does
+       this, but it might be destroyed after objects referenced by
+       the work item lambdas.) */
+    shutdown();
+    throw;
+  }
 }
 
-void ThreadPool::doWork(bool mainThread)
-{
-    if (!mainThread)
-        interruptCheck = [&]() { return (bool) quit; };
-
-    bool didWork = false;
-    std::exception_ptr exc;
-
-    while (true) {
-        work_t w;
-        {
-            auto state(state_.lock());
-
-            if (didWork) {
-                assert(state->active);
-                state->active--;
-
-                if (exc) {
-
-                    if (!state->exception) {
-                        state->exception = exc;
-                        // Tell the other workers to quit.
-                        quit = true;
-                        work.notify_all();
-                    } else {
-                        /* Print the exception, since we can't
-                           propagate it. */
-                        try {
-                            std::rethrow_exception(exc);
-                        } catch (std::exception & e) {
-                            if (!dynamic_cast<Interrupted*>(&e) &&
-                                !dynamic_cast<ThreadPoolShutDown*>(&e))
-                                ignoreException();
-                        } catch (...) {
-                        }
-                    }
-                }
-            }
-
-            /* Wait until a work item is available or we're asked to
-               quit. */
-            while (true) {
-                if (quit) return;
+void ThreadPool::doWork(bool mainThread) {
+  if (!mainThread) interruptCheck = [&]() { return (bool)quit; };
 
-                if (!state->pending.empty()) break;
+  bool didWork = false;
+  std::exception_ptr exc;
 
-                /* If there are no active or pending items, and the
-                   main thread is running process(), then no new items
-                   can be added. So exit. */
-                if (!state->active && state->draining) {
-                    quit = true;
-                    work.notify_all();
-                    return;
-                }
-
-                state.wait(work);
+  while (true) {
+    work_t w;
+    {
+      auto state(state_.lock());
+
+      if (didWork) {
+        assert(state->active);
+        state->active--;
+
+        if (exc) {
+          if (!state->exception) {
+            state->exception = exc;
+            // Tell the other workers to quit.
+            quit = true;
+            work.notify_all();
+          } else {
+            /* Print the exception, since we can't
+               propagate it. */
+            try {
+              std::rethrow_exception(exc);
+            } catch (std::exception& e) {
+              if (!dynamic_cast<Interrupted*>(&e) &&
+                  !dynamic_cast<ThreadPoolShutDown*>(&e))
+                ignoreException();
+            } catch (...) {
             }
-
-            w = std::move(state->pending.front());
-            state->pending.pop();
-            state->active++;
+          }
         }
-
-        try {
-            w();
-        } catch (...) {
-            exc = std::current_exception();
+      }
+
+      /* Wait until a work item is available or we're asked to
+         quit. */
+      while (true) {
+        if (quit) return;
+
+        if (!state->pending.empty()) break;
+
+        /* If there are no active or pending items, and the
+           main thread is running process(), then no new items
+           can be added. So exit. */
+        if (!state->active && state->draining) {
+          quit = true;
+          work.notify_all();
+          return;
         }
 
-        didWork = true;
+        state.wait(work);
+      }
+
+      w = std::move(state->pending.front());
+      state->pending.pop();
+      state->active++;
     }
-}
 
-}
+    try {
+      w();
+    } catch (...) {
+      exc = std::current_exception();
+    }
 
+    didWork = true;
+  }
+}
 
+}  // namespace nix