diff options
Diffstat (limited to 'absl/synchronization')
-rw-r--r-- | absl/synchronization/internal/create_thread_identity.cc | 4 | ||||
-rw-r--r-- | absl/synchronization/internal/create_thread_identity.h | 4 | ||||
-rw-r--r-- | absl/synchronization/internal/per_thread_sem.cc | 6 | ||||
-rw-r--r-- | absl/synchronization/internal/per_thread_sem.h | 5 | ||||
-rw-r--r-- | absl/synchronization/internal/waiter.cc | 77 | ||||
-rw-r--r-- | absl/synchronization/internal/waiter.h | 27 |
6 files changed, 77 insertions, 46 deletions
diff --git a/absl/synchronization/internal/create_thread_identity.cc b/absl/synchronization/internal/create_thread_identity.cc index ce3676a88482..eef661f23d1c 100644 --- a/absl/synchronization/internal/create_thread_identity.cc +++ b/absl/synchronization/internal/create_thread_identity.cc @@ -37,7 +37,7 @@ static base_internal::ThreadIdentity* thread_identity_freelist; // A per-thread destructor for reclaiming associated ThreadIdentity objects. // Since we must preserve their storage we cache them for re-use. -static void ReclaimThreadIdentity(void* v) { +void ReclaimThreadIdentity(void* v) { base_internal::ThreadIdentity* identity = static_cast<base_internal::ThreadIdentity*>(v); @@ -47,6 +47,8 @@ static void ReclaimThreadIdentity(void* v) { base_internal::LowLevelAlloc::Free(identity->per_thread_synch.all_locks); } + PerThreadSem::Destroy(identity); + // We must explicitly clear the current thread's identity: // (a) Subsequent (unrelated) per-thread destructors may require an identity. // We must guarantee a new identity is used in this case (this instructor diff --git a/absl/synchronization/internal/create_thread_identity.h b/absl/synchronization/internal/create_thread_identity.h index ebb16c56a7ca..97237a6ede9a 100644 --- a/absl/synchronization/internal/create_thread_identity.h +++ b/absl/synchronization/internal/create_thread_identity.h @@ -35,6 +35,10 @@ namespace synchronization_internal { // For private use only. base_internal::ThreadIdentity* CreateThreadIdentity(); +// A per-thread destructor for reclaiming associated ThreadIdentity objects. +// For private use only. +void ReclaimThreadIdentity(void* v); + // Returns the ThreadIdentity object representing the calling thread; guaranteed // to be unique for its lifetime. The returned object will remain valid for the // program's lifetime; although it may be re-assigned to a subsequent thread. diff --git a/absl/synchronization/internal/per_thread_sem.cc b/absl/synchronization/internal/per_thread_sem.cc index b7014fb2b02d..2a78b20fc883 100644 --- a/absl/synchronization/internal/per_thread_sem.cc +++ b/absl/synchronization/internal/per_thread_sem.cc @@ -40,12 +40,16 @@ std::atomic<int> *PerThreadSem::GetThreadBlockedCounter() { } void PerThreadSem::Init(base_internal::ThreadIdentity *identity) { - Waiter::GetWaiter(identity)->Init(); + new (Waiter::GetWaiter(identity)) Waiter(); identity->ticker.store(0, std::memory_order_relaxed); identity->wait_start.store(0, std::memory_order_relaxed); identity->is_idle.store(false, std::memory_order_relaxed); } +void PerThreadSem::Destroy(base_internal::ThreadIdentity *identity) { + Waiter::GetWaiter(identity)->~Waiter(); +} + void PerThreadSem::Tick(base_internal::ThreadIdentity *identity) { const int ticker = identity->ticker.fetch_add(1, std::memory_order_relaxed) + 1; diff --git a/absl/synchronization/internal/per_thread_sem.h b/absl/synchronization/internal/per_thread_sem.h index e7da070ba72c..113cdfb7dc1e 100644 --- a/absl/synchronization/internal/per_thread_sem.h +++ b/absl/synchronization/internal/per_thread_sem.h @@ -65,6 +65,10 @@ class PerThreadSem { // REQUIRES: May only be called by ThreadIdentity. static void Init(base_internal::ThreadIdentity* identity); + // Destroy the PerThreadSem associated with "identity". + // REQUIRES: May only be called by ThreadIdentity. + static void Destroy(base_internal::ThreadIdentity* identity); + // Increments "identity"'s count. static inline void Post(base_internal::ThreadIdentity* identity); @@ -77,6 +81,7 @@ class PerThreadSem { friend class PerThreadSemTest; friend class absl::Mutex; friend absl::base_internal::ThreadIdentity* CreateThreadIdentity(); + friend void ReclaimThreadIdentity(void* v); }; } // namespace synchronization_internal diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc index 2f41aa5f2b44..8e7137705325 100644 --- a/absl/synchronization/internal/waiter.cc +++ b/absl/synchronization/internal/waiter.cc @@ -122,10 +122,12 @@ class Futex { } }; -void Waiter::Init() { +Waiter::Waiter() { futex_.store(0, std::memory_order_relaxed); } +Waiter::~Waiter() = default; + bool Waiter::Wait(KernelTimeout t) { // Loop until we can atomically decrement futex from a positive // value, waiting on a futex while we believe it is zero. @@ -199,7 +201,7 @@ class PthreadMutexHolder { pthread_mutex_t *mu_; }; -void Waiter::Init() { +Waiter::Waiter() { const int err = pthread_mutex_init(&mu_, 0); if (err != 0) { ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err); @@ -214,6 +216,18 @@ void Waiter::Init() { wakeup_count_ = 0; } +Waiter::~Waiter() { + const int err = pthread_mutex_destroy(&mu_); + if (err != 0) { + ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err); + } + + const int err2 = pthread_cond_destroy(&cv_); + if (err2 != 0) { + ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2); + } +} + bool Waiter::Wait(KernelTimeout t) { struct timespec abs_timeout; if (t.has_timeout()) { @@ -274,13 +288,19 @@ void Waiter::InternalCondVarPoke() { #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM -void Waiter::Init() { +Waiter::Waiter() { if (sem_init(&sem_, 0, 0) != 0) { ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno); } wakeups_.store(0, std::memory_order_relaxed); } +Waiter::~Waiter() { + if (sem_destroy(&sem_) != 0) { + ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno); + } +} + bool Waiter::Wait(KernelTimeout t) { struct timespec abs_timeout; if (t.has_timeout()) { @@ -388,39 +408,32 @@ class LockHolder { SRWLOCK* mu_; }; -void Waiter::Init() { +Waiter::Waiter() { auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK; auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE; InitializeSRWLock(mu); InitializeConditionVariable(cv); - waiter_count_.store(0, std::memory_order_relaxed); - wakeup_count_.store(0, std::memory_order_relaxed); + waiter_count_ = 0; + wakeup_count_ = 0; } +// SRW locks and condition variables do not need to be explicitly destroyed. +// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock +// https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with +Waiter::~Waiter() = default; + bool Waiter::Wait(KernelTimeout t) { SRWLOCK *mu = WinHelper::GetLock(this); CONDITION_VARIABLE *cv = WinHelper::GetCond(this); LockHolder h(mu); - waiter_count_.fetch_add(1, std::memory_order_relaxed); + ++waiter_count_; // Loop until we find a wakeup to consume or timeout. // Note that, since the thread ticker is just reset, we don't need to check // whether the thread is idle on the very first pass of the loop. bool first_pass = true; - while (true) { - int x = wakeup_count_.load(std::memory_order_relaxed); - if (x != 0) { - if (!wakeup_count_.compare_exchange_weak(x, x - 1, - std::memory_order_acquire, - std::memory_order_relaxed)) { - continue; // Raced with someone, retry. - } - // Successfully consumed a wakeup, we're done. - waiter_count_.fetch_sub(1, std::memory_order_relaxed); - return true; - } - + while (wakeup_count_ == 0) { if (!first_pass) MaybeBecomeIdle(); // No wakeups available, time to wait. if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) { @@ -429,7 +442,7 @@ bool Waiter::Wait(KernelTimeout t) { // initialization guarantees this is not a narrowing conversion. const unsigned long err{GetLastError()}; // NOLINT(runtime/int) if (err == ERROR_TIMEOUT) { - waiter_count_.fetch_sub(1, std::memory_order_relaxed); + --waiter_count_; return false; } else { ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err); @@ -437,23 +450,27 @@ bool Waiter::Wait(KernelTimeout t) { } first_pass = false; } + // Consume a wakeup and we're done. + --wakeup_count_; + --waiter_count_; + return true; } void Waiter::Post() { - wakeup_count_.fetch_add(1, std::memory_order_release); - Poke(); + LockHolder h(WinHelper::GetLock(this)); + ++wakeup_count_; + InternalCondVarPoke(); } void Waiter::Poke() { - if (waiter_count_.load(std::memory_order_relaxed) == 0) { - return; - } - // Potentially a waiter. Take the lock and check again. LockHolder h(WinHelper::GetLock(this)); - if (waiter_count_.load(std::memory_order_relaxed) == 0) { - return; + InternalCondVarPoke(); +} + +void Waiter::InternalCondVarPoke() { + if (waiter_count_ != 0) { + WakeConditionVariable(WinHelper::GetCond(this)); } - WakeConditionVariable(WinHelper::GetCond(this)); } #else diff --git a/absl/synchronization/internal/waiter.h b/absl/synchronization/internal/waiter.h index c23897007e70..cac81d5fd226 100644 --- a/absl/synchronization/internal/waiter.h +++ b/absl/synchronization/internal/waiter.h @@ -58,14 +58,15 @@ namespace synchronization_internal { // Waiter is an OS-specific semaphore. class Waiter { public: - // No constructor, instances use the reserved space in ThreadIdentity. - // All initialization logic belongs in `Init()`. - Waiter() = delete; + // Prepare any data to track waits. + Waiter(); + + // Not copyable or movable Waiter(const Waiter&) = delete; Waiter& operator=(const Waiter&) = delete; - // Prepare any data to track waits. - void Init(); + // Destroy any data to track waits. + ~Waiter(); // Blocks the calling thread until a matching call to `Post()` or // `t` has passed. Returns `true` if woken (`Post()` called), @@ -122,13 +123,8 @@ class Waiter { std::atomic<int> wakeups_; #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32 - // The Windows API has lots of choices for synchronization - // primivitives. We are using SRWLOCK and CONDITION_VARIABLE - // because they don't require a destructor to release system - // resources. - // - // However, we can't include Windows.h in our headers, so we use aligned - // storage buffers to define the storage. + // We can't include Windows.h in our headers, so we use aligned storage + // buffers to define the storage of SRWLOCK and CONDITION_VARIABLE. using SRWLockStorage = typename std::aligned_storage<sizeof(void*), alignof(void*)>::type; using ConditionVariableStorage = @@ -138,10 +134,13 @@ class Waiter { // condition variable storage once the types are complete. class WinHelper; + // REQUIRES: WinHelper::GetLock(this) must be held. + void InternalCondVarPoke(); + SRWLockStorage mu_storage_; ConditionVariableStorage cv_storage_; - std::atomic<int> waiter_count_; - std::atomic<int> wakeup_count_; + int waiter_count_; + int wakeup_count_; #else #error Unknown ABSL_WAITER_MODE |