about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs32
-rw-r--r--tvix/castore/src/blobservice/mod.rs17
2 files changed, 47 insertions, 2 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 183ebb76e10c..acc0125c82ed 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -1,5 +1,8 @@
 use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
-use crate::{proto, B3Digest};
+use crate::{
+    proto::{self, stat_blob_response::ChunkMeta},
+    B3Digest,
+};
 use futures::sink::SinkExt;
 use std::{io, pin::pin, task::Poll};
 use tokio::io::AsyncWriteExt;
@@ -10,7 +13,7 @@ use tokio_util::{
     sync::PollSender,
 };
 use tonic::{async_trait, transport::Channel, Code, Status};
-use tracing::instrument;
+use tracing::{instrument, warn};
 
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
@@ -108,6 +111,31 @@ impl BlobService for GRPCBlobService {
             digest: None,
         })
     }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        let resp = self
+            .grpc_client
+            .clone()
+            .stat(proto::StatBlobRequest {
+                digest: digest.clone().into(),
+                send_chunks: true,
+                ..Default::default()
+            })
+            .await;
+
+        match resp {
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+            Ok(resp) => {
+                let resp = resp.into_inner();
+                if resp.chunks.is_empty() {
+                    warn!("chunk list is empty");
+                }
+                Ok(Some(resp.chunks))
+            }
+        }
+    }
 }
 
 pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> {
diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs
index d024121eaa39..197b51b1d9e1 100644
--- a/tvix/castore/src/blobservice/mod.rs
+++ b/tvix/castore/src/blobservice/mod.rs
@@ -1,6 +1,7 @@
 use std::io;
 use tonic::async_trait;
 
+use crate::proto::stat_blob_response::ChunkMeta;
 use crate::B3Digest;
 
 mod from_addr;
@@ -35,6 +36,22 @@ pub trait BlobService: Send + Sync {
     /// Insert a new blob into the store. Returns a [BlobWriter], which
     /// implements [io::Write] and a [BlobWriter::close].
     async fn open_write(&self) -> Box<dyn BlobWriter>;
+
+    /// Return a list of chunks for a given blob.
+    /// There's a distinction between returning Ok(None) and Ok(Some(vec![])).
+    /// The former return value is sent in case the blob is not present at all,
+    /// while the second one is sent in case there's no more granular chunks (or
+    /// the backend does not support chunking).
+    /// A default implementation signalling the backend does not support
+    /// chunking is provided.
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        if !self.has(digest).await? {
+            return Ok(None);
+        } else {
+            // default implementation, signalling the backend does not support chunking.
+            return Ok(Some(vec![]));
+        }
+    }
 }
 
 /// A [tokio::io::AsyncWrite] that you need to close() afterwards, and get back