diff options
author | Florian Klink <flokli@flokli.de> | 2023-10-12T18·20+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-10-14T12·34+0000 |
commit | 173641ed37318fe61886ccb0402a9929b0f5783d (patch) | |
tree | f52cf6f98423b07c7813aca453cd8a93fe6d7a49 /tvix/store/src/fs | |
parent | 9757bf637791b8c2169225990682e5d42141e97d (diff) |
refactor(tvix/store/fs): make fetch_directory_inode_data async r/6803
To make this easier, move it outside of TvixStoreFs, and accept the DirectoryService as a function argument, so we don't need to worry about the lifetime of self. This also aligns with how we spawn async tasks inside the rest of TvixStoreFs. Change-Id: I3b95072209d32039f05aed122240f2d6db7ad172 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9713 Reviewed-by: Connor Brewster <cbrewster@hey.com> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/fs')
-rw-r--r-- | tvix/store/src/fs/mod.rs | 62 |
1 files changed, 32 insertions, 30 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs index 1333983460ea..d7bb1a50a2cf 100644 --- a/tvix/store/src/fs/mod.rs +++ b/tvix/store/src/fs/mod.rs @@ -30,7 +30,7 @@ use tokio::{ io::{AsyncBufReadExt, AsyncSeekExt}, sync::mpsc, }; -use tracing::{debug, info_span, warn}; +use tracing::{debug, info_span, instrument, warn}; use tvix_castore::{ blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, @@ -211,33 +211,6 @@ impl TvixStoreFs { } } } - - /// This will lookup a directory by digest, and will turn it into a - /// [InodeData::Directory(DirectoryInodeData::Populated(..))]. - /// This is both used to initially insert the root node of a store path, - /// as well as when looking up an intermediate DirectoryNode. - fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> { - let directory_service = self.directory_service.clone(); - let directory_digest_clone = directory_digest.clone(); - let task = self - .tokio_handle - .spawn(async move { directory_service.get(&directory_digest_clone).await }); - match self.tokio_handle.block_on(task).unwrap() { - Err(e) => { - warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory"); - Err(e) - } - // If the Directory can't be found, this is a hole, bail out. - Ok(None) => { - tracing::error!(directory.digest=%directory_digest, "directory not found in directory service"); - Err(Error::StorageError(format!( - "directory {} not found", - directory_digest - ))) - } - Ok(Some(directory)) => Ok(directory.into()), - } - } } impl FileSystem for TvixStoreFs { @@ -319,7 +292,12 @@ impl FileSystem for TvixStoreFs { return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); } InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { - match self.fetch_directory_inode_data(parent_digest) { + let directory_service = self.directory_service.clone(); + let parent_digest = parent_digest.to_owned(); + let task = self.tokio_handle.spawn(async move { + fetch_directory_inode_data(directory_service, &parent_digest).await + }); + match self.tokio_handle.block_on(task).unwrap() { Ok(new_data) => { // update data in [self.inode_tracker] with populated variant. // FUTUREWORK: change put to return the data after @@ -464,7 +442,12 @@ impl FileSystem for TvixStoreFs { return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); } InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => { - match self.fetch_directory_inode_data(directory_digest) { + let directory_digest = directory_digest.to_owned(); + let directory_service = self.directory_service.clone(); + let task = self.tokio_handle.spawn(async move { + fetch_directory_inode_data(directory_service, &directory_digest).await + }); + match self.tokio_handle.block_on(task).unwrap() { Ok(new_data) => { // update data in [self.inode_tracker] with populated variant. // FUTUREWORK: change put to return the data after @@ -688,3 +671,22 @@ impl FileSystem for TvixStoreFs { } } } + +/// This will lookup a directory by digest, and will turn it into a +/// [InodeData::Directory(DirectoryInodeData::Populated(..))]. +/// This is both used to initially insert the root node of a store path, +/// as well as when looking up an intermediate DirectoryNode. +#[instrument(skip_all, fields(directory.digest = %directory_digest), err)] +async fn fetch_directory_inode_data<DS: DirectoryService + ?Sized>( + directory_service: Arc<DS>, + directory_digest: &B3Digest, +) -> Result<InodeData, Error> { + match directory_service.get(directory_digest).await? { + // If the Directory can't be found, this is a hole, bail out. + None => Err(Error::StorageError(format!( + "directory {} not found", + directory_digest + ))), + Some(directory) => Ok(directory.into()), + } +} |