diff options
Diffstat (limited to 'tvix/castore/src/utils.rs')
-rw-r--r-- | tvix/castore/src/utils.rs | 97 |
1 files changed, 39 insertions, 58 deletions
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs index 853ff4876e37..1444351a5644 100644 --- a/tvix/castore/src/utils.rs +++ b/tvix/castore/src/utils.rs @@ -3,11 +3,9 @@ //! Only used for testing purposes, but across crates. //! Should be removed once we have a better concept of a "Service registry". -use core::time; -use std::{path::Path, sync::Arc, thread}; - -use tokio::net::{UnixListener, UnixStream}; -use tokio_stream::wrappers::UnixListenerStream; +use pin_project_lite::pin_project; +use std::sync::Arc; +use tokio::io::DuplexStream; use tonic::transport::{Channel, Endpoint, Server, Uri}; use crate::{ @@ -27,64 +25,47 @@ pub fn gen_directory_service() -> Arc<dyn DirectoryService> { Arc::new(MemoryDirectoryService::default()) } -/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there. -/// Once it's listening, it'll start a gRPC client from the original thread, and return it. -/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones. -#[allow(dead_code)] -pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> { - let socket_path = tmpdir.join("socket"); - - // Spin up a server, in a thread far away, which spawns its own tokio runtime, - // and blocks on the task. - let socket_path_clone = socket_path.clone(); - thread::spawn(move || { - // Create the runtime - let rt = tokio::runtime::Runtime::new().unwrap(); - // Get a handle from this runtime - let handle = rt.handle(); +pin_project! { + /// A wrapper around [DuplexStreamStream], + /// implementing [AsyncRead] and [Connected]. + pub struct DuplexStreamWrapper { + #[pin] + inner: DuplexStream + } +} - let task = handle.spawn(async { - let uds = UnixListener::bind(socket_path_clone).unwrap(); - let uds_stream = UnixListenerStream::new(uds); +/// This will spawn the a gRPC server with a DirectoryService client, and +/// connect a gRPC DirectoryService client. +/// The client is returned. +#[allow(dead_code)] +pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> { + let (left, right) = tokio::io::duplex(64); - // spin up a new DirectoryService - let mut server = Server::builder(); - let router = server.add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::from(gen_directory_service()), - )); - router.serve_with_incoming(uds_stream).await - }); + // spin up a server, which will only connect once, to the left side. + tokio::spawn(async { + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(gen_directory_service()), + )); - handle.block_on(task) + router + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) + .await }); - // wait for the socket to be created - // TODO: pass around FDs instead? - { - let mut socket_created = false; - for _try in 1..20 { - if socket_path.exists() { - socket_created = true; - break; - } - tokio::time::sleep(time::Duration::from_millis(20)).await; - } - - assert!( - socket_created, - "expected socket path to eventually get created, but never happened" - ); - } - - // Create a channel, connecting to the uds at socket_path. - // The URI is unused. - let channel = Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { - UnixStream::connect(socket_path.clone()) - })); - - let grpc_client = DirectoryServiceClient::new(channel); + // Create a client, connecting to the right side. The URI is unused. + let mut maybe_right = Some(right); + let grpc_client = DirectoryServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(right) } + })) + .await + .unwrap(), + ); grpc_client } |