about summary refs log tree commit diff
path: root/src/util/promise.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/util/promise.rs')
-rw-r--r--src/util/promise.rs159
1 files changed, 159 insertions, 0 deletions
diff --git a/src/util/promise.rs b/src/util/promise.rs
new file mode 100644
index 000000000000..41f3d76e7737
--- /dev/null
+++ b/src/util/promise.rs
@@ -0,0 +1,159 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::{Arc, RwLock};
+use std::task::{Context, Poll, Waker};
+
+pub struct Promise<Env, T> {
+    inner: Arc<RwLock<Inner<T>>>,
+    waiters: Arc<RwLock<Vec<Box<dyn Fn(&mut Env, &T)>>>>,
+}
+
+pub struct Complete<T> {
+    inner: Arc<RwLock<Inner<T>>>,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct Cancelled;
+
+struct Inner<T> {
+    value: Option<Arc<T>>,
+    waker: Option<Waker>,
+}
+
+pub fn promise<Env, T>() -> (Complete<T>, Promise<Env, T>) {
+    let inner = Arc::new(RwLock::new(Inner {
+        value: None,
+        waker: None,
+    }));
+    let promise = Promise {
+        inner: inner.clone(),
+        waiters: Arc::new(RwLock::new(Vec::new())),
+    };
+    let complete = Complete { inner: inner };
+    (complete, promise)
+}
+
+impl<T> Complete<T> {
+    pub fn fulfill(&self, val: T) {
+        let mut inner = self.inner.write().unwrap();
+        inner.value = Some(Arc::new(val));
+        if let Some(waker) = inner.waker.take() {
+            waker.wake()
+        }
+    }
+}
+
+impl<T> Complete<Result<T, Cancelled>> {
+    pub fn cancel(&mut self) {
+        self.fulfill(Err(Cancelled))
+    }
+}
+
+impl<E, T> Complete<Result<T, E>> {
+    pub fn ok(&mut self, val: T) {
+        self.fulfill(Ok(val))
+    }
+
+    pub fn err(&mut self, e: E) {
+        self.fulfill(Err(e))
+    }
+}
+
+impl<Env, T> Promise<Env, T> {
+    pub fn on_fulfill<F: Fn(&mut Env, &T) + 'static>(&mut self, f: F) {
+        let mut waiters = self.waiters.write().unwrap();
+        waiters.push(Box::new(f));
+    }
+}
+
+impl<Env, T> Promise<Env, Result<T, Cancelled>> {
+    pub fn on_cancel<F: Fn(&mut Env) + 'static>(&mut self, f: F) {
+        self.on_err(move |env, _| f(env))
+    }
+}
+
+impl<Env, E, T> Promise<Env, Result<T, E>> {
+    pub fn on_ok<F: Fn(&mut Env, &T) + 'static>(&mut self, f: F) {
+        self.on_fulfill(move |env, r| {
+            if let Ok(val) = r {
+                f(env, val)
+            }
+        })
+    }
+
+    pub fn on_err<F: Fn(&mut Env, &E) + 'static>(&mut self, f: F) {
+        self.on_fulfill(move |env, r| {
+            if let Err(e) = r {
+                f(env, e)
+            }
+        })
+    }
+}
+
+pub trait Give<Env> {
+    fn give(&self, env: &mut Env) -> bool;
+}
+
+impl<Env, T> Give<Env> for Promise<Env, T> {
+    fn give(&self, env: &mut Env) -> bool {
+        let inner = self.inner.read().unwrap();
+        if let Some(value) = &inner.value {
+            let mut waiters = self.waiters.write().unwrap();
+            for waiter in waiters.iter() {
+                waiter(env, value);
+            }
+            waiters.clear();
+            true
+        } else {
+            false
+        }
+    }
+}
+
+impl<Env, T> Clone for Promise<Env, T> {
+    fn clone(&self) -> Self {
+        Promise {
+            inner: self.inner.clone(),
+            waiters: self.waiters.clone(),
+        }
+    }
+}
+
+impl<Env, P: Give<Env>> Give<Env> for &P {
+    fn give(&self, env: &mut Env) -> bool {
+        (*self).give(env)
+    }
+}
+
+impl<Env, T> Future for Promise<Env, T> {
+    type Output = Arc<T>;
+    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
+        let mut inner = self.inner.write().unwrap();
+        match inner.value {
+            Some(ref v) => Poll::Ready(v.clone()),
+            None => {
+                inner.waker = Some(cx.waker().clone());
+                Poll::Pending
+            }
+        }
+    }
+}
+
+pub struct Promises<'a, Env> {
+    ps: Vec<Box<dyn Give<Env> + 'a>>,
+}
+
+impl<'a, Env> Promises<'a, Env> {
+    pub fn new() -> Self {
+        Promises { ps: Vec::new() }
+    }
+
+    pub fn push(&mut self, p: Box<dyn Give<Env> + 'a>) {
+        self.ps.push(p);
+    }
+
+    pub fn give_all(&mut self, env: &mut Env) {
+        debug!("promises: {}", self.ps.len());
+        self.ps.retain(|p| !p.give(env));
+    }
+}