about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs7
-rw-r--r--tvix/castore/src/directoryservice/mod.rs2
-rw-r--r--tvix/castore/src/directoryservice/tests/mod.rs19
-rw-r--r--tvix/castore/src/directoryservice/utils.rs26
4 files changed, 41 insertions, 13 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index ff08bad4bd0f..6d81be0e70db 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -130,7 +130,7 @@ where
             // The Directory digests we received so far
             let mut received_directory_digests: HashSet<B3Digest> = HashSet::new();
             // The Directory digests we're still expecting to get sent.
-            let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]);
+            let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest.clone()]);
 
             loop {
                 match stream.message().await {
@@ -170,6 +170,11 @@ where
 
                         yield directory;
                     },
+                    Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => {
+                        // The root directory of the requested closure was not found, return an
+                        // empty stream
+                        return
+                    }
                     Ok(None) => {
                         // If we were still expecting something, that's an error.
                         if !expected_directory_digests.is_empty() {
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index 603e09d5ddaa..ffd9ea3636ec 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -68,6 +68,8 @@ pub trait DirectoryService: Send + Sync {
     /// Directories are sent in an order from the root to the leaves, so that
     /// the receiving side can validate each message to be a connected to the root
     /// that has initially been requested.
+    ///
+    /// In case the directory can not be found, this should return an empty stream.
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs
index cc3c5b788a2c..4d14fbea1a92 100644
--- a/tvix/castore/src/directoryservice/tests/mod.rs
+++ b/tvix/castore/src/directoryservice/tests/mod.rs
@@ -30,12 +30,25 @@ use self::utils::make_grpc_directory_service_client;
 #[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))]
 pub fn directory_services(#[case] directory_service: impl DirectoryService) {}
 
-/// Ensures asking for a directory that doesn't exist returns a Ok(None).
+/// Ensures asking for a directory that doesn't exist returns a Ok(None), and a get_recursive
+/// returns an empty stream.
 #[apply(directory_services)]
 #[tokio::test]
 async fn test_non_exist(directory_service: impl DirectoryService) {
-    let resp = directory_service.get(&DIRECTORY_A.digest()).await;
-    assert!(resp.unwrap().is_none())
+    // single get
+    assert_eq!(
+        Ok(None),
+        directory_service.get(&DIRECTORY_A.digest()).await
+    );
+
+    // recursive get
+    assert_eq!(
+        Vec::<Result<proto::Directory, crate::Error>>::new(),
+        directory_service
+            .get_recursive(&DIRECTORY_A.digest())
+            .collect::<Vec<Result<proto::Directory, crate::Error>>>()
+            .await
+    );
 }
 
 /// Putting a single directory into the store, and then getting it out both via
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
index a0ba395ecda8..726734f55eec 100644
--- a/tvix/castore/src/directoryservice/utils.rs
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -25,23 +25,31 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
     // We omit sending the same directories multiple times.
     let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new();
 
+    let root_directory_digest = root_directory_digest.clone();
+
     Box::pin(try_stream! {
         while let Some(current_directory_digest) = worklist_directory_digests.pop_front() {
-            let current_directory = directory_service.get(&current_directory_digest).await.map_err(|e| {
+            let current_directory = match directory_service.get(&current_directory_digest).await.map_err(|e| {
                 warn!("failed to look up directory");
                 Error::StorageError(format!(
                     "unable to look up directory {}: {}",
                     current_directory_digest, e
                 ))
-            })?.ok_or_else(|| {
-                // if it's not there, we have an inconsistent store!
-                warn!("directory {} does not exist", current_directory_digest);
-                Error::StorageError(format!(
-                    "directory {} does not exist",
-                    current_directory_digest
-                ))
+            })? {
+                // the root node of the requested closure was not found, return an empty list
+                None if current_directory_digest == root_directory_digest => break,
+                // if a child directory of the closure is not there, we have an inconsistent store!
+                None => {
+                    warn!("directory {} does not exist", current_directory_digest);
+                    Err(Error::StorageError(format!(
+                        "directory {} does not exist",
+                        current_directory_digest
+                    )))?;
+                    break;
+                }
+                Some(dir) => dir,
+            };
 
-            })?;
 
             // validate, we don't want to send invalid directories.
             current_directory.validate().map_err(|e| {