about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs7
-rw-r--r--tvix/castore/src/proto/tests/grpc_directoryservice.rs19
-rw-r--r--tvix/castore/src/utils.rs97
3 files changed, 47 insertions, 76 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index b24e8da9d1f5..70302c3305da 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -339,7 +339,6 @@ impl DirectoryPutter for GRPCPutter {
 mod tests {
     use core::time;
     use futures::StreamExt;
-    use tempfile::TempDir;
 
     use crate::{
         directoryservice::DirectoryService,
@@ -349,11 +348,9 @@ mod tests {
 
     #[tokio::test]
     async fn test() {
-        let tempdir = TempDir::new().expect("must succeed");
         // create the GrpcDirectoryService
-        let directory_service = super::GRPCDirectoryService::from_client(
-            gen_directorysvc_grpc_client(tempdir.path()).await,
-        );
+        let directory_service =
+            super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await);
 
         // try to get DIRECTORY_A should return Ok(None)
         assert_eq!(
diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
index bf60c56aad11..4262ab6da750 100644
--- a/tvix/castore/src/proto/tests/grpc_directoryservice.rs
+++ b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
@@ -4,7 +4,6 @@ use crate::proto::get_directory_request::ByWhat;
 use crate::proto::GetDirectoryRequest;
 use crate::proto::{Directory, DirectoryNode, SymlinkNode};
 use crate::utils::gen_directorysvc_grpc_client;
-use tempfile::TempDir;
 use tokio_stream::StreamExt;
 use tonic::transport::Channel;
 use tonic::Status;
@@ -37,8 +36,7 @@ async fn get_directories(
 /// Trying to get a non-existent Directory should return a not found error.
 #[tokio::test]
 async fn not_found() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     let resp = grpc_client
         .get(tonic::Request::new(GetDirectoryRequest {
@@ -66,8 +64,7 @@ async fn not_found() {
 /// Put a Directory into the store, get it back.
 #[tokio::test]
 async fn put_get() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // send directory A.
     let put_resp = {
@@ -98,8 +95,7 @@ async fn put_get() {
 /// Put multiple Directories into the store, and get them back
 #[tokio::test]
 async fn put_get_multiple() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // sending "b" (which refers to "a") without sending "a" first should fail.
     let put_resp = {
@@ -157,8 +153,7 @@ async fn put_get_multiple() {
 /// Put multiple Directories into the store, and omit duplicates.
 #[tokio::test]
 async fn put_get_dedup() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // Send "A", then "C", which refers to "A" two times
     // Pretend we're a dumb client sending A twice.
@@ -196,8 +191,7 @@ async fn put_get_dedup() {
 /// Trying to upload a Directory failing validation should fail.
 #[tokio::test]
 async fn put_reject_failed_validation() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // construct a broken Directory message that fails validation
     let broken_directory = Directory {
@@ -223,8 +217,7 @@ async fn put_reject_failed_validation() {
 /// Trying to upload a Directory with wrong size should fail.
 #[tokio::test]
 async fn put_reject_wrong_size() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // Construct a directory referring to DIRECTORY_A, but with wrong size.
     let broken_parent_directory = Directory {
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
index 853ff4876e37..1444351a5644 100644
--- a/tvix/castore/src/utils.rs
+++ b/tvix/castore/src/utils.rs
@@ -3,11 +3,9 @@
 //! Only used for testing purposes, but across crates.
 //! Should be removed once we have a better concept of a "Service registry".
 
-use core::time;
-use std::{path::Path, sync::Arc, thread};
-
-use tokio::net::{UnixListener, UnixStream};
-use tokio_stream::wrappers::UnixListenerStream;
+use pin_project_lite::pin_project;
+use std::sync::Arc;
+use tokio::io::DuplexStream;
 use tonic::transport::{Channel, Endpoint, Server, Uri};
 
 use crate::{
@@ -27,64 +25,47 @@ pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
     Arc::new(MemoryDirectoryService::default())
 }
 
-/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there.
-/// Once it's listening, it'll start a gRPC client from the original thread, and return it.
-/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones.
-#[allow(dead_code)]
-pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> {
-    let socket_path = tmpdir.join("socket");
-
-    // Spin up a server, in a thread far away, which spawns its own tokio runtime,
-    // and blocks on the task.
-    let socket_path_clone = socket_path.clone();
-    thread::spawn(move || {
-        // Create the runtime
-        let rt = tokio::runtime::Runtime::new().unwrap();
-        // Get a handle from this runtime
-        let handle = rt.handle();
+pin_project! {
+    /// A wrapper around [DuplexStreamStream],
+    /// implementing [AsyncRead] and [Connected].
+    pub struct DuplexStreamWrapper {
+            #[pin]
+        inner: DuplexStream
+    }
+}
 
-        let task = handle.spawn(async {
-            let uds = UnixListener::bind(socket_path_clone).unwrap();
-            let uds_stream = UnixListenerStream::new(uds);
+/// This will spawn the a gRPC server with a DirectoryService client, and
+/// connect a gRPC DirectoryService client.
+/// The client is returned.
+#[allow(dead_code)]
+pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
+    let (left, right) = tokio::io::duplex(64);
 
-            // spin up a new DirectoryService
-            let mut server = Server::builder();
-            let router = server.add_service(DirectoryServiceServer::new(
-                GRPCDirectoryServiceWrapper::from(gen_directory_service()),
-            ));
-            router.serve_with_incoming(uds_stream).await
-        });
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn(async {
+        // spin up a new DirectoryService
+        let mut server = Server::builder();
+        let router = server.add_service(DirectoryServiceServer::new(
+            GRPCDirectoryServiceWrapper::from(gen_directory_service()),
+        ));
 
-        handle.block_on(task)
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
     });
 
-    // wait for the socket to be created
-    // TODO: pass around FDs instead?
-    {
-        let mut socket_created = false;
-        for _try in 1..20 {
-            if socket_path.exists() {
-                socket_created = true;
-                break;
-            }
-            tokio::time::sleep(time::Duration::from_millis(20)).await;
-        }
-
-        assert!(
-            socket_created,
-            "expected socket path to eventually get created, but never happened"
-        );
-    }
-
-    // Create a channel, connecting to the uds at socket_path.
-    // The URI is unused.
-    let channel = Endpoint::try_from("http://[::]:50051")
-        .unwrap()
-        .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
-            UnixStream::connect(socket_path.clone())
-        }));
-
-    let grpc_client = DirectoryServiceClient::new(channel);
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    let grpc_client = DirectoryServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    );
 
     grpc_client
 }