1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
//! A crate containing constructors to provide instances of a BlobService and
//! DirectoryService. Only used for testing purposes, but across crates.
//! Should be removed once we have a better concept of a "Service registry".
use tonic::transport::{Channel, Endpoint, Server, Uri};
use crate::{
blobservice::{BlobService, MemoryBlobService},
directoryservice::{DirectoryService, MemoryDirectoryService},
proto::{
blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer,
directory_service_client::DirectoryServiceClient,
directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper,
GRPCDirectoryServiceWrapper,
},
};
pub fn gen_blob_service() -> Box<dyn BlobService> {
Box::<MemoryBlobService>::default()
}
pub fn gen_directory_service() -> Box<dyn DirectoryService> {
Box::<MemoryDirectoryService>::default()
}
/// This will spawn the a gRPC server with a DirectoryService client, connect a
/// gRPC DirectoryService client and return it.
#[allow(dead_code)]
pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
let (left, right) = tokio::io::duplex(64);
// spin up a server, which will only connect once, to the left side.
tokio::spawn(async {
// spin up a new DirectoryService
let mut server = Server::builder();
let router = server.add_service(DirectoryServiceServer::new(
GRPCDirectoryServiceWrapper::new(gen_directory_service()),
));
router
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
.await
});
// Create a client, connecting to the right side. The URI is unused.
let mut maybe_right = Some(right);
DirectoryServiceClient::new(
Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(tower::service_fn(move |_: Uri| {
let right = maybe_right.take().unwrap();
async move { Ok::<_, std::io::Error>(right) }
}))
.await
.unwrap(),
)
}
/// This will spawn the a gRPC server with a BlobService client, connect a
/// gRPC BlobService client and return it.
#[allow(dead_code)]
pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> {
let (left, right) = tokio::io::duplex(64);
// spin up a server, which will only connect once, to the left side.
tokio::spawn(async {
// spin up a new DirectoryService
let mut server = Server::builder();
let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
gen_blob_service(),
)));
router
.serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
.await
});
// Create a client, connecting to the right side. The URI is unused.
let mut maybe_right = Some(right);
BlobServiceClient::new(
Endpoint::try_from("http://[::]:50051")
.unwrap()
.connect_with_connector(tower::service_fn(move |_: Uri| {
let right = maybe_right.take().unwrap();
async move { Ok::<_, std::io::Error>(right) }
}))
.await
.unwrap(),
)
}
|