diff options
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r-- | tvix/castore/src/directoryservice/bigtable.rs | 24 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/combinators.rs | 9 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/directory_graph.rs | 68 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 40 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/memory.rs | 34 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 489 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/object_store.rs | 19 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/order_validator.rs | 17 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/redb.rs | 43 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/simple_putter.rs | 5 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/sled.rs | 46 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/tests/mod.rs | 67 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/traverse.rs | 63 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/utils.rs | 20 |
14 files changed, 650 insertions, 294 deletions
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index d10dddaf9f60..73ab4342d832 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -9,7 +9,9 @@ use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace, warn}; -use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use super::{ + utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter, +}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; @@ -149,7 +151,7 @@ fn derive_directory_key(digest: &B3Digest) -> String { #[async_trait] impl DirectoryService for BigtableDirectoryService { #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let mut client = self.client.clone(); let directory_key = derive_directory_key(digest); @@ -241,28 +243,20 @@ impl DirectoryService for BigtableDirectoryService { // Try to parse the value into a Directory message. let directory = proto::Directory::decode(Bytes::from(row_cell.value)) - .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?; - - // validate the Directory. - directory - .validate() + .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))? + .try_into() .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?; Ok(Some(directory)) } #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let directory_digest = directory.digest(); let mut client = self.client.clone(); let directory_key = derive_directory_key(&directory_digest); - // Ensure the directory we're trying to upload passes validation - directory - .validate() - .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?; - - let data = directory.encode_to_vec(); + let data = proto::Directory::from(directory).encode_to_vec(); if data.len() as u64 > CELL_SIZE_LIMIT { return Err(Error::StorageError( "Directory exceeds cell limit on Bigtable".into(), @@ -310,7 +304,7 @@ impl DirectoryService for BigtableDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index 0fdc82c16cb0..4283142231f9 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -7,10 +7,9 @@ use futures::TryStreamExt; use tonic::async_trait; use tracing::{instrument, trace}; -use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::directoryservice::DirectoryPutter; -use crate::proto; use crate::B3Digest; use crate::Error; @@ -40,7 +39,7 @@ where DS2: DirectoryService + Clone + 'static, { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { match self.near.get(digest).await? { Some(directory) => { trace!("serving from cache"); @@ -82,7 +81,7 @@ where } #[instrument(skip_all)] - async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> { Err(Error::StorageError("unimplemented".to_string())) } @@ -90,7 +89,7 @@ where fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let near = self.near.clone(); let far = self.far.clone(); let digest = root_directory_digest.clone(); diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs index e6b9b163370c..aa60f68a3197 100644 --- a/tvix/castore/src/directoryservice/directory_graph.rs +++ b/tvix/castore/src/directoryservice/directory_graph.rs @@ -10,10 +10,8 @@ use petgraph::{ use tracing::instrument; use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; -use crate::{ - proto::{self, Directory, DirectoryNode}, - B3Digest, -}; +use super::{Directory, DirectoryNode}; +use crate::B3Digest; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -88,7 +86,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { impl DirectoryGraph<LeavesToRootValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { if !self.order_validator.add_directory(&directory) { return Err(Error::ValidationError( "unknown directory was referenced".into(), @@ -108,7 +106,7 @@ impl DirectoryGraph<RootToLeavesValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); if !self.order_validator.digest_allowed(&digest) { return Err(Error::ValidationError("unexpected digest".into())); @@ -129,12 +127,7 @@ impl<O: OrderValidator> DirectoryGraph<O> { } /// Adds a directory which has already been confirmed to be in-order to the graph - pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> { - // Do some basic validation - directory - .validate() - .map_err(|e| Error::ValidationError(e.to_string()))?; - + pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); // Teach the graph about the existence of a node with this digest @@ -149,12 +142,10 @@ impl<O: OrderValidator> DirectoryGraph<O> { } // set up edges to all child directories - for subdir in &directory.directories { - let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap(); - + for subdir in directory.directories() { let child_ix = *self .digest_to_node_ix - .entry(subdir_digest) + .entry(subdir.digest.clone()) .or_insert_with(|| self.graph.add_node(None)); let pending_edge_check = match &self.graph[child_ix] { @@ -266,37 +257,36 @@ impl ValidatedDirectoryGraph { .filter_map(move |i| nodes[i.index()].weight.take()) } } - -#[cfg(test)] -mod tests { - use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, - }; - use lazy_static::lazy_static; - use rstest::rstest; - - lazy_static! { +/* pub static ref BROKEN_DIRECTORY : Directory = Directory { - symlinks: vec![proto::SymlinkNode { + symlinks: vec![SymlinkNode { name: "".into(), // invalid name! target: "doesntmatter".into(), }], ..Default::default() }; +*/ +#[cfg(test)] +mod tests { + use crate::directoryservice::{Directory, DirectoryNode, Node}; + use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; + use lazy_static::lazy_static; + use rstest::rstest; - pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() + use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; + + lazy_static! { + pub static ref BROKEN_PARENT_DIRECTORY: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + "foo".into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size() + 42, // wrong! + ).unwrap())).unwrap(); + dir }; } - use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; - #[rstest] /// Uploading an empty directory should succeed. #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] @@ -312,8 +302,6 @@ mod tests { #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] /// Uploading B (referring to A) should fail immediately, because A was never uploaded. #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] - /// Uploading a directory failing validation should fail immediately. - #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] /// Uploading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] fn test_uploads( @@ -366,8 +354,6 @@ mod tests { #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)] /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)] - /// Downloading a directory failing validation should fail immediately. - #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)] /// Downloading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)] fn test_downloads( diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 4dc3931ed410..2079514d2b79 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,8 +1,9 @@ use std::collections::HashSet; -use super::{DirectoryPutter, DirectoryService}; +use super::{Directory, DirectoryPutter, DirectoryService}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; +use crate::ValidateDirectoryError; use crate::{B3Digest, Error}; use async_stream::try_stream; use futures::stream::BoxStream; @@ -41,10 +42,7 @@ where T::Future: Send, { #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] - async fn get( - &self, - digest: &B3Digest, - ) -> Result<Option<crate::proto::Directory>, crate::Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); @@ -72,15 +70,10 @@ where "requested directory with digest {}, but got {}", digest, actual_digest ))) - } else if let Err(e) = directory.validate() { - // Validate the Directory itself is valid. - warn!("directory failed validation: {}", e.to_string()); - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - digest, e, - ))) } else { - Ok(Some(directory)) + Ok(Some(directory.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?)) } } Ok(None) => Ok(None), @@ -90,11 +83,11 @@ where } #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client .clone() - .put(tokio_stream::once(directory)) + .put(tokio_stream::once(proto::Directory::from(directory))) .await; match resp { @@ -113,7 +106,7 @@ where fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); @@ -135,14 +128,6 @@ where loop { match stream.message().await { Ok(Some(directory)) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))?; - } // validate we actually expected that directory, and move it from expected to received. let directory_digest = directory.digest(); let was_expected = expected_directory_digests.remove(&directory_digest); @@ -168,6 +153,9 @@ where .insert(child_directory_digest); } + let directory = directory.try_into() + .map_err(|e: ValidateDirectoryError| Error::StorageError(e.to_string()))?; + yield directory; }, Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => { @@ -279,11 +267,11 @@ pub struct GRPCPutter { #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. Some((_, ref directory_sender)) => { - if directory_sender.send(directory).is_err() { + if directory_sender.send(directory.into()).is_err() { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index ada4606a5a57..b039d9bc7d84 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -1,4 +1,4 @@ -use crate::{proto, B3Digest, Error}; +use crate::{B3Digest, Error}; use futures::stream::BoxStream; use std::collections::HashMap; use std::sync::Arc; @@ -7,8 +7,9 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryPutter, DirectoryService, SimplePutter}; +use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::proto; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -18,7 +19,7 @@ pub struct MemoryDirectoryService { #[async_trait] impl DirectoryService for MemoryDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.read().await; match db.get(digest) { @@ -37,35 +38,20 @@ impl DirectoryService for MemoryDirectoryService { ))); } - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory.clone())) + Ok(Some(directory.clone().try_into().map_err(|e| { + crate::Error::StorageError(format!("corrupted directory: {}", e)) + })?)) } } } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it let mut db = self.db.write().await; - db.insert(digest.clone(), directory); + db.insert(digest.clone(), directory.into()); Ok(digest) } @@ -74,7 +60,7 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 17a78b179349..0141a48f75bc 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,6 +1,9 @@ use crate::composition::{Registry, ServiceBuilder}; -use crate::{proto, B3Digest, Error}; +use crate::proto; +use crate::{B3Digest, Error}; +use crate::{ValidateDirectoryError, ValidateNodeError}; +use bytes::Bytes; use futures::stream::BoxStream; use tonic::async_trait; mod combinators; @@ -38,7 +41,7 @@ mod bigtable; pub use self::bigtable::{BigtableDirectoryService, BigtableParameters}; /// The base trait all Directory services need to implement. -/// This is a simple get and put of [crate::proto::Directory], returning their +/// This is a simple get and put of [Directory], returning their /// digest. #[async_trait] pub trait DirectoryService: Send + Sync { @@ -50,14 +53,14 @@ pub trait DirectoryService: Send + Sync { /// Directory digests that are at the "root", aka the last element that's /// sent to a DirectoryPutter. This makes sense for implementations bundling /// closures of directories together in batches. - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>; /// Uploads a single Directory message, and returns the calculated /// digest, or an error. An error *must* also be returned if the message is /// not valid. - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + async fn put(&self, directory: Directory) -> Result<B3Digest, Error>; - /// Looks up a closure of [proto::Directory]. - /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, + /// Looks up a closure of [Directory]. + /// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. /// @@ -75,9 +78,9 @@ pub trait DirectoryService: Send + Sync { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>>; + ) -> BoxStream<'static, Result<Directory, Error>>; - /// Allows persisting a closure of [proto::Directory], which is a graph of + /// Allows persisting a closure of [Directory], which is a graph of /// connected Directory messages. fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; } @@ -87,18 +90,18 @@ impl<A> DirectoryService for A where A: AsRef<dyn DirectoryService> + Send + Sync, { - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.as_ref().get(digest).await } - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { self.as_ref().put(directory).await } fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { self.as_ref().get_recursive(root_directory_digest) } @@ -107,7 +110,7 @@ where } } -/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// Provides a handle to put a closure of connected [Directory] elements. /// /// The consumer can periodically call [DirectoryPutter::put], starting from the /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to @@ -119,12 +122,12 @@ where /// but a single file or symlink. #[async_trait] pub trait DirectoryPutter: Send { - /// Put a individual [proto::Directory] into the store. + /// Put a individual [Directory] into the store. /// Error semantics and behaviour is up to the specific implementation of /// this trait. /// Due to bursting, the returned error might refer to an object previously /// sent via `put`. - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + async fn put(&mut self, directory: Directory) -> Result<(), Error>; /// Close the stream, and wait for any errors. /// If there's been any invalid Directory message uploaded, and error *must* @@ -145,3 +148,461 @@ pub(crate) fn register_directory_services(reg: &mut Registry) { reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable"); } } + +/// A Directory can contain Directory, File or Symlink nodes. +/// Each of these nodes have a name attribute, which is the basename in that +/// directory and node type specific attributes. +/// While a Node by itself may have any name, the names of Directory entries: +/// - MUST not contain slashes or null bytes +/// - MUST not be '.' or '..' +/// - MUST be unique across all three lists +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct Directory { + nodes: Vec<Node>, +} + +/// A DirectoryNode is a pointer to a [Directory], by its [Directory::digest]. +/// It also gives it a `name` and `size`. +/// Such a node is either an element in the [Directory] it itself is contained in, +/// or a standalone root node./ +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DirectoryNode { + /// The (base)name of the directory + name: Bytes, + /// The blake3 hash of a Directory message, serialized in protobuf canonical form. + digest: B3Digest, + /// Number of child elements in the Directory referred to by `digest`. + /// Calculated by summing up the numbers of nodes, and for each directory. + /// its size field. Can be used for inode allocation. + /// This field is precisely as verifiable as any other Merkle tree edge. + /// Resolve `digest`, and you can compute it incrementally. Resolve the entire + /// tree, and you can fully compute it from scratch. + /// A credulous implementation won't reject an excessive size, but this is + /// harmless: you'll have some ordinals without nodes. Undersizing is obvious + /// and easy to reject: you won't have an ordinal for some nodes. + size: u64, +} + +impl DirectoryNode { + pub fn new(name: Bytes, digest: B3Digest, size: u64) -> Result<Self, ValidateNodeError> { + Ok(Self { name, digest, size }) + } + + pub fn digest(&self) -> &B3Digest { + &self.digest + } + pub fn size(&self) -> u64 { + self.size + } +} + +/// A FileNode represents a regular or executable file in a Directory or at the root. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FileNode { + /// The (base)name of the file + name: Bytes, + + /// The blake3 digest of the file contents + digest: B3Digest, + + /// The file content size + size: u64, + + /// Whether the file is executable + executable: bool, +} + +impl FileNode { + pub fn new( + name: Bytes, + digest: B3Digest, + size: u64, + executable: bool, + ) -> Result<Self, ValidateNodeError> { + Ok(Self { + name, + digest, + size, + executable, + }) + } + + pub fn digest(&self) -> &B3Digest { + &self.digest + } + + pub fn size(&self) -> u64 { + self.size + } + + pub fn executable(&self) -> bool { + self.executable + } +} + +/// A SymlinkNode represents a symbolic link in a Directory or at the root. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SymlinkNode { + /// The (base)name of the symlink + name: Bytes, + /// The target of the symlink. + target: Bytes, +} + +impl SymlinkNode { + pub fn new(name: Bytes, target: Bytes) -> Result<Self, ValidateNodeError> { + if target.is_empty() || target.contains(&b'\0') { + return Err(ValidateNodeError::InvalidSymlinkTarget(target)); + } + Ok(Self { name, target }) + } + + pub fn target(&self) -> &bytes::Bytes { + &self.target + } +} + +/// A Node is either a [DirectoryNode], [FileNode] or [SymlinkNode]. +/// While a Node by itself may have any name, only those matching specific requirements +/// can can be added as entries to a [Directory] (see the documentation on [Directory] for details). +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Node { + Directory(DirectoryNode), + File(FileNode), + Symlink(SymlinkNode), +} + +/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] +/// and [Node], so we can ask all of them for the name easily. +pub trait NamedNode { + fn get_name(&self) -> &bytes::Bytes; +} + +impl NamedNode for &FileNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for FileNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &DirectoryNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for DirectoryNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &SymlinkNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for SymlinkNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &Node { + fn get_name(&self) -> &bytes::Bytes { + match self { + Node::File(node_file) => &node_file.name, + Node::Directory(node_directory) => &node_directory.name, + Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} +impl NamedNode for Node { + fn get_name(&self) -> &bytes::Bytes { + match self { + Node::File(node_file) => &node_file.name, + Node::Directory(node_directory) => &node_directory.name, + Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} + +impl Node { + /// Returns the node with a new name. + pub fn rename(self, name: bytes::Bytes) -> Self { + match self { + Node::Directory(n) => Node::Directory(DirectoryNode { name, ..n }), + Node::File(n) => Node::File(FileNode { name, ..n }), + Node::Symlink(n) => Node::Symlink(SymlinkNode { name, ..n }), + } + } +} + +impl PartialOrd for Node { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for Node { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for FileNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for FileNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for DirectoryNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for DirectoryNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for SymlinkNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for SymlinkNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { + iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) +} + +impl Directory { + pub fn new() -> Self { + Directory { nodes: vec![] } + } + + /// The size of a directory is the number of all regular and symlink elements, + /// the number of directory elements, and their size fields. + pub fn size(&self) -> u64 { + // It's impossible to create a Directory where the size overflows, because we + // check before every add() that the size won't overflow. + (self.nodes.len() as u64) + self.directories().map(|e| e.size).sum::<u64>() + } + + /// Calculates the digest of a Directory, which is the blake3 hash of a + /// Directory protobuf message, serialized in protobuf canonical form. + pub fn digest(&self) -> B3Digest { + proto::Directory::from(self).digest() + } + + /// Allows iterating over all nodes (directories, files and symlinks) + /// ordered by their name. + pub fn nodes(&self) -> impl Iterator<Item = &Node> + Send + Sync + '_ { + self.nodes.iter() + } + + /// Allows iterating over the FileNode entries of this directory + /// ordered by their name + pub fn files(&self) -> impl Iterator<Item = &FileNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::File(n) => Some(n), + _ => None, + }) + } + + /// Allows iterating over the subdirectories of this directory + /// ordered by their name + pub fn directories(&self) -> impl Iterator<Item = &DirectoryNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::Directory(n) => Some(n), + _ => None, + }) + } + + /// Allows iterating over the SymlinkNode entries of this directory + /// ordered by their name + pub fn symlinks(&self) -> impl Iterator<Item = &SymlinkNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::Symlink(n) => Some(n), + _ => None, + }) + } + + /// Checks a Node name for validity as a directory entry + /// We disallow slashes, null bytes, '.', '..' and the empty string. + pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { + if name.is_empty() + || name == b".." + || name == b"." + || name.contains(&0x00) + || name.contains(&b'/') + { + Err(ValidateNodeError::InvalidName(name.to_owned().into())) + } else { + Ok(()) + } + } + + /// Adds the specified [Node] to the [Directory], preserving sorted entries. + /// + /// Inserting an element that already exists with the same name in the directory will yield an + /// error. + /// Inserting an element will validate that its name fulfills the stricter requirements for + /// directory entries and yield an error if it is not. + pub fn add(&mut self, node: Node) -> Result<(), ValidateDirectoryError> { + Self::validate_node_name(node.get_name()) + .map_err(|e| ValidateDirectoryError::InvalidNode(node.get_name().clone().into(), e))?; + + // Check that the even after adding this new directory entry, the size calculation will not + // overflow + // FUTUREWORK: add some sort of batch add interface which only does this check once with + // all the to-be-added entries + checked_sum([ + self.size(), + 1, + match node { + Node::Directory(ref dir) => dir.size, + _ => 0, + }, + ]) + .ok_or(ValidateDirectoryError::SizeOverflow)?; + + // This assumes the [Directory] is sorted, since we don't allow accessing the nodes list + // directly and all previous inserts should have been in-order + let pos = match self + .nodes + .binary_search_by_key(&node.get_name(), |n| n.get_name()) + { + Err(pos) => pos, // There is no node with this name; good! + Ok(_) => { + return Err(ValidateDirectoryError::DuplicateName( + node.get_name().to_vec(), + )) + } + }; + + self.nodes.insert(pos, node); + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}; + use crate::fixtures::DUMMY_DIGEST; + use crate::ValidateDirectoryError; + + #[test] + fn add_nodes_to_directory() { + let mut d = Directory::new(); + + d.add(Node::Directory( + DirectoryNode::new("b".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + d.add(Node::Directory( + DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + d.add(Node::Directory( + DirectoryNode::new("z".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + + d.add(Node::File( + FileNode::new("f".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + d.add(Node::File( + FileNode::new("c".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + d.add(Node::File( + FileNode::new("g".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + + d.add(Node::Symlink( + SymlinkNode::new("t".into(), "a".into()).unwrap(), + )) + .unwrap(); + d.add(Node::Symlink( + SymlinkNode::new("o".into(), "a".into()).unwrap(), + )) + .unwrap(); + d.add(Node::Symlink( + SymlinkNode::new("e".into(), "a".into()).unwrap(), + )) + .unwrap(); + + // Convert to proto struct and back to ensure we are not generating any invalid structures + crate::directoryservice::Directory::try_from(crate::proto::Directory::from(d)) + .expect("directory should be valid"); + } + + #[test] + fn validate_overflow() { + let mut d = Directory::new(); + + assert_eq!( + d.add(Node::Directory( + DirectoryNode::new("foo".into(), DUMMY_DIGEST.clone(), u64::MAX).unwrap(), + )), + Err(ValidateDirectoryError::SizeOverflow) + ); + } + + #[test] + fn add_duplicate_node_to_directory() { + let mut d = Directory::new(); + + d.add(Node::Directory( + DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + assert_eq!( + format!( + "{}", + d.add(Node::File( + FileNode::new("a".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .expect_err("adding duplicate dir entry must fail") + ), + "\"a\" is a duplicate name" + ); + } + + /// Attempt to add a directory entry with a name which should be rejected. + #[tokio::test] + async fn directory_reject_invalid_name() { + let mut dir = Directory::new(); + assert!( + dir.add(Node::Symlink( + SymlinkNode::new( + "".into(), // wrong! can not be added to directory + "doesntmatter".into(), + ) + .unwrap() + )) + .is_err(), + "invalid symlink entry be rejected" + ); + } +} diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index a9a2cc8ef5c0..6e71c2356def 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -17,7 +17,8 @@ use tracing::{instrument, trace, warn, Level}; use url::Url; use super::{ - DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator, + Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, + RootToLeavesValidator, }; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; @@ -78,13 +79,13 @@ impl DirectoryService for ObjectStoreDirectoryService { /// This is the same steps as for get_recursive anyways, so we just call get_recursive and /// return the first element of the stream and drop the request. #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.get_recursive(digest).take(1).next().await.transpose() } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - if !directory.directories.is_empty() { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { + if directory.directories().next().is_some() { return Err(Error::InvalidRequest( "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(), )); @@ -99,7 +100,7 @@ impl DirectoryService for ObjectStoreDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // Check that we are not passing on bogus from the object store to the client, and that the // trust chain from the root digest to the leaves is intact let mut order_validator = @@ -145,6 +146,10 @@ impl DirectoryService for ObjectStoreDirectoryService { warn!("unable to parse directory {}: {}", digest, e); Error::StorageError(e.to_string()) })?; + let directory = Directory::try_from(directory).map_err(|e| { + warn!("unable to convert directory {}: {}", digest, e); + Error::StorageError(e.to_string()) + })?; // Allow the children to appear next order_validator.add_directory_unchecked(&directory); @@ -244,7 +249,7 @@ impl ObjectStoreDirectoryPutter { #[async_trait] impl DirectoryPutter for ObjectStoreDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -302,7 +307,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { for directory in directories { directories_sink - .send(directory.encode_to_vec().into()) + .send(proto::Directory::from(directory).encode_to_vec().into()) .await?; } diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs index 6045f5d24198..17431fc8d3b0 100644 --- a/tvix/castore/src/directoryservice/order_validator.rs +++ b/tvix/castore/src/directoryservice/order_validator.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use tracing::warn; -use crate::{proto::Directory, B3Digest}; +use super::Directory; +use crate::B3Digest; pub trait OrderValidator { /// Update the order validator's state with the directory @@ -47,10 +48,9 @@ impl RootToLeavesValidator { self.expected_digests.insert(directory.digest()); } - for subdir in &directory.directories { + for subdir in directory.directories() { // Allow the children to appear next - let subdir_digest = subdir.digest.clone().try_into().unwrap(); - self.expected_digests.insert(subdir_digest); + self.expected_digests.insert(subdir.digest.clone()); } } } @@ -79,12 +79,11 @@ impl OrderValidator for LeavesToRootValidator { fn add_directory(&mut self, directory: &Directory) -> bool { let digest = directory.digest(); - for subdir in &directory.directories { - let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory() - if !self.allowed_references.contains(&subdir_digest) { + for subdir in directory.directories() { + if !self.allowed_references.contains(&subdir.digest) { warn!( directory.digest = %digest, - subdirectory.digest = %subdir_digest, + subdirectory.digest = %subdir.digest, "unexpected directory reference" ); return false; @@ -101,8 +100,8 @@ impl OrderValidator for LeavesToRootValidator { mod tests { use super::{LeavesToRootValidator, RootToLeavesValidator}; use crate::directoryservice::order_validator::OrderValidator; + use crate::directoryservice::Directory; use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; - use crate::proto::Directory; use rstest::rstest; #[rstest] diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs index 51dc87f92574..d253df503bb3 100644 --- a/tvix/castore/src/directoryservice/redb.rs +++ b/tvix/castore/src/directoryservice/redb.rs @@ -5,15 +5,15 @@ use std::{path::PathBuf, sync::Arc}; use tonic::async_trait; use tracing::{instrument, warn}; +use super::{ + traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService, + LeavesToRootValidator, +}; use crate::{ composition::{CompositionContext, ServiceBuilder}, digests, proto, B3Digest, Error, }; -use super::{ - traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, -}; - const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = TableDefinition::new("directory"); @@ -69,7 +69,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { #[async_trait] impl DirectoryService for RedbDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.clone(); // Retrieves the protobuf-encoded Directory for the corresponding digest. @@ -107,13 +107,10 @@ impl DirectoryService for RedbDirectoryService { let directory = match proto::Directory::decode(&*directory_data.value()) { Ok(dir) => { // The returned Directory must be valid. - if let Err(e) = dir.validate() { + dir.try_into().map_err(|e| { warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - dir + Error::StorageError("Directory failed validation".to_string()) + })? } Err(e) => { warn!(err=%e, "failed to parse Directory"); @@ -125,26 +122,21 @@ impl DirectoryService for RedbDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // Validate the directory. - if let Err(e) = directory.validate() { - warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - // Store the directory in the table. let txn = db.begin_write()?; { let mut table = txn.open_table(DIRECTORY_TABLE)?; let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } txn.commit()?; @@ -158,7 +150,7 @@ impl DirectoryService for RedbDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single // redb transaction to avoid constantly closing and opening new transactions for the // database. @@ -185,7 +177,7 @@ pub struct RedbDirectoryPutter { #[async_trait] impl DirectoryPutter for RedbDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -228,7 +220,10 @@ impl DirectoryPutter for RedbDirectoryPutter { for directory in directories { let digest_as_array: [u8; digests::B3_LEN] = directory.digest().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } } diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs index dc54e3d11d18..b4daaee61b22 100644 --- a/tvix/castore/src/directoryservice/simple_putter.rs +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -1,7 +1,6 @@ use super::DirectoryPutter; use super::DirectoryService; -use super::{DirectoryGraph, LeavesToRootValidator}; -use crate::proto; +use super::{Directory, DirectoryGraph, LeavesToRootValidator}; use crate::B3Digest; use crate::Error; use tonic::async_trait; @@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> { #[async_trait] impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 5766dec1a5c2..4f3a860d14e4 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -1,5 +1,3 @@ -use crate::proto::Directory; -use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use prost::Message; use std::ops::Deref; @@ -9,8 +7,9 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; +use super::{Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{proto, B3Digest, Error}; #[derive(Clone)] pub struct SledDirectoryService { @@ -44,7 +43,7 @@ impl SledDirectoryService { #[async_trait] impl DirectoryService for SledDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let resp = tokio::task::spawn_blocking({ let db = self.db.clone(); let digest = digest.clone(); @@ -61,7 +60,7 @@ impl DirectoryService for SledDirectoryService { None => Ok(None), // The directory was found, try to parse the data as Directory message - Some(data) => match Directory::decode(&*data) { + Some(data) => match proto::Directory::decode(&*data) { Ok(directory) => { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. @@ -73,14 +72,10 @@ impl DirectoryService for SledDirectoryService { ))); } - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } + let directory = directory.try_into().map_err(|e| { + warn!("failed to retrieve directory: {}", e); + Error::StorageError(format!("failed to retrieve directory: {}", e)) + })?; Ok(Some(directory)) } @@ -93,22 +88,18 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } // store it - db.insert(digest.as_slice(), directory.encode_to_vec()) - .map_err(|e| Error::StorageError(e.to_string()))?; + db.insert( + digest.as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ) + .map_err(|e| Error::StorageError(e.to_string()))?; Ok(digest) } @@ -120,7 +111,7 @@ impl DirectoryService for SledDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } @@ -215,7 +206,7 @@ pub struct SledDirectoryPutter { #[async_trait] impl DirectoryPutter for SledDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -252,7 +243,10 @@ impl DirectoryPutter for SledDirectoryPutter { let mut batch = sled::Batch::default(); for directory in directories { - batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); + batch.insert( + directory.digest().as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ); } tree.apply_batch(batch).map_err(|e| { diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index b698f70ea469..3923e66dc2c2 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -8,10 +8,8 @@ use rstest_reuse::{self, *}; use super::DirectoryService; use crate::directoryservice; -use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}, - proto::{self, Directory}, -}; +use crate::directoryservice::{Directory, DirectoryNode, Node}; +use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}; mod utils; use self::utils::make_grpc_directory_service_client; @@ -41,10 +39,10 @@ async fn test_non_exist(directory_service: impl DirectoryService) { // recursive get assert_eq!( - Vec::<Result<proto::Directory, crate::Error>>::new(), + Vec::<Result<Directory, crate::Error>>::new(), directory_service .get_recursive(&DIRECTORY_A.digest()) - .collect::<Vec<Result<proto::Directory, crate::Error>>>() + .collect::<Vec<Result<Directory, crate::Error>>>() .await ); } @@ -212,59 +210,26 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService } } -/// Try uploading a Directory failing its internal validation, ensure it gets -/// rejected. -#[apply(directory_services)] -#[tokio::test] -async fn upload_reject_failing_validation(directory_service: impl DirectoryService) { - let broken_directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // wrong! - target: "doesntmatter".into(), - }], - ..Default::default() - }; - assert!(broken_directory.validate().is_err()); - - // Try to upload via single upload. - assert!( - directory_service - .put(broken_directory.clone()) - .await - .is_err(), - "single upload must fail" - ); - - // Try to upload via put_multiple. We're a bit more permissive here, the - // intermediate .put() might succeed, due to client-side bursting (in the - // case of gRPC), but then the close MUST fail. - let mut handle = directory_service.put_multiple_start(); - if handle.put(broken_directory).await.is_ok() { - assert!( - handle.close().await.is_err(), - "when succeeding put, close must fail" - ) - } -} - /// Try uploading a Directory that refers to a previously-uploaded directory. /// Both pass their isolated validation, but the size field in the parent is wrong. /// This should be rejected. #[apply(directory_services)] #[tokio::test] async fn upload_reject_wrong_size(directory_service: impl DirectoryService) { - let wrong_parent_directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() + let wrong_parent_directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory( + DirectoryNode::new( + "foo".into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size() + 42, // wrong! + ) + .unwrap(), + )) + .unwrap(); + dir }; - // Make sure isolated validation itself is ok - assert!(wrong_parent_directory.validate().is_ok()); - // Now upload both. Ensure it either fails during the second put, or during // the close. let mut handle = directory_service.put_multiple_start(); diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 17a51ae2bbff..3e6dc73a4d6b 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -1,8 +1,5 @@ -use super::DirectoryService; -use crate::{ - proto::{node::Node, NamedNode}, - B3Digest, Error, Path, -}; +use super::{DirectoryService, NamedNode, Node}; +use crate::{Error, Path}; use tracing::{instrument, warn}; /// This descends from a (root) node to the given (sub)path, returning the Node @@ -25,34 +22,31 @@ where return Ok(None); } Node::Directory(directory_node) => { - let digest: B3Digest = directory_node - .digest - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - // fetch the linked node from the directory_service. - let directory = - directory_service - .as_ref() - .get(&digest) - .await? - .ok_or_else(|| { - // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! - warn!("directory {} does not exist", digest); - - Error::StorageError(format!("directory {} does not exist", digest)) - })?; + let directory = directory_service + .as_ref() + .get(&directory_node.digest) + .await? + .ok_or_else(|| { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + warn!("directory {} does not exist", directory_node.digest); + + Error::StorageError(format!( + "directory {} does not exist", + directory_node.digest + )) + })?; // look for the component in the [Directory]. // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we // could stop as soon as e.name is larger than the search string. if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) { // child node found, update prev_node to that and continue. - parent_node = child_node; + parent_node = child_node.clone(); } else { // child node not found means there's no such element inside the directory. return Ok(None); - } + }; } } } @@ -65,6 +59,7 @@ where mod tests { use crate::{ directoryservice, + directoryservice::{DirectoryNode, Node}, fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, PathBuf, }; @@ -88,21 +83,21 @@ mod tests { handle.close().await.expect("must upload"); // construct the node for DIRECTORY_COMPLICATED - let node_directory_complicated = - crate::proto::node::Node::Directory(crate::proto::DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }); + let node_directory_complicated = Node::Directory( + DirectoryNode::new( + "doesntmatter".into(), + DIRECTORY_COMPLICATED.digest(), + DIRECTORY_COMPLICATED.size(), + ) + .unwrap(), + ); // construct the node for DIRECTORY_COMPLICATED - let node_directory_with_keep = crate::proto::node::Node::Directory( - DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), - ); + let node_directory_with_keep = + Node::Directory(DIRECTORY_COMPLICATED.directories().next().unwrap().clone()); // construct the node for the .keep file - let node_file_keep = - crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); + let node_file_keep = Node::File(DIRECTORY_WITH_KEEP.files().next().unwrap().clone()); // traversal to an empty subpath should return the root node. { diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index 726734f55eec..352362811a97 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -1,5 +1,5 @@ +use super::Directory; use super::DirectoryService; -use crate::proto; use crate::B3Digest; use crate::Error; use async_stream::try_stream; @@ -8,14 +8,14 @@ use std::collections::{HashSet, VecDeque}; use tracing::instrument; use tracing::warn; -/// Traverses a [proto::Directory] from the root to the children. +/// Traverses a [Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. #[instrument(skip(directory_service))] pub fn traverse_directory<'a, DS: DirectoryService + 'static>( directory_service: DS, root_directory_digest: &B3Digest, -) -> BoxStream<'a, Result<proto::Directory, Error>> { +) -> BoxStream<'a, Result<Directory, Error>> { // The list of all directories that still need to be traversed. The next // element is picked from the front, new elements are enqueued at the // back. @@ -50,16 +50,6 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( Some(dir) => dir, }; - - // validate, we don't want to send invalid directories. - current_directory.validate().map_err(|e| { - warn!("directory failed validation: {}", e.to_string()); - Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - )) - })?; - // We're about to send this directory, so let's avoid sending it again if a // descendant has it. sent_directory_digests.insert(current_directory_digest); @@ -67,9 +57,9 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( // enqueue all child directory digests to the work queue, as // long as they're not part of the worklist or already sent. // This panics if the digest looks invalid, it's supposed to be checked first. - for child_directory_node in ¤t_directory.directories { + for child_directory_node in current_directory.directories() { // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + let child_digest: B3Digest = child_directory_node.digest.clone(); if worklist_directory_digests.contains(&child_digest) || sent_directory_digests.contains(&child_digest) |