Diffstat (limited to 'absl/synchronization')
-rw-r--r--  absl/synchronization/internal/create_thread_identity.cc |  4
-rw-r--r--  absl/synchronization/internal/create_thread_identity.h  |  4
-rw-r--r--  absl/synchronization/internal/per_thread_sem.cc          |  6
-rw-r--r--  absl/synchronization/internal/per_thread_sem.h           |  5
-rw-r--r--  absl/synchronization/internal/waiter.cc                  | 77
-rw-r--r--  absl/synchronization/internal/waiter.h                   | 27
6 files changed, 77 insertions(+), 46 deletions(-)
diff --git a/absl/synchronization/internal/create_thread_identity.cc b/absl/synchronization/internal/create_thread_identity.cc
index ce3676a88482..eef661f23d1c 100644
--- a/absl/synchronization/internal/create_thread_identity.cc
+++ b/absl/synchronization/internal/create_thread_identity.cc
@@ -37,7 +37,7 @@ static base_internal::ThreadIdentity* thread_identity_freelist;
 
 // A per-thread destructor for reclaiming associated ThreadIdentity objects.
 // Since we must preserve their storage we cache them for re-use.
-static void ReclaimThreadIdentity(void* v) {
+void ReclaimThreadIdentity(void* v) {
   base_internal::ThreadIdentity* identity =
       static_cast<base_internal::ThreadIdentity*>(v);
 
@@ -47,6 +47,8 @@ static void ReclaimThreadIdentity(void* v) {
     base_internal::LowLevelAlloc::Free(identity->per_thread_synch.all_locks);
   }
 
+  PerThreadSem::Destroy(identity);
+
   // We must explicitly clear the current thread's identity:
   // (a) Subsequent (unrelated) per-thread destructors may require an identity.
   //     We must guarantee a new identity is used in this case (this instructor
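For readers unfamiliar with the mechanism: ReclaimThreadIdentity is a per-thread destructor, the kind of callback that (in the POSIX thread-specific-data setup) is registered via pthread_key_create and invoked by the runtime when a thread exits. Making it non-static here lets PerThreadSem declare it as a friend, and the added PerThreadSem::Destroy call gives the Waiter a place to release its OS resources. A minimal sketch of that destructor mechanism, with hypothetical names and an int standing in for ThreadIdentity:

    #include <pthread.h>
    #include <cstdio>

    static pthread_key_t key;
    static pthread_once_t once = PTHREAD_ONCE_INIT;

    // Invoked by the pthread runtime on thread exit, with the value most
    // recently stored for this thread via pthread_setspecific().
    static void Reclaim(void* v) {
      delete static_cast<int*>(v);
      std::puts("per-thread destructor ran");
    }

    static void MakeKey() { pthread_key_create(&key, &Reclaim); }

    // Lazily creates some per-thread state (an int stands in for ThreadIdentity).
    int* GetPerThreadState() {
      pthread_once(&once, MakeKey);
      void* v = pthread_getspecific(key);
      if (v == nullptr) {
        v = new int(0);
        pthread_setspecific(key, v);
      }
      return static_cast<int*>(v);
    }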
diff --git a/absl/synchronization/internal/create_thread_identity.h b/absl/synchronization/internal/create_thread_identity.h
index ebb16c56a7ca..97237a6ede9a 100644
--- a/absl/synchronization/internal/create_thread_identity.h
+++ b/absl/synchronization/internal/create_thread_identity.h
@@ -35,6 +35,10 @@ namespace synchronization_internal {
 // For private use only.
 base_internal::ThreadIdentity* CreateThreadIdentity();
 
+// A per-thread destructor for reclaiming associated ThreadIdentity objects.
+// For private use only.
+void ReclaimThreadIdentity(void* v);
+
 // Returns the ThreadIdentity object representing the calling thread; guaranteed
 // to be unique for its lifetime.  The returned object will remain valid for the
 // program's lifetime; although it may be re-assigned to a subsequent thread.
diff --git a/absl/synchronization/internal/per_thread_sem.cc b/absl/synchronization/internal/per_thread_sem.cc
index b7014fb2b02d..2a78b20fc883 100644
--- a/absl/synchronization/internal/per_thread_sem.cc
+++ b/absl/synchronization/internal/per_thread_sem.cc
@@ -40,12 +40,16 @@ std::atomic<int> *PerThreadSem::GetThreadBlockedCounter() {
 }
 
 void PerThreadSem::Init(base_internal::ThreadIdentity *identity) {
-  Waiter::GetWaiter(identity)->Init();
+  new (Waiter::GetWaiter(identity)) Waiter();
   identity->ticker.store(0, std::memory_order_relaxed);
   identity->wait_start.store(0, std::memory_order_relaxed);
   identity->is_idle.store(false, std::memory_order_relaxed);
 }
 
+void PerThreadSem::Destroy(base_internal::ThreadIdentity *identity) {
+  Waiter::GetWaiter(identity)->~Waiter();
+}
+
 void PerThreadSem::Tick(base_internal::ThreadIdentity *identity) {
   const int ticker =
       identity->ticker.fetch_add(1, std::memory_order_relaxed) + 1;
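The change above switches PerThreadSem from an Init()/no-teardown scheme to ordinary construction and destruction in storage owned elsewhere: GetWaiter() returns space reserved inside ThreadIdentity, so the object is created with placement new and torn down with an explicit destructor call, never operator delete. A minimal sketch of that lifecycle, using a hypothetical FakeWaiter:

    #include <new>
    #include <cstdio>

    struct FakeWaiter {                        // Hypothetical stand-in for Waiter.
      FakeWaiter()  { std::puts("constructor: acquire OS resources"); }
      ~FakeWaiter() { std::puts("destructor: release OS resources"); }
    };

    // Storage reserved elsewhere (in Abseil, inside ThreadIdentity).
    alignas(FakeWaiter) static unsigned char storage[sizeof(FakeWaiter)];

    int main() {
      FakeWaiter* w = new (&storage) FakeWaiter();  // what PerThreadSem::Init does
      w->~FakeWaiter();                             // what PerThreadSem::Destroy does
    }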
diff --git a/absl/synchronization/internal/per_thread_sem.h b/absl/synchronization/internal/per_thread_sem.h
index e7da070ba72c..113cdfb7dc1e 100644
--- a/absl/synchronization/internal/per_thread_sem.h
+++ b/absl/synchronization/internal/per_thread_sem.h
@@ -65,6 +65,10 @@ class PerThreadSem {
   // REQUIRES: May only be called by ThreadIdentity.
   static void Init(base_internal::ThreadIdentity* identity);
 
+  // Destroy the PerThreadSem associated with "identity".
+  // REQUIRES: May only be called by ThreadIdentity.
+  static void Destroy(base_internal::ThreadIdentity* identity);
+
   // Increments "identity"'s count.
   static inline void Post(base_internal::ThreadIdentity* identity);
 
@@ -77,6 +81,7 @@ class PerThreadSem {
   friend class PerThreadSemTest;
   friend class absl::Mutex;
   friend absl::base_internal::ThreadIdentity* CreateThreadIdentity();
+  friend void ReclaimThreadIdentity(void* v);
 };
 
 }  // namespace synchronization_internal
diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc
index 2f41aa5f2b44..8e7137705325 100644
--- a/absl/synchronization/internal/waiter.cc
+++ b/absl/synchronization/internal/waiter.cc
@@ -122,10 +122,12 @@ class Futex {
   }
 };
 
-void Waiter::Init() {
+Waiter::Waiter() {
   futex_.store(0, std::memory_order_relaxed);
 }
 
+Waiter::~Waiter() = default;
+
 bool Waiter::Wait(KernelTimeout t) {
   // Loop until we can atomically decrement futex from a positive
   // value, waiting on a futex while we believe it is zero.
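For context on the futex mode: the constructor above just zeroes the futex word, and Wait(), per its comment, loops trying to decrement the word from a positive value, sleeping in the kernel while it believes the word is zero. A rough Linux-only sketch of that pattern (not Abseil's implementation; FutexWait, FutexWake, SemWait, and SemPost are hypothetical helpers):

    #include <atomic>
    #include <cstdint>
    #include <linux/futex.h>
    #include <sys/syscall.h>
    #include <unistd.h>

    // Thin wrappers over the raw futex syscall.
    static long FutexWait(std::atomic<int32_t>* addr, int32_t expected) {
      // Sleeps only if *addr still equals `expected` when the kernel checks it.
      return syscall(SYS_futex, reinterpret_cast<int32_t*>(addr),
                     FUTEX_WAIT | FUTEX_PRIVATE_FLAG, expected, nullptr, nullptr, 0);
    }
    static long FutexWake(std::atomic<int32_t>* addr, int32_t count) {
      return syscall(SYS_futex, reinterpret_cast<int32_t*>(addr),
                     FUTEX_WAKE | FUTEX_PRIVATE_FLAG, count, nullptr, nullptr, 0);
    }

    // Loop until we decrement the count from a positive value, sleeping on the
    // futex whenever the count looks like zero.
    void SemWait(std::atomic<int32_t>& futex) {
      while (true) {
        int32_t x = futex.load(std::memory_order_relaxed);
        while (x > 0) {
          if (futex.compare_exchange_weak(x, x - 1, std::memory_order_acquire,
                                          std::memory_order_relaxed)) {
            return;  // Consumed a post.
          }
        }
        FutexWait(&futex, 0);
      }
    }

    // Publish one wakeup and wake at most one sleeper.
    void SemPost(std::atomic<int32_t>& futex) {
      futex.fetch_add(1, std::memory_order_release);
      FutexWake(&futex, 1);
    }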
@@ -199,7 +201,7 @@ class PthreadMutexHolder {
   pthread_mutex_t *mu_;
 };
 
-void Waiter::Init() {
+Waiter::Waiter() {
   const int err = pthread_mutex_init(&mu_, 0);
   if (err != 0) {
     ABSL_RAW_LOG(FATAL, "pthread_mutex_init failed: %d", err);
@@ -214,6 +216,18 @@ void Waiter::Init() {
   wakeup_count_ = 0;
 }
 
+Waiter::~Waiter() {
+  const int err = pthread_mutex_destroy(&mu_);
+  if (err != 0) {
+    ABSL_RAW_LOG(FATAL, "pthread_mutex_destroy failed: %d", err);
+  }
+
+  const int err2 = pthread_cond_destroy(&cv_);
+  if (err2 != 0) {
+    ABSL_RAW_LOG(FATAL, "pthread_cond_destroy failed: %d", err2);
+  }
+}
+
 bool Waiter::Wait(KernelTimeout t) {
   struct timespec abs_timeout;
   if (t.has_timeout()) {
@@ -274,13 +288,19 @@ void Waiter::InternalCondVarPoke() {
 
 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
 
-void Waiter::Init() {
+Waiter::Waiter() {
   if (sem_init(&sem_, 0, 0) != 0) {
     ABSL_RAW_LOG(FATAL, "sem_init failed with errno %d\n", errno);
   }
   wakeups_.store(0, std::memory_order_relaxed);
 }
 
+Waiter::~Waiter() {
+  if (sem_destroy(&sem_) != 0) {
+    ABSL_RAW_LOG(FATAL, "sem_destroy failed with errno %d\n", errno);
+  }
+}
+
 bool Waiter::Wait(KernelTimeout t) {
   struct timespec abs_timeout;
   if (t.has_timeout()) {
@@ -388,39 +408,32 @@ class LockHolder {
   SRWLOCK* mu_;
 };
 
-void Waiter::Init() {
+Waiter::Waiter() {
   auto *mu = ::new (static_cast<void *>(&mu_storage_)) SRWLOCK;
   auto *cv = ::new (static_cast<void *>(&cv_storage_)) CONDITION_VARIABLE;
   InitializeSRWLock(mu);
   InitializeConditionVariable(cv);
-  waiter_count_.store(0, std::memory_order_relaxed);
-  wakeup_count_.store(0, std::memory_order_relaxed);
+  waiter_count_ = 0;
+  wakeup_count_ = 0;
 }
 
+// SRW locks and condition variables do not need to be explicitly destroyed.
+// https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-initializesrwlock
+// https://stackoverflow.com/questions/28975958/why-does-windows-have-no-deleteconditionvariable-function-to-go-together-with
+Waiter::~Waiter() = default;
+
 bool Waiter::Wait(KernelTimeout t) {
   SRWLOCK *mu = WinHelper::GetLock(this);
   CONDITION_VARIABLE *cv = WinHelper::GetCond(this);
 
   LockHolder h(mu);
-  waiter_count_.fetch_add(1, std::memory_order_relaxed);
+  ++waiter_count_;
 
   // Loop until we find a wakeup to consume or timeout.
   // Note that, since the thread ticker is just reset, we don't need to check
   // whether the thread is idle on the very first pass of the loop.
   bool first_pass = true;
-  while (true) {
-    int x = wakeup_count_.load(std::memory_order_relaxed);
-    if (x != 0) {
-      if (!wakeup_count_.compare_exchange_weak(x, x - 1,
-                                               std::memory_order_acquire,
-                                               std::memory_order_relaxed)) {
-        continue;  // Raced with someone, retry.
-      }
-      // Successfully consumed a wakeup, we're done.
-      waiter_count_.fetch_sub(1, std::memory_order_relaxed);
-      return true;
-    }
-
+  while (wakeup_count_ == 0) {
     if (!first_pass) MaybeBecomeIdle();
     // No wakeups available, time to wait.
     if (!SleepConditionVariableSRW(cv, mu, t.InMillisecondsFromNow(), 0)) {
@@ -429,7 +442,7 @@ bool Waiter::Wait(KernelTimeout t) {
       // initialization guarantees this is not a narrowing conversion.
       const unsigned long err{GetLastError()};  // NOLINT(runtime/int)
       if (err == ERROR_TIMEOUT) {
-        waiter_count_.fetch_sub(1, std::memory_order_relaxed);
+        --waiter_count_;
         return false;
       } else {
         ABSL_RAW_LOG(FATAL, "SleepConditionVariableSRW failed: %lu", err);
@@ -437,23 +450,27 @@ bool Waiter::Wait(KernelTimeout t) {
     }
     first_pass = false;
   }
+  // Consume a wakeup and we're done.
+  --wakeup_count_;
+  --waiter_count_;
+  return true;
 }
 
 void Waiter::Post() {
-  wakeup_count_.fetch_add(1, std::memory_order_release);
-  Poke();
+  LockHolder h(WinHelper::GetLock(this));
+  ++wakeup_count_;
+  InternalCondVarPoke();
 }
 
 void Waiter::Poke() {
-  if (waiter_count_.load(std::memory_order_relaxed) == 0) {
-    return;
-  }
-  // Potentially a waiter. Take the lock and check again.
   LockHolder h(WinHelper::GetLock(this));
-  if (waiter_count_.load(std::memory_order_relaxed) == 0) {
-    return;
+  InternalCondVarPoke();
+}
+
+void Waiter::InternalCondVarPoke() {
+  if (waiter_count_ != 0) {
+    WakeConditionVariable(WinHelper::GetCond(this));
   }
-  WakeConditionVariable(WinHelper::GetCond(this));
 }
 
 #else
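The Win32 rewrite above is the most substantial change: Post() now acquires the SRW lock before touching wakeup_count_, so both counters are only ever read or written under the lock, the atomics and the compare-exchange retry loop become unnecessary, and Wait() collapses into the standard condition-variable pattern. A portable sketch of the resulting logic, with std::mutex and std::condition_variable standing in for SRWLOCK and CONDITION_VARIABLE:

    #include <condition_variable>
    #include <mutex>

    class Sem {
     public:
      void Post() {
        std::lock_guard<std::mutex> l(mu_);
        ++wakeup_count_;
        if (waiter_count_ != 0) cv_.notify_one();  // InternalCondVarPoke()
      }
      void Wait() {
        std::unique_lock<std::mutex> l(mu_);
        ++waiter_count_;
        while (wakeup_count_ == 0) cv_.wait(l);
        --wakeup_count_;   // Consume a wakeup and we're done.
        --waiter_count_;
      }
     private:
      std::mutex mu_;
      std::condition_variable cv_;
      int waiter_count_ = 0;   // Plain ints: only touched while mu_ is held.
      int wakeup_count_ = 0;
    };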
diff --git a/absl/synchronization/internal/waiter.h b/absl/synchronization/internal/waiter.h
index c23897007e70..cac81d5fd226 100644
--- a/absl/synchronization/internal/waiter.h
+++ b/absl/synchronization/internal/waiter.h
@@ -58,14 +58,15 @@ namespace synchronization_internal {
 // Waiter is an OS-specific semaphore.
 class Waiter {
  public:
-  // No constructor, instances use the reserved space in ThreadIdentity.
-  // All initialization logic belongs in `Init()`.
-  Waiter() = delete;
+  // Prepare any data to track waits.
+  Waiter();
+
+  // Not copyable or movable
   Waiter(const Waiter&) = delete;
   Waiter& operator=(const Waiter&) = delete;
 
-  // Prepare any data to track waits.
-  void Init();
+  // Destroy any data to track waits.
+  ~Waiter();
 
   // Blocks the calling thread until a matching call to `Post()` or
   // `t` has passed. Returns `true` if woken (`Post()` called),
@@ -122,13 +123,8 @@ class Waiter {
   std::atomic<int> wakeups_;
 
 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_WIN32
-  // The Windows API has lots of choices for synchronization
-  // primivitives.  We are using SRWLOCK and CONDITION_VARIABLE
-  // because they don't require a destructor to release system
-  // resources.
-  //
-  // However, we can't include Windows.h in our headers, so we use aligned
-  // storage buffers to define the storage.
+  // We can't include Windows.h in our headers, so we use aligned storage
+  // buffers to define the storage of SRWLOCK and CONDITION_VARIABLE.
   using SRWLockStorage =
       typename std::aligned_storage<sizeof(void*), alignof(void*)>::type;
   using ConditionVariableStorage =
@@ -138,10 +134,13 @@ class Waiter {
   // condition variable storage once the types are complete.
   class WinHelper;
 
+  // REQUIRES: WinHelper::GetLock(this) must be held.
+  void InternalCondVarPoke();
+
   SRWLockStorage mu_storage_;
   ConditionVariableStorage cv_storage_;
-  std::atomic<int> waiter_count_;
-  std::atomic<int> wakeup_count_;
+  int waiter_count_;
+  int wakeup_count_;
 
 #else
   #error Unknown ABSL_WAITER_MODE
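
One more idiom worth spelling out from waiter.h: because Windows.h cannot be included in the header, the SRWLOCK and CONDITION_VARIABLE live in std::aligned_storage buffers, and WinHelper (defined in waiter.cc, where the real types are complete) casts that storage back to the proper pointer types. A minimal sketch of the idiom, with a made-up FakeSRWLOCK standing in for the opaque OS type:

    #include <type_traits>

    // Header side: only raw storage is declared, so no Windows type is named.
    struct WaiterStorage {
      std::aligned_storage<sizeof(void*), alignof(void*)>::type lock_storage_;
    };

    // Source side, where the real type is visible (here a stand-in).
    struct FakeSRWLOCK { void* ptr; };
    static_assert(sizeof(FakeSRWLOCK) <= sizeof(void*), "storage too small");

    FakeSRWLOCK* GetLock(WaiterStorage* w) {
      return reinterpret_cast<FakeSRWLOCK*>(&w->lock_storage_);
    }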