about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs80
1 files changed, 54 insertions, 26 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index d98a9b517724..9230abaf346f 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -1,10 +1,10 @@
-use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
+use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter, ChunkedReader};
 use crate::{
     proto::{self, stat_blob_response::ChunkMeta},
     B3Digest,
 };
 use futures::sink::SinkExt;
-use std::{io, pin::pin, task::Poll};
+use std::{io, pin::pin, sync::Arc, task::Poll};
 use tokio::io::AsyncWriteExt;
 use tokio::task::JoinHandle;
 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
@@ -54,31 +54,59 @@ impl BlobService for GRPCBlobService {
 
     #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
-        // Get a stream of [proto::BlobChunk], or return an error if the blob
-        // doesn't exist.
-        match self
-            .grpc_client
-            .clone()
-            .read(proto::ReadBlobRequest {
-                digest: digest.clone().into(),
-            })
-            .await
-        {
-            Ok(stream) => {
-                // on success, this is a stream of tonic::Result<proto::BlobChunk>,
-                // so access .data and map errors into std::io::Error.
-                let data_stream = stream.into_inner().map(|e| {
-                    e.map(|c| c.data)
-                        .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s))
-                });
-
-                // Use StreamReader::new to convert to an AsyncRead.
-                let data_reader = tokio_util::io::StreamReader::new(data_stream);
-
-                Ok(Some(Box::new(NaiveSeeker::new(data_reader))))
+        // First try to get a list of chunks. In case there's only one chunk returned,
+        // or the backend does not support chunking, return a NaiveSeeker.
+        // Otherwise use a ChunkedReader.
+        // TODO: we should check if we want to replace NaiveSeeker with a simple
+        // Cursor on the raw `Vec<u8>`, as seeking backwards is something that
+        // clients generally do.
+        match self.chunks(digest).await {
+            Ok(None) => Ok(None),
+            Ok(Some(chunks)) => {
+                if chunks.is_empty() || chunks.len() == 1 {
+                    // No more granular chunking info, treat this as an individual chunk.
+                    // Get a stream of [proto::BlobChunk], or return an error if the blob
+                    // doesn't exist.
+                    return match self
+                        .grpc_client
+                        .clone()
+                        .read(proto::ReadBlobRequest {
+                            digest: digest.clone().into(),
+                        })
+                        .await
+                    {
+                        Ok(stream) => {
+                            let data_stream = stream.into_inner().map(|e| {
+                                e.map(|c| c.data)
+                                    .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s))
+                            });
+
+                            // Use StreamReader::new to convert to an AsyncRead.
+                            let data_reader = tokio_util::io::StreamReader::new(data_stream);
+
+                            Ok(Some(Box::new(NaiveSeeker::new(data_reader))))
+                        }
+                        Err(e) if e.code() == Code::NotFound => Ok(None),
+                        Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+                    };
+                }
+
+                // The chunked case. Let ChunkedReader do individual reads.
+                // TODO: we should store the chunking data in some local cache,
+                // so `ChunkedReader` doesn't call `self.chunks` *again* for every chunk.
+                // Think about how store composition will fix this.
+                let chunked_reader = ChunkedReader::from_chunks(
+                    chunks.into_iter().map(|chunk| {
+                        (
+                            chunk.digest.try_into().expect("invalid b3 digest"),
+                            chunk.size,
+                        )
+                    }),
+                    Arc::new(self.clone()) as Arc<dyn BlobService>,
+                );
+                Ok(Some(Box::new(chunked_reader)))
             }
-            Err(e) if e.code() == Code::NotFound => Ok(None),
-            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+            Err(e) => Err(e)?,
         }
     }