diff options
author | Florian Klink <flokli@flokli.de> | 2024-04-14T12·55+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-04-15T14·04+0000 |
commit | c19bc95b8a0797c58da9ff42a3cfbae3f0f4d413 (patch) | |
tree | a0d834bd017b0112167a8b9d079ad16921506960 /tvix | |
parent | bccb4c92c357202e8be0f37345c0a933bd9c2cc0 (diff) |
fix(tvix/castore/blobservice): update bytes_read only on successful read r/7922
We previously updated this.pos also in case the underlying read returned an error. Also, use the ready! macro to remove the match block, and instrument errors returned during start_seek. Change-Id: Ic32e26579d964a76b45687134acc48d72d67c36f Reviewed-on: https://cl.tvl.fyi/c/depot/+/11421 Reviewed-by: Brian Olsen <me@griff.name> Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/castore/src/blobservice/chunked_reader.rs | 20 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/naive_seeker.rs | 18 |
2 files changed, 16 insertions, 22 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs index e2a9f614c456..3f2949f1a73a 100644 --- a/tvix/castore/src/blobservice/chunked_reader.rs +++ b/tvix/castore/src/blobservice/chunked_reader.rs @@ -1,12 +1,12 @@ -use futures::TryStreamExt; +use futures::{ready, TryStreamExt}; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncSeekExt}; use tokio_stream::StreamExt; use tokio_util::io::{ReaderStream, StreamReader}; -use tracing::warn; +use tracing::{instrument, warn}; use crate::B3Digest; -use std::{cmp::Ordering, pin::Pin, task::Poll}; +use std::{cmp::Ordering, pin::Pin}; use super::{BlobReader, BlobService}; @@ -60,15 +60,12 @@ where let filled_before = buf.filled().len(); let this = self.project(); - match this.r.poll_read(cx, buf) { - Poll::Ready(a) => { - let bytes_read = buf.filled().len() - filled_before; - *this.pos += bytes_read as u64; - Poll::Ready(a) - } - Poll::Pending => Poll::Pending, - } + ready!(this.r.poll_read(cx, buf))?; + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; + + Ok(()).into() } } @@ -76,6 +73,7 @@ impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS> where BS: AsRef<dyn BlobService> + Clone + Send + 'static, { + #[instrument(skip(self), err(Debug))] fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { let total_len = self.chunked_blob.blob_length(); let current_pos = self.pos; diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs index e65a82c7f45a..1475de580814 100644 --- a/tvix/castore/src/blobservice/naive_seeker.rs +++ b/tvix/castore/src/blobservice/naive_seeker.rs @@ -1,4 +1,5 @@ use super::BlobReader; +use futures::ready; use pin_project_lite::pin_project; use std::io; use std::task::Poll; @@ -83,18 +84,13 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> { // The amount of data read can be determined by the increase // in the length of the slice returned by `ReadBuf::filled`. let filled_before = buf.filled().len(); - let this = self.project(); - let pos: &mut u64 = this.pos; - match this.r.poll_read(cx, buf) { - Poll::Ready(a) => { - let bytes_read = buf.filled().len() - filled_before; - *pos += bytes_read as u64; + let this = self.project(); + ready!(this.r.poll_read(cx, buf))?; + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; - Poll::Ready(a) - } - Poll::Pending => Poll::Pending, - } + Ok(()).into() } } @@ -115,7 +111,7 @@ impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> { } impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> { - #[instrument(skip(self))] + #[instrument(skip(self), err(Debug))] fn start_seek( self: std::pin::Pin<&mut Self>, position: std::io::SeekFrom, |