diff options
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 163 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/mod.rs | 3 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/traverse.rs | 8 |
3 files changed, 10 insertions, 164 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index fe410a38257d..84cf01e1679e 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -220,18 +220,6 @@ pub struct GRPCPutter { )>, } -impl GRPCPutter { - // allows checking if the tx part of the channel is closed. - // only used in the test case. - #[cfg(test)] - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } -} - #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] @@ -280,163 +268,18 @@ impl DirectoryPutter for GRPCPutter { #[cfg(test)] mod tests { - use core::time; - use futures::StreamExt; - use std::{any::Any, time::Duration}; + use std::time::Duration; use tempfile::TempDir; use tokio::net::UnixListener; use tokio_retry::{strategy::ExponentialBackoff, Retry}; use tokio_stream::wrappers::UnixListenerStream; use crate::{ - directoryservice::{ - grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService, - MemoryDirectoryService, - }, - fixtures::{self, DIRECTORY_A, DIRECTORY_B}, + directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService}, + fixtures, proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper}, - utils::gen_directorysvc_grpc_client, }; - #[tokio::test] - async fn test() { - // create the GrpcDirectoryService - let directory_service = - super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await); - - // try to get DIRECTORY_A should return Ok(None) - assert_eq!( - None, - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must not fail") - ); - - // Now upload it - assert_eq!( - DIRECTORY_A.digest(), - directory_service - .put(DIRECTORY_A.clone()) - .await - .expect("must succeed") - ); - - // And retrieve it, compare for equality. - assert_eq!( - DIRECTORY_A.clone(), - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must succeed") - .expect("must be some") - ); - - // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. - directory_service - .put(DIRECTORY_B.clone()) - .await - .expect_err("must fail"); - - // Putting DIRECTORY_B in a put_multiple will succeed, but the close - // will always fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - handle.close().await.expect_err("must fail"); - } - - // Uploading A and then B should succeed, and closing should return the digest of B. - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - let digest = handle.close().await.expect("must succeed"); - assert_eq!(DIRECTORY_B.digest(), digest); - - // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. - let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); - assert_eq!( - DIRECTORY_B.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - assert_eq!( - DIRECTORY_A.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - - // Uploading B and then A should fail, because B refers to A, which - // hasn't been uploaded yet. - // However, the client can burst, so we might not have received the - // error back from the server. - { - let mut handle = directory_service.put_multiple_start(); - // sending out B will always be fine - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - // whether we will be able to put A as well depends on whether we - // already received the error about B. - if handle.put(DIRECTORY_A.clone()).await.is_ok() { - // If we didn't, and this was Ok(_), … - // a subsequent close MUST fail (because it waits for the - // server) - handle.close().await.expect_err("must fail"); - } - } - - // Now we do the same test as before, send B, then A, but wait - // a long long time so we already received the error from the server - // (causing the internal stream to be closed). - // Uploading anything else subsequently should then fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - // get a GRPCPutter, so we can peek at [is_closed]. - let handle_any = &mut handle as &mut dyn Any; - - // `unchecked_downcast_mut` is unstable for now, - // https://github.com/rust-lang/rust/issues/90850 - // We do the same thing here. - // The reason for why we cannot use the checked downcast lies - // in the fact that: - // - GRPCPutter has type ID A - // - Box<GRPCPutter> has type ID B - // - "Box<dyn GRPCPutter>" (invalid type) has type ID C - // B seems different from C in this context. - // We cannot unpack and perform upcast coercion of the traits as it's an unstable - // feature. - // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose - // of not making leak `is_closed` in the original trait. - let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) }; - let mut is_closed = false; - for _try in 1..1000 { - if handle.is_closed() { - is_closed = true; - break; - } - tokio::time::sleep(time::Duration::from_millis(10)).await; - } - - assert!( - is_closed, - "expected channel to eventually close, but never happened" - ); - - handle - .put(DIRECTORY_A.clone()) - .await - .expect_err("must fail"); - } - } - /// This ensures connecting via gRPC works as expected. #[tokio::test] async fn test_valid_unix_path_ping_pong() { diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index 5eb2d1919e80..cec49bb2c66a 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -180,7 +180,8 @@ async fn upload_reject_failing_validation(directory_service: impl DirectoryServi ); // Try to upload via put_multiple. We're a bit more permissive here, the - // intermediate .put() might succeed, but then the close MUST fail. + // intermediate .put() might succeed, due to client-side bursting (in the + // case of gRPC), but then the close MUST fail. let mut handle = directory_service.put_multiple_start(); if handle.put(broken_directory).await.is_ok() { assert!( diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 5c6975351b40..573581edbdd7 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -87,14 +87,16 @@ where mod tests { use std::path::PathBuf; - use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}; - use crate::utils::gen_directory_service; + use crate::{ + directoryservice, + fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, + }; use super::descend_to; #[tokio::test] async fn test_descend_to() { - let directory_service = gen_directory_service(); + let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let mut handle = directory_service.put_multiple_start(); handle |