about summary refs log blame commit diff
path: root/tvix/castore/src/utils.rs
blob: ad0051ff9732df179d7d393799b903efe63e2a6a (plain) (tree)
1
2
3
4
5
6
7
8
9




                                                                             


                                  
                                                       



                                                                 
            
                                                                                       
                                                         

                                                                                 
      








                                                             
 







                                                 
 

                                                                               


                                                                                       
 






                                                                        
 


                                                                                   

       

                                                                        
                                







                                                                     
     
 





















                                                                                            
                           







                                                                     
     
 
//! A module containing constructors to provide instances of a BlobService and
//! DirectoryService.
//! Only used for testing purposes, but across crates.
//! Should be removed once we have a better concept of a "Service registry".

use pin_project_lite::pin_project;
use std::sync::Arc;
use tokio::io::DuplexStream;
use tonic::transport::{Channel, Endpoint, Server, Uri};

use crate::{
    blobservice::{BlobService, MemoryBlobService},
    directoryservice::{DirectoryService, MemoryDirectoryService},
    proto::{
        blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer,
        directory_service_client::DirectoryServiceClient,
        directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper,
        GRPCDirectoryServiceWrapper,
    },
};

/// Constructs a default [MemoryBlobService] and returns it as a shared
/// `Arc<dyn BlobService>` trait object.
pub fn gen_blob_service() -> Arc<dyn BlobService> {
    let memory_service = MemoryBlobService::default();
    Arc::new(memory_service)
}

/// Constructs a default [MemoryDirectoryService] and returns it as a shared
/// `Arc<dyn DirectoryService>` trait object.
pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
    let memory_service = MemoryDirectoryService::default();
    Arc::new(memory_service)
}

pin_project! {
    /// A thin wrapper around a tokio [DuplexStream].
    ///
    /// The wrapped stream is the only field and is marked `#[pin]`, so the
    /// projection generated by `pin_project!` exposes it as a pinned reference.
    ///
    /// NOTE(review): earlier documentation said this type implements
    /// `AsyncRead` and `Connected`; no such impls are visible in this file —
    /// confirm whether they live elsewhere or whether the wrapper is unused.
    pub struct DuplexStreamWrapper {
        #[pin]
        inner: DuplexStream,
    }
}

/// Spawns a gRPC server exposing a fresh [DirectoryService] (as produced by
/// [gen_directory_service]), connects a gRPC DirectoryService client to it and
/// returns that client.
///
/// Server and client communicate over the two halves of a
/// [tokio::io::duplex] pipe; the endpoint URI is only a placeholder.
///
/// # Panics
///
/// Panics if the placeholder endpoint URI fails to parse, or if the initial
/// connection over the duplex pipe cannot be established.
#[allow(dead_code)]
pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
    let (left, right) = tokio::io::duplex(64);

    // Spin up a server which accepts exactly one connection: the left side.
    // The join handle is dropped, so the task runs detached and its result is
    // not observed.
    tokio::spawn(async {
        // Serve a new DirectoryService through the gRPC wrapper.
        let mut server = Server::builder();
        let router = server.add_service(DirectoryServiceServer::new(
            GRPCDirectoryServiceWrapper::from(gen_directory_service()),
        ));

        router
            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
            .await
    });

    // Create a client, connecting to the right side. The URI is unused.
    let mut maybe_right = Some(right);
    let channel = Endpoint::try_from("http://[::]:50051")
        .expect("static placeholder endpoint URI must parse")
        .connect_with_connector(tower::service_fn(move |_: Uri| {
            // The right side can only be handed out once. If the connector is
            // invoked again, report an I/O error instead of panicking inside
            // the transport.
            let right = maybe_right.take().ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::NotConnected,
                    "in-memory duplex stream was already consumed",
                )
            });
            async move { right }
        }))
        .await
        .expect("connecting over the in-memory duplex stream must succeed");

    DirectoryServiceClient::new(channel)
}

/// Spawns a gRPC server exposing a fresh [BlobService] (as produced by
/// [gen_blob_service]), connects a gRPC BlobService client to it and returns
/// that client.
///
/// Server and client communicate over the two halves of a
/// [tokio::io::duplex] pipe; the endpoint URI is only a placeholder.
///
/// # Panics
///
/// Panics if the placeholder endpoint URI fails to parse, or if the initial
/// connection over the duplex pipe cannot be established.
#[allow(dead_code)]
pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> {
    let (left, right) = tokio::io::duplex(64);

    // Spin up a server which accepts exactly one connection: the left side.
    // The join handle is dropped, so the task runs detached and its result is
    // not observed.
    tokio::spawn(async {
        // Serve a new BlobService through the gRPC wrapper.
        let mut server = Server::builder();
        let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
            gen_blob_service(),
        )));

        router
            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
            .await
    });

    // Create a client, connecting to the right side. The URI is unused.
    let mut maybe_right = Some(right);
    let channel = Endpoint::try_from("http://[::]:50051")
        .expect("static placeholder endpoint URI must parse")
        .connect_with_connector(tower::service_fn(move |_: Uri| {
            // The right side can only be handed out once. If the connector is
            // invoked again, report an I/O error instead of panicking inside
            // the transport.
            let right = maybe_right.take().ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::NotConnected,
                    "in-memory duplex stream was already consumed",
                )
            });
            async move { right }
        }))
        .await
        .expect("connecting over the in-memory duplex stream must succeed");

    BlobServiceClient::new(channel)
}