about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice/mod.rs
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-09-19T16·46-0500
committerclbot <clbot@tvl.fyi>2023-09-21T17·58+0000
commit37a348b4fae16b2b1c5ec12deaa085a049833d7f (patch)
tree7a1b1a7160036777b010cd81628960c1ca07486e /tvix/store/src/directoryservice/mod.rs
parent7e737fde34260daa477794d63b0b3344b4a1d81b (diff)
refactor(tvix/store): Asyncify PathInfoService and DirectoryService r/6623
We've decided to asyncify all of the services to reduce some of the
pains going back and for between sync<->async. The end goal will be for
all the tvix-store internals to be async and then expose a sync
interface for things like tvix eval io.

Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369
Autosubmit: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/directoryservice/mod.rs')
-rw-r--r--tvix/store/src/directoryservice/mod.rs24
1 files changed, 17 insertions, 7 deletions
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
index fea4191400f3..09210dfed8e8 100644
--- a/tvix/store/src/directoryservice/mod.rs
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -1,4 +1,7 @@
 use crate::{proto, B3Digest, Error};
+use futures::Stream;
+use std::pin::Pin;
+use tonic::async_trait;
 
 mod from_addr;
 mod grpc;
@@ -12,32 +15,38 @@ pub use self::grpc::GRPCDirectoryService;
 pub use self::memory::MemoryDirectoryService;
 pub use self::sled::SledDirectoryService;
 pub use self::traverse::traverse_to;
-pub use self::utils::DirectoryTraverser;
 
 /// The base trait all Directory services need to implement.
 /// This is a simple get and put of [crate::proto::Directory], returning their
 /// digest.
+#[async_trait]
 pub trait DirectoryService: Send + Sync {
     /// Create a new instance by passing in a connection URL.
+    /// TODO: check if we want to make this async, instead of lazily connecting
     fn from_url(url: &url::Url) -> Result<Self, Error>
     where
         Self: Sized;
 
     /// Get looks up a single Directory message by its digest.
     /// In case the directory is not found, Ok(None) is returned.
-    fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
     /// Get uploads a single Directory message, and returns the calculated
     /// digest, or an error.
-    fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
 
     /// Looks up a closure of [proto::Directory].
-    /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`,
+    /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`,
     /// and we'd be able to add a default implementation for it here, but
     /// we can't have that yet.
+    ///
+    /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
+    /// and the box allows different underlying stream implementations to be returned since
+    /// Rust doesn't support this as a generic in traits yet. This is the same thing that
+    /// [async_trait] generates, but for streams instead of futures.
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send>;
+    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>;
 
     /// Allows persisting a closure of [proto::Directory], which is a graph of
     /// connected Directory messages.
@@ -49,16 +58,17 @@ pub trait DirectoryService: Send + Sync {
 /// The consumer can periodically call [DirectoryPutter::put], starting from the
 /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
 /// retrieve the root digest (or an error).
+#[async_trait]
 pub trait DirectoryPutter: Send {
     /// Put a individual [proto::Directory] into the store.
     /// Error semantics and behaviour is up to the specific implementation of
     /// this trait.
     /// Due to bursting, the returned error might refer to an object previously
     /// sent via `put`.
-    fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
 
     /// Close the stream, and wait for any errors.
-    fn close(&mut self) -> Result<B3Digest, Error>;
+    async fn close(&mut self) -> Result<B3Digest, Error>;
 
     /// Return whether the stream is closed or not.
     /// Used from some [DirectoryService] implementations only.