about summary refs log tree commit diff
path: root/tvix/castore/src/utils.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/utils.rs')
-rw-r--r--tvix/castore/src/utils.rs107
1 files changed, 107 insertions, 0 deletions
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
new file mode 100644
index 000000000000..d9a967598841
--- /dev/null
+++ b/tvix/castore/src/utils.rs
@@ -0,0 +1,107 @@
+//! A crate containing constructors to provide instances of a BlobService and
+//! DirectoryService.
+//! Only used for testing purposes, but across crates.
+//! Should be removed once we have a better concept of a "Service registry".
+
+use pin_project_lite::pin_project;
+use std::sync::Arc;
+use tokio::io::DuplexStream;
+use tonic::transport::{Channel, Endpoint, Server, Uri};
+
+use crate::{
+    blobservice::{BlobService, MemoryBlobService},
+    directoryservice::{DirectoryService, MemoryDirectoryService},
+    proto::{
+        blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer,
+        directory_service_client::DirectoryServiceClient,
+        directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper,
+        GRPCDirectoryServiceWrapper,
+    },
+};
+
+pub fn gen_blob_service() -> Arc<dyn BlobService> {
+    Arc::new(MemoryBlobService::default())
+}
+
+pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
+    Arc::new(MemoryDirectoryService::default())
+}
+
+pin_project! {
+    /// A wrapper around [DuplexStreamStream],
+    /// implementing [AsyncRead] and [Connected].
+    pub struct DuplexStreamWrapper {
+            #[pin]
+        inner: DuplexStream
+    }
+}
+
+/// This will spawn the a gRPC server with a DirectoryService client, connect a
+/// gRPC DirectoryService client and return it.
+#[allow(dead_code)]
+pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
+    let (left, right) = tokio::io::duplex(64);
+
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn(async {
+        // spin up a new DirectoryService
+        let mut server = Server::builder();
+        let router = server.add_service(DirectoryServiceServer::new(
+            GRPCDirectoryServiceWrapper::from(gen_directory_service()),
+        ));
+
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
+    });
+
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    let grpc_client = DirectoryServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    );
+
+    grpc_client
+}
+
+/// This will spawn the a gRPC server with a BlobService client, connect a
+/// gRPC BlobService client and return it.
+#[allow(dead_code)]
+pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> {
+    let (left, right) = tokio::io::duplex(64);
+
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn(async {
+        // spin up a new DirectoryService
+        let mut server = Server::builder();
+        let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
+            gen_blob_service(),
+        )));
+
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
+    });
+
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    let grpc_client = BlobServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    );
+
+    grpc_client
+}