about summary refs log tree commit diff
path: root/tvix/castore/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs26
1 files changed, 17 insertions, 9 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 632acef158e2..5663cd3838ec 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -1,10 +1,15 @@
-use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter, ChunkedReader};
+use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
 use crate::{
     proto::{self, stat_blob_response::ChunkMeta},
     B3Digest,
 };
 use futures::sink::SinkExt;
-use std::{io, pin::pin, sync::Arc, task::Poll};
+use std::{
+    io::{self, Cursor},
+    pin::pin,
+    sync::Arc,
+    task::Poll,
+};
 use tokio::io::AsyncWriteExt;
 use tokio::task::JoinHandle;
 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
@@ -55,11 +60,10 @@ impl BlobService for GRPCBlobService {
     #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
         // First try to get a list of chunks. In case there's only one chunk returned,
-        // or the backend does not support chunking, return a NaiveSeeker.
-        // Otherwise use a ChunkedReader.
-        // TODO: we should check if we want to replace NaiveSeeker with a simple
-        // Cursor on the raw `Vec<u8>`, as seeking backwards is something that
-        // clients generally do.
+        // buffer its data into a Vec, otherwise use a ChunkedReader.
+        // We previously used NaiveSeeker here, but userland likes to seek backwards too often,
+        // and without store composition this will get very noisy.
+        // FUTUREWORK: use CombinedBlobService and store composition.
         match self.chunks(digest).await {
             Ok(None) => Ok(None),
             Ok(Some(chunks)) => {
@@ -82,9 +86,13 @@ impl BlobService for GRPCBlobService {
                             });
 
                             // Use StreamReader::new to convert to an AsyncRead.
-                            let data_reader = tokio_util::io::StreamReader::new(data_stream);
+                            let mut data_reader = tokio_util::io::StreamReader::new(data_stream);
+
+                            let mut buf = Vec::new();
+                            // TODO: only do this up to a certain limit.
+                            tokio::io::copy(&mut data_reader, &mut buf).await?;
 
-                            Ok(Some(Box::new(NaiveSeeker::new(data_reader))))
+                            Ok(Some(Box::new(Cursor::new(buf))))
                         }
                         Err(e) if e.code() == Code::NotFound => Ok(None),
                         Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),