about summary refs log tree commit diff
path: root/tvix/store/src/chunkservice/util.rs
use crate::{proto, Error};
use std::io::Read;
use tracing::{debug, instrument};

use super::ChunkService;

/// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
#[instrument(skip_all, err)]
pub fn upload_chunk<CS: ChunkService>(
    chunk_service: &CS,
    chunk_data: Vec<u8>,
) -> Result<Vec<u8>, Error> {
    let mut hasher = blake3::Hasher::new();
    update_hasher(&mut hasher, &chunk_data);
    let digest = hasher.finalize();

    if chunk_service.has(digest.as_bytes())? {
        debug!("already has chunk, skipping");
    }
    let digest_resp = chunk_service.put(chunk_data)?;

    assert_eq!(digest_resp, digest.as_bytes());

    Ok(digest.as_bytes().to_vec())
}

/// reads through a reader, writes chunks to a [ChunkService] and returns a
/// [proto::BlobMeta] pointing to all the chunks.
#[instrument(skip_all, err)]
pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
    chunk_service: &CS,
    r: R,
) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
    let mut blob_meta = proto::BlobMeta::default();

    // hash the file contents, upload chunks if not there yet
    let mut blob_hasher = blake3::Hasher::new();

    // TODO: play with chunking sizes
    let chunker_avg_size = 64 * 1024;
    let chunker_min_size = chunker_avg_size / 4;
    let chunker_max_size = chunker_avg_size * 4;

    let chunker =
        fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);

    for chunking_result in chunker {
        let chunk = chunking_result.unwrap();
        // TODO: convert to error::UnableToRead

        let chunk_len = chunk.data.len() as u32;

        // update calculate blob hash
        update_hasher(&mut blob_hasher, &chunk.data);

        let chunk_digest = upload_chunk(chunk_service, chunk.data)?;

        blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
            digest: chunk_digest,
            size: chunk_len,
        });
    }
    Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
}

/// updates a given hasher with more data. Uses rayon if the data is
/// sufficiently big.
///
/// From the docs:
///
/// To get any performance benefit from multithreading, the input buffer needs
/// to be large. As a rule of thumb on x86_64, update_rayon is slower than
/// update for inputs under 128 KiB. That threshold varies quite a lot across
/// different processors, and it’s important to benchmark your specific use
/// case.
///
/// We didn't benchmark yet, so these numbers might need tweaking.
#[instrument(skip_all)]
pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) {
    if data.len() > 128 * 1024 {
        hasher.update_rayon(data);
    } else {
        hasher.update(data);
    }
}