From 119aa43171c2e2c654d513ea2f2ee740f962a398 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sun, 12 Feb 2023 12:00:00 +0100 Subject: feat(tvix/store): add chunkservice This adds the simpler ChunkService trait, and an implementation for it using sled, and one using a HashMap. Change-Id: Icb0fdc41b37b44e9e9e4f548d0f4acae1d83b71e Reviewed-on: https://cl.tvl.fyi/c/depot/+/8086 Reviewed-by: raitobezarius Tested-by: BuildkiteCI --- tvix/store/src/chunkservice/memory.rs | 61 +++++++++++++++++++++++++++++++++ tvix/store/src/chunkservice/mod.rs | 23 +++++++++++++ tvix/store/src/chunkservice/sled.rs | 63 +++++++++++++++++++++++++++++++++++ tvix/store/src/lib.rs | 1 + 4 files changed, 148 insertions(+) create mode 100644 tvix/store/src/chunkservice/memory.rs create mode 100644 tvix/store/src/chunkservice/mod.rs create mode 100644 tvix/store/src/chunkservice/sled.rs (limited to 'tvix') diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs new file mode 100644 index 0000000000..1ae8b91305 --- /dev/null +++ b/tvix/store/src/chunkservice/memory.rs @@ -0,0 +1,61 @@ +use data_encoding::BASE64; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; +use tracing::instrument; + +use crate::Error; + +use super::ChunkService; + +#[derive(Clone)] +pub struct MemoryChunkService { + db: Arc, Vec>>>, +} + +impl MemoryChunkService { + pub fn new() -> Self { + let db = Arc::new(RwLock::new(HashMap::default())); + + Self { db } + } +} + +impl ChunkService for MemoryChunkService { + #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))] + fn has(&self, digest: &[u8]) -> Result { + let db = self.db.read().unwrap(); + Ok(db.get(digest).is_some()) + } + + #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))] + fn get(&self, digest: &[u8]) -> Result>, Error> { + let db = self.db.read().unwrap(); + match db.get(digest) { + None => Ok(None), + Some(data) => { + // calculate the hash to verify this is really what we expect + let actual_digest = blake3::hash(&data).as_bytes().to_vec(); + if actual_digest != digest { + return Err(Error::StorageError(format!( + "invalid hash encountered when reading chunk, expected {}, got {}", + BASE64.encode(digest), + BASE64.encode(&actual_digest), + ))); + } + Ok(Some(data.clone())) + } + } + } + + #[instrument(skip(self, data))] + fn put(&self, data: Vec) -> Result, Error> { + let digest = blake3::hash(&data).as_bytes().to_vec(); + + let mut db = self.db.write().unwrap(); + db.insert(digest.clone(), data); + + Ok(digest) + } +} diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs new file mode 100644 index 0000000000..83c91fd96e --- /dev/null +++ b/tvix/store/src/chunkservice/mod.rs @@ -0,0 +1,23 @@ +pub mod memory; +pub mod sled; + +use crate::Error; + +pub use self::memory::MemoryChunkService; +pub use self::sled::SledChunkService; + +/// The base trait all ChunkService services need to implement. +/// It allows checking for the existence, download and upload of chunks. +/// It's usually used after consulting a [crate::blobservice::BlobService] for +/// chunking information. +pub trait ChunkService { + /// check if the service has a chunk, given by its digest. + fn has(&self, digest: &[u8]) -> Result; + + /// retrieve a chunk by its digest. Implementations MUST validate the digest + /// matches. + fn get(&self, digest: &[u8]) -> Result>, Error>; + + /// insert a chunk. returns the digest of the chunk, or an error. + fn put(&self, data: Vec) -> Result, Error>; +} diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs new file mode 100644 index 0000000000..06194ddc59 --- /dev/null +++ b/tvix/store/src/chunkservice/sled.rs @@ -0,0 +1,63 @@ +use std::path::PathBuf; + +use data_encoding::BASE64; +use tracing::instrument; + +use crate::Error; + +use super::ChunkService; + +#[derive(Clone)] +pub struct SledChunkService { + db: sled::Db, +} + +impl SledChunkService { + pub fn new(p: PathBuf) -> Result { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } +} + +impl ChunkService for SledChunkService { + #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))] + fn has(&self, digest: &[u8]) -> Result { + match self.db.get(digest) { + Ok(None) => Ok(false), + Ok(Some(_)) => Ok(true), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))] + fn get(&self, digest: &[u8]) -> Result>, Error> { + match self.db.get(digest) { + Ok(None) => Ok(None), + Ok(Some(data)) => { + // calculate the hash to verify this is really what we expect + let actual_digest = blake3::hash(&data).as_bytes().to_vec(); + if actual_digest != digest { + return Err(Error::StorageError(format!( + "invalid hash encountered when reading chunk, expected {}, got {}", + BASE64.encode(digest), + BASE64.encode(&actual_digest), + ))); + } + Ok(Some(Vec::from(&*data))) + } + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(name = "SledChunkService::put", skip(self, data))] + fn put(&self, data: Vec) -> Result, Error> { + let digest = blake3::hash(&data).as_bytes().to_vec(); + let result = self.db.insert(&digest, data); + if let Err(e) = result { + return Err(Error::StorageError(e.to_string())); + } + Ok(digest) + } +} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index 2c8e8610da..e4d770c312 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -2,6 +2,7 @@ pub mod client; mod errors; +pub mod chunkservice; pub mod proto; pub mod dummy_blob_service; -- cgit 1.4.1