blob: a16f4c2b603398ea087fc421c097c4e6697367e8 (
plain) (
tree)
|
|
#include "thread-pool.hh"
#include "affinity.hh"
#include "glog/logging.h"
namespace nix {
/* Set up a pool that may use at most `_maxThreads` threads. Passing 0
   selects the hardware concurrency reported by the standard library,
   falling back to a single thread when that is reported as 0 too. */
ThreadPool::ThreadPool(size_t _maxThreads) : maxThreads(_maxThreads) {
  restoreAffinity();  // FIXME

  // First fallback: ask the runtime how many hardware threads exist.
  if (maxThreads == 0) {
    maxThreads = std::thread::hardware_concurrency();
  }

  // Second fallback: the runtime could not tell us, so use one thread.
  if (maxThreads == 0) {
    maxThreads = 1;
  }

  // One slot is left for the thread that calls process(), hence the
  // "- 1" in the number of pool threads reported here.
  DLOG(INFO) << "starting pool of " << maxThreads - 1 << " threads";
}
/* Destroying the pool asks all workers to quit and joins them. */
ThreadPool::~ThreadPool() {
  shutdown();
}
void ThreadPool::shutdown() {
std::vector<std::thread> workers;
{
auto state(state_.lock());
quit = true;
std::swap(workers, state->workers);
}
if (workers.empty()) {
return;
}
DLOG(INFO) << "reaping " << workers.size() << " worker threads";
work.notify_all();
for (auto& thr : workers) {
thr.join();
}
}
void ThreadPool::enqueue(const work_t& t) {
auto state(state_.lock());
if (quit) {
throw ThreadPoolShutDown(
"cannot enqueue a work item while the thread pool is shutting down");
}
state->pending.push(t);
/* Note: process() also executes items, so count it as a worker. */
if (state->pending.size() > state->workers.size() + 1 &&
state->workers.size() + 1 < maxThreads)
state->workers.emplace_back(&ThreadPool::doWork, this, false);
work.notify_one();
}
/* Run queued work items on the calling thread, together with the
   workers, until doWork() returns. An exception recorded in the shared
   state by a work item is rethrown to the caller. */
void ThreadPool::process() {
  // Mark the pool as draining; doWork() uses this flag to decide that
  // an idle pool may stop.
  {
    auto locked(state_.lock());
    locked->draining = true;
  }

  try {
    doWork(true);

    auto locked(state_.lock());
    assert(quit);

    if (locked->exception) {
      std::rethrow_exception(locked->exception);
    }
  } catch (...) {
    /* Workers may still be running at this point, and their work
       items may refer to the caller's stack frame, so join them
       before unwinding any further. ~ThreadPool would join them too,
       but possibly only after objects used by the work item lambdas
       have already been destroyed. */
    shutdown();
    throw;
  }
}
void ThreadPool::doWork(bool mainThread) {
if (!mainThread) {
interruptCheck = [&]() { return (bool)quit; };
}
bool didWork = false;
std::exception_ptr exc;
while (true) {
work_t w;
{
auto state(state_.lock());
if (didWork) {
assert(state->active);
state->active--;
if (exc) {
if (!state->exception) {
state->exception = exc;
// Tell the other workers to quit.
quit = true;
work.notify_all();
} else {
/* Print the exception, since we can't
propagate it. */
try {
std::rethrow_exception(exc);
} catch (std::exception& e) {
if (!dynamic_cast<Interrupted*>(&e) &&
!dynamic_cast<ThreadPoolShutDown*>(&e))
ignoreException();
} catch (...) {
}
}
}
}
/* Wait until a work item is available or we're asked to
quit. */
while (true) {
if (quit) {
return;
}
if (!state->pending.empty()) {
break;
}
/* If there are no active or pending items, and the
main thread is running process(), then no new items
can be added. So exit. */
if (!state->active && state->draining) {
quit = true;
work.notify_all();
return;
}
state.wait(work);
}
w = std::move(state->pending.front());
state->pending.pop();
state->active++;
}
try {
w();
} catch (...) {
exc = std::current_exception();
}
didWork = true;
}
}
} // namespace nix
|