diff options
author | Yureka <tvl@yuka.dev> | 2024-07-23T13·32+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-07-23T14·04+0000 |
commit | 94a0e21e68e75c758d1df055890bde8b843fb5ef (patch) | |
tree | 73c2b574798ec6701c65851318cfed8ac32846a8 | |
parent | b9aa6456e22c562fbf849609112194d7ae7c1447 (diff) |
fix(tvix/directoryservice): clarify get_recursive not found r/8407
Change-Id: I47f21b4b7db9304eadb04094d41bf04d443fcc3b Reviewed-on: https://cl.tvl.fyi/c/depot/+/12025 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de> Autosubmit: yuka <yuka@yuka.dev>
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 7 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 2 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/mod.rs | 19 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/utils.rs | 26 |
4 files changed, 41 insertions, 13 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index ff08bad4bd0f..6d81be0e70db 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -130,7 +130,7 @@ where // The Directory digests we received so far let mut received_directory_digests: HashSet<B3Digest> = HashSet::new(); // The Directory digests we're still expecting to get sent. - let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]); + let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest.clone()]); loop { match stream.message().await { @@ -170,6 +170,11 @@ where yield directory; }, + Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => { + // The root directory of the requested closure was not found, return an + // empty stream + return + } Ok(None) => { // If we were still expecting something, that's an error. if !expected_directory_digests.is_empty() { diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 603e09d5ddaa..ffd9ea3636ec 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -68,6 +68,8 @@ pub trait DirectoryService: Send + Sync { /// Directories are sent in an order from the root to the leaves, so that /// the receiving side can validate each message to be a connected to the root /// that has initially been requested. + /// + /// In case the directory can not be found, this should return an empty stream. fn get_recursive( &self, root_directory_digest: &B3Digest, diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index cc3c5b788a2c..4d14fbea1a92 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -30,12 +30,25 @@ use self::utils::make_grpc_directory_service_client; #[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))] pub fn directory_services(#[case] directory_service: impl DirectoryService) {} -/// Ensures asking for a directory that doesn't exist returns a Ok(None). +/// Ensures asking for a directory that doesn't exist returns a Ok(None), and a get_recursive +/// returns an empty stream. #[apply(directory_services)] #[tokio::test] async fn test_non_exist(directory_service: impl DirectoryService) { - let resp = directory_service.get(&DIRECTORY_A.digest()).await; - assert!(resp.unwrap().is_none()) + // single get + assert_eq!( + Ok(None), + directory_service.get(&DIRECTORY_A.digest()).await + ); + + // recursive get + assert_eq!( + Vec::<Result<proto::Directory, crate::Error>>::new(), + directory_service + .get_recursive(&DIRECTORY_A.digest()) + .collect::<Vec<Result<proto::Directory, crate::Error>>>() + .await + ); } /// Putting a single directory into the store, and then getting it out both via diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index a0ba395ecda8..726734f55eec 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -25,23 +25,31 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( // We omit sending the same directories multiple times. let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new(); + let root_directory_digest = root_directory_digest.clone(); + Box::pin(try_stream! { while let Some(current_directory_digest) = worklist_directory_digests.pop_front() { - let current_directory = directory_service.get(¤t_directory_digest).await.map_err(|e| { + let current_directory = match directory_service.get(¤t_directory_digest).await.map_err(|e| { warn!("failed to look up directory"); Error::StorageError(format!( "unable to look up directory {}: {}", current_directory_digest, e )) - })?.ok_or_else(|| { - // if it's not there, we have an inconsistent store! - warn!("directory {} does not exist", current_directory_digest); - Error::StorageError(format!( - "directory {} does not exist", - current_directory_digest - )) + })? { + // the root node of the requested closure was not found, return an empty list + None if current_directory_digest == root_directory_digest => break, + // if a child directory of the closure is not there, we have an inconsistent store! + None => { + warn!("directory {} does not exist", current_directory_digest); + Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_digest + )))?; + break; + } + Some(dir) => dir, + }; - })?; // validate, we don't want to send invalid directories. current_directory.validate().map_err(|e| { |