about summary refs log tree commit diff
path: root/absl/synchronization/internal/thread_pool.h
diff options
context:
space:
mode:
Diffstat (limited to 'absl/synchronization/internal/thread_pool.h')
-rw-r--r--absl/synchronization/internal/thread_pool.h90
1 files changed, 90 insertions, 0 deletions
diff --git a/absl/synchronization/internal/thread_pool.h b/absl/synchronization/internal/thread_pool.h
new file mode 100644
index 000000000000..846404277a03
--- /dev/null
+++ b/absl/synchronization/internal/thread_pool.h
@@ -0,0 +1,90 @@
+// Copyright 2017 The Abseil Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
+#define ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_
+
+#include <cassert>
+#include <functional>
+#include <queue>
+#include <thread>  // NOLINT(build/c++11)
+#include <vector>
+
+#include "absl/base/thread_annotations.h"
+#include "absl/synchronization/mutex.h"
+
+namespace absl {
+namespace synchronization_internal {
+
+// A simple ThreadPool implementation for tests.
+class ThreadPool {
+ public:
+  explicit ThreadPool(int num_threads) {
+    for (int i = 0; i < num_threads; ++i) {
+      threads_.push_back(std::thread(&ThreadPool::WorkLoop, this));
+    }
+  }
+
+  ThreadPool(const ThreadPool &) = delete;
+  ThreadPool &operator=(const ThreadPool &) = delete;
+
+  ~ThreadPool() {
+    {
+      absl::MutexLock l(&mu_);
+      for (int i = 0; i < threads_.size(); ++i) {
+        queue_.push(nullptr);  // Shutdown signal.
+      }
+    }
+    for (auto &t : threads_) {
+      t.join();
+    }
+  }
+
+  // Schedule a function to be run on a ThreadPool thread immediately.
+  void Schedule(std::function<void()> func) {
+    assert(func != nullptr);
+    absl::MutexLock l(&mu_);
+    queue_.push(std::move(func));
+  }
+
+ private:
+  bool WorkAvailable() const EXCLUSIVE_LOCKS_REQUIRED(mu_) {
+    return !queue_.empty();
+  }
+
+  void WorkLoop() {
+    while (true) {
+      std::function<void()> func;
+      {
+        absl::MutexLock l(&mu_);
+        mu_.Await(absl::Condition(this, &ThreadPool::WorkAvailable));
+        func = std::move(queue_.front());
+        queue_.pop();
+      }
+      if (func == nullptr) {  // Shutdown signal.
+        break;
+      }
+      func();
+    }
+  }
+
+  absl::Mutex mu_;
+  std::queue<std::function<void()>> queue_ GUARDED_BY(mu_);
+  std::vector<std::thread> threads_;
+};
+
+}  // namespace synchronization_internal
+}  // namespace absl
+
+#endif  // ABSL_SYNCHRONIZATION_INTERNAL_THREAD_POOL_H_