diff options
Diffstat (limited to 'third_party/nix/src/libutil/thread-pool.cc')
-rw-r--r-- | third_party/nix/src/libutil/thread-pool.cc | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/third_party/nix/src/libutil/thread-pool.cc b/third_party/nix/src/libutil/thread-pool.cc new file mode 100644 index 000000000000..879de446c294 --- /dev/null +++ b/third_party/nix/src/libutil/thread-pool.cc @@ -0,0 +1,162 @@ +#include "thread-pool.hh" + +#include "affinity.hh" +#include "glog/logging.h" + +namespace nix { + +ThreadPool::ThreadPool(size_t _maxThreads) : maxThreads(_maxThreads) { + restoreAffinity(); // FIXME + + if (maxThreads == 0u) { + maxThreads = std::thread::hardware_concurrency(); + if (maxThreads == 0u) { + maxThreads = 1; + } + } + + DLOG(INFO) << "starting pool of " << maxThreads - 1 << " threads"; +} + +ThreadPool::~ThreadPool() { shutdown(); } + +void ThreadPool::shutdown() { + std::vector<std::thread> workers; + { + auto state(state_.lock()); + quit = true; + std::swap(workers, state->workers); + } + + if (workers.empty()) { + return; + } + + DLOG(INFO) << "reaping " << workers.size() << " worker threads"; + + work.notify_all(); + + for (auto& thr : workers) { + thr.join(); + } +} + +void ThreadPool::enqueue(const work_t& t) { + auto state(state_.lock()); + if (quit) { + throw ThreadPoolShutDown( + "cannot enqueue a work item while the thread pool is shutting down"); + } + state->pending.push(t); + /* Note: process() also executes items, so count it as a worker. */ + if (state->pending.size() > state->workers.size() + 1 && + state->workers.size() + 1 < maxThreads) { + state->workers.emplace_back(&ThreadPool::doWork, this, false); + } + work.notify_one(); +} + +void ThreadPool::process() { + state_.lock()->draining = true; + + /* Do work until no more work is pending or active. */ + try { + doWork(true); + + auto state(state_.lock()); + + assert(quit); + + if (state->exception) { + std::rethrow_exception(state->exception); + } + + } catch (...) { + /* In the exceptional case, some workers may still be + active. They may be referencing the stack frame of the + caller. So wait for them to finish. (~ThreadPool also does + this, but it might be destroyed after objects referenced by + the work item lambdas.) */ + shutdown(); + throw; + } +} + +void ThreadPool::doWork(bool mainThread) { + if (!mainThread) { + interruptCheck = [&]() { return (bool)quit; }; + } + + bool didWork = false; + std::exception_ptr exc; + + while (true) { + work_t w; + { + auto state(state_.lock()); + + if (didWork) { + assert(state->active); + state->active--; + + if (exc) { + if (!state->exception) { + state->exception = exc; + // Tell the other workers to quit. + quit = true; + work.notify_all(); + } else { + /* Print the exception, since we can't + propagate it. */ + try { + std::rethrow_exception(exc); + } catch (std::exception& e) { + if ((dynamic_cast<Interrupted*>(&e) == nullptr) && + (dynamic_cast<ThreadPoolShutDown*>(&e) == nullptr)) { + ignoreException(); + } + } catch (...) { + } + } + } + } + + /* Wait until a work item is available or we're asked to + quit. */ + while (true) { + if (quit) { + return; + } + + if (!state->pending.empty()) { + break; + } + + /* If there are no active or pending items, and the + main thread is running process(), then no new items + can be added. So exit. */ + if ((state->active == 0u) && state->draining) { + quit = true; + work.notify_all(); + return; + } + + state.wait(work); + } + + w = std::move(state->pending.front()); + state->pending.pop(); + state->active++; + } + + try { + w(); + } catch (...) { + exc = std::current_exception(); + } + + didWork = true; + } +} + +} // namespace nix |