use crate::chunkservice::ChunkService;
use crate::{proto, Error};
use tracing::{debug, instrument};

pub struct BlobWriter<'a, CS: ChunkService> {
    chunk_service: &'a CS,

    blob_hasher: blake3::Hasher,

    blob_meta: proto::BlobMeta,

    // filled with data from previous writes that didn't end up producing a full chunk.
    buf: Vec<u8>,
}

impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
    pub fn new(chunk_service: &'a CS) -> Self {
        Self {
            chunk_service,
            blob_hasher: blake3::Hasher::new(),
            blob_meta: proto::BlobMeta::default(),
            buf: vec![],
        }
    }

    // Returns the blake3 digest of the blob, as well as the BlobMeta describing all chunks,
    // or an error if uploading the remaining data fails.
    // If there is still data left in the internal buffer, it is uploaded as a last
    // (possibly undersized) chunk before returning.
    #[instrument(skip(self))]
    pub fn finalize(&mut self) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
        // If there's something still left in self.buf, upload this as the last
        // chunk to the chunk service and record it in BlobMeta.
        if !self.buf.is_empty() {
            let buf_len = self.buf.len() as u32;
            let chunk_digest = self.upload_chunk(self.buf.clone())?;

            self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
                digest: chunk_digest,
                size: buf_len,
            });

            self.buf.clear();
        }

        Ok((
            self.blob_hasher.finalize().as_bytes().to_vec(),
            self.blob_meta.clone(),
        ))
    }

    // Upload a chunk to the chunk service, and return its digest (or an error) when done.
    // Chunks the chunk service already knows about are not uploaded again.
    #[instrument(skip(self, chunk_data))]
    fn upload_chunk(&mut self, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
        let mut hasher = blake3::Hasher::new();
        // Use the multi-threaded hasher for larger chunks.
        if chunk_data.len() >= 128 * 1024 {
            hasher.update_rayon(&chunk_data);
        } else {
            hasher.update(&chunk_data);
        }
        let digest = hasher.finalize();

        // Skip the upload if the chunk service already has this chunk.
        if self.chunk_service.has(digest.as_bytes())? {
            debug!("already has chunk, skipping");
            return Ok(digest.as_bytes().to_vec());
        }

        let digest_resp = self.chunk_service.put(chunk_data)?;

        // The chunk service must arrive at the same digest for the data we handed it.
        assert_eq!(digest_resp, digest.as_bytes());

        Ok(digest.as_bytes().to_vec())
    }
}

/// This chunks up all data written using fastcdc, uploads all chunks to the
/// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
    fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
        // Remember input_buf.len(); we need to return it later.
        let input_buf_len = input_buf.len();

        // Update the blob hash, using the multi-threaded hasher if the data is > 128KiB.
        if input_buf.len() > 128 * 1024 {
            self.blob_hasher.update_rayon(input_buf);
        } else {
            self.blob_hasher.update(input_buf);
        }

        // Prepend the new data with whatever is left over from previous writes (self.buf).
        let buf: Vec<u8> = {
            let mut b = Vec::new();
            b.append(&mut self.buf);
            b.extend_from_slice(input_buf);
            b
        };

        // TODO: play with chunking sizes
        let chunker_avg_size = 64 * 1024;
        let chunker_min_size = chunker_avg_size / 4;
        let chunker_max_size = chunker_avg_size * 4;

        // Initialize a chunker over the buffer.
        let chunker = fastcdc::v2020::FastCDC::new(
            &buf,
            chunker_min_size,
            chunker_avg_size,
            chunker_max_size,
        );

        // Ask the chunker for cutting points in the buffer.
        let mut start_pos = 0_usize;
        let rest = loop {
            // Ask the chunker for the next cutting point.
            let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);

            // Whenever the last cut point is pointing to the end of the buffer,
            // keep that chunk in the buffer.
            // We don't know if the chunker decided to cut here simply because it was
            // at the end of the buffer, or if it would also cut if there
            // were more data.
            //
            // Split off all previous chunks and keep this chunk data in the buffer.
            if end_pos == buf.len() {
                break buf[start_pos..].to_vec();
            }

            // Upload that chunk to the chunk service and record it in BlobMeta.
            // TODO: make upload_chunk async and upload concurrently?
            let chunk_data = &buf[start_pos..end_pos];
            let chunk_digest = self.upload_chunk(chunk_data.to_vec())?;

            self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
                digest: chunk_digest,
                size: chunk_data.len() as u32,
            });

            // Move start_pos over the processed chunk.
            start_pos = end_pos;
        };

        self.buf = rest;

        Ok(input_buf_len)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
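// The module below is an illustrative usage sketch, not part of the production code:
// it assumes some value implementing [ChunkService] is available (for example an
// in-memory implementation used in tests), and the helper name `roundtrip_blob`
// is hypothetical.
#[cfg(test)]
mod example {
    use super::*;
    use std::io::Write;

    // Writes a megabyte of data through the BlobWriter and checks the returned
    // blob digest and BlobMeta.
    #[allow(dead_code)]
    fn roundtrip_blob<CS: ChunkService>(chunk_service: &CS) {
        let data = vec![0x42u8; 1024 * 1024];

        let mut writer = BlobWriter::new(chunk_service);
        // write() chunks the data with FastCDC and uploads full chunks as it goes;
        // any trailing partial chunk stays buffered until finalize().
        writer.write_all(&data).expect("writing must succeed");

        // finalize() uploads the trailing chunk (if any) and returns the blake3
        // digest of the whole blob plus the BlobMeta listing all chunks.
        let (blob_digest, blob_meta) = writer.finalize().expect("finalize must succeed");

        // blake3 digests are 32 bytes, and the chunk sizes must add up to the
        // total amount of data written.
        assert_eq!(blob_digest.len(), 32);
        let total_size: u32 = blob_meta.chunks.iter().map(|c| c.size).sum();
        assert_eq!(total_size as usize, data.len());
    }
}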