use crate::chunkservice::ChunkService;
use crate::{proto, Error};
use rayon::prelude::*;
use tracing::{debug, instrument};
/// Writes a blob by chunking up all written data, uploading the chunks to a
/// [ChunkService] and recording them in a [proto::BlobMeta].
pub struct BlobWriter<'a, CS>
where
    CS: ChunkService,
{
    /// The chunk service chunks get uploaded to.
    chunk_service: &'a CS,
    /// Hasher fed with all data written so far, producing the blob digest.
    blob_hasher: blake3::Hasher,
    /// Metadata accumulating the list of uploaded chunks.
    blob_meta: proto::BlobMeta,
    /// Filled with data from previous writes that didn't end up producing a full chunk.
    buf: Vec<u8>,
}
/// Upload a chunk to the chunk service, and return its digest (or an error) when done.
///
/// The blake3 digest of `chunk_data` is computed locally first. If the chunk
/// service already has a chunk with that digest, the upload is skipped and
/// the digest is returned right away.
///
/// # Panics
/// Panics if the digest returned by the chunk service after `put` doesn't
/// match the locally computed one.
#[instrument(skip_all)]
fn upload_chunk<CS: ChunkService>(
    chunk_service: &CS,
    chunk_data: Vec<u8>,
) -> Result<Vec<u8>, Error> {
    let mut hasher = blake3::Hasher::new();
    // TODO: benchmark this number and factor it out
    if chunk_data.len() >= 128 * 1024 {
        hasher.update_rayon(&chunk_data);
    } else {
        hasher.update(&chunk_data);
    }
    let digest = hasher.finalize();

    // Chunks are addressed by their digest, so there's no need to upload a
    // chunk the service already has.
    if chunk_service.has(digest.as_bytes())? {
        debug!("already has chunk, skipping");
        return Ok(digest.as_bytes().to_vec());
    }

    let digest_resp = chunk_service.put(chunk_data)?;
    assert_eq!(digest_resp, digest.as_bytes());
    Ok(digest.as_bytes().to_vec())
}
impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
    /// Create a new, empty [BlobWriter] uploading its chunks to `chunk_service`.
    pub fn new(chunk_service: &'a CS) -> Self {
        Self {
            chunk_service,
            blob_hasher: blake3::Hasher::new(),
            blob_meta: proto::BlobMeta::default(),
            buf: vec![],
        }
    }

    /// Return the digest of the blob, as well as the [proto::BlobMeta]
    /// containing info of all its chunks.
    ///
    /// Data from previous writes that didn't end up producing a full chunk
    /// (what's left in `self.buf`) is uploaded as the last chunk first, and
    /// recorded in the blob meta.
    ///
    /// # Errors
    /// Returns an error if uploading that last chunk fails. In that case the
    /// leftover data stays in `self.buf`.
    #[instrument(skip(self))]
    pub fn finalize(&mut self) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
        if !self.buf.is_empty() {
            // NOTE(review): assumes `self.buf` is bounded by the chunker's max
            // chunk size (as maintained by `write`), so this cast can't truncate.
            let buf_len = self.buf.len() as u32;
            // Clone rather than take, so the data isn't lost if the upload fails.
            let chunk_digest = upload_chunk(self.chunk_service, self.buf.clone())?;
            self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
                digest: chunk_digest,
                size: buf_len,
            });
            self.buf.clear();
        }

        Ok((
            self.blob_hasher.finalize().as_bytes().to_vec(),
            self.blob_meta.clone(),
        ))
    }
}
/// This chunks up all data written using fastcdc, uploads all chunks to the
// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
impl<CS: ChunkService + std::marker::Sync> std::io::Write for BlobWriter<'_, CS> {
fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
// calculate input_buf.len(), we need to return that later.
let input_buf_len = input_buf.len();
// update calculate blob hash, and use rayon if data is > 128KiB.
if input_buf.len() > 128 * 1024 {
self.blob_hasher.update_rayon(input_buf);
} else {
self.blob_hasher.update(input_buf);
}
// prepend buf with existing data (from self.buf)
let buf: Vec<u8> = {
let mut b = Vec::new();
b.append(&mut self.buf);
b.append(&mut input_buf.to_vec());
b
};
// TODO: play with chunking sizes
let chunker_avg_size = 64 * 1024;
let chunker_min_size = chunker_avg_size / 4;
let chunker_max_size = chunker_avg_size * 4;
// initialize a chunker with the buffer
let chunker = fastcdc::v2020::FastCDC::new(
&buf,
chunker_min_size,
chunker_avg_size,
chunker_max_size,
);
// assemble a list of byte slices to be uploaded
let mut chunk_slices: Vec<&[u8]> = Vec::new();
// ask the chunker for cutting points in the buffer.
let mut start_pos = 0_usize;
let rest = loop {
// ask the chunker for the next cutting point.
let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
// whenever the last cut point is pointing to the end of the buffer,
// return that from the loop.
// We don't know if the chunker decided to cut here simply because it was
// at the end of the buffer, or if it would also cut if there
// were more data.
//
// Split off all previous chunks and keep this chunk data in the buffer.
if end_pos == buf.len() {
break &buf[start_pos..];
}
// if it's an intermediate chunk, add it to chunk_slices.
// We'll later upload all of them in batch.
chunk_slices.push(&buf[start_pos..end_pos]);
// advance start_pos over the processed chunk.
start_pos = end_pos;
};
// Upload all chunks to the chunk service and map them to a ChunkMeta
let blob_meta_chunks: Vec<Result<proto::blob_meta::ChunkMeta, Error>> = chunk_slices
.into_par_iter()
.map(|chunk_slice| {
let chunk_digest = upload_chunk(self.chunk_service, chunk_slice.to_vec())?;
Ok(proto::blob_meta::ChunkMeta {
digest: chunk_digest,
size: chunk_slice.len() as u32,
})
})
.collect();
self.blob_meta.chunks = blob_meta_chunks
.into_iter()
.collect::<Result<Vec<proto::blob_meta::ChunkMeta>, Error>>()?;
// update buf to point to the rest we didn't upload.
self.buf = rest.to_vec();
Ok(input_buf_len)
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}