diff options
author | Florian Klink <flokli@flokli.de> | 2023-11-05T08·45+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-11-05T15·13+0000 |
commit | 67999f0dcf715962b8f56c9bfd8c5c403213cb02 (patch) | |
tree | b5c6e5f793f22c5ca1af99309d31541d7c37fbb4 /tvix/castore | |
parent | b921e3a7e38186204da02de4860803f60022ec4e (diff) |
feat(tvix/castore): extend blobstore protos for verified streaming r/6954
This updates the proto docstrings a bit, especially w.r.t. verified streaming. It also adds send_chunks, send_bao fields to StatBlobRequest, and chunks, bao fields to StatBlobResponse (renamed from BlobMeta). Change-Id: I590cc8646d86b73bca9f38a9b6d9ea15e4df5cb6 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9951 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/castore')
-rw-r--r-- | tvix/castore/protos/rpc_blobstore.proto | 57 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 1 | ||||
-rw-r--r-- | tvix/castore/src/proto/grpc_blobservice_wrapper.rs | 4 |
3 files changed, 48 insertions, 14 deletions
diff --git a/tvix/castore/protos/rpc_blobstore.proto b/tvix/castore/protos/rpc_blobstore.proto index 458803fd57cc..93c02d397c44 100644 --- a/tvix/castore/protos/rpc_blobstore.proto +++ b/tvix/castore/protos/rpc_blobstore.proto @@ -6,33 +6,66 @@ package tvix.castore.v1; option go_package = "code.tvl.fyi/tvix/castore-go;castorev1"; +// BlobService allows reading (or uploading) content-addressed blobs of data. +// BLAKE3 is used as a hashing function for the data. Uploading a blob will +// return the BLAKE3 digest of it, and that's the identifier used to Read/Stat +// them too. service BlobService { - // In the future, Stat will expose more metadata about a given blob, - // such as more granular chunking, baos. - // For now, it's only used to check for the existence of a blob, as asking - // this for a non-existing Blob will return a Status::not_found gRPC error. - rpc Stat(StatBlobRequest) returns (BlobMeta); - - // Read returns a stream of BlobChunk, which is just a stream of bytes with - // the digest specified in ReadBlobRequest. - // + // Stat can be used to check for the existence of a blob, as well as + // gathering more data about it, like more granular chunking information + // or baos. + // Server implementations are not required to provide more granular chunking + // information, especially if the digest specified in [StatBlobRequest] is + // already a chunk of a blob. + rpc Stat(StatBlobRequest) returns (StatBlobResponse); + + // Read allows reading (all) data of a blob/chunk by the BLAKE3 digest of + // its contents. + // If the backend communicated more granular chunks in the `Stat` request, + // this can also be used to read chunks. + // This request returns a stream of BlobChunk, which is just a container for + // a stream of bytes. // The server may decide on whatever chunking it may seem fit as a size for - // the individual BlobChunk sent in the response stream. 
+ // the individual BlobChunk sent in the response stream, this is mostly to + // keep individual messages at a manageable size. rpc Read(ReadBlobRequest) returns (stream BlobChunk); // Put uploads a Blob, by reading a stream of bytes. // // The way the data is chunked up in individual BlobChunk messages sent in - // the stream has no effect on how the server ends up chunking blobs up. + // the stream has no effect on how the server ends up chunking blobs up, if + // it does at all. rpc Put(stream BlobChunk) returns (PutBlobResponse); } message StatBlobRequest { // The blake3 digest of the blob requested bytes digest = 1; + + // Whether the server should reply with a list of more granular chunks. + bool send_chunks = 2; + + // Whether the server should reply with a bao. + bool send_bao = 3; } -message BlobMeta { +message StatBlobResponse { + // If `send_chunks` was set to true, this MAY contain a list of more + // granular chunks, which then may be read individually via the `Read` + // method. + repeated ChunkMeta chunks = 2; + + message ChunkMeta { + // Digest of that specific chunk + bytes digest = 1; + + // Length of that chunk, in bytes. + uint64 size = 2; + } + + // If `send_bao` was set to true, this MAY contain a outboard bao. + // The exact format and message types here will still be fleshed out. 
+ bytes bao = 3; } message ReadBlobRequest { diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index d0f619fcebcb..8d3bdfa8fa25 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -56,6 +56,7 @@ impl BlobService for GRPCBlobService { let resp = grpc_client .stat(proto::StatBlobRequest { digest: digest.clone().into(), + ..Default::default() }) .await; diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs index 93db1deef69a..e7092bec1481 100644 --- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -93,7 +93,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { async fn stat( &self, request: Request<super::StatBlobRequest>, - ) -> Result<Response<super::BlobMeta>, Status> { + ) -> Result<Response<super::StatBlobResponse>, Status> { let rq = request.into_inner(); let req_digest = rq .digest @@ -101,7 +101,7 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { .map_err(|_e| Status::invalid_argument("invalid digest length"))?; match self.blob_service.has(&req_digest).await { - Ok(true) => Ok(Response::new(super::BlobMeta::default())), + Ok(true) => Ok(Response::new(super::StatBlobResponse::default())), Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))), Err(e) => Err(e.into()), } |