diff options
Diffstat (limited to 'tvix/castore')
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 7 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/grpc_directoryservice.rs | 19 | ||||
-rw-r--r-- | tvix/castore/src/utils.rs | 97 |
3 files changed, 47 insertions, 76 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index b24e8da9d1f5..70302c3305da 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -339,7 +339,6 @@ impl DirectoryPutter for GRPCPutter { mod tests { use core::time; use futures::StreamExt; - use tempfile::TempDir; use crate::{ directoryservice::DirectoryService, @@ -349,11 +348,9 @@ mod tests { #[tokio::test] async fn test() { - let tempdir = TempDir::new().expect("must succeed"); // create the GrpcDirectoryService - let directory_service = super::GRPCDirectoryService::from_client( - gen_directorysvc_grpc_client(tempdir.path()).await, - ); + let directory_service = + super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await); // try to get DIRECTORY_A should return Ok(None) assert_eq!( diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs index bf60c56aad11..4262ab6da750 100644 --- a/tvix/castore/src/proto/tests/grpc_directoryservice.rs +++ b/tvix/castore/src/proto/tests/grpc_directoryservice.rs @@ -4,7 +4,6 @@ use crate::proto::get_directory_request::ByWhat; use crate::proto::GetDirectoryRequest; use crate::proto::{Directory, DirectoryNode, SymlinkNode}; use crate::utils::gen_directorysvc_grpc_client; -use tempfile::TempDir; use tokio_stream::StreamExt; use tonic::transport::Channel; use tonic::Status; @@ -37,8 +36,7 @@ async fn get_directories( /// Trying to get a non-existent Directory should return a not found error. #[tokio::test] async fn not_found() { - let tempdir = TempDir::new().expect("must succeed"); - let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await; + let mut grpc_client = gen_directorysvc_grpc_client().await; let resp = grpc_client .get(tonic::Request::new(GetDirectoryRequest { @@ -66,8 +64,7 @@ async fn not_found() { /// Put a Directory into the store, get it back. #[tokio::test] async fn put_get() { - let tempdir = TempDir::new().expect("must succeed"); - let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await; + let mut grpc_client = gen_directorysvc_grpc_client().await; // send directory A. let put_resp = { @@ -98,8 +95,7 @@ async fn put_get() { /// Put multiple Directories into the store, and get them back #[tokio::test] async fn put_get_multiple() { - let tempdir = TempDir::new().expect("must succeed"); - let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await; + let mut grpc_client = gen_directorysvc_grpc_client().await; // sending "b" (which refers to "a") without sending "a" first should fail. let put_resp = { @@ -157,8 +153,7 @@ async fn put_get_multiple() { /// Put multiple Directories into the store, and omit duplicates. #[tokio::test] async fn put_get_dedup() { - let tempdir = TempDir::new().expect("must succeed"); - let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await; + let mut grpc_client = gen_directorysvc_grpc_client().await; // Send "A", then "C", which refers to "A" two times // Pretend we're a dumb client sending A twice. @@ -196,8 +191,7 @@ async fn put_get_dedup() { /// Trying to upload a Directory failing validation should fail. #[tokio::test] async fn put_reject_failed_validation() { - let tempdir = TempDir::new().expect("must succeed"); - let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await; + let mut grpc_client = gen_directorysvc_grpc_client().await; // construct a broken Directory message that fails validation let broken_directory = Directory { @@ -223,8 +217,7 @@ async fn put_reject_failed_validation() { /// Trying to upload a Directory with wrong size should fail. #[tokio::test] async fn put_reject_wrong_size() { - let tempdir = TempDir::new().expect("must succeed"); - let mut grpc_client = gen_directorysvc_grpc_client(tempdir.path()).await; + let mut grpc_client = gen_directorysvc_grpc_client().await; // Construct a directory referring to DIRECTORY_A, but with wrong size. let broken_parent_directory = Directory { diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs index 853ff4876e37..1444351a5644 100644 --- a/tvix/castore/src/utils.rs +++ b/tvix/castore/src/utils.rs @@ -3,11 +3,9 @@ //! Only used for testing purposes, but across crates. //! Should be removed once we have a better concept of a "Service registry". -use core::time; -use std::{path::Path, sync::Arc, thread}; - -use tokio::net::{UnixListener, UnixStream}; -use tokio_stream::wrappers::UnixListenerStream; +use pin_project_lite::pin_project; +use std::sync::Arc; +use tokio::io::DuplexStream; use tonic::transport::{Channel, Endpoint, Server, Uri}; use crate::{ @@ -27,64 +25,47 @@ pub fn gen_directory_service() -> Arc<dyn DirectoryService> { Arc::new(MemoryDirectoryService::default()) } -/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there. -/// Once it's listening, it'll start a gRPC client from the original thread, and return it. -/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones. -#[allow(dead_code)] -pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> { - let socket_path = tmpdir.join("socket"); - - // Spin up a server, in a thread far away, which spawns its own tokio runtime, - // and blocks on the task. - let socket_path_clone = socket_path.clone(); - thread::spawn(move || { - // Create the runtime - let rt = tokio::runtime::Runtime::new().unwrap(); - // Get a handle from this runtime - let handle = rt.handle(); +pin_project! { + /// A wrapper around [DuplexStreamStream], + /// implementing [AsyncRead] and [Connected]. + pub struct DuplexStreamWrapper { + #[pin] + inner: DuplexStream + } +} - let task = handle.spawn(async { - let uds = UnixListener::bind(socket_path_clone).unwrap(); - let uds_stream = UnixListenerStream::new(uds); +/// This will spawn the a gRPC server with a DirectoryService client, and +/// connect a gRPC DirectoryService client. +/// The client is returned. +#[allow(dead_code)] +pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> { + let (left, right) = tokio::io::duplex(64); - // spin up a new DirectoryService - let mut server = Server::builder(); - let router = server.add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::from(gen_directory_service()), - )); - router.serve_with_incoming(uds_stream).await - }); + // spin up a server, which will only connect once, to the left side. + tokio::spawn(async { + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(gen_directory_service()), + )); - handle.block_on(task) + router + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) + .await }); - // wait for the socket to be created - // TODO: pass around FDs instead? - { - let mut socket_created = false; - for _try in 1..20 { - if socket_path.exists() { - socket_created = true; - break; - } - tokio::time::sleep(time::Duration::from_millis(20)).await; - } - - assert!( - socket_created, - "expected socket path to eventually get created, but never happened" - ); - } - - // Create a channel, connecting to the uds at socket_path. - // The URI is unused. - let channel = Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { - UnixStream::connect(socket_path.clone()) - })); - - let grpc_client = DirectoryServiceClient::new(channel); + // Create a client, connecting to the right side. The URI is unused. + let mut maybe_right = Some(right); + let grpc_client = DirectoryServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(right) } + })) + .await + .unwrap(), + ); grpc_client } |