From c19bc95b8a0797c58da9ff42a3cfbae3f0f4d413 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sun, 14 Apr 2024 15:55:50 +0300 Subject: fix(tvix/castore/blobservice): update bytes_read only on successful read We previously updated this.pos also in case the underlying read returned an error. Also, use the ready! macro to remove the match block, and instrument errors returned during start_seek. Change-Id: Ic32e26579d964a76b45687134acc48d72d67c36f Reviewed-on: https://cl.tvl.fyi/c/depot/+/11421 Reviewed-by: Brian Olsen Reviewed-by: Connor Brewster Tested-by: BuildkiteCI Autosubmit: flokli --- tvix/castore/src/blobservice/chunked_reader.rs | 20 +++++++++----------- tvix/castore/src/blobservice/naive_seeker.rs | 18 +++++++----------- 2 files changed, 16 insertions(+), 22 deletions(-) diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs index e2a9f614c4..3f2949f1a7 100644 --- a/tvix/castore/src/blobservice/chunked_reader.rs +++ b/tvix/castore/src/blobservice/chunked_reader.rs @@ -1,12 +1,12 @@ -use futures::TryStreamExt; +use futures::{ready, TryStreamExt}; use pin_project_lite::pin_project; use tokio::io::{AsyncRead, AsyncSeekExt}; use tokio_stream::StreamExt; use tokio_util::io::{ReaderStream, StreamReader}; -use tracing::warn; +use tracing::{instrument, warn}; use crate::B3Digest; -use std::{cmp::Ordering, pin::Pin, task::Poll}; +use std::{cmp::Ordering, pin::Pin}; use super::{BlobReader, BlobService}; @@ -60,15 +60,12 @@ where let filled_before = buf.filled().len(); let this = self.project(); - match this.r.poll_read(cx, buf) { - Poll::Ready(a) => { - let bytes_read = buf.filled().len() - filled_before; - *this.pos += bytes_read as u64; - Poll::Ready(a) - } - Poll::Pending => Poll::Pending, - } + ready!(this.r.poll_read(cx, buf))?; + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; + + Ok(()).into() } } @@ -76,6 +73,7 @@ impl tokio::io::AsyncSeek for ChunkedReader where BS: AsRef + Clone + Send + 'static, { + #[instrument(skip(self), err(Debug))] fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> { let total_len = self.chunked_blob.blob_length(); let current_pos = self.pos; diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs index e65a82c7f4..1475de5808 100644 --- a/tvix/castore/src/blobservice/naive_seeker.rs +++ b/tvix/castore/src/blobservice/naive_seeker.rs @@ -1,4 +1,5 @@ use super::BlobReader; +use futures::ready; use pin_project_lite::pin_project; use std::io; use std::task::Poll; @@ -83,18 +84,13 @@ impl tokio::io::AsyncRead for NaiveSeeker { // The amount of data read can be determined by the increase // in the length of the slice returned by `ReadBuf::filled`. let filled_before = buf.filled().len(); - let this = self.project(); - let pos: &mut u64 = this.pos; - match this.r.poll_read(cx, buf) { - Poll::Ready(a) => { - let bytes_read = buf.filled().len() - filled_before; - *pos += bytes_read as u64; + let this = self.project(); + ready!(this.r.poll_read(cx, buf))?; + let bytes_read = buf.filled().len() - filled_before; + *this.pos += bytes_read as u64; - Poll::Ready(a) - } - Poll::Pending => Poll::Pending, - } + Ok(()).into() } } @@ -115,7 +111,7 @@ impl tokio::io::AsyncBufRead for NaiveSeeker { } impl tokio::io::AsyncSeek for NaiveSeeker { - #[instrument(skip(self))] + #[instrument(skip(self), err(Debug))] fn start_seek( self: std::pin::Pin<&mut Self>, position: std::io::SeekFrom, -- cgit 1.4.1