diff options
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 80 |
1 files changed, 54 insertions, 26 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index d98a9b517724..9230abaf346f 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,10 +1,10 @@ -use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; +use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter, ChunkedReader}; use crate::{ proto::{self, stat_blob_response::ChunkMeta}, B3Digest, }; use futures::sink::SinkExt; -use std::{io, pin::pin, task::Poll}; +use std::{io, pin::pin, sync::Arc, task::Poll}; use tokio::io::AsyncWriteExt; use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -54,31 +54,59 @@ impl BlobService for GRPCBlobService { #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { - // Get a stream of [proto::BlobChunk], or return an error if the blob - // doesn't exist. - match self - .grpc_client - .clone() - .read(proto::ReadBlobRequest { - digest: digest.clone().into(), - }) - .await - { - Ok(stream) => { - // on success, this is a stream of tonic::Result<proto::BlobChunk>, - // so access .data and map errors into std::io::Error. - let data_stream = stream.into_inner().map(|e| { - e.map(|c| c.data) - .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s)) - }); - - // Use StreamReader::new to convert to an AsyncRead. - let data_reader = tokio_util::io::StreamReader::new(data_stream); - - Ok(Some(Box::new(NaiveSeeker::new(data_reader)))) + // First try to get a list of chunks. In case there's only one chunk returned, + // or the backend does not support chunking, return a NaiveSeeker. + // Otherwise use a ChunkedReader. + // TODO: we should check if we want to replace NaiveSeeker with a simple + // Cursor on the raw `Vec<u8>`, as seeking backwards is something that + // clients generally do. + match self.chunks(digest).await { + Ok(None) => Ok(None), + Ok(Some(chunks)) => { + if chunks.is_empty() || chunks.len() == 1 { + // No more granular chunking info, treat this as an individual chunk. + // Get a stream of [proto::BlobChunk], or return an error if the blob + // doesn't exist. + return match self + .grpc_client + .clone() + .read(proto::ReadBlobRequest { + digest: digest.clone().into(), + }) + .await + { + Ok(stream) => { + let data_stream = stream.into_inner().map(|e| { + e.map(|c| c.data) + .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s)) + }); + + // Use StreamReader::new to convert to an AsyncRead. + let data_reader = tokio_util::io::StreamReader::new(data_stream); + + Ok(Some(Box::new(NaiveSeeker::new(data_reader)))) + } + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + }; + } + + // The chunked case. Let ChunkedReader do individual reads. + // TODO: we should store the chunking data in some local cache, + // so `ChunkedReader` doesn't call `self.chunks` *again* for every chunk. + // Think about how store composition will fix this. + let chunked_reader = ChunkedReader::from_chunks( + chunks.into_iter().map(|chunk| { + ( + chunk.digest.try_into().expect("invalid b3 digest"), + chunk.size, + ) + }), + Arc::new(self.clone()) as Arc<dyn BlobService>, + ); + Ok(Some(Box::new(chunked_reader))) } - Err(e) if e.code() == Code::NotFound => Ok(None), - Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), + Err(e) => Err(e)?, } } |