about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice/grpc.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-01-09T13·00+0200
committerclbot <clbot@tvl.fyi>2024-01-09T17·31+0000
commit719cbad871b5c67ca2f52a6cda342e788497549f (patch)
tree95542dd2f409b5b40db7974a343945c99b73186c /tvix/castore/src/blobservice/grpc.rs
parent9596c5caff78cfdd702f0267ff44b8b68f2a8a65 (diff)
feat(tvix/castore/blobsvc): add chunks method r/7365
This adds support to retrieve a list of chunks for a given blob to the
BlobService interface.

While theoretically all chunk-awareness could be kept private inside
each BlobService reader, we'd not be able to resolve individual chunks
from different Blobservices - and due to this, not able to substitute
chunks we already have in a more local store.

This function allows asking a BlobService for the list of chunks,
leaving any actual fetching up to the caller (be it through individual
calls to open_read), or asking another store for it.

Change-Id: I1d33c591195ed494be3aec71a8c804743cbe0dca
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10586
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs32
1 files changed, 30 insertions, 2 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 183ebb76e1..acc0125c82 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -1,5 +1,8 @@
 use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
-use crate::{proto, B3Digest};
+use crate::{
+    proto::{self, stat_blob_response::ChunkMeta},
+    B3Digest,
+};
 use futures::sink::SinkExt;
 use std::{io, pin::pin, task::Poll};
 use tokio::io::AsyncWriteExt;
@@ -10,7 +13,7 @@ use tokio_util::{
     sync::PollSender,
 };
 use tonic::{async_trait, transport::Channel, Code, Status};
-use tracing::instrument;
+use tracing::{instrument, warn};
 
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
@@ -108,6 +111,31 @@ impl BlobService for GRPCBlobService {
             digest: None,
         })
     }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        let resp = self
+            .grpc_client
+            .clone()
+            .stat(proto::StatBlobRequest {
+                digest: digest.clone().into(),
+                send_chunks: true,
+                ..Default::default()
+            })
+            .await;
+
+        match resp {
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+            Ok(resp) => {
+                let resp = resp.into_inner();
+                if resp.chunks.is_empty() {
+                    warn!("chunk list is empty");
+                }
+                Ok(Some(resp.chunks))
+            }
+        }
+    }
 }
 
 pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> {