From 719cbad871b5c67ca2f52a6cda342e788497549f Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 9 Jan 2024 15:00:01 +0200 Subject: feat(tvix/castore/blobsvc): add chunks method This adds support to retrieve a list of chunks for a given blob to the BlobService interface. While theoretically all chunk-awareness could be kept private inside each BlobService reader, we'd not be able to resolve individual chunks from different Blobservices - and due to this, not able to substitute chunks we already have in a more local store. This function allows asking a BlobService for the list of chunks, leaving any actual fetching up to the caller (be it through individual calls to open_read), or asking another store for it. Change-Id: I1d33c591195ed494be3aec71a8c804743cbe0dca Reviewed-on: https://cl.tvl.fyi/c/depot/+/10586 Autosubmit: flokli Reviewed-by: raitobezarius Tested-by: BuildkiteCI --- tvix/castore/src/blobservice/grpc.rs | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) (limited to 'tvix/castore/src/blobservice/grpc.rs') diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 183ebb76e1..acc0125c82 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,5 +1,8 @@ use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; -use crate::{proto, B3Digest}; +use crate::{ + proto::{self, stat_blob_response::ChunkMeta}, + B3Digest, +}; use futures::sink::SinkExt; use std::{io, pin::pin, task::Poll}; use tokio::io::AsyncWriteExt; @@ -10,7 +13,7 @@ use tokio_util::{ sync::PollSender, }; use tonic::{async_trait, transport::Channel, Code, Status}; -use tracing::instrument; +use tracing::{instrument, warn}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] @@ -108,6 +111,31 @@ impl BlobService for GRPCBlobService { digest: None, }) } + + #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + async fn chunks(&self, digest: &B3Digest) -> io::Result>> { + let resp = self + .grpc_client + .clone() + .stat(proto::StatBlobRequest { + digest: digest.clone().into(), + send_chunks: true, + ..Default::default() + }) + .await; + + match resp { + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + Ok(resp) => { + let resp = resp.into_inner(); + if resp.chunks.is_empty() { + warn!("chunk list is empty"); + } + Ok(Some(resp.chunks)) + } + } + } } pub struct GRPCBlobWriter { -- cgit 1.4.1