Diffstat (limited to 'tvix/store/src/chunkservice')
-rw-r--r--  tvix/store/src/chunkservice/memory.rs  53
-rw-r--r--  tvix/store/src/chunkservice/mod.rs     28
-rw-r--r--  tvix/store/src/chunkservice/sled.rs    70
-rw-r--r--  tvix/store/src/chunkservice/util.rs    85
4 files changed, 0 insertions(+), 236 deletions(-)
diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs
deleted file mode 100644
index 9fe4dc17d5ec..000000000000
--- a/tvix/store/src/chunkservice/memory.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-use data_encoding::BASE64;
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
-use tracing::instrument;
-
-use crate::Error;
-
-use super::ChunkService;
-
-#[derive(Clone, Default)]
-pub struct MemoryChunkService {
-    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
-}
-
-impl ChunkService for MemoryChunkService {
-    #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        let db = self.db.read().unwrap();
-        Ok(db.get(digest).is_some())
-    }
-
-    #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
-        let db = self.db.read().unwrap();
-        match db.get(digest) {
-            None => Ok(None),
-            Some(data) => {
-                // calculate the hash to verify this is really what we expect
-                let actual_digest = blake3::hash(data).as_bytes().to_vec();
-                if actual_digest != digest {
-                    return Err(Error::StorageError(format!(
-                        "invalid hash encountered when reading chunk, expected {}, got {}",
-                        BASE64.encode(digest),
-                        BASE64.encode(&actual_digest),
-                    )));
-                }
-                Ok(Some(data.clone()))
-            }
-        }
-    }
-
-    #[instrument(skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
-        let digest = blake3::hash(&data);
-
-        let mut db = self.db.write().unwrap();
-        db.insert(*digest.as_bytes(), data);
-
-        Ok(*digest.as_bytes())
-    }
-}
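
For readers following this removal: a minimal round-trip through the deleted
MemoryChunkService looked roughly like the sketch below. Crate paths assume the
pre-removal tvix_store layout; this is illustrative, not part of the change.

    use tvix_store::chunkservice::{ChunkService, MemoryChunkService};

    fn main() {
        let svc = MemoryChunkService::default();

        // put() hashes the payload with BLAKE3 and keys the HashMap by digest.
        let digest = svc.put(b"hello chunk".to_vec()).expect("insert chunk");
        assert!(svc.has(&digest).expect("existence check"));

        // get() re-hashes the stored bytes and errors on a mismatch, so a
        // successful read is always integrity-checked.
        let data = svc.get(&digest).expect("read chunk").expect("chunk exists");
        assert_eq!(data, b"hello chunk");
    }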
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs
deleted file mode 100644
index faf0a88f151a..000000000000
--- a/tvix/store/src/chunkservice/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-mod util;
-
-pub mod memory;
-pub mod sled;
-
-use crate::Error;
-
-pub use self::memory::MemoryChunkService;
-pub use self::sled::SledChunkService;
-pub use self::util::read_all_and_chunk;
-pub use self::util::update_hasher;
-pub use self::util::upload_chunk;
-
-/// The base trait all ChunkService services need to implement.
-/// It allows checking for the existence, download and upload of chunks.
-/// It's usually used after consulting a [crate::blobservice::BlobService] for
-/// chunking information.
-pub trait ChunkService {
-    /// check if the service has a chunk, given by its digest.
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
-
-    /// retrieve a chunk by its digest. Implementations MUST validate the digest
-    /// matches.
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>;
-
-    /// insert a chunk. returns the digest of the chunk, or an error.
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>;
-}
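
The trait contract above (digest-keyed storage, mandatory validation on read)
is what callers relied on. A hypothetical generic consumer, sketched here for
illustration (roundtrip is not an API that existed), works against any backend:

    use tvix_store::chunkservice::ChunkService;
    use tvix_store::Error;

    fn roundtrip<CS: ChunkService>(svc: &CS, data: Vec<u8>) -> Result<Vec<u8>, Error> {
        // put() is content-addressed, so inserting the same bytes twice is
        // harmless and yields the same digest.
        let digest = svc.put(data)?;
        svc.get(&digest)?
            .ok_or_else(|| Error::StorageError("chunk missing after put".to_string()))
    }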
diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs
deleted file mode 100644
index 8e86e1825b28..000000000000
--- a/tvix/store/src/chunkservice/sled.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use std::path::PathBuf;
-
-use data_encoding::BASE64;
-use tracing::instrument;
-
-use crate::Error;
-
-use super::ChunkService;
-
-#[derive(Clone)]
-pub struct SledChunkService {
-    db: sled::Db,
-}
-
-impl SledChunkService {
-    pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
-        let config = sled::Config::default().use_compression(true).path(p);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-
-    pub fn new_temporary() -> Result<Self, sled::Error> {
-        let config = sled::Config::default().temporary(true);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-}
-
-impl ChunkService for SledChunkService {
-    #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(false),
-            Ok(Some(_)) => Ok(true),
-            Err(e) => Err(Error::StorageError(e.to_string())),
-        }
-    }
-
-    #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(None),
-            Ok(Some(data)) => {
-                // calculate the hash to verify this is really what we expect
-                let actual_digest = blake3::hash(&data).as_bytes().to_vec();
-                if actual_digest != digest {
-                    return Err(Error::StorageError(format!(
-                        "invalid hash encountered when reading chunk, expected {}, got {}",
-                        BASE64.encode(digest),
-                        BASE64.encode(&actual_digest),
-                    )));
-                }
-                Ok(Some(Vec::from(&*data)))
-            }
-            Err(e) => Err(Error::StorageError(e.to_string())),
-        }
-    }
-
-    #[instrument(name = "SledChunkService::put", skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
-        let digest = blake3::hash(&data);
-        let result = self.db.insert(digest.as_bytes(), data);
-        if let Err(e) = result {
-            return Err(Error::StorageError(e.to_string()));
-        }
-        Ok(*digest.as_bytes())
-    }
-}
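
The sled-backed variant was constructed either against an on-disk path (with
sled's compression enabled) or as a throwaway temporary database. A usage
sketch, again assuming the pre-removal crate layout:

    use tvix_store::chunkservice::{ChunkService, SledChunkService};

    fn main() {
        // Temporary database; sled discards it when dropped. For a persistent
        // store, SledChunkService::new(std::path::PathBuf::from("...")) opens
        // a compressed on-disk database instead.
        let svc = SledChunkService::new_temporary().expect("open temporary sled db");

        let digest = svc.put(vec![0u8; 4096]).expect("insert chunk");
        assert!(svc.has(&digest).expect("existence check"));
    }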
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
deleted file mode 100644
index 2ad663733b04..000000000000
--- a/tvix/store/src/chunkservice/util.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-use crate::{proto, Error};
-use std::io::Read;
-use tracing::{debug, instrument};
-
-use super::ChunkService;
-
-/// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
-#[instrument(skip_all, err)]
-pub fn upload_chunk<CS: ChunkService>(
-    chunk_service: &CS,
-    chunk_data: Vec<u8>,
-) -> Result<[u8; 32], Error> {
-    let mut hasher = blake3::Hasher::new();
-    update_hasher(&mut hasher, &chunk_data);
-    let digest = hasher.finalize();
-
-    if chunk_service.has(digest.as_bytes())? {
-        debug!("already has chunk, skipping");
-    }
-    let digest_resp = chunk_service.put(chunk_data)?;
-
-    assert_eq!(&digest_resp, digest.as_bytes());
-
-    Ok(*digest.as_bytes())
-}
-
-/// reads through a reader, writes chunks to a [ChunkService] and returns a
-/// [proto::BlobMeta] pointing to all the chunks.
-#[instrument(skip_all, err)]
-pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
-    chunk_service: &CS,
-    r: R,
-) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
-    let mut blob_meta = proto::BlobMeta::default();
-
-    // hash the file contents, upload chunks if not there yet
-    let mut blob_hasher = blake3::Hasher::new();
-
-    // TODO: play with chunking sizes
-    let chunker_avg_size = 64 * 1024;
-    let chunker_min_size = chunker_avg_size / 4;
-    let chunker_max_size = chunker_avg_size * 4;
-
-    let chunker =
-        fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
-
-    for chunking_result in chunker {
-        let chunk = chunking_result.unwrap();
-        // TODO: convert to error::UnableToRead
-
-        let chunk_len = chunk.data.len() as u32;
-
-        // update the running blob hash
-        update_hasher(&mut blob_hasher, &chunk.data);
-
-        let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
-
-        blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
-            digest: chunk_digest.to_vec(),
-            size: chunk_len,
-        });
-    }
-    Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
-}
-
-/// updates a given hasher with more data. Uses rayon if the data is
-/// sufficiently big.
-///
-/// From the docs:
-///
-/// To get any performance benefit from multithreading, the input buffer needs
-/// to be large. As a rule of thumb on x86_64, update_rayon is slower than
-/// update for inputs under 128 KiB. That threshold varies quite a lot across
-/// different processors, and it’s important to benchmark your specific use
-/// case.
-///
-/// We didn't benchmark yet, so these numbers might need tweaking.
-#[instrument(skip_all)]
-pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) {
-    if data.len() > 128 * 1024 {
-        hasher.update_rayon(data);
-    } else {
-        hasher.update(data);
-    }
-}
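
Putting the helpers together: read_all_and_chunk drives FastCDC with the
constants above (16 KiB minimum, 64 KiB average, 256 KiB maximum chunk size)
over any reader, uploads each chunk, and returns the whole-blob BLAKE3 digest
alongside the BlobMeta. A sketch against the in-memory backend, same
pre-removal paths assumed:

    use std::io::Cursor;
    use tvix_store::chunkservice::{read_all_and_chunk, MemoryChunkService};

    fn main() {
        let svc = MemoryChunkService::default();
        let input = vec![0xAB_u8; 1024 * 1024];

        let (blob_digest, blob_meta) =
            read_all_and_chunk(&svc, Cursor::new(&input)).expect("chunking succeeds");

        // The returned digest is the BLAKE3 hash of the entire input...
        assert_eq!(blob_digest, blake3::hash(&input).as_bytes().to_vec());

        // ...and the per-chunk sizes recorded in BlobMeta add back up to the
        // input length, one ChunkMeta { digest, size } entry per CDC chunk.
        let total: u32 = blob_meta.chunks.iter().map(|c| c.size).sum();
        assert_eq!(total, input.len() as u32);
    }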