diff options
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 32 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/mod.rs | 17 |
2 files changed, 47 insertions, 2 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 183ebb76e10c..acc0125c82ed 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,5 +1,8 @@ use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; -use crate::{proto, B3Digest}; +use crate::{ + proto::{self, stat_blob_response::ChunkMeta}, + B3Digest, +}; use futures::sink::SinkExt; use std::{io, pin::pin, task::Poll}; use tokio::io::AsyncWriteExt; @@ -10,7 +13,7 @@ use tokio_util::{ sync::PollSender, }; use tonic::{async_trait, transport::Channel, Code, Status}; -use tracing::instrument; +use tracing::{instrument, warn}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] @@ -108,6 +111,31 @@ impl BlobService for GRPCBlobService { digest: None, }) } + + #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] + async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { + let resp = self + .grpc_client + .clone() + .stat(proto::StatBlobRequest { + digest: digest.clone().into(), + send_chunks: true, + ..Default::default() + }) + .await; + + match resp { + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + Ok(resp) => { + let resp = resp.into_inner(); + if resp.chunks.is_empty() { + warn!("chunk list is empty"); + } + Ok(Some(resp.chunks)) + } + } + } } pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> { diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs index d024121eaa39..197b51b1d9e1 100644 --- a/tvix/castore/src/blobservice/mod.rs +++ b/tvix/castore/src/blobservice/mod.rs @@ -1,6 +1,7 @@ use std::io; use tonic::async_trait; +use crate::proto::stat_blob_response::ChunkMeta; use crate::B3Digest; mod from_addr; @@ -35,6 +36,22 @@ pub trait BlobService: Send + Sync { /// Insert a new blob into the store. Returns a [BlobWriter], which /// implements [io::Write] and a [BlobWriter::close]. async fn open_write(&self) -> Box<dyn BlobWriter>; + + /// Return a list of chunks for a given blob. + /// There's a distinction between returning Ok(None) and Ok(Some(vec![])). + /// The former return value is sent in case the blob is not present at all, + /// while the second one is sent in case there's no more granular chunks (or + /// the backend does not support chunking). + /// A default implementation signalling the backend does not support + /// chunking is provided. + async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> { + if !self.has(digest).await? { + return Ok(None); + } else { + // default implementation, signalling the backend does not support chunking. + return Ok(Some(vec![])); + } + } } /// A [tokio::io::AsyncWrite] that you need to close() afterwards, and get back |