diff options
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 632acef158e2..5663cd3838ec 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,10 +1,15 @@ -use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter, ChunkedReader}; +use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; use crate::{ proto::{self, stat_blob_response::ChunkMeta}, B3Digest, }; use futures::sink::SinkExt; -use std::{io, pin::pin, sync::Arc, task::Poll}; +use std::{ + io::{self, Cursor}, + pin::pin, + sync::Arc, + task::Poll, +}; use tokio::io::AsyncWriteExt; use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -55,11 +60,10 @@ impl BlobService for GRPCBlobService { #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // First try to get a list of chunks. In case there's only one chunk returned, - // or the backend does not support chunking, return a NaiveSeeker. - // Otherwise use a ChunkedReader. - // TODO: we should check if we want to replace NaiveSeeker with a simple - // Cursor on the raw `Vec<u8>`, as seeking backwards is something that - // clients generally do. + // buffer its data into a Vec, otherwise use a ChunkedReader. + // We previously used NaiveSeeker here, but userland likes to seek backwards too often, + // and without store composition this will get very noisy. + // FUTUREWORK: use CombinedBlobService and store composition. match self.chunks(digest).await { Ok(None) => Ok(None), Ok(Some(chunks)) => { @@ -82,9 +86,13 @@ impl BlobService for GRPCBlobService { }); // Use StreamReader::new to convert to an AsyncRead. - let data_reader = tokio_util::io::StreamReader::new(data_stream); + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); + + let mut buf = Vec::new(); + // TODO: only do this up to a certain limit. + tokio::io::copy(&mut data_reader, &mut buf).await?; - Ok(Some(Box::new(NaiveSeeker::new(data_reader)))) + Ok(Some(Box::new(Cursor::new(buf)))) } Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), |