about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs163
-rw-r--r--tvix/castore/src/directoryservice/tests/mod.rs3
-rw-r--r--tvix/castore/src/directoryservice/traverse.rs8
3 files changed, 10 insertions, 164 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index fe410a38257d..84cf01e1679e 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -220,18 +220,6 @@ pub struct GRPCPutter {
     )>,
 }
 
-impl GRPCPutter {
-    // allows checking if the tx part of the channel is closed.
-    // only used in the test case.
-    #[cfg(test)]
-    fn is_closed(&self) -> bool {
-        match self.rq {
-            None => true,
-            Some((_, ref directory_sender)) => directory_sender.is_closed(),
-        }
-    }
-}
-
 #[async_trait]
 impl DirectoryPutter for GRPCPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
@@ -280,163 +268,18 @@ impl DirectoryPutter for GRPCPutter {
 
 #[cfg(test)]
 mod tests {
-    use core::time;
-    use futures::StreamExt;
-    use std::{any::Any, time::Duration};
+    use std::time::Duration;
     use tempfile::TempDir;
     use tokio::net::UnixListener;
     use tokio_retry::{strategy::ExponentialBackoff, Retry};
     use tokio_stream::wrappers::UnixListenerStream;
 
     use crate::{
-        directoryservice::{
-            grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService,
-            MemoryDirectoryService,
-        },
-        fixtures::{self, DIRECTORY_A, DIRECTORY_B},
+        directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService},
+        fixtures,
         proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper},
-        utils::gen_directorysvc_grpc_client,
     };
 
-    #[tokio::test]
-    async fn test() {
-        // create the GrpcDirectoryService
-        let directory_service =
-            super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await);
-
-        // try to get DIRECTORY_A should return Ok(None)
-        assert_eq!(
-            None,
-            directory_service
-                .get(&DIRECTORY_A.digest())
-                .await
-                .expect("must not fail")
-        );
-
-        // Now upload it
-        assert_eq!(
-            DIRECTORY_A.digest(),
-            directory_service
-                .put(DIRECTORY_A.clone())
-                .await
-                .expect("must succeed")
-        );
-
-        // And retrieve it, compare for equality.
-        assert_eq!(
-            DIRECTORY_A.clone(),
-            directory_service
-                .get(&DIRECTORY_A.digest())
-                .await
-                .expect("must succeed")
-                .expect("must be some")
-        );
-
-        // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
-        directory_service
-            .put(DIRECTORY_B.clone())
-            .await
-            .expect_err("must fail");
-
-        // Putting DIRECTORY_B in a put_multiple will succeed, but the close
-        // will always fail.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-            handle.close().await.expect_err("must fail");
-        }
-
-        // Uploading A and then B should succeed, and closing should return the digest of B.
-        let mut handle = directory_service.put_multiple_start();
-        handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
-        handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-        let digest = handle.close().await.expect("must succeed");
-        assert_eq!(DIRECTORY_B.digest(), digest);
-
-        // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
-        let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
-        assert_eq!(
-            DIRECTORY_B.clone(),
-            directories_it
-                .next()
-                .await
-                .expect("must be some")
-                .expect("must succeed")
-        );
-        assert_eq!(
-            DIRECTORY_A.clone(),
-            directories_it
-                .next()
-                .await
-                .expect("must be some")
-                .expect("must succeed")
-        );
-
-        // Uploading B and then A should fail, because B refers to A, which
-        // hasn't been uploaded yet.
-        // However, the client can burst, so we might not have received the
-        // error back from the server.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            // sending out B will always be fine
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-            // whether we will be able to put A as well depends on whether we
-            // already received the error about B.
-            if handle.put(DIRECTORY_A.clone()).await.is_ok() {
-                // If we didn't, and this was Ok(_), …
-                // a subsequent close MUST fail (because it waits for the
-                // server)
-                handle.close().await.expect_err("must fail");
-            }
-        }
-
-        // Now we do the same test as before, send B, then A, but wait
-        // a long long time so we already received the error from the server
-        // (causing the internal stream to be closed).
-        // Uploading anything else subsequently should then fail.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-            // get a GRPCPutter, so we can peek at [is_closed].
-            let handle_any = &mut handle as &mut dyn Any;
-
-            // `unchecked_downcast_mut` is unstable for now,
-            // https://github.com/rust-lang/rust/issues/90850
-            // We do the same thing here.
-            // The reason for why we cannot use the checked downcast lies
-            // in the fact that:
-            // - GRPCPutter has type ID A
-            // - Box<GRPCPutter> has type ID B
-            // - "Box<dyn GRPCPutter>" (invalid type) has type ID C
-            // B seems different from C in this context.
-            // We cannot unpack and perform upcast coercion of the traits as it's an unstable
-            // feature.
-            // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose
-            // of not making leak `is_closed` in the original trait.
-            let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) };
-            let mut is_closed = false;
-            for _try in 1..1000 {
-                if handle.is_closed() {
-                    is_closed = true;
-                    break;
-                }
-                tokio::time::sleep(time::Duration::from_millis(10)).await;
-            }
-
-            assert!(
-                is_closed,
-                "expected channel to eventually close, but never happened"
-            );
-
-            handle
-                .put(DIRECTORY_A.clone())
-                .await
-                .expect_err("must fail");
-        }
-    }
-
     /// This ensures connecting via gRPC works as expected.
     #[tokio::test]
     async fn test_valid_unix_path_ping_pong() {
diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs
index 5eb2d1919e80..cec49bb2c66a 100644
--- a/tvix/castore/src/directoryservice/tests/mod.rs
+++ b/tvix/castore/src/directoryservice/tests/mod.rs
@@ -180,7 +180,8 @@ async fn upload_reject_failing_validation(directory_service: impl DirectoryServi
     );
 
     // Try to upload via put_multiple. We're a bit more permissive here, the
-    // intermediate .put() might succeed, but then the close MUST fail.
+    // intermediate .put() might succeed, due to client-side bursting (in the
+    // case of gRPC), but then the close MUST fail.
     let mut handle = directory_service.put_multiple_start();
     if handle.put(broken_directory).await.is_ok() {
         assert!(
diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs
index 5c6975351b40..573581edbdd7 100644
--- a/tvix/castore/src/directoryservice/traverse.rs
+++ b/tvix/castore/src/directoryservice/traverse.rs
@@ -87,14 +87,16 @@ where
 mod tests {
     use std::path::PathBuf;
 
-    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP};
-    use crate::utils::gen_directory_service;
+    use crate::{
+        directoryservice,
+        fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
+    };
 
     use super::descend_to;
 
     #[tokio::test]
     async fn test_descend_to() {
-        let directory_service = gen_directory_service();
+        let directory_service = directoryservice::from_addr("memory://").await.unwrap();
 
         let mut handle = directory_service.put_multiple_start();
         handle