diff options
author | Florian Klink <flokli@flokli.de> | 2024-01-09T11·55+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2024-01-09T14·03+0000 |
commit | 8fbdf72825843416dc1923d91cb20059cdbc07b1 (patch) | |
tree | 9f0624af6fa96a4d21c8ecd1694b1f3ec9f5a295 /tvix/castore/src/blobservice/grpc.rs | |
parent | 17eaacb139029f481ff821768c386061886c7059 (diff) |
feat(tvix/castore/blobsvc/grpc): rm VecDec, fix docstring r/7358
The docstrings were not updated once we made the BlobService trait async. There's no more need to turn things into a sync reader. Also, rearrange the stream manipulation a bit, and remove the need to create a new VecDeque for each element in the stream. bytes::Bytes implements the Buf trait. Fixes b/289. Change-Id: Id2bbedca5876b462e630c144b74cc289c3916c4d Reviewed-on: https://cl.tvl.fyi/c/depot/+/10582 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 30 |
1 files changed, 9 insertions, 21 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index faf50eff88ce..3765dda78012 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -1,12 +1,7 @@ use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter}; use crate::{proto, B3Digest}; use futures::sink::SinkExt; -use std::{ - collections::VecDeque, - io::{self}, - pin::pin, - task::Poll, -}; +use std::{io, pin::pin, task::Poll}; use tokio::io::AsyncWriteExt; use tokio::task::JoinHandle; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -54,30 +49,23 @@ impl BlobService for GRPCBlobService { } } - // On success, this returns a Ok(Some(io::Read)), which can be used to read - // the contents of the Blob, identified by the digest. async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { // Get a stream of [proto::BlobChunk], or return an error if the blob // doesn't exist. - let resp = self + match self .grpc_client .clone() .read(proto::ReadBlobRequest { digest: digest.clone().into(), }) - .await; - - // This runs the task to completion, which on success will return a stream. - // On reading from it, we receive individual [proto::BlobChunk], so we - // massage this to a stream of bytes, - // then create an [AsyncRead], which we'll turn into a [io::Read], - // that's returned from the function. - match resp { + .await + { Ok(stream) => { - // map the stream of proto::BlobChunk to bytes. - let data_stream = stream.into_inner().map(|x| { - x.map(|x| VecDeque::from(x.data.to_vec())) - .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) + // on success, this is a stream of tonic::Result<proto::BlobChunk>, + // so access .data and map errors into std::io::Error. + let data_stream = stream.into_inner().map(|e| { + e.map(|c| c.data) + .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s)) }); // Use StreamReader::new to convert to an AsyncRead. |