about summary refs log tree commit diff
path: root/tvix/castore/src/utils.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-10-07T06·35+0200
committerflokli <flokli@flokli.de>2023-10-08T07·30+0000
commita77914db7347ddfe8d3d7bc9614f42bc4cee8436 (patch)
tree9ba3a8ec79627105c48ca027911fe9a98f6b1da4 /tvix/castore/src/utils.rs
parent31f28b6105435869d4ad2d63b564f93509f346c6 (diff)
refactor(tvix/castore/directorysvc): factor out gRPC client gen r/6724
Move this code into a helper function, which we'll use in other places
in a bit.

Change-Id: Icae6f6dd2d4b2fa86fd2b836ddd7a4ca0e0354e7
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9559
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/utils.rs')
-rw-r--r--tvix/castore/src/utils.rs73
1 files changed, 72 insertions, 1 deletions
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
index 660cb2f4b084..853ff4876e37 100644
--- a/tvix/castore/src/utils.rs
+++ b/tvix/castore/src/utils.rs
@@ -3,11 +3,20 @@
 //! Only used for testing purposes, but across crates.
 //! Should be removed once we have a better concept of a "Service registry".
 
-use std::sync::Arc;
+use core::time;
+use std::{path::Path, sync::Arc, thread};
+
+use tokio::net::{UnixListener, UnixStream};
+use tokio_stream::wrappers::UnixListenerStream;
+use tonic::transport::{Channel, Endpoint, Server, Uri};
 
 use crate::{
     blobservice::{BlobService, MemoryBlobService},
     directoryservice::{DirectoryService, MemoryDirectoryService},
+    proto::{
+        directory_service_client::DirectoryServiceClient,
+        directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper,
+    },
 };
 
 pub fn gen_blob_service() -> Arc<dyn BlobService> {
@@ -17,3 +26,65 @@ pub fn gen_blob_service() -> Arc<dyn BlobService> {
 pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
     Arc::new(MemoryDirectoryService::default())
 }
+
+/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there.
+/// Once it's listening, it'll start a gRPC client from the original thread, and return it.
+/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones.
+#[allow(dead_code)]
+pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> {
+    let socket_path = tmpdir.join("socket");
+
+    // Spin up a server, in a thread far away, which spawns its own tokio runtime,
+    // and blocks on the task.
+    let socket_path_clone = socket_path.clone();
+    thread::spawn(move || {
+        // Create the runtime
+        let rt = tokio::runtime::Runtime::new().unwrap();
+        // Get a handle from this runtime
+        let handle = rt.handle();
+
+        let task = handle.spawn(async {
+            let uds = UnixListener::bind(socket_path_clone).unwrap();
+            let uds_stream = UnixListenerStream::new(uds);
+
+            // spin up a new DirectoryService
+            let mut server = Server::builder();
+            let router = server.add_service(DirectoryServiceServer::new(
+                GRPCDirectoryServiceWrapper::from(gen_directory_service()),
+            ));
+            router.serve_with_incoming(uds_stream).await
+        });
+
+        handle.block_on(task)
+    });
+
+    // wait for the socket to be created
+    // TODO: pass around FDs instead?
+    {
+        let mut socket_created = false;
+        for _try in 1..20 {
+            if socket_path.exists() {
+                socket_created = true;
+                break;
+            }
+            tokio::time::sleep(time::Duration::from_millis(20)).await;
+        }
+
+        assert!(
+            socket_created,
+            "expected socket path to eventually get created, but never happened"
+        );
+    }
+
+    // Create a channel, connecting to the uds at socket_path.
+    // The URI is unused.
+    let channel = Endpoint::try_from("http://[::]:50051")
+        .unwrap()
+        .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
+            UnixStream::connect(socket_path.clone())
+        }));
+
+    let grpc_client = DirectoryServiceClient::new(channel);
+
+    grpc_client
+}