diff options
author | Yureka <tvl@yuka.dev> | 2024-07-29T12·34+0200 |
---|---|---|
committer | yuka <tvl@yuka.dev> | 2024-08-13T12·17+0000 |
commit | 3ca0b53840b352b24f3a315404df11458b0bdbbb (patch) | |
tree | 2635e8d67d0c334cc5760dc4205b7515c6283a77 /tvix/castore/src | |
parent | 5d3f3158d6102baee48d2772e85f05cfc1fac95e (diff) |
refactor(tvix/castore): use Directory struct separate from proto one r/8484
This uses our own data type to deal with Directories in the castore model.

It makes some undesired states unrepresentable, removing the need for conversions and checking in various places:

- In the protobuf, blake3 digests could have a wrong length, as proto doesn't know fixed-size fields. We now use `B3Digest`, which makes cloning cheaper, and removes the need to do size-checking everywhere.
- In the protobuf, we had three different lists for `files`, `symlinks` and `directories`. This was mostly a protobuf size optimization, but made interacting with them a bit awkward. This has now been replaced with a list of enums, and convenience iterators to get various nodes, and add new ones.

Change-Id: I7b92691bb06d77ff3f58a5ccea94a22c16f84f04
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12057
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/castore/src')
32 files changed, 1131 insertions, 1061 deletions
diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs index ef9a7326b3fb..4d919ff0d873 100644 --- a/tvix/castore/src/digests.rs +++ b/tvix/castore/src/digests.rs @@ -6,7 +6,7 @@ use thiserror::Error; pub struct B3Digest(Bytes); // TODO: allow converting these errors to crate::Error -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum Error { #[error("invalid digest length: {0}")] InvalidDigestLen(usize), diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index d10dddaf9f60..73ab4342d832 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -9,7 +9,9 @@ use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace, warn}; -use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use super::{ + utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter, +}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; @@ -149,7 +151,7 @@ fn derive_directory_key(digest: &B3Digest) -> String { #[async_trait] impl DirectoryService for BigtableDirectoryService { #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let mut client = self.client.clone(); let directory_key = derive_directory_key(digest); @@ -241,28 +243,20 @@ impl DirectoryService for BigtableDirectoryService { // Try to parse the value into a Directory message. let directory = proto::Directory::decode(Bytes::from(row_cell.value)) - .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?; - - // validate the Directory. - directory - .validate() + .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))? 
+ .try_into() .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?; Ok(Some(directory)) } #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let directory_digest = directory.digest(); let mut client = self.client.clone(); let directory_key = derive_directory_key(&directory_digest); - // Ensure the directory we're trying to upload passes validation - directory - .validate() - .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?; - - let data = directory.encode_to_vec(); + let data = proto::Directory::from(directory).encode_to_vec(); if data.len() as u64 > CELL_SIZE_LIMIT { return Err(Error::StorageError( "Directory exceeds cell limit on Bigtable".into(), @@ -310,7 +304,7 @@ impl DirectoryService for BigtableDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index 0fdc82c16cb0..4283142231f9 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -7,10 +7,9 @@ use futures::TryStreamExt; use tonic::async_trait; use tracing::{instrument, trace}; -use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::directoryservice::DirectoryPutter; -use crate::proto; use crate::B3Digest; use crate::Error; @@ -40,7 +39,7 @@ where DS2: DirectoryService + Clone + 'static, { 
#[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { match self.near.get(digest).await? { Some(directory) => { trace!("serving from cache"); @@ -82,7 +81,7 @@ where } #[instrument(skip_all)] - async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> { Err(Error::StorageError("unimplemented".to_string())) } @@ -90,7 +89,7 @@ where fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let near = self.near.clone(); let far = self.far.clone(); let digest = root_directory_digest.clone(); diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs index e6b9b163370c..aa60f68a3197 100644 --- a/tvix/castore/src/directoryservice/directory_graph.rs +++ b/tvix/castore/src/directoryservice/directory_graph.rs @@ -10,10 +10,8 @@ use petgraph::{ use tracing::instrument; use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; -use crate::{ - proto::{self, Directory, DirectoryNode}, - B3Digest, -}; +use super::{Directory, DirectoryNode}; +use crate::B3Digest; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -88,7 +86,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { impl DirectoryGraph<LeavesToRootValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { if 
!self.order_validator.add_directory(&directory) { return Err(Error::ValidationError( "unknown directory was referenced".into(), @@ -108,7 +106,7 @@ impl DirectoryGraph<RootToLeavesValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); if !self.order_validator.digest_allowed(&digest) { return Err(Error::ValidationError("unexpected digest".into())); @@ -129,12 +127,7 @@ impl<O: OrderValidator> DirectoryGraph<O> { } /// Adds a directory which has already been confirmed to be in-order to the graph - pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> { - // Do some basic validation - directory - .validate() - .map_err(|e| Error::ValidationError(e.to_string()))?; - + pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); // Teach the graph about the existence of a node with this digest @@ -149,12 +142,10 @@ impl<O: OrderValidator> DirectoryGraph<O> { } // set up edges to all child directories - for subdir in &directory.directories { - let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap(); - + for subdir in directory.directories() { let child_ix = *self .digest_to_node_ix - .entry(subdir_digest) + .entry(subdir.digest.clone()) .or_insert_with(|| self.graph.add_node(None)); let pending_edge_check = match &self.graph[child_ix] { @@ -266,37 +257,36 @@ impl ValidatedDirectoryGraph { .filter_map(move |i| nodes[i.index()].weight.take()) } } - -#[cfg(test)] -mod tests { - use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, - }; - use lazy_static::lazy_static; - use rstest::rstest; - - lazy_static! 
{ +/* pub static ref BROKEN_DIRECTORY : Directory = Directory { - symlinks: vec![proto::SymlinkNode { + symlinks: vec![SymlinkNode { name: "".into(), // invalid name! target: "doesntmatter".into(), }], ..Default::default() }; +*/ +#[cfg(test)] +mod tests { + use crate::directoryservice::{Directory, DirectoryNode, Node}; + use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; + use lazy_static::lazy_static; + use rstest::rstest; - pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() + use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; + + lazy_static! { + pub static ref BROKEN_PARENT_DIRECTORY: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + "foo".into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size() + 42, // wrong! + ).unwrap())).unwrap(); + dir }; } - use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; - #[rstest] /// Uploading an empty directory should succeed. #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] @@ -312,8 +302,6 @@ mod tests { #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] /// Uploading B (referring to A) should fail immediately, because A was never uploaded. #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] - /// Uploading a directory failing validation should fail immediately. - #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] /// Uploading a directory which refers to another Directory with a wrong size should fail. 
#[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] fn test_uploads( @@ -366,8 +354,6 @@ mod tests { #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)] /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)] - /// Downloading a directory failing validation should fail immediately. - #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)] /// Downloading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)] fn test_downloads( diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 4dc3931ed410..2079514d2b79 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,8 +1,9 @@ use std::collections::HashSet; -use super::{DirectoryPutter, DirectoryService}; +use super::{Directory, DirectoryPutter, DirectoryService}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; +use crate::ValidateDirectoryError; use crate::{B3Digest, Error}; use async_stream::try_stream; use futures::stream::BoxStream; @@ -41,10 +42,7 @@ where T::Future: Send, { #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] - async fn get( - &self, - digest: &B3Digest, - ) -> Result<Option<crate::proto::Directory>, crate::Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. 
let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); @@ -72,15 +70,10 @@ where "requested directory with digest {}, but got {}", digest, actual_digest ))) - } else if let Err(e) = directory.validate() { - // Validate the Directory itself is valid. - warn!("directory failed validation: {}", e.to_string()); - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - digest, e, - ))) } else { - Ok(Some(directory)) + Ok(Some(directory.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?)) } } Ok(None) => Ok(None), @@ -90,11 +83,11 @@ where } #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client .clone() - .put(tokio_stream::once(directory)) + .put(tokio_stream::once(proto::Directory::from(directory))) .await; match resp { @@ -113,7 +106,7 @@ where fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); @@ -135,14 +128,6 @@ where loop { match stream.message().await { Ok(Some(directory)) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))?; - } // validate we actually expected that directory, and move it from expected to received. 
let directory_digest = directory.digest(); let was_expected = expected_directory_digests.remove(&directory_digest); @@ -168,6 +153,9 @@ where .insert(child_directory_digest); } + let directory = directory.try_into() + .map_err(|e: ValidateDirectoryError| Error::StorageError(e.to_string()))?; + yield directory; }, Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => { @@ -279,11 +267,11 @@ pub struct GRPCPutter { #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. Some((_, ref directory_sender)) => { - if directory_sender.send(directory).is_err() { + if directory_sender.send(directory.into()).is_err() { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. 
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index ada4606a5a57..b039d9bc7d84 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -1,4 +1,4 @@ -use crate::{proto, B3Digest, Error}; +use crate::{B3Digest, Error}; use futures::stream::BoxStream; use std::collections::HashMap; use std::sync::Arc; @@ -7,8 +7,9 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryPutter, DirectoryService, SimplePutter}; +use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::proto; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -18,7 +19,7 @@ pub struct MemoryDirectoryService { #[async_trait] impl DirectoryService for MemoryDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.read().await; match db.get(digest) { @@ -37,35 +38,20 @@ impl DirectoryService for MemoryDirectoryService { ))); } - // Validate the Directory itself is valid. 
- if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory.clone())) + Ok(Some(directory.clone().try_into().map_err(|e| { + crate::Error::StorageError(format!("corrupted directory: {}", e)) + })?)) } } } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it let mut db = self.db.write().await; - db.insert(digest.clone(), directory); + db.insert(digest.clone(), directory.into()); Ok(digest) } @@ -74,7 +60,7 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 17a78b179349..0141a48f75bc 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,6 +1,9 @@ use crate::composition::{Registry, ServiceBuilder}; -use crate::{proto, B3Digest, Error}; +use crate::proto; +use crate::{B3Digest, Error}; +use crate::{ValidateDirectoryError, ValidateNodeError}; +use bytes::Bytes; use futures::stream::BoxStream; use tonic::async_trait; mod combinators; @@ -38,7 +41,7 @@ mod bigtable; pub use self::bigtable::{BigtableDirectoryService, BigtableParameters}; /// The base trait all Directory services need to implement. 
-/// This is a simple get and put of [crate::proto::Directory], returning their +/// This is a simple get and put of [Directory], returning their /// digest. #[async_trait] pub trait DirectoryService: Send + Sync { @@ -50,14 +53,14 @@ pub trait DirectoryService: Send + Sync { /// Directory digests that are at the "root", aka the last element that's /// sent to a DirectoryPutter. This makes sense for implementations bundling /// closures of directories together in batches. - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>; /// Uploads a single Directory message, and returns the calculated /// digest, or an error. An error *must* also be returned if the message is /// not valid. - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + async fn put(&self, directory: Directory) -> Result<B3Digest, Error>; - /// Looks up a closure of [proto::Directory]. - /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, + /// Looks up a closure of [Directory]. + /// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. /// @@ -75,9 +78,9 @@ pub trait DirectoryService: Send + Sync { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>>; + ) -> BoxStream<'static, Result<Directory, Error>>; - /// Allows persisting a closure of [proto::Directory], which is a graph of + /// Allows persisting a closure of [Directory], which is a graph of /// connected Directory messages. 
fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; } @@ -87,18 +90,18 @@ impl<A> DirectoryService for A where A: AsRef<dyn DirectoryService> + Send + Sync, { - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.as_ref().get(digest).await } - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { self.as_ref().put(directory).await } fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { self.as_ref().get_recursive(root_directory_digest) } @@ -107,7 +110,7 @@ where } } -/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// Provides a handle to put a closure of connected [Directory] elements. /// /// The consumer can periodically call [DirectoryPutter::put], starting from the /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to @@ -119,12 +122,12 @@ where /// but a single file or symlink. #[async_trait] pub trait DirectoryPutter: Send { - /// Put a individual [proto::Directory] into the store. + /// Put a individual [Directory] into the store. /// Error semantics and behaviour is up to the specific implementation of /// this trait. /// Due to bursting, the returned error might refer to an object previously /// sent via `put`. - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + async fn put(&mut self, directory: Directory) -> Result<(), Error>; /// Close the stream, and wait for any errors. 
/// If there's been any invalid Directory message uploaded, and error *must* @@ -145,3 +148,461 @@ pub(crate) fn register_directory_services(reg: &mut Registry) { reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable"); } } + +/// A Directory can contain Directory, File or Symlink nodes. +/// Each of these nodes have a name attribute, which is the basename in that +/// directory and node type specific attributes. +/// While a Node by itself may have any name, the names of Directory entries: +/// - MUST not contain slashes or null bytes +/// - MUST not be '.' or '..' +/// - MUST be unique across all three lists +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct Directory { + nodes: Vec<Node>, +} + +/// A DirectoryNode is a pointer to a [Directory], by its [Directory::digest]. +/// It also gives it a `name` and `size`. +/// Such a node is either an element in the [Directory] it itself is contained in, +/// or a standalone root node./ +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DirectoryNode { + /// The (base)name of the directory + name: Bytes, + /// The blake3 hash of a Directory message, serialized in protobuf canonical form. + digest: B3Digest, + /// Number of child elements in the Directory referred to by `digest`. + /// Calculated by summing up the numbers of nodes, and for each directory. + /// its size field. Can be used for inode allocation. + /// This field is precisely as verifiable as any other Merkle tree edge. + /// Resolve `digest`, and you can compute it incrementally. Resolve the entire + /// tree, and you can fully compute it from scratch. + /// A credulous implementation won't reject an excessive size, but this is + /// harmless: you'll have some ordinals without nodes. Undersizing is obvious + /// and easy to reject: you won't have an ordinal for some nodes. 
+ size: u64, +} + +impl DirectoryNode { + pub fn new(name: Bytes, digest: B3Digest, size: u64) -> Result<Self, ValidateNodeError> { + Ok(Self { name, digest, size }) + } + + pub fn digest(&self) -> &B3Digest { + &self.digest + } + pub fn size(&self) -> u64 { + self.size + } +} + +/// A FileNode represents a regular or executable file in a Directory or at the root. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FileNode { + /// The (base)name of the file + name: Bytes, + + /// The blake3 digest of the file contents + digest: B3Digest, + + /// The file content size + size: u64, + + /// Whether the file is executable + executable: bool, +} + +impl FileNode { + pub fn new( + name: Bytes, + digest: B3Digest, + size: u64, + executable: bool, + ) -> Result<Self, ValidateNodeError> { + Ok(Self { + name, + digest, + size, + executable, + }) + } + + pub fn digest(&self) -> &B3Digest { + &self.digest + } + + pub fn size(&self) -> u64 { + self.size + } + + pub fn executable(&self) -> bool { + self.executable + } +} + +/// A SymlinkNode represents a symbolic link in a Directory or at the root. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SymlinkNode { + /// The (base)name of the symlink + name: Bytes, + /// The target of the symlink. + target: Bytes, +} + +impl SymlinkNode { + pub fn new(name: Bytes, target: Bytes) -> Result<Self, ValidateNodeError> { + if target.is_empty() || target.contains(&b'\0') { + return Err(ValidateNodeError::InvalidSymlinkTarget(target)); + } + Ok(Self { name, target }) + } + + pub fn target(&self) -> &bytes::Bytes { + &self.target + } +} + +/// A Node is either a [DirectoryNode], [FileNode] or [SymlinkNode]. +/// While a Node by itself may have any name, only those matching specific requirements +/// can can be added as entries to a [Directory] (see the documentation on [Directory] for details). 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Node { + Directory(DirectoryNode), + File(FileNode), + Symlink(SymlinkNode), +} + +/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] +/// and [Node], so we can ask all of them for the name easily. +pub trait NamedNode { + fn get_name(&self) -> &bytes::Bytes; +} + +impl NamedNode for &FileNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for FileNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &DirectoryNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for DirectoryNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &SymlinkNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for SymlinkNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &Node { + fn get_name(&self) -> &bytes::Bytes { + match self { + Node::File(node_file) => &node_file.name, + Node::Directory(node_directory) => &node_directory.name, + Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} +impl NamedNode for Node { + fn get_name(&self) -> &bytes::Bytes { + match self { + Node::File(node_file) => &node_file.name, + Node::Directory(node_directory) => &node_directory.name, + Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} + +impl Node { + /// Returns the node with a new name. 
+ pub fn rename(self, name: bytes::Bytes) -> Self { + match self { + Node::Directory(n) => Node::Directory(DirectoryNode { name, ..n }), + Node::File(n) => Node::File(FileNode { name, ..n }), + Node::Symlink(n) => Node::Symlink(SymlinkNode { name, ..n }), + } + } +} + +impl PartialOrd for Node { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for Node { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for FileNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for FileNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for DirectoryNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for DirectoryNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for SymlinkNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for SymlinkNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { + iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) +} + +impl Directory { + pub fn new() -> Self { + Directory { nodes: vec![] } + } + + /// The size of a directory is the number of all regular and symlink elements, + /// the number of directory elements, and their size fields. + pub fn size(&self) -> u64 { + // It's impossible to create a Directory where the size overflows, because we + // check before every add() that the size won't overflow. 
+ (self.nodes.len() as u64) + self.directories().map(|e| e.size).sum::<u64>() + } + + /// Calculates the digest of a Directory, which is the blake3 hash of a + /// Directory protobuf message, serialized in protobuf canonical form. + pub fn digest(&self) -> B3Digest { + proto::Directory::from(self).digest() + } + + /// Allows iterating over all nodes (directories, files and symlinks) + /// ordered by their name. + pub fn nodes(&self) -> impl Iterator<Item = &Node> + Send + Sync + '_ { + self.nodes.iter() + } + + /// Allows iterating over the FileNode entries of this directory + /// ordered by their name + pub fn files(&self) -> impl Iterator<Item = &FileNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::File(n) => Some(n), + _ => None, + }) + } + + /// Allows iterating over the subdirectories of this directory + /// ordered by their name + pub fn directories(&self) -> impl Iterator<Item = &DirectoryNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::Directory(n) => Some(n), + _ => None, + }) + } + + /// Allows iterating over the SymlinkNode entries of this directory + /// ordered by their name + pub fn symlinks(&self) -> impl Iterator<Item = &SymlinkNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::Symlink(n) => Some(n), + _ => None, + }) + } + + /// Checks a Node name for validity as a directory entry + /// We disallow slashes, null bytes, '.', '..' and the empty string. + pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { + if name.is_empty() + || name == b".." + || name == b"." + || name.contains(&0x00) + || name.contains(&b'/') + { + Err(ValidateNodeError::InvalidName(name.to_owned().into())) + } else { + Ok(()) + } + } + + /// Adds the specified [Node] to the [Directory], preserving sorted entries. + /// + /// Inserting an element that already exists with the same name in the directory will yield an + /// error. 
+ /// Inserting an element will validate that its name fulfills the stricter requirements for + /// directory entries and yield an error if it is not. + pub fn add(&mut self, node: Node) -> Result<(), ValidateDirectoryError> { + Self::validate_node_name(node.get_name()) + .map_err(|e| ValidateDirectoryError::InvalidNode(node.get_name().clone().into(), e))?; + + // Check that the even after adding this new directory entry, the size calculation will not + // overflow + // FUTUREWORK: add some sort of batch add interface which only does this check once with + // all the to-be-added entries + checked_sum([ + self.size(), + 1, + match node { + Node::Directory(ref dir) => dir.size, + _ => 0, + }, + ]) + .ok_or(ValidateDirectoryError::SizeOverflow)?; + + // This assumes the [Directory] is sorted, since we don't allow accessing the nodes list + // directly and all previous inserts should have been in-order + let pos = match self + .nodes + .binary_search_by_key(&node.get_name(), |n| n.get_name()) + { + Err(pos) => pos, // There is no node with this name; good! 
+ Ok(_) => { + return Err(ValidateDirectoryError::DuplicateName( + node.get_name().to_vec(), + )) + } + }; + + self.nodes.insert(pos, node); + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}; + use crate::fixtures::DUMMY_DIGEST; + use crate::ValidateDirectoryError; + + #[test] + fn add_nodes_to_directory() { + let mut d = Directory::new(); + + d.add(Node::Directory( + DirectoryNode::new("b".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + d.add(Node::Directory( + DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + d.add(Node::Directory( + DirectoryNode::new("z".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + + d.add(Node::File( + FileNode::new("f".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + d.add(Node::File( + FileNode::new("c".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + d.add(Node::File( + FileNode::new("g".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + + d.add(Node::Symlink( + SymlinkNode::new("t".into(), "a".into()).unwrap(), + )) + .unwrap(); + d.add(Node::Symlink( + SymlinkNode::new("o".into(), "a".into()).unwrap(), + )) + .unwrap(); + d.add(Node::Symlink( + SymlinkNode::new("e".into(), "a".into()).unwrap(), + )) + .unwrap(); + + // Convert to proto struct and back to ensure we are not generating any invalid structures + crate::directoryservice::Directory::try_from(crate::proto::Directory::from(d)) + .expect("directory should be valid"); + } + + #[test] + fn validate_overflow() { + let mut d = Directory::new(); + + assert_eq!( + d.add(Node::Directory( + DirectoryNode::new("foo".into(), DUMMY_DIGEST.clone(), u64::MAX).unwrap(), + )), + Err(ValidateDirectoryError::SizeOverflow) + ); + } + + #[test] + fn add_duplicate_node_to_directory() { + let mut d = Directory::new(); + + d.add(Node::Directory( + DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) 
+ .unwrap(); + assert_eq!( + format!( + "{}", + d.add(Node::File( + FileNode::new("a".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .expect_err("adding duplicate dir entry must fail") + ), + "\"a\" is a duplicate name" + ); + } + + /// Attempt to add a directory entry with a name which should be rejected. + #[tokio::test] + async fn directory_reject_invalid_name() { + let mut dir = Directory::new(); + assert!( + dir.add(Node::Symlink( + SymlinkNode::new( + "".into(), // wrong! can not be added to directory + "doesntmatter".into(), + ) + .unwrap() + )) + .is_err(), + "invalid symlink entry be rejected" + ); + } +} diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index a9a2cc8ef5c0..6e71c2356def 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -17,7 +17,8 @@ use tracing::{instrument, trace, warn, Level}; use url::Url; use super::{ - DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator, + Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, + RootToLeavesValidator, }; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; @@ -78,13 +79,13 @@ impl DirectoryService for ObjectStoreDirectoryService { /// This is the same steps as for get_recursive anyways, so we just call get_recursive and /// return the first element of the stream and drop the request. 
#[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.get_recursive(digest).take(1).next().await.transpose() } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - if !directory.directories.is_empty() { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { + if directory.directories().next().is_some() { return Err(Error::InvalidRequest( "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(), )); @@ -99,7 +100,7 @@ impl DirectoryService for ObjectStoreDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // Check that we are not passing on bogus from the object store to the client, and that the // trust chain from the root digest to the leaves is intact let mut order_validator = @@ -145,6 +146,10 @@ impl DirectoryService for ObjectStoreDirectoryService { warn!("unable to parse directory {}: {}", digest, e); Error::StorageError(e.to_string()) })?; + let directory = Directory::try_from(directory).map_err(|e| { + warn!("unable to convert directory {}: {}", digest, e); + Error::StorageError(e.to_string()) + })?; // Allow the children to appear next order_validator.add_directory_unchecked(&directory); @@ -244,7 +249,7 @@ impl ObjectStoreDirectoryPutter { #[async_trait] impl DirectoryPutter for ObjectStoreDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match 
self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -302,7 +307,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { for directory in directories { directories_sink - .send(directory.encode_to_vec().into()) + .send(proto::Directory::from(directory).encode_to_vec().into()) .await?; } diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs index 6045f5d24198..17431fc8d3b0 100644 --- a/tvix/castore/src/directoryservice/order_validator.rs +++ b/tvix/castore/src/directoryservice/order_validator.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use tracing::warn; -use crate::{proto::Directory, B3Digest}; +use super::Directory; +use crate::B3Digest; pub trait OrderValidator { /// Update the order validator's state with the directory @@ -47,10 +48,9 @@ impl RootToLeavesValidator { self.expected_digests.insert(directory.digest()); } - for subdir in &directory.directories { + for subdir in directory.directories() { // Allow the children to appear next - let subdir_digest = subdir.digest.clone().try_into().unwrap(); - self.expected_digests.insert(subdir_digest); + self.expected_digests.insert(subdir.digest.clone()); } } } @@ -79,12 +79,11 @@ impl OrderValidator for LeavesToRootValidator { fn add_directory(&mut self, directory: &Directory) -> bool { let digest = directory.digest(); - for subdir in &directory.directories { - let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory() - if !self.allowed_references.contains(&subdir_digest) { + for subdir in directory.directories() { + if !self.allowed_references.contains(&subdir.digest) { warn!( directory.digest = %digest, - subdirectory.digest = %subdir_digest, + subdirectory.digest = %subdir.digest, "unexpected directory reference" ); return false; @@ -101,8 +100,8 @@ impl OrderValidator for LeavesToRootValidator { mod tests { 
use super::{LeavesToRootValidator, RootToLeavesValidator}; use crate::directoryservice::order_validator::OrderValidator; + use crate::directoryservice::Directory; use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; - use crate::proto::Directory; use rstest::rstest; #[rstest] diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs index 51dc87f92574..d253df503bb3 100644 --- a/tvix/castore/src/directoryservice/redb.rs +++ b/tvix/castore/src/directoryservice/redb.rs @@ -5,15 +5,15 @@ use std::{path::PathBuf, sync::Arc}; use tonic::async_trait; use tracing::{instrument, warn}; +use super::{ + traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService, + LeavesToRootValidator, +}; use crate::{ composition::{CompositionContext, ServiceBuilder}, digests, proto, B3Digest, Error, }; -use super::{ - traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, -}; - const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = TableDefinition::new("directory"); @@ -69,7 +69,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { #[async_trait] impl DirectoryService for RedbDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.clone(); // Retrieves the protobuf-encoded Directory for the corresponding digest. @@ -107,13 +107,10 @@ impl DirectoryService for RedbDirectoryService { let directory = match proto::Directory::decode(&*directory_data.value()) { Ok(dir) => { // The returned Directory must be valid. 
- if let Err(e) = dir.validate() { + dir.try_into().map_err(|e| { warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - dir + Error::StorageError("Directory failed validation".to_string()) + })? } Err(e) => { warn!(err=%e, "failed to parse Directory"); @@ -125,26 +122,21 @@ impl DirectoryService for RedbDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // Validate the directory. - if let Err(e) = directory.validate() { - warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - // Store the directory in the table. let txn = db.begin_write()?; { let mut table = txn.open_table(DIRECTORY_TABLE)?; let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } txn.commit()?; @@ -158,7 +150,7 @@ impl DirectoryService for RedbDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single // redb transaction to avoid constantly closing and opening new transactions for the // database. 
@@ -185,7 +177,7 @@ pub struct RedbDirectoryPutter { #[async_trait] impl DirectoryPutter for RedbDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -228,7 +220,10 @@ impl DirectoryPutter for RedbDirectoryPutter { for directory in directories { let digest_as_array: [u8; digests::B3_LEN] = directory.digest().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } } diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs index dc54e3d11d18..b4daaee61b22 100644 --- a/tvix/castore/src/directoryservice/simple_putter.rs +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -1,7 +1,6 @@ use super::DirectoryPutter; use super::DirectoryService; -use super::{DirectoryGraph, LeavesToRootValidator}; -use crate::proto; +use super::{Directory, DirectoryGraph, LeavesToRootValidator}; use crate::B3Digest; use crate::Error; use tonic::async_trait; @@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> { #[async_trait] impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 
5766dec1a5c2..4f3a860d14e4 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -1,5 +1,3 @@ -use crate::proto::Directory; -use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use prost::Message; use std::ops::Deref; @@ -9,8 +7,9 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; +use super::{Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{proto, B3Digest, Error}; #[derive(Clone)] pub struct SledDirectoryService { @@ -44,7 +43,7 @@ impl SledDirectoryService { #[async_trait] impl DirectoryService for SledDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let resp = tokio::task::spawn_blocking({ let db = self.db.clone(); let digest = digest.clone(); @@ -61,7 +60,7 @@ impl DirectoryService for SledDirectoryService { None => Ok(None), // The directory was found, try to parse the data as Directory message - Some(data) => match Directory::decode(&*data) { + Some(data) => match proto::Directory::decode(&*data) { Ok(directory) => { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. @@ -73,14 +72,10 @@ impl DirectoryService for SledDirectoryService { ))); } - // Validate the Directory itself is valid. 
- if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } + let directory = directory.try_into().map_err(|e| { + warn!("failed to retrieve directory: {}", e); + Error::StorageError(format!("failed to retrieve directory: {}", e)) + })?; Ok(Some(directory)) } @@ -93,22 +88,18 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } // store it - db.insert(digest.as_slice(), directory.encode_to_vec()) - .map_err(|e| Error::StorageError(e.to_string()))?; + db.insert( + digest.as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ) + .map_err(|e| Error::StorageError(e.to_string()))?; Ok(digest) } @@ -120,7 +111,7 @@ impl DirectoryService for SledDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } @@ -215,7 +206,7 @@ pub struct SledDirectoryPutter { #[async_trait] impl DirectoryPutter for SledDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return 
Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -252,7 +243,10 @@ impl DirectoryPutter for SledDirectoryPutter { let mut batch = sled::Batch::default(); for directory in directories { - batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); + batch.insert( + directory.digest().as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ); } tree.apply_batch(batch).map_err(|e| { diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index b698f70ea469..3923e66dc2c2 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -8,10 +8,8 @@ use rstest_reuse::{self, *}; use super::DirectoryService; use crate::directoryservice; -use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}, - proto::{self, Directory}, -}; +use crate::directoryservice::{Directory, DirectoryNode, Node}; +use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}; mod utils; use self::utils::make_grpc_directory_service_client; @@ -41,10 +39,10 @@ async fn test_non_exist(directory_service: impl DirectoryService) { // recursive get assert_eq!( - Vec::<Result<proto::Directory, crate::Error>>::new(), + Vec::<Result<Directory, crate::Error>>::new(), directory_service .get_recursive(&DIRECTORY_A.digest()) - .collect::<Vec<Result<proto::Directory, crate::Error>>>() + .collect::<Vec<Result<Directory, crate::Error>>>() .await ); } @@ -212,59 +210,26 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService } } -/// Try uploading a Directory failing its internal validation, ensure it gets -/// rejected. -#[apply(directory_services)] -#[tokio::test] -async fn upload_reject_failing_validation(directory_service: impl DirectoryService) { - let broken_directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // wrong! 
- target: "doesntmatter".into(), - }], - ..Default::default() - }; - assert!(broken_directory.validate().is_err()); - - // Try to upload via single upload. - assert!( - directory_service - .put(broken_directory.clone()) - .await - .is_err(), - "single upload must fail" - ); - - // Try to upload via put_multiple. We're a bit more permissive here, the - // intermediate .put() might succeed, due to client-side bursting (in the - // case of gRPC), but then the close MUST fail. - let mut handle = directory_service.put_multiple_start(); - if handle.put(broken_directory).await.is_ok() { - assert!( - handle.close().await.is_err(), - "when succeeding put, close must fail" - ) - } -} - /// Try uploading a Directory that refers to a previously-uploaded directory. /// Both pass their isolated validation, but the size field in the parent is wrong. /// This should be rejected. #[apply(directory_services)] #[tokio::test] async fn upload_reject_wrong_size(directory_service: impl DirectoryService) { - let wrong_parent_directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() + let wrong_parent_directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory( + DirectoryNode::new( + "foo".into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size() + 42, // wrong! + ) + .unwrap(), + )) + .unwrap(); + dir }; - // Make sure isolated validation itself is ok - assert!(wrong_parent_directory.validate().is_ok()); - // Now upload both. Ensure it either fails during the second put, or during // the close. 
let mut handle = directory_service.put_multiple_start(); diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 17a51ae2bbff..3e6dc73a4d6b 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -1,8 +1,5 @@ -use super::DirectoryService; -use crate::{ - proto::{node::Node, NamedNode}, - B3Digest, Error, Path, -}; +use super::{DirectoryService, NamedNode, Node}; +use crate::{Error, Path}; use tracing::{instrument, warn}; /// This descends from a (root) node to the given (sub)path, returning the Node @@ -25,34 +22,31 @@ where return Ok(None); } Node::Directory(directory_node) => { - let digest: B3Digest = directory_node - .digest - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - // fetch the linked node from the directory_service. - let directory = - directory_service - .as_ref() - .get(&digest) - .await? - .ok_or_else(|| { - // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! - warn!("directory {} does not exist", digest); - - Error::StorageError(format!("directory {} does not exist", digest)) - })?; + let directory = directory_service + .as_ref() + .get(&directory_node.digest) + .await? + .ok_or_else(|| { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + warn!("directory {} does not exist", directory_node.digest); + + Error::StorageError(format!( + "directory {} does not exist", + directory_node.digest + )) + })?; // look for the component in the [Directory]. // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we // could stop as soon as e.name is larger than the search string. if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) { // child node found, update prev_node to that and continue. 
- parent_node = child_node; + parent_node = child_node.clone(); } else { // child node not found means there's no such element inside the directory. return Ok(None); - } + }; } } } @@ -65,6 +59,7 @@ where mod tests { use crate::{ directoryservice, + directoryservice::{DirectoryNode, Node}, fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, PathBuf, }; @@ -88,21 +83,21 @@ mod tests { handle.close().await.expect("must upload"); // construct the node for DIRECTORY_COMPLICATED - let node_directory_complicated = - crate::proto::node::Node::Directory(crate::proto::DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }); + let node_directory_complicated = Node::Directory( + DirectoryNode::new( + "doesntmatter".into(), + DIRECTORY_COMPLICATED.digest(), + DIRECTORY_COMPLICATED.size(), + ) + .unwrap(), + ); // construct the node for DIRECTORY_COMPLICATED - let node_directory_with_keep = crate::proto::node::Node::Directory( - DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), - ); + let node_directory_with_keep = + Node::Directory(DIRECTORY_COMPLICATED.directories().next().unwrap().clone()); // construct the node for the .keep file - let node_file_keep = - crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); + let node_file_keep = Node::File(DIRECTORY_WITH_KEEP.files().next().unwrap().clone()); // traversal to an empty subpath should return the root node. 
{ diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index 726734f55eec..352362811a97 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -1,5 +1,5 @@ +use super::Directory; use super::DirectoryService; -use crate::proto; use crate::B3Digest; use crate::Error; use async_stream::try_stream; @@ -8,14 +8,14 @@ use std::collections::{HashSet, VecDeque}; use tracing::instrument; use tracing::warn; -/// Traverses a [proto::Directory] from the root to the children. +/// Traverses a [Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. #[instrument(skip(directory_service))] pub fn traverse_directory<'a, DS: DirectoryService + 'static>( directory_service: DS, root_directory_digest: &B3Digest, -) -> BoxStream<'a, Result<proto::Directory, Error>> { +) -> BoxStream<'a, Result<Directory, Error>> { // The list of all directories that still need to be traversed. The next // element is picked from the front, new elements are enqueued at the // back. @@ -50,16 +50,6 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( Some(dir) => dir, }; - - // validate, we don't want to send invalid directories. - current_directory.validate().map_err(|e| { - warn!("directory failed validation: {}", e.to_string()); - Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - )) - })?; - // We're about to send this directory, so let's avoid sending it again if a // descendant has it. sent_directory_digests.insert(current_directory_digest); @@ -67,9 +57,9 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( // enqueue all child directory digests to the work queue, as // long as they're not part of the worklist or already sent. // This panics if the digest looks invalid, it's supposed to be checked first. 
- for child_directory_node in ¤t_directory.directories { + for child_directory_node in current_directory.directories() { // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + let child_digest: B3Digest = child_directory_node.digest.clone(); if worklist_directory_digests.contains(&child_digest) || sent_directory_digests.contains(&child_digest) diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs index 5bbcd7b04ef1..083f43036845 100644 --- a/tvix/castore/src/errors.rs +++ b/tvix/castore/src/errors.rs @@ -1,3 +1,4 @@ +use bstr::ByteSlice; use thiserror::Error; use tokio::task::JoinError; use tonic::Status; @@ -12,6 +13,46 @@ pub enum Error { StorageError(String), } +/// Errors that can occur during the validation of [Directory] messages. +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum ValidateDirectoryError { + /// Elements are not in sorted order + #[error("{:?} is not sorted", .0.as_bstr())] + WrongSorting(Vec<u8>), + /// Multiple elements with the same name encountered + #[error("{:?} is a duplicate name", .0.as_bstr())] + DuplicateName(Vec<u8>), + /// Invalid node + #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())] + InvalidNode(Vec<u8>, ValidateNodeError), + #[error("Total size exceeds u32::MAX")] + SizeOverflow, +} + +/// Errors that occur during Node validation +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum ValidateNodeError { + #[error("No node set")] + NoNodeSet, + /// Invalid digest length encountered + #[error("invalid digest length: {0}")] + InvalidDigestLen(usize), + /// Invalid name encountered + #[error("Invalid name: {}", .0.as_bstr())] + InvalidName(bytes::Bytes), + /// Invalid symlink target + #[error("Invalid symlink target: {}", .0.as_bstr())] + InvalidSymlinkTarget(bytes::Bytes), +} + +impl From<crate::digests::Error> for ValidateNodeError { + fn from(e: crate::digests::Error) -> Self { + match e { + 
crate::digests::Error::InvalidDigestLen(n) => ValidateNodeError::InvalidDigestLen(n), + } + } +} + impl From<JoinError> for Error { fn from(value: JoinError) -> Self { Error::StorageError(value.to_string()) diff --git a/tvix/castore/src/fixtures.rs b/tvix/castore/src/fixtures.rs index 3ebda64a818a..0e423d348522 100644 --- a/tvix/castore/src/fixtures.rs +++ b/tvix/castore/src/fixtures.rs @@ -1,5 +1,5 @@ use crate::{ - proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode}, + directoryservice::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}, B3Digest, }; use lazy_static::lazy_static; @@ -34,70 +34,72 @@ lazy_static! { pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into(); // Directories - pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory { - directories: vec![], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), - size: 0, - executable: false, - }], - symlinks: vec![], + pub static ref DIRECTORY_WITH_KEEP: Directory = { + let mut dir = Directory::new(); + dir.add(Node::File(FileNode::new( + b".keep".to_vec().into(), + EMPTY_BLOB_DIGEST.clone(), + 0, + false + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory { - directories: vec![DirectoryNode { - name: b"keep".to_vec().into(), - digest: DIRECTORY_WITH_KEEP.digest().into(), - size: DIRECTORY_WITH_KEEP.size(), - }], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), - size: 0, - executable: false, - }], - symlinks: vec![SymlinkNode { - name: b"aa".to_vec().into(), - target: b"/nix/store/somewhereelse".to_vec().into(), - }], + pub static ref DIRECTORY_COMPLICATED: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"keep".to_vec().into(), + DIRECTORY_WITH_KEEP.digest(), + DIRECTORY_WITH_KEEP.size() + ).unwrap())).unwrap(); + 
dir.add(Node::File(FileNode::new( + b".keep".to_vec().into(), + EMPTY_BLOB_DIGEST.clone(), + 0, + false + ).unwrap())).unwrap(); + dir.add(Node::Symlink(SymlinkNode::new( + b"aa".to_vec().into(), + b"/nix/store/somewhereelse".to_vec().into() + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_A: Directory = Directory::default(); - pub static ref DIRECTORY_B: Directory = Directory { - directories: vec![DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - }], - ..Default::default() + pub static ref DIRECTORY_A: Directory = Directory::new(); + pub static ref DIRECTORY_B: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"a".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_C: Directory = Directory { - directories: vec![ - DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - }, - DirectoryNode { - name: b"a'".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - } - ], - ..Default::default() + pub static ref DIRECTORY_C: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"a".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir.add(Node::Directory(DirectoryNode::new( + b"a'".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_D: proto::Directory = proto::Directory { - directories: vec![ - DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - }, - DirectoryNode { - name: b"b'".to_vec().into(), - digest: DIRECTORY_B.digest().into(), - size: DIRECTORY_B.size(), - } - ], - ..Default::default() + pub static ref DIRECTORY_D: Directory = { + let mut dir = Directory::new(); + 
dir.add(Node::Directory(DirectoryNode::new( + b"a".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir.add(Node::Directory(DirectoryNode::new( + b"b".to_vec().into(), + DIRECTORY_B.digest(), + DIRECTORY_B.size(), + ).unwrap())).unwrap(); + dir + }; } diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs index bcebcf4a7292..726beb5858c5 100644 --- a/tvix/castore/src/fs/fuse/tests.rs +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -13,11 +13,11 @@ use tokio_stream::{wrappers::ReadDirStream, StreamExt}; use super::FuseDaemon; use crate::fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}; -use crate::proto as castorepb; -use crate::proto::node::Node; use crate::{ blobservice::{BlobService, MemoryBlobService}, - directoryservice::{DirectoryService, MemoryDirectoryService}, + directoryservice::{ + DirectoryNode, DirectoryService, FileNode, MemoryDirectoryService, Node, SymlinkNode, + }, fixtures, }; @@ -70,12 +70,15 @@ async fn populate_blob_a( root_nodes.insert( BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + BLOB_A_NAME.into(), + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u64, + false, + ) + .unwrap(), + ), ); } @@ -91,12 +94,15 @@ async fn populate_blob_b( root_nodes.insert( BLOB_B_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_B_NAME.into(), - digest: fixtures::BLOB_B_DIGEST.clone().into(), - size: fixtures::BLOB_B.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + BLOB_B_NAME.into(), + fixtures::BLOB_B_DIGEST.clone(), + fixtures::BLOB_B.len() as u64, + false, + ) + .unwrap(), + ), ); } @@ -116,22 +122,22 @@ async fn populate_blob_helloworld( root_nodes.insert( HELLOWORLD_BLOB_NAME.into(), - Node::File(castorepb::FileNode { - name: 
HELLOWORLD_BLOB_NAME.into(), - digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), - size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, - executable: true, - }), + Node::File( + FileNode::new( + HELLOWORLD_BLOB_NAME.into(), + fixtures::HELLOWORLD_BLOB_DIGEST.clone(), + fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, + true, + ) + .unwrap(), + ), ); } async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( SYMLINK_NAME.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME.into(), - target: BLOB_A_NAME.into(), - }), + Node::Symlink(SymlinkNode::new(SYMLINK_NAME.into(), BLOB_A_NAME.into()).unwrap()), ); } @@ -140,10 +146,9 @@ async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( SYMLINK_NAME2.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME2.into(), - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink( + SymlinkNode::new(SYMLINK_NAME2.into(), "/nix/store/somewhereelse".into()).unwrap(), + ), ); } @@ -167,11 +172,14 @@ async fn populate_directory_with_keep( root_nodes.insert( DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + Node::Directory( + DirectoryNode::new( + DIRECTORY_WITH_KEEP_NAME.into(), + fixtures::DIRECTORY_WITH_KEEP.digest(), + fixtures::DIRECTORY_WITH_KEEP.size(), + ) + .unwrap(), + ), ); } @@ -180,11 +188,14 @@ async fn populate_directory_with_keep( async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: 
fixtures::DIRECTORY_WITH_KEEP.size(), - }), + Node::Directory( + DirectoryNode::new( + DIRECTORY_WITH_KEEP_NAME.into(), + fixtures::DIRECTORY_WITH_KEEP.digest(), + fixtures::DIRECTORY_WITH_KEEP.size(), + ) + .unwrap(), + ), ); } @@ -192,12 +203,15 @@ async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Byte async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + BLOB_A_NAME.into(), + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u64, + false, + ) + .unwrap(), + ), ); } @@ -227,11 +241,14 @@ async fn populate_directory_complicated( root_nodes.insert( DIRECTORY_COMPLICATED_NAME.into(), - Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_COMPLICATED_NAME.into(), - digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), - size: fixtures::DIRECTORY_COMPLICATED.size(), - }), + Node::Directory( + DirectoryNode::new( + DIRECTORY_COMPLICATED_NAME.into(), + fixtures::DIRECTORY_COMPLICATED.digest(), + fixtures::DIRECTORY_COMPLICATED.size(), + ) + .unwrap(), + ), ); } diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs index bdd459543470..379d4ab87318 100644 --- a/tvix/castore/src/fs/inodes.rs +++ b/tvix/castore/src/fs/inodes.rs @@ -4,7 +4,7 @@ use std::time::Duration; use bytes::Bytes; -use crate::proto as castorepb; +use crate::directoryservice::{NamedNode, Node}; use crate::B3Digest; #[derive(Clone, Debug)] @@ -20,27 +20,24 @@ pub enum InodeData { /// lookup and did fetch the data. 
#[derive(Clone, Debug)] pub enum DirectoryInodeData { - Sparse(B3Digest, u64), // digest, size - Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] + Sparse(B3Digest, u64), // digest, size + Populated(B3Digest, Vec<(u64, Node)>), // [(child_inode, node)] } impl InodeData { /// Constructs a new InodeData by consuming a [Node]. /// It splits off the orginal name, so it can be used later. - pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) { + pub fn from_node(node: &Node) -> (Self, Bytes) { match node { - castorepb::node::Node::Directory(n) => ( - Self::Directory(DirectoryInodeData::Sparse( - n.digest.try_into().unwrap(), - n.size, - )), - n.name, + Node::Directory(n) => ( + Self::Directory(DirectoryInodeData::Sparse(n.digest().clone(), n.size())), + n.get_name().clone(), ), - castorepb::node::Node::File(n) => ( - Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable), - n.name, + Node::File(n) => ( + Self::Regular(n.digest().clone(), n.size(), n.executable()), + n.get_name().clone(), ), - castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name), + Node::Symlink(n) => (Self::Symlink(n.target().clone()), n.get_name().clone()), } } diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index b565ed60ac42..4a6ca88d73fd 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -15,11 +15,9 @@ use self::{ inode_tracker::InodeTracker, inodes::{DirectoryInodeData, InodeData}, }; -use crate::proto as castorepb; use crate::{ blobservice::{BlobReader, BlobService}, - directoryservice::DirectoryService, - proto::{node::Node, NamedNode}, + directoryservice::{DirectoryService, NamedNode, Node}, B3Digest, }; use bstr::ByteVec; @@ -198,13 +196,13 @@ where let children = { let mut inode_tracker = self.inode_tracker.write(); - let children: Vec<(u64, castorepb::node::Node)> = directory + let children: Vec<(u64, Node)> = directory .nodes() .map(|child_node| { - let (inode_data, _) = 
InodeData::from_node(child_node.clone()); + let (inode_data, _) = InodeData::from_node(child_node); let child_ino = inode_tracker.put(inode_data); - (child_ino, child_node) + (child_ino, child_node.clone()) }) .collect(); @@ -287,7 +285,7 @@ where // insert the (sparse) inode data and register in // self.root_nodes. - let (inode_data, name) = InodeData::from_node(root_node); + let (inode_data, name) = InodeData::from_node(&root_node); let ino = inode_tracker.put(inode_data.clone()); root_nodes.insert(name, ino); @@ -468,7 +466,7 @@ where io::Error::from_raw_os_error(libc::EIO) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let (inode_data, name) = InodeData::from_node(&root_node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -498,7 +496,7 @@ where Span::current().record("directory.digest", parent_digest.to_string()); for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + let (inode_data, name) = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { @@ -555,7 +553,7 @@ where io::Error::from_raw_os_error(libc::EPERM) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let (inode_data, name) = InodeData::from_node(&root_node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -588,7 +586,7 @@ where Span::current().record("directory.digest", parent_digest.to_string()); for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + let (inode_data, name) = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. 
let written = add_entry( diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs index 6609e049a1fc..6d78b243d064 100644 --- a/tvix/castore/src/fs/root_nodes.rs +++ b/tvix/castore/src/fs/root_nodes.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use crate::{proto::node::Node, Error}; +use crate::{directoryservice::Node, Error}; use bytes::Bytes; use futures::stream::BoxStream; use tonic::async_trait; diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index cd5b1290e031..930f33f68b46 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -11,9 +11,8 @@ use tokio_tar::Archive; use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryService; +use crate::directoryservice::{DirectoryService, Node}; use crate::import::{ingest_entries, IngestionEntry, IngestionError}; -use crate::proto::node::Node; use super::blobs::{self, ConcurrentBlobUploader}; diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index dc7821b8101e..f156c8a73527 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -15,8 +15,7 @@ use walkdir::DirEntry; use walkdir::WalkDir; use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryService; -use crate::proto::node::Node; +use crate::directoryservice::{DirectoryService, Node}; use crate::B3Digest; use super::ingest_entries; diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index a9ac0be6b064..03e2b6c7db41 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -4,14 +4,14 @@ //! Specific implementations, such as ingesting from the filesystem, live in //! child modules. 
+use crate::directoryservice::Directory; +use crate::directoryservice::DirectoryNode; use crate::directoryservice::DirectoryPutter; use crate::directoryservice::DirectoryService; +use crate::directoryservice::FileNode; +use crate::directoryservice::Node; +use crate::directoryservice::SymlinkNode; use crate::path::{Path, PathBuf}; -use crate::proto::node::Node; -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::SymlinkNode; use crate::B3Digest; use futures::{Stream, StreamExt}; use tracing::Level; @@ -98,27 +98,36 @@ where IngestionError::UploadDirectoryError(entry.path().to_owned(), e) })?; - Node::Directory(DirectoryNode { - name, - digest: directory_digest.into(), - size: directory_size, - }) + Node::Directory( + DirectoryNode::new(name, directory_digest, directory_size).map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?, + ) } - IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode { - name, - target: target.to_owned().into(), - }), + IngestionEntry::Symlink { ref target, .. } => Node::Symlink( + SymlinkNode::new(name, target.to_owned().into()).map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?, + ), IngestionEntry::Regular { size, executable, digest, .. - } => Node::File(FileNode { - name, - digest: digest.to_owned().into(), - size: *size, - executable: *executable, - }), + } => Node::File( + FileNode::new(name, digest.clone(), *size, *executable).map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?, + ), }; let parent = entry @@ -130,7 +139,16 @@ where break node; } else { // record node in parent directory, creating a new [Directory] if not there yet. 
- directories.entry(parent.to_owned()).or_default().add(node); + directories + .entry(parent.to_owned()) + .or_default() + .add(node) + .map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?; } }; @@ -156,14 +174,7 @@ where #[cfg(debug_assertions)] { if let Node::Directory(directory_node) = &root_node { - debug_assert_eq!( - root_directory_digest, - directory_node - .digest - .to_vec() - .try_into() - .expect("invalid digest len") - ) + debug_assert_eq!(&root_directory_digest, directory_node.digest()) } else { unreachable!("Tvix bug: directory putter initialized but no root directory node"); } @@ -208,9 +219,8 @@ impl IngestionEntry { mod test { use rstest::rstest; + use crate::directoryservice::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}; use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST}; - use crate::proto::node::Node; - use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode}; use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST}; use super::ingest_entries; @@ -223,18 +233,18 @@ mod test { executable: true, digest: DUMMY_DIGEST.clone(), }], - Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true } - ))] + Node::File(FileNode::new("foo".into(), DUMMY_DIGEST.clone(), 42, true).unwrap()) + )] #[case::single_symlink(vec![IngestionEntry::Symlink { path: "foo".parse().unwrap(), target: b"blub".into(), }], - Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()}) + Node::Symlink(SymlinkNode::new("foo".into(), "blub".into()).unwrap()) )] #[case::single_dir(vec![IngestionEntry::Dir { path: "foo".parse().unwrap(), }], - Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()}) + Node::Directory(DirectoryNode::new("foo".into(), Directory::default().digest(), 
Directory::default().size()).unwrap()) )] #[case::dir_with_keep(vec![ IngestionEntry::Regular { @@ -247,7 +257,7 @@ mod test { path: "foo".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() }) + Node::Directory(DirectoryNode::new("foo".into(), DIRECTORY_WITH_KEEP.digest(), DIRECTORY_WITH_KEEP.size()).unwrap()) )] /// This is intentionally a bit unsorted, though it still satisfies all /// requirements we have on the order of elements in the stream. @@ -275,7 +285,7 @@ mod test { path: "blub".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size:DIRECTORY_COMPLICATED.size() }) + Node::Directory(DirectoryNode::new("blub".into(), DIRECTORY_COMPLICATED.digest(), DIRECTORY_COMPLICATED.size()).unwrap()) )] #[tokio::test] async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) { diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 4fca9801c97b..91302e90996c 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -18,7 +18,7 @@ pub mod proto; pub mod tonic; pub use digests::{B3Digest, B3_LEN}; -pub use errors::Error; +pub use errors::{Error, ValidateDirectoryError, ValidateNodeError}; pub use hashing_reader::{B3HashingReader, HashingReader}; #[cfg(test)] diff --git a/tvix/castore/src/path.rs b/tvix/castore/src/path.rs index fcc2bd01fbd6..77cabeccdb8c 100644 --- a/tvix/castore/src/path.rs +++ b/tvix/castore/src/path.rs @@ -10,7 +10,7 @@ use std::{ use bstr::ByteSlice; -use crate::proto::validate_node_name; +use crate::directoryservice::Directory; /// Represents a Path in the castore model. /// These are always relative, and platform-independent, which distinguishes @@ -38,7 +38,7 @@ impl Path { if !bytes.is_empty() { // Ensure all components are valid castore node names. 
for component in bytes.split_str(b"/") { - validate_node_name(component).ok()?; + Directory::validate_node_name(component).ok()?; } } @@ -211,7 +211,7 @@ impl PathBuf { /// Adjoins `name` to self. pub fn try_push(&mut self, name: &[u8]) -> Result<(), std::io::Error> { - validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?; + Directory::validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?; if !self.inner.is_empty() { self.inner.push(b'/'); diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs index ce1d2bcd244a..e65145509184 100644 --- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -1,7 +1,5 @@ -use crate::directoryservice::DirectoryGraph; -use crate::directoryservice::LeavesToRootValidator; -use crate::proto; -use crate::{directoryservice::DirectoryService, B3Digest}; +use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator}; +use crate::{proto, B3Digest, ValidateDirectoryError}; use futures::stream::BoxStream; use futures::TryStreamExt; use std::ops::Deref; @@ -58,13 +56,16 @@ where Status::not_found(format!("directory {} not found", digest)) })?; - Box::pin(once(Ok(directory))) + Box::pin(once(Ok(directory.into()))) } else { // If recursive was requested, traverse via get_recursive. Box::pin( - self.directory_service.get_recursive(&digest).map_err(|e| { - tonic::Status::new(tonic::Code::Internal, e.to_string()) - }), + self.directory_service + .get_recursive(&digest) + .map_ok(proto::Directory::from) + .map_err(|e| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + }), ) } })) @@ -83,7 +84,9 @@ where let mut validator = DirectoryGraph::<LeavesToRootValidator>::default(); while let Some(directory) = req_inner.message().await? 
{ validator - .add(directory) + .add(directory.try_into().map_err(|e: ValidateDirectoryError| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + })?) .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?; } diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs index a0cec896f753..7e98cd74c591 100644 --- a/tvix/castore/src/proto/mod.rs +++ b/tvix/castore/src/proto/mod.rs @@ -1,7 +1,4 @@ -#![allow(non_snake_case)] -// https://github.com/hyperium/tonic/issues/1056 -use bstr::ByteSlice; -use std::{collections::HashSet, iter::Peekable, str}; +use std::str; use prost::Message; @@ -11,7 +8,8 @@ mod grpc_directoryservice_wrapper; pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; -use crate::{B3Digest, B3_LEN}; +use crate::directoryservice::NamedNode; +use crate::{B3Digest, ValidateDirectoryError, ValidateNodeError}; tonic::include_proto!("tvix.castore.v1"); @@ -24,38 +22,6 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix #[cfg(test)] mod tests; -/// Errors that can occur during the validation of [Directory] messages. 
-#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateDirectoryError { - /// Elements are not in sorted order - #[error("{:?} is not sorted", .0.as_bstr())] - WrongSorting(Vec<u8>), - /// Multiple elements with the same name encountered - #[error("{:?} is a duplicate name", .0.as_bstr())] - DuplicateName(Vec<u8>), - /// Invalid node - #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())] - InvalidNode(Vec<u8>, ValidateNodeError), - #[error("Total size exceeds u32::MAX")] - SizeOverflow, -} - -/// Errors that occur during Node validation -#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateNodeError { - #[error("No node set")] - NoNodeSet, - /// Invalid digest length encountered - #[error("Invalid Digest length: {0}")] - InvalidDigestLen(usize), - /// Invalid name encountered - #[error("Invalid name: {}", .0.as_bstr())] - InvalidName(Vec<u8>), - /// Invalid symlink target - #[error("Invalid symlink target: {}", .0.as_bstr())] - InvalidSymlinkTarget(Vec<u8>), -} - /// Errors that occur during StatBlobResponse validation #[derive(Debug, PartialEq, Eq, thiserror::Error)] pub enum ValidateStatBlobResponseError { @@ -64,186 +30,6 @@ pub enum ValidateStatBlobResponseError { InvalidDigestLen(usize, usize), } -/// Checks a Node name for validity as an intermediate node. -/// We disallow slashes, null bytes, '.', '..' and the empty string. -pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { - if name.is_empty() - || name == b".." - || name == b"." - || name.contains(&0x00) - || name.contains(&b'/') - { - Err(ValidateNodeError::InvalidName(name.to_owned())) - } else { - Ok(()) - } -} - -/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] -/// and [node::Node], so we can ask all of them for the name easily. 
-pub trait NamedNode { - fn get_name(&self) -> &[u8]; -} - -impl NamedNode for &FileNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &DirectoryNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &SymlinkNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for node::Node { - fn get_name(&self) -> &[u8] { - match self { - node::Node::File(node_file) => &node_file.name, - node::Node::Directory(node_directory) => &node_directory.name, - node::Node::Symlink(node_symlink) => &node_symlink.name, - } - } -} - -impl Node { - /// Ensures the node has a valid enum kind (is Some), and passes its - // per-enum validation. - // The inner root node is returned for easier consumption. - pub fn validate(&self) -> Result<&node::Node, ValidateNodeError> { - if let Some(node) = self.node.as_ref() { - node.validate()?; - Ok(node) - } else { - Err(ValidateNodeError::NoNodeSet) - } - } -} - -impl node::Node { - /// Returns the node with a new name. - pub fn rename(self, name: bytes::Bytes) -> Self { - match self { - node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }), - node::Node::File(n) => node::Node::File(FileNode { name, ..n }), - node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }), - } - } - - /// Ensures the node has a valid name, and checks the type-specific fields too. - pub fn validate(&self) -> Result<(), ValidateNodeError> { - match self { - // for a directory root node, ensure the digest has the appropriate size. - node::Node::Directory(directory_node) => { - if directory_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen( - directory_node.digest.len(), - ))?; - } - validate_node_name(&directory_node.name) - } - // for a file root node, ensure the digest has the appropriate size. 
- node::Node::File(file_node) => { - if file_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?; - } - validate_node_name(&file_node.name) - } - // ensure the symlink target is not empty and doesn't contain null bytes. - node::Node::Symlink(symlink_node) => { - if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') { - Err(ValidateNodeError::InvalidSymlinkTarget( - symlink_node.target.to_vec(), - ))?; - } - validate_node_name(&symlink_node.name) - } - } - } -} - -impl PartialOrd for node::Node { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for node::Node { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for FileNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for FileNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for SymlinkNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for SymlinkNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for DirectoryNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for DirectoryNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -/// Accepts a name, and a mutable reference to the previous name. -/// If the passed name is larger than the previous one, the reference is updated. -/// If it's not, an error is returned. 
-fn update_if_lt_prev<'n>( - prev_name: &mut &'n [u8], - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if *name < **prev_name { - return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); - } - *prev_name = name; - Ok(()) -} - -/// Inserts the given name into a HashSet if it's not already in there. -/// If it is, an error is returned. -fn insert_once<'n>( - seen_names: &mut HashSet<&'n [u8]>, - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if seen_names.get(name).is_some() { - return Err(ValidateDirectoryError::DuplicateName(name.to_vec())); - } - seen_names.insert(name); - Ok(()) -} - fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) } @@ -280,116 +66,213 @@ impl Directory { .as_bytes() .into() } +} + +/// Accepts a name, and a mutable reference to the previous name. +/// If the passed name is larger than the previous one, the reference is updated. +/// If it's not, an error is returned. 
+fn update_if_lt_prev<'n>( + prev_name: &mut &'n [u8], + name: &'n [u8], +) -> Result<(), ValidateDirectoryError> { + if *name < **prev_name { + return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); + } + *prev_name = name; + Ok(()) +} - /// validate checks the directory for invalid data, such as: - /// - violations of name restrictions - /// - invalid digest lengths - /// - not properly sorted lists - /// - duplicate names in the three lists - pub fn validate(&self) -> Result<(), ValidateDirectoryError> { - let mut seen_names: HashSet<&[u8]> = HashSet::new(); +impl TryFrom<&node::Node> for crate::directoryservice::Node { + type Error = ValidateNodeError; - let mut last_directory_name: &[u8] = b""; - let mut last_file_name: &[u8] = b""; - let mut last_symlink_name: &[u8] = b""; + fn try_from(node: &node::Node) -> Result<crate::directoryservice::Node, ValidateNodeError> { + Ok(match node { + node::Node::Directory(n) => crate::directoryservice::Node::Directory(n.try_into()?), + node::Node::File(n) => crate::directoryservice::Node::File(n.try_into()?), + node::Node::Symlink(n) => crate::directoryservice::Node::Symlink(n.try_into()?), + }) + } +} - // check directories - for directory_node in &self.directories { - node::Node::Directory(directory_node.clone()) - .validate() - .map_err(|e| { - ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e) - })?; +impl TryFrom<&Node> for crate::directoryservice::Node { + type Error = ValidateNodeError; - update_if_lt_prev(&mut last_directory_name, &directory_node.name)?; - insert_once(&mut seen_names, &directory_node.name)?; + fn try_from(node: &Node) -> Result<crate::directoryservice::Node, ValidateNodeError> { + match node { + Node { node: None } => Err(ValidateNodeError::NoNodeSet), + Node { node: Some(node) } => node.try_into(), } + } +} + +impl TryFrom<&DirectoryNode> for crate::directoryservice::DirectoryNode { + type Error = ValidateNodeError; + + fn try_from( + node: &DirectoryNode, + ) -> 
Result<crate::directoryservice::DirectoryNode, ValidateNodeError> { + crate::directoryservice::DirectoryNode::new( + node.name.clone(), + node.digest.clone().try_into()?, + node.size, + ) + } +} - // check files - for file_node in &self.files { - node::Node::File(file_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?; +impl TryFrom<&SymlinkNode> for crate::directoryservice::SymlinkNode { + type Error = ValidateNodeError; - update_if_lt_prev(&mut last_file_name, &file_node.name)?; - insert_once(&mut seen_names, &file_node.name)?; + fn try_from( + node: &SymlinkNode, + ) -> Result<crate::directoryservice::SymlinkNode, ValidateNodeError> { + crate::directoryservice::SymlinkNode::new(node.name.clone(), node.target.clone()) + } +} + +impl TryFrom<&FileNode> for crate::directoryservice::FileNode { + type Error = ValidateNodeError; + + fn try_from(node: &FileNode) -> Result<crate::directoryservice::FileNode, ValidateNodeError> { + crate::directoryservice::FileNode::new( + node.name.clone(), + node.digest.clone().try_into()?, + node.size, + node.executable, + ) + } +} + +impl TryFrom<Directory> for crate::directoryservice::Directory { + type Error = ValidateDirectoryError; + + fn try_from( + directory: Directory, + ) -> Result<crate::directoryservice::Directory, ValidateDirectoryError> { + (&directory).try_into() + } +} + +impl TryFrom<&Directory> for crate::directoryservice::Directory { + type Error = ValidateDirectoryError; + + fn try_from( + directory: &Directory, + ) -> Result<crate::directoryservice::Directory, ValidateDirectoryError> { + let mut dir = crate::directoryservice::Directory::new(); + let mut last_file_name: &[u8] = b""; + for file in directory.files.iter().map(move |file| { + update_if_lt_prev(&mut last_file_name, &file.name).map(|()| file.clone()) + }) { + let file = file?; + dir.add(crate::directoryservice::Node::File( + (&file) + .try_into() + .map_err(|e| 
ValidateDirectoryError::InvalidNode(file.name.into(), e))?, + ))?; + } + let mut last_directory_name: &[u8] = b""; + for directory in directory.directories.iter().map(move |directory| { + update_if_lt_prev(&mut last_directory_name, &directory.name).map(|()| directory.clone()) + }) { + let directory = directory?; + dir.add(crate::directoryservice::Node::Directory( + (&directory) + .try_into() + .map_err(|e| ValidateDirectoryError::InvalidNode(directory.name.into(), e))?, + ))?; } + let mut last_symlink_name: &[u8] = b""; + for symlink in directory.symlinks.iter().map(move |symlink| { + update_if_lt_prev(&mut last_symlink_name, &symlink.name).map(|()| symlink.clone()) + }) { + let symlink = symlink?; + dir.add(crate::directoryservice::Node::Symlink( + (&symlink) + .try_into() + .map_err(|e| ValidateDirectoryError::InvalidNode(symlink.name.into(), e))?, + ))?; + } + Ok(dir) + } +} - // check symlinks - for symlink_node in &self.symlinks { - node::Node::Symlink(symlink_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?; +impl From<&crate::directoryservice::Node> for node::Node { + fn from(node: &crate::directoryservice::Node) -> node::Node { + match node { + crate::directoryservice::Node::Directory(n) => node::Node::Directory(n.into()), + crate::directoryservice::Node::File(n) => node::Node::File(n.into()), + crate::directoryservice::Node::Symlink(n) => node::Node::Symlink(n.into()), + } + } +} - update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?; - insert_once(&mut seen_names, &symlink_node.name)?; +impl From<&crate::directoryservice::Node> for Node { + fn from(node: &crate::directoryservice::Node) -> Node { + Node { + node: Some(node.into()), } + } +} - self.size_checked() - .ok_or(ValidateDirectoryError::SizeOverflow)?; +impl From<&crate::directoryservice::DirectoryNode> for DirectoryNode { + fn from(node: &crate::directoryservice::DirectoryNode) -> DirectoryNode { + DirectoryNode { + digest: 
node.digest().clone().into(), + size: node.size(), + name: node.get_name().clone(), + } + } +} - Ok(()) +impl From<&crate::directoryservice::FileNode> for FileNode { + fn from(node: &crate::directoryservice::FileNode) -> FileNode { + FileNode { + digest: node.digest().clone().into(), + size: node.size(), + name: node.get_name().clone(), + executable: node.executable(), + } } +} - /// Allows iterating over all three nodes ([DirectoryNode], [FileNode], - /// [SymlinkNode]) in an ordered fashion, as long as the individual lists - /// are sorted (which can be checked by the [Directory::validate]). - pub fn nodes(&self) -> DirectoryNodesIterator { - return DirectoryNodesIterator { - i_directories: self.directories.iter().peekable(), - i_files: self.files.iter().peekable(), - i_symlinks: self.symlinks.iter().peekable(), - }; +impl From<&crate::directoryservice::SymlinkNode> for SymlinkNode { + fn from(node: &crate::directoryservice::SymlinkNode) -> SymlinkNode { + SymlinkNode { + name: node.get_name().clone(), + target: node.target().clone(), + } } +} - /// Adds the specified [node::Node] to the [Directory], preserving sorted entries. - /// This assumes the [Directory] to be sorted prior to adding the node. - /// - /// Inserting an element that already exists with the same name in the directory is not - /// supported. 
- pub fn add(&mut self, node: node::Node) { - debug_assert!( - !self.files.iter().any(|x| x.get_name() == node.get_name()), - "name already exists in files" - ); - debug_assert!( - !self - .directories - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in directories" - ); - debug_assert!( - !self - .symlinks - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in symlinks" - ); +impl From<crate::directoryservice::Directory> for Directory { + fn from(directory: crate::directoryservice::Directory) -> Directory { + (&directory).into() + } +} - match node { - node::Node::File(node) => { - let pos = self - .files - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.files.insert(pos, node); - } - node::Node::Directory(node) => { - let pos = self - .directories - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.directories.insert(pos, node); - } - node::Node::Symlink(node) => { - let pos = self - .symlinks - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.symlinks.insert(pos, node); +impl From<&crate::directoryservice::Directory> for Directory { + fn from(directory: &crate::directoryservice::Directory) -> Directory { + let mut directories = vec![]; + let mut files = vec![]; + let mut symlinks = vec![]; + for node in directory.nodes() { + match node { + crate::directoryservice::Node::File(n) => { + files.push(n.into()); + } + crate::directoryservice::Node::Directory(n) => { + directories.push(n.into()); + } + crate::directoryservice::Node::Symlink(n) => { + symlinks.push(n.into()); + } } } + Directory { + directories, + files, + symlinks, + } } } @@ -409,65 +292,3 @@ impl StatBlobResponse { Ok(()) } } - -/// Struct to hold the state of an iterator over all nodes of a Directory. -/// -/// Internally, this keeps peekable Iterators over all three lists of a -/// Directory message. 
-pub struct DirectoryNodesIterator<'a> { - // directory: &Directory, - i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>, - i_files: Peekable<std::slice::Iter<'a, FileNode>>, - i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>, -} - -/// looks at two elements implementing NamedNode, and returns true if "left -/// is smaller / comes first". -/// -/// Some(_) is preferred over None. -fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool { - match left { - // if left is None, right always wins - None => false, - Some(left_inner) => { - // left is Some. - match right { - // left is Some, right is None - left wins. - None => true, - Some(right_inner) => { - // both are Some - compare the name. - return left_inner.get_name() < right_inner.get_name(); - } - } - } - } -} - -impl Iterator for DirectoryNodesIterator<'_> { - type Item = node::Node; - - // next returns the next node in the Directory. - // we peek at all three internal iterators, and pick the one with the - // smallest name, to ensure lexicographical ordering. - // The individual lists are already known to be sorted. 
- fn next(&mut self) -> Option<Self::Item> { - if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) { - // i_directories is still in the game, compare with symlinks - if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) { - self.i_directories - .next() - .cloned() - .map(node::Node::Directory) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } else { - // i_files is still in the game, compare with symlinks - if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) { - self.i_files.next().cloned().map(node::Node::File) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } - } -} diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs index 81b73a048d52..78b2cf7668e3 100644 --- a/tvix/castore/src/proto/tests/directory.rs +++ b/tvix/castore/src/proto/tests/directory.rs @@ -1,7 +1,5 @@ -use crate::proto::{ - node, Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError, - ValidateNodeError, -}; +use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}; +use crate::ValidateNodeError; use hex_literal::hex; @@ -149,7 +147,7 @@ fn digest() { #[test] fn validate_empty() { let d = Directory::default(); - assert_eq!(d.validate(), Ok(())); + assert!(crate::directoryservice::Directory::try_from(d).is_ok()); } #[test] @@ -163,7 +161,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"") } @@ -180,7 +178,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, 
ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b".") } @@ -198,7 +196,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"..") } @@ -214,7 +212,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"\x00") } @@ -230,7 +228,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"foo/bar") } @@ -249,7 +247,7 @@ fn validate_invalid_digest() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => { assert_eq!(n, 2) } @@ -276,7 +274,7 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::WrongSorting(s) => { assert_eq!(s, b"a"); } @@ -301,7 +299,7 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::DuplicateName(s) => { assert_eq!(s, b"a"); } @@ -327,7 +325,7 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + 
crate::directoryservice::Directory::try_from(d).expect("validate shouldn't error"); } // [b, c] and [a] are both properly sorted. @@ -352,101 +350,6 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + crate::directoryservice::Directory::try_from(d).expect("validate shouldn't error"); } } - -#[test] -fn validate_overflow() { - let d = Directory { - directories: vec![DirectoryNode { - name: "foo".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: u64::MAX, - }], - ..Default::default() - }; - - match d.validate().expect_err("must fail") { - ValidateDirectoryError::SizeOverflow => {} - _ => panic!("unexpected error"), - } -} - -#[test] -fn add_nodes_to_directory() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "b".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "z".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - - d.add(node::Node::File(FileNode { - name: "f".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "c".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "g".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - - d.add(node::Node::Symlink(SymlinkNode { - name: "t".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "o".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "e".into(), - target: "a".into(), - })); - - d.validate().expect("directory should be valid"); -} - -#[test] -#[cfg_attr(not(debug_assertions), ignore)] -#[should_panic = "name already exists in 
directories"] -fn add_duplicate_node_to_directory_panics() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::File(FileNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); -} diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs deleted file mode 100644 index 68f147a33210..000000000000 --- a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::NamedNode; -use crate::proto::SymlinkNode; - -#[test] -fn iterator() { - let d = Directory { - directories: vec![ - DirectoryNode { - name: "c".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "d".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "h".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "l".into(), - ..DirectoryNode::default() - }, - ], - files: vec![ - FileNode { - name: "b".into(), - ..FileNode::default() - }, - FileNode { - name: "e".into(), - ..FileNode::default() - }, - FileNode { - name: "g".into(), - ..FileNode::default() - }, - FileNode { - name: "j".into(), - ..FileNode::default() - }, - ], - symlinks: vec![ - SymlinkNode { - name: "a".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "f".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "i".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "k".into(), - ..SymlinkNode::default() - }, - ], - }; - - // We keep this strings here and convert to string to make the comparison - // less messy. 
- let mut node_names: Vec<String> = vec![]; - - for node in d.nodes() { - node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap()); - } - - assert_eq!( - vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"], - node_names - ); -} diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs index 8d903bacb6c5..74334029e84c 100644 --- a/tvix/castore/src/proto/tests/mod.rs +++ b/tvix/castore/src/proto/tests/mod.rs @@ -1,2 +1 @@ mod directory; -mod directory_nodes_iterator; diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs index 72fb06aea877..ba54078653e2 100644 --- a/tvix/castore/src/tests/import.rs +++ b/tvix/castore/src/tests/import.rs @@ -1,5 +1,6 @@ use crate::blobservice::{self, BlobService}; use crate::directoryservice; +use crate::directoryservice::{DirectoryNode, Node, SymlinkNode}; use crate::fixtures::*; use crate::import::fs::ingest_path; use crate::proto; @@ -33,10 +34,9 @@ async fn symlink() { .expect("must succeed"); assert_eq!( - proto::node::Node::Symlink(proto::SymlinkNode { - name: "doesntmatter".into(), - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink( + SymlinkNode::new("doesntmatter".into(), "/nix/store/somewhereelse".into(),).unwrap() + ), root_node, ) } @@ -65,7 +65,7 @@ async fn single_file() { size: HELLOWORLD_BLOB_CONTENTS.len() as u64, executable: false, }), - root_node, + (&root_node).into(), ); // ensure the blob has been uploaded @@ -95,17 +95,20 @@ async fn complicated() { // ensure root_node matched expectations assert_eq!( - proto::node::Node::Directory(proto::DirectoryNode { - name: tmpdir - .path() - .file_name() - .unwrap() - .as_bytes() - .to_owned() - .into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), + Node::Directory( + DirectoryNode::new( + tmpdir + .path() + .file_name() + .unwrap() + .as_bytes() + .to_owned() + .into(), + DIRECTORY_COMPLICATED.digest().clone(), + 
DIRECTORY_COMPLICATED.size(), + ) + .unwrap() + ), root_node, ); |