diff options
Diffstat (limited to 'tvix/store/src/fs')
-rw-r--r-- | tvix/store/src/fs/mod.rs | 62 |
1 files changed, 32 insertions, 30 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs index 1333983460ea..d7bb1a50a2cf 100644 --- a/tvix/store/src/fs/mod.rs +++ b/tvix/store/src/fs/mod.rs @@ -30,7 +30,7 @@ use tokio::{ io::{AsyncBufReadExt, AsyncSeekExt}, sync::mpsc, }; -use tracing::{debug, info_span, warn}; +use tracing::{debug, info_span, instrument, warn}; use tvix_castore::{ blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, @@ -211,33 +211,6 @@ impl TvixStoreFs { } } } - - /// This will lookup a directory by digest, and will turn it into a - /// [InodeData::Directory(DirectoryInodeData::Populated(..))]. - /// This is both used to initially insert the root node of a store path, - /// as well as when looking up an intermediate DirectoryNode. - fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> { - let directory_service = self.directory_service.clone(); - let directory_digest_clone = directory_digest.clone(); - let task = self - .tokio_handle - .spawn(async move { directory_service.get(&directory_digest_clone).await }); - match self.tokio_handle.block_on(task).unwrap() { - Err(e) => { - warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory"); - Err(e) - } - // If the Directory can't be found, this is a hole, bail out. - Ok(None) => { - tracing::error!(directory.digest=%directory_digest, "directory not found in directory service"); - Err(Error::StorageError(format!( - "directory {} not found", - directory_digest - ))) - } - Ok(Some(directory)) => Ok(directory.into()), - } - } } impl FileSystem for TvixStoreFs { @@ -319,7 +292,12 @@ impl FileSystem for TvixStoreFs { return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); } InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { - match self.fetch_directory_inode_data(parent_digest) { + let directory_service = self.directory_service.clone(); + let parent_digest = parent_digest.to_owned(); + let task = self.tokio_handle.spawn(async move { + fetch_directory_inode_data(directory_service, &parent_digest).await + }); + match self.tokio_handle.block_on(task).unwrap() { Ok(new_data) => { // update data in [self.inode_tracker] with populated variant. // FUTUREWORK: change put to return the data after @@ -464,7 +442,12 @@ impl FileSystem for TvixStoreFs { return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); } InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => { - match self.fetch_directory_inode_data(directory_digest) { + let directory_digest = directory_digest.to_owned(); + let directory_service = self.directory_service.clone(); + let task = self.tokio_handle.spawn(async move { + fetch_directory_inode_data(directory_service, &directory_digest).await + }); + match self.tokio_handle.block_on(task).unwrap() { Ok(new_data) => { // update data in [self.inode_tracker] with populated variant. // FUTUREWORK: change put to return the data after @@ -688,3 +671,22 @@ impl FileSystem for TvixStoreFs { } } } + +/// This will lookup a directory by digest, and will turn it into a +/// [InodeData::Directory(DirectoryInodeData::Populated(..))]. +/// This is both used to initially insert the root node of a store path, +/// as well as when looking up an intermediate DirectoryNode. +#[instrument(skip_all, fields(directory.digest = %directory_digest), err)] +async fn fetch_directory_inode_data<DS: DirectoryService + ?Sized>( + directory_service: Arc<DS>, + directory_digest: &B3Digest, +) -> Result<InodeData, Error> { + match directory_service.get(directory_digest).await? { + // If the Directory can't be found, this is a hole, bail out. + None => Err(Error::StorageError(format!( + "directory {} not found", + directory_digest + ))), + Some(directory) => Ok(directory.into()), + } +} |