diff options
Diffstat (limited to 'src/libutil')
-rw-r--r-- | src/libutil/thread-pool.cc | 75 | ||||
-rw-r--r-- | src/libutil/thread-pool.hh | 2 |
2 files changed, 53 insertions, 24 deletions
diff --git a/src/libutil/thread-pool.cc b/src/libutil/thread-pool.cc index 0a3a407240f7..f43dbe0c3715 100644 --- a/src/libutil/thread-pool.cc +++ b/src/libutil/thread-pool.cc @@ -46,11 +46,17 @@ void ThreadPool::enqueue(const work_t & t) void ThreadPool::process() { + /* Loop until there are no active work items *and* there either + are no queued items or there is an exception. The + post-condition is that no new items will become active. */ while (true) { auto state(state_.lock()); - if (state->exception) - std::rethrow_exception(state->exception); - if (state->left.empty() && !state->pending) break; + if (!state->active) { + if (state->exception) + std::rethrow_exception(state->exception); + if (state->left.empty()) + break; + } state.wait(done); } } @@ -58,41 +64,64 @@ void ThreadPool::process() void ThreadPool::workerEntry() { bool didWork = false; + std::exception_ptr exc; while (true) { work_t w; { auto state(state_.lock()); + + if (didWork) { + assert(state->active); + state->active--; + + if (exc) { + + if (!state->exception) { + state->exception = exc; + // Tell the other workers to quit. + state->quit = true; + work.notify_all(); + } else { + /* Print the exception, since we can't + propagate it. */ + try { + std::rethrow_exception(exc); + } catch (std::exception & e) { + if (!dynamic_cast<Interrupted*>(&e) && + !dynamic_cast<ThreadPoolShutDown*>(&e)) + ignoreException(); + } catch (...) { + } + } + } + } + + /* Wait until a work item is available or another thread + had an exception or we're asked to quit. */ while (true) { - if (state->quit || state->exception) return; - if (didWork) { - assert(state->pending); - state->pending--; - didWork = false; + if (state->quit) { + if (!state->active) + done.notify_one(); + return; } if (!state->left.empty()) break; - if (!state->pending) - done.notify_all(); + if (!state->active) { + done.notify_one(); + return; + } state.wait(work); } - w = state->left.front(); + + w = std::move(state->left.front()); state->left.pop(); - state->pending++; + state->active++; } try { w(); - } catch (std::exception & e) { - auto state(state_.lock()); - if (state->exception) { - if (!dynamic_cast<Interrupted*>(&e) && - !dynamic_cast<ThreadPoolShutDown*>(&e)) - printError(format("error: %s") % e.what()); - } else { - state->exception = std::current_exception(); - work.notify_all(); - done.notify_all(); - } + } catch (...) { + exc = std::current_exception(); } didWork = true; diff --git a/src/libutil/thread-pool.hh b/src/libutil/thread-pool.hh index 361a9d33a732..835dfb4b83a6 100644 --- a/src/libutil/thread-pool.hh +++ b/src/libutil/thread-pool.hh @@ -44,7 +44,7 @@ private: struct State { std::queue<work_t> left; - size_t pending = 0; + size_t active = 0; std::exception_ptr exception; std::vector<std::thread> workers; bool quit = false; |