diff options
Diffstat (limited to 'tvix/store/src/directoryservice')
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 36 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/memory.rs | 20 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 16 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/sled.rs | 20 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/traverse.rs | 14 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/utils.rs | 19 |
6 files changed, 73 insertions, 52 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index 1b33572cf7de..3b1a7906f7d0 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -46,8 +46,6 @@ impl GRPCDirectoryService { } impl DirectoryService for GRPCDirectoryService { - type DirectoriesIterator = StreamIterator; - fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); @@ -113,7 +111,10 @@ impl DirectoryService for GRPCDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest_as_vec = root_directory_digest.to_vec(); @@ -132,17 +133,15 @@ impl DirectoryService for GRPCDirectoryService { let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); - StreamIterator::new( + Box::new(StreamIterator::new( self.tokio_handle.clone(), root_directory_digest.clone(), stream, - ) + )) } - type DirectoryPutter = GRPCPutter; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { @@ -160,7 +159,7 @@ impl DirectoryService for GRPCDirectoryService { Ok(s) }); - GRPCPutter::new(self.tokio_handle.clone(), tx, task) + Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task)) } } @@ -276,15 +275,6 @@ impl GRPCPutter { rq: Some((task, directory_sender)), } } - - #[allow(dead_code)] - // allows checking if the tx part of the channel is closed. - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } } impl DirectoryPutter for GRPCPutter { @@ -329,6 +319,14 @@ impl DirectoryPutter for GRPCPutter { } } } + + // allows checking if the tx part of the channel is closed. + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } } #[cfg(test)] @@ -342,7 +340,7 @@ mod tests { use tonic::transport::{Endpoint, Server, Uri}; use crate::{ - directoryservice::{DirectoryPutter, DirectoryService}, + directoryservice::DirectoryService, proto, proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, tests::{ diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 1fd619f7c8cb..1f203834a7cf 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, RwLock}; use tracing::{instrument, warn}; use super::utils::SimplePutter; -use super::{DirectoryService, DirectoryTraverser}; +use super::{DirectoryPutter, DirectoryService, DirectoryTraverser}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -12,8 +12,6 @@ pub struct MemoryDirectoryService { } impl DirectoryService for MemoryDirectoryService { - type DirectoriesIterator = DirectoryTraverser<Self>; - #[instrument(skip(self, digest), fields(directory.digest = %digest))] fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { let db = self.db.read()?; @@ -68,17 +66,21 @@ impl DirectoryService for MemoryDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { - DirectoryTraverser::with(self.clone(), root_directory_digest) + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> { + Box::new(DirectoryTraverser::with( + self.clone(), + root_directory_digest, + )) } - type DirectoryPutter = SimplePutter<Self>; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { - SimplePutter::new(self.clone()) + Box::new(SimplePutter::new(self.clone())) } } diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index f387d28948f0..6589a5b62599 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -14,10 +14,7 @@ pub use self::utils::DirectoryTraverser; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their /// digest. -pub trait DirectoryService { - type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send; - type DirectoryPutter: DirectoryPutter; - +pub trait DirectoryService: Send + Sync { /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; @@ -29,11 +26,14 @@ pub trait DirectoryService { /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator; + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send>; /// Allows persisting a closure of [proto::Directory], which is a graph of /// connected Directory messages. - fn put_multiple_start(&self) -> Self::DirectoryPutter; + fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; } /// Provides a handle to put a closure of connected [proto::Directory] elements. @@ -51,4 +51,8 @@ pub trait DirectoryPutter { /// Close the stream, and wait for any errors. fn close(&mut self) -> Result<B3Digest, Error>; + + /// Return whether the stream is closed or not. + /// Used from some [DirectoryService] implementations only. + fn is_closed(&self) -> bool; } diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs index e189e8acf507..8ed2c59c2d0f 100644 --- a/tvix/store/src/directoryservice/sled.rs +++ b/tvix/store/src/directoryservice/sled.rs @@ -1,3 +1,4 @@ +use crate::directoryservice::DirectoryPutter; use crate::proto::Directory; use crate::{proto, B3Digest, Error}; use prost::Message; @@ -29,8 +30,6 @@ impl SledDirectoryService { } impl DirectoryService for SledDirectoryService { - type DirectoriesIterator = DirectoryTraverser<Self>; - #[instrument(skip(self, digest), fields(directory.digest = %digest))] fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { match self.db.get(digest.to_vec()) { @@ -91,17 +90,22 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { - DirectoryTraverser::with(self.clone(), root_directory_digest) + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<(dyn Iterator<Item = Result<proto::Directory, Error>> + std::marker::Send + 'static)> + { + Box::new(DirectoryTraverser::with( + self.clone(), + root_directory_digest, + )) } - type DirectoryPutter = SimplePutter<Self>; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { - SimplePutter::new(self.clone()) + Box::new(SimplePutter::new(self.clone())) } } diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs index 8691baa8b73f..c1c0c6f8df64 100644 --- a/tvix/store/src/directoryservice/traverse.rs +++ b/tvix/store/src/directoryservice/traverse.rs @@ -10,8 +10,8 @@ use tracing::{instrument, warn}; /// TODO: the name of this function (and mod) is a bit bad, because it doesn't /// clearly distinguish it from the BFS traversers. #[instrument(skip(directory_service))] -pub fn traverse_to<DS: DirectoryService>( - directory_service: &DS, +pub fn traverse_to( + directory_service: &Box<dyn DirectoryService>, node: crate::proto::node::Node, path: &std::path::Path, ) -> Result<Option<crate::proto::node::Node>, Error> { @@ -82,13 +82,9 @@ pub fn traverse_to<DS: DirectoryService>( mod tests { use std::path::PathBuf; - use crate::{ - directoryservice::DirectoryPutter, - directoryservice::DirectoryService, - tests::{ - fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, - utils::gen_directory_service, - }, + use crate::tests::{ + fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, + utils::gen_directory_service, }; use super::traverse_to; diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs index 3661808734f3..d152fb78a90c 100644 --- a/tvix/store/src/directoryservice/utils.rs +++ b/tvix/store/src/directoryservice/utils.rs @@ -107,12 +107,14 @@ impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { pub struct SimplePutter<DS: DirectoryService> { directory_service: DS, last_directory_digest: Option<B3Digest>, + closed: bool, } impl<DS: DirectoryService> SimplePutter<DS> { pub fn new(directory_service: DS) -> Self { Self { directory_service, + closed: false, last_directory_digest: None, } } @@ -120,6 +122,10 @@ impl<DS: DirectoryService> SimplePutter<DS> { impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + let digest = self.directory_service.put(directory)?; // track the last directory digest @@ -130,11 +136,22 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { /// We need to be mutable here, as that's the signature of the trait. fn close(&mut self) -> Result<B3Digest, Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + match &self.last_directory_digest { - Some(last_digest) => Ok(last_digest.clone()), + Some(last_digest) => { + self.closed = true; + Ok(last_digest.clone()) + } None => Err(Error::InvalidRequest( "no directories sent, can't show root digest".to_string(), )), } } + + fn is_closed(&self) -> bool { + self.closed + } } |