about summary refs log tree commit diff
path: root/tvix/store/src/blobservice/grpc.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-06-30T14·08+0200
committerclbot <clbot@tvl.fyi>2023-07-21T18·52+0000
commit7613e2e76972554ee2a5ae1397f8b5ca84f4f729 (patch)
tree366554a7137abbd63971e8c517a0926fcffc56a7 /tvix/store/src/blobservice/grpc.rs
parent42dc18353d99453bc0f83492f9f5bc4796f4cc4c (diff)
feat(tvix/store/blobservice): implement seek r/6434
For memory and sled, it's trivial, as we already have a Cursor<Vec<u8>>.
For gRPC, we simply reject going backwards, and skip n bytes for now.

Once the gRPC protocol gets support for offsets and verified streaming,
this can be improved.

Change-Id: I734066a514aed287ea3db64bfb1680911ac1eeb0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8885
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/store/src/blobservice/grpc.rs')
-rw-r--r--tvix/store/src/blobservice/grpc.rs9
1 files changed, 3 insertions, 6 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
index a0006eb72f9e..96e2869a4feb 100644
--- a/tvix/store/src/blobservice/grpc.rs
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -1,4 +1,4 @@
-use super::{BlobService, BlobWriter};
+use super::{dumb_seeker::DumbSeeker, BlobReader, BlobService, BlobWriter};
 use crate::{proto, B3Digest};
 use futures::sink::{SinkExt, SinkMapErr};
 use std::{collections::VecDeque, io};
@@ -114,10 +114,7 @@ impl BlobService for GRPCBlobService {
 
     // On success, this returns a Ok(Some(io::Read)), which can be used to read
     // the contents of the Blob, identified by the digest.
-    fn open_read(
-        &self,
-        digest: &B3Digest,
-    ) -> Result<Option<Box<dyn io::Read + Send>>, crate::Error> {
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Box<dyn BlobReader>>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
         let digest = digest.clone();
@@ -155,7 +152,7 @@ impl BlobService for GRPCBlobService {
 
                 // Use SyncIoBridge to turn it into a sync Read.
                 let sync_reader = tokio_util::io::SyncIoBridge::new(data_reader);
-                Ok(Some(Box::new(sync_reader)))
+                Ok(Some(Box::new(DumbSeeker::new(sync_reader))))
             }
             Err(e) if e.code() == Code::NotFound => Ok(None),
             Err(e) => Err(crate::Error::StorageError(e.to_string())),