// tvix/store/src/blobwriter.rs
use crate::chunkservice::ChunkService;
use crate::{proto, Error};
use rayon::prelude::*;
use tracing::{debug, instrument};

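/// BlobWriter implements [std::io::Write] on top of a [ChunkService]:
/// written data is chunked up, the chunks are uploaded, and a
/// [proto::BlobMeta] describing them is assembled along the way.
///
/// A minimal usage sketch, assuming `cs` is some [ChunkService]
/// implementation available in scope:
///
/// ```ignore
/// use std::io::Write;
///
/// let mut blob_writer = BlobWriter::new(&cs);
/// blob_writer.write_all(b"hello world")?;
///
/// // finalize uploads the remaining partial chunk and returns the
/// // blake3 digest of the whole blob, plus its chunk list.
/// let (blob_digest, blob_meta) = blob_writer.finalize()?;
/// ```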
pub struct BlobWriter<'a, CS: ChunkService> {
    chunk_service: &'a CS,

    blob_hasher: blake3::Hasher,

    blob_meta: proto::BlobMeta,

    // filled with data from previous writes that didn't end up producing a full chunk.
    buf: Vec<u8>,
}

/// Uploads a chunk to the chunk service, returning its digest (or an error) when done.
#[instrument(skip_all)]
fn upload_chunk<CS: ChunkService>(
    chunk_service: &CS,
    chunk_data: Vec<u8>,
) -> Result<Vec<u8>, Error> {
    let mut hasher = blake3::Hasher::new();
    // TODO: benchmark this number and factor it out
    if chunk_data.len() >= 128 * 1024 {
        hasher.update_rayon(&chunk_data);
    } else {
        hasher.update(&chunk_data);
    }
    let digest = hasher.finalize();

    // If the chunk service already has the chunk, skip the upload and
    // return its digest directly.
    if chunk_service.has(digest.as_bytes())? {
        debug!("already has chunk, skipping");
        return Ok(digest.as_bytes().to_vec());
    }
    let digest_resp = chunk_service.put(chunk_data)?;

    assert_eq!(digest_resp, digest.as_bytes());

    Ok(digest.as_bytes().to_vec())
}

impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
    pub fn new(chunk_service: &'a CS) -> Self {
        Self {
            chunk_service,
            blob_hasher: blake3::Hasher::new(),
            blob_meta: proto::BlobMeta::default(),
            buf: vec![],
        }
    }

    /// Returns the digest of the blob, as well as the [proto::BlobMeta]
    /// describing all its chunks.
    /// If there's still data left in the internal buffer, it is uploaded
    /// to the chunk service as one last chunk before returning.
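    ///
    /// A sketch of the expected invariants, with a hypothetical `data`
    /// slice and chunk service `cs`:
    ///
    /// ```ignore
    /// let mut w = BlobWriter::new(&cs);
    /// w.write_all(&data)?;
    /// let (digest, meta) = w.finalize()?;
    ///
    /// // The blob digest is the blake3 hash of all data written.
    /// assert_eq!(digest, blake3::hash(&data).as_bytes().to_vec());
    /// // The chunk sizes recorded in BlobMeta add up to the blob size.
    /// let total: usize = meta.chunks.iter().map(|c| c.size as usize).sum();
    /// assert_eq!(total, data.len());
    /// ```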
    #[instrument(skip(self))]
    pub fn finalize(&mut self) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
        // If there's something still left in self.buf, upload it as the last
        // chunk to the chunk service and record it in BlobMeta.
        if !self.buf.is_empty() {
            let buf_len = self.buf.len() as u32;
            let chunk_digest = upload_chunk(self.chunk_service, self.buf.clone())?;

            self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
                digest: chunk_digest,
                size: buf_len,
            });

            self.buf.clear();
        }
        Ok((
            self.blob_hasher.finalize().as_bytes().to_vec(),
            self.blob_meta.clone(),
        ))
    }
}

/// Chunks up all data written using FastCDC, uploads all chunks to the
/// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
impl<CS: ChunkService + std::marker::Sync> std::io::Write for BlobWriter<'_, CS> {
    fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
        // Remember input_buf.len(); we need to return it later.
        let input_buf_len = input_buf.len();

        // Update the blob hash, using rayon if the data is larger than 128KiB.
        if input_buf.len() > 128 * 1024 {
            self.blob_hasher.update_rayon(input_buf);
        } else {
            self.blob_hasher.update(input_buf);
        }

        // Prepend the input with data buffered from previous writes (self.buf),
        // leaving self.buf empty.
        let buf: Vec<u8> = {
            let mut b = std::mem::take(&mut self.buf);
            b.extend_from_slice(input_buf);
            b
        };

        // TODO: play with chunking sizes
        let chunker_avg_size = 64 * 1024;
        let chunker_min_size = chunker_avg_size / 4;
        let chunker_max_size = chunker_avg_size * 4;

        // initialize a chunker with the buffer
        let chunker = fastcdc::v2020::FastCDC::new(
            &buf,
            chunker_min_size,
            chunker_avg_size,
            chunker_max_size,
        );

        // assemble a list of byte slices to be uploaded
        let mut chunk_slices: Vec<&[u8]> = Vec::new();

        // ask the chunker for cutting points in the buffer.
        let mut start_pos = 0_usize;
        let rest = loop {
            // ask the chunker for the next cutting point.
            let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);

            // If the cut point is at the end of the buffer, break out of the
            // loop and return the remaining data: we can't tell whether the
            // chunker cut here simply because the buffer ended, or whether it
            // would also cut if there were more data.
            //
            // All previous chunks get uploaded; this remainder stays buffered.
            if end_pos == buf.len() {
                break &buf[start_pos..];
            }

            // if it's an intermediate chunk, add it to chunk_slices.
            // We'll later upload all of them in batch.
            chunk_slices.push(&buf[start_pos..end_pos]);

            // advance start_pos over the processed chunk.
            start_pos = end_pos;
        };
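        // At this point, chunk_slices holds all full chunks and `rest` holds
        // the tail. With the parameters above, each full chunk is between
        // 16 KiB and 256 KiB in size, averaging around 64 KiB.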

        // Upload all chunks to the chunk service and map them to a ChunkMeta
        let blob_meta_chunks: Vec<Result<proto::blob_meta::ChunkMeta, Error>> = chunk_slices
            .into_par_iter()
            .map(|chunk_slice| {
                let chunk_digest = upload_chunk(self.chunk_service, chunk_slice.to_vec())?;

                Ok(proto::blob_meta::ChunkMeta {
                    digest: chunk_digest,
                    size: chunk_slice.len() as u32,
                })
            })
            .collect();

        // Append the new chunks to those recorded by previous write calls,
        // instead of overwriting them.
        self.blob_meta.chunks.extend(
            blob_meta_chunks
                .into_iter()
                .collect::<Result<Vec<proto::blob_meta::ChunkMeta>, Error>>()?,
        );

        // update buf to point to the rest we didn't upload.
        self.buf = rest.to_vec();

        Ok(input_buf_len)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
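
// What follows is a hedged test sketch, not part of the original file: it
// assumes an in-memory [ChunkService] implementation is available as
// `MemoryChunkService::default()` (a hypothetical name here) and checks the
// digest/BlobMeta invariants end to end.
#[cfg(test)]
mod tests {
    use super::BlobWriter;
    use crate::chunkservice::MemoryChunkService; // hypothetical in-memory impl
    use std::io::Write;

    #[test]
    fn blob_digest_and_chunk_sizes_add_up() {
        let chunk_service = MemoryChunkService::default();
        // Large enough to produce several chunks plus a trailing remainder.
        let data = vec![0x42u8; 300 * 1024];

        let mut blob_writer = BlobWriter::new(&chunk_service);
        blob_writer.write_all(&data).expect("write must succeed");
        let (digest, blob_meta) = blob_writer.finalize().expect("finalize must succeed");

        // The blob digest is the blake3 hash of everything written.
        assert_eq!(digest, blake3::hash(&data).as_bytes().to_vec());

        // The chunk sizes recorded in BlobMeta add up to the blob size.
        let total: usize = blob_meta.chunks.iter().map(|c| c.size as usize).sum();
        assert_eq!(total, data.len());
    }
}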