diff options
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 71 |
1 files changed, 55 insertions, 16 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 5663cd3838ec..0db3dfea4ad8 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,4 +1,5 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{ proto::{self, stat_blob_response::ChunkMeta}, B3Digest, @@ -17,40 +18,43 @@ use tokio_util::{ io::{CopyToBytes, SinkWriter}, sync::PollSender, }; -use tonic::{async_trait, transport::Channel, Code, Status}; -use tracing::instrument; +use tonic::{async_trait, Code, Status}; +use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] -pub struct GRPCBlobService { +pub struct GRPCBlobService<T> { /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, } -impl GRPCBlobService { +impl<T> GRPCBlobService<T> { /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. - /// panics if called outside the context of a tokio runtime. - pub fn from_client( - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, - ) -> Self { + pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self { Self { grpc_client } } } #[async_trait] -impl BlobService for GRPCBlobService { +impl<T> BlobService for GRPCBlobService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(skip(self, digest), fields(blob.digest=%digest))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { - let mut grpc_client = self.grpc_client.clone(); - let resp = grpc_client + match self + .grpc_client + .clone() .stat(proto::StatBlobRequest { digest: digest.clone().into(), ..Default::default() }) - .await; - - match resp { + .await + { Ok(_blob_meta) => Ok(true), Err(e) if e.code() == Code::NotFound => Ok(false), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), @@ -133,6 +137,8 @@ impl BlobService for GRPCBlobService { let task = tokio::spawn({ let mut grpc_client = self.grpc_client.clone(); async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } + // instrument the task with the current span, this is not done by default + .in_current_span() }); // The tx part of the channel is converted to a sink of byte chunks. @@ -175,6 +181,40 @@ impl BlobService for GRPCBlobService { } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCBlobServiceConfig { + url: String, +} + +impl TryFrom<url::Url> for GRPCBlobServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Ok(GRPCBlobServiceConfig { + url: url.to_string(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for GRPCBlobServiceConfig { + type Output = dyn BlobService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::blob_service_client::BlobServiceClient::new( + crate::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCBlobService::from_client(client))) + } +} + pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> { /// The task containing the put request, and the inner writer, if we're still writing. task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>, @@ -335,7 +375,6 @@ mod tests { .await .expect("must succeed"), ); - GRPCBlobService::from_client(client) }; |