diff options
author | Florian Klink <flokli@flokli.de> | 2024-03-20T20·36+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2024-03-28T07·58+0000 |
commit | 74023a07a4b3d8e99645217b04683c0e54e23be8 (patch) | |
tree | e5c7f42540efe6dc94666e2694031b14ccd6715b /tvix/castore/src | |
parent | 05d3f21eaf0834d44b6f32a49ae3276ebcb6571c (diff) |
refactor(tvix/castore/*): drop utils.rs and grpc directorysvc tests r/7795
This drops pretty much all of castore/utils.rs. There were only two things left in there, both a bit messy and only used for tests: Some `gen_*_service()` helper functions. These can be expressed by `from_addr("memory://")`. The other thing was some plumbing code to test the gRPC layer, by exposing a in-memory implementation via gRPC, and then connecting to that channel via a gRPC client again. Previous CLs moved the connection setup code to {directory,blob}service::tests::utils, close to where we exercise them, the new rstest-based tests. The tests interacting directly on the gRPC types are removed, all scenarios that were in there show now be covered through the rstest ones on the trait level. Change-Id: I450ccccf983b4c62145a25d81c36a40846664814 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11223 Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 163 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/mod.rs | 3 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/traverse.rs | 8 | ||||
-rw-r--r-- | tvix/castore/src/lib.rs | 1 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/grpc_blobservice.rs | 94 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/grpc_directoryservice.rs | 246 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/mod.rs | 2 | ||||
-rw-r--r-- | tvix/castore/src/tests/import.rs | 19 | ||||
-rw-r--r-- | tvix/castore/src/utils.rs | 89 |
9 files changed, 21 insertions, 604 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index fe410a38257d..84cf01e1679e 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -220,18 +220,6 @@ pub struct GRPCPutter { )>, } -impl GRPCPutter { - // allows checking if the tx part of the channel is closed. - // only used in the test case. - #[cfg(test)] - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } -} - #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] @@ -280,163 +268,18 @@ impl DirectoryPutter for GRPCPutter { #[cfg(test)] mod tests { - use core::time; - use futures::StreamExt; - use std::{any::Any, time::Duration}; + use std::time::Duration; use tempfile::TempDir; use tokio::net::UnixListener; use tokio_retry::{strategy::ExponentialBackoff, Retry}; use tokio_stream::wrappers::UnixListenerStream; use crate::{ - directoryservice::{ - grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService, - MemoryDirectoryService, - }, - fixtures::{self, DIRECTORY_A, DIRECTORY_B}, + directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService}, + fixtures, proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper}, - utils::gen_directorysvc_grpc_client, }; - #[tokio::test] - async fn test() { - // create the GrpcDirectoryService - let directory_service = - super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await); - - // try to get DIRECTORY_A should return Ok(None) - assert_eq!( - None, - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must not fail") - ); - - // Now upload it - assert_eq!( - DIRECTORY_A.digest(), - directory_service - .put(DIRECTORY_A.clone()) - .await - .expect("must succeed") - ); - - // And retrieve it, compare for equality. - assert_eq!( - DIRECTORY_A.clone(), - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must succeed") - .expect("must be some") - ); - - // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. - directory_service - .put(DIRECTORY_B.clone()) - .await - .expect_err("must fail"); - - // Putting DIRECTORY_B in a put_multiple will succeed, but the close - // will always fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - handle.close().await.expect_err("must fail"); - } - - // Uploading A and then B should succeed, and closing should return the digest of B. - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - let digest = handle.close().await.expect("must succeed"); - assert_eq!(DIRECTORY_B.digest(), digest); - - // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. - let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); - assert_eq!( - DIRECTORY_B.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - assert_eq!( - DIRECTORY_A.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - - // Uploading B and then A should fail, because B refers to A, which - // hasn't been uploaded yet. - // However, the client can burst, so we might not have received the - // error back from the server. - { - let mut handle = directory_service.put_multiple_start(); - // sending out B will always be fine - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - // whether we will be able to put A as well depends on whether we - // already received the error about B. - if handle.put(DIRECTORY_A.clone()).await.is_ok() { - // If we didn't, and this was Ok(_), … - // a subsequent close MUST fail (because it waits for the - // server) - handle.close().await.expect_err("must fail"); - } - } - - // Now we do the same test as before, send B, then A, but wait - // a long long time so we already received the error from the server - // (causing the internal stream to be closed). - // Uploading anything else subsequently should then fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - // get a GRPCPutter, so we can peek at [is_closed]. - let handle_any = &mut handle as &mut dyn Any; - - // `unchecked_downcast_mut` is unstable for now, - // https://github.com/rust-lang/rust/issues/90850 - // We do the same thing here. - // The reason for why we cannot use the checked downcast lies - // in the fact that: - // - GRPCPutter has type ID A - // - Box<GRPCPutter> has type ID B - // - "Box<dyn GRPCPutter>" (invalid type) has type ID C - // B seems different from C in this context. - // We cannot unpack and perform upcast coercion of the traits as it's an unstable - // feature. - // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose - // of not making leak `is_closed` in the original trait. - let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) }; - let mut is_closed = false; - for _try in 1..1000 { - if handle.is_closed() { - is_closed = true; - break; - } - tokio::time::sleep(time::Duration::from_millis(10)).await; - } - - assert!( - is_closed, - "expected channel to eventually close, but never happened" - ); - - handle - .put(DIRECTORY_A.clone()) - .await - .expect_err("must fail"); - } - } - /// This ensures connecting via gRPC works as expected. #[tokio::test] async fn test_valid_unix_path_ping_pong() { diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index 5eb2d1919e80..cec49bb2c66a 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -180,7 +180,8 @@ async fn upload_reject_failing_validation(directory_service: impl DirectoryServi ); // Try to upload via put_multiple. We're a bit more permissive here, the - // intermediate .put() might succeed, but then the close MUST fail. + // intermediate .put() might succeed, due to client-side bursting (in the + // case of gRPC), but then the close MUST fail. let mut handle = directory_service.put_multiple_start(); if handle.put(broken_directory).await.is_ok() { assert!( diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 5c6975351b40..573581edbdd7 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -87,14 +87,16 @@ where mod tests { use std::path::PathBuf; - use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}; - use crate::utils::gen_directory_service; + use crate::{ + directoryservice, + fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, + }; use super::descend_to; #[tokio::test] async fn test_descend_to() { - let directory_service = gen_directory_service(); + let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let mut handle = directory_service.put_multiple_start(); handle diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 1ce092135b83..1a7ac6b4b415 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -12,7 +12,6 @@ pub mod fs; pub mod import; pub mod proto; pub mod tonic; -pub mod utils; pub use digests::{B3Digest, B3_LEN}; pub use errors::Error; diff --git a/tvix/castore/src/proto/tests/grpc_blobservice.rs b/tvix/castore/src/proto/tests/grpc_blobservice.rs deleted file mode 100644 index fb202b7d8a51..000000000000 --- a/tvix/castore/src/proto/tests/grpc_blobservice.rs +++ /dev/null @@ -1,94 +0,0 @@ -use crate::fixtures::{BLOB_A, BLOB_A_DIGEST}; -use crate::proto::{BlobChunk, ReadBlobRequest, StatBlobRequest}; -use crate::utils::gen_blobsvc_grpc_client; -use tokio_stream::StreamExt; - -/// Trying to read a non-existent blob should return a not found error. -#[tokio::test] -async fn not_found_read() { - let mut grpc_client = gen_blobsvc_grpc_client().await; - - let resp = grpc_client - .read(ReadBlobRequest { - digest: BLOB_A_DIGEST.clone().into(), - }) - .await; - - // We can't use unwrap_err here, because the Ok value doesn't implement - // debug. - if let Err(e) = resp { - assert_eq!(e.code(), tonic::Code::NotFound); - } else { - panic!("resp is not err") - } -} - -/// Trying to stat a non-existent blob should return a not found error. -#[tokio::test] -async fn not_found_stat() { - let mut grpc_client = gen_blobsvc_grpc_client().await; - - let resp = grpc_client - .stat(StatBlobRequest { - digest: BLOB_A_DIGEST.clone().into(), - ..Default::default() - }) - .await - .expect_err("must fail"); - - // The resp should be a status with Code::NotFound - assert_eq!(resp.code(), tonic::Code::NotFound); -} - -/// Put a blob in the store, get it back. -#[tokio::test] -async fn put_read_stat() { - let mut grpc_client = gen_blobsvc_grpc_client().await; - - // Send blob A. - let put_resp = grpc_client - .put(tokio_stream::once(BlobChunk { - data: BLOB_A.clone(), - })) - .await - .expect("must succeed") - .into_inner(); - - assert_eq!(BLOB_A_DIGEST.as_slice(), put_resp.digest); - - // Stat for the digest of A. - // We currently don't ask for more granular chunking data, as we don't - // expose it yet. - let _resp = grpc_client - .stat(StatBlobRequest { - digest: BLOB_A_DIGEST.clone().into(), - ..Default::default() - }) - .await - .expect("must succeed") - .into_inner(); - - // Read the blob. It should return the same data. - let resp = grpc_client - .read(ReadBlobRequest { - digest: BLOB_A_DIGEST.clone().into(), - }) - .await; - - let mut rx = resp.ok().unwrap().into_inner(); - - // the stream should contain one element, a BlobChunk with the same contents as BLOB_A. - let item = rx - .next() - .await - .expect("must be some") - .expect("must succeed"); - - assert_eq!(BLOB_A.clone(), item.data); - - // … and no more elements - assert!(rx.next().await.is_none()); - - // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks. - // Test with some bigger blob too -} diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs deleted file mode 100644 index dcd9a0ef010e..000000000000 --- a/tvix/castore/src/proto/tests/grpc_directoryservice.rs +++ /dev/null @@ -1,246 +0,0 @@ -use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; -use crate::proto::directory_service_client::DirectoryServiceClient; -use crate::proto::get_directory_request::ByWhat; -use crate::proto::GetDirectoryRequest; -use crate::proto::{Directory, DirectoryNode, SymlinkNode}; -use crate::utils::gen_directorysvc_grpc_client; -use tokio_stream::StreamExt; -use tonic::transport::Channel; -use tonic::Status; - -/// Send the specified GetDirectoryRequest. -/// Returns an error in the case of an error response, or an error in one of -/// the items in the stream, or a Vec<Directory> in the case of a successful -/// request. -async fn get_directories( - grpc_client: &mut DirectoryServiceClient<Channel>, - get_directory_request: GetDirectoryRequest, -) -> Result<Vec<Directory>, Status> { - let resp = grpc_client.get(get_directory_request).await; - - // if the response is an error itself, return the error, otherwise unpack - let stream = match resp { - Ok(resp) => resp, - Err(status) => return Err(status), - } - .into_inner(); - - let directory_results: Vec<Result<Directory, Status>> = stream.collect().await; - - // turn Vec<Result<Directory, Status> into Result<Vec<Directory>,Status> - directory_results.into_iter().collect() -} - -/// Trying to get a non-existent Directory should return a not found error. -#[tokio::test] -async fn not_found() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - let resp = grpc_client - .get(GetDirectoryRequest { - by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())), - ..Default::default() - }) - .await; - - let stream = resp.expect("must succeed").into_inner(); - - let items: Vec<_> = stream.collect().await; - - // The stream should contain one element, an error with Code::NotFound. - assert_eq!(1, items.len()); - let item = items[0].clone(); - - assert!(item.is_err(), "must be err"); - assert_eq!( - tonic::Code::NotFound, - item.unwrap_err().code(), - "must be err" - ); -} - -/// Put a Directory into the store, get it back. -#[tokio::test] -async fn put_get() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // send directory A. - let put_resp = { - grpc_client - .put(tokio_stream::once(DIRECTORY_A.clone())) - .await - .expect("must succeed") - .into_inner() - }; - - // the sent root_digest should match the calculated digest - assert_eq!(put_resp.root_digest, DIRECTORY_A.digest().as_slice()); - - // get it back - let items = get_directories( - &mut grpc_client, - GetDirectoryRequest { - by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())), - ..Default::default() - }, - ) - .await - .expect("must not error"); - - assert_eq!(vec![DIRECTORY_A.clone()], items); -} - -/// Put multiple Directories into the store, and get them back -#[tokio::test] -async fn put_get_multiple() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // sending "b" (which refers to "a") without sending "a" first should fail. - let put_resp = { - grpc_client - .put(tokio_stream::once(DIRECTORY_B.clone())) - .await - .expect_err("must fail") - }; - - assert_eq!(tonic::Code::InvalidArgument, put_resp.code()); - - // sending "a", then "b" should succeed, and the response should contain the digest of b. - let put_resp = { - grpc_client - .put(tokio_stream::iter(vec![ - DIRECTORY_A.clone(), - DIRECTORY_B.clone(), - ])) - .await - .expect("must succeed") - .into_inner() - }; - - assert_eq!(DIRECTORY_B.digest().as_slice(), put_resp.root_digest); - - // now, request b, first in non-recursive mode. - let items = get_directories( - &mut grpc_client, - GetDirectoryRequest { - recursive: false, - by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())), - }, - ) - .await - .expect("must not error"); - - // We expect to only get b. - assert_eq!(vec![DIRECTORY_B.clone()], items); - - // now, request b, but in recursive mode. - let items = get_directories( - &mut grpc_client, - GetDirectoryRequest { - recursive: true, - by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())), - }, - ) - .await - .expect("must not error"); - - // We expect to get b, and then a, because that's how we traverse down. - assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items); -} - -/// Put multiple Directories into the store, and omit duplicates. -#[tokio::test] -async fn put_get_dedup() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // Send "A", then "C", which refers to "A" two times - // Pretend we're a dumb client sending A twice. - let put_resp = { - grpc_client - .put(tokio_stream::iter(vec![ - DIRECTORY_A.clone(), - DIRECTORY_A.clone(), - DIRECTORY_C.clone(), - ])) - .await - .expect("must succeed") - }; - - assert_eq!( - DIRECTORY_C.digest().as_slice(), - put_resp.into_inner().root_digest - ); - - // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice. - let items = get_directories( - &mut grpc_client, - GetDirectoryRequest { - recursive: true, - by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().into())), - }, - ) - .await - .expect("must not error"); - - // We expect to get C, and then A (once, as the second A has been deduplicated). - assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items); -} - -/// Trying to upload a Directory failing validation should fail. -#[tokio::test] -async fn put_reject_failed_validation() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // construct a broken Directory message that fails validation - let broken_directory = Directory { - symlinks: vec![SymlinkNode { - name: "".into(), - target: "doesntmatter".into(), - }], - ..Default::default() - }; - assert!(broken_directory.validate().is_err()); - - // send it over, it must fail - let put_resp = { - grpc_client - .put(tokio_stream::once(broken_directory)) - .await - .expect_err("must fail") - }; - - assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); -} - -/// Trying to upload a Directory with wrong size should fail. -#[tokio::test] -async fn put_reject_wrong_size() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // Construct a directory referring to DIRECTORY_A, but with wrong size. - let broken_parent_directory = Directory { - directories: vec![DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: 42, - }], - ..Default::default() - }; - // Make sure we got the size wrong. - assert_ne!( - broken_parent_directory.directories[0].size, - DIRECTORY_A.size() - ); - - // now upload both (first A, then the broken parent). This must fail. - let put_resp = { - grpc_client - .put(tokio_stream::iter(vec![ - DIRECTORY_A.clone(), - broken_parent_directory, - ])) - .await - .expect_err("must fail") - }; - assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); -} diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs index 8b62fadeb5a6..8d903bacb6c5 100644 --- a/tvix/castore/src/proto/tests/mod.rs +++ b/tvix/castore/src/proto/tests/mod.rs @@ -1,4 +1,2 @@ mod directory; mod directory_nodes_iterator; -mod grpc_blobservice; -mod grpc_directoryservice; diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs index 99e993f36da3..b44b71cd784d 100644 --- a/tvix/castore/src/tests/import.rs +++ b/tvix/castore/src/tests/import.rs @@ -1,8 +1,9 @@ -use crate::blobservice::BlobService; +use crate::blobservice::{self, BlobService}; +use crate::directoryservice; use crate::fixtures::*; use crate::import::ingest_path; use crate::proto; -use crate::utils::{gen_blob_service, gen_directory_service}; + use std::sync::Arc; use tempfile::TempDir; @@ -12,8 +13,8 @@ use std::os::unix::ffi::OsStrExt; #[cfg(target_family = "unix")] #[tokio::test] async fn symlink() { - let blob_service = gen_blob_service(); - let directory_service = gen_directory_service(); + let blob_service = blobservice::from_addr("memory://").await.unwrap(); + let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let tmpdir = TempDir::new().unwrap(); @@ -43,8 +44,9 @@ async fn symlink() { #[tokio::test] async fn single_file() { - let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); - let directory_service = gen_directory_service(); + let blob_service = + Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>; + let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let tmpdir = TempDir::new().unwrap(); @@ -75,8 +77,9 @@ async fn single_file() { #[cfg(target_family = "unix")] #[tokio::test] async fn complicated() { - let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); - let directory_service = gen_directory_service(); + let blob_service = + Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>; + let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let tmpdir = TempDir::new().unwrap(); diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs deleted file mode 100644 index ca68f9f26c5a..000000000000 --- a/tvix/castore/src/utils.rs +++ /dev/null @@ -1,89 +0,0 @@ -//! A crate containing constructors to provide instances of a BlobService and -//! DirectoryService. Only used for testing purposes, but across crates. -//! Should be removed once we have a better concept of a "Service registry". -use tonic::transport::{Channel, Endpoint, Server, Uri}; - -use crate::{ - blobservice::{BlobService, MemoryBlobService}, - directoryservice::{DirectoryService, MemoryDirectoryService}, - proto::{ - blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer, - directory_service_client::DirectoryServiceClient, - directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper, - GRPCDirectoryServiceWrapper, - }, -}; - -pub fn gen_blob_service() -> Box<dyn BlobService> { - Box::<MemoryBlobService>::default() -} - -pub fn gen_directory_service() -> Box<dyn DirectoryService> { - Box::<MemoryDirectoryService>::default() -} - -/// This will spawn the a gRPC server with a DirectoryService client, connect a -/// gRPC DirectoryService client and return it. -#[allow(dead_code)] -pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> { - let (left, right) = tokio::io::duplex(64); - - // spin up a server, which will only connect once, to the left side. - tokio::spawn(async { - // spin up a new DirectoryService - let mut server = Server::builder(); - let router = server.add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::new(gen_directory_service()), - )); - - router - .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) - .await - }); - - // Create a client, connecting to the right side. The URI is unused. - let mut maybe_right = Some(right); - DirectoryServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(right) } - })) - .await - .unwrap(), - ) -} - -/// This will spawn the a gRPC server with a BlobService client, connect a -/// gRPC BlobService client and return it. -#[allow(dead_code)] -pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> { - let (left, right) = tokio::io::duplex(64); - - // spin up a server, which will only connect once, to the left side. - tokio::spawn(async { - // spin up a new DirectoryService - let mut server = Server::builder(); - let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( - gen_blob_service(), - ))); - - router - .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) - .await - }); - - // Create a client, connecting to the right side. The URI is unused. - let mut maybe_right = Some(right); - BlobServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(right) } - })) - .await - .unwrap(), - ) -} |