diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 63 |
1 files changed, 61 insertions, 2 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 70302c3305da..7ef9f84b0a16 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -339,10 +339,16 @@ impl DirectoryPutter for GRPCPutter { mod tests { use core::time; use futures::StreamExt; + use std::{sync::Arc, time::Duration}; + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio_retry::{strategy::ExponentialBackoff, Retry}; + use tokio_stream::wrappers::UnixListenerStream; use crate::{ - directoryservice::DirectoryService, - fixtures::{DIRECTORY_A, DIRECTORY_B}, + directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService}, + fixtures::{self, DIRECTORY_A, DIRECTORY_B}, + proto::GRPCDirectoryServiceWrapper, utils::gen_directorysvc_grpc_client, }; @@ -467,4 +473,57 @@ mod tests { .expect_err("must fail"); } } + + /// This ensures connecting via gRPC works as expected. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("daemon"); + + let path_clone = socket_path.clone(); + + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = server.add_service( + crate::proto::directory_service_server::DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from( + Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService> + ), + ), + ); + router.serve_with_incoming(uds_stream).await + }); + + // wait for the socket to be created + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) + .expect("must parse"); + GRPCDirectoryService::from_url(&url).expect("must succeed") + }; + + assert!(grpc_client + .get(&fixtures::DIRECTORY_A.digest()) + .await + .expect("must not fail") + .is_none()) + } } |