From 3af467d7eee2c82b7d3efa3ea14dbe8e63422f56 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sun, 12 Feb 2023 14:24:43 +0100 Subject: feat(tvix/store): add directoryservice This adds a DirectoryService trait, and an implementation for it using sled, and one using a HashMap. Change-Id: Ida61524b2ca949e1b3a78089a5aa7d9f9800c8d7 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8093 Tested-by: BuildkiteCI Reviewed-by: raitobezarius --- tvix/store/src/directoryservice/memory.rs | 76 ++++++++++++++++++++++++++++ tvix/store/src/directoryservice/mod.rs | 21 ++++++++ tvix/store/src/directoryservice/sled.rs | 84 +++++++++++++++++++++++++++++++ tvix/store/src/lib.rs | 1 + 4 files changed, 182 insertions(+) create mode 100644 tvix/store/src/directoryservice/memory.rs create mode 100644 tvix/store/src/directoryservice/mod.rs create mode 100644 tvix/store/src/directoryservice/sled.rs diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs new file mode 100644 index 0000000000..34ee452033 --- /dev/null +++ b/tvix/store/src/directoryservice/memory.rs @@ -0,0 +1,76 @@ +use crate::{proto, Error}; +use data_encoding::BASE64; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tracing::{instrument, warn}; + +use super::DirectoryService; + +#[derive(Clone)] +pub struct MemoryDirectoryService { + db: Arc, proto::Directory>>>, +} + +impl MemoryDirectoryService { + pub fn new() -> Self { + let db = Arc::new(RwLock::new(HashMap::default())); + + Self { db } + } +} + +impl DirectoryService for MemoryDirectoryService { + // TODO: change api to only be by digest + #[instrument(skip(self, by_what))] + fn get( + &self, + by_what: &proto::get_directory_request::ByWhat, + ) -> Result, Error> { + match by_what { + proto::get_directory_request::ByWhat::Digest(digest) => { + let db = self.db.read()?; + + match db.get(digest) { + // The directory was not found, return + None => Ok(None), + + // The directory was found, try to parse the data as Directory message + Some(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest.as_slice() != digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + BASE64.encode(digest), + BASE64.encode(&actual_digest) + ))); + } + + Ok(Some(directory.clone())) + } + } + } + } + } + + #[instrument(skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))] + fn put(&self, directory: proto::Directory) -> Result, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + BASE64.encode(&digest), + e, + ))); + } + + // store it + let mut db = self.db.write()?; + db.insert(digest.clone(), directory); + + Ok(digest) + } +} diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs new file mode 100644 index 0000000000..4abf823b23 --- /dev/null +++ b/tvix/store/src/directoryservice/mod.rs @@ -0,0 +1,21 @@ +use crate::{proto, Error}; +mod memory; +mod sled; + +pub use self::memory::MemoryDirectoryService; +pub use self::sled::SledDirectoryService; + +/// The base trait all Directory services need to implement. +/// This is a simple get and put of [crate::proto::Directory], returning their +/// digest. +pub trait DirectoryService { + /// Get looks up a single Directory message by its digest. + /// In case the directory is not found, Ok(None) is returned. + fn get( + &self, + by_what: &proto::get_directory_request::ByWhat, + ) -> Result, Error>; + /// Get uploads a single Directory message, and returns the calculated + /// digest, or an error. + fn put(&self, directory: proto::Directory) -> Result, Error>; +} diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs new file mode 100644 index 0000000000..caa78615d3 --- /dev/null +++ b/tvix/store/src/directoryservice/sled.rs @@ -0,0 +1,84 @@ +use crate::proto::Directory; +use crate::{proto, Error}; +use data_encoding::BASE64; +use prost::Message; +use std::path::PathBuf; +use tracing::{instrument, warn}; + +use super::DirectoryService; + +#[derive(Clone)] +pub struct SledDirectoryService { + db: sled::Db, +} + +impl SledDirectoryService { + pub fn new(p: PathBuf) -> Result { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } +} + +impl DirectoryService for SledDirectoryService { + // TODO: change api to only be by digest + #[instrument(name = "SledDirectoryService::get", skip(self, by_what))] + fn get( + &self, + by_what: &proto::get_directory_request::ByWhat, + ) -> Result, Error> { + match by_what { + proto::get_directory_request::ByWhat::Digest(digest) => { + match self.db.get(digest) { + // The directory was not found, return + Ok(None) => Ok(None), + + // The directory was found, try to parse the data as Directory message + Ok(Some(data)) => match Directory::decode(&*data) { + Ok(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest.as_slice() != digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + BASE64.encode(digest), + BASE64.encode(&actual_digest) + ))); + } + + Ok(Some(directory)) + } + Err(e) => { + warn!("unable to parse directory {}: {}", BASE64.encode(digest), e); + Err(Error::StorageError(e.to_string())) + } + }, + // some storage error? + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + } + } + + #[instrument(name = "SledDirectoryService::put", skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))] + fn put(&self, directory: proto::Directory) -> Result, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + BASE64.encode(&digest), + e, + ))); + } + // store it + let result = self.db.insert(&digest, directory.encode_to_vec()); + if let Err(e) = result { + return Err(Error::StorageError(e.to_string())); + } + Ok(digest) + } +} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index 12250a9c0d..0279d13f9b 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -5,6 +5,7 @@ mod errors; pub mod blobservice; pub mod chunkservice; +pub mod directoryservice; pub mod proto; pub use blobreader::BlobReader; -- cgit 1.4.1