diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/build/src/buildservice/from_addr.rs | 18 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 163 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/mod.rs | 3 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/traverse.rs | 8 | ||||
-rw-r--r-- | tvix/castore/src/lib.rs | 1 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/grpc_blobservice.rs | 94 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/grpc_directoryservice.rs | 246 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/mod.rs | 2 | ||||
-rw-r--r-- | tvix/castore/src/tests/import.rs | 19 | ||||
-rw-r--r-- | tvix/castore/src/utils.rs | 89 |
10 files changed, 31 insertions, 612 deletions
diff --git a/tvix/build/src/buildservice/from_addr.rs b/tvix/build/src/buildservice/from_addr.rs index ee2b4e50b4da..f5c4e6a490bb 100644 --- a/tvix/build/src/buildservice/from_addr.rs +++ b/tvix/build/src/buildservice/from_addr.rs @@ -51,7 +51,10 @@ mod tests { use super::from_addr; use test_case::test_case; - use tvix_castore::utils::{gen_blob_service, gen_directory_service}; + use tvix_castore::{ + blobservice::{BlobService, MemoryBlobService}, + directoryservice::{DirectoryService, MemoryDirectoryService}, + }; /// This uses an unsupported scheme. #[test_case("http://foo.example/test", false; "unsupported scheme")] @@ -71,14 +74,13 @@ mod tests { #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")] #[tokio::test] async fn test_from_addr(uri_str: &str, is_ok: bool) { + let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default()); + let directory_service: Arc<dyn DirectoryService> = + Arc::from(MemoryDirectoryService::default()); assert_eq!( - from_addr( - uri_str, - Arc::from(gen_blob_service()), - Arc::from(gen_directory_service()) - ) - .await - .is_ok(), + from_addr(uri_str, blob_service, directory_service) + .await + .is_ok(), is_ok ) } diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index fe410a38257d..84cf01e1679e 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -220,18 +220,6 @@ pub struct GRPCPutter { )>, } -impl GRPCPutter { - // allows checking if the tx part of the channel is closed. - // only used in the test case. - #[cfg(test)] - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } -} - #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] @@ -280,163 +268,18 @@ impl DirectoryPutter for GRPCPutter { #[cfg(test)] mod tests { - use core::time; - use futures::StreamExt; - use std::{any::Any, time::Duration}; + use std::time::Duration; use tempfile::TempDir; use tokio::net::UnixListener; use tokio_retry::{strategy::ExponentialBackoff, Retry}; use tokio_stream::wrappers::UnixListenerStream; use crate::{ - directoryservice::{ - grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService, - MemoryDirectoryService, - }, - fixtures::{self, DIRECTORY_A, DIRECTORY_B}, + directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService}, + fixtures, proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper}, - utils::gen_directorysvc_grpc_client, }; - #[tokio::test] - async fn test() { - // create the GrpcDirectoryService - let directory_service = - super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await); - - // try to get DIRECTORY_A should return Ok(None) - assert_eq!( - None, - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must not fail") - ); - - // Now upload it - assert_eq!( - DIRECTORY_A.digest(), - directory_service - .put(DIRECTORY_A.clone()) - .await - .expect("must succeed") - ); - - // And retrieve it, compare for equality. - assert_eq!( - DIRECTORY_A.clone(), - directory_service - .get(&DIRECTORY_A.digest()) - .await - .expect("must succeed") - .expect("must be some") - ); - - // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. - directory_service - .put(DIRECTORY_B.clone()) - .await - .expect_err("must fail"); - - // Putting DIRECTORY_B in a put_multiple will succeed, but the close - // will always fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - handle.close().await.expect_err("must fail"); - } - - // Uploading A and then B should succeed, and closing should return the digest of B. - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_A.clone()).await.expect("must succeed"); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - let digest = handle.close().await.expect("must succeed"); - assert_eq!(DIRECTORY_B.digest(), digest); - - // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. - let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); - assert_eq!( - DIRECTORY_B.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - assert_eq!( - DIRECTORY_A.clone(), - directories_it - .next() - .await - .expect("must be some") - .expect("must succeed") - ); - - // Uploading B and then A should fail, because B refers to A, which - // hasn't been uploaded yet. - // However, the client can burst, so we might not have received the - // error back from the server. - { - let mut handle = directory_service.put_multiple_start(); - // sending out B will always be fine - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - // whether we will be able to put A as well depends on whether we - // already received the error about B. - if handle.put(DIRECTORY_A.clone()).await.is_ok() { - // If we didn't, and this was Ok(_), … - // a subsequent close MUST fail (because it waits for the - // server) - handle.close().await.expect_err("must fail"); - } - } - - // Now we do the same test as before, send B, then A, but wait - // a long long time so we already received the error from the server - // (causing the internal stream to be closed). - // Uploading anything else subsequently should then fail. - { - let mut handle = directory_service.put_multiple_start(); - handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); - - // get a GRPCPutter, so we can peek at [is_closed]. - let handle_any = &mut handle as &mut dyn Any; - - // `unchecked_downcast_mut` is unstable for now, - // https://github.com/rust-lang/rust/issues/90850 - // We do the same thing here. - // The reason for why we cannot use the checked downcast lies - // in the fact that: - // - GRPCPutter has type ID A - // - Box<GRPCPutter> has type ID B - // - "Box<dyn GRPCPutter>" (invalid type) has type ID C - // B seems different from C in this context. - // We cannot unpack and perform upcast coercion of the traits as it's an unstable - // feature. - // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose - // of not making leak `is_closed` in the original trait. - let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) }; - let mut is_closed = false; - for _try in 1..1000 { - if handle.is_closed() { - is_closed = true; - break; - } - tokio::time::sleep(time::Duration::from_millis(10)).await; - } - - assert!( - is_closed, - "expected channel to eventually close, but never happened" - ); - - handle - .put(DIRECTORY_A.clone()) - .await - .expect_err("must fail"); - } - } - /// This ensures connecting via gRPC works as expected. #[tokio::test] async fn test_valid_unix_path_ping_pong() { diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index 5eb2d1919e80..cec49bb2c66a 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -180,7 +180,8 @@ async fn upload_reject_failing_validation(directory_service: impl DirectoryServi ); // Try to upload via put_multiple. We're a bit more permissive here, the - // intermediate .put() might succeed, but then the close MUST fail. + // intermediate .put() might succeed, due to client-side bursting (in the + // case of gRPC), but then the close MUST fail. let mut handle = directory_service.put_multiple_start(); if handle.put(broken_directory).await.is_ok() { assert!( diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 5c6975351b40..573581edbdd7 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -87,14 +87,16 @@ where mod tests { use std::path::PathBuf; - use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}; - use crate::utils::gen_directory_service; + use crate::{ + directoryservice, + fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, + }; use super::descend_to; #[tokio::test] async fn test_descend_to() { - let directory_service = gen_directory_service(); + let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let mut handle = directory_service.put_multiple_start(); handle diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 1ce092135b83..1a7ac6b4b415 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -12,7 +12,6 @@ pub mod fs; pub mod import; pub mod proto; pub mod tonic; -pub mod utils; pub use digests::{B3Digest, B3_LEN}; pub use errors::Error; diff --git a/tvix/castore/src/proto/tests/grpc_blobservice.rs b/tvix/castore/src/proto/tests/grpc_blobservice.rs deleted file mode 100644 index fb202b7d8a51..000000000000 --- a/tvix/castore/src/proto/tests/grpc_blobservice.rs +++ /dev/null @@ -1,94 +0,0 @@ -use crate::fixtures::{BLOB_A, BLOB_A_DIGEST}; -use crate::proto::{BlobChunk, ReadBlobRequest, StatBlobRequest}; -use crate::utils::gen_blobsvc_grpc_client; -use tokio_stream::StreamExt; - -/// Trying to read a non-existent blob should return a not found error. -#[tokio::test] -async fn not_found_read() { - let mut grpc_client = gen_blobsvc_grpc_client().await; - - let resp = grpc_client - .read(ReadBlobRequest { - digest: BLOB_A_DIGEST.clone().into(), - }) - .await; - - // We can't use unwrap_err here, because the Ok value doesn't implement - // debug. - if let Err(e) = resp { - assert_eq!(e.code(), tonic::Code::NotFound); - } else { - panic!("resp is not err") - } -} - -/// Trying to stat a non-existent blob should return a not found error. -#[tokio::test] -async fn not_found_stat() { - let mut grpc_client = gen_blobsvc_grpc_client().await; - - let resp = grpc_client - .stat(StatBlobRequest { - digest: BLOB_A_DIGEST.clone().into(), - ..Default::default() - }) - .await - .expect_err("must fail"); - - // The resp should be a status with Code::NotFound - assert_eq!(resp.code(), tonic::Code::NotFound); -} - -/// Put a blob in the store, get it back. -#[tokio::test] -async fn put_read_stat() { - let mut grpc_client = gen_blobsvc_grpc_client().await; - - // Send blob A. - let put_resp = grpc_client - .put(tokio_stream::once(BlobChunk { - data: BLOB_A.clone(), - })) - .await - .expect("must succeed") - .into_inner(); - - assert_eq!(BLOB_A_DIGEST.as_slice(), put_resp.digest); - - // Stat for the digest of A. - // We currently don't ask for more granular chunking data, as we don't - // expose it yet. - let _resp = grpc_client - .stat(StatBlobRequest { - digest: BLOB_A_DIGEST.clone().into(), - ..Default::default() - }) - .await - .expect("must succeed") - .into_inner(); - - // Read the blob. It should return the same data. - let resp = grpc_client - .read(ReadBlobRequest { - digest: BLOB_A_DIGEST.clone().into(), - }) - .await; - - let mut rx = resp.ok().unwrap().into_inner(); - - // the stream should contain one element, a BlobChunk with the same contents as BLOB_A. - let item = rx - .next() - .await - .expect("must be some") - .expect("must succeed"); - - assert_eq!(BLOB_A.clone(), item.data); - - // … and no more elements - assert!(rx.next().await.is_none()); - - // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks. - // Test with some bigger blob too -} diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs deleted file mode 100644 index dcd9a0ef010e..000000000000 --- a/tvix/castore/src/proto/tests/grpc_directoryservice.rs +++ /dev/null @@ -1,246 +0,0 @@ -use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; -use crate::proto::directory_service_client::DirectoryServiceClient; -use crate::proto::get_directory_request::ByWhat; -use crate::proto::GetDirectoryRequest; -use crate::proto::{Directory, DirectoryNode, SymlinkNode}; -use crate::utils::gen_directorysvc_grpc_client; -use tokio_stream::StreamExt; -use tonic::transport::Channel; -use tonic::Status; - -/// Send the specified GetDirectoryRequest. -/// Returns an error in the case of an error response, or an error in one of -/// the items in the stream, or a Vec<Directory> in the case of a successful -/// request. -async fn get_directories( - grpc_client: &mut DirectoryServiceClient<Channel>, - get_directory_request: GetDirectoryRequest, -) -> Result<Vec<Directory>, Status> { - let resp = grpc_client.get(get_directory_request).await; - - // if the response is an error itself, return the error, otherwise unpack - let stream = match resp { - Ok(resp) => resp, - Err(status) => return Err(status), - } - .into_inner(); - - let directory_results: Vec<Result<Directory, Status>> = stream.collect().await; - - // turn Vec<Result<Directory, Status> into Result<Vec<Directory>,Status> - directory_results.into_iter().collect() -} - -/// Trying to get a non-existent Directory should return a not found error. -#[tokio::test] -async fn not_found() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - let resp = grpc_client - .get(GetDirectoryRequest { - by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())), - ..Default::default() - }) - .await; - - let stream = resp.expect("must succeed").into_inner(); - - let items: Vec<_> = stream.collect().await; - - // The stream should contain one element, an error with Code::NotFound. - assert_eq!(1, items.len()); - let item = items[0].clone(); - - assert!(item.is_err(), "must be err"); - assert_eq!( - tonic::Code::NotFound, - item.unwrap_err().code(), - "must be err" - ); -} - -/// Put a Directory into the store, get it back. -#[tokio::test] -async fn put_get() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // send directory A. - let put_resp = { - grpc_client - .put(tokio_stream::once(DIRECTORY_A.clone())) - .await - .expect("must succeed") - .into_inner() - }; - - // the sent root_digest should match the calculated digest - assert_eq!(put_resp.root_digest, DIRECTORY_A.digest().as_slice()); - - // get it back - let items = get_directories( - &mut grpc_client, - GetDirectoryRequest { - by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())), - ..Default::default() - }, - ) - .await - .expect("must not error"); - - assert_eq!(vec![DIRECTORY_A.clone()], items); -} - -/// Put multiple Directories into the store, and get them back -#[tokio::test] -async fn put_get_multiple() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // sending "b" (which refers to "a") without sending "a" first should fail. - let put_resp = { - grpc_client - .put(tokio_stream::once(DIRECTORY_B.clone())) - .await - .expect_err("must fail") - }; - - assert_eq!(tonic::Code::InvalidArgument, put_resp.code()); - - // sending "a", then "b" should succeed, and the response should contain the digest of b. - let put_resp = { - grpc_client - .put(tokio_stream::iter(vec![ - DIRECTORY_A.clone(), - DIRECTORY_B.clone(), - ])) - .await - .expect("must succeed") - .into_inner() - }; - - assert_eq!(DIRECTORY_B.digest().as_slice(), put_resp.root_digest); - - // now, request b, first in non-recursive mode. - let items = get_directories( - &mut grpc_client, - GetDirectoryRequest { - recursive: false, - by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())), - }, - ) - .await - .expect("must not error"); - - // We expect to only get b. - assert_eq!(vec![DIRECTORY_B.clone()], items); - - // now, request b, but in recursive mode. - let items = get_directories( - &mut grpc_client, - GetDirectoryRequest { - recursive: true, - by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())), - }, - ) - .await - .expect("must not error"); - - // We expect to get b, and then a, because that's how we traverse down. - assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items); -} - -/// Put multiple Directories into the store, and omit duplicates. -#[tokio::test] -async fn put_get_dedup() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // Send "A", then "C", which refers to "A" two times - // Pretend we're a dumb client sending A twice. - let put_resp = { - grpc_client - .put(tokio_stream::iter(vec![ - DIRECTORY_A.clone(), - DIRECTORY_A.clone(), - DIRECTORY_C.clone(), - ])) - .await - .expect("must succeed") - }; - - assert_eq!( - DIRECTORY_C.digest().as_slice(), - put_resp.into_inner().root_digest - ); - - // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice. - let items = get_directories( - &mut grpc_client, - GetDirectoryRequest { - recursive: true, - by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().into())), - }, - ) - .await - .expect("must not error"); - - // We expect to get C, and then A (once, as the second A has been deduplicated). - assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items); -} - -/// Trying to upload a Directory failing validation should fail. -#[tokio::test] -async fn put_reject_failed_validation() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // construct a broken Directory message that fails validation - let broken_directory = Directory { - symlinks: vec![SymlinkNode { - name: "".into(), - target: "doesntmatter".into(), - }], - ..Default::default() - }; - assert!(broken_directory.validate().is_err()); - - // send it over, it must fail - let put_resp = { - grpc_client - .put(tokio_stream::once(broken_directory)) - .await - .expect_err("must fail") - }; - - assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); -} - -/// Trying to upload a Directory with wrong size should fail. -#[tokio::test] -async fn put_reject_wrong_size() { - let mut grpc_client = gen_directorysvc_grpc_client().await; - - // Construct a directory referring to DIRECTORY_A, but with wrong size. - let broken_parent_directory = Directory { - directories: vec![DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: 42, - }], - ..Default::default() - }; - // Make sure we got the size wrong. - assert_ne!( - broken_parent_directory.directories[0].size, - DIRECTORY_A.size() - ); - - // now upload both (first A, then the broken parent). This must fail. - let put_resp = { - grpc_client - .put(tokio_stream::iter(vec![ - DIRECTORY_A.clone(), - broken_parent_directory, - ])) - .await - .expect_err("must fail") - }; - assert_eq!(put_resp.code(), tonic::Code::InvalidArgument); -} diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs index 8b62fadeb5a6..8d903bacb6c5 100644 --- a/tvix/castore/src/proto/tests/mod.rs +++ b/tvix/castore/src/proto/tests/mod.rs @@ -1,4 +1,2 @@ mod directory; mod directory_nodes_iterator; -mod grpc_blobservice; -mod grpc_directoryservice; diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs index 99e993f36da3..b44b71cd784d 100644 --- a/tvix/castore/src/tests/import.rs +++ b/tvix/castore/src/tests/import.rs @@ -1,8 +1,9 @@ -use crate::blobservice::BlobService; +use crate::blobservice::{self, BlobService}; +use crate::directoryservice; use crate::fixtures::*; use crate::import::ingest_path; use crate::proto; -use crate::utils::{gen_blob_service, gen_directory_service}; + use std::sync::Arc; use tempfile::TempDir; @@ -12,8 +13,8 @@ use std::os::unix::ffi::OsStrExt; #[cfg(target_family = "unix")] #[tokio::test] async fn symlink() { - let blob_service = gen_blob_service(); - let directory_service = gen_directory_service(); + let blob_service = blobservice::from_addr("memory://").await.unwrap(); + let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let tmpdir = TempDir::new().unwrap(); @@ -43,8 +44,9 @@ async fn symlink() { #[tokio::test] async fn single_file() { - let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); - let directory_service = gen_directory_service(); + let blob_service = + Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>; + let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let tmpdir = TempDir::new().unwrap(); @@ -75,8 +77,9 @@ async fn single_file() { #[cfg(target_family = "unix")] #[tokio::test] async fn complicated() { - let blob_service: Arc<dyn BlobService> = gen_blob_service().into(); - let directory_service = gen_directory_service(); + let blob_service = + Arc::from(blobservice::from_addr("memory://").await.unwrap()) as Arc<dyn BlobService>; + let directory_service = directoryservice::from_addr("memory://").await.unwrap(); let tmpdir = TempDir::new().unwrap(); diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs deleted file mode 100644 index ca68f9f26c5a..000000000000 --- a/tvix/castore/src/utils.rs +++ /dev/null @@ -1,89 +0,0 @@ -//! A crate containing constructors to provide instances of a BlobService and -//! DirectoryService. Only used for testing purposes, but across crates. -//! Should be removed once we have a better concept of a "Service registry". -use tonic::transport::{Channel, Endpoint, Server, Uri}; - -use crate::{ - blobservice::{BlobService, MemoryBlobService}, - directoryservice::{DirectoryService, MemoryDirectoryService}, - proto::{ - blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer, - directory_service_client::DirectoryServiceClient, - directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper, - GRPCDirectoryServiceWrapper, - }, -}; - -pub fn gen_blob_service() -> Box<dyn BlobService> { - Box::<MemoryBlobService>::default() -} - -pub fn gen_directory_service() -> Box<dyn DirectoryService> { - Box::<MemoryDirectoryService>::default() -} - -/// This will spawn the a gRPC server with a DirectoryService client, connect a -/// gRPC DirectoryService client and return it. -#[allow(dead_code)] -pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> { - let (left, right) = tokio::io::duplex(64); - - // spin up a server, which will only connect once, to the left side. - tokio::spawn(async { - // spin up a new DirectoryService - let mut server = Server::builder(); - let router = server.add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::new(gen_directory_service()), - )); - - router - .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) - .await - }); - - // Create a client, connecting to the right side. The URI is unused. - let mut maybe_right = Some(right); - DirectoryServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(right) } - })) - .await - .unwrap(), - ) -} - -/// This will spawn the a gRPC server with a BlobService client, connect a -/// gRPC BlobService client and return it. -#[allow(dead_code)] -pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> { - let (left, right) = tokio::io::duplex(64); - - // spin up a server, which will only connect once, to the left side. - tokio::spawn(async { - // spin up a new DirectoryService - let mut server = Server::builder(); - let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( - gen_blob_service(), - ))); - - router - .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) - .await - }); - - // Create a client, connecting to the right side. The URI is unused. - let mut maybe_right = Some(right); - BlobServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(right) } - })) - .await - .unwrap(), - ) -} |