From 21782d24f5f16942cb67fee6c098aa15bdf88d0c Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sat, 11 Mar 2023 21:23:28 +0100 Subject: feat(tvix/store): drop BlobWriter All code initially using this has been replaced by the simpler and more performant implementation with StreamCDC and read_all_and_chunk. Change-Id: I08889e9a6984de91c5debcf2b612cb68ae5072d1 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8265 Autosubmit: flokli Reviewed-by: tazjin Reviewed-by: raitobezarius Tested-by: BuildkiteCI --- tvix/store/src/blobwriter.rs | 140 ------------------------------------------- tvix/store/src/lib.rs | 2 - 2 files changed, 142 deletions(-) delete mode 100644 tvix/store/src/blobwriter.rs diff --git a/tvix/store/src/blobwriter.rs b/tvix/store/src/blobwriter.rs deleted file mode 100644 index beade0c9a11b..000000000000 --- a/tvix/store/src/blobwriter.rs +++ /dev/null @@ -1,140 +0,0 @@ -use crate::chunkservice::{update_hasher, upload_chunk, ChunkService}; -use crate::{proto, Error}; -use rayon::prelude::*; -use tracing::instrument; - -pub struct BlobWriter<'a, CS: ChunkService> { - chunk_service: &'a CS, - - blob_hasher: blake3::Hasher, - - blob_meta: proto::BlobMeta, - - // filled with data from previous writes that didn't end up producing a full chunk. - buf: Vec, -} - -impl<'a, CS: ChunkService> BlobWriter<'a, CS> { - pub fn new(chunk_service: &'a CS) -> Self { - Self { - chunk_service, - blob_hasher: blake3::Hasher::new(), - blob_meta: proto::BlobMeta::default(), - buf: vec![], - } - } - - // Return the digest of the blob, as well as the blobmeta containing info of all the chunks, - // or an error if there's still bytes left to be flushed. - // In case there was still data to be written a last unfinalized chunk, - // it's written as well. - #[instrument(skip(self))] - pub fn finalize(&mut self) -> Result<(Vec, proto::BlobMeta), Error> { - // If there's something still left in self.buf, upload this as the last - // chunk to the chunk service and record it in BlobMeta. - if !self.buf.is_empty() { - // Also upload the last chunk (what's left in `self.buf`) to the chunk - // service and record it in BlobMeta. - let buf_len = self.buf.len() as u32; - let chunk_digest = upload_chunk(self.chunk_service, self.buf.clone())?; - - self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta { - digest: chunk_digest, - size: buf_len, - }); - - self.buf.clear(); - } - return Ok(( - self.blob_hasher.finalize().as_bytes().to_vec(), - self.blob_meta.clone(), - )); - } -} - -/// This chunks up all data written using fastcdc, uploads all chunks to the -// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks. -impl std::io::Write for BlobWriter<'_, CS> { - fn write(&mut self, input_buf: &[u8]) -> std::io::Result { - // calculate input_buf.len(), we need to return that later. - let input_buf_len = input_buf.len(); - - // update blob hash - update_hasher(&mut self.blob_hasher, input_buf); - - // prepend buf with existing data (from self.buf) - let buf: Vec = { - let mut b = Vec::new(); - b.append(&mut self.buf); - b.append(&mut input_buf.to_vec()); - b - }; - - // TODO: play with chunking sizes - let chunker_avg_size = 64 * 1024; - let chunker_min_size = chunker_avg_size / 4; - let chunker_max_size = chunker_avg_size * 4; - - // initialize a chunker with the buffer - let chunker = fastcdc::v2020::FastCDC::new( - &buf, - chunker_min_size, - chunker_avg_size, - chunker_max_size, - ); - - // assemble a list of byte slices to be uploaded - let mut chunk_slices: Vec<&[u8]> = Vec::new(); - - // ask the chunker for cutting points in the buffer. - let mut start_pos = 0_usize; - let rest = loop { - // ask the chunker for the next cutting point. - let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos); - - // whenever the last cut point is pointing to the end of the buffer, - // return that from the loop. - // We don't know if the chunker decided to cut here simply because it was - // at the end of the buffer, or if it would also cut if there - // were more data. - // - // Split off all previous chunks and keep this chunk data in the buffer. - if end_pos == buf.len() { - break &buf[start_pos..]; - } - - // if it's an intermediate chunk, add it to chunk_slices. - // We'll later upload all of them in batch. - chunk_slices.push(&buf[start_pos..end_pos]); - - // advance start_pos over the processed chunk. - start_pos = end_pos; - }; - - // Upload all chunks to the chunk service and map them to a ChunkMeta - let blob_meta_chunks: Vec> = chunk_slices - .into_par_iter() - .map(|chunk_slice| { - let chunk_digest = upload_chunk(self.chunk_service, chunk_slice.to_vec())?; - - Ok(proto::blob_meta::ChunkMeta { - digest: chunk_digest, - size: chunk_slice.len() as u32, - }) - }) - .collect(); - - self.blob_meta.chunks = blob_meta_chunks - .into_iter() - .collect::, Error>>()?; - - // update buf to point to the rest we didn't upload. - self.buf = rest.to_vec(); - - Ok(input_buf_len) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index 3e7852134810..aac650b1ca8b 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,5 +1,4 @@ mod blobreader; -mod blobwriter; mod errors; pub mod blobservice; @@ -11,7 +10,6 @@ pub mod pathinfoservice; pub mod proto; pub use blobreader::BlobReader; -pub use blobwriter::BlobWriter; pub use errors::Error; #[cfg(test)] -- cgit 1.4.1