about summary refs log tree commit diff
path: root/tvix/castore/src/utils.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-10-07T21·01+0200
committerflokli <flokli@flokli.de>2023-10-08T11·25+0000
commitb172c804b0995cbd74960c534452b19eaaf4a2af (patch)
tree8e93a758691118cc1b82c517e93446e5d4b85655 /tvix/castore/src/utils.rs
parentd1adefc9f99fa47c4837baf9252a6d21cf273c2c (diff)
refactor(tvix/castore): use DuplexStream instead of unix socket r/6728
We can use DuplexStream to create to bidirectional pairs, which avoids
manually waiting for unix sockets to pop up and connect, and creating
temporary directoires to create the unix sockets in.

Turns out, we also don't actually need to spawn the server in a separate
runtime, it works just fine these days. This might be due to all the
sync barriers in between being gone.

Change-Id: I6b79823bc6209cbcb343b7a498c64a2ba6e0aee7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9562
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to '')
-rw-r--r--tvix/castore/src/utils.rs97
1 files changed, 39 insertions, 58 deletions
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
index 853ff4876e..1444351a56 100644
--- a/tvix/castore/src/utils.rs
+++ b/tvix/castore/src/utils.rs
@@ -3,11 +3,9 @@
 //! Only used for testing purposes, but across crates.
 //! Should be removed once we have a better concept of a "Service registry".
 
-use core::time;
-use std::{path::Path, sync::Arc, thread};
-
-use tokio::net::{UnixListener, UnixStream};
-use tokio_stream::wrappers::UnixListenerStream;
+use pin_project_lite::pin_project;
+use std::sync::Arc;
+use tokio::io::DuplexStream;
 use tonic::transport::{Channel, Endpoint, Server, Uri};
 
 use crate::{
@@ -27,64 +25,47 @@ pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
     Arc::new(MemoryDirectoryService::default())
 }
 
-/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there.
-/// Once it's listening, it'll start a gRPC client from the original thread, and return it.
-/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones.
-#[allow(dead_code)]
-pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> {
-    let socket_path = tmpdir.join("socket");
-
-    // Spin up a server, in a thread far away, which spawns its own tokio runtime,
-    // and blocks on the task.
-    let socket_path_clone = socket_path.clone();
-    thread::spawn(move || {
-        // Create the runtime
-        let rt = tokio::runtime::Runtime::new().unwrap();
-        // Get a handle from this runtime
-        let handle = rt.handle();
+pin_project! {
+    /// A wrapper around [DuplexStreamStream],
+    /// implementing [AsyncRead] and [Connected].
+    pub struct DuplexStreamWrapper {
+            #[pin]
+        inner: DuplexStream
+    }
+}
 
-        let task = handle.spawn(async {
-            let uds = UnixListener::bind(socket_path_clone).unwrap();
-            let uds_stream = UnixListenerStream::new(uds);
+/// This will spawn the a gRPC server with a DirectoryService client, and
+/// connect a gRPC DirectoryService client.
+/// The client is returned.
+#[allow(dead_code)]
+pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
+    let (left, right) = tokio::io::duplex(64);
 
-            // spin up a new DirectoryService
-            let mut server = Server::builder();
-            let router = server.add_service(DirectoryServiceServer::new(
-                GRPCDirectoryServiceWrapper::from(gen_directory_service()),
-            ));
-            router.serve_with_incoming(uds_stream).await
-        });
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn(async {
+        // spin up a new DirectoryService
+        let mut server = Server::builder();
+        let router = server.add_service(DirectoryServiceServer::new(
+            GRPCDirectoryServiceWrapper::from(gen_directory_service()),
+        ));
 
-        handle.block_on(task)
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
     });
 
-    // wait for the socket to be created
-    // TODO: pass around FDs instead?
-    {
-        let mut socket_created = false;
-        for _try in 1..20 {
-            if socket_path.exists() {
-                socket_created = true;
-                break;
-            }
-            tokio::time::sleep(time::Duration::from_millis(20)).await;
-        }
-
-        assert!(
-            socket_created,
-            "expected socket path to eventually get created, but never happened"
-        );
-    }
-
-    // Create a channel, connecting to the uds at socket_path.
-    // The URI is unused.
-    let channel = Endpoint::try_from("http://[::]:50051")
-        .unwrap()
-        .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
-            UnixStream::connect(socket_path.clone())
-        }));
-
-    let grpc_client = DirectoryServiceClient::new(channel);
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    let grpc_client = DirectoryServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    );
 
     grpc_client
 }