about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/castore/src/blobservice/chunked_reader.rs20
-rw-r--r--tvix/castore/src/blobservice/naive_seeker.rs18
2 files changed, 16 insertions, 22 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs
index e2a9f614c456..3f2949f1a73a 100644
--- a/tvix/castore/src/blobservice/chunked_reader.rs
+++ b/tvix/castore/src/blobservice/chunked_reader.rs
@@ -1,12 +1,12 @@
-use futures::TryStreamExt;
+use futures::{ready, TryStreamExt};
 use pin_project_lite::pin_project;
 use tokio::io::{AsyncRead, AsyncSeekExt};
 use tokio_stream::StreamExt;
 use tokio_util::io::{ReaderStream, StreamReader};
-use tracing::warn;
+use tracing::{instrument, warn};
 
 use crate::B3Digest;
-use std::{cmp::Ordering, pin::Pin, task::Poll};
+use std::{cmp::Ordering, pin::Pin};
 
 use super::{BlobReader, BlobService};
 
@@ -60,15 +60,12 @@ where
         let filled_before = buf.filled().len();
 
         let this = self.project();
-        match this.r.poll_read(cx, buf) {
-            Poll::Ready(a) => {
-                let bytes_read = buf.filled().len() - filled_before;
-                *this.pos += bytes_read as u64;
 
-                Poll::Ready(a)
-            }
-            Poll::Pending => Poll::Pending,
-        }
+        ready!(this.r.poll_read(cx, buf))?;
+        let bytes_read = buf.filled().len() - filled_before;
+        *this.pos += bytes_read as u64;
+
+        Ok(()).into()
     }
 }
 
@@ -76,6 +73,7 @@ impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
 where
     BS: AsRef<dyn BlobService> + Clone + Send + 'static,
 {
+    #[instrument(skip(self), err(Debug))]
     fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
         let total_len = self.chunked_blob.blob_length();
         let current_pos = self.pos;
diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs
index e65a82c7f45a..1475de580814 100644
--- a/tvix/castore/src/blobservice/naive_seeker.rs
+++ b/tvix/castore/src/blobservice/naive_seeker.rs
@@ -1,4 +1,5 @@
 use super::BlobReader;
+use futures::ready;
 use pin_project_lite::pin_project;
 use std::io;
 use std::task::Poll;
@@ -83,18 +84,13 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> {
         // The amount of data read can be determined by the increase
         // in the length of the slice returned by `ReadBuf::filled`.
         let filled_before = buf.filled().len();
-        let this = self.project();
-        let pos: &mut u64 = this.pos;
 
-        match this.r.poll_read(cx, buf) {
-            Poll::Ready(a) => {
-                let bytes_read = buf.filled().len() - filled_before;
-                *pos += bytes_read as u64;
+        let this = self.project();
+        ready!(this.r.poll_read(cx, buf))?;
+        let bytes_read = buf.filled().len() - filled_before;
+        *this.pos += bytes_read as u64;
 
-                Poll::Ready(a)
-            }
-            Poll::Pending => Poll::Pending,
-        }
+        Ok(()).into()
     }
 }
 
@@ -115,7 +111,7 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> {
 }
 
 impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> {
-    #[instrument(skip(self))]
+    #[instrument(skip(self), err(Debug))]
     fn start_seek(
         self: std::pin::Pin<&mut Self>,
         position: std::io::SeekFrom,