use super::PathInfoService;
use crate::proto;
use tonic::{transport::Channel, Code, Status};
/// Connects to a (remote) tvix-store PathInfoService over gRPC.
///
/// The service bundles a tokio runtime handle with a gRPC client. Its
/// [PathInfoService] methods spawn each request as a task via that handle and
/// then block the calling thread on the handle until the task has finished.
///
/// The type derives [Clone]; a clone carries its own copy of the runtime
/// handle and of the gRPC client.
#[derive(Clone)]
pub struct GRPCPathInfoService {
/// A handle into the active tokio runtime. Necessary to spawn tasks.
/// It is also what the trait methods block on while waiting for a task.
tokio_handle: tokio::runtime::Handle,
/// The internal reference to a gRPC client.
/// Cloning it is cheap, and it internally handles concurrent requests.
/// Every request clones it, so that the spawned task owns its own client.
grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
}
impl GRPCPathInfoService {
    /// Builds a [GRPCPathInfoService] from an explicitly provided tokio
    /// runtime handle and a gRPC client.
    ///
    /// Use this when the caller already holds a handle, for example because
    /// it is running outside the context of the runtime itself.
    pub fn new(
        tokio_handle: tokio::runtime::Handle,
        grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
    ) -> Self {
        Self {
            grpc_client,
            tokio_handle,
        }
    }

    /// Builds a [GRPCPathInfoService] from a
    /// [`proto::path_info_service_client::PathInfoServiceClient`], picking up
    /// the handle of the tokio runtime the caller is currently running in.
    ///
    /// # Panics
    ///
    /// Panics if called outside the context of a tokio runtime.
    pub fn from_client(
        grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>,
    ) -> Self {
        // Same construction as `new`, only the handle comes from the ambient runtime.
        Self::new(tokio::runtime::Handle::current(), grpc_client)
    }
}
impl PathInfoService for GRPCPathInfoService {
fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> {
// Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone();
let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> =
self.tokio_handle.spawn(async move {
let path_info = grpc_client
.get(proto::GetPathInfoRequest {
by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash(
digest.to_vec(),
)),
})
.await?
.into_inner();
Ok(path_info)
});
match self.tokio_handle.block_on(task)? {
Ok(path_info) => Ok(Some(path_info)),
Err(e) if e.code() == Code::NotFound => Ok(None),
Err(e) => Err(crate::Error::StorageError(e.to_string())),
}
}
fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> {
// Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone();
let task: tokio::task::JoinHandle<Result<proto::PathInfo, Status>> =
self.tokio_handle.spawn(async move {
let path_info = grpc_client.put(path_info).await?.into_inner();
Ok(path_info)
});
self.tokio_handle
.block_on(task)?
.map_err(|e| crate::Error::StorageError(e.to_string()))
}
fn calculate_nar(
&self,
root_node: &proto::node::Node,
) -> Result<(u64, [u8; 32]), crate::Error> {
// Get a new handle to the gRPC client.
let mut grpc_client = self.grpc_client.clone();
let root_node = root_node.clone();
let task: tokio::task::JoinHandle<Result<_, Status>> =
self.tokio_handle.spawn(async move {
let path_info = grpc_client
.calculate_nar(proto::Node {
node: Some(root_node),
})
.await?
.into_inner();
Ok(path_info)
});
let resp = self
.tokio_handle
.block_on(task)?
.map_err(|e| crate::Error::StorageError(e.to_string()))?;
let nar_sha256: [u8; 32] = resp
.nar_sha256
.try_into()
.map_err(|_e| crate::Error::StorageError("invalid digest length".to_string()))?;
Ok((resp.nar_size, nar_sha256))
}
}