#include "libutil/thread-pool.hh"

#include <glog/logging.h>

#include "libutil/affinity.hh"

namespace nix {

/* Set up a pool that runs at most `_maxThreads` work items at once.
   Passing 0 asks for std::thread::hardware_concurrency(), and falls
   back to a single thread when that also reports 0. No worker thread
   is started here; enqueue() spawns them as items arrive. */
ThreadPool::ThreadPool(size_t _maxThreads) : maxThreads(_maxThreads) {
  restoreAffinity();  // FIXME

  if (maxThreads == 0u) {
    maxThreads = std::thread::hardware_concurrency();
  }
  if (maxThreads == 0u) {
    maxThreads = 1;
  }

  /* The thread that calls process() also executes items (see
     enqueue()), hence the "- 1" in the reported count. */
  DLOG(INFO) << "starting pool of " << maxThreads - 1 << " threads";
}

/* Stops and joins any remaining workers via shutdown(). */
ThreadPool::~ThreadPool() { shutdown(); }

/* Raise the quit flag and join every worker thread. The worker list is
   moved out of the shared state while the lock is held; waking and
   joining the threads happens after the lock has been released. */
void ThreadPool::shutdown() {
  std::vector<std::thread> reaped;
  {
    auto state(state_.lock());
    quit = true;
    std::swap(reaped, state->workers);
  }

  if (!reaped.empty()) {
    DLOG(INFO) << "reaping " << reaped.size() << " worker threads";

    // Wake workers blocked on the condition so they notice `quit`.
    work.notify_all();

    for (auto& worker : reaped) {
      worker.join();
    }
  }
}

/* Queue work item `t` and wake one waiting thread. A new worker thread
   is started when more items are pending than there are runners and the
   maxThreads limit has not been reached yet.

   Throws ThreadPoolShutDown if the quit flag is already set. */
void ThreadPool::enqueue(const work_t& t) {
  auto state(state_.lock());

  if (quit) {
    throw ThreadPoolShutDown(
        "cannot enqueue a work item while the thread pool is shutting down");
  }

  state->pending.push(t);

  /* process() runs items on the calling thread too, so that thread is
     counted as one of the runners. */
  const auto runners = state->workers.size() + 1;
  const bool backlog = state->pending.size() > runners;
  if (backlog && runners < maxThreads) {
    state->workers.emplace_back(&ThreadPool::doWork, this, false);
  }

  work.notify_one();
}

/* Execute work items on the calling thread until nothing is pending or
   active any more, then rethrow the exception recorded in the shared
   state, if there is one. */
void ThreadPool::process() {
  {
    auto state(state_.lock());
    state->draining = true;
  }

  try {
    doWork(true);

    auto state(state_.lock());

    // doWork() only returns once the quit flag has been raised.
    assert(quit);

    if (state->exception) {
      std::rethrow_exception(state->exception);
    }
  } catch (...) {
    /* On the failure path some workers may still be running, and they
       may refer to the caller's stack frame, so wait for them before
       unwinding. (~ThreadPool joins them as well, but it might only run
       after objects referenced by the work item lambdas are gone.) */
    shutdown();
    throw;
  }
}

/* Worker loop: take items off the pending queue and run them until the
   quit flag is raised. `mainThread` is true when called from process()
   and false for threads started by enqueue(). */
void ThreadPool::doWork(bool mainThread) {
  if (!mainThread) {
    // Let interruptCheck report whether the pool has been told to quit.
    interruptCheck = [&]() { return static_cast<bool>(quit); };
  }

  bool ranItem = false;
  std::exception_ptr failure;

  for (;;) {
    work_t item;
    {
      auto state(state_.lock());

      if (ranItem) {
        // Account for the item this thread has just finished.
        assert(state->active);
        --state->active;

        if (failure && !state->exception) {
          /* First recorded failure: keep it so process() can rethrow
             it, and tell the other workers to quit. */
          state->exception = failure;
          quit = true;
          work.notify_all();
        } else if (failure) {
          /* Another failure is already recorded and this one can't be
             propagated, so pass it to ignoreException() (presumably
             that reports it -- confirm) unless it is an Interrupted or
             ThreadPoolShutDown. */
          try {
            std::rethrow_exception(failure);
          } catch (std::exception& e) {
            const bool benign =
                (dynamic_cast<Interrupted*>(&e) != nullptr) ||
                (dynamic_cast<ThreadPoolShutDown*>(&e) != nullptr);
            if (!benign) {
              ignoreException();
            }
          } catch (...) {
          }
        }
      }

      /* Block until there is an item to run or we are told to quit. */
      for (;;) {
        if (quit) {
          return;
        }

        if (!state->pending.empty()) {
          break;
        }

        /* Nothing is pending or active, and the main thread is inside
           process(): no further items can show up, so finish and wake
           everybody else. */
        if ((state->active == 0u) && state->draining) {
          quit = true;
          work.notify_all();
          return;
        }

        state.wait(work);
      }

      item = std::move(state->pending.front());
      state->pending.pop();
      state->active++;
    }

    // Run the item with the lock released; remember any exception.
    try {
      item();
    } catch (...) {
      failure = std::current_exception();
    }

    ranItem = true;
  }
}

}  // namespace nix