From 9d9c731147f07ce898b840198e99ea4d90433301 Mon Sep 17 00:00:00 2001
From: Florian Klink <flokli@flokli.de>
Date: Tue, 16 Apr 2024 16:51:45 +0300
Subject: feat(tvix/castore/blob/chunked_reader): add some more traces

Change-Id: I2408707a7bc0e1c0cd8bd2933f8d68805b9e12c9
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11444
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius
---
 tvix/castore/src/blobservice/chunked_reader.rs | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs
index 2aaea385aed2..6e8355874bca 100644
--- a/tvix/castore/src/blobservice/chunked_reader.rs
+++ b/tvix/castore/src/blobservice/chunked_reader.rs
@@ -3,7 +3,7 @@ use pin_project_lite::pin_project;
 use tokio::io::{AsyncRead, AsyncSeekExt};
 use tokio_stream::StreamExt;
 use tokio_util::io::{ReaderStream, StreamReader};
-use tracing::{instrument, warn};
+use tracing::{instrument, trace, warn};
 
 use crate::B3Digest;
 use std::{cmp::Ordering, pin::Pin};
@@ -114,6 +114,9 @@ where
 
             // Update the position and the internal reader.
             *this.pos = absolute_offset;
+
+            // FUTUREWORK: if we can seek forward, avoid re-assembling.
+            // At least if it's still in the same chunk?
             *this.r = this.chunked_blob.reader_skipped_offset(absolute_offset);
         }
 
@@ -174,6 +177,7 @@ where
 
     /// For a given position pos, return the chunk containing the data.
     /// In case this would range outside the blob, None is returned.
+    #[instrument(level = "trace", skip(self), ret)]
     fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
         // FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet
         self.chunks
@@ -195,6 +199,7 @@ where
     /// From the first relevant chunk, the irrelevant bytes are skipped too.
     /// The returned boxed thing does not implement AsyncSeek on its own, but
     /// ChunkedReader does.
+    #[instrument(level = "trace", skip(self))]
     fn reader_skipped_offset(&self, offset: u64) -> Box<dyn AsyncRead + Send + Unpin> {
         if offset == self.blob_length() {
             return Box::new(std::io::Cursor::new(vec![]));
         }
@@ -210,10 +215,11 @@ where
         let blob_service = self.blob_service.clone();
         let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();
         let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
-            move |(nth_chunk, (_chunk_start_offset, _chunk_size, chunk_digest))| {
+            move |(nth_chunk, (_chunk_start_offset, chunk_size, chunk_digest))| {
                 let chunk_digest = chunk_digest.to_owned();
                 let blob_service = blob_service.clone();
                 async move {
+                    trace!(chunk_size=%chunk_size, chunk_digest=%chunk_digest, "open_read on chunk in stream");
                     let mut blob_reader = blob_service
                         .as_ref()
                         .open_read(&chunk_digest.to_owned())
--
cgit 1.4.1
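
The spans and events added above are emitted at trace level, so they only show up when a tracing subscriber is configured to record that level for this crate. Below is a minimal sketch of such a subscriber setup, assuming the tracing-subscriber crate with its env-filter feature; the tvix_castore target in the filter directive is an illustrative assumption, not part of this change:

    // Hypothetical helper for a test or binary that uses ChunkedReader;
    // not part of the patch above.
    use tracing_subscriber::EnvFilter;

    fn init_tracing() {
        // Honour RUST_LOG when set; otherwise record trace-level output from
        // tvix_castore, e.g. the "open_read on chunk in stream" event and the
        // get_chunk_idx_for_position / reader_skipped_offset spans.
        tracing_subscriber::fmt()
            .with_env_filter(
                EnvFilter::try_from_default_env()
                    .unwrap_or_else(|_| EnvFilter::new("tvix_castore=trace")),
            )
            .init();
    }

With such a filter in place, the % sigils in the trace! call format chunk_size and chunk_digest through their Display implementations, so each chunk opened while assembling the skipped-offset reader appears as one line in the log output.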