about summary refs log tree commit diff
path: root/tvix/castore/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src')
-rw-r--r--tvix/castore/src/blobservice/chunked_reader.rs10
1 files changed, 8 insertions, 2 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs
index 2aaea385aed2..6e8355874bca 100644
--- a/tvix/castore/src/blobservice/chunked_reader.rs
+++ b/tvix/castore/src/blobservice/chunked_reader.rs
@@ -3,7 +3,7 @@ use pin_project_lite::pin_project;
 use tokio::io::{AsyncRead, AsyncSeekExt};
 use tokio_stream::StreamExt;
 use tokio_util::io::{ReaderStream, StreamReader};
-use tracing::{instrument, warn};
+use tracing::{instrument, trace, warn};
 
 use crate::B3Digest;
 use std::{cmp::Ordering, pin::Pin};
@@ -114,6 +114,9 @@ where
 
             // Update the position and the internal reader.
             *this.pos = absolute_offset;
+
+            // FUTUREWORK: if we can seek forward, avoid re-assembling.
+            // At least if it's still in the same chunk?
             *this.r = this.chunked_blob.reader_skipped_offset(absolute_offset);
         }
 
@@ -174,6 +177,7 @@ where
 
     /// For a given position pos, return the chunk containing the data.
     /// In case this would range outside the blob, None is returned.
+    #[instrument(level = "trace", skip(self), ret)]
     fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
         // FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet
         self.chunks
@@ -195,6 +199,7 @@ where
     /// From the first relevant chunk, the irrelevant bytes are skipped too.
     /// The returned boxed thing does not implement AsyncSeek on its own, but
     /// ChunkedReader does.
+    #[instrument(level = "trace", skip(self))]
     fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
         if offset == self.blob_length() {
             return Box::new(std::io::Cursor::new(vec![]));
@@ -210,10 +215,11 @@ where
         let blob_service = self.blob_service.clone();
         let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();
         let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
-            move |(nth_chunk, (_chunk_start_offset, _chunk_size, chunk_digest))| {
+            move |(nth_chunk, (_chunk_start_offset, chunk_size, chunk_digest))| {
                 let chunk_digest = chunk_digest.to_owned();
                 let blob_service = blob_service.clone();
                 async move {
+                    trace!(chunk_size=%chunk_size, chunk_digest=%chunk_digest, "open_read on chunk in stream");
                     let mut blob_reader = blob_service
                         .as_ref()
                         .open_read(&chunk_digest.to_owned())