about summary refs log tree commit diff
path: root/tvix/store/src/blobwriter.rs
blob: 8cb09ecc6d3cb84ab72b93b262ca8d75ee2637af (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use crate::chunkservice::ChunkService;
use crate::{proto, Error};
use tracing::{debug, instrument};

/// Writer that buffers incoming bytes, keeps a running blake3 hash over
/// everything written, and collects chunk metadata in a [proto::BlobMeta].
pub struct BlobWriter<'a, CS: ChunkService> {
    /// The chunk service used to check for and upload chunks.
    chunk_service: &'a CS,

    /// Hasher fed with every byte written, used to produce the blob digest.
    blob_hasher: blake3::Hasher,

    /// Metadata (digest and size) of every chunk recorded so far.
    blob_meta: proto::BlobMeta,

    /// Filled with data from previous writes that didn't end up producing a full chunk.
    buf: Vec<u8>,
}

impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
    /// Creates a writer that uploads its chunks to `chunk_service`.
    ///
    /// The writer starts with a fresh blob hasher, a default
    /// [proto::BlobMeta] and an empty buffer.
    pub fn new(chunk_service: &'a CS) -> Self {
        Self {
            chunk_service,
            blob_hasher: blake3::Hasher::new(),
            blob_meta: proto::BlobMeta::default(),
            buf: Vec::new(),
        }
    }

    /// Returns the blake3 digest of all data written so far, together with a
    /// copy of the [proto::BlobMeta] describing all recorded chunks.
    ///
    /// If there is still buffered data that has not been turned into a chunk
    /// yet, it is uploaded as the last chunk and recorded in the metadata
    /// before the result is assembled.
    ///
    /// # Errors
    ///
    /// Returns an error if uploading the last chunk fails. The buffer is left
    /// untouched in that case.
    #[instrument(skip(self))]
    pub fn finalize(&mut self) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
        if !self.buf.is_empty() {
            // NOTE(review): `as u32` would truncate above 4 GiB; presumably the
            // buffer only ever holds an unfinished chunk tail far below that
            // limit — confirm against the chunker settings used when writing.
            let buf_len = self.buf.len() as u32;

            // Upload a copy and only clear the buffer once the upload went
            // through, so a failed upload does not drop the remaining data.
            let chunk_digest = self.upload_chunk(self.buf.clone())?;

            self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
                digest: chunk_digest,
                size: buf_len,
            });

            self.buf.clear();
        }

        Ok((
            self.blob_hasher.finalize().as_bytes().to_vec(),
            self.blob_meta.clone(),
        ))
    }

    /// Uploads a chunk to the chunk service and returns its blake3 digest.
    ///
    /// If the chunk service reports that it already has a chunk with that
    /// digest, nothing is uploaded and the digest is returned right away.
    ///
    /// # Errors
    ///
    /// Returns an error if querying the chunk service or uploading fails.
    ///
    /// # Panics
    ///
    /// Panics if the digest returned by the chunk service differs from the
    /// digest calculated locally.
    #[instrument(skip(self, chunk_data))]
    fn upload_chunk(&mut self, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
        // Hash the chunk, using rayon if the data is > 128KiB.
        let mut hasher = blake3::Hasher::new();
        if chunk_data.len() > 128 * 1024 {
            hasher.update_rayon(&chunk_data);
        } else {
            hasher.update(&chunk_data);
        }
        let digest = hasher.finalize();

        // Chunks are addressed by their digest, so an existing chunk does not
        // need to be sent again.
        if self.chunk_service.has(digest.as_bytes())? {
            debug!("already has chunk, skipping");
            return Ok(digest.as_bytes().to_vec());
        }
        let digest_resp = self.chunk_service.put(chunk_data)?;

        // The service must agree with us on the digest of what we sent.
        assert_eq!(digest_resp, digest.as_bytes());

        Ok(digest.as_bytes().to_vec())
    }
}

/// This chunks up all data written using fastcdc, uploads all chunks to the
/// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
impl<CS: ChunkService> std::io::Write for BlobWriter<'_, CS> {
    /// Feeds `input_buf` into the blob hash, then chunks the previously
    /// buffered data followed by `input_buf`, uploading every chunk except the
    /// one ending at the end of the buffer. That trailing part stays buffered
    /// until the next write or until the writer is finalized.
    ///
    /// On success, always reports all of `input_buf` as written.
    ///
    /// # Errors
    ///
    /// Returns an error if uploading a chunk fails. By then the blob hash has
    /// already consumed `input_buf` and the buffered data has been taken out
    /// of the writer.
    /// NOTE(review): the writer presumably should not be reused after such an
    /// error — confirm with callers.
    fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
        // update the blob hash, and use rayon if data is > 128KiB.
        if input_buf.len() > 128 * 1024 {
            self.blob_hasher.update_rayon(input_buf);
        } else {
            self.blob_hasher.update(input_buf);
        }

        // Continue with the leftover data from previous writes and append the
        // new input to it, reusing the existing allocation.
        let mut buf = std::mem::take(&mut self.buf);
        buf.extend_from_slice(input_buf);

        // TODO: play with chunking sizes
        let chunker_avg_size = 64 * 1024;
        let chunker_min_size = chunker_avg_size / 4;
        let chunker_max_size = chunker_avg_size * 4;

        // Number of bytes at the front of `buf` that got uploaded as chunks.
        // The chunker borrows `buf`, so it is confined to this scope.
        let uploaded_len = {
            // initialize a chunker with the buffer
            let chunker = fastcdc::v2020::FastCDC::new(
                &buf,
                chunker_min_size,
                chunker_avg_size,
                chunker_max_size,
            );

            let mut start_pos = 0_usize;
            loop {
                // ask the chunker for the next cutting point.
                let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);

                // Whenever the cut point is the end of the buffer, keep that
                // chunk in the buffer: we don't know if the chunker decided to
                // cut here simply because it ran out of data, or if it would
                // also cut here if there were more data.
                if end_pos == buf.len() {
                    break start_pos;
                }

                // Upload that chunk to the chunk service and record it in BlobMeta.
                // TODO: make upload_chunk async and upload concurrently?
                let chunk_data = &buf[start_pos..end_pos];
                let chunk_digest = self.upload_chunk(chunk_data.to_vec())?;

                self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
                    digest: chunk_digest,
                    size: chunk_data.len() as u32,
                });

                // move start_pos over the processed chunk.
                start_pos = end_pos;
            }
        };

        // Drop everything that was uploaded and keep the rest for later.
        buf.drain(..uploaded_len);
        self.buf = buf;

        Ok(input_buf.len())
    }

    /// Does nothing and always succeeds; buffered data is not uploaded here.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}