diff options
author | Florian Klink <flokli@flokli.de> | 2023-02-12T13·24+0100 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-03-10T10·58+0000 |
commit | 3af467d7eee2c82b7d3efa3ea14dbe8e63422f56 (patch) | |
tree | 79665d13bdbeb0b6ec64460914f09c3431d91bb4 /tvix/store | |
parent | 35ea0b0d2ed2d72fdafbb0b64756578a389ec484 (diff) |
feat(tvix/store): add directoryservice r/5912
This adds a DirectoryService trait, and an implementation for it using sled, and one using a HashMap. Change-Id: Ida61524b2ca949e1b3a78089a5aa7d9f9800c8d7 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8093 Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/src/directoryservice/memory.rs | 76 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 21 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/sled.rs | 84 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 1 |
4 files changed, 182 insertions, 0 deletions
diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs new file mode 100644 index 000000000000..34ee45203320 --- /dev/null +++ b/tvix/store/src/directoryservice/memory.rs @@ -0,0 +1,76 @@ +use crate::{proto, Error}; +use data_encoding::BASE64; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use tracing::{instrument, warn}; + +use super::DirectoryService; + +#[derive(Clone)] +pub struct MemoryDirectoryService { + db: Arc<RwLock<HashMap<Vec<u8>, proto::Directory>>>, +} + +impl MemoryDirectoryService { + pub fn new() -> Self { + let db = Arc::new(RwLock::new(HashMap::default())); + + Self { db } + } +} + +impl DirectoryService for MemoryDirectoryService { + // TODO: change api to only be by digest + #[instrument(skip(self, by_what))] + fn get( + &self, + by_what: &proto::get_directory_request::ByWhat, + ) -> Result<Option<proto::Directory>, Error> { + match by_what { + proto::get_directory_request::ByWhat::Digest(digest) => { + let db = self.db.read()?; + + match db.get(digest) { + // The directory was not found, return + None => Ok(None), + + // The directory was found, try to parse the data as Directory message + Some(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest.as_slice() != digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + BASE64.encode(digest), + BASE64.encode(&actual_digest) + ))); + } + + Ok(Some(directory.clone())) + } + } + } + } + } + + #[instrument(skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))] + fn put(&self, directory: proto::Directory) -> Result<Vec<u8>, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + BASE64.encode(&digest), + e, + ))); + } + + // store it + let mut db = self.db.write()?; + db.insert(digest.clone(), directory); + + Ok(digest) + } +} diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs new file mode 100644 index 000000000000..4abf823b23b4 --- /dev/null +++ b/tvix/store/src/directoryservice/mod.rs @@ -0,0 +1,21 @@ +use crate::{proto, Error}; +mod memory; +mod sled; + +pub use self::memory::MemoryDirectoryService; +pub use self::sled::SledDirectoryService; + +/// The base trait all Directory services need to implement. +/// This is a simple get and put of [crate::proto::Directory], returning their +/// digest. +pub trait DirectoryService { + /// Get looks up a single Directory message by its digest. + /// In case the directory is not found, Ok(None) is returned. + fn get( + &self, + by_what: &proto::get_directory_request::ByWhat, + ) -> Result<Option<proto::Directory>, Error>; + /// Get uploads a single Directory message, and returns the calculated + /// digest, or an error. + fn put(&self, directory: proto::Directory) -> Result<Vec<u8>, Error>; +} diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs new file mode 100644 index 000000000000..caa78615d30a --- /dev/null +++ b/tvix/store/src/directoryservice/sled.rs @@ -0,0 +1,84 @@ +use crate::proto::Directory; +use crate::{proto, Error}; +use data_encoding::BASE64; +use prost::Message; +use std::path::PathBuf; +use tracing::{instrument, warn}; + +use super::DirectoryService; + +#[derive(Clone)] +pub struct SledDirectoryService { + db: sled::Db, +} + +impl SledDirectoryService { + pub fn new(p: PathBuf) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { db }) + } +} + +impl DirectoryService for SledDirectoryService { + // TODO: change api to only be by digest + #[instrument(name = "SledDirectoryService::get", skip(self, by_what))] + fn get( + &self, + by_what: &proto::get_directory_request::ByWhat, + ) -> Result<Option<proto::Directory>, Error> { + match by_what { + proto::get_directory_request::ByWhat::Digest(digest) => { + match self.db.get(digest) { + // The directory was not found, return + Ok(None) => Ok(None), + + // The directory was found, try to parse the data as Directory message + Ok(Some(data)) => match Directory::decode(&*data) { + Ok(directory) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest.as_slice() != digest { + return Err(Error::StorageError(format!( + "requested directory with digest {}, but got {}", + BASE64.encode(digest), + BASE64.encode(&actual_digest) + ))); + } + + Ok(Some(directory)) + } + Err(e) => { + warn!("unable to parse directory {}: {}", BASE64.encode(digest), e); + Err(Error::StorageError(e.to_string())) + } + }, + // some storage error? + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + } + } + + #[instrument(name = "SledDirectoryService::put", skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))] + fn put(&self, directory: proto::Directory) -> Result<Vec<u8>, Error> { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + BASE64.encode(&digest), + e, + ))); + } + // store it + let result = self.db.insert(&digest, directory.encode_to_vec()); + if let Err(e) = result { + return Err(Error::StorageError(e.to_string())); + } + Ok(digest) + } +} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index 12250a9c0d51..0279d13f9be7 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -5,6 +5,7 @@ mod errors; pub mod blobservice; pub mod chunkservice; +pub mod directoryservice; pub mod proto; pub use blobreader::BlobReader; |