From a77914db7347ddfe8d3d7bc9614f42bc4cee8436 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sat, 7 Oct 2023 08:35:31 +0200 Subject: refactor(tvix/castore/directorysvc): factor out gRPC client gen Move this code into a helper function, which we'll use in other places in a bit. Change-Id: Icae6f6dd2d4b2fa86fd2b836ddd7a4ca0e0354e7 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9559 Autosubmit: flokli Reviewed-by: Connor Brewster Tested-by: BuildkiteCI --- tvix/castore/src/directoryservice/grpc.rs | 278 ++++++++++++------------------ tvix/castore/src/utils.rs | 73 +++++++- 2 files changed, 178 insertions(+), 173 deletions(-) diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index febab2752a1f..b24e8da9d1f5 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -338,202 +338,136 @@ impl DirectoryPutter for GRPCPutter { #[cfg(test)] mod tests { use core::time; - use std::thread; - use futures::StreamExt; use tempfile::TempDir; - use tokio::net::{UnixListener, UnixStream}; - use tokio_stream::wrappers::UnixListenerStream; - use tonic::transport::{Endpoint, Server, Uri}; use crate::{ directoryservice::DirectoryService, fixtures::{DIRECTORY_A, DIRECTORY_B}, - proto, - proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, - utils::gen_directory_service, + utils::gen_directorysvc_grpc_client, }; - #[test] - fn test() { - let tmpdir = TempDir::new().unwrap(); - let socket_path = tmpdir.path().join("socket"); - - // Spin up a server, in a thread far away, which spawns its own tokio runtime, - // and blocks on the task. - let socket_path_clone = socket_path.clone(); - thread::spawn(move || { - // Create the runtime - let rt = tokio::runtime::Runtime::new().unwrap(); - // Get a handle from this runtime - let handle = rt.handle(); - - let task = handle.spawn(async { - let uds = UnixListener::bind(socket_path_clone).unwrap(); - let uds_stream = UnixListenerStream::new(uds); - - // spin up a new DirectoryService - let mut server = Server::builder(); - let router = server.add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::from(gen_directory_service()), - )); - router.serve_with_incoming(uds_stream).await - }); - - handle.block_on(task) - }); + #[tokio::test] + async fn test() { + let tempdir = TempDir::new().expect("must succeed"); + // create the GrpcDirectoryService + let directory_service = super::GRPCDirectoryService::from_client( + gen_directorysvc_grpc_client(tempdir.path()).await, + ); + + // try to get DIRECTORY_A should return Ok(None) + assert_eq!( + None, + directory_service + .get(&DIRECTORY_A.digest()) + .await + .expect("must not fail") + ); - // set up the local client runtime. This is similar to what the [tokio:test] macro desugars to. - let tester_runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); + // Now upload it + assert_eq!( + DIRECTORY_A.digest(), + directory_service + .put(DIRECTORY_A.clone()) + .await + .expect("must succeed") + ); - // wait for the socket to be created + // And retrieve it, compare for equality. + assert_eq!( + DIRECTORY_A.clone(), + directory_service + .get(&DIRECTORY_A.digest()) + .await + .expect("must succeed") + .expect("must be some") + ); + + // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. + directory_service + .put(DIRECTORY_B.clone()) + .await + .expect_err("must fail"); + + // Putting DIRECTORY_B in a put_multiple will succeed, but the close + // will always fail. { - let mut socket_created = false; - for _try in 1..20 { - if socket_path.exists() { - socket_created = true; - break; - } - std::thread::sleep(time::Duration::from_millis(20)) - } - - assert!( - socket_created, - "expected socket path to eventually get created, but never happened" - ); + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + handle.close().await.expect_err("must fail"); } - tester_runtime.block_on(async move { - // Create a channel, connecting to the uds at socket_path. - // The URI is unused. - let channel = Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { - UnixStream::connect(socket_path.clone()) - })); - - let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel); - - // create the GrpcDirectoryService, using the tester_runtime. - let directory_service = super::GRPCDirectoryService::from_client(grpc_client); - - // try to get DIRECTORY_A should return Ok(None) - assert_eq!( - None, - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must not fail") - ); - - // Now upload it - assert_eq!( - DIRECTORY_A.digest(), - directory_service - .put(DIRECTORY_A.clone()) - .await - .expect("must succeed") - ); - - // And retrieve it, compare for equality. - assert_eq!( - DIRECTORY_A.clone(), - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must succeed") - .expect("must be some") - ); - - // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. - directory_service - .put(DIRECTORY_B.clone()) + // Uploading A and then B should succeed, and closing should return the digest of B. + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + let digest = handle.close().await.expect("must succeed"); + assert_eq!(DIRECTORY_B.digest(), digest); + + // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. + let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); + assert_eq!( + DIRECTORY_B.clone(), + directories_it + .next() .await - .expect_err("must fail"); + .expect("must be some") + .expect("must succeed") + ); + assert_eq!( + DIRECTORY_A.clone(), + directories_it + .next() + .await + .expect("must be some") + .expect("must succeed") + ); + + // Uploading B and then A should fail, because B refers to A, which + // hasn't been uploaded yet. + // However, the client can burst, so we might not have received the + // error back from the server. + { + let mut handle = directory_service.put_multiple_start(); + // sending out B will always be fine + handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - // Putting DIRECTORY_B in a put_multiple will succeed, but the close - // will always fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + // whether we will be able to put A as well depends on whether we + // already received the error about B. + if handle.put(DIRECTORY_A.clone()).await.is_ok() { + // If we didn't, and this was Ok(_), … + // a subsequent close MUST fail (because it waits for the + // server) handle.close().await.expect_err("must fail"); } + } - // Uploading A and then B should succeed, and closing should return the digest of B. + // Now we do the same test as before, send B, then A, but wait + // sufficiently enough for the server to have s + // to close us the stream, + // and then assert that uploading anything else via the handle will fail. + { let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - let digest = handle.close().await.expect("must succeed"); - assert_eq!(DIRECTORY_B.digest(), digest); - - // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. - let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); - assert_eq!( - DIRECTORY_B.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - assert_eq!( - DIRECTORY_A.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - // Uploading B and then A should fail, because B refers to A, which - // hasn't been uploaded yet. - // However, the client can burst, so we might not have received the - // error back from the server. - { - let mut handle = directory_service.put_multiple_start(); - // sending out B will always be fine - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - // whether we will be able to put A as well depends on whether we - // already received the error about B. - if handle.put(DIRECTORY_A.clone()).await.is_ok() { - // If we didn't, and this was Ok(_), … - // a subsequent close MUST fail (because it waits for the - // server) - handle.close().await.expect_err("must fail"); + let mut is_closed = false; + for _try in 1..1000 { + if handle.is_closed() { + is_closed = true; + break; } + tokio::time::sleep(time::Duration::from_millis(10)).await; } - // Now we do the same test as before, send B, then A, but wait - // sufficiently enough for the server to have s - // to close us the stream, - // and then assert that uploading anything else via the handle will fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - let mut is_closed = false; - for _try in 1..1000 { - if handle.is_closed() { - is_closed = true; - break; - } - tokio::time::sleep(time::Duration::from_millis(10)).await; - } - - assert!( - is_closed, - "expected channel to eventually close, but never happened" - ); + assert!( + is_closed, + "expected channel to eventually close, but never happened" + ); - handle - .put(DIRECTORY_A.clone()) - .await - .expect_err("must fail"); - } - }); + handle + .put(DIRECTORY_A.clone()) + .await + .expect_err("must fail"); + } } } diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs index 660cb2f4b084..853ff4876e37 100644 --- a/tvix/castore/src/utils.rs +++ b/tvix/castore/src/utils.rs @@ -3,11 +3,20 @@ //! Only used for testing purposes, but across crates. //! Should be removed once we have a better concept of a "Service registry". -use std::sync::Arc; +use core::time; +use std::{path::Path, sync::Arc, thread}; + +use tokio::net::{UnixListener, UnixStream}; +use tokio_stream::wrappers::UnixListenerStream; +use tonic::transport::{Channel, Endpoint, Server, Uri}; use crate::{ blobservice::{BlobService, MemoryBlobService}, directoryservice::{DirectoryService, MemoryDirectoryService}, + proto::{ + directory_service_client::DirectoryServiceClient, + directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper, + }, }; pub fn gen_blob_service() -> Arc { @@ -17,3 +26,65 @@ pub fn gen_blob_service() -> Arc { pub fn gen_directory_service() -> Arc { Arc::new(MemoryDirectoryService::default()) } + +/// This will spawn a separate thread, with its own tokio runtime, and start a gRPC server there. +/// Once it's listening, it'll start a gRPC client from the original thread, and return it. +/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones. +#[allow(dead_code)] +pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient { + let socket_path = tmpdir.join("socket"); + + // Spin up a server, in a thread far away, which spawns its own tokio runtime, + // and blocks on the task. + let socket_path_clone = socket_path.clone(); + thread::spawn(move || { + // Create the runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + // Get a handle from this runtime + let handle = rt.handle(); + + let task = handle.spawn(async { + let uds = UnixListener::bind(socket_path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(gen_directory_service()), + )); + router.serve_with_incoming(uds_stream).await + }); + + handle.block_on(task) + }); + + // wait for the socket to be created + // TODO: pass around FDs instead? + { + let mut socket_created = false; + for _try in 1..20 { + if socket_path.exists() { + socket_created = true; + break; + } + tokio::time::sleep(time::Duration::from_millis(20)).await; + } + + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); + } + + // Create a channel, connecting to the uds at socket_path. + // The URI is unused. + let channel = Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { + UnixStream::connect(socket_path.clone()) + })); + + let grpc_client = DirectoryServiceClient::new(channel); + + grpc_client +} -- cgit 1.4.1