about summary refs log blame commit diff
path: root/src/util/promise.rs
blob: 41f3d76e773705648ab25779200bacb79e35abdc (plain) (tree)






























































































































































                                                                           
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Waker};

pub struct Promise<Env, T> {
    inner: Arc<RwLock<Inner<T>>>,
    waiters: Arc<RwLock<Vec<Box<dyn Fn(&mut Env, &T)>>>>,
}

pub struct Complete<T> {
    inner: Arc<RwLock<Inner<T>>>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Cancelled;

struct Inner<T> {
    value: Option<Arc<T>>,
    waker: Option<Waker>,
}

pub fn promise<Env, T>() -> (Complete<T>, Promise<Env, T>) {
    let inner = Arc::new(RwLock::new(Inner {
        value: None,
        waker: None,
    }));
    let promise = Promise {
        inner: inner.clone(),
        waiters: Arc::new(RwLock::new(Vec::new())),
    };
    let complete = Complete { inner: inner };
    (complete, promise)
}

impl<T> Complete<T> {
    pub fn fulfill(&self, val: T) {
        let mut inner = self.inner.write().unwrap();
        inner.value = Some(Arc::new(val));
        if let Some(waker) = inner.waker.take() {
            waker.wake()
        }
    }
}

impl<T> Complete<Result<T, Cancelled>> {
    pub fn cancel(&mut self) {
        self.fulfill(Err(Cancelled))
    }
}

impl<E, T> Complete<Result<T, E>> {
    pub fn ok(&mut self, val: T) {
        self.fulfill(Ok(val))
    }

    pub fn err(&mut self, e: E) {
        self.fulfill(Err(e))
    }
}

impl<Env, T> Promise<Env, T> {
    pub fn on_fulfill<F: Fn(&mut Env, &T) + 'static>(&mut self, f: F) {
        let mut waiters = self.waiters.write().unwrap();
        waiters.push(Box::new(f));
    }
}

impl<Env, T> Promise<Env, Result<T, Cancelled>> {
    pub fn on_cancel<F: Fn(&mut Env) + 'static>(&mut self, f: F) {
        self.on_err(move |env, _| f(env))
    }
}

impl<Env, E, T> Promise<Env, Result<T, E>> {
    pub fn on_ok<F: Fn(&mut Env, &T) + 'static>(&mut self, f: F) {
        self.on_fulfill(move |env, r| {
            if let Ok(val) = r {
                f(env, val)
            }
        })
    }

    pub fn on_err<F: Fn(&mut Env, &E) + 'static>(&mut self, f: F) {
        self.on_fulfill(move |env, r| {
            if let Err(e) = r {
                f(env, e)
            }
        })
    }
}

pub trait Give<Env> {
    fn give(&self, env: &mut Env) -> bool;
}

impl<Env, T> Give<Env> for Promise<Env, T> {
    fn give(&self, env: &mut Env) -> bool {
        let inner = self.inner.read().unwrap();
        if let Some(value) = &inner.value {
            let mut waiters = self.waiters.write().unwrap();
            for waiter in waiters.iter() {
                waiter(env, value);
            }
            waiters.clear();
            true
        } else {
            false
        }
    }
}

impl<Env, T> Clone for Promise<Env, T> {
    fn clone(&self) -> Self {
        Promise {
            inner: self.inner.clone(),
            waiters: self.waiters.clone(),
        }
    }
}

impl<Env, P: Give<Env>> Give<Env> for &P {
    fn give(&self, env: &mut Env) -> bool {
        (*self).give(env)
    }
}

impl<Env, T> Future for Promise<Env, T> {
    type Output = Arc<T>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        let mut inner = self.inner.write().unwrap();
        match inner.value {
            Some(ref v) => Poll::Ready(v.clone()),
            None => {
                inner.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

pub struct Promises<'a, Env> {
    ps: Vec<Box<dyn Give<Env> + 'a>>,
}

impl<'a, Env> Promises<'a, Env> {
    pub fn new() -> Self {
        Promises { ps: Vec::new() }
    }

    pub fn push(&mut self, p: Box<dyn Give<Env> + 'a>) {
        self.ps.push(p);
    }

    pub fn give_all(&mut self, env: &mut Env) {
        debug!("promises: {}", self.ps.len());
        self.ps.retain(|p| !p.give(env));
    }
}