diff options
Diffstat (limited to 'third_party/nix/src/libutil/thread-pool.cc')
-rw-r--r-- | third_party/nix/src/libutil/thread-pool.cc | 163 |
1 files changed, 0 insertions, 163 deletions
diff --git a/third_party/nix/src/libutil/thread-pool.cc b/third_party/nix/src/libutil/thread-pool.cc deleted file mode 100644 index 7c6b0a1b469b..000000000000 --- a/third_party/nix/src/libutil/thread-pool.cc +++ /dev/null @@ -1,163 +0,0 @@ -#include "libutil/thread-pool.hh" - -#include <glog/logging.h> - -#include "libutil/affinity.hh" - -namespace nix { - -ThreadPool::ThreadPool(size_t _maxThreads) : maxThreads(_maxThreads) { - restoreAffinity(); // FIXME - - if (maxThreads == 0u) { - maxThreads = std::thread::hardware_concurrency(); - if (maxThreads == 0u) { - maxThreads = 1; - } - } - - DLOG(INFO) << "starting pool of " << maxThreads - 1 << " threads"; -} - -ThreadPool::~ThreadPool() { shutdown(); } - -void ThreadPool::shutdown() { - std::vector<std::thread> workers; - { - auto state(state_.lock()); - quit = true; - std::swap(workers, state->workers); - } - - if (workers.empty()) { - return; - } - - DLOG(INFO) << "reaping " << workers.size() << " worker threads"; - - work.notify_all(); - - for (auto& thr : workers) { - thr.join(); - } -} - -void ThreadPool::enqueue(const work_t& t) { - auto state(state_.lock()); - if (quit) { - throw ThreadPoolShutDown( - "cannot enqueue a work item while the thread pool is shutting down"); - } - state->pending.push(t); - /* Note: process() also executes items, so count it as a worker. */ - if (state->pending.size() > state->workers.size() + 1 && - state->workers.size() + 1 < maxThreads) { - state->workers.emplace_back(&ThreadPool::doWork, this, false); - } - work.notify_one(); -} - -void ThreadPool::process() { - state_.lock()->draining = true; - - /* Do work until no more work is pending or active. */ - try { - doWork(true); - - auto state(state_.lock()); - - assert(quit); - - if (state->exception) { - std::rethrow_exception(state->exception); - } - - } catch (...) { - /* In the exceptional case, some workers may still be - active. They may be referencing the stack frame of the - caller. So wait for them to finish. (~ThreadPool also does - this, but it might be destroyed after objects referenced by - the work item lambdas.) */ - shutdown(); - throw; - } -} - -void ThreadPool::doWork(bool mainThread) { - if (!mainThread) { - interruptCheck = [&]() { return (bool)quit; }; - } - - bool didWork = false; - std::exception_ptr exc; - - while (true) { - work_t w; - { - auto state(state_.lock()); - - if (didWork) { - assert(state->active); - state->active--; - - if (exc) { - if (!state->exception) { - state->exception = exc; - // Tell the other workers to quit. - quit = true; - work.notify_all(); - } else { - /* Print the exception, since we can't - propagate it. */ - try { - std::rethrow_exception(exc); - } catch (std::exception& e) { - if ((dynamic_cast<Interrupted*>(&e) == nullptr) && - (dynamic_cast<ThreadPoolShutDown*>(&e) == nullptr)) { - ignoreException(); - } - } catch (...) { - } - } - } - } - - /* Wait until a work item is available or we're asked to - quit. */ - while (true) { - if (quit) { - return; - } - - if (!state->pending.empty()) { - break; - } - - /* If there are no active or pending items, and the - main thread is running process(), then no new items - can be added. So exit. */ - if ((state->active == 0u) && state->draining) { - quit = true; - work.notify_all(); - return; - } - - state.wait(work); - } - - w = std::move(state->pending.front()); - state->pending.pop(); - state->active++; - } - - try { - w(); - } catch (...) { - exc = std::current_exception(); - } - - didWork = true; - } -} - -} // namespace nix |