about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-10-07T21·01+0200
committerflokli <flokli@flokli.de>2023-10-08T11·25+0000
commitb172c804b0995cbd74960c534452b19eaaf4a2af (patch)
tree8e93a758691118cc1b82c517e93446e5d4b85655 /tvix
parentd1adefc9f99fa47c4837baf9252a6d21cf273c2c (diff)
refactor(tvix/castore): use DuplexStream instead of unix socket r/6728
We can use DuplexStream to create to bidirectional pairs, which avoids
manually waiting for unix sockets to pop up and connect, and creating
temporary directoires to create the unix sockets in.

Turns out, we also don't actually need to spawn the server in a separate
runtime, it works just fine these days. This might be due to all the
sync barriers in between being gone.

Change-Id: I6b79823bc6209cbcb343b7a498c64a2ba6e0aee7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9562
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix')
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs7
-rw-r--r--tvix/castore/src/proto/tests/grpc_directoryservice.rs19
-rw-r--r--tvix/castore/src/utils.rs97
3 files changed, 47 insertions, 76 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index b24e8da9d1f5..70302c3305da 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -339,7 +339,6 @@ impl DirectoryPutter for GRPCPutter {
 mod tests {
     use core::time;
     use futures::StreamExt;
-    use tempfile::TempDir;
 
     use crate::{
         directoryservice::DirectoryService,
@@ -349,11 +348,9 @@ mod tests {
 
     #[tokio::test]
     async fn test() {
-        let tempdir = TempDir::new().expect("must succeed");
         // create the GrpcDirectoryService
-        let directory_service = super::GRPCDirectoryService::from_client(
-            gen_directorysvc_grpc_client(tempdir.path()).await,
-        );
+        let directory_service =
+            super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await);
 
         // try to get DIRECTORY_A should return Ok(None)
         assert_eq!(
diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
index bf60c56aad11..4262ab6da750 100644
--- a/tvix/castore/src/proto/tests/grpc_directoryservice.rs
+++ b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
@@ -4,7 +4,6 @@ use crate::proto::get_directory_request::ByWhat;
 use crate::proto::GetDirectoryRequest;
 use crate::proto::{Directory, DirectoryNode, SymlinkNode};
 use crate::utils::gen_directorysvc_grpc_client;
-use tempfile::TempDir;
 use tokio_stream::StreamExt;
 use tonic::transport::Channel;
 use tonic::Status;
@@ -37,8 +36,7 @@ async fn get_directories(
 /// Trying to get a non-existent Directory should return a not found error.
 #[tokio::test]
 async fn not_found() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     let resp = grpc_client
         .get(tonic::Request::new(GetDirectoryRequest {
@@ -66,8 +64,7 @@ async fn not_found() {
 /// Put a Directory into the store, get it back.
 #[tokio::test]
 async fn put_get() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // send directory A.
     let put_resp = {
@@ -98,8 +95,7 @@ async fn put_get() {
 /// Put multiple Directories into the store, and get them back
 #[tokio::test]
 async fn put_get_multiple() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // sending "b" (which refers to "a") without sending "a" first should fail.
     let put_resp = {
@@ -157,8 +153,7 @@ async fn put_get_multiple() {
 /// Put multiple Directories into the store, and omit duplicates.
 #[tokio::test]
 async fn put_get_dedup() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // Send "A", then "C", which refers to "A" two times
     // Pretend we're a dumb client sending A twice.
@@ -196,8 +191,7 @@ async fn put_get_dedup() {
 /// Trying to upload a Directory failing validation should fail.
 #[tokio::test]
 async fn put_reject_failed_validation() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // construct a broken Directory message that fails validation
     let broken_directory = Directory {
@@ -223,8 +217,7 @@ async fn put_reject_failed_validation() {
 /// Trying to upload a Directory with wrong size should fail.
 #[tokio::test]
 async fn put_reject_wrong_size() {
-    let tempdir = TempDir::new().expect("must succeed");
-    let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await;
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
 
     // Construct a directory referring to DIRECTORY_A, but with wrong size.
     let broken_parent_directory = Directory {
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
index 853ff4876e37..1444351a5644 100644
--- a/tvix/castore/src/utils.rs
+++ b/tvix/castore/src/utils.rs
@@ -3,11 +3,9 @@
 //! Only used for testing purposes, but across crates.
 //! Should be removed once we have a better concept of a "Service registry".
 
-use core::time;
-use std::{path::Path, sync::Arc, thread};
-
-use tokio::net::{UnixListener, UnixStream};
-use tokio_stream::wrappers::UnixListenerStream;
+use pin_project_lite::pin_project;
+use std::sync::Arc;
+use tokio::io::DuplexStream;
 use tonic::transport::{Channel, Endpoint, Server, Uri};
 
 use crate::{
@@ -27,64 +25,47 @@ pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
     Arc::new(MemoryDirectoryService::default())
 }
 
-/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there.
-/// Once it's listening, it'll start a gRPC client from the original thread, and return it.
-/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones.
-#[allow(dead_code)]
-pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> {
-    let socket_path = tmpdir.join("socket");
-
-    // Spin up a server, in a thread far away, which spawns its own tokio runtime,
-    // and blocks on the task.
-    let socket_path_clone = socket_path.clone();
-    thread::spawn(move || {
-        // Create the runtime
-        let rt = tokio::runtime::Runtime::new().unwrap();
-        // Get a handle from this runtime
-        let handle = rt.handle();
+pin_project! {
+    /// A wrapper around [DuplexStreamStream],
+    /// implementing [AsyncRead] and [Connected].
+    pub struct DuplexStreamWrapper {
+            #[pin]
+        inner: DuplexStream
+    }
+}
 
-        let task = handle.spawn(async {
-            let uds = UnixListener::bind(socket_path_clone).unwrap();
-            let uds_stream = UnixListenerStream::new(uds);
+/// This will spawn the a gRPC server with a DirectoryService client, and
+/// connect a gRPC DirectoryService client.
+/// The client is returned.
+#[allow(dead_code)]
+pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
+    let (left, right) = tokio::io::duplex(64);
 
-            // spin up a new DirectoryService
-            let mut server = Server::builder();
-            let router = server.add_service(DirectoryServiceServer::new(
-                GRPCDirectoryServiceWrapper::from(gen_directory_service()),
-            ));
-            router.serve_with_incoming(uds_stream).await
-        });
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn(async {
+        // spin up a new DirectoryService
+        let mut server = Server::builder();
+        let router = server.add_service(DirectoryServiceServer::new(
+            GRPCDirectoryServiceWrapper::from(gen_directory_service()),
+        ));
 
-        handle.block_on(task)
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
     });
 
-    // wait for the socket to be created
-    // TODO: pass around FDs instead?
-    {
-        let mut socket_created = false;
-        for _try in 1..20 {
-            if socket_path.exists() {
-                socket_created = true;
-                break;
-            }
-            tokio::time::sleep(time::Duration::from_millis(20)).await;
-        }
-
-        assert!(
-            socket_created,
-            "expected socket path to eventually get created, but never happened"
-        );
-    }
-
-    // Create a channel, connecting to the uds at socket_path.
-    // The URI is unused.
-    let channel = Endpoint::try_from("http://[::]:50051")
-        .unwrap()
-        .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
-            UnixStream::connect(socket_path.clone())
-        }));
-
-    let grpc_client = DirectoryServiceClient::new(channel);
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    let grpc_client = DirectoryServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    );
 
     grpc_client
 }