Diffstat (limited to 'tvix/store/src/chunkservice')
-rw-r--r-- | tvix/store/src/chunkservice/memory.rs | 53 |
-rw-r--r-- | tvix/store/src/chunkservice/mod.rs    | 28 |
-rw-r--r-- | tvix/store/src/chunkservice/sled.rs   | 70 |
-rw-r--r-- | tvix/store/src/chunkservice/util.rs   | 85 |
4 files changed, 0 insertions, 236 deletions
diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs
deleted file mode 100644
index 9fe4dc17d5ec..000000000000
--- a/tvix/store/src/chunkservice/memory.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-use data_encoding::BASE64;
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
-use tracing::instrument;
-
-use crate::Error;
-
-use super::ChunkService;
-
-#[derive(Clone, Default)]
-pub struct MemoryChunkService {
-    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
-}
-
-impl ChunkService for MemoryChunkService {
-    #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        let db = self.db.read().unwrap();
-        Ok(db.get(digest).is_some())
-    }
-
-    #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
-        let db = self.db.read().unwrap();
-        match db.get(digest) {
-            None => Ok(None),
-            Some(data) => {
-                // calculate the hash to verify this is really what we expect
-                let actual_digest = blake3::hash(data).as_bytes().to_vec();
-                if actual_digest != digest {
-                    return Err(Error::StorageError(format!(
-                        "invalid hash encountered when reading chunk, expected {}, got {}",
-                        BASE64.encode(digest),
-                        BASE64.encode(&actual_digest),
-                    )));
-                }
-                Ok(Some(data.clone()))
-            }
-        }
-    }
-
-    #[instrument(skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
-        let digest = blake3::hash(&data);
-
-        let mut db = self.db.write().unwrap();
-        db.insert(*digest.as_bytes(), data);
-
-        Ok(*digest.as_bytes())
-    }
-}
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs
deleted file mode 100644
index faf0a88f151a..000000000000
--- a/tvix/store/src/chunkservice/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-mod util;
-
-pub mod memory;
-pub mod sled;
-
-use crate::Error;
-
-pub use self::memory::MemoryChunkService;
-pub use self::sled::SledChunkService;
-pub use self::util::read_all_and_chunk;
-pub use self::util::update_hasher;
-pub use self::util::upload_chunk;
-
-/// The base trait all ChunkService services need to implement.
-/// It allows checking for the existence, download and upload of chunks.
-/// It's usually used after consulting a [crate::blobservice::BlobService] for
-/// chunking information.
-pub trait ChunkService {
-    /// check if the service has a chunk, given by its digest.
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
-
-    /// retrieve a chunk by its digest. Implementations MUST validate the digest
-    /// matches.
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>;
-
-    /// insert a chunk. returns the digest of the chunk, or an error.
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>;
-}
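For context on the API being removed: a round trip through the ChunkService trait looked roughly like the sketch below. The tvix_store import path is an assumption about the pre-removal crate layout; everything else mirrors the deleted mod.rs and memory.rs.

use tvix_store::chunkservice::{ChunkService, MemoryChunkService}; // assumed pre-removal paths
use tvix_store::Error;

fn roundtrip() -> Result<(), Error> {
    let chunks = MemoryChunkService::default();

    // put() hashes the data with BLAKE3 and keys the chunk by that digest.
    let digest = chunks.put(b"some chunk data".to_vec())?;

    // Lookups use the same 32-byte digest; get() re-hashes on read, so a
    // corrupted chunk surfaces as Error::StorageError instead of bad data.
    assert!(chunks.has(&digest)?);
    assert_eq!(chunks.get(&digest)?, Some(b"some chunk data".to_vec()));
    Ok(())
}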
diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs
deleted file mode 100644
index 8e86e1825b28..000000000000
--- a/tvix/store/src/chunkservice/sled.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use std::path::PathBuf;
-
-use data_encoding::BASE64;
-use tracing::instrument;
-
-use crate::Error;
-
-use super::ChunkService;
-
-#[derive(Clone)]
-pub struct SledChunkService {
-    db: sled::Db,
-}
-
-impl SledChunkService {
-    pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
-        let config = sled::Config::default().use_compression(true).path(p);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-
-    pub fn new_temporary() -> Result<Self, sled::Error> {
-        let config = sled::Config::default().temporary(true);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-}
-
-impl ChunkService for SledChunkService {
-    #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(false),
-            Ok(Some(_)) => Ok(true),
-            Err(e) => Err(Error::StorageError(e.to_string())),
-        }
-    }
-
-    #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(None),
-            Ok(Some(data)) => {
-                // calculate the hash to verify this is really what we expect
-                let actual_digest = blake3::hash(&data).as_bytes().to_vec();
-                if actual_digest != digest {
-                    return Err(Error::StorageError(format!(
-                        "invalid hash encountered when reading chunk, expected {}, got {}",
-                        BASE64.encode(digest),
-                        BASE64.encode(&actual_digest),
-                    )));
-                }
-                Ok(Some(Vec::from(&*data)))
-            }
-            Err(e) => Err(Error::StorageError(e.to_string())),
        }
-    }
-
-    #[instrument(name = "SledChunkService::put", skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
-        let digest = blake3::hash(&data);
-        let result = self.db.insert(digest.as_bytes(), data);
-        if let Err(e) = result {
-            return Err(Error::StorageError(e.to_string()));
-        }
-        Ok(*digest.as_bytes())
-    }
-}
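The sled-backed implementation honoured the same contract, just persisted (and compressed) on disk. A hypothetical test against the temporary variant, under the same import-path assumptions as the sketch above:

use tvix_store::chunkservice::{ChunkService, SledChunkService};
use tvix_store::Error;

fn sled_roundtrip() -> Result<(), Error> {
    // new_temporary() opens a throwaway sled database, handy for tests;
    // new(path) would open a persistent one with compression enabled.
    let chunks = SledChunkService::new_temporary()
        .map_err(|e| Error::StorageError(e.to_string()))?;

    let digest = chunks.put(vec![0x42; 1024])?;
    assert!(chunks.has(&digest)?);
    Ok(())
}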
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
deleted file mode 100644
index 2ad663733b04..000000000000
--- a/tvix/store/src/chunkservice/util.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-use crate::{proto, Error};
-use std::io::Read;
-use tracing::{debug, instrument};
-
-use super::ChunkService;
-
-/// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
-#[instrument(skip_all, err)]
-pub fn upload_chunk<CS: ChunkService>(
-    chunk_service: &CS,
-    chunk_data: Vec<u8>,
-) -> Result<[u8; 32], Error> {
-    let mut hasher = blake3::Hasher::new();
-    update_hasher(&mut hasher, &chunk_data);
-    let digest = hasher.finalize();
-
-    if chunk_service.has(digest.as_bytes())? {
-        debug!("already has chunk, skipping");
-    }
-    let digest_resp = chunk_service.put(chunk_data)?;
-
-    assert_eq!(&digest_resp, digest.as_bytes());
-
-    Ok(*digest.as_bytes())
-}
-
-/// reads through a reader, writes chunks to a [ChunkService] and returns a
-/// [proto::BlobMeta] pointing to all the chunks.
-#[instrument(skip_all, err)]
-pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
-    chunk_service: &CS,
-    r: R,
-) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
-    let mut blob_meta = proto::BlobMeta::default();
-
-    // hash the file contents, upload chunks if not there yet
-    let mut blob_hasher = blake3::Hasher::new();
-
-    // TODO: play with chunking sizes
-    let chunker_avg_size = 64 * 1024;
-    let chunker_min_size = chunker_avg_size / 4;
-    let chunker_max_size = chunker_avg_size * 4;
-
-    let chunker =
-        fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
-
-    for chunking_result in chunker {
-        let chunk = chunking_result.unwrap();
-        // TODO: convert to error::UnableToRead
-
-        let chunk_len = chunk.data.len() as u32;
-
-        // update calculate blob hash
-        update_hasher(&mut blob_hasher, &chunk.data);
-
-        let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
-
-        blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
-            digest: chunk_digest.to_vec(),
-            size: chunk_len,
-        });
-    }
-    Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
-}
-
-/// updates a given hasher with more data. Uses rayon if the data is
-/// sufficiently big.
-///
-/// From the docs:
-///
-/// To get any performance benefit from multithreading, the input buffer needs
-/// to be large. As a rule of thumb on x86_64, update_rayon is slower than
-/// update for inputs under 128 KiB. That threshold varies quite a lot across
-/// different processors, and it’s important to benchmark your specific use
-/// case.
-///
-/// We didn't benchmark yet, so these numbers might need tweaking.
-#[instrument(skip_all)]
-pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) {
-    if data.len() > 128 * 1024 {
-        hasher.update_rayon(data);
-    } else {
-        hasher.update(data);
-    }
-}
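The chunking scheme in the deleted read_all_and_chunk is plain FastCDC with min = avg/4 and max = avg*4 around a 64 KiB target, each chunk hashed with BLAKE3 (switching to the rayon-backed hasher above 128 KiB, per update_hasher). A self-contained sketch of just that cut-point-and-hash step, with the store and proto plumbing left out:

use std::io::Read;

fn chunk_and_hash<R: Read>(r: R) -> Vec<([u8; 32], usize)> {
    // Same bounds as the deleted code: 16 KiB min, 64 KiB average, 256 KiB max.
    let avg: u32 = 64 * 1024;
    let chunker = fastcdc::v2020::StreamCDC::new(r, avg / 4, avg, avg * 4);

    chunker
        .map(|result| {
            // The deleted code also unwrap()ed here, with a TODO to map
            // read failures onto a proper error type.
            let chunk = result.expect("reading chunk failed");
            (*blake3::hash(&chunk.data).as_bytes(), chunk.data.len())
        })
        .collect()
}

Because cut points depend only on the content, re-chunking the same input reproduces the same chunk digests, which is what makes a content-addressed chunk store deduplicate across identical data.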