about summary refs log tree commit diff
path: root/tvix/castore/src/utils.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/utils.rs')
-rw-r--r--tvix/castore/src/utils.rs97
1 files changed, 39 insertions, 58 deletions
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
index 853ff4876e..1444351a56 100644
--- a/tvix/castore/src/utils.rs
+++ b/tvix/castore/src/utils.rs
@@ -3,11 +3,9 @@
 //! Only used for testing purposes, but across crates.
 //! Should be removed once we have a better concept of a "Service registry".
 
-use core::time;
-use std::{path::Path, sync::Arc, thread};
-
-use tokio::net::{UnixListener, UnixStream};
-use tokio_stream::wrappers::UnixListenerStream;
+use pin_project_lite::pin_project;
+use std::sync::Arc;
+use tokio::io::DuplexStream;
 use tonic::transport::{Channel, Endpoint, Server, Uri};
 
 use crate::{
@@ -27,64 +25,47 @@ pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
     Arc::new(MemoryDirectoryService::default())
 }
 
-/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there.
-/// Once it's listening, it'll start a gRPC client from the original thread, and return it.
-/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones.
-#[allow(dead_code)]
-pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> {
-    let socket_path = tmpdir.join("socket");
-
-    // Spin up a server, in a thread far away, which spawns its own tokio runtime,
-    // and blocks on the task.
-    let socket_path_clone = socket_path.clone();
-    thread::spawn(move || {
-        // Create the runtime
-        let rt = tokio::runtime::Runtime::new().unwrap();
-        // Get a handle from this runtime
-        let handle = rt.handle();
+pin_project! {
+    /// A wrapper around [DuplexStreamStream],
+    /// implementing [AsyncRead] and [Connected].
+    pub struct DuplexStreamWrapper {
+            #[pin]
+        inner: DuplexStream
+    }
+}
 
-        let task = handle.spawn(async {
-            let uds = UnixListener::bind(socket_path_clone).unwrap();
-            let uds_stream = UnixListenerStream::new(uds);
+/// This will spawn the a gRPC server with a DirectoryService client, and
+/// connect a gRPC DirectoryService client.
+/// The client is returned.
+#[allow(dead_code)]
+pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
+    let (left, right) = tokio::io::duplex(64);
 
-            // spin up a new DirectoryService
-            let mut server = Server::builder();
-            let router = server.add_service(DirectoryServiceServer::new(
-                GRPCDirectoryServiceWrapper::from(gen_directory_service()),
-            ));
-            router.serve_with_incoming(uds_stream).await
-        });
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn(async {
+        // spin up a new DirectoryService
+        let mut server = Server::builder();
+        let router = server.add_service(DirectoryServiceServer::new(
+            GRPCDirectoryServiceWrapper::from(gen_directory_service()),
+        ));
 
-        handle.block_on(task)
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
     });
 
-    // wait for the socket to be created
-    // TODO: pass around FDs instead?
-    {
-        let mut socket_created = false;
-        for _try in 1..20 {
-            if socket_path.exists() {
-                socket_created = true;
-                break;
-            }
-            tokio::time::sleep(time::Duration::from_millis(20)).await;
-        }
-
-        assert!(
-            socket_created,
-            "expected socket path to eventually get created, but never happened"
-        );
-    }
-
-    // Create a channel, connecting to the uds at socket_path.
-    // The URI is unused.
-    let channel = Endpoint::try_from("http://[::]:50051")
-        .unwrap()
-        .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
-            UnixStream::connect(socket_path.clone())
-        }));
-
-    let grpc_client = DirectoryServiceClient::new(channel);
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    let grpc_client = DirectoryServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    );
 
     grpc_client
 }