diff options
author | Vincent Ambo <tazjin@google.com> | 2020-05-17T15·31+0100 |
---|---|---|
committer | Vincent Ambo <tazjin@google.com> | 2020-05-17T15·31+0100 |
commit | 0f2cf531f705d370321843e5ba9135b2ebdb5d19 (patch) | |
tree | 256feb13963a849ed96e89228fa05454c2a22363 /third_party/nix/src/libutil/thread-pool.cc | |
parent | 65a1aae98ce5a237c9643e639e550c8b0c0be7f1 (diff) |
style(3p/nix): Reformat project in Google C++ style r/740
Reformatted with: fd . -e hh -e cc | xargs clang-format -i
Diffstat (limited to 'third_party/nix/src/libutil/thread-pool.cc')
-rw-r--r-- | third_party/nix/src/libutil/thread-pool.cc | 231 |
1 files changed, 109 insertions, 122 deletions
diff --git a/third_party/nix/src/libutil/thread-pool.cc b/third_party/nix/src/libutil/thread-pool.cc index 857ee91f87d0..5b5be926539d 100644 --- a/third_party/nix/src/libutil/thread-pool.cc +++ b/third_party/nix/src/libutil/thread-pool.cc @@ -3,154 +3,141 @@ namespace nix { -ThreadPool::ThreadPool(size_t _maxThreads) - : maxThreads(_maxThreads) -{ - restoreAffinity(); // FIXME - - if (!maxThreads) { - maxThreads = std::thread::hardware_concurrency(); - if (!maxThreads) maxThreads = 1; - } +ThreadPool::ThreadPool(size_t _maxThreads) : maxThreads(_maxThreads) { + restoreAffinity(); // FIXME - debug("starting pool of %d threads", maxThreads - 1); -} + if (!maxThreads) { + maxThreads = std::thread::hardware_concurrency(); + if (!maxThreads) maxThreads = 1; + } -ThreadPool::~ThreadPool() -{ - shutdown(); + debug("starting pool of %d threads", maxThreads - 1); } -void ThreadPool::shutdown() -{ - std::vector<std::thread> workers; - { - auto state(state_.lock()); - quit = true; - std::swap(workers, state->workers); - } +ThreadPool::~ThreadPool() { shutdown(); } + +void ThreadPool::shutdown() { + std::vector<std::thread> workers; + { + auto state(state_.lock()); + quit = true; + std::swap(workers, state->workers); + } - if (workers.empty()) return; + if (workers.empty()) return; - debug("reaping %d worker threads", workers.size()); + debug("reaping %d worker threads", workers.size()); - work.notify_all(); + work.notify_all(); - for (auto & thr : workers) - thr.join(); + for (auto& thr : workers) thr.join(); } -void ThreadPool::enqueue(const work_t & t) -{ - auto state(state_.lock()); - if (quit) - throw ThreadPoolShutDown("cannot enqueue a work item while the thread pool is shutting down"); - state->pending.push(t); - /* Note: process() also executes items, so count it as a worker. */ - if (state->pending.size() > state->workers.size() + 1 && state->workers.size() + 1 < maxThreads) - state->workers.emplace_back(&ThreadPool::doWork, this, false); - work.notify_one(); +void ThreadPool::enqueue(const work_t& t) { + auto state(state_.lock()); + if (quit) + throw ThreadPoolShutDown( + "cannot enqueue a work item while the thread pool is shutting down"); + state->pending.push(t); + /* Note: process() also executes items, so count it as a worker. */ + if (state->pending.size() > state->workers.size() + 1 && + state->workers.size() + 1 < maxThreads) + state->workers.emplace_back(&ThreadPool::doWork, this, false); + work.notify_one(); } -void ThreadPool::process() -{ - state_.lock()->draining = true; +void ThreadPool::process() { + state_.lock()->draining = true; - /* Do work until no more work is pending or active. */ - try { - doWork(true); + /* Do work until no more work is pending or active. */ + try { + doWork(true); - auto state(state_.lock()); + auto state(state_.lock()); - assert(quit); + assert(quit); - if (state->exception) - std::rethrow_exception(state->exception); + if (state->exception) std::rethrow_exception(state->exception); - } catch (...) { - /* In the exceptional case, some workers may still be - active. They may be referencing the stack frame of the - caller. So wait for them to finish. (~ThreadPool also does - this, but it might be destroyed after objects referenced by - the work item lambdas.) */ - shutdown(); - throw; - } + } catch (...) { + /* In the exceptional case, some workers may still be + active. They may be referencing the stack frame of the + caller. So wait for them to finish. (~ThreadPool also does + this, but it might be destroyed after objects referenced by + the work item lambdas.) */ + shutdown(); + throw; + } } -void ThreadPool::doWork(bool mainThread) -{ - if (!mainThread) - interruptCheck = [&]() { return (bool) quit; }; - - bool didWork = false; - std::exception_ptr exc; - - while (true) { - work_t w; - { - auto state(state_.lock()); - - if (didWork) { - assert(state->active); - state->active--; - - if (exc) { - - if (!state->exception) { - state->exception = exc; - // Tell the other workers to quit. - quit = true; - work.notify_all(); - } else { - /* Print the exception, since we can't - propagate it. */ - try { - std::rethrow_exception(exc); - } catch (std::exception & e) { - if (!dynamic_cast<Interrupted*>(&e) && - !dynamic_cast<ThreadPoolShutDown*>(&e)) - ignoreException(); - } catch (...) { - } - } - } - } - - /* Wait until a work item is available or we're asked to - quit. */ - while (true) { - if (quit) return; +void ThreadPool::doWork(bool mainThread) { + if (!mainThread) interruptCheck = [&]() { return (bool)quit; }; - if (!state->pending.empty()) break; + bool didWork = false; + std::exception_ptr exc; - /* If there are no active or pending items, and the - main thread is running process(), then no new items - can be added. So exit. */ - if (!state->active && state->draining) { - quit = true; - work.notify_all(); - return; - } - - state.wait(work); + while (true) { + work_t w; + { + auto state(state_.lock()); + + if (didWork) { + assert(state->active); + state->active--; + + if (exc) { + if (!state->exception) { + state->exception = exc; + // Tell the other workers to quit. + quit = true; + work.notify_all(); + } else { + /* Print the exception, since we can't + propagate it. */ + try { + std::rethrow_exception(exc); + } catch (std::exception& e) { + if (!dynamic_cast<Interrupted*>(&e) && + !dynamic_cast<ThreadPoolShutDown*>(&e)) + ignoreException(); + } catch (...) { } - - w = std::move(state->pending.front()); - state->pending.pop(); - state->active++; + } } - - try { - w(); - } catch (...) { - exc = std::current_exception(); + } + + /* Wait until a work item is available or we're asked to + quit. */ + while (true) { + if (quit) return; + + if (!state->pending.empty()) break; + + /* If there are no active or pending items, and the + main thread is running process(), then no new items + can be added. So exit. */ + if (!state->active && state->draining) { + quit = true; + work.notify_all(); + return; } - didWork = true; + state.wait(work); + } + + w = std::move(state->pending.front()); + state->pending.pop(); + state->active++; } -} -} + try { + w(); + } catch (...) { + exc = std::current_exception(); + } + didWork = true; + } +} +} // namespace nix |