about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs112
1 files changed, 47 insertions, 65 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index db9d4a9c00f0..b0544387bb0a 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -281,7 +281,6 @@ impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
-    use std::thread;
 
     use tempfile::TempDir;
     use tokio::net::UnixListener;
@@ -350,77 +349,60 @@ mod tests {
         assert!(GRPCBlobService::from_url(&url).is_err());
     }
 
-    /// This uses the correct scheme for a unix socket, and provides a server on the other side.
-    /// This is not a tokio::test, because spawn two separate tokio runtimes and
-    // want to have explicit control.
-    #[test]
-    fn test_valid_unix_path_ping_pong() {
+    /// This ensures connecting via gRPC works as expected.
+    #[tokio::test]
+    async fn test_valid_unix_path_ping_pong() {
         let tmpdir = TempDir::new().unwrap();
-        let path = tmpdir.path().join("daemon");
-
-        let path_clone = path.clone();
-
-        // Spin up a server, in a thread far away, which spawns its own tokio runtime,
-        // and blocks on the task.
-        thread::spawn(move || {
-            // Create the runtime
-            let rt = tokio::runtime::Runtime::new().unwrap();
-
-            let task = rt.spawn(async {
-                let uds = UnixListener::bind(path_clone).unwrap();
-                let uds_stream = UnixListenerStream::new(uds);
-
-                // spin up a new server
-                let mut server = tonic::transport::Server::builder();
-                let router =
-                    server.add_service(crate::proto::blob_service_server::BlobServiceServer::new(
-                        GRPCBlobServiceWrapper::from(
-                            Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>
-                        ),
-                    ));
-                router.serve_with_incoming(uds_stream).await
-            });
-
-            rt.block_on(task).unwrap().unwrap();
+        let socket_path = tmpdir.path().join("daemon");
+
+        let path_clone = socket_path.clone();
+
+        // Spin up a server
+        tokio::spawn(async {
+            let uds = UnixListener::bind(path_clone).unwrap();
+            let uds_stream = UnixListenerStream::new(uds);
+
+            // spin up a new server
+            let mut server = tonic::transport::Server::builder();
+            let router =
+                server.add_service(crate::proto::blob_service_server::BlobServiceServer::new(
+                    GRPCBlobServiceWrapper::from(
+                        Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>
+                    ),
+                ));
+            router.serve_with_incoming(uds_stream).await
         });
 
-        // Now create another tokio runtime which we'll use in the main test code.
-        let rt = tokio::runtime::Runtime::new().unwrap();
-
-        let task = rt.spawn(async move {
-            // wait for the socket to be created
-            {
-                let mut socket_created = false;
-                // TODO: exponential backoff urgently
-                for _try in 1..20 {
-                    if path.exists() {
-                        socket_created = true;
-                        break;
-                    }
-                    tokio::time::sleep(time::Duration::from_millis(20)).await;
+        // wait for the socket to be created
+        {
+            let mut socket_created = false;
+            // TODO: exponential backoff urgently
+            for _try in 1..20 {
+                if socket_path.exists() {
+                    socket_created = true;
+                    break;
                 }
-
-                assert!(
-                    socket_created,
-                    "expected socket path to eventually get created, but never happened"
-                );
+                tokio::time::sleep(time::Duration::from_millis(20)).await;
             }
 
-            // prepare a client
-            let client = {
-                let mut url =
-                    url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse");
-                url.set_path(path.to_str().unwrap());
-                GRPCBlobService::from_url(&url).expect("must succeed")
-            };
+            assert!(
+                socket_created,
+                "expected socket path to eventually get created, but never happened"
+            );
+        }
 
-            let has = client
-                .has(&fixtures::BLOB_A_DIGEST)
-                .await
-                .expect("must not be err");
+        // prepare a client
+        let grpc_client = {
+            let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display()))
+                .expect("must parse");
+            GRPCBlobService::from_url(&url).expect("must succeed")
+        };
 
-            assert!(!has);
-        });
-        rt.block_on(task).unwrap()
+        let has = grpc_client
+            .has(&fixtures::BLOB_A_DIGEST)
+            .await
+            .expect("must not be err");
+
+        assert!(!has);
     }
 }