about summary refs log tree commit diff
diff options
context:
space:
mode:
author: Florian Klink <flokli@flokli.de> 2023-03-11T20·23+0100
committer: flokli <flokli@flokli.de> 2023-03-13T10·05+0000
commit: 21782d24f5f16942cb67fee6c098aa15bdf88d0c (patch)
tree: cc9bf8bf2f5899720b0507af3dd608c5861c121e
parent: c8bbddd5e55412c1b1319677e457d8175d6e78ab (diff)
feat(tvix/store): drop BlobWriter r/5960
All code initially using this has been replaced by the simpler and more
performant implementation with StreamCDC and read_all_and_chunk.

Change-Id: I08889e9a6984de91c5debcf2b612cb68ae5072d1
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8265
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
-rw-r--r-- tvix/store/src/blobwriter.rs | 140
-rw-r--r-- tvix/store/src/lib.rs | 2
2 files changed, 0 insertions, 142 deletions
diff --git a/tvix/store/src/blobwriter.rs b/tvix/store/src/blobwriter.rs
deleted file mode 100644
index beade0c9a1..0000000000
--- a/tvix/store/src/blobwriter.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-use crate::chunkservice::{update_hasher, upload_chunk, ChunkService};
-use crate::{proto, Error};
-use rayon::prelude::*;
-use tracing::instrument;
-
-pub struct BlobWriter<'a, CS: ChunkService> {
-    chunk_service: &'a CS,
-
-    blob_hasher: blake3::Hasher,
-
-    blob_meta: proto::BlobMeta,
-
-    // filled with data from previous writes that didn't end up producing a full chunk.
-    buf: Vec<u8>,
-}
-
-impl<'a, CS: ChunkService> BlobWriter<'a, CS> {
-    pub fn new(chunk_service: &'a CS) -> Self {
-        Self {
-            chunk_service,
-            blob_hasher: blake3::Hasher::new(),
-            blob_meta: proto::BlobMeta::default(),
-            buf: vec![],
-        }
-    }
-
-    // Return the digest of the blob, as well as the blobmeta containing info of all the chunks,
-    // or an error if there's still bytes left to be flushed.
-    // In case there was still data to be written a last unfinalized chunk,
-    // it's written as well.
-    #[instrument(skip(self))]
-    pub fn finalize(&mut self) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
-        // If there's something still left in self.buf, upload this as the last
-        // chunk to the chunk service and record it in BlobMeta.
-        if !self.buf.is_empty() {
-            // Also upload the last chunk (what's left in `self.buf`) to the chunk
-            // service and record it in BlobMeta.
-            let buf_len = self.buf.len() as u32;
-            let chunk_digest = upload_chunk(self.chunk_service, self.buf.clone())?;
-
-            self.blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
-                digest: chunk_digest,
-                size: buf_len,
-            });
-
-            self.buf.clear();
-        }
-        return Ok((
-            self.blob_hasher.finalize().as_bytes().to_vec(),
-            self.blob_meta.clone(),
-        ));
-    }
-}
-
-/// This chunks up all data written using fastcdc, uploads all chunks to the
-// [ChunkService], and fills a [proto::BlobMeta] linking to these chunks.
-impl<CS: ChunkService + std::marker::Sync> std::io::Write for BlobWriter<'_, CS> {
-    fn write(&mut self, input_buf: &[u8]) -> std::io::Result<usize> {
-        // calculate input_buf.len(), we need to return that later.
-        let input_buf_len = input_buf.len();
-
-        // update blob hash
-        update_hasher(&mut self.blob_hasher, input_buf);
-
-        // prepend buf with existing data (from self.buf)
-        let buf: Vec<u8> = {
-            let mut b = Vec::new();
-            b.append(&mut self.buf);
-            b.append(&mut input_buf.to_vec());
-            b
-        };
-
-        // TODO: play with chunking sizes
-        let chunker_avg_size = 64 * 1024;
-        let chunker_min_size = chunker_avg_size / 4;
-        let chunker_max_size = chunker_avg_size * 4;
-
-        // initialize a chunker with the buffer
-        let chunker = fastcdc::v2020::FastCDC::new(
-            &buf,
-            chunker_min_size,
-            chunker_avg_size,
-            chunker_max_size,
-        );
-
-        // assemble a list of byte slices to be uploaded
-        let mut chunk_slices: Vec<&[u8]> = Vec::new();
-
-        // ask the chunker for cutting points in the buffer.
-        let mut start_pos = 0_usize;
-        let rest = loop {
-            // ask the chunker for the next cutting point.
-            let (_fp, end_pos) = chunker.cut(start_pos, buf.len() - start_pos);
-
-            // whenever the last cut point is pointing to the end of the buffer,
-            // return that from the loop.
-            // We don't know if the chunker decided to cut here simply because it was
-            // at the end of the buffer, or if it would also cut if there
-            // were more data.
-            //
-            // Split off all previous chunks and keep this chunk data in the buffer.
-            if end_pos == buf.len() {
-                break &buf[start_pos..];
-            }
-
-            // if it's an intermediate chunk, add it to chunk_slices.
-            // We'll later upload all of them in batch.
-            chunk_slices.push(&buf[start_pos..end_pos]);
-
-            // advance start_pos over the processed chunk.
-            start_pos = end_pos;
-        };
-
-        // Upload all chunks to the chunk service and map them to a ChunkMeta
-        let blob_meta_chunks: Vec<Result<proto::blob_meta::ChunkMeta, Error>> = chunk_slices
-            .into_par_iter()
-            .map(|chunk_slice| {
-                let chunk_digest = upload_chunk(self.chunk_service, chunk_slice.to_vec())?;
-
-                Ok(proto::blob_meta::ChunkMeta {
-                    digest: chunk_digest,
-                    size: chunk_slice.len() as u32,
-                })
-            })
-            .collect();
-
-        self.blob_meta.chunks = blob_meta_chunks
-            .into_iter()
-            .collect::<Result<Vec<proto::blob_meta::ChunkMeta>, Error>>()?;
-
-        // update buf to point to the rest we didn't upload.
-        self.buf = rest.to_vec();
-
-        Ok(input_buf_len)
-    }
-
-    fn flush(&mut self) -> std::io::Result<()> {
-        Ok(())
-    }
-}
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index 3e78521348..aac650b1ca 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,5 +1,4 @@
 mod blobreader;
-mod blobwriter;
 mod errors;
 
 pub mod blobservice;
@@ -11,7 +10,6 @@ pub mod pathinfoservice;
 pub mod proto;
 
 pub use blobreader::BlobReader;
-pub use blobwriter::BlobWriter;
 pub use errors::Error;
 
 #[cfg(test)]