about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-14T12·55+0300
committerclbot <clbot@tvl.fyi>2024-04-15T14·04+0000
commitc19bc95b8a0797c58da9ff42a3cfbae3f0f4d413 (patch)
treea0d834bd017b0112167a8b9d079ad16921506960
parentbccb4c92c357202e8be0f37345c0a933bd9c2cc0 (diff)
fix(tvix/castore/blobservice): update bytes_read only on successful read r/7922
We previously updated this.pos also in case the underlying read returned
an error.

Also, use the ready! macro to remove the match block, and instrument
errors returned during start_seek.

Change-Id: Ic32e26579d964a76b45687134acc48d72d67c36f
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11421
Reviewed-by: Brian Olsen <me@griff.name>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
-rw-r--r--tvix/castore/src/blobservice/chunked_reader.rs20
-rw-r--r--tvix/castore/src/blobservice/naive_seeker.rs18
2 files changed, 16 insertions, 22 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs
index e2a9f614c4..3f2949f1a7 100644
--- a/tvix/castore/src/blobservice/chunked_reader.rs
+++ b/tvix/castore/src/blobservice/chunked_reader.rs
@@ -1,12 +1,12 @@
-use futures::TryStreamExt;
+use futures::{ready, TryStreamExt};
 use pin_project_lite::pin_project;
 use tokio::io::{AsyncRead, AsyncSeekExt};
 use tokio_stream::StreamExt;
 use tokio_util::io::{ReaderStream, StreamReader};
-use tracing::warn;
+use tracing::{instrument, warn};
 
 use crate::B3Digest;
-use std::{cmp::Ordering, pin::Pin, task::Poll};
+use std::{cmp::Ordering, pin::Pin};
 
 use super::{BlobReader, BlobService};
 
@@ -60,15 +60,12 @@ where
         let filled_before = buf.filled().len();
 
         let this = self.project();
-        match this.r.poll_read(cx, buf) {
-            Poll::Ready(a) => {
-                let bytes_read = buf.filled().len() - filled_before;
-                *this.pos += bytes_read as u64;
 
-                Poll::Ready(a)
-            }
-            Poll::Pending => Poll::Pending,
-        }
+        ready!(this.r.poll_read(cx, buf))?;
+        let bytes_read = buf.filled().len() - filled_before;
+        *this.pos += bytes_read as u64;
+
+        Ok(()).into()
     }
 }
 
@@ -76,6 +73,7 @@ impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
 where
     BS: AsRef<dyn BlobService> + Clone + Send + 'static,
 {
+    #[instrument(skip(self), err(Debug))]
     fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
         let total_len = self.chunked_blob.blob_length();
         let current_pos = self.pos;
diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs
index e65a82c7f4..1475de5808 100644
--- a/tvix/castore/src/blobservice/naive_seeker.rs
+++ b/tvix/castore/src/blobservice/naive_seeker.rs
@@ -1,4 +1,5 @@
 use super::BlobReader;
+use futures::ready;
 use pin_project_lite::pin_project;
 use std::io;
 use std::task::Poll;
@@ -83,18 +84,13 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> {
         // The amount of data read can be determined by the increase
         // in the length of the slice returned by `ReadBuf::filled`.
         let filled_before = buf.filled().len();
-        let this = self.project();
-        let pos: &mut u64 = this.pos;
 
-        match this.r.poll_read(cx, buf) {
-            Poll::Ready(a) => {
-                let bytes_read = buf.filled().len() - filled_before;
-                *pos += bytes_read as u64;
+        let this = self.project();
+        ready!(this.r.poll_read(cx, buf))?;
+        let bytes_read = buf.filled().len() - filled_before;
+        *this.pos += bytes_read as u64;
 
-                Poll::Ready(a)
-            }
-            Poll::Pending => Poll::Pending,
-        }
+        Ok(()).into()
     }
 }
 
@@ -115,7 +111,7 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> {
 }
 
 impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> {
-    #[instrument(skip(self))]
+    #[instrument(skip(self), err(Debug))]
     fn start_seek(
         self: std::pin::Pin<&mut Self>,
         position: std::io::SeekFrom,