// Copyright 2017 The Abseil Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ #define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_ #include <cassert> #include <cstddef> #include <functional> #include <queue> #include <thread> // NOLINT(build/c++11) #include <vector> #include "absl/base/thread_annotations.h" #include "absl/synchronization/mutex.h" namespace absl { ABSL_NAMESPACE_BEGIN namespace synchronization_internal { // A simple ThreadPool implementation for tests. class ThreadPool { public: explicit ThreadPool(int num_threads) { for (int i = 0; i < num_threads; ++i) { threads_.push_back(std::thread(&ThreadPool::WorkLoop, this)); } } ThreadPool(const ThreadPool &) = delete; ThreadPool &operator=(const ThreadPool &) = delete; ~ThreadPool() { { absl::MutexLock l(&mu_); for (size_t i = 0; i < threads_.size(); i++) { queue_.push(nullptr); // Shutdown signal. } } for (auto &t : threads_) { t.join(); } } // Schedule a function to be run on a ThreadPool thread immediately. void Schedule(std::function<void()> func) { assert(func != nullptr); absl::MutexLock l(&mu_); queue_.push(std::move(func)); } private: bool WorkAvailable() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return !queue_.empty(); } void WorkLoop() { while (true) { std::function<void()> func; { absl::MutexLock l(&mu_); mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable)); func = std::move(queue_.front()); queue_.pop(); } if (func == nullptr) { // Shutdown signal. break; } func(); } } absl::Mutex mu_; std::queue<std::function<void()>> queue_ ABSL_GUARDED_BY(mu_); std::vector<std::thread> threads_; }; } // namespace synchronization_internal ABSL_NAMESPACE_END } // namespace absl #endif // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_