about summary refs log tree commit diff
path: root/tvix/castore/src
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-10-07T06·35+0200
committerflokli <flokli@flokli.de>2023-10-08T07·30+0000
commita77914db7347ddfe8d3d7bc9614f42bc4cee8436 (patch)
tree9ba3a8ec79627105c48ca027911fe9a98f6b1da4 /tvix/castore/src
parent31f28b6105435869d4ad2d63b564f93509f346c6 (diff)
refactor(tvix/castore/directorysvc): factor out gRPC client gen r/6724
Move this code into a helper function, which we'll use in other places
in a bit.

Change-Id: Icae6f6dd2d4b2fa86fd2b836ddd7a4ca0e0354e7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9559
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src')
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs278
-rw-r--r--tvix/castore/src/utils.rs73
2 files changed, 178 insertions, 173 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index febab2752a1f..b24e8da9d1f5 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -338,202 +338,136 @@ impl DirectoryPutter for GRPCPutter {
 #[cfg(test)]
 mod tests {
     use core::time;
-    use std::thread;
-
     use futures::StreamExt;
     use tempfile::TempDir;
-    use tokio::net::{UnixListener, UnixStream};
-    use tokio_stream::wrappers::UnixListenerStream;
-    use tonic::transport::{Endpoint, Server, Uri};
 
     use crate::{
         directoryservice::DirectoryService,
         fixtures::{DIRECTORY_A, DIRECTORY_B},
-        proto,
-        proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper},
-        utils::gen_directory_service,
+        utils::gen_directorysvc_grpc_client,
     };
 
-    #[test]
-    fn test() {
-        let tmpdir = TempDir::new().unwrap();
-        let socket_path = tmpdir.path().join("socket");
-
-        // Spin up a server, in a thread far away, which spawns its own tokio runtime,
-        // and blocks on the task.
-        let socket_path_clone = socket_path.clone();
-        thread::spawn(move || {
-            // Create the runtime
-            let rt = tokio::runtime::Runtime::new().unwrap();
-            // Get a handle from this runtime
-            let handle = rt.handle();
-
-            let task = handle.spawn(async {
-                let uds = UnixListener::bind(socket_path_clone).unwrap();
-                let uds_stream = UnixListenerStream::new(uds);
-
-                // spin up a new DirectoryService
-                let mut server = Server::builder();
-                let router = server.add_service(DirectoryServiceServer::new(
-                    GRPCDirectoryServiceWrapper::from(gen_directory_service()),
-                ));
-                router.serve_with_incoming(uds_stream).await
-            });
-
-            handle.block_on(task)
-        });
+    #[tokio::test]
+    async fn test() {
+        let tempdir = TempDir::new().expect("must succeed");
+        // create the GrpcDirectoryService
+        let directory_service = super::GRPCDirectoryService::from_client(
+            gen_directorysvc_grpc_client(tempdir.path()).await,
+        );
+
+        // try to get DIRECTORY_A should return Ok(None)
+        assert_eq!(
+            None,
+            directory_service
+                .get(&DIRECTORY_A.digest())
+                .await
+                .expect("must not fail")
+        );
 
-        // set up the local client runtime. This is similar to what the [tokio:test] macro desugars to.
-        let tester_runtime = tokio::runtime::Builder::new_current_thread()
-            .enable_all()
-            .build()
-            .unwrap();
+        // Now upload it
+        assert_eq!(
+            DIRECTORY_A.digest(),
+            directory_service
+                .put(DIRECTORY_A.clone())
+                .await
+                .expect("must succeed")
+        );
 
-        // wait for the socket to be created
+        // And retrieve it, compare for equality.
+        assert_eq!(
+            DIRECTORY_A.clone(),
+            directory_service
+                .get(&DIRECTORY_A.digest())
+                .await
+                .expect("must succeed")
+                .expect("must be some")
+        );
+
+        // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
+        directory_service
+            .put(DIRECTORY_B.clone())
+            .await
+            .expect_err("must fail");
+
+        // Putting DIRECTORY_B in a put_multiple will succeed, but the close
+        // will always fail.
         {
-            let mut socket_created = false;
-            for _try in 1..20 {
-                if socket_path.exists() {
-                    socket_created = true;
-                    break;
-                }
-                std::thread::sleep(time::Duration::from_millis(20))
-            }
-
-            assert!(
-                socket_created,
-                "expected socket path to eventually get created, but never happened"
-            );
+            let mut handle = directory_service.put_multiple_start();
+            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+            handle.close().await.expect_err("must fail");
         }
 
-        tester_runtime.block_on(async move {
-            // Create a channel, connecting to the uds at socket_path.
-            // The URI is unused.
-            let channel = Endpoint::try_from("http://[::]:50051")
-                .unwrap()
-                .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
-                    UnixStream::connect(socket_path.clone())
-                }));
-
-            let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel);
-
-            // create the GrpcDirectoryService, using the tester_runtime.
-            let directory_service = super::GRPCDirectoryService::from_client(grpc_client);
-
-            // try to get DIRECTORY_A should return Ok(None)
-            assert_eq!(
-                None,
-                directory_service
-                    .get(&DIRECTORY_A.digest())
-                    .await
-                    .expect("must not fail")
-            );
-
-            // Now upload it
-            assert_eq!(
-                DIRECTORY_A.digest(),
-                directory_service
-                    .put(DIRECTORY_A.clone())
-                    .await
-                    .expect("must succeed")
-            );
-
-            // And retrieve it, compare for equality.
-            assert_eq!(
-                DIRECTORY_A.clone(),
-                directory_service
-                    .get(&DIRECTORY_A.digest())
-                    .await
-                    .expect("must succeed")
-                    .expect("must be some")
-            );
-
-            // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
-            directory_service
-                .put(DIRECTORY_B.clone())
+        // Uploading A and then B should succeed, and closing should return the digest of B.
+        let mut handle = directory_service.put_multiple_start();
+        handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
+        handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+        let digest = handle.close().await.expect("must succeed");
+        assert_eq!(DIRECTORY_B.digest(), digest);
+
+        // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
+        let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
+        assert_eq!(
+            DIRECTORY_B.clone(),
+            directories_it
+                .next()
                 .await
-                .expect_err("must fail");
+                .expect("must be some")
+                .expect("must succeed")
+        );
+        assert_eq!(
+            DIRECTORY_A.clone(),
+            directories_it
+                .next()
+                .await
+                .expect("must be some")
+                .expect("must succeed")
+        );
+
+        // Uploading B and then A should fail, because B refers to A, which
+        // hasn't been uploaded yet.
+        // However, the client can burst, so we might not have received the
+        // error back from the server.
+        {
+            let mut handle = directory_service.put_multiple_start();
+            // sending out B will always be fine
+            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
 
-            // Putting DIRECTORY_B in a put_multiple will succeed, but the close
-            // will always fail.
-            {
-                let mut handle = directory_service.put_multiple_start();
-                handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+            // whether we will be able to put A as well depends on whether we
+            // already received the error about B.
+            if handle.put(DIRECTORY_A.clone()).await.is_ok() {
+                // If we didn't, and this was Ok(_), …
+                // a subsequent close MUST fail (because it waits for the
+                // server)
                 handle.close().await.expect_err("must fail");
             }
+        }
 
-            // Uploading A and then B should succeed, and closing should return the digest of B.
+        // Now we do the same test as before, send B, then A, but wait
+        // sufficiently enough for the server to have s
+        // to close us the stream,
+        // and then assert that uploading anything else via the handle will fail.
+        {
             let mut handle = directory_service.put_multiple_start();
-            handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
             handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-            let digest = handle.close().await.expect("must succeed");
-            assert_eq!(DIRECTORY_B.digest(), digest);
-
-            // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
-            let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
-            assert_eq!(
-                DIRECTORY_B.clone(),
-                directories_it
-                    .next()
-                    .await
-                    .expect("must be some")
-                    .expect("must succeed")
-            );
-            assert_eq!(
-                DIRECTORY_A.clone(),
-                directories_it
-                    .next()
-                    .await
-                    .expect("must be some")
-                    .expect("must succeed")
-            );
 
-            // Uploading B and then A should fail, because B refers to A, which
-            // hasn't been uploaded yet.
-            // However, the client can burst, so we might not have received the
-            // error back from the server.
-            {
-                let mut handle = directory_service.put_multiple_start();
-                // sending out B will always be fine
-                handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-                // whether we will be able to put A as well depends on whether we
-                // already received the error about B.
-                if handle.put(DIRECTORY_A.clone()).await.is_ok() {
-                    // If we didn't, and this was Ok(_), …
-                    // a subsequent close MUST fail (because it waits for the
-                    // server)
-                    handle.close().await.expect_err("must fail");
+            let mut is_closed = false;
+            for _try in 1..1000 {
+                if handle.is_closed() {
+                    is_closed = true;
+                    break;
                 }
+                tokio::time::sleep(time::Duration::from_millis(10)).await;
             }
 
-            // Now we do the same test as before, send B, then A, but wait
-            // sufficiently enough for the server to have s
-            // to close us the stream,
-            // and then assert that uploading anything else via the handle will fail.
-            {
-                let mut handle = directory_service.put_multiple_start();
-                handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
-
-                let mut is_closed = false;
-                for _try in 1..1000 {
-                    if handle.is_closed() {
-                        is_closed = true;
-                        break;
-                    }
-                    tokio::time::sleep(time::Duration::from_millis(10)).await;
-                }
-
-                assert!(
-                    is_closed,
-                    "expected channel to eventually close, but never happened"
-                );
+            assert!(
+                is_closed,
+                "expected channel to eventually close, but never happened"
+            );
 
-                handle
-                    .put(DIRECTORY_A.clone())
-                    .await
-                    .expect_err("must fail");
-            }
-        });
+            handle
+                .put(DIRECTORY_A.clone())
+                .await
+                .expect_err("must fail");
+        }
     }
 }
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
index 660cb2f4b084..853ff4876e37 100644
--- a/tvix/castore/src/utils.rs
+++ b/tvix/castore/src/utils.rs
@@ -3,11 +3,20 @@
 //! Only used for testing purposes, but across crates.
 //! Should be removed once we have a better concept of a "Service registry".
 
