about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice/grpc.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-01-09T11·55+0200
committerclbot <clbot@tvl.fyi>2024-01-09T14·03+0000
commit8fbdf72825843416dc1923d91cb20059cdbc07b1 (patch)
tree9f0624af6fa96a4d21c8ecd1694b1f3ec9f5a295 /tvix/castore/src/blobservice/grpc.rs
parent17eaacb139029f481ff821768c386061886c7059 (diff)
feat(tvix/castore/blobsvc/grpc): rm VecDec, fix docstring r/7358
The docstrings were not updated once we made the BlobService trait async.
There's no more need to turn things into a sync reader.

Also, rearrange the stream manipulation a bit, and remove the need to
create a new VecDeque for each element in the stream. bytes::Bytes
implements the Buf trait.

Fixes b/289.

Change-Id: Id2bbedca5876b462e630c144b74cc289c3916c4d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10582
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/castore/src/blobservice/grpc.rs')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs30
1 files changed, 9 insertions, 21 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index faf50eff88ce..3765dda78012 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -1,12 +1,7 @@
 use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
 use crate::{proto, B3Digest};
 use futures::sink::SinkExt;
-use std::{
-    collections::VecDeque,
-    io::{self},
-    pin::pin,
-    task::Poll,
-};
+use std::{io, pin::pin, task::Poll};
 use tokio::io::AsyncWriteExt;
 use tokio::task::JoinHandle;
 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
@@ -54,30 +49,23 @@ impl BlobService for GRPCBlobService {
         }
     }
 
-    // On success, this returns a Ok(Some(io::Read)), which can be used to read
-    // the contents of the Blob, identified by the digest.
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
         // Get a stream of [proto::BlobChunk], or return an error if the blob
         // doesn't exist.
-        let resp = self
+        match self
             .grpc_client
             .clone()
             .read(proto::ReadBlobRequest {
                 digest: digest.clone().into(),
             })
-            .await;
-
-        // This runs the task to completion, which on success will return a stream.
-        // On reading from it, we receive individual [proto::BlobChunk], so we
-        // massage this to a stream of bytes,
-        // then create an [AsyncRead], which we'll turn into a [io::Read],
-        // that's returned from the function.
-        match resp {
+            .await
+        {
             Ok(stream) => {
-                // map the stream of proto::BlobChunk to bytes.
-                let data_stream = stream.into_inner().map(|x| {
-                    x.map(|x| VecDeque::from(x.data.to_vec()))
-                        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
+                // on success, this is a stream of tonic::Result<proto::BlobChunk>,
+                // so access .data and map errors into std::io::Error.
+                let data_stream = stream.into_inner().map(|e| {
+                    e.map(|c| c.data)
+                        .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s))
                 });
 
                 // Use StreamReader::new to convert to an AsyncRead.