about summary refs log tree commit diff
path: root/tvix/store/src/proto/grpc_blobservice_wrapper.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-05-11T12·49+0300
committerflokli <flokli@flokli.de>2023-05-11T14·27+0000
commit616fa4476f93e1782e68dc713e9e8cb77a426c7d (patch)
treef76a43e95c75d848d079706fbccfd442210ebebc /tvix/store/src/proto/grpc_blobservice_wrapper.rs
parentb22b685f0b2524c088deacbf4e80e7b7c73b5afc (diff)
refactor(tvix/store): remove ChunkService r/6133
Whether chunking is involved or not, is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently is not visible in the gRPC interface either. It
shouldn't bleed into everything.

Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.

This means, our custom BlobReader module can go away, and all the
chunking bits in there, too.

In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy some stores can expose, not as a fundamental
protocol component.

This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides a AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a GRPC server implementation.

As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.

In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ended up in version incompatibilities, so let's
vendor it in for now.

Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/store/src/proto/grpc_blobservice_wrapper.rs')
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs242
1 files changed, 84 insertions, 158 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index e891caa0268e..d1e98a5b7953 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,54 +1,34 @@
-use std::collections::VecDeque;
-
 use crate::{
-    blobservice::BlobService,
-    chunkservice::{read_all_and_chunk, update_hasher, ChunkService},
-    Error,
+    blobservice::{BlobService, BlobWriter},
+    proto::sync_read_into_async_read::SyncReadIntoAsyncRead,
 };
 use data_encoding::BASE64;
-use tokio::{sync::mpsc::channel, task};
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use std::{collections::VecDeque, io, pin::Pin};
+use tokio::task;
+use tokio_stream::StreamExt;
+use tokio_util::io::ReaderStream;
 use tonic::{async_trait, Request, Response, Status, Streaming};
-use tracing::{debug, instrument, warn};
+use tracing::{instrument, warn};
 
-pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
+pub struct GRPCBlobServiceWrapper<BS: BlobService> {
     blob_service: BS,
-    chunk_service: CS,
 }
 
-impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
-    pub fn new(blob_service: BS, chunk_service: CS) -> Self {
+impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> {
+    fn from(value: BS) -> Self {
         Self {
-            blob_service,
-            chunk_service,
-        }
-    }
-
-    // upload the chunk to the chunk service, and return its digest (or an error) when done.
-    #[instrument(skip(chunk_service))]
-    fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
-        let mut hasher = blake3::Hasher::new();
-        update_hasher(&mut hasher, &chunk_data);
-        let digest = hasher.finalize();
-
-        if chunk_service.has(digest.as_bytes())? {
-            debug!("already has chunk, skipping");
+            blob_service: value,
         }
-        let digest_resp = chunk_service.put(chunk_data)?;
-
-        assert_eq!(&digest_resp, digest.as_bytes());
-
-        Ok(digest.as_bytes().to_vec())
     }
 }
 
 #[async_trait]