-use std::sync::Arc;
+use core::time;
+use std::{path::Path, sync::Arc, thread};
+
+use tokio::net::{UnixListener, UnixStream};
+use tokio_stream::wrappers::UnixListenerStream;
+use tonic::transport::{Channel, Endpoint, Server, Uri};
 
 use crate::{
     blobservice::{BlobService, MemoryBlobService},
     directoryservice::{DirectoryService, MemoryDirectoryService},
+    proto::{
+        directory_service_client::DirectoryServiceClient,
+        directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper,
+    },
 };
 
 pub fn gen_blob_service() -> Arc<dyn BlobService> {
@@ -17,3 +26,65 @@ pub fn gen_blob_service() -> Arc<dyn BlobService> {
 pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
     Arc::new(MemoryDirectoryService::default())
 }
+
+/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there.
+/// Once it's listening, it'll start a gRPC client from the original thread, and return it.
+/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones.
+#[allow(dead_code)]
+pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> {
+    let socket_path = tmpdir.join("socket");
+
+    // Spin up a server, in a thread far away, which spawns its own tokio runtime,
+    // and blocks on the task.
+    let socket_path_clone = socket_path.clone();
+    thread::spawn(move || {
+        // Create the runtime
+        let rt = tokio::runtime::Runtime::new().unwrap();
+        // Get a handle from this runtime
+        let handle = rt.handle();
+
+        let task = handle.spawn(async {
+            let uds = UnixListener::bind(socket_path_clone).unwrap();
+            let uds_stream = UnixListenerStream::new(uds);
+
+            // spin up a new DirectoryService
+            let mut server = Server::builder();
+            let router = server.add_service(DirectoryServiceServer::new(
+                GRPCDirectoryServiceWrapper::from(gen_directory_service()),
+            ));
+            router.serve_with_incoming(uds_stream).await
+        });
+
+        handle.block_on(task)
+    });
+
+    // wait for the socket to be created
+    // TODO: pass around FDs instead?
+    {
+        let mut socket_created = false;
+        for _try in 1..20 {
+            if socket_path.exists() {
+                socket_created = true;
+                break;
+            }
+            tokio::time::sleep(time::Duration::from_millis(20)).await;
+        }
+
+        assert!(
+            socket_created,
+            "expected socket path to eventually get created, but never happened"
+        );
+    }
+
+    // Create a channel, connecting to the uds at socket_path.
+    // The URI is unused.
+    let channel = Endpoint::try_from("http://[::]:50051")
+        .unwrap()
+        .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
+            UnixStream::connect(socket_path.clone())
+        }));
+
+    let grpc_client = DirectoryServiceClient::new(channel);
+
+    grpc_client
+}