diff options
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/src/errors.rs | 10 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 231 | ||||
-rw-r--r-- | tvix/store/src/proto/mod.rs | 3 |
3 files changed, 244 insertions, 0 deletions
diff --git a/tvix/store/src/errors.rs b/tvix/store/src/errors.rs index 7f7474428400..25e87c8aa239 100644 --- a/tvix/store/src/errors.rs +++ b/tvix/store/src/errors.rs @@ -1,5 +1,6 @@ use std::sync::PoisonError; use thiserror::Error; +use tonic::Status; /// Errors related to communication with the store. #[derive(Debug, Error)] @@ -16,3 +17,12 @@ impl<T> From<PoisonError<T>> for Error { Error::StorageError(value.to_string()) } } + +impl From<Error> for Status { + fn from(value: Error) -> Self { + match value { + Error::InvalidRequest(msg) => Status::invalid_argument(msg), + Error::StorageError(msg) => Status::data_loss(format!("storage error: {}", msg)), + } + } +} diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs new file mode 100644 index 000000000000..6204fc14005b --- /dev/null +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -0,0 +1,231 @@ +use crate::{blobservice::BlobService, chunkservice::ChunkService, Error}; +use data_encoding::BASE64; +use tokio::{sync::mpsc::channel, task}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{debug, instrument}; + +pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> { + blob_service: BS, + chunk_service: CS, +} + +impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> { + pub fn new(blob_service: BS, chunk_service: CS) -> Self { + Self { + blob_service, + chunk_service, + } + } +} + +#[async_trait] +impl< + BS: BlobService + Send + Sync + Clone + 'static, + CS: ChunkService + Send + Sync + Clone + 'static, + > super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS> +{ + type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>; + + #[instrument(skip(self))] + async fn stat( + &self, + request: Request<super::StatBlobRequest>, + ) -> Result<Response<super::BlobMeta>, Status> { + let rq = request.into_inner(); + match self.blob_service.stat(&rq) { + Ok(None) => Err(Status::not_found(format!( + "blob {} not found", + BASE64.encode(&rq.digest) + ))), + Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)), + Err(e) => Err(e.into()), + } + } + + #[instrument(skip(self))] + async fn read( + &self, + request: Request<super::ReadBlobRequest>, + ) -> Result<Response<Self::ReadStream>, Status> { + let req = request.into_inner(); + let (tx, rx) = channel(5); + + // query the chunk service for more detailed blob info + let stat_resp = self.blob_service.stat(&super::StatBlobRequest { + digest: req.digest.to_vec(), + include_chunks: true, + ..Default::default() + })?; + + match stat_resp { + None => { + // If the stat didn't return any blobmeta, the client might + // still have asked for a single chunk to be read. + // Check the chunkstore. + if let Some(data) = self.chunk_service.get(&req.digest)? { + // We already know the hash matches, and contrary to + // iterating over a blobmeta, we can't know the size, + // so send the contents of that chunk over, + // as the first (and only) element of the stream. + task::spawn(async move { + let res = Ok(super::BlobChunk { data }); + // send the result to the client. If the client already left, that's also fine. + if (tx.send(res).await).is_err() { + debug!("receiver dropped"); + } + }); + } else { + return Err(Status::not_found(format!( + "blob {} not found", + BASE64.encode(&req.digest), + ))); + } + } + Some(blobmeta) => { + let chunk_client = self.chunk_service.clone(); + + // TODO: use BlobReader? + // But then we might not be able to send compressed chunks as-is. + // Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it + // first, so we can .next().await in here. + + task::spawn(async move { + for chunkmeta in blobmeta.chunks { + // request chunk. + // We don't need to validate the digest again, as + // that's required for all implementations of ChunkService. + let res = match chunk_client.get(&chunkmeta.digest) { + Err(e) => Err(e.into()), + // TODO: make this a separate error type + Ok(None) => Err(Error::StorageError(format!( + "consistency error: chunk {} for blob {} not found", + BASE64.encode(&chunkmeta.digest), + BASE64.encode(&req.digest), + )) + .into()), + Ok(Some(data)) => { + // We already know the hash matches, but also + // check the size matches what chunkmeta said. + if data.len() as u32 != chunkmeta.size { + Err(Error::StorageError(format!( + "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}", + BASE64.encode(&chunkmeta.digest), + BASE64.encode(&req.digest), + chunkmeta.size, + data.len(), + )).into()) + } else { + // send out the current chunk + // TODO: we might want to break this up further if too big? + Ok(super::BlobChunk { data }) + } + } + }; + // send the result to the client + if (tx.send(res).await).is_err() { + debug!("receiver dropped"); + break; + } + } + }); + } + } + + let receiver_stream = ReceiverStream::new(rx); + Ok(Response::new(receiver_stream)) + } + + #[instrument(skip(self))] + async fn put( + &self, + request: Request<Streaming<super::BlobChunk>>, + ) -> Result<Response<super::PutBlobResponse>, Status> { + let mut req_inner = request.into_inner(); + + // TODO: for now, we collect all Chunks into a large Vec<u8>, and then + // pass it to a (content-defined) Chunker. + // This is because the fastcdc crate currently operates on byte slices, + // not on something implementing [std::io::Read]. + // (see https://github.com/nlfiedler/fastcdc-rs/issues/17) + + let mut blob_contents: Vec<u8> = Vec::new(); + + while let Some(mut blob_chunk) = req_inner.message().await? { + blob_contents.append(&mut blob_chunk.data); + } + + // initialize a new chunker + // TODO: play with chunking sizes + let chunker = fastcdc::FastCDC::new( + &blob_contents, + 64 * 1024 / 4, // min + 64 * 1024, // avg + 64 * 1024 * 4, // max + ); + + // initialize blake3 hashers. chunk_hasher is used and reset for each + // chunk, blob_hasher calculates the hash of the whole blob. + let mut chunk_hasher = blake3::Hasher::new(); + let mut blob_hasher = blake3::Hasher::new(); + + // start a BlobMeta, which we'll fill while looping over the chunks + let mut blob_meta = super::BlobMeta::default(); + + // loop over all the chunks + for chunk in chunker { + // extract the data itself + let chunk_data: Vec<u8> = + blob_contents[chunk.offset..chunk.offset + chunk.length].to_vec(); + + // calculate the digest of that chunk + chunk_hasher.update(&chunk_data); + let chunk_digest = chunk_hasher.finalize(); + chunk_hasher.reset(); + + // also update blob_hasher + blob_hasher.update(&chunk_data); + + // check if chunk is already in db, and if not, insert. + match self.chunk_service.has(chunk_digest.as_bytes()) { + Err(e) => { + return Err(Error::StorageError(format!( + "unable to check if chunk {} exists: {}", + BASE64.encode(chunk_digest.as_bytes()), + e + )) + .into()); + } + Ok(has_chunk) => { + if !has_chunk { + if let Err(e) = self.chunk_service.put(chunk_data.to_vec()) { + return Err(Error::StorageError(format!( + "unable to store chunk {}: {}", + BASE64.encode(chunk_digest.as_bytes()), + e + )) + .into()); + } + } + } + } + + // add chunk to blobmeta + blob_meta.chunks.push(super::blob_meta::ChunkMeta { + digest: chunk_digest.as_bytes().to_vec(), + size: chunk.length as u32, + }); + } + + // done reading data, finalize blob_hasher and insert blobmeta. + let blob_digest = blob_hasher.finalize(); + + // TODO: don't store if we already have it (potentially with different chunking) + match self.blob_service.put(blob_digest.as_bytes(), blob_meta) { + Ok(()) => Ok(Response::new(super::PutBlobResponse { + digest: blob_digest.as_bytes().to_vec(), + })), + Err(e) => Err(e.into()), + } + } +} diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index 12a6bbae907b..b58766d2ea7c 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -7,6 +7,9 @@ use prost::Message; use nix_compat::store_path::{ParseStorePathError, StorePath}; +mod grpc_blobservice_wrapper; +pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; + tonic::include_proto!("tvix.store.v1"); #[cfg(feature = "reflection")] |