diff options
Diffstat (limited to 'src/libutil/thread-pool.cc')
-rw-r--r-- | src/libutil/thread-pool.cc | 81 |
1 files changed, 51 insertions, 30 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index ce126d36db8e..857ee91f87d0 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -13,11 +13,16 @@ ThreadPool::ThreadPool(size_t _maxThreads) if (!maxThreads) maxThreads = 1; } - debug(format("starting pool of %d threads") % maxThreads); + debug("starting pool of %d threads", maxThreads - 1); } ThreadPool::~ThreadPool() { + shutdown(); +} + +void ThreadPool::shutdown() +{ std::vector<std::thread> workers; { auto state(state_.lock()); @@ -25,7 +30,9 @@ ThreadPool::~ThreadPool() std::swap(workers, state->workers); } - debug(format("reaping %d worker threads") % workers.size()); + if (workers.empty()) return; + + debug("reaping %d worker threads", workers.size()); work.notify_all(); @@ -38,32 +45,43 @@ void ThreadPool::enqueue(const work_t & t) auto state(state_.lock()); if (quit) throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down"); - state->left.push(t); - if (state->left.size() > state->workers.size() && state->workers.size() < maxThreads) - state->workers.emplace_back(&ThreadPool::workerEntry, this); + state->pending.push(t); + /* Note: process() also executes items, so count it as a worker. */ + if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads) + state->workers.emplace_back(&ThreadPool::doWork, this, false); work.notify_one(); } void ThreadPool::process() { - /* Loop until there are no active work items *and* there either - are no queued items or there is an exception. The - post-condition is that no new items will become active. */ - while (true) { + state_.lock()->draining = true; + + /* Do work until no more work is pending or active. */ + try { + doWork(true); + auto state(state_.lock()); - if (!state->active) { - if (state->exception) - std::rethrow_exception(state->exception); - if (state->left.empty()) - break; - } - state.wait(done); + + assert(quit); + + if (state->exception) + std::rethrow_exception(state->exception); + + } catch (...) { + /* In the exceptional case, some workers may still be + active. They may be referencing the stack frame of the + caller. So wait for them to finish. (~ThreadPool also does + this, but it might be destroyed after objects referenced by + the work item lambdas.) */ + shutdown(); + throw; } } -void ThreadPool::workerEntry() +void ThreadPool::doWork(bool mainThread) { - interruptCheck = [&]() { return (bool) quit; }; + if (!mainThread) + interruptCheck = [&]() { return (bool) quit; }; bool didWork = false; std::exception_ptr exc; @@ -99,24 +117,27 @@ void ThreadPool::workerEntry() } } - /* Wait until a work item is available or another thread - had an exception or we're asked to quit. */ + /* Wait until a work item is available or we're asked to + quit. */ while (true) { - if (quit) { - if (!state->active) - done.notify_one(); - return; - } - if (!state->left.empty()) break; - if (!state->active) { - done.notify_one(); + if (quit) return; + + if (!state->pending.empty()) break; + + /* If there are no active or pending items, and the + main thread is running process(), then no new items + can be added. So exit. */ + if (!state->active && state->draining) { + quit = true; + work.notify_all(); return; } + state.wait(work); } - w = std::move(state->left.front()); - state->left.pop(); + w = std::move(state->pending.front()); + state->pending.pop(); state->active++; } |