Diffstat (limited to 'tvix/store/src/proto/grpc_blobservice_wrapper.rs')
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 242
1 file changed, 84 insertions, 158 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index e891caa0268e..d1e98a5b7953 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,54 +1,34 @@
-use std::collections::VecDeque;
-
 use crate::{
-    blobservice::BlobService,
-    chunkservice::{read_all_and_chunk, update_hasher, ChunkService},
-    Error,
+    blobservice::{BlobService, BlobWriter},
+    proto::sync_read_into_async_read::SyncReadIntoAsyncRead,
 };
 use data_encoding::BASE64;
-use tokio::{sync::mpsc::channel, task};
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use std::{collections::VecDeque, io, pin::Pin};
+use tokio::task;
+use tokio_stream::StreamExt;
+use tokio_util::io::ReaderStream;
 use tonic::{async_trait, Request, Response, Status, Streaming};
-use tracing::{debug, instrument, warn};
+use tracing::{instrument, warn};
 
-pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
+pub struct GRPCBlobServiceWrapper<BS: BlobService> {
     blob_service: BS,
-    chunk_service: CS,
 }
 
-impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
-    pub fn new(blob_service: BS, chunk_service: CS) -> Self {
+impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> {
+    fn from(value: BS) -> Self {
         Self {
-            blob_service,
-            chunk_service,
-        }
-    }
-
-    // upload the chunk to the chunk service, and return its digest (or an error) when done.
-    #[instrument(skip(chunk_service))]
-    fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
-        let mut hasher = blake3::Hasher::new();
-        update_hasher(&mut hasher, &chunk_data);
-        let digest = hasher.finalize();
-
-        if chunk_service.has(digest.as_bytes())? {
-            debug!("already has chunk, skipping");
+            blob_service: value,
         }
-        let digest_resp = chunk_service.put(chunk_data)?;
-
-        assert_eq!(&digest_resp, digest.as_bytes());
-
-        Ok(digest.as_bytes().to_vec())
     }
 }
 
 #[async_trait]
