about summary refs log tree commit diff
path: root/tvix/store/src/fs/mod.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-10-12T18·20+0200
committerflokli <flokli@flokli.de>2023-10-14T12·34+0000
commit173641ed37318fe61886ccb0402a9929b0f5783d (patch)
treef52cf6f98423b07c7813aca453cd8a93fe6d7a49 /tvix/store/src/fs/mod.rs
parent9757bf637791b8c2169225990682e5d42141e97d (diff)
refactor(tvix/store/fs): make fetch_directory_inode_data async r/6803
To make this easier, move it outside of TvixStoreFs, and accept the
DirectoryService as a function argument, so we don't need to worry about
the lifetime of self.

This also aligns with how we spawn async tasks inside the rest of
TvixStoreFs.

Change-Id: I3b95072209d32039f05aed122240f2d6db7ad172
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9713
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to '')
-rw-r--r--tvix/store/src/fs/mod.rs62
1 files changed, 32 insertions, 30 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs
index 1333983460..d7bb1a50a2 100644
--- a/tvix/store/src/fs/mod.rs
+++ b/tvix/store/src/fs/mod.rs
@@ -30,7 +30,7 @@ use tokio::{
     io::{AsyncBufReadExt, AsyncSeekExt},
     sync::mpsc,
 };
-use tracing::{debug, info_span, warn};
+use tracing::{debug, info_span, instrument, warn};
 use tvix_castore::{
     blobservice::{BlobReader, BlobService},
     directoryservice::DirectoryService,
@@ -211,33 +211,6 @@ impl TvixStoreFs {
             }
         }
     }
-
-    /// This will lookup a directory by digest, and will turn it into a
-    /// [InodeData::Directory(DirectoryInodeData::Populated(..))].
-    /// This is both used to initially insert the root node of a store path,
-    /// as well as when looking up an intermediate DirectoryNode.
-    fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> {
-        let directory_service = self.directory_service.clone();
-        let directory_digest_clone = directory_digest.clone();
-        let task = self
-            .tokio_handle
-            .spawn(async move { directory_service.get(&directory_digest_clone).await });
-        match self.tokio_handle.block_on(task).unwrap() {
-            Err(e) => {
-                warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory");
-                Err(e)
-            }
-            // If the Directory can't be found, this is a hole, bail out.
-            Ok(None) => {
-                tracing::error!(directory.digest=%directory_digest, "directory not found in directory service");
-                Err(Error::StorageError(format!(
-                    "directory {} not found",
-                    directory_digest
-                )))
-            }
-            Ok(Some(directory)) => Ok(directory.into()),
-        }
-    }
 }
 
 impl FileSystem for TvixStoreFs {
@@ -319,7 +292,12 @@ impl FileSystem for TvixStoreFs {
                 return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
             }
             InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
-                match self.fetch_directory_inode_data(parent_digest) {
+                let directory_service = self.directory_service.clone();
+                let parent_digest = parent_digest.to_owned();
+                let task = self.tokio_handle.spawn(async move {
+                    fetch_directory_inode_data(directory_service, &parent_digest).await
+                });
+                match self.tokio_handle.block_on(task).unwrap() {
                     Ok(new_data) => {
                         // update data in [self.inode_tracker] with populated variant.
                         // FUTUREWORK: change put to return the data after
@@ -464,7 +442,12 @@ impl FileSystem for TvixStoreFs {
                 return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
             }
             InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => {
-                match self.fetch_directory_inode_data(directory_digest) {
+                let directory_digest = directory_digest.to_owned();
+                let directory_service = self.directory_service.clone();
+                let task = self.tokio_handle.spawn(async move {
+                    fetch_directory_inode_data(directory_service, &directory_digest).await
+                });
+                match self.tokio_handle.block_on(task).unwrap() {
                     Ok(new_data) => {
                         // update data in [self.inode_tracker] with populated variant.
                         // FUTUREWORK: change put to return the data after
@@ -688,3 +671,22 @@ impl FileSystem for TvixStoreFs {
         }
     }
 }
+
+/// This will lookup a directory by digest, and will turn it into a
+/// [InodeData::Directory(DirectoryInodeData::Populated(..))].
+/// This is both used to initially insert the root node of a store path,
+/// as well as when looking up an intermediate DirectoryNode.
+#[instrument(skip_all, fields(directory.digest = %directory_digest), err)]
+async fn fetch_directory_inode_data<DS: DirectoryService + ?Sized>(
+    directory_service: Arc<DS>,
+    directory_digest: &B3Digest,
+) -> Result<InodeData, Error> {
+    match directory_service.get(directory_digest).await? {
+        // If the Directory can't be found, this is a hole, bail out.
+        None => Err(Error::StorageError(format!(
+            "directory {} not found",
+            directory_digest
+        ))),
+        Some(directory) => Ok(directory.into()),
+    }
+}