about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice/grpc.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-03-20T20·36+0200
committerflokli <flokli@flokli.de>2024-03-28T07·58+0000
commit74023a07a4b3d8e99645217b04683c0e54e23be8 (patch)
treee5c7f42540efe6dc94666e2694031b14ccd6715b /tvix/castore/src/directoryservice/grpc.rs
parent05d3f21eaf0834d44b6f32a49ae3276ebcb6571c (diff)
refactor(tvix/castore/*): drop utils.rs and grpc directorysvc tests r/7795
This drops pretty much all of castore/utils.rs.

There were only two things left in there, both a bit messy and only used
for tests:

Some `gen_*_service()` helper functions. These can be expressed by
`from_addr("memory://")`.

The other thing was some plumbing code to test the gRPC layer, by
exposing a in-memory implementation via gRPC, and then connecting to
that channel via a gRPC client again.

Previous CLs moved the connection setup code to
{directory,blob}service::tests::utils, close to where we exercise them,
the new rstest-based tests.

The tests interacting directly on the gRPC types are removed, all
scenarios that were in there show now be covered through the rstest ones
on the trait level.

Change-Id: I450ccccf983b4c62145a25d81c36a40846664814
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11223
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/directoryservice/grpc.rs')
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs163
1 files changed, 3 insertions, 160 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index fe410a38257d..84cf01e1679e 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -220,18 +220,6 @@ pub struct GRPCPutter {
     )>,
 }
 
-impl GRPCPutter {
-    // allows checking if the tx part of the channel is closed.
-    // only used in the test case.
-    #[cfg(test)]
-    fn is_closed(&self) -> bool {
-        match self.rq {
-            None => true,
-            Some((_, ref directory_sender)) => directory_sender.is_closed(),
-        }
-    }
-}
-
 #[async_trait]
 impl DirectoryPutter for GRPCPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
@@ -280,163 +268,18 @@ impl DirectoryPutter for GRPCPutter {
 
 #[cfg(test)]
 mod tests {
-    use core::time;
-    use futures::StreamExt;
-    use std::{any::Any, time::Duration};
+    use std::time::Duration;
     use tempfile::TempDir;
     use tokio::net::UnixListener;
     use tokio_retry::{strategy::ExponentialBackoff, Retry};
     use tokio_stream::wrappers::UnixListenerStream;
 
     use crate::{
-        directoryservice::{
-            grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService,
-            MemoryDirectoryService,
-        },
-        fixtures::{self, DIRECTORY_A, DIRECTORY_B},
+        directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService},
+        fixtures,
         proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper},
-        utils::gen_directorysvc_grpc_client,
     };
 
-    #[tokio::test]
-    async fn test() {
-        // create the GrpcDirectoryService
-        let directory_service =
-            super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await);
-
-        // try to get DIRECTORY_A should return Ok(None)
-        assert_eq!(
-            None,
-            directory_service
-                .get(&DIRECTORY_A.digest())
-                .await
-                .expect("must not fail")
-        );
-
-        // Now upload it
-        assert_eq!(
-            DIRECTORY_A.digest(),
-            directory_service
-                .put(DIRECTORY_A.clone())
-                .await
-                .expect("must succeed")
-        );
-
-        // And retrieve it, compare for equality.
-        assert_eq!(
-            DIRECTORY_A.clone(),
-            directory_service
-                .get(&DIRECTORY_A.digest())
-                .await
-                .expect("must succeed")
-                .expect("must be some")
-        );
-
-        // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
-        directory_service
-            .put(DIRECTORY_B.clone())
-            .await
-            .expect_err("must fail");
-
-        // Putting DIRECTORY_B in a put_multiple will succeed, but the close
-        // will always fail.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-            handle.close().await.expect_err("must fail");
-        }
-
-        // Uploading A and then B should succeed, and closing should return the digest of B.
-        let mut handle = directory_service.put_multiple_start();
-        handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
-        handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-        let digest = handle.close().await.expect("must succeed");
-        assert_eq!(DIRECTORY_B.digest(), digest);
-
-        // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
-        let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
-        assert_eq!(
-            DIRECTORY_B.clone(),
-            directories_it
-                .next()
-                .await
-                .expect("must be some")
-                .expect("must succeed")
-        );
-        assert_eq!(
-            DIRECTORY_A.clone(),
-            directories_it
-                .next()
-                .await
-                .expect("must be some")
-                .expect("must succeed")
-        );
-
-        // Uploading B and then A should fail, because B refers to A, which
-        // hasn't been uploaded yet.
-        // However, the client can burst, so we might not have received the
-        // error back from the server.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            // sending out B will always be fine
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-            // whether we will be able to put A as well depends on whether we
-            // already received the error about B.
-            if handle.put(DIRECTORY_A.clone()).await.is_ok() {
-                // If we didn't, and this was Ok(_), …
-                // a subsequent close MUST fail (because it waits for the
-                // server)
-                handle.close().await.expect_err("must fail");
-            }
-        }
-
-        // Now we do the same test as before, send B, then A, but wait
-        // a long long time so we already received the error from the server
-        // (causing the internal stream to be closed).
-        // Uploading anything else subsequently should then fail.
-        {
-            let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-            // get a GRPCPutter, so we can peek at [is_closed].
-            let handle_any = &mut handle as &mut dyn Any;
-
-            // `unchecked_downcast_mut` is unstable for now,
-            // https://github.com/rust-lang/rust/issues/90850
-            // We do the same thing here.
-            // The reason for why we cannot use the checked downcast lies
-            // in the fact that:
-            // - GRPCPutter has type ID A
-            // - Box<GRPCPutter> has type ID B
-            // - "Box<dyn GRPCPutter>" (invalid type) has type ID C
-            // B seems different from C in this context.
-            // We cannot unpack and perform upcast coercion of the traits as it's an unstable
-            // feature.
-            // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose
-            // of not making leak `is_closed` in the original trait.
-            let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) };
-            let mut is_closed = false;
-            for _try in 1..1000 {
-                if handle.is_closed() {
-                    is_closed = true;
-                    break;
-                }
-                tokio::time::sleep(time::Duration::from_millis(10)).await;
-            }
-
-            assert!(
-                is_closed,
-                "expected channel to eventually close, but never happened"
-            );
-
-            handle
-                .put(DIRECTORY_A.clone())
-                .await
-                .expect_err("must fail");
-        }
-    }
-
     /// This ensures connecting via gRPC works as expected.
     #[tokio::test]
     async fn test_valid_unix_path_ping_pong() {