diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/castore/src/blobservice/naive_seeker.rs | 27 |
1 files changed, 17 insertions, 10 deletions
diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs index 5c943b9870b4..f5a530715093 100644 --- a/tvix/castore/src/blobservice/naive_seeker.rs +++ b/tvix/castore/src/blobservice/naive_seeker.rs @@ -4,7 +4,7 @@ use pin_project_lite::pin_project; use std::io; use std::task::Poll; use tokio::io::AsyncRead; -use tracing::{debug, instrument}; +use tracing::{debug, instrument, trace, warn}; pin_project! { /// This implements [tokio::io::AsyncSeek] for and [tokio::io::AsyncRead] by @@ -79,6 +79,7 @@ impl<R: tokio::io::AsyncRead> NaiveSeeker<R> { } impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> { + #[instrument(level = "trace", skip_all)] fn poll_read( self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, @@ -90,9 +91,12 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> { let this = self.project(); ready!(this.r.poll_read(cx, buf))?; + let bytes_read = buf.filled().len() - filled_before; *this.pos += bytes_read as u64; + trace!(bytes_read = bytes_read, new_pos = this.pos, "poll_read"); + Ok(()).into() } } @@ -105,16 +109,18 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> { self.project().r.poll_fill_buf(cx) } + #[instrument(level = "trace", skip(self))] fn consume(self: std::pin::Pin<&mut Self>, amt: usize) { let this = self.project(); this.r.consume(amt); - let pos: &mut u64 = this.pos; - *pos += amt as u64; + *this.pos += amt as u64; + + trace!(new_pos = this.pos, "consume"); } } impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> { - #[instrument(skip(self), err(Debug))] + #[instrument(level="trace", skip(self), fields(inner_pos=%self.pos), err(Debug))] fn start_seek( self: std::pin::Pin<&mut Self>, position: std::io::SeekFrom, @@ -149,23 +155,24 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> { } }; - debug!(absolute_offset=?absolute_offset, "seek"); - - // we already know absolute_offset is larger than self.pos + // we already know absolute_offset is >= self.pos debug_assert!( absolute_offset >= self.pos, - "absolute_offset {} is larger than self.pos {}", + "absolute_offset {} must be >= self.pos {}", absolute_offset, self.pos ); // calculate bytes to skip - *self.project().bytes_to_skip = absolute_offset - self.pos; + let this = self.project(); + *this.bytes_to_skip = absolute_offset - *this.pos; + + debug!(bytes_to_skip = *this.bytes_to_skip, "seek"); Ok(()) } - #[instrument(skip(self))] + #[instrument(skip_all)] fn poll_complete( mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, |