about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice/grpc.rs
blob: 0e4cbb758c0568fa2e447feb59706161163596db (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use super::PathInfoService;
use crate::proto;
use tonic::{transport::Channel, Code, Status};

/// Connects to a (remote) tvix-store PathInfoService over gRPC.
///
/// Implements the synchronous [PathInfoService] trait on top of an async
/// gRPC client: each call spawns the request on the tokio runtime referenced
/// by `tokio_handle` and blocks the calling thread until it has completed.
#[derive(Clone)]
pub struct GRPCPathInfoService {
    /// A handle into the active tokio runtime. Necessary to spawn the request
    /// tasks, and to block on their completion.
    tokio_handle: tokio::runtime::Handle,

    /// The internal reference to a gRPC client.
    /// Cloning it is cheap, and it internally handles concurrent requests.
    /// Every request works on its own clone of this client.
    grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
}

impl GRPCPathInfoService {
    /// Assemble a [GRPCPathInfoService] from its two parts.
    ///
    /// * `tokio_handle` - handle to the tokio runtime on which all gRPC
    ///   requests issued by this service are spawned and awaited.
    /// * `grpc_client` - an already constructed PathInfoService gRPC client;
    ///   it is stored as-is and cloned for every request.
    ///
    /// No connection attempt or other I/O happens here, both values are
    /// simply moved into the returned struct.
    pub fn new(
        tokio_handle: tokio::runtime::Handle,
        grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
    ) -> Self {
        Self {
            tokio_handle,
            grpc_client,
        }
    }
}

impl PathInfoService for GRPCPathInfoService {
    fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> {
        // Get a new handle to the gRPC client.
        let mut grpc_client = self.grpc_client.clone();

        let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> =
            self.tokio_handle.spawn(async move {
                let path_info = grpc_client
                    .get(proto::GetPathInfoRequest {
                        by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
                            digest.to_vec(),
                        )),
                    })
                    .await?
                    .into_inner();

                Ok(path_info)
            });

        match self.tokio_handle.block_on(task)? {
            Ok(path_info) => Ok(Some(path_info)),
            Err(e) if e.code() == Code::NotFound => Ok(None),
            Err(e) => Err(crate::Error::StorageError(e.to_string())),
        }
    }

    fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> {
        // Get a new handle to the gRPC client.
        let mut grpc_client = self.grpc_client.clone();

        let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> =
            self.tokio_handle.spawn(async move {
                let path_info = grpc_client.put(path_info).await?.into_inner();
                Ok(path_info)
            });

        self.tokio_handle
            .block_on(task)?
            .map_err(|e| crate::Error::StorageError(e.to_string()))
    }
}