diff options
Diffstat (limited to 'tvix/store/src/directoryservice/memory.rs')
-rw-r--r-- | tvix/store/src/directoryservice/memory.rs | 20 |
1 files changed, 11 insertions, 9 deletions
diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 634dbf9922d0..ac67c999d01b 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -1,16 +1,20 @@ use crate::{proto, B3Digest, Error}; +use futures::Stream; use std::collections::HashMap; +use std::pin::Pin; use std::sync::{Arc, RwLock}; +use tonic::async_trait; use tracing::{instrument, warn}; -use super::utils::SimplePutter; -use super::{DirectoryPutter, DirectoryService, DirectoryTraverser}; +use super::utils::{traverse_directory, SimplePutter}; +use super::{DirectoryPutter, DirectoryService}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, } +#[async_trait] impl DirectoryService for MemoryDirectoryService { /// Constructs a [MemoryDirectoryService] from the passed [url::Url]: /// - scheme has to be `memory://` @@ -27,8 +31,9 @@ impl DirectoryService for MemoryDirectoryService { Ok(Self::default()) } + #[instrument(skip(self, digest), fields(directory.digest = %digest))] - fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { let db = self.db.read()?; match db.get(digest) { @@ -62,7 +67,7 @@ impl DirectoryService for MemoryDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); // validate the directory itself. @@ -84,11 +89,8 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> { - Box::new(DirectoryTraverser::with( - self.clone(), - root_directory_digest, - )) + ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + traverse_directory(self.clone(), root_directory_digest) } #[instrument(skip_all)] |