diff options
Diffstat (limited to 'tvix/store/src/directoryservice/grpc.rs')
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 56 |
1 files changed, 55 insertions, 1 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index 913dd5bad93f..cff3077c7e08 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -3,6 +3,7 @@ use std::collections::HashSet; use super::{DirectoryPutter, DirectoryService}; use crate::proto::{self, get_directory_request::ByWhat}; use crate::{B3Digest, Error}; +use tokio::net::UnixStream; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{transport::Channel, Status}; @@ -21,7 +22,7 @@ pub struct GRPCDirectoryService { } impl GRPCDirectoryService { - /// construct a [GRPCDirectoryService] from a [proto::blob_service_client::BlobServiceClient<Channel>]. + /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient<Channel>]. /// panics if called outside the context of a tokio runtime. pub fn from_client( grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, @@ -34,6 +35,59 @@ impl GRPCDirectoryService { } impl DirectoryService for GRPCDirectoryService { + /// Constructs a [GRPCDirectoryService] from the passed [url::Url]: + /// - scheme has to match `grpc+*://`. + /// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + /// - In the case of unix sockets, there must be a path, but may not be a host. + /// - In the case of non-unix sockets, there must be a host, but no path. + fn from_url(url: &url::Url) -> Result<Self, crate::Error> { + // Start checking for the scheme to start with grpc+. + match url.scheme().strip_prefix("grpc+") { + None => Err(crate::Error::StorageError("invalid scheme".to_string())), + Some(rest) => { + if rest == "unix" { + if url.host_str().is_some() { + return Err(crate::Error::StorageError( + "host may not be set".to_string(), + )); + } + let path = url.path().to_string(); + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter + .unwrap() + .connect_with_connector_lazy(tower::service_fn( + move |_: tonic::transport::Uri| UnixStream::connect(path.clone()), + )); + let grpc_client = + proto::directory_service_client::DirectoryServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } else { + // ensure path is empty, not supported with gRPC. + if !url.path().is_empty() { + return Err(crate::Error::StorageError( + "path may not be set".to_string(), + )); + } + + // clone the uri, and drop the grpc+ from the scheme. + // Recreate a new uri with the `grpc+` prefix dropped from the scheme. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let url = { + let url_str = url.to_string(); + let s_stripped = url_str.strip_prefix("grpc+").unwrap(); + url::Url::parse(s_stripped).unwrap() + }; + let channel = tonic::transport::Endpoint::try_from(url.to_string()) + .unwrap() + .connect_lazy(); + + let grpc_client = + proto::directory_service_client::DirectoryServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } + } + } + } fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); |