-impl<
-        BS: BlobService + Send + Sync + Clone + 'static,
-        CS: ChunkService + Send + Sync + Clone + 'static,
-    > super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS>
+impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService
+    for GRPCBlobServiceWrapper<BS>
 {
-    type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>;
+    // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
+    type ReadStream =
+        Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
 
     #[instrument(skip(self))]
     async fn stat(
@@ -56,12 +36,22 @@ impl<
         request: Request<super::StatBlobRequest>,
     ) -> Result<Response<super::BlobMeta>, Status> {
         let rq = request.into_inner();
-        match self.blob_service.stat(&rq) {
-            Ok(None) => Err(Status::not_found(format!(
+        let req_digest: [u8; 32] = rq
+            .digest
+            .clone()
+            .try_into()
+            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+        if rq.include_chunks || rq.include_bao {
+            return Err(Status::internal("not implemented"));
+        }
+
+        match self.blob_service.has(&req_digest) {
+            Ok(true) => Ok(Response::new(super::BlobMeta::default())),
+            Ok(false) => Err(Status::not_found(format!(
                 "blob {} not found",
-                BASE64.encode(&rq.digest)
+                BASE64.encode(&req_digest)
             ))),
-            Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)),
             Err(e) => Err(e.into()),
         }
     }
@@ -71,99 +61,38 @@ impl<
         &self,
         request: Request<super::ReadBlobRequest>,
     ) -> Result<Response<Self::ReadStream>, Status> {
-        let req = request.into_inner();
-        let (tx, rx) = channel(5);
+        let rq = request.into_inner();
 
-        let req_digest: [u8; 32] = req
+        let req_digest: [u8; 32] = rq
             .digest
+            .clone()
             .try_into()
-            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
-
-        // query the blob service for more detailed blob info
-        let stat_resp = self.blob_service.stat(&super::StatBlobRequest {
-            digest: req_digest.to_vec(),
-            include_chunks: true,
-            ..Default::default()
-        })?;
-
-        match stat_resp {
-            None => {
-                // If the stat didn't return any blobmeta, the client might
-                // still have asked for a single chunk to be read.
-                // Check the chunkstore.
-                if let Some(data) = self.chunk_service.get(&req_digest)? {
-                    // We already know the hash matches, and contrary to
-                    // iterating over a blobmeta, we can't know the size,
-                    // so send the contents of that chunk over,
-                    // as the first (and only) element of the stream.
-                    task::spawn(async move {
-                        let res = Ok(super::BlobChunk { data });
-                        // send the result to the client. If the client already left, that's also fine.
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                        }
-                    });
-                } else {
-                    return Err(Status::not_found(format!(
-                        "blob {} not found",
-                        BASE64.encode(&req_digest),
-                    )));
-                }
-            }
-            Some(blobmeta) => {
-                let chunk_client = self.chunk_service.clone();
-
-                // TODO: use BlobReader?
-                // But then we might not be able to send compressed chunks as-is.
-                // Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it
-                // first, so we can .next().await in here.
-
-                task::spawn(async move {
-                    for chunkmeta in blobmeta.chunks {
-                        // request chunk.
-                        // We don't need to validate the digest again, as
-                        // that's required for all implementations of ChunkService.
-                        // TODO: handle error
-                        let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap();
-                        let res = match chunk_client.get(chunkmeta_digest) {
-                            Err(e) => Err(e.into()),
-                            // TODO: make this a separate error type
-                            Ok(None) => Err(Error::StorageError(format!(
-                                "consistency error: chunk {} for blob {} not found",
-                                BASE64.encode(chunkmeta_digest),
-                                BASE64.encode(&req_digest),
-                            ))
-                            .into()),
-                            Ok(Some(data)) => {
-                                // We already know the hash matches, but also
-                                // check the size matches what chunkmeta said.
-                                if data.len() as u32 != chunkmeta.size {
-                                    Err(Error::StorageError(format!(
                                        "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}",
-                                        BASE64.encode(chunkmeta_digest),
-                                        BASE64.encode(&req_digest),
-                                        chunkmeta.size,
-                                        data.len(),
-                                    )).into())
-                                } else {
-                                    // send out the current chunk
-                                    // TODO: we might want to break this up further if too big?
-                                    Ok(super::BlobChunk { data })
-                                }
-                            }
-                        };
-                        // send the result to the client
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                            break;
-                        }
+            .map_err(|_| Status::invalid_argument("invalid digest length"))?;
+
+        match self.blob_service.open_read(&req_digest) {
+            Ok(Some(reader)) => {
+                let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into();
+
+                fn stream_mapper(
+                    x: Result<bytes::Bytes, io::Error>,
+                ) -> Result<super::BlobChunk, Status> {
+                    match x {
+                        Ok(bytes) => Ok(super::BlobChunk {
+                            data: bytes.to_vec(),
+                        }),
+                        Err(e) => Err(Status::from(e)),
                     }
-                });
+                }
+
+                let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper);
+                Ok(Response::new(Box::pin(chunks_stream)))
             }
+            Ok(None) => Err(Status::not_found(format!(
+                "blob {} not found",
+                BASE64.encode(&rq.digest)
+            ))),
+            Err(e) => Err(e.into()),
         }
-
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
     }
 
     #[instrument(skip(self))]
@@ -180,37 +109,34 @@ impl<
 
         let data_reader = tokio_util::io::StreamReader::new(data_stream);
 
-        // TODO: can we get rid of this clone?
-        let chunk_service = self.chunk_service.clone();
-
-        let (blob_digest, blob_meta) =
-            task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> {
-                // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream.
-                read_all_and_chunk(
-                    &chunk_service,
-                    tokio_util::io::SyncIoBridge::new(data_reader),
-                )
-            })
-            .await
-            .map_err(|e| Status::internal(e.to_string()))??;
-
-        // upload blobmeta if not there yet
-        if self
+        // prepare a writer, which we'll use in the blocking task below.
+        let mut writer = self
             .blob_service
-            .stat(&super::StatBlobRequest {
-                digest: blob_digest.to_vec(),
-                include_chunks: false,
-                include_bao: false,
-            })?
-            .is_none()
-        {
-            // upload blobmeta
-            self.blob_service.put(&blob_digest, blob_meta)?;
-        }
-
-        // return to client.
-        Ok(Response::new(super::PutBlobResponse {
-            digest: blob_digest,
-        }))
+            .open_write()
+            .map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?;
+
+        let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> {
+            // construct a sync reader to the data
+            let mut reader = tokio_util::io::SyncIoBridge::new(data_reader);
+
+            io::copy(&mut reader, &mut writer).map_err(|e| {
+                warn!("error copying: {}", e);
+                Status::internal("error copying")
+            })?;
+
+            let digest = writer
+                .close()
+                .map_err(|e| {
+                    warn!("error closing stream: {}", e);
+                    Status::internal("error closing stream")
+                })?
+                .to_vec();
+
+            Ok(super::PutBlobResponse { digest })
+        })
+        .await
+        .map_err(|_| Status::internal("failed to wait for task"))??;
+
+        Ok(Response::new(result))
    }
 }