// Origin: tvix/castore/src/utils.rs
// Blob: 853ff4876e372dcddb23988351f43f8a480e3999
//! A module containing constructors to provide instances of a BlobService and
//! DirectoryService.
//! Only used for testing purposes, but across crates.
//! Should be removed once we have a better concept of a "Service registry".

use core::time;
use std::{path::Path, sync::Arc, thread};

use tokio::net::{UnixListener, UnixStream};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::transport::{Channel, Endpoint, Server, Uri};

use crate::{
    blobservice::{BlobService, MemoryBlobService},
    directoryservice::{DirectoryService, MemoryDirectoryService},
    proto::{
        directory_service_client::DirectoryServiceClient,
        directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper,
    },
};

pub fn gen_blob_service() -> Arc<dyn BlobService> {
    Arc::new(MemoryBlobService::default())
}

pub fn gen_directory_service() -> Arc<dyn DirectoryService> {
    Arc::new(MemoryDirectoryService::default())
}

/// Starts an in-process gRPC DirectoryService server and returns a client for it.
///
/// This will spawn a separate thread, with its own tokio runtime, and start a
/// gRPC server there, serving the service returned by [gen_directory_service]
/// on a unix domain socket at `tmpdir/socket`. The thread is never joined.
/// Once the socket path exists, a lazily-connecting gRPC client is created
/// from the calling task, and returned.
///
/// # Panics
///
/// Panics if the socket path has not shown up after all polling attempts.
///
/// FUTUREWORK: accept a closure to create the service, so we can test this with different ones.
// The module docs describe this file as test-only support code, so this
// function may be unreferenced in some build configurations.
#[allow(dead_code)]
pub(crate) async fn gen_directorysvc_grpc_client(tmpdir: &Path) -> DirectoryServiceClient<Channel> {
    /// How often the socket path is checked for existence before giving up.
    const SOCKET_POLL_ATTEMPTS: usize = 20;
    /// How long to wait between two consecutive existence checks.
    const SOCKET_POLL_INTERVAL: time::Duration = time::Duration::from_millis(20);

    let socket_path = tmpdir.join("socket");

    // Spin up a server, in a thread far away, which spawns its own tokio runtime,
    // and blocks on the task.
    let socket_path_clone = socket_path.clone();
    thread::spawn(move || {
        // Create the runtime
        let rt = tokio::runtime::Runtime::new().expect("failed to create tokio runtime");
        // Get a handle from this runtime
        let handle = rt.handle();

        let task = handle.spawn(async move {
            let uds = UnixListener::bind(socket_path_clone).expect("failed to bind unix socket");
            let uds_stream = UnixListenerStream::new(uds);

            // spin up a new DirectoryService
            let mut server = Server::builder();
            let router = server.add_service(DirectoryServiceServer::new(
                GRPCDirectoryServiceWrapper::from(gen_directory_service()),
            ));
            router.serve_with_incoming(uds_stream).await
        });

        // Keep this thread (and thereby the runtime) alive for as long as the
        // server task runs.
        handle.block_on(task)
    });

    // wait for the socket to be created
    // TODO: pass around FDs instead?
    //
    // Check first, then sleep only between checks: this performs exactly
    // SOCKET_POLL_ATTEMPTS checks and never sleeps after the final one.
    let mut socket_created = socket_path.exists();
    for _ in 1..SOCKET_POLL_ATTEMPTS {
        if socket_created {
            break;
        }
        tokio::time::sleep(SOCKET_POLL_INTERVAL).await;
        socket_created = socket_path.exists();
    }

    assert!(
        socket_created,
        "expected socket path to eventually get created, but never happened"
    );

    // Create a channel, connecting to the uds at socket_path.
    // The URI is unused.
    let channel = Endpoint::try_from("http://[::]:50051")
        .expect("static placeholder URI must parse")
        .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
            UnixStream::connect(socket_path.clone())
        }));

    DirectoryServiceClient::new(channel)
}