Diffstat (limited to 'third_party/abseil_cpp/absl/synchronization/internal/waiter.cc')
-rw-r--r-- | third_party/abseil_cpp/absl/synchronization/internal/waiter.cc | 492 |
1 files changed, 492 insertions, 0 deletions
diff --git a/third_party/abseil_cpp/absl/synchronization/internal/waiter.cc b/third_party/abseil_cpp/absl/synchronization/internal/waiter.cc
new file mode 100644
index 000000000000..b6150b9b2bf1
--- /dev/null
+++ b/third_party/abseil_cpp/absl/synchronization/internal/waiter.cc
@@ -0,0 +1,492 @@
+// Copyright 2017 The Abseil Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "absl/synchronization/internal/waiter.h"
+
+#include "absl/base/config.h"
+
+#ifdef _WIN32
+#include <windows.h>
+#else
+#include <pthread.h>
+#include <sys/time.h>
+#include <unistd.h>
+#endif
+
+#ifdef __linux__
+#include <linux/futex.h>
+#include <sys/syscall.h>
+#endif
+
+#ifdef ABSL_HAVE_SEMAPHORE_H
+#include <semaphore.h>
+#endif
+
+#include <errno.h>
+#include <stdio.h>
+#include <time.h>
+
+#include <atomic>
+#include <cassert>
+#include <cstdint>
+#include <new>
+#include <type_traits>
+
+#include "absl/base/internal/raw_logging.h"
+#include "absl/base/internal/thread_identity.h"
+#include "absl/base/optimization.h"
+#include "absl/synchronization/internal/kernel_timeout.h"
+
+namespace absl {
+ABSL_NAMESPACE_BEGIN
+namespace synchronization_internal {
+
+static void MaybeBecomeIdle() {
+  base_internal::ThreadIdentity *identity =
+      base_internal::CurrentThreadIdentityIfPresent();
+  assert(identity != nullptr);
+  const bool is_idle = identity->is_idle.load(std::memory_order_relaxed);
+  const int ticker = identity->ticker.load(std::memory_order_relaxed);
+  const int wait_start = identity->wait_start.load(std::memory_order_relaxed);
+  if (!is_idle && ticker - wait_start > Waiter::kIdlePeriods) {
+    identity->is_idle.store(true, std::memory_order_relaxed);
+  }
+}
+
+#if ABSL_WAITER_MODE == ABSL_WAITER_MODE_FUTEX
+
+// Some Android headers are missing these definitions even though they
+// support these futex operations.
+#ifdef __BIONIC__
+#ifndef SYS_futex
+#define SYS_futex __NR_futex
+#endif
+#ifndef FUTEX_WAIT_BITSET
+#define FUTEX_WAIT_BITSET 9
+#endif
+#ifndef FUTEX_PRIVATE_FLAG
+#define FUTEX_PRIVATE_FLAG 128
+#endif
+#ifndef FUTEX_CLOCK_REALTIME
+#define FUTEX_CLOCK_REALTIME 256
+#endif
+#ifndef FUTEX_BITSET_MATCH_ANY
+#define FUTEX_BITSET_MATCH_ANY 0xFFFFFFFF
+#endif
+#endif
+
+#if defined(__NR_futex_time64) && !defined(SYS_futex_time64)
+#define SYS_futex_time64 __NR_futex_time64
+#endif
+
+#if defined(SYS_futex_time64) && !defined(SYS_futex)
+#define SYS_futex SYS_futex_time64
+#endif
+
+class Futex {
+ public:
+  static int WaitUntil(std::atomic<int32_t> *v, int32_t val,
+                       KernelTimeout t) {
+    int err = 0;
+    if (t.has_timeout()) {
+      // https://locklessinc.com/articles/futex_cheat_sheet/
+      // Unlike FUTEX_WAIT, FUTEX_WAIT_BITSET uses absolute time.
+      struct timespec abs_timeout = t.MakeAbsTimespec();
+      // Atomically check that the futex value is still 0, and if it
+      // is, sleep until abs_timeout or until woken by FUTEX_WAKE.
+      err = syscall(
+          SYS_futex, reinterpret_cast<int32_t *>(v),
+          FUTEX_WAIT_BITSET | FUTEX_PRIVATE_FLAG | FUTEX_CLOCK_REALTIME, val,
+          &abs_timeout, nullptr, FUTEX_BITSET_MATCH_ANY);
+    } else {
+      // Atomically check that the futex value is still 0, and if it
+      // is, sleep until woken by FUTEX_WAKE.
+      err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
+                    FUTEX_WAIT | FUTEX_PRIVATE_FLAG, val, nullptr);
+    }
+    if (err != 0) {
+      err = -errno;
+    }
+    return err;
+  }
+
+  static int Wake(std::atomic<int32_t> *v, int32_t count) {
+    int err = syscall(SYS_futex, reinterpret_cast<int32_t *>(v),
+                      FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count);
+    if (ABSL_PREDICT_FALSE(err < 0)) {
+      err = -errno;
+    }
+    return err;
+  }
+};
+
+Waiter::Waiter() {
+  futex_.store(0, std::memory_order_relaxed);
+}
+
+Waiter::~Waiter() = default;
+
+bool Waiter::Wait(KernelTimeout t) {
+  // Loop until we can atomically decrement futex from a positive
+  // value, waiting on a futex while we believe it is zero.
+  // Note that, since the thread ticker is just reset, we don't need to check
+  // whether the thread is idle on the very first pass of the loop.
+  bool first_pass = true;
+  while (true) {
+    int32_t x = futex_.load(std::memory_order_relaxed);
+    while (x != 0) {
+      if (!futex_.compare_exchange_weak(x, x - 1,
+                                        std::memory_order_acquire,
+                                        std::memory_order_relaxed)) {
+        continue;  // Raced with someone, retry.
+      }
+      return true;  // Consumed a wakeup, we are done.
+    }
+
+
+    if (!first_pass) MaybeBecomeIdle();
+    const int err = Futex::WaitUntil(&futex_, 0, t);
+    if (err != 0) {
+      if (err == -EINTR || err == -EWOULDBLOCK) {
+        // Do nothing, the loop will retry.
+      } else if (err == -ETIMEDOUT) {
+        return false;
+      } else {
+        ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
+      }
+    }
+    first_pass = false;
+  }
+}
+
+void Waiter::Post() {
+  if (futex_.fetch_add(1, std::memory_order_release) == 0) {
+    // We incremented from 0, need to wake a potential waiter.
+    Poke();
+  }
+}
+
+void Waiter::Poke() {
+  // Wake one thread waiting on the futex.
+  const int err = Futex::Wake(&futex_, 1);
+  if (ABSL_PREDICT_FALSE(err < 0)) {
+    ABSL_RAW_LOG(FATAL, "Futex operation failed with error %d\n", err);
+  }
+}
+
+#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
+
+class PthreadMutexHolder {
+ public:
+  explicit PthreadMutexHolder(pthread_mutex_t *mu) : mu_(mu) {
+    const int err = pthread_mutex_lock(mu_);
+    if (err != 0) {
+      ABSL_RAW_LOG(FATAL, "pthread_mutex_lock failed: %d", err);
+    }
+  }
+
+  PthreadMutexHolder(const PthreadMutexHolder &rhs) = delete;
+  PthreadMutexHolder &operator=(const PthreadMutexHolder &rhs) = delete;
+
+  ~PthreadMutexHolder() {
+    const int err = pthread_mutex_unlock(mu_);
+    if (err != 0) {
+      ABSL_RAW_LOG(FATAL, "pthread_mutex_unlock failed: %d", err);
+    }
+  }
+
+ private:
+  pthread_mutex_t *mu_;
+};
+
+Waiter::Waiter() {
+  const int err = pthread_mutex_init(&mu_, 0);
+  if (err != 0) {
+    ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
+  }
+
+  const int err2 = pthread_cond_init(&cv_, 0);
+  if (err2 != 0) {
+    ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
+  }
+
+  waiter_count_ = 0;
+  wakeup_count_ = 0;
+}
+
+Waiter::~Waiter() {
+  const int err = pthread_mutex_destroy(&mu_);
+  if (err != 0) {
+    ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
+  }
+
+  const int err2 = pthread_cond_destroy(&cv_);
+  if (err2 != 0) {
+    ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
+  }
+}
+
+bool Waiter::Wait(KernelTimeout t) {
+  struct timespec abs_timeout;
+  if (t.has_timeout()) {
+    abs_timeout = t.MakeAbsTimespec();
+  }
+
+  PthreadMutexHolder h(&mu_);
+  ++waiter_count_;
+  // Loop until we find a wakeup to consume or timeout.
+  // Note that, since the thread ticker is just reset, we don't need to check
+  // whether the thread is idle on the very first pass of the loop.
+  bool first_pass = true;
+  while (wakeup_count_ == 0) {
+    if (!first_pass) MaybeBecomeIdle();
+    // No wakeups available, time to wait.
+    if (!t.has_timeout()) {
+      const int err = pthread_cond_wait(&cv_, &mu_);
+      if (err != 0) {
+        ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
+      }
+    } else {
+      const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
+      if (err == ETIMEDOUT) {
+        --waiter_count_;
+        return false;
+      }
+      if (err != 0) {
+        ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
+      }
+    }
+    first_pass = false;
+  }
+  // Consume a wakeup and we're done.
+  --wakeup_count_;
+  --waiter_count_;
+  return true;
+}
+
+void Waiter::Post() {
+  PthreadMutexHolder h(&mu_);
+  ++wakeup_count_;
+  InternalCondVarPoke();
+}
+
+void Waiter::Poke() {
+  PthreadMutexHolder h(&mu_);
+  InternalCondVarPoke();
+}
+
+void Waiter::InternalCondVarPoke() {
+  if (waiter_count_ != 0) {
+    const int err = pthread_cond_signal(&cv_);
+    if (ABSL_PREDICT_FALSE(err != 0)) {
+      ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
+    }
+  }
+}
+
+#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
+
+Waiter::Waiter() {
+  if (sem_init(&sem_, 0, 0) != 0) {
+    ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
+  }
+  wakeups_.store(0, std::memory_order_relaxed);
+}
+
+Waiter::~Waiter() {
+  if (sem_destroy(&sem_) != 0) {
+    ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
+  }
+}
+
+bool Waiter::Wait(KernelTimeout t) {
+  struct timespec abs_timeout;
+  if (t.has_timeout()) {
+    abs_timeout = t.MakeAbsTimespec();
+  }
+
+  // Loop until we timeout or consume a wakeup.
+  // Note that, since the thread ticker is just reset, we don't need to check
+  // whether the thread is idle on the very first pass of the loop.
+  bool first_pass = true;
+  while (true) {
+    int x = wakeups_.load(std::memory_order_relaxed);
+    while (x != 0) {
+      if (!wakeups_.compare_exchange_weak(x, x - 1,
+                                          std::memory_order_acquire,
+                                          std::memory_order_relaxed)) {
+        continue;  // Raced with someone, retry.
+      }
+      // Successfully consumed a wakeup, we're done.
+      return true;
+    }
+
+    if (!first_pass) MaybeBecomeIdle();
+    // Nothing to consume, wait (looping on EINTR).
+    while (true) {
+      if (!t.has_timeout()) {
+        if (sem_wait(&sem_) == 0) break;
+        if (errno == EINTR) continue;
+        ABSL_RAW_LOG(FATAL, "sem_wait failed: %d", errno);
+      } else {
+        if (sem_timedwait(&sem_, &abs_timeout) == 0) break;
+        if (errno == EINTR) continue;
+        if (errno == ETIMEDOUT) return false;
+        ABSL_RAW_LOG(FATAL, "sem_timedwait failed: %d", errno);
+      }
+    }
+    first_pass = false;
+  }
+}
+
+void Waiter::Post() {
+  // Post a wakeup.
+  if (wakeups_.fetch_add(1, std::memory_order_release) == 0) {
+    // We incremented from 0, need to wake a potential waiter.
+    Poke();
+  }
+}
+
+void Waiter::Poke() {
+  if (sem_post(&sem_) != 0) {  // Wake any semaphore waiter.
+    ABSL_RAW_LOG(FATAL, "sem_post failed with errno %d\n", errno);
+  }
+}
+
+#elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
+
+class Waiter::WinHelper {
+ public:
+  static SRWLOCK *GetLock(Waiter *w) {
+    return reinterpret_cast<SRWLOCK *>(&w->mu_storage_);
+  }
+
+  static CONDITION_VARIABLE *GetCond(Waiter *w) {
+    return reinterpret_cast<CONDITION_VARIABLE *>(&w->cv_storage_);
+  }
+
+  static_assert(sizeof(SRWLOCK) == sizeof(void *),
+                "`mu_storage_` does not have the same size as SRWLOCK");
+  static_assert(alignof(SRWLOCK) == alignof(void *),
+                "`mu_storage_` does not have the same alignment as SRWLOCK");
+
+  static_assert(sizeof(CONDITION_VARIABLE) == sizeof(void *),
+                "`ABSL_CONDITION_VARIABLE_STORAGE` does not have the same size "
+                "as `CONDITION_VARIABLE`");
+  static_assert(
+      alignof(CONDITION_VARIABLE) == alignof(void *),
+      "`cv_storage_` does not have the same alignment as `CONDITION_VARIABLE`");
+
+  // The SRWLOCK and CONDITION_VARIABLE types must be trivially constructible
+  // and destructible because we never call their constructors or destructors.
+  static_assert(std::is_trivially_constructible<SRWLOCK>::value,
+                "The `SRWLOCK` type must be trivially constructible");
+  static_assert(
+      std::is_trivially_constructible<CONDITION_VARIABLE>::value,
+      "The `CONDITION_VARIABLE` type must be trivially constructible");
+  static_assert(std::is_trivially_destructible<SRWLOCK>::value,
+                "The `SRWLOCK` type must be trivially destructible");
+  static_assert(std::is_trivially_destructible<CONDITION_VARIABLE>::value,
+                "The `CONDITION_VARIABLE` type must be trivially destructible");
+};
+
+class LockHolder {
+ public:
+  explicit LockHolder(SRWLOCK* mu) : mu_(mu) {
+    AcquireSRWLockExclusive(mu_);
+  }
+
+  LockHolder(const LockHolder&) = delete;
+  LockHolder& operator=(const LockHolder&) = delete;
+
+  ~LockHolder() {
+    ReleaseSRWLockExclusive(mu_);
+  }
+
+ private:
+  SRWLOCK* mu_;
+};
+
+Waiter::Waiter() {
+  auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
+  auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
+  InitializeSRWLock(mu);
+  InitializeConditionVariable(cv);
+  waiter_count_ = 0;
+  wakeup_count_ = 0;
+}
+
+// SRW locks and condition variables do not need to be explicitly destroyed.
+// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
+// https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
+Waiter::~Waiter() = default;
+
+bool Waiter::Wait(KernelTimeout t) {
+  SRWLOCK *mu = WinHelper::GetLock(this);
+  CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
+
+  LockHolder h(mu);
+  ++waiter_count_;
+
+  // Loop until we find a wakeup to consume or timeout.
+  // Note that, since the thread ticker is just reset, we don't need to check
+  // whether the thread is idle on the very first pass of the loop.
+  bool first_pass = true;
+  while (wakeup_count_ == 0) {
+    if (!first_pass) MaybeBecomeIdle();
+    // No wakeups available, time to wait.
+    if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
+      // GetLastError() returns a Win32 DWORD, but we assign to
+      // unsigned long to simplify the ABSL_RAW_LOG case below. The uniform
+      // initialization guarantees this is not a narrowing conversion.
+      const unsigned long err{GetLastError()};  // NOLINT(runtime/int)
+      if (err == ERROR_TIMEOUT) {
+        --waiter_count_;
+        return false;
+      } else {
+        ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
+      }
+    }
+    first_pass = false;
+  }
+  // Consume a wakeup and we're done.
+  --wakeup_count_;
+  --waiter_count_;
+  return true;
+}
+
+void Waiter::Post() {
+  LockHolder h(WinHelper::GetLock(this));
+  ++wakeup_count_;
+  InternalCondVarPoke();
+}
+
+void Waiter::Poke() {
+  LockHolder h(WinHelper::GetLock(this));
+  InternalCondVarPoke();
+}
+
+void Waiter::InternalCondVarPoke() {
+  if (waiter_count_ != 0) {
+    WakeConditionVariable(WinHelper::GetCond(this));
+  }
+}
+
+#else
+#error Unknown ABSL_WAITER_MODE
+#endif
+
+}  // namespace synchronization_internal
+ABSL_NAMESPACE_END
+}  // namespace absl
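The futex and semaphore modes in the file above share one lock-free protocol: Post() increments an atomic wakeup counter and wakes a sleeper only on the transition from zero, while Wait() consumes a wakeup by decrementing the counter from a positive value and sleeps only while the counter looks like zero. The standalone sketch below (not Abseil code; MiniFutexWaiter and its members are illustrative names) restates that protocol with C++20 std::atomic::wait/notify_one in place of the raw futex syscall, so it can be built and experimented with on its own (compile with -std=c++20 -pthread).

// Illustrative sketch only, assuming C++20; mirrors the futex-mode
// Waiter::Wait()/Post() structure above but is not part of Abseil.
#include <atomic>
#include <cstdint>
#include <thread>

class MiniFutexWaiter {
 public:
  // Blocks until one previously posted (or future) wakeup can be consumed.
  void Wait() {
    while (true) {
      int32_t x = count_.load(std::memory_order_relaxed);
      while (x != 0) {
        // Consume a wakeup by decrementing a positive counter.
        if (count_.compare_exchange_weak(x, x - 1, std::memory_order_acquire,
                                         std::memory_order_relaxed)) {
          return;
        }
      }
      // Counter looked zero: sleep until it changes. Like FUTEX_WAIT, the
      // value is re-checked before sleeping, so a racing Post() is not lost.
      count_.wait(0, std::memory_order_relaxed);
    }
  }

  // Banks one wakeup; wakes a sleeper only on the 0 -> 1 transition,
  // mirroring Waiter::Post()/Poke() in the futex mode above.
  void Post() {
    if (count_.fetch_add(1, std::memory_order_release) == 0) {
      count_.notify_one();
    }
  }

 private:
  std::atomic<int32_t> count_{0};
};

int main() {
  MiniFutexWaiter w;
  std::thread t([&w] { w.Wait(); });
  w.Post();
  t.join();
  return 0;
}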
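The condition-variable and Win32 modes use a lock-based variant of the same idea: a wakeup count banked under a mutex, a waiter count used to skip needless signals, and a condition variable to sleep on. The minimal sketch below uses std::mutex and std::condition_variable; again the class and member names are illustrative, not part of Abseil, and only the no-timeout path is shown.

// Illustrative sketch only; mirrors the condvar-mode Waiter above.
#include <condition_variable>
#include <mutex>
#include <thread>

class MiniCondVarWaiter {
 public:
  // Banks one wakeup and signals only if someone is actually waiting,
  // mirroring Waiter::Post() + InternalCondVarPoke() above.
  void Post() {
    std::lock_guard<std::mutex> lock(mu_);
    ++wakeup_count_;
    if (waiter_count_ != 0) cv_.notify_one();
  }

  // Blocks until one banked wakeup can be consumed (no-timeout variant).
  void Wait() {
    std::unique_lock<std::mutex> lock(mu_);
    ++waiter_count_;
    cv_.wait(lock, [this] { return wakeup_count_ != 0; });
    --wakeup_count_;  // Consume exactly one wakeup.
    --waiter_count_;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int waiter_count_ = 0;
  int wakeup_count_ = 0;
};

int main() {
  MiniCondVarWaiter w;
  std::thread t([&w] { w.Wait(); });
  w.Post();
  t.join();
  return 0;
}

Either variant preserves the key invariant of the file above: a Post() is never lost, because the wakeup is recorded in the counter before any attempt to wake a waiter.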