diff options
-rw-r--r-- | tvix/castore/src/blobservice/chunked_reader.rs | 20 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/naive_seeker.rs | 18 |
2 files changed, 16 insertions, 22 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs index e2a9f614c456..3f2949f1a73a 100644 --- a/tvix/castore/src/blobservice/chunked_reader.rs +++ b/tvix/castore/src/blobservice/chunked_reader.rs @@ -1,12 +1,12 @@ -use futures::TryStreamExt; +use futures::{ready, TryStreamExt}; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncSeekExt}; use tokio_stream::StreamExt; use tokio_util::io::{ReaderStream, StreamReader}; -use tracing::warn; +use tracing::{instrument, warn}; use crate::B3Digest; -use std::{cmp::Ordering, pin::Pin, task::Poll}; +use std::{cmp::Ordering, pin::Pin}; use super::{BlobReader, BlobService}; @@ -60,15 +60,12 @@ where let filled_before = buf.filled().len(); let this = self.project(); - match this.r.poll_read(cx, buf) { - Poll::Ready(a) => { - let bytes_read = buf.filled().len() - filled_before; - *this.pos += bytes_read as u64; - Poll::Ready(a) - } - Poll::Pending => Poll::Pending, - } + ready!(this.r.poll_read(cx, buf))?; + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; + + Ok(()).into() } } @@ -76,6 +73,7 @@ impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS> where BS: AsRef<dyn BlobService> + Clone + Send + 'static, { + #[instrument(skip(self), err(Debug))] fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { let total_len = self.chunked_blob.blob_length(); let current_pos = self.pos; diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs index e65a82c7f45a..1475de580814 100644 --- a/tvix/castore/src/blobservice/naive_seeker.rs +++ b/tvix/castore/src/blobservice/naive_seeker.rs @@ -1,4 +1,5 @@ use super::BlobReader; +use futures::ready; use pin_project_lite::pin_project; use std::io; use std::task::Poll; @@ -83,18 +84,13 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> { // The amount of data read can be determined by the increase // in the length of the slice returned by `ReadBuf::filled`. let filled_before = buf.filled().len(); - let this = self.project(); - let pos: &mut u64 = this.pos; - match this.r.poll_read(cx, buf) { - Poll::Ready(a) => { - let bytes_read = buf.filled().len() - filled_before; - *pos += bytes_read as u64; + let this = self.project(); + ready!(this.r.poll_read(cx, buf))?; + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; - Poll::Ready(a) - } - Poll::Pending => Poll::Pending, - } + Ok(()).into() } } @@ -115,7 +111,7 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> { } impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> { - #[instrument(skip(self))] + #[instrument(skip(self), err(Debug))] fn start_seek( self: std::pin::Pin<&mut Self>, position: std::io::SeekFrom, |