about summary refs log tree commit diff
path: root/absl/synchronization/internal
diff options
context:
space:
mode:
Diffstat (limited to 'absl/synchronization/internal')
-rw-r--r--absl/synchronization/internal/waiter.cc56
-rw-r--r--absl/synchronization/internal/waiter.h7
2 files changed, 29 insertions, 34 deletions
diff --git a/absl/synchronization/internal/waiter.cc b/absl/synchronization/internal/waiter.cc
index d83d8087f8d5..2f41aa5f2b44 100644
--- a/absl/synchronization/internal/waiter.cc
+++ b/absl/synchronization/internal/waiter.cc
@@ -161,7 +161,7 @@ bool Waiter::Wait(KernelTimeout t) {
 
 void Waiter::Post() {
   if (futex_.fetch_add(1, std::memory_order_release) == 0) {
-    // We incremented from 0, need to wake a potential waker.
+    // We incremented from 0, need to wake a potential waiter.
     Poke();
   }
 }
@@ -210,8 +210,8 @@ void Waiter::Init() {
     ABSL_RAW_LOG(FATAL, "pthread_cond_init failed: %d", err2);
   }
 
-  waiter_count_.store(0, std::memory_order_relaxed);
-  wakeup_count_.store(0, std::memory_order_relaxed);
+  waiter_count_ = 0;
+  wakeup_count_ = 0;
 }
 
 bool Waiter::Wait(KernelTimeout t) {
@@ -221,24 +221,12 @@ bool Waiter::Wait(KernelTimeout t) {
   }
 
   PthreadMutexHolder h(&mu_);
-  waiter_count_.fetch_add(1, std::memory_order_relaxed);
+  ++waiter_count_;
   // Loop until we find a wakeup to consume or timeout.
   // Note that, since the thread ticker is just reset, we don't need to check
   // whether the thread is idle on the very first pass of the loop.
   bool first_pass = true;
-  while (true) {
-    int x = wakeup_count_.load(std::memory_order_relaxed);
-    if (x != 0) {
-      if (!wakeup_count_.compare_exchange_weak(x, x - 1,
-                                               std::memory_order_acquire,
-                                               std::memory_order_relaxed)) {
-        continue;  // Raced with someone, retry.
-      }
-      // Successfully consumed a wakeup, we're done.
-      waiter_count_.fetch_sub(1, std::memory_order_relaxed);
-      return true;
-    }
-
+  while (wakeup_count_ == 0) {
     if (!first_pass) MaybeBecomeIdle();
     // No wakeups available, time to wait.
     if (!t.has_timeout()) {
@@ -249,34 +237,38 @@ bool Waiter::Wait(KernelTimeout t) {
     } else {
       const int err = pthread_cond_timedwait(&cv_, &mu_, &abs_timeout);
       if (err == ETIMEDOUT) {
-        waiter_count_.fetch_sub(1, std::memory_order_relaxed);
+        --waiter_count_;
         return false;
       }
       if (err != 0) {
-        ABSL_RAW_LOG(FATAL, "pthread_cond_wait failed: %d", err);
+        ABSL_RAW_LOG(FATAL, "pthread_cond_timedwait failed: %d", err);
       }
     }
     first_pass = false;
   }
+  // Consume a wakeup and we're done.
+  --wakeup_count_;
+  --waiter_count_;
+  return true;
 }
 
 void Waiter::Post() {
-  wakeup_count_.fetch_add(1, std::memory_order_release);
-  Poke();
+  PthreadMutexHolder h(&mu_);
+  ++wakeup_count_;
+  InternalCondVarPoke();
 }
 
 void Waiter::Poke() {
-  if (waiter_count_.load(std::memory_order_relaxed) == 0) {
-    return;
-  }
-  // Potentially a waker. Take the lock and check again.
   PthreadMutexHolder h(&mu_);
-  if (waiter_count_.load(std::memory_order_relaxed) == 0) {
-    return;
-  }
-  const int err = pthread_cond_signal(&cv_);
-  if (err != 0) {
-    ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
+  InternalCondVarPoke();
+}
+
+void Waiter::InternalCondVarPoke() {
+  if (waiter_count_ != 0) {
+    const int err = pthread_cond_signal(&cv_);
+    if (ABSL_PREDICT_FALSE(err != 0)) {
+      ABSL_RAW_LOG(FATAL, "pthread_cond_signal failed: %d", err);
+    }
   }
 }
 
@@ -456,7 +448,7 @@ void Waiter::Poke() {
   if (waiter_count_.load(std::memory_order_relaxed) == 0) {
     return;
   }
-  // Potentially a waker. Take the lock and check again.
+  // Potentially a waiter. Take the lock and check again.
   LockHolder h(WinHelper::GetLock(this));
   if (waiter_count_.load(std::memory_order_relaxed) == 0) {
     return;
diff --git a/absl/synchronization/internal/waiter.h b/absl/synchronization/internal/waiter.h
index 099935453bee..c23897007e70 100644
--- a/absl/synchronization/internal/waiter.h
+++ b/absl/synchronization/internal/waiter.h
@@ -106,10 +106,13 @@ class Waiter {
   static_assert(sizeof(int32_t) == sizeof(futex_), "Wrong size for futex");
 
 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_CONDVAR
+  // REQUIRES: mu_ must be held.
+  void InternalCondVarPoke();
+
   pthread_mutex_t mu_;
   pthread_cond_t cv_;
-  std::atomic<int> waiter_count_;
-  std::atomic<int> wakeup_count_;  // Unclaimed wakeups, written under lock.
+  int waiter_count_;
+  int wakeup_count_;  // Unclaimed wakeups.
 
 #elif ABSL_WAITER_MODE == ABSL_WAITER_MODE_SEM
   sem_t sem_;