-impl<
-        BS: BlobService + Send + Sync + Clone + 'static,
-        CS: ChunkService + Send + Sync + Clone + 'static,
-    > super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS>
+impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService
+    for GRPCBlobServiceWrapper<BS>
 {
-    type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>;
+    // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
+    type ReadStream =
+        Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
 
     #[instrument(skip(self))]
     async fn stat(
@@ -56,12 +36,22 @@ impl<
         request: Request<super::StatBlobRequest>,
     ) -> Result<Response<super::BlobMeta>, Status> {
         let rq = request.into_inner();
-        match self.blob_service.stat(&rq) {
-            Ok(None) => Err(Status::not_found(format!(
+        let req_digest: [u8; 32] = rq
+            .digest
+            .clone()
+            .try_into()
+            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+        if rq.include_chunks || rq.include_bao {
+            return Err(Status::internal("not implemented"));
+        }
+
+        match self.blob_service.has(&req_digest) {
+            Ok(true) => Ok(Response::new(super::BlobMeta::default())),
+            Ok(false) => Err(Status::not_found(format!(
                 "blob {} not found",
-                BASE64.encode(&rq.digest)
+                BASE64.encode(&req_digest)
             ))),
-            Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)),
             Err(e) => Err(e.into()),
         }
     }
@@ -71,99 +61,38 @@ impl<
         &self,
         request: Request<super::ReadBlobRequest>,
     ) -> Result<Response<Self::ReadStream>, Status> {
-        let req = request.into_inner();
-        let (tx, rx) = channel(5);
+        let rq = request.into_inner();
 
-        let req_digest: [u8; 32] = req
+        let req_digest: [u8; 32] = rq
             .digest
+            .clone()
             .try_into()
-            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
-
-        // query the blob service for more detailed blob info
-        let stat_resp = self.blob_service.stat(&super::StatBlobRequest {
-            digest: req_digest.to_vec(),
-            include_chunks: true,
-            ..Default::default()
-        })?;
-
-        match stat_resp {
-            None => {
-                // If the stat didn't return any blobmeta, the client might
-                // still have asked for a single chunk to be read.
-                // Check the chunkstore.
-                if let Some(data) = self.chunk_service.get(&req_digest)? {
-                    // We already know the hash matches, and contrary to
-                    // iterating over a blobmeta, we can't know the size,
-                    // so send the contents of that chunk over,
-                    // as the first (and only) element of the stream.
-                    task::spawn(async move {
-                        let res = Ok(super::BlobChunk { data });
-                        // send the result to the client. If the client already left, that's also fine.
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                        }
-                    });
-                } else {
-                    return Err(Status::not_found(format!(
-                        "blob {} not found",
-                        BASE64.encode(&req_digest),
-                    )));
-                }
-            }
-            Some(blobmeta) => {
-                let chunk_client = self.chunk_service.clone();
-
-                // TODO: use BlobReader?
-                // But then we might not be able to send compressed chunks as-is.
-                // Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it
-                // first, so we can .next().await in here.
-
-                task::spawn(async move {
-                    for chunkmeta in blobmeta.chunks {
-                        // request chunk.
-                        // We don't need to validate the digest again, as
-                        // that's required for all implementations of ChunkService.
-                        // TODO: handle error
-                        let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap();
-                        let res = match chunk_client.get(chunkmeta_digest) {
-                            Err(e) => Err(e.into()),
-                            // TODO: make this a separate error type
-                            Ok(None) => Err(Error::StorageError(format!(
-                                "consistency error: chunk {} for blob {} not found",
-                                BASE64.encode(chunkmeta_digest),
-                                BASE64.encode(&req_digest),
-                            ))
-                            .into()),
-                            Ok(Some(data)) => {
-                                // We already know the hash matches, but also
-                                // check the size matches what chunkmeta said.
-                                if data.len() as u32 != chunkmeta.size {
-                                    Err(Error::StorageError(format!(
-                                        "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}",
-                                        BASE64.encode(chunkmeta_digest),
-                                        BASE64.encode(&req_digest),
-                                        chunkmeta.size,
-                                        data.len(),
-                                    )).into())
-                                } else {
-                                    // send out the current chunk
-                                    // TODO: we might want to break this up further if too big?
-                                    Ok(super::BlobChunk { data })
-                                }
-                            }
-                        };
-                        // send the result to the client
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                            break;
-                        }
+            .map_err(|_| Status::invalid_argument("invalid digest length"))?;
+
+        match self.blob_service.open_read(&req_digest) {
+            Ok(Some(reader)) => {
+                let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into();
+
+                fn stream_mapper(
+                    x: Result<bytes::Bytes, io::Error>,
+                ) -> Result<super::BlobChunk, Status> {
+                    match x {
+                        Ok(bytes) => Ok(super::BlobChunk {
+                            data: bytes.to_vec(),
+                        }),
+                        Err(e) => Err(Status::from(e)),
                     }
-                });
+                }
+
+                let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper);
+                Ok(Response::new(Box::pin(chunks_stream)))
             }
+            Ok(None) => Err(Status::not_found(format!(
+                "blob {} not found",
+                BASE64.encode(&rq.digest)
+            ))),
+            Err(e) => Err(e.into()),
         }
-
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
     }
 
     #[instrument(skip(self))]
@@ -180,37 +109,34 @@ impl<
 
         let data_reader = tokio_util::io::StreamReader::new(data_stream);
 
-        // TODO: can we get rid of this clone?
-        let chunk_service = self.chunk_service.clone();
-
-        let (blob_digest, blob_meta) =
-            task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> {
-                // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream.
-                read_all_and_chunk(
-                    &chunk_service,
-                    tokio_util::io::SyncIoBridge::new(data_reader),
-                )
-            })
-            .await
-            .map_err(|e| Status::internal(e.to_string()))??;
-
-        // upload blobmeta if not there yet
-        if self
+        // prepare a writer, which we'll use in the blocking task below.
+        let mut writer = self
             .blob_service
-            .stat(&super::StatBlobRequest {
-                digest: blob_digest.to_vec(),
-                include_chunks: false,
-                include_bao: false,
-            })?
-            .is_none()
-        {
-            // upload blobmeta
-            self.blob_service.put(&blob_digest, blob_meta)?;
-        }
-
-        // return to client.
-        Ok(Response::new(super::PutBlobResponse {
-            digest: blob_digest,
-        }))
+            .open_write()
+            .map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?;
+
+        let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> {
+            // construct a sync reader to the data
+            let mut reader = tokio_util::io::SyncIoBridge::new(data_reader);
+
+            io::copy(&mut reader, &mut writer).map_err(|e| {
+                warn!("error copying: {}", e);
+                Status::internal("error copying")
+            })?;
+
+            let digest = writer
+                .close()
+                .map_err(|e| {
+                    warn!("error closing stream: {}", e);
+                    Status::internal("error closing stream")
+                })?
+                .to_vec();
+
+            Ok(super::PutBlobResponse { digest })
+        })
+        .await
+        .map_err(|_| Status::internal("failed to wait for task"))??;
+
+        Ok(Response::new(result))
     }
 }