diff options
author | Florian Klink <flokli@flokli.de> | 2024-04-16T14·48+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2024-04-16T18·45+0000 |
commit | 538d5fc8eef825f97add6a34f708d9a6b87a3ebd (patch) | |
tree | b109c082d1fe67e31f49ff23dc4d58ed7e3635ea /tvix/castore/src/blobservice/grpc.rs | |
parent | 8107678632e06c5bb1504d9376e4e3deda546ee7 (diff) |
fix(tvix/castore/blobservice/grpc): don't use NaiveSeeker for now r/7950
Userland likes to seek backwards, and until we have store composition and can serve chunks from a local cache, we need to buffer the individual chunks in memory. Change-Id: I66978a0722d5f55ed4a9a49d116cecb64a01995d Reviewed-on: https://cl.tvl.fyi/c/depot/+/11448 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 632acef158e2..5663cd3838ec 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,10 +1,15 @@ -use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter, ChunkedReader}; +use super::{BlobReader, BlobService, BlobWriter, ChunkedReader}; use crate::{ proto::{self, stat_blob_response::ChunkMeta}, B3Digest, }; use futures::sink::SinkExt; -use std::{io, pin::pin, sync::Arc, task::Poll}; +use std::{ + io::{self, Cursor}, + pin::pin, + sync::Arc, + task::Poll, +}; use tokio::io::AsyncWriteExt; use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -55,11 +60,10 @@ impl BlobService for GRPCBlobService { #[instrument(skip(self, digest), fields(blob.digest=%digest), err)] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // First try to get a list of chunks. In case there's only one chunk returned, - // or the backend does not support chunking, return a NaiveSeeker. - // Otherwise use a ChunkedReader. - // TODO: we should check if we want to replace NaiveSeeker with a simple - // Cursor on the raw `Vec<u8>`, as seeking backwards is something that - // clients generally do. + // buffer its data into a Vec, otherwise use a ChunkedReader. + // We previously used NaiveSeeker here, but userland likes to seek backwards too often, + // and without store composition this will get very noisy. + // FUTUREWORK: use CombinedBlobService and store composition. match self.chunks(digest).await { Ok(None) => Ok(None), Ok(Some(chunks)) => { @@ -82,9 +86,13 @@ impl BlobService for GRPCBlobService { }); // Use StreamReader::new to convert to an AsyncRead. - let data_reader = tokio_util::io::StreamReader::new(data_stream); + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); + + let mut buf = Vec::new(); + // TODO: only do this up to a certain limit. + tokio::io::copy(&mut data_reader, &mut buf).await?; - Ok(Some(Box::new(NaiveSeeker::new(data_reader)))) + Ok(Some(Box::new(Cursor::new(buf)))) } Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)), |