diff options
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/proto/tests/grpc_blobservice.rs | 46 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/grpc_directoryservice.rs | 8 | ||||
-rw-r--r-- | tvix/castore/src/utils.rs | 44 |
3 files changed, 63 insertions, 35 deletions
diff --git a/tvix/castore/src/proto/tests/grpc_blobservice.rs b/tvix/castore/src/proto/tests/grpc_blobservice.rs index 0d7b340b4409..a7816bd1e961 100644 --- a/tvix/castore/src/proto/tests/grpc_blobservice.rs +++ b/tvix/castore/src/proto/tests/grpc_blobservice.rs @@ -1,23 +1,17 @@ use crate::fixtures::{BLOB_A, BLOB_A_DIGEST}; -use crate::proto::blob_service_server::BlobService as GRPCBlobService; -use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest}; -use crate::utils::gen_blob_service; +use crate::proto::{BlobChunk, ReadBlobRequest, StatBlobRequest}; +use crate::utils::gen_blobsvc_grpc_client; use tokio_stream::StreamExt; -fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper { - let blob_service = gen_blob_service(); - GRPCBlobServiceWrapper::from(blob_service) -} - /// Trying to read a non-existent blob should return a not found error. #[tokio::test] async fn not_found_read() { - let service = gen_grpc_blob_service(); + let mut grpc_client = gen_blobsvc_grpc_client().await; - let resp = service - .read(tonic::Request::new(ReadBlobRequest { + let resp = grpc_client + .read(ReadBlobRequest { digest: BLOB_A_DIGEST.clone().into(), - })) + }) .await; // We can't use unwrap_err here, because the Ok value doesn't implement @@ -32,13 +26,13 @@ async fn not_found_read() { /// Trying to stat a non-existent blob should return a not found error. #[tokio::test] async fn not_found_stat() { - let service = gen_grpc_blob_service(); + let mut grpc_client = gen_blobsvc_grpc_client().await; - let resp = service - .stat(tonic::Request::new(StatBlobRequest { + let resp = grpc_client + .stat(StatBlobRequest { digest: BLOB_A_DIGEST.clone().into(), ..Default::default() - })) + }) .await .expect_err("must fail"); @@ -49,13 +43,13 @@ async fn not_found_stat() { /// Put a blob in the store, get it back. #[tokio::test] async fn put_read_stat() { - let service = gen_grpc_blob_service(); + let mut grpc_client = gen_blobsvc_grpc_client().await; // Send blob A. - let put_resp = service - .put(tonic_mock::streaming_request(vec![BlobChunk { + let put_resp = grpc_client + .put(tokio_stream::once(BlobChunk { data: BLOB_A.clone(), - }])) + })) .await .expect("must succeed") .into_inner(); @@ -65,20 +59,20 @@ async fn put_read_stat() { // Stat for the digest of A. // We currently don't ask for more granular chunking data, as we don't // expose it yet. - let _resp = service - .stat(tonic::Request::new(StatBlobRequest { + let _resp = grpc_client + .stat(StatBlobRequest { digest: BLOB_A_DIGEST.clone().into(), ..Default::default() - })) + }) .await .expect("must succeed") .into_inner(); // Read the blob. It should return the same data. - let resp = service - .read(tonic::Request::new(ReadBlobRequest { + let resp = grpc_client + .read(ReadBlobRequest { digest: BLOB_A_DIGEST.clone().into(), - })) + }) .await; let mut rx = resp.ok().unwrap().into_inner(); diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs index 4262ab6da750..e443b4b1916a 100644 --- a/tvix/castore/src/proto/tests/grpc_directoryservice.rs +++ b/tvix/castore/src/proto/tests/grpc_directoryservice.rs @@ -16,9 +16,7 @@ async fn get_directories( grpc_client: &mut DirectoryServiceClient<Channel>, get_directory_request: GetDirectoryRequest, ) -> Result<Vec<Directory>, Status> { - let resp = grpc_client - .get(tonic::Request::new(get_directory_request)) - .await; + let resp = grpc_client.get(get_directory_request).await; // if the response is an error itself, return the error, otherwise unpack let stream = match resp { @@ -39,10 +37,10 @@ async fn not_found() { let mut grpc_client = gen_directorysvc_grpc_client().await; let resp = grpc_client - .get(tonic::Request::new(GetDirectoryRequest { + .get(GetDirectoryRequest { by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())), ..Default::default() - })) + }) .await; let stream = resp.expect("must succeed").into_inner(); diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs index 1444351a5644..d9a967598841 100644 --- a/tvix/castore/src/utils.rs +++ b/tvix/castore/src/utils.rs @@ -12,8 +12,10 @@ use crate::{ blobservice::{BlobService, MemoryBlobService}, directoryservice::{DirectoryService, MemoryDirectoryService}, proto::{ + blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer, directory_service_client::DirectoryServiceClient, - directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper, + directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper, + GRPCDirectoryServiceWrapper, }, }; @@ -34,9 +36,8 @@ pin_project! { } } -/// This will spawn the a gRPC server with a DirectoryService client, and -/// connect a gRPC DirectoryService client. -/// The client is returned. +/// This will spawn the a gRPC server with a DirectoryService client, connect a +/// gRPC DirectoryService client and return it. #[allow(dead_code)] pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> { let (left, right) = tokio::io::duplex(64); @@ -69,3 +70,38 @@ pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Cha grpc_client } + +/// This will spawn the a gRPC server with a BlobService client, connect a +/// gRPC BlobService client and return it. +#[allow(dead_code)] +pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> { + let (left, right) = tokio::io::duplex(64); + + // spin up a server, which will only connect once, to the left side. + tokio::spawn(async { + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( + gen_blob_service(), + ))); + + router + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) + .await + }); + + // Create a client, connecting to the right side. The URI is unused. + let mut maybe_right = Some(right); + let grpc_client = BlobServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(right) } + })) + .await + .unwrap(), + ); + + grpc_client +} |