about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice/grpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs71
1 files changed, 55 insertions, 16 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 5663cd3838ec..0db3dfea4ad8 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -1,4 +1,5 @@
 use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{
     proto::{self, stat_blob_response::ChunkMeta},
     B3Digest,
@@ -17,40 +18,43 @@ use tokio_util::{
     io::{CopyToBytes, SinkWriter},
     sync::PollSender,
 };
-use tonic::{async_trait, transport::Channel, Code, Status};
-use tracing::instrument;
+use tonic::{async_trait, Code, Status};
+use tracing::{instrument, Instrument as _};
 
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
-pub struct GRPCBlobService {
+pub struct GRPCBlobService<T> {
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
-    grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
+    grpc_client: proto::blob_service_client::BlobServiceClient<T>,
 }
 
-impl GRPCBlobService {
+impl<T> GRPCBlobService<T> {
     /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient].
-    /// panics if called outside the context of a tokio runtime.
-    pub fn from_client(
-        grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
-    ) -> Self {
+    pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self {
         Self { grpc_client }
     }
 }
 
 #[async_trait]
-impl BlobService for GRPCBlobService {
+impl<T> BlobService for GRPCBlobService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
+    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
+    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
+    T::Future: Send,
+{
     #[instrument(skip(self, digest), fields(blob.digest=%digest))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
-        let mut grpc_client = self.grpc_client.clone();
-        let resp = grpc_client
+        match self
+            .grpc_client
+            .clone()
             .stat(proto::StatBlobRequest {
                 digest: digest.clone().into(),
                 ..Default::default()
             })
-            .await;
-
-        match resp {
+            .await
+        {
             Ok(_blob_meta) => Ok(true),
             Err(e) if e.code() == Code::NotFound => Ok(false),
             Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
@@ -133,6 +137,8 @@ impl BlobService for GRPCBlobService {
         let task = tokio::spawn({
             let mut grpc_client = self.grpc_client.clone();
             async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) }
+                // instrument the task with the current span, this is not done by default
+                .in_current_span()
         });
 
         // The tx part of the channel is converted to a sink of byte chunks.
@@ -175,6 +181,40 @@ impl BlobService for GRPCBlobService {
     }
 }
 
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct GRPCBlobServiceConfig {
+    url: String,
+}
+
+impl TryFrom<url::Url> for GRPCBlobServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        //   normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+        // - In the case of unix sockets, there must be a path, but may not be a host.
+        // - In the case of non-unix sockets, there must be a host, but no path.
+        // Constructing the channel is handled by tvix_castore::channel::from_url.
+        Ok(GRPCBlobServiceConfig {
+            url: url.to_string(),
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for GRPCBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let client = proto::blob_service_client::BlobServiceClient::new(
+            crate::tonic::channel_from_url(&self.url.parse()?).await?,
+        );
+        Ok(Arc::new(GRPCBlobService::from_client(client)))
+    }
+}
+
 pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> {
     /// The task containing the put request, and the inner writer, if we're still writing.
     task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>,
@@ -335,7 +375,6 @@ mod tests {
                     .await
                     .expect("must succeed"),
             );
-
             GRPCBlobService::from_client(client)
         };