diff options
Diffstat (limited to 'tvix/castore/src/directoryservice/object_store.rs')
-rw-r--r-- | tvix/castore/src/directoryservice/object_store.rs | 116 |
1 files changed, 91 insertions, 25 deletions
diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index 64ce335edb86..5b5281abcd2f 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::HashMap; use std::sync::Arc; use data_encoding::HEXLOWER; @@ -16,8 +16,12 @@ use tonic::async_trait; use tracing::{instrument, trace, warn, Level}; use url::Url; -use super::{ClosureValidator, DirectoryPutter, DirectoryService}; -use crate::{proto, B3Digest, Error}; +use super::{ + Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, + RootToLeavesValidator, +}; +use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{proto, B3Digest, Error, Node}; /// Stores directory closures in an object store. /// Notably, this makes use of the option to disallow accessing child directories except when @@ -45,7 +49,7 @@ fn derive_dirs_path(base_path: &Path, digest: &B3Digest) -> Path { const MAX_FRAME_LENGTH: usize = 1 * 1024 * 1024 * 1000; // 1 MiB // impl ObjectStoreDirectoryService { - /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// Constructs a new [ObjectStoreDirectoryService] from a [Url] supported by /// [object_store]. /// Any path suffix becomes the base path of the object store. /// additional options, the same as in [object_store::parse_url_opts] can @@ -75,13 +79,17 @@ impl DirectoryService for ObjectStoreDirectoryService { /// This is the same steps as for get_recursive anyways, so we just call get_recursive and /// return the first element of the stream and drop the request. #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.get_recursive(digest).take(1).next().await.transpose() } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - if !directory.directories.is_empty() { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { + // Ensure the directory doesn't contain other directory children + if directory + .nodes() + .any(|(_, e)| matches!(e, Node::Directory { .. })) + { return Err(Error::InvalidRequest( "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(), )); @@ -96,10 +104,11 @@ impl DirectoryService for ObjectStoreDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { - // The Directory digests we're expecting to receive. - let mut expected_directory_digests: HashSet<B3Digest> = - HashSet::from([root_directory_digest.clone()]); + ) -> BoxStream<'static, Result<Directory, Error>> { + // Check that we are not passing on bogus from the object store to the client, and that the + // trust chain from the root digest to the leaves is intact + let mut order_validator = + RootToLeavesValidator::new_with_root_digest(root_directory_digest.clone()); let dir_path = derive_dirs_path(&self.base_path, root_directory_digest); let object_store = self.object_store.clone(); @@ -130,8 +139,7 @@ impl DirectoryService for ObjectStoreDirectoryService { let digest: B3Digest = hasher.update(&buf).finalize().as_bytes().into(); // Ensure to only decode the directory objects whose digests we trust - let was_expected = expected_directory_digests.remove(&digest); - if !was_expected { + if !order_validator.digest_allowed(&digest) { return Err(crate::Error::StorageError(format!( "received unexpected directory {}", digest @@ -142,14 +150,13 @@ impl DirectoryService for ObjectStoreDirectoryService { warn!("unable to parse directory {}: {}", digest, e); Error::StorageError(e.to_string()) })?; + let directory = Directory::try_from(directory).map_err(|e| { + warn!("unable to convert directory {}: {}", digest, e); + Error::StorageError(e.to_string()) + })?; - for directory in &directory.directories { - // Allow the children to appear next - expected_directory_digests.insert( - B3Digest::try_from(directory.digest.clone()) - .map_err(|e| Error::StorageError(e.to_string()))?, - ); - } + // Allow the children to appear next + order_validator.add_directory_unchecked(&directory); Ok(directory) })()) @@ -173,11 +180,64 @@ impl DirectoryService for ObjectStoreDirectoryService { } } +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreDirectoryServiceConfig { + object_store_url: String, + #[serde(default)] + object_store_options: HashMap<String, String>, +} + +impl TryFrom<url::Url> for ObjectStoreDirectoryServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // We need to convert the URL to string, strip the prefix there, and then + // parse it back as url, as Url::set_scheme() rejects some of the transitions we want to do. + let trimmed_url = { + let s = url.to_string(); + let mut url = Url::parse( + s.strip_prefix("objectstore+") + .ok_or(Error::StorageError("Missing objectstore uri".into()))?, + )?; + // trim the query pairs, they might contain credentials or local settings we don't want to send as-is. + url.set_query(None); + url + }; + Ok(ObjectStoreDirectoryServiceConfig { + object_store_url: trimmed_url.into(), + object_store_options: url + .query_pairs() + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for ObjectStoreDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (object_store, path) = object_store::parse_url_opts( + &self.object_store_url.parse()?, + &self.object_store_options, + )?; + Ok(Arc::new(ObjectStoreDirectoryService { + object_store: Arc::new(object_store), + base_path: path, + })) + } +} + struct ObjectStoreDirectoryPutter { object_store: Arc<dyn ObjectStore>, base_path: Path, - directory_validator: Option<ClosureValidator>, + directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>, } impl ObjectStoreDirectoryPutter { @@ -193,11 +253,13 @@ impl ObjectStoreDirectoryPutter { #[async_trait] impl DirectoryPutter for ObjectStoreDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { - validator.add(directory)?; + validator + .add(directory) + .map_err(|e| Error::StorageError(e.to_string()))?; } } @@ -214,7 +276,11 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { // retrieve the validated directories. // It is important that they are in topological order (root first), // since that's how we want to retrieve them from the object store in the end. - let directories = validator.finalize_root_to_leaves()?; + let directories = validator + .validate() + .map_err(|e| Error::StorageError(e.to_string()))? + .drain_root_to_leaves() + .collect::<Vec<_>>(); // Get the root digest let root_digest = directories @@ -245,7 +311,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { for directory in directories { directories_sink - .send(directory.encode_to_vec().into()) + .send(proto::Directory::from(directory).encode_to_vec().into()) .await?; } |