diff options
author | Florian Klink <flokli@flokli.de> | 2023-10-07T21·01+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-10-08T11·25+0000 |
commit | b172c804b0995cbd74960c534452b19eaaf4a2af (patch) | |
tree | 8e93a758691118cc1b82c517e93446e5d4b85655 /tvix/castore/src/utils.rs | |
parent | d1adefc9f99fa47c4837baf9252a6d21cf273c2c (diff) |
refactor(tvix/castore): use DuplexStream instead of unix socket r/6728
We can use DuplexStream to create to bidirectional pairs, which avoids manually waiting for unix sockets to pop up and connect, and creating temporary directoires to create the unix sockets in. Turns out, we also don't actually need to spawn the server in a separate runtime, it works just fine these days. This might be due to all the sync barriers in between being gone. Change-Id: I6b79823bc6209cbcb343b7a498c64a2ba6e0aee7 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9562 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/castore/src/utils.rs')
-rw-r--r-- | tvix/castore/src/utils.rs | 97 |
1 files changed, 39 insertions, 58 deletions
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs index 853ff4876e37..1444351a5644 100644 --- a/tvix/castore/src/utils.rs +++ b/tvix/castore/src/utils.rs @@ -3,11 +3,9 @@ //! Only used for testing purposes, but across crates. //! Should be removed once we have a better concept of a "Service registry". -use core::time; -use std::{path::Path, sync::Arc, thread}; - -use tokio::net::{UnixListener, UnixStream}; -use tokio_stream::wrappers::UnixListenerStream; +use pin_project_lite::pin_project; +use std::sync::Arc; +use tokio::io::DuplexStream; use tonic::transport::{Channel, Endpoint, Server, Uri}; use crate::{ @@ -27,64 +25,47 @@ pub fn gen_directory_service() -> Arc<dyn DirectoryService> { Arc::new(MemoryDirectoryService::default()) } -/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there. -/// Once it's listening, it'll start a gRPC client from the original thread, and return it. -/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones. -#[allow(dead_code)] -pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> { - let socket_path = tmpdir.join("socket"); - - // Spin up a server, in a thread far away, which spawns its own tokio runtime, - // and blocks on the task. - let socket_path_clone = socket_path.clone(); - thread::spawn(move || { - // Create the runtime - let rt = tokio::runtime::Runtime::new().unwrap(); - // Get a handle from this runtime - let handle = rt.handle(); +pin_project! { + /// A wrapper around [DuplexStreamStream], + /// implementing [AsyncRead] and [Connected]. + pub struct DuplexStreamWrapper { + #[pin] + inner: DuplexStream + } +} - let task = handle.spawn(async { - let uds = UnixListener::bind(socket_path_clone).unwrap(); - let uds_stream = UnixListenerStream::new(uds); +/// This will spawn the a gRPC server with a DirectoryService client, and +/// connect a gRPC DirectoryService client. +/// The client is returned. +#[allow(dead_code)] +pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> { + let (left, right) = tokio::io::duplex(64); - // spin up a new DirectoryService - let mut server = Server::builder(); - let router = server.add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::from(gen_directory_service()), - )); - router.serve_with_incoming(uds_stream).await - }); + // spin up a server, which will only connect once, to the left side. + tokio::spawn(async { + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(gen_directory_service()), + )); - handle.block_on(task) + router + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) + .await }); - // wait for the socket to be created - // TODO: pass around FDs instead? - { - let mut socket_created = false; - for _try in 1..20 { - if socket_path.exists() { - socket_created = true; - break; - } - tokio::time::sleep(time::Duration::from_millis(20)).await; - } - - assert!( - socket_created, - "expected socket path to eventually get created, but never happened" - ); - } - - // Create a channel, connecting to the uds at socket_path. - // The URI is unused. - let channel = Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { - UnixStream::connect(socket_path.clone()) - })); - - let grpc_client = DirectoryServiceClient::new(channel); + // Create a client, connecting to the right side. The URI is unused. + let mut maybe_right = Some(right); + let grpc_client = DirectoryServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(right) } + })) + .await + .unwrap(), + ); grpc_client } |