diff options
Diffstat (limited to 'tvix/castore/src')
32 files changed, 1131 insertions, 1061 deletions
diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs index ef9a7326b3fb..4d919ff0d873 100644 --- a/tvix/castore/src/digests.rs +++ b/tvix/castore/src/digests.rs @@ -6,7 +6,7 @@ use thiserror::Error; pub struct B3Digest(Bytes); // TODO: allow converting these errors to crate::Error -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum Error { #[error("invalid digest length: {0}")] InvalidDigestLen(usize), diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index d10dddaf9f60..73ab4342d832 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -9,7 +9,9 @@ use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace, warn}; -use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use super::{ + utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter, +}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; @@ -149,7 +151,7 @@ fn derive_directory_key(digest: &B3Digest) -> String { #[async_trait] impl DirectoryService for BigtableDirectoryService { #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let mut client = self.client.clone(); let directory_key = derive_directory_key(digest); @@ -241,28 +243,20 @@ impl DirectoryService for BigtableDirectoryService { // Try to parse the value into a Directory message. let directory = proto::Directory::decode(Bytes::from(row_cell.value)) - .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?; - - // validate the Directory. - directory - .validate() + .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))? + .try_into() .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?; Ok(Some(directory)) } #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let directory_digest = directory.digest(); let mut client = self.client.clone(); let directory_key = derive_directory_key(&directory_digest); - // Ensure the directory we're trying to upload passes validation - directory - .validate() - .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?; - - let data = directory.encode_to_vec(); + let data = proto::Directory::from(directory).encode_to_vec(); if data.len() as u64 > CELL_SIZE_LIMIT { return Err(Error::StorageError( "Directory exceeds cell limit on Bigtable".into(), @@ -310,7 +304,7 @@ impl DirectoryService for BigtableDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index 0fdc82c16cb0..4283142231f9 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -7,10 +7,9 @@ use futures::TryStreamExt; use tonic::async_trait; use tracing::{instrument, trace}; -use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::directoryservice::DirectoryPutter; -use crate::proto; use crate::B3Digest; use crate::Error; @@ -40,7 +39,7 @@ where DS2: DirectoryService + Clone + 'static, { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { match self.near.get(digest).await? { Some(directory) => { trace!("serving from cache"); @@ -82,7 +81,7 @@ where } #[instrument(skip_all)] - async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> { Err(Error::StorageError("unimplemented".to_string())) } @@ -90,7 +89,7 @@ where fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let near = self.near.clone(); let far = self.far.clone(); let digest = root_directory_digest.clone(); diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs index e6b9b163370c..aa60f68a3197 100644 --- a/tvix/castore/src/directoryservice/directory_graph.rs +++ b/tvix/castore/src/directoryservice/directory_graph.rs @@ -10,10 +10,8 @@ use petgraph::{ use tracing::instrument; use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; -use crate::{ - proto::{self, Directory, DirectoryNode}, - B3Digest, -}; +use super::{Directory, DirectoryNode}; +use crate::B3Digest; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -88,7 +86,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { impl DirectoryGraph<LeavesToRootValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { if !self.order_validator.add_directory(&directory) { return Err(Error::ValidationError( "unknown directory was referenced".into(), @@ -108,7 +106,7 @@ impl DirectoryGraph<RootToLeavesValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); if !self.order_validator.digest_allowed(&digest) { return Err(Error::ValidationError("unexpected digest".into())); @@ -129,12 +127,7 @@ impl<O: OrderValidator> DirectoryGraph<O> { } /// Adds a directory which has already been confirmed to be in-order to the graph - pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> { - // Do some basic validation - directory - .validate() - .map_err(|e| Error::ValidationError(e.to_string()))?; - + pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); // Teach the graph about the existence of a node with this digest @@ -149,12 +142,10 @@ impl<O: OrderValidator> DirectoryGraph<O> { } // set up edges to all child directories - for subdir in &directory.directories { - let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap(); - + for subdir in directory.directories() { let child_ix = *self .digest_to_node_ix - .entry(subdir_digest) + .entry(subdir.digest.clone()) .or_insert_with(|| self.graph.add_node(None)); let pending_edge_check = match &self.graph[child_ix] { @@ -266,37 +257,36 @@ impl ValidatedDirectoryGraph { .filter_map(move |i| nodes[i.index()].weight.take()) } } - -#[cfg(test)] -mod tests { - use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, - }; - use lazy_static::lazy_static; - use rstest::rstest; - - lazy_static! { +/* pub static ref BROKEN_DIRECTORY : Directory = Directory { - symlinks: vec![proto::SymlinkNode { + symlinks: vec![SymlinkNode { name: "".into(), // invalid name! target: "doesntmatter".into(), }], ..Default::default() }; +*/ +#[cfg(test)] +mod tests { + use crate::directoryservice::{Directory, DirectoryNode, Node}; + use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; + use lazy_static::lazy_static; + use rstest::rstest; - pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() + use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; + + lazy_static! { + pub static ref BROKEN_PARENT_DIRECTORY: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + "foo".into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size() + 42, // wrong! + ).unwrap())).unwrap(); + dir }; } - use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; - #[rstest] /// Uploading an empty directory should succeed. #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] @@ -312,8 +302,6 @@ mod tests { #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] /// Uploading B (referring to A) should fail immediately, because A was never uploaded. #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] - /// Uploading a directory failing validation should fail immediately. - #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] /// Uploading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] fn test_uploads( @@ -366,8 +354,6 @@ mod tests { #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)] /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)] - /// Downloading a directory failing validation should fail immediately. - #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)] /// Downloading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)] fn test_downloads( diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 4dc3931ed410..2079514d2b79 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,8 +1,9 @@ use std::collections::HashSet; -use super::{DirectoryPutter, DirectoryService}; +use super::{Directory, DirectoryPutter, DirectoryService}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; +use crate::ValidateDirectoryError; use crate::{B3Digest, Error}; use async_stream::try_stream; use futures::stream::BoxStream; @@ -41,10 +42,7 @@ where T::Future: Send, { #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] - async fn get( - &self, - digest: &B3Digest, - ) -> Result<Option<crate::proto::Directory>, crate::Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); @@ -72,15 +70,10 @@ where "requested directory with digest {}, but got {}", digest, actual_digest ))) - } else if let Err(e) = directory.validate() { - // Validate the Directory itself is valid. - warn!("directory failed validation: {}", e.to_string()); - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - digest, e, - ))) } else { - Ok(Some(directory)) + Ok(Some(directory.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?)) } } Ok(None) => Ok(None), @@ -90,11 +83,11 @@ where } #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client .clone() - .put(tokio_stream::once(directory)) + .put(tokio_stream::once(proto::Directory::from(directory))) .await; match resp { @@ -113,7 +106,7 @@ where fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); @@ -135,14 +128,6 @@ where loop { match stream.message().await { Ok(Some(directory)) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))?; - } // validate we actually expected that directory, and move it from expected to received. let directory_digest = directory.digest(); let was_expected = expected_directory_digests.remove(&directory_digest); @@ -168,6 +153,9 @@ where .insert(child_directory_digest); } + let directory = directory.try_into() + .map_err(|e: ValidateDirectoryError| Error::StorageError(e.to_string()))?; + yield directory; }, Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => { @@ -279,11 +267,11 @@ pub struct GRPCPutter { #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. Some((_, ref directory_sender)) => { - if directory_sender.send(directory).is_err() { + if directory_sender.send(directory.into()).is_err() { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index ada4606a5a57..b039d9bc7d84 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -1,4 +1,4 @@ -use crate::{proto, B3Digest, Error}; +use crate::{B3Digest, Error}; use futures::stream::BoxStream; use std::collections::HashMap; use std::sync::Arc; @@ -7,8 +7,9 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryPutter, DirectoryService, SimplePutter}; +use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::proto; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -18,7 +19,7 @@ pub struct MemoryDirectoryService { #[async_trait] impl DirectoryService for MemoryDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.read().await; match db.get(digest) { @@ -37,35 +38,20 @@ impl DirectoryService for MemoryDirectoryService { ))); } - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory.clone())) + Ok(Some(directory.clone().try_into().map_err(|e| { + crate::Error::StorageError(format!("corrupted directory: {}", e)) + })?)) } } } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it let mut db = self.db.write().await; - db.insert(digest.clone(), directory); + db.insert(digest.clone(), directory.into()); Ok(digest) } @@ -74,7 +60,7 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 17a78b179349..0141a48f75bc 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,6 +1,9 @@ use crate::composition::{Registry, ServiceBuilder}; -use crate::{proto, B3Digest, Error}; +use crate::proto; +use crate::{B3Digest, Error}; +use crate::{ValidateDirectoryError, ValidateNodeError}; +use bytes::Bytes; use futures::stream::BoxStream; use tonic::async_trait; mod combinators; @@ -38,7 +41,7 @@ mod bigtable; pub use self::bigtable::{BigtableDirectoryService, BigtableParameters}; /// The base trait all Directory services need to implement. -/// This is a simple get and put of [crate::proto::Directory], returning their +/// This is a simple get and put of [Directory], returning their /// digest. #[async_trait] pub trait DirectoryService: Send + Sync { @@ -50,14 +53,14 @@ pub trait DirectoryService: Send + Sync { /// Directory digests that are at the "root", aka the last element that's /// sent to a DirectoryPutter. This makes sense for implementations bundling /// closures of directories together in batches. - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>; /// Uploads a single Directory message, and returns the calculated /// digest, or an error. An error *must* also be returned if the message is /// not valid. - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + async fn put(&self, directory: Directory) -> Result<B3Digest, Error>; - /// Looks up a closure of [proto::Directory]. - /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, + /// Looks up a closure of [Directory]. + /// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. /// @@ -75,9 +78,9 @@ pub trait DirectoryService: Send + Sync { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>>; + ) -> BoxStream<'static, Result<Directory, Error>>; - /// Allows persisting a closure of [proto::Directory], which is a graph of + /// Allows persisting a closure of [Directory], which is a graph of /// connected Directory messages. fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; } @@ -87,18 +90,18 @@ impl<A> DirectoryService for A where A: AsRef<dyn DirectoryService> + Send + Sync, { - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.as_ref().get(digest).await } - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { self.as_ref().put(directory).await } fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { self.as_ref().get_recursive(root_directory_digest) } @@ -107,7 +110,7 @@ where } } -/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// Provides a handle to put a closure of connected [Directory] elements. /// /// The consumer can periodically call [DirectoryPutter::put], starting from the /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to @@ -119,12 +122,12 @@ where /// but a single file or symlink. #[async_trait] pub trait DirectoryPutter: Send { - /// Put a individual [proto::Directory] into the store. + /// Put a individual [Directory] into the store. /// Error semantics and behaviour is up to the specific implementation of /// this trait. /// Due to bursting, the returned error might refer to an object previously /// sent via `put`. - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + async fn put(&mut self, directory: Directory) -> Result<(), Error>; /// Close the stream, and wait for any errors. /// If there's been any invalid Directory message uploaded, and error *must* @@ -145,3 +148,461 @@ pub(crate) fn register_directory_services(reg: &mut Registry) { reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable"); } } + +/// A Directory can contain Directory, File or Symlink nodes. +/// Each of these nodes have a name attribute, which is the basename in that +/// directory and node type specific attributes. +/// While a Node by itself may have any name, the names of Directory entries: +/// - MUST not contain slashes or null bytes +/// - MUST not be '.' or '..' +/// - MUST be unique across all three lists +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct Directory { + nodes: Vec<Node>, +} + +/// A DirectoryNode is a pointer to a [Directory], by its [Directory::digest]. +/// It also gives it a `name` and `size`. +/// Such a node is either an element in the [Directory] it itself is contained in, +/// or a standalone root node./ +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DirectoryNode { + /// The (base)name of the directory + name: Bytes, + /// The blake3 hash of a Directory message, serialized in protobuf canonical form. + digest: B3Digest, + /// Number of child elements in the Directory referred to by `digest`. + /// Calculated by summing up the numbers of nodes, and for each directory. + /// its size field. Can be used for inode allocation. + /// This field is precisely as verifiable as any other Merkle tree edge. + /// Resolve `digest`, and you can compute it incrementally. Resolve the entire + /// tree, and you can fully compute it from scratch. + /// A credulous implementation won't reject an excessive size, but this is + /// harmless: you'll have some ordinals without nodes. Undersizing is obvious + /// and easy to reject: you won't have an ordinal for some nodes. + size: u64, +} + +impl DirectoryNode { + pub fn new(name: Bytes, digest: B3Digest, size: u64) -> Result<Self, ValidateNodeError> { + Ok(Self { name, digest, size }) + } + + pub fn digest(&self) -> &B3Digest { + &self.digest + } + pub fn size(&self) -> u64 { + self.size + } +} + +/// A FileNode represents a regular or executable file in a Directory or at the root. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FileNode { + /// The (base)name of the file + name: Bytes, + + /// The blake3 digest of the file contents + digest: B3Digest, + + /// The file content size + size: u64, + + /// Whether the file is executable + executable: bool, +} + +impl FileNode { + pub fn new( + name: Bytes, + digest: B3Digest, + size: u64, + executable: bool, + ) -> Result<Self, ValidateNodeError> { + Ok(Self { + name, + digest, + size, + executable, + }) + } + + pub fn digest(&self) -> &B3Digest { + &self.digest + } + + pub fn size(&self) -> u64 { + self.size + } + + pub fn executable(&self) -> bool { + self.executable + } +} + +/// A SymlinkNode represents a symbolic link in a Directory or at the root. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SymlinkNode { + /// The (base)name of the symlink + name: Bytes, + /// The target of the symlink. + target: Bytes, +} + +impl SymlinkNode { + pub fn new(name: Bytes, target: Bytes) -> Result<Self, ValidateNodeError> { + if target.is_empty() || target.contains(&b'\0') { + return Err(ValidateNodeError::InvalidSymlinkTarget(target)); + } + Ok(Self { name, target }) + } + + pub fn target(&self) -> &bytes::Bytes { + &self.target + } +} + +/// A Node is either a [DirectoryNode], [FileNode] or [SymlinkNode]. +/// While a Node by itself may have any name, only those matching specific requirements +/// can can be added as entries to a [Directory] (see the documentation on [Directory] for details). +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Node { + Directory(DirectoryNode), + File(FileNode), + Symlink(SymlinkNode), +} + +/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] +/// and [Node], so we can ask all of them for the name easily. +pub trait NamedNode { + fn get_name(&self) -> &bytes::Bytes; +} + +impl NamedNode for &FileNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for FileNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &DirectoryNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for DirectoryNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &SymlinkNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for SymlinkNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &Node { + fn get_name(&self) -> &bytes::Bytes { + match self { + Node::File(node_file) => &node_file.name, + Node::Directory(node_directory) => &node_directory.name, + Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} +impl NamedNode for Node { + fn get_name(&self) -> &bytes::Bytes { + match self { + Node::File(node_file) => &node_file.name, + Node::Directory(node_directory) => &node_directory.name, + Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} + +impl Node { + /// Returns the node with a new name. + pub fn rename(self, name: bytes::Bytes) -> Self { + match self { + Node::Directory(n) => Node::Directory(DirectoryNode { name, ..n }), + Node::File(n) => Node::File(FileNode { name, ..n }), + Node::Symlink(n) => Node::Symlink(SymlinkNode { name, ..n }), + } + } +} + +impl PartialOrd for Node { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for Node { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for FileNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for FileNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for DirectoryNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for DirectoryNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for SymlinkNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for SymlinkNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { + iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) +} + +impl Directory { + pub fn new() -> Self { + Directory { nodes: vec![] } + } + + /// The size of a directory is the number of all regular and symlink elements, + /// the number of directory elements, and their size fields. + pub fn size(&self) -> u64 { + // It's impossible to create a Directory where the size overflows, because we + // check before every add() that the size won't overflow. + (self.nodes.len() as u64) + self.directories().map(|e| e.size).sum::<u64>() + } + + /// Calculates the digest of a Directory, which is the blake3 hash of a + /// Directory protobuf message, serialized in protobuf canonical form. + pub fn digest(&self) -> B3Digest { + proto::Directory::from(self).digest() + } + + /// Allows iterating over all nodes (directories, files and symlinks) + /// ordered by their name. + pub fn nodes(&self) -> impl Iterator<Item = &Node> + Send + Sync + '_ { + self.nodes.iter() + } + + /// Allows iterating over the FileNode entries of this directory + /// ordered by their name + pub fn files(&self) -> impl Iterator<Item = &FileNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::File(n) => Some(n), + _ => None, + }) + } + + /// Allows iterating over the subdirectories of this directory + /// ordered by their name + pub fn directories(&self) -> impl Iterator<Item = &DirectoryNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::Directory(n) => Some(n), + _ => None, + }) + } + + /// Allows iterating over the SymlinkNode entries of this directory + /// ordered by their name + pub fn symlinks(&self) -> impl Iterator<Item = &SymlinkNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::Symlink(n) => Some(n), + _ => None, + }) + } + + /// Checks a Node name for validity as a directory entry + /// We disallow slashes, null bytes, '.', '..' and the empty string. + pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { + if name.is_empty() + || name == b".." + || name == b"." + || name.contains(&0x00) + || name.contains(&b'/') + { + Err(ValidateNodeError::InvalidName(name.to_owned().into())) + } else { + Ok(()) + } + } + + /// Adds the specified [Node] to the [Directory], preserving sorted entries. + /// + /// Inserting an element that already exists with the same name in the directory will yield an + /// error. + /// Inserting an element will validate that its name fulfills the stricter requirements for + /// directory entries and yield an error if it is not. + pub fn add(&mut self, node: Node) -> Result<(), ValidateDirectoryError> { + Self::validate_node_name(node.get_name()) + .map_err(|e| ValidateDirectoryError::InvalidNode(node.get_name().clone().into(), e))?; + + // Check that the even after adding this new directory entry, the size calculation will not + // overflow + // FUTUREWORK: add some sort of batch add interface which only does this check once with + // all the to-be-added entries + checked_sum([ + self.size(), + 1, + match node { + Node::Directory(ref dir) => dir.size, + _ => 0, + }, + ]) + .ok_or(ValidateDirectoryError::SizeOverflow)?; + + // This assumes the [Directory] is sorted, since we don't allow accessing the nodes list + // directly and all previous inserts should have been in-order + let pos = match self + .nodes + .binary_search_by_key(&node.get_name(), |n| n.get_name()) + { + Err(pos) => pos, // There is no node with this name; good! + Ok(_) => { + return Err(ValidateDirectoryError::DuplicateName( + node.get_name().to_vec(), + )) + } + }; + + self.nodes.insert(pos, node); + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}; + use crate::fixtures::DUMMY_DIGEST; + use crate::ValidateDirectoryError; + + #[test] + fn add_nodes_to_directory() { + let mut d = Directory::new(); + + d.add(Node::Directory( + DirectoryNode::new("b".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + d.add(Node::Directory( + DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + d.add(Node::Directory( + DirectoryNode::new("z".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + + d.add(Node::File( + FileNode::new("f".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + d.add(Node::File( + FileNode::new("c".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + d.add(Node::File( + FileNode::new("g".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + + d.add(Node::Symlink( + SymlinkNode::new("t".into(), "a".into()).unwrap(), + )) + .unwrap(); + d.add(Node::Symlink( + SymlinkNode::new("o".into(), "a".into()).unwrap(), + )) + .unwrap(); + d.add(Node::Symlink( + SymlinkNode::new("e".into(), "a".into()).unwrap(), + )) + .unwrap(); + + // Convert to proto struct and back to ensure we are not generating any invalid structures + crate::directoryservice::Directory::try_from(crate::proto::Directory::from(d)) + .expect("directory should be valid"); + } + + #[test] + fn validate_overflow() { + let mut d = Directory::new(); + + assert_eq!( + d.add(Node::Directory( + DirectoryNode::new("foo".into(), DUMMY_DIGEST.clone(), u64::MAX).unwrap(), + )), + Err(ValidateDirectoryError::SizeOverflow) + ); + } + + #[test] + fn add_duplicate_node_to_directory() { + let mut d = Directory::new(); + + d.add(Node::Directory( + DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + assert_eq!( + format!( + "{}", + d.add(Node::File( + FileNode::new("a".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .expect_err("adding duplicate dir entry must fail") + ), + "\"a\" is a duplicate name" + ); + } + + /// Attempt to add a directory entry with a name which should be rejected. + #[tokio::test] + async fn directory_reject_invalid_name() { + let mut dir = Directory::new(); + assert!( + dir.add(Node::Symlink( + SymlinkNode::new( + "".into(), // wrong! can not be added to directory + "doesntmatter".into(), + ) + .unwrap() + )) + .is_err(), + "invalid symlink entry be rejected" + ); + } +} diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index a9a2cc8ef5c0..6e71c2356def 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -17,7 +17,8 @@ use tracing::{instrument, trace, warn, Level}; use url::Url; use super::{ - DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator, + Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, + RootToLeavesValidator, }; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; @@ -78,13 +79,13 @@ impl DirectoryService for ObjectStoreDirectoryService { /// This is the same steps as for get_recursive anyways, so we just call get_recursive and /// return the first element of the stream and drop the request. #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.get_recursive(digest).take(1).next().await.transpose() } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - if !directory.directories.is_empty() { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { + if directory.directories().next().is_some() { return Err(Error::InvalidRequest( "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(), )); @@ -99,7 +100,7 @@ impl DirectoryService for ObjectStoreDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // Check that we are not passing on bogus from the object store to the client, and that the // trust chain from the root digest to the leaves is intact let mut order_validator = @@ -145,6 +146,10 @@ impl DirectoryService for ObjectStoreDirectoryService { warn!("unable to parse directory {}: {}", digest, e); Error::StorageError(e.to_string()) })?; + let directory = Directory::try_from(directory).map_err(|e| { + warn!("unable to convert directory {}: {}", digest, e); + Error::StorageError(e.to_string()) + })?; // Allow the children to appear next order_validator.add_directory_unchecked(&directory); @@ -244,7 +249,7 @@ impl ObjectStoreDirectoryPutter { #[async_trait] impl DirectoryPutter for ObjectStoreDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -302,7 +307,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { for directory in directories { directories_sink - .send(directory.encode_to_vec().into()) + .send(proto::Directory::from(directory).encode_to_vec().into()) .await?; } diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs index 6045f5d24198..17431fc8d3b0 100644 --- a/tvix/castore/src/directoryservice/order_validator.rs +++ b/tvix/castore/src/directoryservice/order_validator.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use tracing::warn; -use crate::{proto::Directory, B3Digest}; +use super::Directory; +use crate::B3Digest; pub trait OrderValidator { /// Update the order validator's state with the directory @@ -47,10 +48,9 @@ impl RootToLeavesValidator { self.expected_digests.insert(directory.digest()); } - for subdir in &directory.directories { + for subdir in directory.directories() { // Allow the children to appear next - let subdir_digest = subdir.digest.clone().try_into().unwrap(); - self.expected_digests.insert(subdir_digest); + self.expected_digests.insert(subdir.digest.clone()); } } } @@ -79,12 +79,11 @@ impl OrderValidator for LeavesToRootValidator { fn add_directory(&mut self, directory: &Directory) -> bool { let digest = directory.digest(); - for subdir in &directory.directories { - let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory() - if !self.allowed_references.contains(&subdir_digest) { + for subdir in directory.directories() { + if !self.allowed_references.contains(&subdir.digest) { warn!( directory.digest = %digest, - subdirectory.digest = %subdir_digest, + subdirectory.digest = %subdir.digest, "unexpected directory reference" ); return false; @@ -101,8 +100,8 @@ impl OrderValidator for LeavesToRootValidator { mod tests { use super::{LeavesToRootValidator, RootToLeavesValidator}; use crate::directoryservice::order_validator::OrderValidator; + use crate::directoryservice::Directory; use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; - use crate::proto::Directory; use rstest::rstest; #[rstest] diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs index 51dc87f92574..d253df503bb3 100644 --- a/tvix/castore/src/directoryservice/redb.rs +++ b/tvix/castore/src/directoryservice/redb.rs @@ -5,15 +5,15 @@ use std::{path::PathBuf, sync::Arc}; use tonic::async_trait; use tracing::{instrument, warn}; +use super::{ + traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService, + LeavesToRootValidator, +}; use crate::{ composition::{CompositionContext, ServiceBuilder}, digests, proto, B3Digest, Error, }; -use super::{ - traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, -}; - const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = TableDefinition::new("directory"); @@ -69,7 +69,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { #[async_trait] impl DirectoryService for RedbDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.clone(); // Retrieves the protobuf-encoded Directory for the corresponding digest. @@ -107,13 +107,10 @@ impl DirectoryService for RedbDirectoryService { let directory = match proto::Directory::decode(&*directory_data.value()) { Ok(dir) => { // The returned Directory must be valid. - if let Err(e) = dir.validate() { + dir.try_into().map_err(|e| { warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - dir + Error::StorageError("Directory failed validation".to_string()) + })? } Err(e) => { warn!(err=%e, "failed to parse Directory"); @@ -125,26 +122,21 @@ impl DirectoryService for RedbDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // Validate the directory. - if let Err(e) = directory.validate() { - warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - // Store the directory in the table. let txn = db.begin_write()?; { let mut table = txn.open_table(DIRECTORY_TABLE)?; let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } txn.commit()?; @@ -158,7 +150,7 @@ impl DirectoryService for RedbDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single // redb transaction to avoid constantly closing and opening new transactions for the // database. @@ -185,7 +177,7 @@ pub struct RedbDirectoryPutter { #[async_trait] impl DirectoryPutter for RedbDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -228,7 +220,10 @@ impl DirectoryPutter for RedbDirectoryPutter { for directory in directories { let digest_as_array: [u8; digests::B3_LEN] = directory.digest().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } } diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs index dc54e3d11d18..b4daaee61b22 100644 --- a/tvix/castore/src/directoryservice/simple_putter.rs +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -1,7 +1,6 @@ use super::DirectoryPutter; use super::DirectoryService; -use super::{DirectoryGraph, LeavesToRootValidator}; -use crate::proto; +use super::{Directory, DirectoryGraph, LeavesToRootValidator}; use crate::B3Digest; use crate::Error; use tonic::async_trait; @@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> { #[async_trait] impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 5766dec1a5c2..4f3a860d14e4 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -1,5 +1,3 @@ -use crate::proto::Directory; -use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use prost::Message; use std::ops::Deref; @@ -9,8 +7,9 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; +use super::{Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{proto, B3Digest, Error}; #[derive(Clone)] pub struct SledDirectoryService { @@ -44,7 +43,7 @@ impl SledDirectoryService { #[async_trait] impl DirectoryService for SledDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let resp = tokio::task::spawn_blocking({ let db = self.db.clone(); let digest = digest.clone(); @@ -61,7 +60,7 @@ impl DirectoryService for SledDirectoryService { None => Ok(None), // The directory was found, try to parse the data as Directory message - Some(data) => match Directory::decode(&*data) { + Some(data) => match proto::Directory::decode(&*data) { Ok(directory) => { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. @@ -73,14 +72,10 @@ impl DirectoryService for SledDirectoryService { ))); } - // Validate the Directory itself is valid. - if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } + let directory = directory.try_into().map_err(|e| { + warn!("failed to retrieve directory: {}", e); + Error::StorageError(format!("failed to retrieve directory: {}", e)) + })?; Ok(Some(directory)) } @@ -93,22 +88,18 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } // store it - db.insert(digest.as_slice(), directory.encode_to_vec()) - .map_err(|e| Error::StorageError(e.to_string()))?; + db.insert( + digest.as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ) + .map_err(|e| Error::StorageError(e.to_string()))?; Ok(digest) } @@ -120,7 +111,7 @@ impl DirectoryService for SledDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } @@ -215,7 +206,7 @@ pub struct SledDirectoryPutter { #[async_trait] impl DirectoryPutter for SledDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -252,7 +243,10 @@ impl DirectoryPutter for SledDirectoryPutter { let mut batch = sled::Batch::default(); for directory in directories { - batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); + batch.insert( + directory.digest().as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ); } tree.apply_batch(batch).map_err(|e| { diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index b698f70ea469..3923e66dc2c2 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -8,10 +8,8 @@ use rstest_reuse::{self, *}; use super::DirectoryService; use crate::directoryservice; -use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}, - proto::{self, Directory}, -}; +use crate::directoryservice::{Directory, DirectoryNode, Node}; +use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}; mod utils; use self::utils::make_grpc_directory_service_client; @@ -41,10 +39,10 @@ async fn test_non_exist(directory_service: impl DirectoryService) { // recursive get assert_eq!( - Vec::<Result<proto::Directory, crate::Error>>::new(), + Vec::<Result<Directory, crate::Error>>::new(), directory_service .get_recursive(&DIRECTORY_A.digest()) - .collect::<Vec<Result<proto::Directory, crate::Error>>>() + .collect::<Vec<Result<Directory, crate::Error>>>() .await ); } @@ -212,59 +210,26 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService } } -/// Try uploading a Directory failing its internal validation, ensure it gets -/// rejected. -#[apply(directory_services)] -#[tokio::test] -async fn upload_reject_failing_validation(directory_service: impl DirectoryService) { - let broken_directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // wrong! - target: "doesntmatter".into(), - }], - ..Default::default() - }; - assert!(broken_directory.validate().is_err()); - - // Try to upload via single upload. - assert!( - directory_service - .put(broken_directory.clone()) - .await - .is_err(), - "single upload must fail" - ); - - // Try to upload via put_multiple. We're a bit more permissive here, the - // intermediate .put() might succeed, due to client-side bursting (in the - // case of gRPC), but then the close MUST fail. - let mut handle = directory_service.put_multiple_start(); - if handle.put(broken_directory).await.is_ok() { - assert!( - handle.close().await.is_err(), - "when succeeding put, close must fail" - ) - } -} - /// Try uploading a Directory that refers to a previously-uploaded directory. /// Both pass their isolated validation, but the size field in the parent is wrong. /// This should be rejected. #[apply(directory_services)] #[tokio::test] async fn upload_reject_wrong_size(directory_service: impl DirectoryService) { - let wrong_parent_directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() + let wrong_parent_directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory( + DirectoryNode::new( + "foo".into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size() + 42, // wrong! + ) + .unwrap(), + )) + .unwrap(); + dir }; - // Make sure isolated validation itself is ok - assert!(wrong_parent_directory.validate().is_ok()); - // Now upload both. Ensure it either fails during the second put, or during // the close. let mut handle = directory_service.put_multiple_start(); diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 17a51ae2bbff..3e6dc73a4d6b 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -1,8 +1,5 @@ -use super::DirectoryService; -use crate::{ - proto::{node::Node, NamedNode}, - B3Digest, Error, Path, -}; +use super::{DirectoryService, NamedNode, Node}; +use crate::{Error, Path}; use tracing::{instrument, warn}; /// This descends from a (root) node to the given (sub)path, returning the Node @@ -25,34 +22,31 @@ where return Ok(None); } Node::Directory(directory_node) => { - let digest: B3Digest = directory_node - .digest - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - // fetch the linked node from the directory_service. - let directory = - directory_service - .as_ref() - .get(&digest) - .await? - .ok_or_else(|| { - // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! - warn!("directory {} does not exist", digest); - - Error::StorageError(format!("directory {} does not exist", digest)) - })?; + let directory = directory_service + .as_ref() + .get(&directory_node.digest) + .await? + .ok_or_else(|| { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + warn!("directory {} does not exist", directory_node.digest); + + Error::StorageError(format!( + "directory {} does not exist", + directory_node.digest + )) + })?; // look for the component in the [Directory]. // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we // could stop as soon as e.name is larger than the search string. if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) { // child node found, update prev_node to that and continue. - parent_node = child_node; + parent_node = child_node.clone(); } else { // child node not found means there's no such element inside the directory. return Ok(None); - } + }; } } } @@ -65,6 +59,7 @@ where mod tests { use crate::{ directoryservice, + directoryservice::{DirectoryNode, Node}, fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, PathBuf, }; @@ -88,21 +83,21 @@ mod tests { handle.close().await.expect("must upload"); // construct the node for DIRECTORY_COMPLICATED - let node_directory_complicated = - crate::proto::node::Node::Directory(crate::proto::DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }); + let node_directory_complicated = Node::Directory( + DirectoryNode::new( + "doesntmatter".into(), + DIRECTORY_COMPLICATED.digest(), + DIRECTORY_COMPLICATED.size(), + ) + .unwrap(), + ); // construct the node for DIRECTORY_COMPLICATED - let node_directory_with_keep = crate::proto::node::Node::Directory( - DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), - ); + let node_directory_with_keep = + Node::Directory(DIRECTORY_COMPLICATED.directories().next().unwrap().clone()); // construct the node for the .keep file - let node_file_keep = - crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); + let node_file_keep = Node::File(DIRECTORY_WITH_KEEP.files().next().unwrap().clone()); // traversal to an empty subpath should return the root node. { diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index 726734f55eec..352362811a97 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -1,5 +1,5 @@ +use super::Directory; use super::DirectoryService; -use crate::proto; use crate::B3Digest; use crate::Error; use async_stream::try_stream; @@ -8,14 +8,14 @@ use std::collections::{HashSet, VecDeque}; use tracing::instrument; use tracing::warn; -/// Traverses a [proto::Directory] from the root to the children. +/// Traverses a [Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. #[instrument(skip(directory_service))] pub fn traverse_directory<'a, DS: DirectoryService + 'static>( directory_service: DS, root_directory_digest: &B3Digest, -) -> BoxStream<'a, Result<proto::Directory, Error>> { +) -> BoxStream<'a, Result<Directory, Error>> { // The list of all directories that still need to be traversed. The next // element is picked from the front, new elements are enqueued at the // back. @@ -50,16 +50,6 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( Some(dir) => dir, }; - - // validate, we don't want to send invalid directories. - current_directory.validate().map_err(|e| { - warn!("directory failed validation: {}", e.to_string()); - Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - )) - })?; - // We're about to send this directory, so let's avoid sending it again if a // descendant has it. sent_directory_digests.insert(current_directory_digest); @@ -67,9 +57,9 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( // enqueue all child directory digests to the work queue, as // long as they're not part of the worklist or already sent. // This panics if the digest looks invalid, it's supposed to be checked first. - for child_directory_node in ¤t_directory.directories { + for child_directory_node in current_directory.directories() { // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + let child_digest: B3Digest = child_directory_node.digest.clone(); if worklist_directory_digests.contains(&child_digest) || sent_directory_digests.contains(&child_digest) diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs index 5bbcd7b04ef1..083f43036845 100644 --- a/tvix/castore/src/errors.rs +++ b/tvix/castore/src/errors.rs @@ -1,3 +1,4 @@ +use bstr::ByteSlice; use thiserror::Error; use tokio::task::JoinError; use tonic::Status; @@ -12,6 +13,46 @@ pub enum Error { StorageError(String), } +/// Errors that can occur during the validation of [Directory] messages. +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum ValidateDirectoryError { + /// Elements are not in sorted order + #[error("{:?} is not sorted", .0.as_bstr())] + WrongSorting(Vec<u8>), + /// Multiple elements with the same name encountered + #[error("{:?} is a duplicate name", .0.as_bstr())] + DuplicateName(Vec<u8>), + /// Invalid node + #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())] + InvalidNode(Vec<u8>, ValidateNodeError), + #[error("Total size exceeds u32::MAX")] + SizeOverflow, +} + +/// Errors that occur during Node validation +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum ValidateNodeError { + #[error("No node set")] + NoNodeSet, + /// Invalid digest length encountered + #[error("invalid digest length: {0}")] + InvalidDigestLen(usize), + /// Invalid name encountered + #[error("Invalid name: {}", .0.as_bstr())] + InvalidName(bytes::Bytes), + /// Invalid symlink target + #[error("Invalid symlink target: {}", .0.as_bstr())] + InvalidSymlinkTarget(bytes::Bytes), +} + +impl From<crate::digests::Error> for ValidateNodeError { + fn from(e: crate::digests::Error) -> Self { + match e { + crate::digests::Error::InvalidDigestLen(n) => ValidateNodeError::InvalidDigestLen(n), + } + } +} + impl From<JoinError> for Error { fn from(value: JoinError) -> Self { Error::StorageError(value.to_string()) diff --git a/tvix/castore/src/fixtures.rs b/tvix/castore/src/fixtures.rs index 3ebda64a818a..0e423d348522 100644 --- a/tvix/castore/src/fixtures.rs +++ b/tvix/castore/src/fixtures.rs @@ -1,5 +1,5 @@ use crate::{ - proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode}, + directoryservice::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}, B3Digest, }; use lazy_static::lazy_static; @@ -34,70 +34,72 @@ lazy_static! { pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into(); // Directories - pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory { - directories: vec![], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), - size: 0, - executable: false, - }], - symlinks: vec![], + pub static ref DIRECTORY_WITH_KEEP: Directory = { + let mut dir = Directory::new(); + dir.add(Node::File(FileNode::new( + b".keep".to_vec().into(), + EMPTY_BLOB_DIGEST.clone(), + 0, + false + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory { - directories: vec![DirectoryNode { - name: b"keep".to_vec().into(), - digest: DIRECTORY_WITH_KEEP.digest().into(), - size: DIRECTORY_WITH_KEEP.size(), - }], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), - size: 0, - executable: false, - }], - symlinks: vec![SymlinkNode { - name: b"aa".to_vec().into(), - target: b"/nix/store/somewhereelse".to_vec().into(), - }], + pub static ref DIRECTORY_COMPLICATED: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"keep".to_vec().into(), + DIRECTORY_WITH_KEEP.digest(), + DIRECTORY_WITH_KEEP.size() + ).unwrap())).unwrap(); + dir.add(Node::File(FileNode::new( + b".keep".to_vec().into(), + EMPTY_BLOB_DIGEST.clone(), + 0, + false + ).unwrap())).unwrap(); + dir.add(Node::Symlink(SymlinkNode::new( + b"aa".to_vec().into(), + b"/nix/store/somewhereelse".to_vec().into() + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_A: Directory = Directory::default(); - pub static ref DIRECTORY_B: Directory = Directory { - directories: vec![DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - }], - ..Default::default() + pub static ref DIRECTORY_A: Directory = Directory::new(); + pub static ref DIRECTORY_B: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"a".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_C: Directory = Directory { - directories: vec![ - DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - }, - DirectoryNode { - name: b"a'".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - } - ], - ..Default::default() + pub static ref DIRECTORY_C: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"a".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir.add(Node::Directory(DirectoryNode::new( + b"a'".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_D: proto::Directory = proto::Directory { - directories: vec![ - DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - }, - DirectoryNode { - name: b"b'".to_vec().into(), - digest: DIRECTORY_B.digest().into(), - size: DIRECTORY_B.size(), - } - ], - ..Default::default() + pub static ref DIRECTORY_D: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"a".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir.add(Node::Directory(DirectoryNode::new( + b"b".to_vec().into(), + DIRECTORY_B.digest(), + DIRECTORY_B.size(), + ).unwrap())).unwrap(); + dir + }; } diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs index bcebcf4a7292..726beb5858c5 100644 --- a/tvix/castore/src/fs/fuse/tests.rs +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -13,11 +13,11 @@ use tokio_stream::{wrappers::ReadDirStream, StreamExt}; use super::FuseDaemon; use crate::fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}; -use crate::proto as castorepb; -use crate::proto::node::Node; use crate::{ blobservice::{BlobService, MemoryBlobService}, - directoryservice::{DirectoryService, MemoryDirectoryService}, + directoryservice::{ + DirectoryNode, DirectoryService, FileNode, MemoryDirectoryService, Node, SymlinkNode, + }, fixtures, }; @@ -70,12 +70,15 @@ async fn populate_blob_a( root_nodes.insert( BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + BLOB_A_NAME.into(), + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u64, + false, + ) + .unwrap(), + ), ); } @@ -91,12 +94,15 @@ async fn populate_blob_b( root_nodes.insert( BLOB_B_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_B_NAME.into(), - digest: fixtures::BLOB_B_DIGEST.clone().into(), - size: fixtures::BLOB_B.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + BLOB_B_NAME.into(), + fixtures::BLOB_B_DIGEST.clone(), + fixtures::BLOB_B.len() as u64, + false, + ) + .unwrap(), + ), ); } @@ -116,22 +122,22 @@ async fn populate_blob_helloworld( root_nodes.insert( HELLOWORLD_BLOB_NAME.into(), - Node::File(castorepb::FileNode { - name: HELLOWORLD_BLOB_NAME.into(), - digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), - size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, - executable: true, - }), + Node::File( + FileNode::new( + HELLOWORLD_BLOB_NAME.into(), + fixtures::HELLOWORLD_BLOB_DIGEST.clone(), + fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, + true, + ) + .unwrap(), + ), ); } async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( SYMLINK_NAME.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME.into(), - target: BLOB_A_NAME.into(), - }), + Node::Symlink(SymlinkNode::new(SYMLINK_NAME.into(), BLOB_A_NAME.into()).unwrap()), ); } @@ -140,10 +146,9 @@ async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( SYMLINK_NAME2.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME2.into(), - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink( + SymlinkNode::new(SYMLINK_NAME2.into(), "/nix/store/somewhereelse".into()).unwrap(), + ), ); } @@ -167,11 +172,14 @@ async fn populate_directory_with_keep( root_nodes.insert( DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + Node::Directory( + DirectoryNode::new( + DIRECTORY_WITH_KEEP_NAME.into(), + fixtures::DIRECTORY_WITH_KEEP.digest(), + fixtures::DIRECTORY_WITH_KEEP.size(), + ) + .unwrap(), + ), ); } @@ -180,11 +188,14 @@ async fn populate_directory_with_keep( async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + Node::Directory( + DirectoryNode::new( + DIRECTORY_WITH_KEEP_NAME.into(), + fixtures::DIRECTORY_WITH_KEEP.digest(), + fixtures::DIRECTORY_WITH_KEEP.size(), + ) + .unwrap(), + ), ); } @@ -192,12 +203,15 @@ async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Byte async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + BLOB_A_NAME.into(), + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u64, + false, + ) + .unwrap(), + ), ); } @@ -227,11 +241,14 @@ async fn populate_directory_complicated( root_nodes.insert( DIRECTORY_COMPLICATED_NAME.into(), - Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_COMPLICATED_NAME.into(), - digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), - size: fixtures::DIRECTORY_COMPLICATED.size(), - }), + Node::Directory( + DirectoryNode::new( + DIRECTORY_COMPLICATED_NAME.into(), + fixtures::DIRECTORY_COMPLICATED.digest(), + fixtures::DIRECTORY_COMPLICATED.size(), + ) + .unwrap(), + ), ); } diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs index bdd459543470..379d4ab87318 100644 --- a/tvix/castore/src/fs/inodes.rs +++ b/tvix/castore/src/fs/inodes.rs @@ -4,7 +4,7 @@ use std::time::Duration; use bytes::Bytes; -use crate::proto as castorepb; +use crate::directoryservice::{NamedNode, Node}; use crate::B3Digest; #[derive(Clone, Debug)] @@ -20,27 +20,24 @@ pub enum InodeData { /// lookup and did fetch the data. #[derive(Clone, Debug)] pub enum DirectoryInodeData { - Sparse(B3Digest, u64), // digest, size - Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] + Sparse(B3Digest, u64), // digest, size + Populated(B3Digest, Vec<(u64, Node)>), // [(child_inode, node)] } impl InodeData { /// Constructs a new InodeData by consuming a [Node]. /// It splits off the orginal name, so it can be used later. - pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) { + pub fn from_node(node: &Node) -> (Self, Bytes) { match node { - castorepb::node::Node::Directory(n) => ( - Self::Directory(DirectoryInodeData::Sparse( - n.digest.try_into().unwrap(), - n.size, - )), - n.name, + Node::Directory(n) => ( + Self::Directory(DirectoryInodeData::Sparse(n.digest().clone(), n.size())), + n.get_name().clone(), ), - castorepb::node::Node::File(n) => ( - Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable), - n.name, + Node::File(n) => ( + Self::Regular(n.digest().clone(), n.size(), n.executable()), + n.get_name().clone(), ), - castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name), + Node::Symlink(n) => (Self::Symlink(n.target().clone()), n.get_name().clone()), } } diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index b565ed60ac42..4a6ca88d73fd 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -15,11 +15,9 @@ use self::{ inode_tracker::InodeTracker, inodes::{DirectoryInodeData, InodeData}, }; -use crate::proto as castorepb; use crate::{ blobservice::{BlobReader, BlobService}, - directoryservice::DirectoryService, - proto::{node::Node, NamedNode}, + directoryservice::{DirectoryService, NamedNode, Node}, B3Digest, }; use bstr::ByteVec; @@ -198,13 +196,13 @@ where let children = { let mut inode_tracker = self.inode_tracker.write(); - let children: Vec<(u64, castorepb::node::Node)> = directory + let children: Vec<(u64, Node)> = directory .nodes() .map(|child_node| { - let (inode_data, _) = InodeData::from_node(child_node.clone()); + let (inode_data, _) = InodeData::from_node(child_node); let child_ino = inode_tracker.put(inode_data); - (child_ino, child_node) + (child_ino, child_node.clone()) }) .collect(); @@ -287,7 +285,7 @@ where // insert the (sparse) inode data and register in // self.root_nodes. - let (inode_data, name) = InodeData::from_node(root_node); + let (inode_data, name) = InodeData::from_node(&root_node); let ino = inode_tracker.put(inode_data.clone()); root_nodes.insert(name, ino); @@ -468,7 +466,7 @@ where io::Error::from_raw_os_error(libc::EIO) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let (inode_data, name) = InodeData::from_node(&root_node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -498,7 +496,7 @@ where Span::current().record("directory.digest", parent_digest.to_string()); for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + let (inode_data, name) = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { @@ -555,7 +553,7 @@ where io::Error::from_raw_os_error(libc::EPERM) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let (inode_data, name) = InodeData::from_node(&root_node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -588,7 +586,7 @@ where Span::current().record("directory.digest", parent_digest.to_string()); for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + let (inode_data, name) = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry( diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs index 6609e049a1fc..6d78b243d064 100644 --- a/tvix/castore/src/fs/root_nodes.rs +++ b/tvix/castore/src/fs/root_nodes.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use crate::{proto::node::Node, Error}; +use crate::{directoryservice::Node, Error}; use bytes::Bytes; use futures::stream::BoxStream; use tonic::async_trait; diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index cd5b1290e031..930f33f68b46 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -11,9 +11,8 @@ use tokio_tar::Archive; use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryService; +use crate::directoryservice::{DirectoryService, Node}; use crate::import::{ingest_entries, IngestionEntry, IngestionError}; -use crate::proto::node::Node; use super::blobs::{self, ConcurrentBlobUploader}; diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index dc7821b8101e..f156c8a73527 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -15,8 +15,7 @@ use walkdir::DirEntry; use walkdir::WalkDir; use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryService; -use crate::proto::node::Node; +use crate::directoryservice::{DirectoryService, Node}; use crate::B3Digest; use super::ingest_entries; diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index a9ac0be6b064..03e2b6c7db41 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -4,14 +4,14 @@ //! Specific implementations, such as ingesting from the filesystem, live in //! child modules. +use crate::directoryservice::Directory; +use crate::directoryservice::DirectoryNode; use crate::directoryservice::DirectoryPutter; use crate::directoryservice::DirectoryService; +use crate::directoryservice::FileNode; +use crate::directoryservice::Node; +use crate::directoryservice::SymlinkNode; use crate::path::{Path, PathBuf}; -use crate::proto::node::Node; -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::SymlinkNode; use crate::B3Digest; use futures::{Stream, StreamExt}; use tracing::Level; @@ -98,27 +98,36 @@ where IngestionError::UploadDirectoryError(entry.path().to_owned(), e) })?; - Node::Directory(DirectoryNode { - name, - digest: directory_digest.into(), - size: directory_size, - }) + Node::Directory( + DirectoryNode::new(name, directory_digest, directory_size).map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?, + ) } - IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode { - name, - target: target.to_owned().into(), - }), + IngestionEntry::Symlink { ref target, .. } => Node::Symlink( + SymlinkNode::new(name, target.to_owned().into()).map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?, + ), IngestionEntry::Regular { size, executable, digest, .. - } => Node::File(FileNode { - name, - digest: digest.to_owned().into(), - size: *size, - executable: *executable, - }), + } => Node::File( + FileNode::new(name, digest.clone(), *size, *executable).map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?, + ), }; let parent = entry @@ -130,7 +139,16 @@ where break node; } else { // record node in parent directory, creating a new [Directory] if not there yet. - directories.entry(parent.to_owned()).or_default().add(node); + directories + .entry(parent.to_owned()) + .or_default() + .add(node) + .map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?; } }; @@ -156,14 +174,7 @@ where #[cfg(debug_assertions)] { if let Node::Directory(directory_node) = &root_node { - debug_assert_eq!( - root_directory_digest, - directory_node - .digest - .to_vec() - .try_into() - .expect("invalid digest len") - ) + debug_assert_eq!(&root_directory_digest, directory_node.digest()) } else { unreachable!("Tvix bug: directory putter initialized but no root directory node"); } @@ -208,9 +219,8 @@ impl IngestionEntry { mod test { use rstest::rstest; + use crate::directoryservice::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}; use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST}; - use crate::proto::node::Node; - use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode}; use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST}; use super::ingest_entries; @@ -223,18 +233,18 @@ mod test { executable: true, digest: DUMMY_DIGEST.clone(), }], - Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true } - ))] + Node::File(FileNode::new("foo".into(), DUMMY_DIGEST.clone(), 42, true).unwrap()) + )] #[case::single_symlink(vec![IngestionEntry::Symlink { path: "foo".parse().unwrap(), target: b"blub".into(), }], - Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()}) + Node::Symlink(SymlinkNode::new("foo".into(), "blub".into()).unwrap()) )] #[case::single_dir(vec![IngestionEntry::Dir { path: "foo".parse().unwrap(), }], - Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()}) + Node::Directory(DirectoryNode::new("foo".into(), Directory::default().digest(), Directory::default().size()).unwrap()) )] #[case::dir_with_keep(vec![ IngestionEntry::Regular { @@ -247,7 +257,7 @@ mod test { path: "foo".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() }) + Node::Directory(DirectoryNode::new("foo".into(), DIRECTORY_WITH_KEEP.digest(), DIRECTORY_WITH_KEEP.size()).unwrap()) )] /// This is intentionally a bit unsorted, though it still satisfies all /// requirements we have on the order of elements in the stream. @@ -275,7 +285,7 @@ mod test { path: "blub".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size:DIRECTORY_COMPLICATED.size() }) + Node::Directory(DirectoryNode::new("blub".into(), DIRECTORY_COMPLICATED.digest(), DIRECTORY_COMPLICATED.size()).unwrap()) )] #[tokio::test] async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) { diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 4fca9801c97b..91302e90996c 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -18,7 +18,7 @@ pub mod proto; pub mod tonic; pub use digests::{B3Digest, B3_LEN}; -pub use errors::Error; +pub use errors::{Error, ValidateDirectoryError, ValidateNodeError}; pub use hashing_reader::{B3HashingReader, HashingReader}; #[cfg(test)] diff --git a/tvix/castore/src/path.rs b/tvix/castore/src/path.rs index fcc2bd01fbd6..77cabeccdb8c 100644 --- a/tvix/castore/src/path.rs +++ b/tvix/castore/src/path.rs @@ -10,7 +10,7 @@ use std::{ use bstr::ByteSlice; -use crate::proto::validate_node_name; +use crate::directoryservice::Directory; /// Represents a Path in the castore model. /// These are always relative, and platform-independent, which distinguishes @@ -38,7 +38,7 @@ impl Path { if !bytes.is_empty() { // Ensure all components are valid castore node names. for component in bytes.split_str(b"/") { - validate_node_name(component).ok()?; + Directory::validate_node_name(component).ok()?; } } @@ -211,7 +211,7 @@ impl PathBuf { /// Adjoins `name` to self. pub fn try_push(&mut self, name: &[u8]) -> Result<(), std::io::Error> { - validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?; + Directory::validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?; if !self.inner.is_empty() { self.inner.push(b'/'); diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs index ce1d2bcd244a..e65145509184 100644 --- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -1,7 +1,5 @@ -use crate::directoryservice::DirectoryGraph; -use crate::directoryservice::LeavesToRootValidator; -use crate::proto; -use crate::{directoryservice::DirectoryService, B3Digest}; +use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator}; +use crate::{proto, B3Digest, ValidateDirectoryError}; use futures::stream::BoxStream; use futures::TryStreamExt; use std::ops::Deref; @@ -58,13 +56,16 @@ where Status::not_found(format!("directory {} not found", digest)) })?; - Box::pin(once(Ok(directory))) + Box::pin(once(Ok(directory.into()))) } else { // If recursive was requested, traverse via get_recursive. Box::pin( - self.directory_service.get_recursive(&digest).map_err(|e| { - tonic::Status::new(tonic::Code::Internal, e.to_string()) - }), + self.directory_service + .get_recursive(&digest) + .map_ok(proto::Directory::from) + .map_err(|e| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + }), ) } })) @@ -83,7 +84,9 @@ where let mut validator = DirectoryGraph::<LeavesToRootValidator>::default(); while let Some(directory) = req_inner.message().await? { validator - .add(directory) + .add(directory.try_into().map_err(|e: ValidateDirectoryError| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + })?) .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?; } diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs index a0cec896f753..7e98cd74c591 100644 --- a/tvix/castore/src/proto/mod.rs +++ b/tvix/castore/src/proto/mod.rs @@ -1,7 +1,4 @@ -#![allow(non_snake_case)] -// https://github.com/hyperium/tonic/issues/1056 -use bstr::ByteSlice; -use std::{collections::HashSet, iter::Peekable, str}; +use std::str; use prost::Message; @@ -11,7 +8,8 @@ mod grpc_directoryservice_wrapper; pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; -use crate::{B3Digest, B3_LEN}; +use crate::directoryservice::NamedNode; +use crate::{B3Digest, ValidateDirectoryError, ValidateNodeError}; tonic::include_proto!("tvix.castore.v1"); @@ -24,38 +22,6 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix #[cfg(test)] mod tests; -/// Errors that can occur during the validation of [Directory] messages. -#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateDirectoryError { - /// Elements are not in sorted order - #[error("{:?} is not sorted", .0.as_bstr())] - WrongSorting(Vec<u8>), - /// Multiple elements with the same name encountered - #[error("{:?} is a duplicate name", .0.as_bstr())] - DuplicateName(Vec<u8>), - /// Invalid node - #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())] - InvalidNode(Vec<u8>, ValidateNodeError), - #[error("Total size exceeds u32::MAX")] - SizeOverflow, -} - -/// Errors that occur during Node validation -#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateNodeError { - #[error("No node set")] - NoNodeSet, - /// Invalid digest length encountered - #[error("Invalid Digest length: {0}")] - InvalidDigestLen(usize), - /// Invalid name encountered - #[error("Invalid name: {}", .0.as_bstr())] - InvalidName(Vec<u8>), - /// Invalid symlink target - #[error("Invalid symlink target: {}", .0.as_bstr())] - InvalidSymlinkTarget(Vec<u8>), -} - /// Errors that occur during StatBlobResponse validation #[derive(Debug, PartialEq, Eq, thiserror::Error)] pub enum ValidateStatBlobResponseError { @@ -64,186 +30,6 @@ pub enum ValidateStatBlobResponseError { InvalidDigestLen(usize, usize), } -/// Checks a Node name for validity as an intermediate node. -/// We disallow slashes, null bytes, '.', '..' and the empty string. -pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { - if name.is_empty() - || name == b".." - || name == b"." - || name.contains(&0x00) - || name.contains(&b'/') - { - Err(ValidateNodeError::InvalidName(name.to_owned())) - } else { - Ok(()) - } -} - -/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] -/// and [node::Node], so we can ask all of them for the name easily. -pub trait NamedNode { - fn get_name(&self) -> &[u8]; -} - -impl NamedNode for &FileNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &DirectoryNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &SymlinkNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for node::Node { - fn get_name(&self) -> &[u8] { - match self { - node::Node::File(node_file) => &node_file.name, - node::Node::Directory(node_directory) => &node_directory.name, - node::Node::Symlink(node_symlink) => &node_symlink.name, - } - } -} - -impl Node { - /// Ensures the node has a valid enum kind (is Some), and passes its - // per-enum validation. - // The inner root node is returned for easier consumption. - pub fn validate(&self) -> Result<&node::Node, ValidateNodeError> { - if let Some(node) = self.node.as_ref() { - node.validate()?; - Ok(node) - } else { - Err(ValidateNodeError::NoNodeSet) - } - } -} - -impl node::Node { - /// Returns the node with a new name. - pub fn rename(self, name: bytes::Bytes) -> Self { - match self { - node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }), - node::Node::File(n) => node::Node::File(FileNode { name, ..n }), - node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }), - } - } - - /// Ensures the node has a valid name, and checks the type-specific fields too. - pub fn validate(&self) -> Result<(), ValidateNodeError> { - match self { - // for a directory root node, ensure the digest has the appropriate size. - node::Node::Directory(directory_node) => { - if directory_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen( - directory_node.digest.len(), - ))?; - } - validate_node_name(&directory_node.name) - } - // for a file root node, ensure the digest has the appropriate size. - node::Node::File(file_node) => { - if file_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?; - } - validate_node_name(&file_node.name) - } - // ensure the symlink target is not empty and doesn't contain null bytes. - node::Node::Symlink(symlink_node) => { - if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') { - Err(ValidateNodeError::InvalidSymlinkTarget( - symlink_node.target.to_vec(), - ))?; - } - validate_node_name(&symlink_node.name) - } - } - } -} - -impl PartialOrd for node::Node { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for node::Node { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for FileNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for FileNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for SymlinkNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for SymlinkNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for DirectoryNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for DirectoryNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -/// Accepts a name, and a mutable reference to the previous name. -/// If the passed name is larger than the previous one, the reference is updated. -/// If it's not, an error is returned. -fn update_if_lt_prev<'n>( - prev_name: &mut &'n [u8], - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if *name < **prev_name { - return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); - } - *prev_name = name; - Ok(()) -} - -/// Inserts the given name into a HashSet if it's not already in there. -/// If it is, an error is returned. -fn insert_once<'n>( - seen_names: &mut HashSet<&'n [u8]>, - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if seen_names.get(name).is_some() { - return Err(ValidateDirectoryError::DuplicateName(name.to_vec())); - } - seen_names.insert(name); - Ok(()) -} - fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) } @@ -280,116 +66,213 @@ impl Directory { .as_bytes() .into() } +} + +/// Accepts a name, and a mutable reference to the previous name. +/// If the passed name is larger than the previous one, the reference is updated. +/// If it's not, an error is returned. +fn update_if_lt_prev<'n>( + prev_name: &mut &'n [u8], + name: &'n [u8], +) -> Result<(), ValidateDirectoryError> { + if *name < **prev_name { + return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); + } + *prev_name = name; + Ok(()) +} - /// validate checks the directory for invalid data, such as: - /// - violations of name restrictions - /// - invalid digest lengths - /// - not properly sorted lists - /// - duplicate names in the three lists - pub fn validate(&self) -> Result<(), ValidateDirectoryError> { - let mut seen_names: HashSet<&[u8]> = HashSet::new(); +impl TryFrom<&node::Node> for crate::directoryservice::Node { + type Error = ValidateNodeError; - let mut last_directory_name: &[u8] = b""; - let mut last_file_name: &[u8] = b""; - let mut last_symlink_name: &[u8] = b""; + fn try_from(node: &node::Node) -> Result<crate::directoryservice::Node, ValidateNodeError> { + Ok(match node { + node::Node::Directory(n) => crate::directoryservice::Node::Directory(n.try_into()?), + node::Node::File(n) => crate::directoryservice::Node::File(n.try_into()?), + node::Node::Symlink(n) => crate::directoryservice::Node::Symlink(n.try_into()?), + }) + } +} - // check directories - for directory_node in &self.directories { - node::Node::Directory(directory_node.clone()) - .validate() - .map_err(|e| { - ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e) - })?; +impl TryFrom<&Node> for crate::directoryservice::Node { + type Error = ValidateNodeError; - update_if_lt_prev(&mut last_directory_name, &directory_node.name)?; - insert_once(&mut seen_names, &directory_node.name)?; + fn try_from(node: &Node) -> Result<crate::directoryservice::Node, ValidateNodeError> { + match node { + Node { node: None } => Err(ValidateNodeError::NoNodeSet), + Node { node: Some(node) } => node.try_into(), } + } +} + +impl TryFrom<&DirectoryNode> for crate::directoryservice::DirectoryNode { + type Error = ValidateNodeError; + + fn try_from( + node: &DirectoryNode, + ) -> Result<crate::directoryservice::DirectoryNode, ValidateNodeError> { + crate::directoryservice::DirectoryNode::new( + node.name.clone(), + node.digest.clone().try_into()?, + node.size, + ) + } +} - // check files - for file_node in &self.files { - node::Node::File(file_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?; +impl TryFrom<&SymlinkNode> for crate::directoryservice::SymlinkNode { + type Error = ValidateNodeError; - update_if_lt_prev(&mut last_file_name, &file_node.name)?; - insert_once(&mut seen_names, &file_node.name)?; + fn try_from( + node: &SymlinkNode, + ) -> Result<crate::directoryservice::SymlinkNode, ValidateNodeError> { + crate::directoryservice::SymlinkNode::new(node.name.clone(), node.target.clone()) + } +} + +impl TryFrom<&FileNode> for crate::directoryservice::FileNode { + type Error = ValidateNodeError; + + fn try_from(node: &FileNode) -> Result<crate::directoryservice::FileNode, ValidateNodeError> { + crate::directoryservice::FileNode::new( + node.name.clone(), + node.digest.clone().try_into()?, + node.size, + node.executable, + ) + } +} + +impl TryFrom<Directory> for crate::directoryservice::Directory { + type Error = ValidateDirectoryError; + + fn try_from( + directory: Directory, + ) -> Result<crate::directoryservice::Directory, ValidateDirectoryError> { + (&directory).try_into() + } +} + +impl TryFrom<&Directory> for crate::directoryservice::Directory { + type Error = ValidateDirectoryError; + + fn try_from( + directory: &Directory, + ) -> Result<crate::directoryservice::Directory, ValidateDirectoryError> { + let mut dir = crate::directoryservice::Directory::new(); + let mut last_file_name: &[u8] = b""; + for file in directory.files.iter().map(move |file| { + update_if_lt_prev(&mut last_file_name, &file.name).map(|()| file.clone()) + }) { + let file = file?; + dir.add(crate::directoryservice::Node::File( + (&file) + .try_into() + .map_err(|e| ValidateDirectoryError::InvalidNode(file.name.into(), e))?, + ))?; + } + let mut last_directory_name: &[u8] = b""; + for directory in directory.directories.iter().map(move |directory| { + update_if_lt_prev(&mut last_directory_name, &directory.name).map(|()| directory.clone()) + }) { + let directory = directory?; + dir.add(crate::directoryservice::Node::Directory( + (&directory) + .try_into() + .map_err(|e| ValidateDirectoryError::InvalidNode(directory.name.into(), e))?, + ))?; } + let mut last_symlink_name: &[u8] = b""; + for symlink in directory.symlinks.iter().map(move |symlink| { + update_if_lt_prev(&mut last_symlink_name, &symlink.name).map(|()| symlink.clone()) + }) { + let symlink = symlink?; + dir.add(crate::directoryservice::Node::Symlink( + (&symlink) + .try_into() + .map_err(|e| ValidateDirectoryError::InvalidNode(symlink.name.into(), e))?, + ))?; + } + Ok(dir) + } +} - // check symlinks - for symlink_node in &self.symlinks { - node::Node::Symlink(symlink_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?; +impl From<&crate::directoryservice::Node> for node::Node { + fn from(node: &crate::directoryservice::Node) -> node::Node { + match node { + crate::directoryservice::Node::Directory(n) => node::Node::Directory(n.into()), + crate::directoryservice::Node::File(n) => node::Node::File(n.into()), + crate::directoryservice::Node::Symlink(n) => node::Node::Symlink(n.into()), + } + } +} - update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?; - insert_once(&mut seen_names, &symlink_node.name)?; +impl From<&crate::directoryservice::Node> for Node { + fn from(node: &crate::directoryservice::Node) -> Node { + Node { + node: Some(node.into()), } + } +} - self.size_checked() - .ok_or(ValidateDirectoryError::SizeOverflow)?; +impl From<&crate::directoryservice::DirectoryNode> for DirectoryNode { + fn from(node: &crate::directoryservice::DirectoryNode) -> DirectoryNode { + DirectoryNode { + digest: node.digest().clone().into(), + size: node.size(), + name: node.get_name().clone(), + } + } +} - Ok(()) +impl From<&crate::directoryservice::FileNode> for FileNode { + fn from(node: &crate::directoryservice::FileNode) -> FileNode { + FileNode { + digest: node.digest().clone().into(), + size: node.size(), + name: node.get_name().clone(), + executable: node.executable(), + } } +} - /// Allows iterating over all three nodes ([DirectoryNode], [FileNode], - /// [SymlinkNode]) in an ordered fashion, as long as the individual lists - /// are sorted (which can be checked by the [Directory::validate]). - pub fn nodes(&self) -> DirectoryNodesIterator { - return DirectoryNodesIterator { - i_directories: self.directories.iter().peekable(), - i_files: self.files.iter().peekable(), - i_symlinks: self.symlinks.iter().peekable(), - }; +impl From<&crate::directoryservice::SymlinkNode> for SymlinkNode { + fn from(node: &crate::directoryservice::SymlinkNode) -> SymlinkNode { + SymlinkNode { + name: node.get_name().clone(), + target: node.target().clone(), + } } +} - /// Adds the specified [node::Node] to the [Directory], preserving sorted entries. - /// This assumes the [Directory] to be sorted prior to adding the node. - /// - /// Inserting an element that already exists with the same name in the directory is not - /// supported. - pub fn add(&mut self, node: node::Node) { - debug_assert!( - !self.files.iter().any(|x| x.get_name() == node.get_name()), - "name already exists in files" - ); - debug_assert!( - !self - .directories - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in directories" - ); - debug_assert!( - !self - .symlinks - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in symlinks" - ); +impl From<crate::directoryservice::Directory> for Directory { + fn from(directory: crate::directoryservice::Directory) -> Directory { + (&directory).into() + } +} - match node { - node::Node::File(node) => { - let pos = self - .files - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.files.insert(pos, node); - } - node::Node::Directory(node) => { - let pos = self - .directories - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.directories.insert(pos, node); - } - node::Node::Symlink(node) => { - let pos = self - .symlinks - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.symlinks.insert(pos, node); +impl From<&crate::directoryservice::Directory> for Directory { + fn from(directory: &crate::directoryservice::Directory) -> Directory { + let mut directories = vec![]; + let mut files = vec![]; + let mut symlinks = vec![]; + for node in directory.nodes() { + match node { + crate::directoryservice::Node::File(n) => { + files.push(n.into()); + } + crate::directoryservice::Node::Directory(n) => { + directories.push(n.into()); + } + crate::directoryservice::Node::Symlink(n) => { + symlinks.push(n.into()); + } } } + Directory { + directories, + files, + symlinks, + } } } @@ -409,65 +292,3 @@ impl StatBlobResponse { Ok(()) } } - -/// Struct to hold the state of an iterator over all nodes of a Directory. -/// -/// Internally, this keeps peekable Iterators over all three lists of a -/// Directory message. -pub struct DirectoryNodesIterator<'a> { - // directory: &Directory, - i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>, - i_files: Peekable<std::slice::Iter<'a, FileNode>>, - i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>, -} - -/// looks at two elements implementing NamedNode, and returns true if "left -/// is smaller / comes first". -/// -/// Some(_) is preferred over None. -fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool { - match left { - // if left is None, right always wins - None => false, - Some(left_inner) => { - // left is Some. - match right { - // left is Some, right is None - left wins. - None => true, - Some(right_inner) => { - // both are Some - compare the name. - return left_inner.get_name() < right_inner.get_name(); - } - } - } - } -} - -impl Iterator for DirectoryNodesIterator<'_> { - type Item = node::Node; - - // next returns the next node in the Directory. - // we peek at all three internal iterators, and pick the one with the - // smallest name, to ensure lexicographical ordering. - // The individual lists are already known to be sorted. - fn next(&mut self) -> Option<Self::Item> { - if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) { - // i_directories is still in the game, compare with symlinks - if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) { - self.i_directories - .next() - .cloned() - .map(node::Node::Directory) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } else { - // i_files is still in the game, compare with symlinks - if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) { - self.i_files.next().cloned().map(node::Node::File) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } - } -} diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs index 81b73a048d52..78b2cf7668e3 100644 --- a/tvix/castore/src/proto/tests/directory.rs +++ b/tvix/castore/src/proto/tests/directory.rs @@ -1,7 +1,5 @@ -use crate::proto::{ - node, Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError, - ValidateNodeError, -}; +use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}; +use crate::ValidateNodeError; use hex_literal::hex; @@ -149,7 +147,7 @@ fn digest() { #[test] fn validate_empty() { let d = Directory::default(); - assert_eq!(d.validate(), Ok(())); + assert!(crate::directoryservice::Directory::try_from(d).is_ok()); } #[test] @@ -163,7 +161,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"") } @@ -180,7 +178,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b".") } @@ -198,7 +196,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"..") } @@ -214,7 +212,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"\x00") } @@ -230,7 +228,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"foo/bar") } @@ -249,7 +247,7 @@ fn validate_invalid_digest() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => { assert_eq!(n, 2) } @@ -276,7 +274,7 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::WrongSorting(s) => { assert_eq!(s, b"a"); } @@ -301,7 +299,7 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::DuplicateName(s) => { assert_eq!(s, b"a"); } @@ -327,7 +325,7 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + crate::directoryservice::Directory::try_from(d).expect("validate shouldn't error"); } // [b, c] and [a] are both properly sorted. @@ -352,101 +350,6 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + crate::directoryservice::Directory::try_from(d).expect("validate shouldn't error"); } } - -#[test] -fn validate_overflow() { - let d = Directory { - directories: vec![DirectoryNode { - name: "foo".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: u64::MAX, - }], - ..Default::default() - }; - - match d.validate().expect_err("must fail") { - ValidateDirectoryError::SizeOverflow => {} - _ => panic!("unexpected error"), - } -} - -#[test] -fn add_nodes_to_directory() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "b".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "z".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - - d.add(node::Node::File(FileNode { - name: "f".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "c".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "g".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - - d.add(node::Node::Symlink(SymlinkNode { - name: "t".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "o".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "e".into(), - target: "a".into(), - })); - - d.validate().expect("directory should be valid"); -} - -#[test] -#[cfg_attr(not(debug_assertions), ignore)] -#[should_panic = "name already exists in directories"] -fn add_duplicate_node_to_directory_panics() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::File(FileNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); -} diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs deleted file mode 100644 index 68f147a33210..000000000000 --- a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::NamedNode; -use crate::proto::SymlinkNode; - -#[test] -fn iterator() { - let d = Directory { - directories: vec![ - DirectoryNode { - name: "c".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "d".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "h".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "l".into(), - ..DirectoryNode::default() - }, - ], - files: vec![ - FileNode { - name: "b".into(), - ..FileNode::default() - }, - FileNode { - name: "e".into(), - ..FileNode::default() - }, - FileNode { - name: "g".into(), - ..FileNode::default() - }, - FileNode { - name: "j".into(), - ..FileNode::default() - }, - ], - symlinks: vec![ - SymlinkNode { - name: "a".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "f".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "i".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "k".into(), - ..SymlinkNode::default() - }, - ], - }; - - // We keep this strings here and convert to string to make the comparison - // less messy. - let mut node_names: Vec<String> = vec![]; - - for node in d.nodes() { - node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap()); - } - - assert_eq!( - vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"], - node_names - ); -} diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs index 8d903bacb6c5..74334029e84c 100644 --- a/tvix/castore/src/proto/tests/mod.rs +++ b/tvix/castore/src/proto/tests/mod.rs @@ -1,2 +1 @@ mod directory; -mod directory_nodes_iterator; diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs index 72fb06aea877..ba54078653e2 100644 --- a/tvix/castore/src/tests/import.rs +++ b/tvix/castore/src/tests/import.rs @@ -1,5 +1,6 @@ use crate::blobservice::{self, BlobService}; use crate::directoryservice; +use crate::directoryservice::{DirectoryNode, Node, SymlinkNode}; use crate::fixtures::*; use crate::import::fs::ingest_path; use crate::proto; @@ -33,10 +34,9 @@ async fn symlink() { .expect("must succeed"); assert_eq!( - proto::node::Node::Symlink(proto::SymlinkNode { - name: "doesntmatter".into(), - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink( + SymlinkNode::new("doesntmatter".into(), "/nix/store/somewhereelse".into(),).unwrap() + ), root_node, ) } @@ -65,7 +65,7 @@ async fn single_file() { size: HELLOWORLD_BLOB_CONTENTS.len() as u64, executable: false, }), - root_node, + (&root_node).into(), ); // ensure the blob has been uploaded @@ -95,17 +95,20 @@ async fn complicated() { // ensure root_node matched expectations assert_eq!( - proto::node::Node::Directory(proto::DirectoryNode { - name: tmpdir - .path() - .file_name() - .unwrap() - .as_bytes() - .to_owned() - .into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), + Node::Directory( + DirectoryNode::new( + tmpdir + .path() + .file_name() + .unwrap() + .as_bytes() + .to_owned() + .into(), + DIRECTORY_COMPLICATED.digest().clone(), + DIRECTORY_COMPLICATED.size(), + ) + .unwrap() + ), root_node, ); |