From 37a348b4fae16b2b1c5ec12deaa085a049833d7f Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Tue, 19 Sep 2023 11:46:41 -0500 Subject: refactor(tvix/store): Asyncify PathInfoService and DirectoryService We've decided to asyncify all of the services to reduce some of the pains going back and for between sync<->async. The end goal will be for all the tvix-store internals to be async and then expose a sync interface for things like tvix eval io. Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369 Autosubmit: Connor Brewster Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/store/src/directoryservice/memory.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) (limited to 'tvix/store/src/directoryservice/memory.rs') diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 634dbf9922..ac67c999d0 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -1,16 +1,20 @@ use crate::{proto, B3Digest, Error}; +use futures::Stream; use std::collections::HashMap; +use std::pin::Pin; use std::sync::{Arc, RwLock}; +use tonic::async_trait; use tracing::{instrument, warn}; -use super::utils::SimplePutter; -use super::{DirectoryPutter, DirectoryService, DirectoryTraverser}; +use super::utils::{traverse_directory, SimplePutter}; +use super::{DirectoryPutter, DirectoryService}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { db: Arc>>, } +#[async_trait] impl DirectoryService for MemoryDirectoryService { /// Constructs a [MemoryDirectoryService] from the passed [url::Url]: /// - scheme has to be `memory://` @@ -27,8 +31,9 @@ impl DirectoryService for MemoryDirectoryService { Ok(Self::default()) } + #[instrument(skip(self, digest), fields(directory.digest = %digest))] - fn get(&self, digest: &B3Digest) -> Result, Error> { + async fn get(&self, digest: &B3Digest) -> Result, Error> { let db = self.db.read()?; match db.get(digest) { @@ -62,7 +67,7 @@ impl DirectoryService for MemoryDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - fn put(&self, directory: proto::Directory) -> Result { + async fn put(&self, directory: proto::Directory) -> Result { let digest = directory.digest(); // validate the directory itself. @@ -84,11 +89,8 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Box> + Send> { - Box::new(DirectoryTraverser::with( - self.clone(), - root_directory_digest, - )) + ) -> Pin> + Send>> { + traverse_directory(self.clone(), root_directory_digest) } #[instrument(skip_all)] -- cgit 1.4.1