about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-11-05T08·45+0200
committerflokli <flokli@flokli.de>2023-11-05T15·13+0000
commit67999f0dcf715962b8f56c9bfd8c5c403213cb02 (patch)
treeb5c6e5f793f22c5ca1af99309d31541d7c37fbb4 /tvix/castore
parentb921e3a7e38186204da02de4860803f60022ec4e (diff)
feat(tvix/castore): extend blobstore protos for verified streaming r/6954
This updates the proto docstrings a bit, especially w.r.t. verified
streaming.
It also adds send_chunks and send_bao fields to StatBlobRequest, and
renames BlobMeta to StatBlobResponse.

Change-Id: I590cc8646d86b73bca9f38a9b6d9ea15e4df5cb6
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9951
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/protos/rpc_blobstore.proto57
-rw-r--r--tvix/castore/src/blobservice/grpc.rs1
-rw-r--r--tvix/castore/src/proto/grpc_blobservice_wrapper.rs4
3 files changed, 48 insertions, 14 deletions
diff --git a/tvix/castore/protos/rpc_blobstore.proto b/tvix/castore/protos/rpc_blobstore.proto
index 458803fd57cc..93c02d397c44 100644
--- a/tvix/castore/protos/rpc_blobstore.proto
+++ b/tvix/castore/protos/rpc_blobstore.proto
@@ -6,33 +6,66 @@ package tvix.castore.v1;
 
 option go_package = "code.tvl.fyi/tvix/castore-go;castorev1";
 
+// BlobService allows reading (or uploading) content-addressed blobs of data.
+// BLAKE3 is used as a hashing function for the data. Uploading a blob will
+// return the BLAKE3 digest of it, and that's the identifier used to Read/Stat
+// them too.
 service BlobService {
-    // In the future, Stat will expose more metadata about a given blob,
-    // such as more granular chunking, baos.
-    // For now, it's only used to check for the existence of a blob, as asking
-    // this for a non-existing Blob will return a Status::not_found gRPC error.
-    rpc Stat(StatBlobRequest) returns (BlobMeta);
-
-    // Read returns a stream of BlobChunk, which is just a stream of bytes with
-    // the digest specified in ReadBlobRequest.
-    //
+    // Stat can be used to check for the existence of a blob, as well as
+    // gathering more data about it, like more granular chunking information
+    // or baos.
+    // Server implementations are not required to provide more granular chunking
+    // information, especially if the digest specified in [StatBlobRequest] is
+    // already a chunk of a blob.
+    rpc Stat(StatBlobRequest) returns (StatBlobResponse);
+
+    // Read allows reading (all) data of a blob/chunk by the BLAKE3 digest of
+    // its contents.
+    // If the backend communicated more granular chunks in the `Stat` request,
+    // this can also be used to read chunks.
+    // This request returns a stream of BlobChunk, which is just a container for
+    // a stream of bytes.
     // The server may decide on whatever chunking it may seem fit as a size for
-    // the individual BlobChunk sent in the response stream.
+    // the individual BlobChunk sent in the response stream, this is mostly to
+    // keep individual messages at a manageable size.
     rpc Read(ReadBlobRequest) returns (stream BlobChunk);
 
     // Put uploads a Blob, by reading a stream of bytes.
     //
     // The way the data is chunked up in individual BlobChunk messages sent in
-    // the stream has no effect on how the server ends up chunking blobs up.
+    // the stream has no effect on how the server ends up chunking blobs up, if
+    // it does at all.
     rpc Put(stream BlobChunk) returns (PutBlobResponse);
 }
 
 message StatBlobRequest {
     // The blake3 digest of the blob requested
     bytes digest = 1;
+
+    // Whether the server should reply with a list of more granular chunks.
+    bool send_chunks = 2;
+
+    // Whether the server should reply with a bao.
+    bool send_bao = 3;
 }
 
-message BlobMeta {
+message StatBlobResponse {
+    // If `send_chunks` was set to true, this MAY contain a list of more
+    // granular chunks, which then may be read individually via the `Read`
+    // method.
+    repeated ChunkMeta chunks = 2;
+
+    message ChunkMeta {
+        // Digest of that specific chunk
+        bytes digest = 1;
+
+        // Length of that chunk, in bytes.
+        uint64 size = 2;
+    }
+
+    // If `send_bao` was set to true, this MAY contain an outboard bao.
+    // The exact format and message types here will still be fleshed out.
+    bytes bao = 3;
 }
 
 message ReadBlobRequest {
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index d0f619fcebcb..8d3bdfa8fa25 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -56,6 +56,7 @@ impl BlobService for GRPCBlobService {
         let resp = grpc_client
             .stat(proto::StatBlobRequest {
                 digest: digest.clone().into(),
+                ..Default::default()
             })
             .await;
 
diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
index 93db1deef69a..e7092bec1481 100644
--- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
@@ -93,7 +93,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
     async fn stat(
         &self,
         request: Request<super::StatBlobRequest>,
-    ) -> Result<Response<super::BlobMeta>, Status> {
+    ) -> Result<Response<super::StatBlobResponse>, Status> {
         let rq = request.into_inner();
         let req_digest = rq
             .digest
@@ -101,7 +101,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
             .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
 
         match self.blob_service.has(&req_digest).await {
-            Ok(true) => Ok(Response::new(super::BlobMeta::default())),
+            Ok(true) => Ok(Response::new(super::StatBlobResponse::default())),
             Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
             Err(e) => Err(e.into()),
         }