about summary refs log tree commit diff
path: root/tvix/store/src/blobwriter.rs
use crate::chunkservice::ChunkService;
use crate::{proto, Error};
use rayon::prelude::*;
use tracing::{debug, instrument};

/// Writer that accumulates written bytes, uploads them as chunks to a
/// [ChunkService], and keeps track of the uploaded chunks in a
/// [proto::BlobMeta].
pub struct BlobWriter<'a, CS: ChunkService> {
    /// The chunk service that chunks get uploaded to.
    chunk_service: &'a CS,

    /// Running BLAKE3 hasher over all bytes passed to the writer; used to
    /// produce the digest of the whole blob.
    blob_hasher: blake3::Hasher,

    /// Digest and size of every chunk uploaded so far.
    blob_meta: proto::BlobMeta,

    /// Filled with data from previous writes that didn't end up producing a
    /// full chunk.
    buf: Vec<u8>,
}

/// Uploads a chunk to the chunk service and returns its BLAKE3 digest.
///
/// The digest is computed locally first. If the chunk service reports that it
/// already has a chunk with that digest, the upload is skipped and the
/// locally computed digest is returned right away.
///
/// # Errors
///
/// Returns an error if asking the chunk service about the chunk, or uploading
/// the chunk, fails.
///
/// # Panics
///
/// Panics if the digest returned by the chunk service for the uploaded data
/// differs from the locally computed one.
#[instrument(skip_all)]
fn upload_chunk<CS: ChunkService>(
    chunk_service: &CS,
    chunk_data: Vec<u8>,
) -> Result<Vec<u8>, Error> {
    let mut hasher = blake3::Hasher::new();
    // TODO: benchmark this number and factor it out
    if chunk_data.len() >= 128 * 1024 {
        hasher.update_rayon(&chunk_data);
    } else {
        hasher.update(&chunk_data);
    }
    let digest = hasher.finalize();

    // Chunks are addressed by their digest, so a chunk the service already
    // has does not need to be sent again.
    if chunk_service.has(digest.as_bytes())? {
        debug!("already has chunk, skipping");
        return Ok(digest.as_bytes().to_vec());
    }

    let digest_resp = chunk_service.put(chunk_data)?;

    // The service must agree with us on the digest of the data it stored.
    assert_eq!(digest_resp, digest.as_bytes());

    Ok(digest.as_bytes().to_vec())
}

impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
    /// Creates a new writer uploading its chunks to `chunk_service`.
    ///
    /// The writer starts out with a fresh blob hasher, a default
    /// [proto::BlobMeta] and an empty buffer.
    pub fn new(chunk_service: &'a CS) -> Self {
        let blob_hasher = blake3::Hasher::new();
        let blob_meta = proto::BlobMeta::default();

        Self {
            chunk_service,
            blob_hasher,
            blob_meta,
            buf: Vec::new(),
        }
    }

    /// Returns the digest of the blob, together with a copy of the
    /// [proto::BlobMeta] describing all of its chunks.
    ///
    /// Any data still sitting in the internal buffer is uploaded as one last
    /// chunk and recorded in the blob meta before the result is assembled.
    ///
    /// # Errors
    ///
    /// Returns an error if uploading that last chunk fails. In that case the
    /// buffered data is left in place.
    #[instrument(skip(self))]
    pub fn finalize(&mut self) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
        let has_leftover = !self.buf.is_empty();

        if has_leftover {
            // The leftover bytes never made it into a full chunk, so they
            // become the final chunk of the blob.
            let size = self.buf.len() as u32;
            let digest = upload_chunk(self.chunk_service, self.buf.clone())?;

            let last_chunk = proto::blob_meta::ChunkMeta { digest, size };
            self.blob_meta.chunks.push(last_chunk);

            // Only forget the bytes once they have been uploaded and recorded.
            self.buf.clear();
        }

        let blob_digest = self.blob_hasher.finalize().as_bytes().to_vec();
        let blob_meta = self.blob_meta.clone();

        Ok((blob_digest, blob_meta))
    }
}

/// This chunks up all data written using fastcdc, uploads all chunks to the
// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
impl<CS: ChunkService + std::marker::Sync> std::io::Write for BlobWriter<'_, CS> {
    fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
        // calculate input_buf.len(), we need to return that later.
        let input_buf_len = input_buf.len();

        // update calculate blob hash, and use rayon if data is > 128KiB.
        if input_buf.len() > 128 * 1024 {
            self.blob_hasher.update_rayon(input_buf);
        } else {
            self.blob_hasher.update(input_buf);
        }

        // prepend buf with existing data (from self.buf)
        let buf: Vec<u8> = {
            let mut b = Vec::new();
            b.append(&mut self.buf);
            b.append(&mut input_buf.to_vec());
            b
        };

        // TODO: play with chunking sizes
        let chunker_avg_size = 64 * 1024;
        let chunker_min_size = chunker_avg_size / 4;
        let chunker_max_size = chunker_avg_size * 4;

        // initialize a chunker with the buffer
        let chunker = fastcdc::v2020::FastCDC::new(
            &buf,
            chunker_min_size,
            chunker_avg_size,
            chunker_max_size,
        );

        // assemble a list of byte slices to be uploaded
        let mut chunk_slices: Vec<&[u8]> = Vec::new();

        // ask the chunker for cutting points in the buffer.
        let mut start_pos = 0_usize;
        let rest = loop {
            // ask the chunker for the next cutting point.
            let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);

            // whenever the last cut point is pointing to the end of the buffer,
            // return that from the loop.
            // We don't know if the chunker decided to cut here simply because it was
            // at the end of the buffer, or if it would also cut if there
            // were more data.
            //
            // Split off all previous chunks and keep this chunk data in the buffer.
            if end_pos == buf.len() {
                break &buf[start_pos..];
            }

            // if it's an intermediate chunk, add it to chunk_slices.
            // We'll later upload all of them in batch.
            chunk_slices.push(&buf[start_pos..end_pos]);

            // advance start_pos over the processed chunk.
            start_pos = end_pos;
        };

        // Upload all chunks to the chunk service and map them to a ChunkMeta
        let blob_meta_chunks: Vec<Result<proto::blob_meta::ChunkMeta, Error>> = chunk_slices
            .into_par_iter()
            .map(|chunk_slice| {
                let chunk_service = self.chunk_service.clone();
                let chunk_digest = upload_chunk(chunk_service.clone(), chunk_slice.to_vec())?;

                Ok(proto::blob_meta::ChunkMeta {
                    digest: chunk_digest,
                    size: chunk_slice.len() as u32,
                })
            })
            .collect();

        self.blob_meta.chunks = blob_meta_chunks
            .into_iter()
            .collect::<Result<Vec<proto::blob_meta::ChunkMeta>, Error>>()?;

        // update buf to point to the rest we didn't upload.
        self.buf = rest.to_vec();

        Ok(input_buf_len)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}