diff options
author | Yureka <tvl@yuka.dev> | 2024-07-29T12·34+0200 |
---|---|---|
committer | yuka <tvl@yuka.dev> | 2024-08-13T12·17+0000 |
commit | 3ca0b53840b352b24f3a315404df11458b0bdbbb (patch) | |
tree | 2635e8d67d0c334cc5760dc4205b7515c6283a77 | |
parent | 5d3f3158d6102baee48d2772e85f05cfc1fac95e (diff) |
refactor(tvix/castore): use Directory struct separate from proto one r/8484
This uses our own data type to deal with Directories in the castore model. It makes some undesired states unrepresentable, removing the need for conversions and checking in various places: - In the protobuf, blake3 digests could have a wrong length, as protobuf doesn't support fixed-size fields. We now use `B3Digest`, which makes cloning cheaper, and removes the need to do size-checking everywhere. - In the protobuf, we had three different lists for `files`, `symlinks` and `directories`. This was mostly a protobuf size optimization, but made interacting with them a bit awkward. This has now been replaced with a single list of enums, together with convenience iterators to retrieve nodes of each kind and convenience methods to add new ones. Change-Id: I7b92691bb06d77ff3f58a5ccea94a22c16f84f04 Reviewed-on: https://cl.tvl.fyi/c/depot/+/12057 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
53 files changed, 1417 insertions, 1365 deletions
diff --git a/tvix/build/src/proto/mod.rs b/tvix/build/src/proto/mod.rs index e359b5b5b70e..528817b9707d 100644 --- a/tvix/build/src/proto/mod.rs +++ b/tvix/build/src/proto/mod.rs @@ -1,7 +1,9 @@ use std::path::{Path, PathBuf}; use itertools::Itertools; -use tvix_castore::proto::{NamedNode, ValidateNodeError}; +use tvix_castore::directoryservice::NamedNode; +use tvix_castore::directoryservice::Node; +use tvix_castore::ValidateNodeError; mod grpc_buildservice_wrapper; @@ -123,18 +125,17 @@ impl BuildRequest { /// and all restrictions around paths themselves (relative, clean, …) need // to be fulfilled. pub fn validate(&self) -> Result<(), ValidateBuildRequestError> { - // validate all input nodes - for (i, n) in self.inputs.iter().enumerate() { - // ensure the input node itself is valid - n.validate() - .map_err(|e| ValidateBuildRequestError::InvalidInputNode(i, e))?; - } - // now we can look at the names, and make sure they're sorted. if !is_sorted( self.inputs .iter() - .map(|e| e.node.as_ref().unwrap().get_name()), + // TODO(flokli) handle conversion errors and store result somewhere + .map(|e| { + Node::try_from(e.node.as_ref().unwrap()) + .unwrap() + .get_name() + .clone() + }), ) { Err(ValidateBuildRequestError::InputNodesNotSorted)? 
} diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs index ef9a7326b3fb..4d919ff0d873 100644 --- a/tvix/castore/src/digests.rs +++ b/tvix/castore/src/digests.rs @@ -6,7 +6,7 @@ use thiserror::Error; pub struct B3Digest(Bytes); // TODO: allow converting these errors to crate::Error -#[derive(Error, Debug)] +#[derive(Error, Debug, PartialEq)] pub enum Error { #[error("invalid digest length: {0}")] InvalidDigestLen(usize), diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index d10dddaf9f60..73ab4342d832 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -9,7 +9,9 @@ use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace, warn}; -use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use super::{ + utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter, +}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; @@ -149,7 +151,7 @@ fn derive_directory_key(digest: &B3Digest) -> String { #[async_trait] impl DirectoryService for BigtableDirectoryService { #[instrument(skip(self, digest), err, fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let mut client = self.client.clone(); let directory_key = derive_directory_key(digest); @@ -241,28 +243,20 @@ impl DirectoryService for BigtableDirectoryService { // Try to parse the value into a Directory message. let directory = proto::Directory::decode(Bytes::from(row_cell.value)) - .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?; - - // validate the Directory. - directory - .validate() + .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))? 
+ .try_into() .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?; Ok(Some(directory)) } #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let directory_digest = directory.digest(); let mut client = self.client.clone(); let directory_key = derive_directory_key(&directory_digest); - // Ensure the directory we're trying to upload passes validation - directory - .validate() - .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?; - - let data = directory.encode_to_vec(); + let data = proto::Directory::from(directory).encode_to_vec(); if data.len() as u64 > CELL_SIZE_LIMIT { return Err(Error::StorageError( "Directory exceeds cell limit on Bigtable".into(), @@ -310,7 +304,7 @@ impl DirectoryService for BigtableDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index 0fdc82c16cb0..4283142231f9 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -7,10 +7,9 @@ use futures::TryStreamExt; use tonic::async_trait; use tracing::{instrument, trace}; -use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::directoryservice::DirectoryPutter; -use crate::proto; use crate::B3Digest; use crate::Error; @@ -40,7 +39,7 @@ where DS2: DirectoryService + Clone + 'static, { 
#[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { match self.near.get(digest).await? { Some(directory) => { trace!("serving from cache"); @@ -82,7 +81,7 @@ where } #[instrument(skip_all)] - async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> { Err(Error::StorageError("unimplemented".to_string())) } @@ -90,7 +89,7 @@ where fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let near = self.near.clone(); let far = self.far.clone(); let digest = root_directory_digest.clone(); diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs index e6b9b163370c..aa60f68a3197 100644 --- a/tvix/castore/src/directoryservice/directory_graph.rs +++ b/tvix/castore/src/directoryservice/directory_graph.rs @@ -10,10 +10,8 @@ use petgraph::{ use tracing::instrument; use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; -use crate::{ - proto::{self, Directory, DirectoryNode}, - B3Digest, -}; +use super::{Directory, DirectoryNode}; +use crate::B3Digest; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -88,7 +86,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> { impl DirectoryGraph<LeavesToRootValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { if 
!self.order_validator.add_directory(&directory) { return Err(Error::ValidationError( "unknown directory was referenced".into(), @@ -108,7 +106,7 @@ impl DirectoryGraph<RootToLeavesValidator> { /// Insert a new Directory into the closure #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)] - pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { + pub fn add(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); if !self.order_validator.digest_allowed(&digest) { return Err(Error::ValidationError("unexpected digest".into())); @@ -129,12 +127,7 @@ impl<O: OrderValidator> DirectoryGraph<O> { } /// Adds a directory which has already been confirmed to be in-order to the graph - pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> { - // Do some basic validation - directory - .validate() - .map_err(|e| Error::ValidationError(e.to_string()))?; - + pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> { let digest = directory.digest(); // Teach the graph about the existence of a node with this digest @@ -149,12 +142,10 @@ impl<O: OrderValidator> DirectoryGraph<O> { } // set up edges to all child directories - for subdir in &directory.directories { - let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap(); - + for subdir in directory.directories() { let child_ix = *self .digest_to_node_ix - .entry(subdir_digest) + .entry(subdir.digest.clone()) .or_insert_with(|| self.graph.add_node(None)); let pending_edge_check = match &self.graph[child_ix] { @@ -266,37 +257,36 @@ impl ValidatedDirectoryGraph { .filter_map(move |i| nodes[i.index()].weight.take()) } } - -#[cfg(test)] -mod tests { - use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}, - proto::{self, Directory}, - }; - use lazy_static::lazy_static; - use rstest::rstest; - - lazy_static! 
{ +/* pub static ref BROKEN_DIRECTORY : Directory = Directory { - symlinks: vec![proto::SymlinkNode { + symlinks: vec![SymlinkNode { name: "".into(), // invalid name! target: "doesntmatter".into(), }], ..Default::default() }; +*/ +#[cfg(test)] +mod tests { + use crate::directoryservice::{Directory, DirectoryNode, Node}; + use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; + use lazy_static::lazy_static; + use rstest::rstest; - pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() + use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; + + lazy_static! { + pub static ref BROKEN_PARENT_DIRECTORY: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + "foo".into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size() + 42, // wrong! + ).unwrap())).unwrap(); + dir }; } - use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator}; - #[rstest] /// Uploading an empty directory should succeed. #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))] @@ -312,8 +302,6 @@ mod tests { #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)] /// Uploading B (referring to A) should fail immediately, because A was never uploaded. #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)] - /// Uploading a directory failing validation should fail immediately. - #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)] /// Uploading a directory which refers to another Directory with a wrong size should fail. 
#[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)] fn test_uploads( @@ -366,8 +354,6 @@ mod tests { #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)] /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root). #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)] - /// Downloading a directory failing validation should fail immediately. - #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)] /// Downloading a directory which refers to another Directory with a wrong size should fail. #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)] fn test_downloads( diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 4dc3931ed410..2079514d2b79 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,8 +1,9 @@ use std::collections::HashSet; -use super::{DirectoryPutter, DirectoryService}; +use super::{Directory, DirectoryPutter, DirectoryService}; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; +use crate::ValidateDirectoryError; use crate::{B3Digest, Error}; use async_stream::try_stream; use futures::stream::BoxStream; @@ -41,10 +42,7 @@ where T::Future: Send, { #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] - async fn get( - &self, - digest: &B3Digest, - ) -> Result<Option<crate::proto::Directory>, crate::Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. 
let mut grpc_client = self.grpc_client.clone(); let digest_cpy = digest.clone(); @@ -72,15 +70,10 @@ where "requested directory with digest {}, but got {}", digest, actual_digest ))) - } else if let Err(e) = directory.validate() { - // Validate the Directory itself is valid. - warn!("directory failed validation: {}", e.to_string()); - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - digest, e, - ))) } else { - Ok(Some(directory)) + Ok(Some(directory.try_into().map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?)) } } Ok(None) => Ok(None), @@ -90,11 +83,11 @@ where } #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> { let resp = self .grpc_client .clone() - .put(tokio_stream::once(directory)) + .put(tokio_stream::once(proto::Directory::from(directory))) .await; match resp { @@ -113,7 +106,7 @@ where fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); @@ -135,14 +128,6 @@ where loop { match stream.message().await { Ok(Some(directory)) => { - // validate the directory itself. - if let Err(e) = directory.validate() { - Err(crate::Error::StorageError(format!( - "directory {} failed validation: {}", - directory.digest(), - e, - )))?; - } // validate we actually expected that directory, and move it from expected to received. 
let directory_digest = directory.digest(); let was_expected = expected_directory_digests.remove(&directory_digest); @@ -168,6 +153,9 @@ where .insert(child_directory_digest); } + let directory = directory.try_into() + .map_err(|e: ValidateDirectoryError| Error::StorageError(e.to_string()))?; + yield directory; }, Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => { @@ -279,11 +267,11 @@ pub struct GRPCPutter { #[async_trait] impl DirectoryPutter for GRPCPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> { match self.rq { // If we're not already closed, send the directory to directory_sender. Some((_, ref directory_sender)) => { - if directory_sender.send(directory).is_err() { + if directory_sender.send(directory.into()).is_err() { // If the channel has been prematurely closed, invoke close (so we can peek at the error code) // That error code is much more helpful, because it // contains the error message from the server. 
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index ada4606a5a57..b039d9bc7d84 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -1,4 +1,4 @@ -use crate::{proto, B3Digest, Error}; +use crate::{B3Digest, Error}; use futures::stream::BoxStream; use std::collections::HashMap; use std::sync::Arc; @@ -7,8 +7,9 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryPutter, DirectoryService, SimplePutter}; +use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::proto; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -18,7 +19,7 @@ pub struct MemoryDirectoryService { #[async_trait] impl DirectoryService for MemoryDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.read().await; match db.get(digest) { @@ -37,35 +38,20 @@ impl DirectoryService for MemoryDirectoryService { ))); } - // Validate the Directory itself is valid. 
- if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } - - Ok(Some(directory.clone())) + Ok(Some(directory.clone().try_into().map_err(|e| { + crate::Error::StorageError(format!("corrupted directory: {}", e)) + })?)) } } } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it let mut db = self.db.write().await; - db.insert(digest.clone(), directory); + db.insert(digest.clone(), directory.into()); Ok(digest) } @@ -74,7 +60,7 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 17a78b179349..0141a48f75bc 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,6 +1,9 @@ use crate::composition::{Registry, ServiceBuilder}; -use crate::{proto, B3Digest, Error}; +use crate::proto; +use crate::{B3Digest, Error}; +use crate::{ValidateDirectoryError, ValidateNodeError}; +use bytes::Bytes; use futures::stream::BoxStream; use tonic::async_trait; mod combinators; @@ -38,7 +41,7 @@ mod bigtable; pub use self::bigtable::{BigtableDirectoryService, BigtableParameters}; /// The base trait all Directory services need to implement. 
-/// This is a simple get and put of [crate::proto::Directory], returning their +/// This is a simple get and put of [Directory], returning their /// digest. #[async_trait] pub trait DirectoryService: Send + Sync { @@ -50,14 +53,14 @@ pub trait DirectoryService: Send + Sync { /// Directory digests that are at the "root", aka the last element that's /// sent to a DirectoryPutter. This makes sense for implementations bundling /// closures of directories together in batches. - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>; /// Uploads a single Directory message, and returns the calculated /// digest, or an error. An error *must* also be returned if the message is /// not valid. - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; + async fn put(&self, directory: Directory) -> Result<B3Digest, Error>; - /// Looks up a closure of [proto::Directory]. - /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`, + /// Looks up a closure of [Directory]. + /// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. /// @@ -75,9 +78,9 @@ pub trait DirectoryService: Send + Sync { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>>; + ) -> BoxStream<'static, Result<Directory, Error>>; - /// Allows persisting a closure of [proto::Directory], which is a graph of + /// Allows persisting a closure of [Directory], which is a graph of /// connected Directory messages. 
fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; } @@ -87,18 +90,18 @@ impl<A> DirectoryService for A where A: AsRef<dyn DirectoryService> + Send + Sync, { - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.as_ref().get(digest).await } - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { self.as_ref().put(directory).await } fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { self.as_ref().get_recursive(root_directory_digest) } @@ -107,7 +110,7 @@ where } } -/// Provides a handle to put a closure of connected [proto::Directory] elements. +/// Provides a handle to put a closure of connected [Directory] elements. /// /// The consumer can periodically call [DirectoryPutter::put], starting from the /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to @@ -119,12 +122,12 @@ where /// but a single file or symlink. #[async_trait] pub trait DirectoryPutter: Send { - /// Put a individual [proto::Directory] into the store. + /// Put a individual [Directory] into the store. /// Error semantics and behaviour is up to the specific implementation of /// this trait. /// Due to bursting, the returned error might refer to an object previously /// sent via `put`. - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; + async fn put(&mut self, directory: Directory) -> Result<(), Error>; /// Close the stream, and wait for any errors. 
/// If there's been any invalid Directory message uploaded, and error *must* @@ -145,3 +148,461 @@ pub(crate) fn register_directory_services(reg: &mut Registry) { reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable"); } } + +/// A Directory can contain Directory, File or Symlink nodes. +/// Each of these nodes have a name attribute, which is the basename in that +/// directory and node type specific attributes. +/// While a Node by itself may have any name, the names of Directory entries: +/// - MUST not contain slashes or null bytes +/// - MUST not be '.' or '..' +/// - MUST be unique across all three lists +#[derive(Default, Debug, Clone, PartialEq, Eq)] +pub struct Directory { + nodes: Vec<Node>, +} + +/// A DirectoryNode is a pointer to a [Directory], by its [Directory::digest]. +/// It also gives it a `name` and `size`. +/// Such a node is either an element in the [Directory] it itself is contained in, +/// or a standalone root node./ +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DirectoryNode { + /// The (base)name of the directory + name: Bytes, + /// The blake3 hash of a Directory message, serialized in protobuf canonical form. + digest: B3Digest, + /// Number of child elements in the Directory referred to by `digest`. + /// Calculated by summing up the numbers of nodes, and for each directory. + /// its size field. Can be used for inode allocation. + /// This field is precisely as verifiable as any other Merkle tree edge. + /// Resolve `digest`, and you can compute it incrementally. Resolve the entire + /// tree, and you can fully compute it from scratch. + /// A credulous implementation won't reject an excessive size, but this is + /// harmless: you'll have some ordinals without nodes. Undersizing is obvious + /// and easy to reject: you won't have an ordinal for some nodes. 
+ size: u64, +} + +impl DirectoryNode { + pub fn new(name: Bytes, digest: B3Digest, size: u64) -> Result<Self, ValidateNodeError> { + Ok(Self { name, digest, size }) + } + + pub fn digest(&self) -> &B3Digest { + &self.digest + } + pub fn size(&self) -> u64 { + self.size + } +} + +/// A FileNode represents a regular or executable file in a Directory or at the root. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FileNode { + /// The (base)name of the file + name: Bytes, + + /// The blake3 digest of the file contents + digest: B3Digest, + + /// The file content size + size: u64, + + /// Whether the file is executable + executable: bool, +} + +impl FileNode { + pub fn new( + name: Bytes, + digest: B3Digest, + size: u64, + executable: bool, + ) -> Result<Self, ValidateNodeError> { + Ok(Self { + name, + digest, + size, + executable, + }) + } + + pub fn digest(&self) -> &B3Digest { + &self.digest + } + + pub fn size(&self) -> u64 { + self.size + } + + pub fn executable(&self) -> bool { + self.executable + } +} + +/// A SymlinkNode represents a symbolic link in a Directory or at the root. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SymlinkNode { + /// The (base)name of the symlink + name: Bytes, + /// The target of the symlink. + target: Bytes, +} + +impl SymlinkNode { + pub fn new(name: Bytes, target: Bytes) -> Result<Self, ValidateNodeError> { + if target.is_empty() || target.contains(&b'\0') { + return Err(ValidateNodeError::InvalidSymlinkTarget(target)); + } + Ok(Self { name, target }) + } + + pub fn target(&self) -> &bytes::Bytes { + &self.target + } +} + +/// A Node is either a [DirectoryNode], [FileNode] or [SymlinkNode]. +/// While a Node by itself may have any name, only those matching specific requirements +/// can can be added as entries to a [Directory] (see the documentation on [Directory] for details). 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Node { + Directory(DirectoryNode), + File(FileNode), + Symlink(SymlinkNode), +} + +/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] +/// and [Node], so we can ask all of them for the name easily. +pub trait NamedNode { + fn get_name(&self) -> &bytes::Bytes; +} + +impl NamedNode for &FileNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for FileNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &DirectoryNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for DirectoryNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &SymlinkNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} +impl NamedNode for SymlinkNode { + fn get_name(&self) -> &bytes::Bytes { + &self.name + } +} + +impl NamedNode for &Node { + fn get_name(&self) -> &bytes::Bytes { + match self { + Node::File(node_file) => &node_file.name, + Node::Directory(node_directory) => &node_directory.name, + Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} +impl NamedNode for Node { + fn get_name(&self) -> &bytes::Bytes { + match self { + Node::File(node_file) => &node_file.name, + Node::Directory(node_directory) => &node_directory.name, + Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} + +impl Node { + /// Returns the node with a new name. 
+ pub fn rename(self, name: bytes::Bytes) -> Self { + match self { + Node::Directory(n) => Node::Directory(DirectoryNode { name, ..n }), + Node::File(n) => Node::File(FileNode { name, ..n }), + Node::Symlink(n) => Node::Symlink(SymlinkNode { name, ..n }), + } + } +} + +impl PartialOrd for Node { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for Node { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for FileNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for FileNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for DirectoryNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for DirectoryNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for SymlinkNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for SymlinkNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { + iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) +} + +impl Directory { + pub fn new() -> Self { + Directory { nodes: vec![] } + } + + /// The size of a directory is the number of all regular and symlink elements, + /// the number of directory elements, and their size fields. + pub fn size(&self) -> u64 { + // It's impossible to create a Directory where the size overflows, because we + // check before every add() that the size won't overflow. 
+ (self.nodes.len() as u64) + self.directories().map(|e| e.size).sum::<u64>() + } + + /// Calculates the digest of a Directory, which is the blake3 hash of a + /// Directory protobuf message, serialized in protobuf canonical form. + pub fn digest(&self) -> B3Digest { + proto::Directory::from(self).digest() + } + + /// Allows iterating over all nodes (directories, files and symlinks) + /// ordered by their name. + pub fn nodes(&self) -> impl Iterator<Item = &Node> + Send + Sync + '_ { + self.nodes.iter() + } + + /// Allows iterating over the FileNode entries of this directory + /// ordered by their name + pub fn files(&self) -> impl Iterator<Item = &FileNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::File(n) => Some(n), + _ => None, + }) + } + + /// Allows iterating over the subdirectories of this directory + /// ordered by their name + pub fn directories(&self) -> impl Iterator<Item = &DirectoryNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::Directory(n) => Some(n), + _ => None, + }) + } + + /// Allows iterating over the SymlinkNode entries of this directory + /// ordered by their name + pub fn symlinks(&self) -> impl Iterator<Item = &SymlinkNode> + Send + Sync + '_ { + self.nodes.iter().filter_map(|node| match node { + Node::Symlink(n) => Some(n), + _ => None, + }) + } + + /// Checks a Node name for validity as a directory entry + /// We disallow slashes, null bytes, '.', '..' and the empty string. + pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { + if name.is_empty() + || name == b".." + || name == b"." + || name.contains(&0x00) + || name.contains(&b'/') + { + Err(ValidateNodeError::InvalidName(name.to_owned().into())) + } else { + Ok(()) + } + } + + /// Adds the specified [Node] to the [Directory], preserving sorted entries. + /// + /// Inserting an element that already exists with the same name in the directory will yield an + /// error. 
+ /// Inserting an element will validate that its name fulfills the stricter requirements for + /// directory entries and yield an error if it is not. + pub fn add(&mut self, node: Node) -> Result<(), ValidateDirectoryError> { + Self::validate_node_name(node.get_name()) + .map_err(|e| ValidateDirectoryError::InvalidNode(node.get_name().clone().into(), e))?; + + // Check that the even after adding this new directory entry, the size calculation will not + // overflow + // FUTUREWORK: add some sort of batch add interface which only does this check once with + // all the to-be-added entries + checked_sum([ + self.size(), + 1, + match node { + Node::Directory(ref dir) => dir.size, + _ => 0, + }, + ]) + .ok_or(ValidateDirectoryError::SizeOverflow)?; + + // This assumes the [Directory] is sorted, since we don't allow accessing the nodes list + // directly and all previous inserts should have been in-order + let pos = match self + .nodes + .binary_search_by_key(&node.get_name(), |n| n.get_name()) + { + Err(pos) => pos, // There is no node with this name; good! 
+ Ok(_) => { + return Err(ValidateDirectoryError::DuplicateName( + node.get_name().to_vec(), + )) + } + }; + + self.nodes.insert(pos, node); + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}; + use crate::fixtures::DUMMY_DIGEST; + use crate::ValidateDirectoryError; + + #[test] + fn add_nodes_to_directory() { + let mut d = Directory::new(); + + d.add(Node::Directory( + DirectoryNode::new("b".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + d.add(Node::Directory( + DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + d.add(Node::Directory( + DirectoryNode::new("z".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) + .unwrap(); + + d.add(Node::File( + FileNode::new("f".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + d.add(Node::File( + FileNode::new("c".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + d.add(Node::File( + FileNode::new("g".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .unwrap(); + + d.add(Node::Symlink( + SymlinkNode::new("t".into(), "a".into()).unwrap(), + )) + .unwrap(); + d.add(Node::Symlink( + SymlinkNode::new("o".into(), "a".into()).unwrap(), + )) + .unwrap(); + d.add(Node::Symlink( + SymlinkNode::new("e".into(), "a".into()).unwrap(), + )) + .unwrap(); + + // Convert to proto struct and back to ensure we are not generating any invalid structures + crate::directoryservice::Directory::try_from(crate::proto::Directory::from(d)) + .expect("directory should be valid"); + } + + #[test] + fn validate_overflow() { + let mut d = Directory::new(); + + assert_eq!( + d.add(Node::Directory( + DirectoryNode::new("foo".into(), DUMMY_DIGEST.clone(), u64::MAX).unwrap(), + )), + Err(ValidateDirectoryError::SizeOverflow) + ); + } + + #[test] + fn add_duplicate_node_to_directory() { + let mut d = Directory::new(); + + d.add(Node::Directory( + DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(), + )) 
+ .unwrap(); + assert_eq!( + format!( + "{}", + d.add(Node::File( + FileNode::new("a".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(), + )) + .expect_err("adding duplicate dir entry must fail") + ), + "\"a\" is a duplicate name" + ); + } + + /// Attempt to add a directory entry with a name which should be rejected. + #[tokio::test] + async fn directory_reject_invalid_name() { + let mut dir = Directory::new(); + assert!( + dir.add(Node::Symlink( + SymlinkNode::new( + "".into(), // wrong! can not be added to directory + "doesntmatter".into(), + ) + .unwrap() + )) + .is_err(), + "invalid symlink entry be rejected" + ); + } +} diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index a9a2cc8ef5c0..6e71c2356def 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -17,7 +17,8 @@ use tracing::{instrument, trace, warn, Level}; use url::Url; use super::{ - DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator, + Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, + RootToLeavesValidator, }; use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; @@ -78,13 +79,13 @@ impl DirectoryService for ObjectStoreDirectoryService { /// This is the same steps as for get_recursive anyways, so we just call get_recursive and /// return the first element of the stream and drop the request. 
#[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { self.get_recursive(digest).take(1).next().await.transpose() } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { - if !directory.directories.is_empty() { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { + if directory.directories().next().is_some() { return Err(Error::InvalidRequest( "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(), )); @@ -99,7 +100,7 @@ impl DirectoryService for ObjectStoreDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // Check that we are not passing on bogus from the object store to the client, and that the // trust chain from the root digest to the leaves is intact let mut order_validator = @@ -145,6 +146,10 @@ impl DirectoryService for ObjectStoreDirectoryService { warn!("unable to parse directory {}: {}", digest, e); Error::StorageError(e.to_string()) })?; + let directory = Directory::try_from(directory).map_err(|e| { + warn!("unable to convert directory {}: {}", digest, e); + Error::StorageError(e.to_string()) + })?; // Allow the children to appear next order_validator.add_directory_unchecked(&directory); @@ -244,7 +249,7 @@ impl ObjectStoreDirectoryPutter { #[async_trait] impl DirectoryPutter for ObjectStoreDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match 
self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -302,7 +307,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter { for directory in directories { directories_sink - .send(directory.encode_to_vec().into()) + .send(proto::Directory::from(directory).encode_to_vec().into()) .await?; } diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs index 6045f5d24198..17431fc8d3b0 100644 --- a/tvix/castore/src/directoryservice/order_validator.rs +++ b/tvix/castore/src/directoryservice/order_validator.rs @@ -1,7 +1,8 @@ use std::collections::HashSet; use tracing::warn; -use crate::{proto::Directory, B3Digest}; +use super::Directory; +use crate::B3Digest; pub trait OrderValidator { /// Update the order validator's state with the directory @@ -47,10 +48,9 @@ impl RootToLeavesValidator { self.expected_digests.insert(directory.digest()); } - for subdir in &directory.directories { + for subdir in directory.directories() { // Allow the children to appear next - let subdir_digest = subdir.digest.clone().try_into().unwrap(); - self.expected_digests.insert(subdir_digest); + self.expected_digests.insert(subdir.digest.clone()); } } } @@ -79,12 +79,11 @@ impl OrderValidator for LeavesToRootValidator { fn add_directory(&mut self, directory: &Directory) -> bool { let digest = directory.digest(); - for subdir in &directory.directories { - let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory() - if !self.allowed_references.contains(&subdir_digest) { + for subdir in directory.directories() { + if !self.allowed_references.contains(&subdir.digest) { warn!( directory.digest = %digest, - subdirectory.digest = %subdir_digest, + subdirectory.digest = %subdir.digest, "unexpected directory reference" ); return false; @@ -101,8 +100,8 @@ impl OrderValidator for LeavesToRootValidator { mod tests { 
use super::{LeavesToRootValidator, RootToLeavesValidator}; use crate::directoryservice::order_validator::OrderValidator; + use crate::directoryservice::Directory; use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C}; - use crate::proto::Directory; use rstest::rstest; #[rstest] diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs index 51dc87f92574..d253df503bb3 100644 --- a/tvix/castore/src/directoryservice/redb.rs +++ b/tvix/castore/src/directoryservice/redb.rs @@ -5,15 +5,15 @@ use std::{path::PathBuf, sync::Arc}; use tonic::async_trait; use tracing::{instrument, warn}; +use super::{ + traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService, + LeavesToRootValidator, +}; use crate::{ composition::{CompositionContext, ServiceBuilder}, digests, proto, B3Digest, Error, }; -use super::{ - traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, -}; - const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> = TableDefinition::new("directory"); @@ -69,7 +69,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { #[async_trait] impl DirectoryService for RedbDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let db = self.db.clone(); // Retrieves the protobuf-encoded Directory for the corresponding digest. @@ -107,13 +107,10 @@ impl DirectoryService for RedbDirectoryService { let directory = match proto::Directory::decode(&*directory_data.value()) { Ok(dir) => { // The returned Directory must be valid. 
- if let Err(e) = dir.validate() { + dir.try_into().map_err(|e| { warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - dir + Error::StorageError("Directory failed validation".to_string()) + })? } Err(e) => { warn!(err=%e, "failed to parse Directory"); @@ -125,26 +122,21 @@ impl DirectoryService for RedbDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // Validate the directory. - if let Err(e) = directory.validate() { - warn!(err=%e, "Directory failed validation"); - return Err(Error::StorageError( - "Directory failed validation".to_string(), - )); - } - // Store the directory in the table. let txn = db.begin_write()?; { let mut table = txn.open_table(DIRECTORY_TABLE)?; let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } txn.commit()?; @@ -158,7 +150,7 @@ impl DirectoryService for RedbDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single // redb transaction to avoid constantly closing and opening new transactions for the // database. 
@@ -185,7 +177,7 @@ pub struct RedbDirectoryPutter { #[async_trait] impl DirectoryPutter for RedbDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -228,7 +220,10 @@ impl DirectoryPutter for RedbDirectoryPutter { for directory in directories { let digest_as_array: [u8; digests::B3_LEN] = directory.digest().into(); - table.insert(digest_as_array, directory.encode_to_vec())?; + table.insert( + digest_as_array, + proto::Directory::from(directory).encode_to_vec(), + )?; } } diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs index dc54e3d11d18..b4daaee61b22 100644 --- a/tvix/castore/src/directoryservice/simple_putter.rs +++ b/tvix/castore/src/directoryservice/simple_putter.rs @@ -1,7 +1,6 @@ use super::DirectoryPutter; use super::DirectoryService; -use super::{DirectoryGraph, LeavesToRootValidator}; -use crate::proto; +use super::{Directory, DirectoryGraph, LeavesToRootValidator}; use crate::B3Digest; use crate::Error; use tonic::async_trait; @@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> { #[async_trait] impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 
5766dec1a5c2..4f3a860d14e4 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -1,5 +1,3 @@ -use crate::proto::Directory; -use crate::{proto, B3Digest, Error}; use futures::stream::BoxStream; use prost::Message; use std::ops::Deref; @@ -9,8 +7,9 @@ use tonic::async_trait; use tracing::{instrument, warn}; use super::utils::traverse_directory; -use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; +use super::{Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator}; use crate::composition::{CompositionContext, ServiceBuilder}; +use crate::{proto, B3Digest, Error}; #[derive(Clone)] pub struct SledDirectoryService { @@ -44,7 +43,7 @@ impl SledDirectoryService { #[async_trait] impl DirectoryService for SledDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] - async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> { let resp = tokio::task::spawn_blocking({ let db = self.db.clone(); let digest = digest.clone(); @@ -61,7 +60,7 @@ impl DirectoryService for SledDirectoryService { None => Ok(None), // The directory was found, try to parse the data as Directory message - Some(data) => match Directory::decode(&*data) { + Some(data) => match proto::Directory::decode(&*data) { Ok(directory) => { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. @@ -73,14 +72,10 @@ impl DirectoryService for SledDirectoryService { ))); } - // Validate the Directory itself is valid. 
- if let Err(e) = directory.validate() { - warn!("directory failed validation: {}", e.to_string()); - return Err(Error::StorageError(format!( - "directory {} failed validation: {}", - actual_digest, e, - ))); - } + let directory = directory.try_into().map_err(|e| { + warn!("failed to retrieve directory: {}", e); + Error::StorageError(format!("failed to retrieve directory: {}", e)) + })?; Ok(Some(directory)) } @@ -93,22 +88,18 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] - async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { + async fn put(&self, directory: Directory) -> Result<B3Digest, Error> { tokio::task::spawn_blocking({ let db = self.db.clone(); move || { let digest = directory.digest(); - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } // store it - db.insert(digest.as_slice(), directory.encode_to_vec()) - .map_err(|e| Error::StorageError(e.to_string()))?; + db.insert( + digest.as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ) + .map_err(|e| Error::StorageError(e.to_string()))?; Ok(digest) } @@ -120,7 +111,7 @@ impl DirectoryService for SledDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> BoxStream<'static, Result<proto::Directory, Error>> { + ) -> BoxStream<'static, Result<Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } @@ -215,7 +206,7 @@ pub struct SledDirectoryPutter { #[async_trait] impl DirectoryPutter for SledDirectoryPutter { #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] - async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + async fn put(&mut self, directory: Directory) -> Result<(), Error> { match self.directory_validator { None => return 
Err(Error::StorageError("already closed".to_string())), Some(ref mut validator) => { @@ -252,7 +243,10 @@ impl DirectoryPutter for SledDirectoryPutter { let mut batch = sled::Batch::default(); for directory in directories { - batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); + batch.insert( + directory.digest().as_slice(), + proto::Directory::from(directory).encode_to_vec(), + ); } tree.apply_batch(batch).map_err(|e| { diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index b698f70ea469..3923e66dc2c2 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -8,10 +8,8 @@ use rstest_reuse::{self, *}; use super::DirectoryService; use crate::directoryservice; -use crate::{ - fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}, - proto::{self, Directory}, -}; +use crate::directoryservice::{Directory, DirectoryNode, Node}; +use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D}; mod utils; use self::utils::make_grpc_directory_service_client; @@ -41,10 +39,10 @@ async fn test_non_exist(directory_service: impl DirectoryService) { // recursive get assert_eq!( - Vec::<Result<proto::Directory, crate::Error>>::new(), + Vec::<Result<Directory, crate::Error>>::new(), directory_service .get_recursive(&DIRECTORY_A.digest()) - .collect::<Vec<Result<proto::Directory, crate::Error>>>() + .collect::<Vec<Result<Directory, crate::Error>>>() .await ); } @@ -212,59 +210,26 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService } } -/// Try uploading a Directory failing its internal validation, ensure it gets -/// rejected. -#[apply(directory_services)] -#[tokio::test] -async fn upload_reject_failing_validation(directory_service: impl DirectoryService) { - let broken_directory = Directory { - symlinks: vec![proto::SymlinkNode { - name: "".into(), // wrong! 
- target: "doesntmatter".into(), - }], - ..Default::default() - }; - assert!(broken_directory.validate().is_err()); - - // Try to upload via single upload. - assert!( - directory_service - .put(broken_directory.clone()) - .await - .is_err(), - "single upload must fail" - ); - - // Try to upload via put_multiple. We're a bit more permissive here, the - // intermediate .put() might succeed, due to client-side bursting (in the - // case of gRPC), but then the close MUST fail. - let mut handle = directory_service.put_multiple_start(); - if handle.put(broken_directory).await.is_ok() { - assert!( - handle.close().await.is_err(), - "when succeeding put, close must fail" - ) - } -} - /// Try uploading a Directory that refers to a previously-uploaded directory. /// Both pass their isolated validation, but the size field in the parent is wrong. /// This should be rejected. #[apply(directory_services)] #[tokio::test] async fn upload_reject_wrong_size(directory_service: impl DirectoryService) { - let wrong_parent_directory = Directory { - directories: vec![proto::DirectoryNode { - name: "foo".into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size() + 42, // wrong! - }], - ..Default::default() + let wrong_parent_directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory( + DirectoryNode::new( + "foo".into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size() + 42, // wrong! + ) + .unwrap(), + )) + .unwrap(); + dir }; - // Make sure isolated validation itself is ok - assert!(wrong_parent_directory.validate().is_ok()); - // Now upload both. Ensure it either fails during the second put, or during // the close. 
let mut handle = directory_service.put_multiple_start(); diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 17a51ae2bbff..3e6dc73a4d6b 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -1,8 +1,5 @@ -use super::DirectoryService; -use crate::{ - proto::{node::Node, NamedNode}, - B3Digest, Error, Path, -}; +use super::{DirectoryService, NamedNode, Node}; +use crate::{Error, Path}; use tracing::{instrument, warn}; /// This descends from a (root) node to the given (sub)path, returning the Node @@ -25,34 +22,31 @@ where return Ok(None); } Node::Directory(directory_node) => { - let digest: B3Digest = directory_node - .digest - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - // fetch the linked node from the directory_service. - let directory = - directory_service - .as_ref() - .get(&digest) - .await? - .ok_or_else(|| { - // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! - warn!("directory {} does not exist", digest); - - Error::StorageError(format!("directory {} does not exist", digest)) - })?; + let directory = directory_service + .as_ref() + .get(&directory_node.digest) + .await? + .ok_or_else(|| { + // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! + warn!("directory {} does not exist", directory_node.digest); + + Error::StorageError(format!( + "directory {} does not exist", + directory_node.digest + )) + })?; // look for the component in the [Directory]. // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we // could stop as soon as e.name is larger than the search string. if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) { // child node found, update prev_node to that and continue. 
- parent_node = child_node; + parent_node = child_node.clone(); } else { // child node not found means there's no such element inside the directory. return Ok(None); - } + }; } } } @@ -65,6 +59,7 @@ where mod tests { use crate::{ directoryservice, + directoryservice::{DirectoryNode, Node}, fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, PathBuf, }; @@ -88,21 +83,21 @@ mod tests { handle.close().await.expect("must upload"); // construct the node for DIRECTORY_COMPLICATED - let node_directory_complicated = - crate::proto::node::Node::Directory(crate::proto::DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }); + let node_directory_complicated = Node::Directory( + DirectoryNode::new( + "doesntmatter".into(), + DIRECTORY_COMPLICATED.digest(), + DIRECTORY_COMPLICATED.size(), + ) + .unwrap(), + ); // construct the node for DIRECTORY_COMPLICATED - let node_directory_with_keep = crate::proto::node::Node::Directory( - DIRECTORY_COMPLICATED.directories.first().unwrap().clone(), - ); + let node_directory_with_keep = + Node::Directory(DIRECTORY_COMPLICATED.directories().next().unwrap().clone()); // construct the node for the .keep file - let node_file_keep = - crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone()); + let node_file_keep = Node::File(DIRECTORY_WITH_KEEP.files().next().unwrap().clone()); // traversal to an empty subpath should return the root node. 
{ diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index 726734f55eec..352362811a97 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -1,5 +1,5 @@ +use super::Directory; use super::DirectoryService; -use crate::proto; use crate::B3Digest; use crate::Error; use async_stream::try_stream; @@ -8,14 +8,14 @@ use std::collections::{HashSet, VecDeque}; use tracing::instrument; use tracing::warn; -/// Traverses a [proto::Directory] from the root to the children. +/// Traverses a [Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. #[instrument(skip(directory_service))] pub fn traverse_directory<'a, DS: DirectoryService + 'static>( directory_service: DS, root_directory_digest: &B3Digest, -) -> BoxStream<'a, Result<proto::Directory, Error>> { +) -> BoxStream<'a, Result<Directory, Error>> { // The list of all directories that still need to be traversed. The next // element is picked from the front, new elements are enqueued at the // back. @@ -50,16 +50,6 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( Some(dir) => dir, }; - - // validate, we don't want to send invalid directories. - current_directory.validate().map_err(|e| { - warn!("directory failed validation: {}", e.to_string()); - Error::StorageError(format!( - "invalid directory: {}", - current_directory_digest - )) - })?; - // We're about to send this directory, so let's avoid sending it again if a // descendant has it. sent_directory_digests.insert(current_directory_digest); @@ -67,9 +57,9 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>( // enqueue all child directory digests to the work queue, as // long as they're not part of the worklist or already sent. // This panics if the digest looks invalid, it's supposed to be checked first. 
- for child_directory_node in ¤t_directory.directories { + for child_directory_node in current_directory.directories() { // TODO: propagate error - let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); + let child_digest: B3Digest = child_directory_node.digest.clone(); if worklist_directory_digests.contains(&child_digest) || sent_directory_digests.contains(&child_digest) diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs index 5bbcd7b04ef1..083f43036845 100644 --- a/tvix/castore/src/errors.rs +++ b/tvix/castore/src/errors.rs @@ -1,3 +1,4 @@ +use bstr::ByteSlice; use thiserror::Error; use tokio::task::JoinError; use tonic::Status; @@ -12,6 +13,46 @@ pub enum Error { StorageError(String), } +/// Errors that can occur during the validation of [Directory] messages. +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum ValidateDirectoryError { + /// Elements are not in sorted order + #[error("{:?} is not sorted", .0.as_bstr())] + WrongSorting(Vec<u8>), + /// Multiple elements with the same name encountered + #[error("{:?} is a duplicate name", .0.as_bstr())] + DuplicateName(Vec<u8>), + /// Invalid node + #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())] + InvalidNode(Vec<u8>, ValidateNodeError), + #[error("Total size exceeds u32::MAX")] + SizeOverflow, +} + +/// Errors that occur during Node validation +#[derive(Debug, thiserror::Error, PartialEq)] +pub enum ValidateNodeError { + #[error("No node set")] + NoNodeSet, + /// Invalid digest length encountered + #[error("invalid digest length: {0}")] + InvalidDigestLen(usize), + /// Invalid name encountered + #[error("Invalid name: {}", .0.as_bstr())] + InvalidName(bytes::Bytes), + /// Invalid symlink target + #[error("Invalid symlink target: {}", .0.as_bstr())] + InvalidSymlinkTarget(bytes::Bytes), +} + +impl From<crate::digests::Error> for ValidateNodeError { + fn from(e: crate::digests::Error) -> Self { + match e { + 
crate::digests::Error::InvalidDigestLen(n) => ValidateNodeError::InvalidDigestLen(n), + } + } +} + impl From<JoinError> for Error { fn from(value: JoinError) -> Self { Error::StorageError(value.to_string()) diff --git a/tvix/castore/src/fixtures.rs b/tvix/castore/src/fixtures.rs index 3ebda64a818a..0e423d348522 100644 --- a/tvix/castore/src/fixtures.rs +++ b/tvix/castore/src/fixtures.rs @@ -1,5 +1,5 @@ use crate::{ - proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode}, + directoryservice::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}, B3Digest, }; use lazy_static::lazy_static; @@ -34,70 +34,72 @@ lazy_static! { pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into(); // Directories - pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory { - directories: vec![], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), - size: 0, - executable: false, - }], - symlinks: vec![], + pub static ref DIRECTORY_WITH_KEEP: Directory = { + let mut dir = Directory::new(); + dir.add(Node::File(FileNode::new( + b".keep".to_vec().into(), + EMPTY_BLOB_DIGEST.clone(), + 0, + false + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory { - directories: vec![DirectoryNode { - name: b"keep".to_vec().into(), - digest: DIRECTORY_WITH_KEEP.digest().into(), - size: DIRECTORY_WITH_KEEP.size(), - }], - files: vec![FileNode { - name: b".keep".to_vec().into(), - digest: EMPTY_BLOB_DIGEST.clone().into(), - size: 0, - executable: false, - }], - symlinks: vec![SymlinkNode { - name: b"aa".to_vec().into(), - target: b"/nix/store/somewhereelse".to_vec().into(), - }], + pub static ref DIRECTORY_COMPLICATED: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"keep".to_vec().into(), + DIRECTORY_WITH_KEEP.digest(), + DIRECTORY_WITH_KEEP.size() + ).unwrap())).unwrap(); + 
dir.add(Node::File(FileNode::new( + b".keep".to_vec().into(), + EMPTY_BLOB_DIGEST.clone(), + 0, + false + ).unwrap())).unwrap(); + dir.add(Node::Symlink(SymlinkNode::new( + b"aa".to_vec().into(), + b"/nix/store/somewhereelse".to_vec().into() + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_A: Directory = Directory::default(); - pub static ref DIRECTORY_B: Directory = Directory { - directories: vec![DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - }], - ..Default::default() + pub static ref DIRECTORY_A: Directory = Directory::new(); + pub static ref DIRECTORY_B: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"a".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_C: Directory = Directory { - directories: vec![ - DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - }, - DirectoryNode { - name: b"a'".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - } - ], - ..Default::default() + pub static ref DIRECTORY_C: Directory = { + let mut dir = Directory::new(); + dir.add(Node::Directory(DirectoryNode::new( + b"a".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir.add(Node::Directory(DirectoryNode::new( + b"a'".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir }; - pub static ref DIRECTORY_D: proto::Directory = proto::Directory { - directories: vec![ - DirectoryNode { - name: b"a".to_vec().into(), - digest: DIRECTORY_A.digest().into(), - size: DIRECTORY_A.size(), - }, - DirectoryNode { - name: b"b'".to_vec().into(), - digest: DIRECTORY_B.digest().into(), - size: DIRECTORY_B.size(), - } - ], - ..Default::default() + pub static ref DIRECTORY_D: Directory = { + let mut dir = Directory::new(); + 
dir.add(Node::Directory(DirectoryNode::new( + b"a".to_vec().into(), + DIRECTORY_A.digest(), + DIRECTORY_A.size(), + ).unwrap())).unwrap(); + dir.add(Node::Directory(DirectoryNode::new( + b"b".to_vec().into(), + DIRECTORY_B.digest(), + DIRECTORY_B.size(), + ).unwrap())).unwrap(); + dir + }; } diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs index bcebcf4a7292..726beb5858c5 100644 --- a/tvix/castore/src/fs/fuse/tests.rs +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -13,11 +13,11 @@ use tokio_stream::{wrappers::ReadDirStream, StreamExt}; use super::FuseDaemon; use crate::fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}; -use crate::proto as castorepb; -use crate::proto::node::Node; use crate::{ blobservice::{BlobService, MemoryBlobService}, - directoryservice::{DirectoryService, MemoryDirectoryService}, + directoryservice::{ + DirectoryNode, DirectoryService, FileNode, MemoryDirectoryService, Node, SymlinkNode, + }, fixtures, }; @@ -70,12 +70,15 @@ async fn populate_blob_a( root_nodes.insert( BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + BLOB_A_NAME.into(), + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u64, + false, + ) + .unwrap(), + ), ); } @@ -91,12 +94,15 @@ async fn populate_blob_b( root_nodes.insert( BLOB_B_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_B_NAME.into(), - digest: fixtures::BLOB_B_DIGEST.clone().into(), - size: fixtures::BLOB_B.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + BLOB_B_NAME.into(), + fixtures::BLOB_B_DIGEST.clone(), + fixtures::BLOB_B.len() as u64, + false, + ) + .unwrap(), + ), ); } @@ -116,22 +122,22 @@ async fn populate_blob_helloworld( root_nodes.insert( HELLOWORLD_BLOB_NAME.into(), - Node::File(castorepb::FileNode { - name: 
HELLOWORLD_BLOB_NAME.into(), - digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), - size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, - executable: true, - }), + Node::File( + FileNode::new( + HELLOWORLD_BLOB_NAME.into(), + fixtures::HELLOWORLD_BLOB_DIGEST.clone(), + fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, + true, + ) + .unwrap(), + ), ); } async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( SYMLINK_NAME.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME.into(), - target: BLOB_A_NAME.into(), - }), + Node::Symlink(SymlinkNode::new(SYMLINK_NAME.into(), BLOB_A_NAME.into()).unwrap()), ); } @@ -140,10 +146,9 @@ async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( SYMLINK_NAME2.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME2.into(), - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink( + SymlinkNode::new(SYMLINK_NAME2.into(), "/nix/store/somewhereelse".into()).unwrap(), + ), ); } @@ -167,11 +172,14 @@ async fn populate_directory_with_keep( root_nodes.insert( DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + Node::Directory( + DirectoryNode::new( + DIRECTORY_WITH_KEEP_NAME.into(), + fixtures::DIRECTORY_WITH_KEEP.digest(), + fixtures::DIRECTORY_WITH_KEEP.size(), + ) + .unwrap(), + ), ); } @@ -180,11 +188,14 @@ async fn populate_directory_with_keep( async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: 
fixtures::DIRECTORY_WITH_KEEP.size(), - }), + Node::Directory( + DirectoryNode::new( + DIRECTORY_WITH_KEEP_NAME.into(), + fixtures::DIRECTORY_WITH_KEEP.digest(), + fixtures::DIRECTORY_WITH_KEEP.size(), + ) + .unwrap(), + ), ); } @@ -192,12 +203,15 @@ async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Byte async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + BLOB_A_NAME.into(), + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u64, + false, + ) + .unwrap(), + ), ); } @@ -227,11 +241,14 @@ async fn populate_directory_complicated( root_nodes.insert( DIRECTORY_COMPLICATED_NAME.into(), - Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_COMPLICATED_NAME.into(), - digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), - size: fixtures::DIRECTORY_COMPLICATED.size(), - }), + Node::Directory( + DirectoryNode::new( + DIRECTORY_COMPLICATED_NAME.into(), + fixtures::DIRECTORY_COMPLICATED.digest(), + fixtures::DIRECTORY_COMPLICATED.size(), + ) + .unwrap(), + ), ); } diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs index bdd459543470..379d4ab87318 100644 --- a/tvix/castore/src/fs/inodes.rs +++ b/tvix/castore/src/fs/inodes.rs @@ -4,7 +4,7 @@ use std::time::Duration; use bytes::Bytes; -use crate::proto as castorepb; +use crate::directoryservice::{NamedNode, Node}; use crate::B3Digest; #[derive(Clone, Debug)] @@ -20,27 +20,24 @@ pub enum InodeData { /// lookup and did fetch the data. 
#[derive(Clone, Debug)] pub enum DirectoryInodeData { - Sparse(B3Digest, u64), // digest, size - Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] + Sparse(B3Digest, u64), // digest, size + Populated(B3Digest, Vec<(u64, Node)>), // [(child_inode, node)] } impl InodeData { /// Constructs a new InodeData by consuming a [Node]. /// It splits off the orginal name, so it can be used later. - pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) { + pub fn from_node(node: &Node) -> (Self, Bytes) { match node { - castorepb::node::Node::Directory(n) => ( - Self::Directory(DirectoryInodeData::Sparse( - n.digest.try_into().unwrap(), - n.size, - )), - n.name, + Node::Directory(n) => ( + Self::Directory(DirectoryInodeData::Sparse(n.digest().clone(), n.size())), + n.get_name().clone(), ), - castorepb::node::Node::File(n) => ( - Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable), - n.name, + Node::File(n) => ( + Self::Regular(n.digest().clone(), n.size(), n.executable()), + n.get_name().clone(), ), - castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name), + Node::Symlink(n) => (Self::Symlink(n.target().clone()), n.get_name().clone()), } } diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index b565ed60ac42..4a6ca88d73fd 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -15,11 +15,9 @@ use self::{ inode_tracker::InodeTracker, inodes::{DirectoryInodeData, InodeData}, }; -use crate::proto as castorepb; use crate::{ blobservice::{BlobReader, BlobService}, - directoryservice::DirectoryService, - proto::{node::Node, NamedNode}, + directoryservice::{DirectoryService, NamedNode, Node}, B3Digest, }; use bstr::ByteVec; @@ -198,13 +196,13 @@ where let children = { let mut inode_tracker = self.inode_tracker.write(); - let children: Vec<(u64, castorepb::node::Node)> = directory + let children: Vec<(u64, Node)> = directory .nodes() .map(|child_node| { - let (inode_data, _) = 
InodeData::from_node(child_node.clone()); + let (inode_data, _) = InodeData::from_node(child_node); let child_ino = inode_tracker.put(inode_data); - (child_ino, child_node) + (child_ino, child_node.clone()) }) .collect(); @@ -287,7 +285,7 @@ where // insert the (sparse) inode data and register in // self.root_nodes. - let (inode_data, name) = InodeData::from_node(root_node); + let (inode_data, name) = InodeData::from_node(&root_node); let ino = inode_tracker.put(inode_data.clone()); root_nodes.insert(name, ino); @@ -468,7 +466,7 @@ where io::Error::from_raw_os_error(libc::EIO) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let (inode_data, name) = InodeData::from_node(&root_node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -498,7 +496,7 @@ where Span::current().record("directory.digest", parent_digest.to_string()); for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + let (inode_data, name) = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { @@ -555,7 +553,7 @@ where io::Error::from_raw_os_error(libc::EPERM) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let (inode_data, name) = InodeData::from_node(&root_node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -588,7 +586,7 @@ where Span::current().record("directory.digest", parent_digest.to_string()); for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + let (inode_data, name) = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. 
let written = add_entry( diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs index 6609e049a1fc..6d78b243d064 100644 --- a/tvix/castore/src/fs/root_nodes.rs +++ b/tvix/castore/src/fs/root_nodes.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use crate::{proto::node::Node, Error}; +use crate::{directoryservice::Node, Error}; use bytes::Bytes; use futures::stream::BoxStream; use tonic::async_trait; diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index cd5b1290e031..930f33f68b46 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -11,9 +11,8 @@ use tokio_tar::Archive; use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryService; +use crate::directoryservice::{DirectoryService, Node}; use crate::import::{ingest_entries, IngestionEntry, IngestionError}; -use crate::proto::node::Node; use super::blobs::{self, ConcurrentBlobUploader}; diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index dc7821b8101e..f156c8a73527 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -15,8 +15,7 @@ use walkdir::DirEntry; use walkdir::WalkDir; use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryService; -use crate::proto::node::Node; +use crate::directoryservice::{DirectoryService, Node}; use crate::B3Digest; use super::ingest_entries; diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index a9ac0be6b064..03e2b6c7db41 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -4,14 +4,14 @@ //! Specific implementations, such as ingesting from the filesystem, live in //! child modules. 
+use crate::directoryservice::Directory; +use crate::directoryservice::DirectoryNode; use crate::directoryservice::DirectoryPutter; use crate::directoryservice::DirectoryService; +use crate::directoryservice::FileNode; +use crate::directoryservice::Node; +use crate::directoryservice::SymlinkNode; use crate::path::{Path, PathBuf}; -use crate::proto::node::Node; -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::SymlinkNode; use crate::B3Digest; use futures::{Stream, StreamExt}; use tracing::Level; @@ -98,27 +98,36 @@ where IngestionError::UploadDirectoryError(entry.path().to_owned(), e) })?; - Node::Directory(DirectoryNode { - name, - digest: directory_digest.into(), - size: directory_size, - }) + Node::Directory( + DirectoryNode::new(name, directory_digest, directory_size).map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?, + ) } - IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode { - name, - target: target.to_owned().into(), - }), + IngestionEntry::Symlink { ref target, .. } => Node::Symlink( + SymlinkNode::new(name, target.to_owned().into()).map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?, + ), IngestionEntry::Regular { size, executable, digest, .. - } => Node::File(FileNode { - name, - digest: digest.to_owned().into(), - size: *size, - executable: *executable, - }), + } => Node::File( + FileNode::new(name, digest.clone(), *size, *executable).map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?, + ), }; let parent = entry @@ -130,7 +139,16 @@ where break node; } else { // record node in parent directory, creating a new [Directory] if not there yet. 
- directories.entry(parent.to_owned()).or_default().add(node); + directories + .entry(parent.to_owned()) + .or_default() + .add(node) + .map_err(|e| { + IngestionError::UploadDirectoryError( + entry.path().to_owned(), + crate::Error::StorageError(e.to_string()), + ) + })?; } }; @@ -156,14 +174,7 @@ where #[cfg(debug_assertions)] { if let Node::Directory(directory_node) = &root_node { - debug_assert_eq!( - root_directory_digest, - directory_node - .digest - .to_vec() - .try_into() - .expect("invalid digest len") - ) + debug_assert_eq!(&root_directory_digest, directory_node.digest()) } else { unreachable!("Tvix bug: directory putter initialized but no root directory node"); } @@ -208,9 +219,8 @@ impl IngestionEntry { mod test { use rstest::rstest; + use crate::directoryservice::{Directory, DirectoryNode, FileNode, Node, SymlinkNode}; use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST}; - use crate::proto::node::Node; - use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode}; use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST}; use super::ingest_entries; @@ -223,18 +233,18 @@ mod test { executable: true, digest: DUMMY_DIGEST.clone(), }], - Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true } - ))] + Node::File(FileNode::new("foo".into(), DUMMY_DIGEST.clone(), 42, true).unwrap()) + )] #[case::single_symlink(vec![IngestionEntry::Symlink { path: "foo".parse().unwrap(), target: b"blub".into(), }], - Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()}) + Node::Symlink(SymlinkNode::new("foo".into(), "blub".into()).unwrap()) )] #[case::single_dir(vec![IngestionEntry::Dir { path: "foo".parse().unwrap(), }], - Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()}) + Node::Directory(DirectoryNode::new("foo".into(), Directory::default().digest(), 
Directory::default().size()).unwrap()) )] #[case::dir_with_keep(vec![ IngestionEntry::Regular { @@ -247,7 +257,7 @@ mod test { path: "foo".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() }) + Node::Directory(DirectoryNode::new("foo".into(), DIRECTORY_WITH_KEEP.digest(), DIRECTORY_WITH_KEEP.size()).unwrap()) )] /// This is intentionally a bit unsorted, though it still satisfies all /// requirements we have on the order of elements in the stream. @@ -275,7 +285,7 @@ mod test { path: "blub".parse().unwrap(), }, ], - Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size:DIRECTORY_COMPLICATED.size() }) + Node::Directory(DirectoryNode::new("blub".into(), DIRECTORY_COMPLICATED.digest(), DIRECTORY_COMPLICATED.size()).unwrap()) )] #[tokio::test] async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) { diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 4fca9801c97b..91302e90996c 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -18,7 +18,7 @@ pub mod proto; pub mod tonic; pub use digests::{B3Digest, B3_LEN}; -pub use errors::Error; +pub use errors::{Error, ValidateDirectoryError, ValidateNodeError}; pub use hashing_reader::{B3HashingReader, HashingReader}; #[cfg(test)] diff --git a/tvix/castore/src/path.rs b/tvix/castore/src/path.rs index fcc2bd01fbd6..77cabeccdb8c 100644 --- a/tvix/castore/src/path.rs +++ b/tvix/castore/src/path.rs @@ -10,7 +10,7 @@ use std::{ use bstr::ByteSlice; -use crate::proto::validate_node_name; +use crate::directoryservice::Directory; /// Represents a Path in the castore model. /// These are always relative, and platform-independent, which distinguishes @@ -38,7 +38,7 @@ impl Path { if !bytes.is_empty() { // Ensure all components are valid castore node names. 
for component in bytes.split_str(b"/") { - validate_node_name(component).ok()?; + Directory::validate_node_name(component).ok()?; } } @@ -211,7 +211,7 @@ impl PathBuf { /// Adjoins `name` to self. pub fn try_push(&mut self, name: &[u8]) -> Result<(), std::io::Error> { - validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?; + Directory::validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?; if !self.inner.is_empty() { self.inner.push(b'/'); diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs index ce1d2bcd244a..e65145509184 100644 --- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -1,7 +1,5 @@ -use crate::directoryservice::DirectoryGraph; -use crate::directoryservice::LeavesToRootValidator; -use crate::proto; -use crate::{directoryservice::DirectoryService, B3Digest}; +use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator}; +use crate::{proto, B3Digest, ValidateDirectoryError}; use futures::stream::BoxStream; use futures::TryStreamExt; use std::ops::Deref; @@ -58,13 +56,16 @@ where Status::not_found(format!("directory {} not found", digest)) })?; - Box::pin(once(Ok(directory))) + Box::pin(once(Ok(directory.into()))) } else { // If recursive was requested, traverse via get_recursive. Box::pin( - self.directory_service.get_recursive(&digest).map_err(|e| { - tonic::Status::new(tonic::Code::Internal, e.to_string()) - }), + self.directory_service + .get_recursive(&digest) + .map_ok(proto::Directory::from) + .map_err(|e| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + }), ) } })) @@ -83,7 +84,9 @@ where let mut validator = DirectoryGraph::<LeavesToRootValidator>::default(); while let Some(directory) = req_inner.message().await? 
{ validator - .add(directory) + .add(directory.try_into().map_err(|e: ValidateDirectoryError| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + })?) .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?; } diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs index a0cec896f753..7e98cd74c591 100644 --- a/tvix/castore/src/proto/mod.rs +++ b/tvix/castore/src/proto/mod.rs @@ -1,7 +1,4 @@ -#![allow(non_snake_case)] -// https://github.com/hyperium/tonic/issues/1056 -use bstr::ByteSlice; -use std::{collections::HashSet, iter::Peekable, str}; +use std::str; use prost::Message; @@ -11,7 +8,8 @@ mod grpc_directoryservice_wrapper; pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; -use crate::{B3Digest, B3_LEN}; +use crate::directoryservice::NamedNode; +use crate::{B3Digest, ValidateDirectoryError, ValidateNodeError}; tonic::include_proto!("tvix.castore.v1"); @@ -24,38 +22,6 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix #[cfg(test)] mod tests; -/// Errors that can occur during the validation of [Directory] messages. 
-#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateDirectoryError { - /// Elements are not in sorted order - #[error("{:?} is not sorted", .0.as_bstr())] - WrongSorting(Vec<u8>), - /// Multiple elements with the same name encountered - #[error("{:?} is a duplicate name", .0.as_bstr())] - DuplicateName(Vec<u8>), - /// Invalid node - #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())] - InvalidNode(Vec<u8>, ValidateNodeError), - #[error("Total size exceeds u32::MAX")] - SizeOverflow, -} - -/// Errors that occur during Node validation -#[derive(Debug, PartialEq, Eq, thiserror::Error)] -pub enum ValidateNodeError { - #[error("No node set")] - NoNodeSet, - /// Invalid digest length encountered - #[error("Invalid Digest length: {0}")] - InvalidDigestLen(usize), - /// Invalid name encountered - #[error("Invalid name: {}", .0.as_bstr())] - InvalidName(Vec<u8>), - /// Invalid symlink target - #[error("Invalid symlink target: {}", .0.as_bstr())] - InvalidSymlinkTarget(Vec<u8>), -} - /// Errors that occur during StatBlobResponse validation #[derive(Debug, PartialEq, Eq, thiserror::Error)] pub enum ValidateStatBlobResponseError { @@ -64,186 +30,6 @@ pub enum ValidateStatBlobResponseError { InvalidDigestLen(usize, usize), } -/// Checks a Node name for validity as an intermediate node. -/// We disallow slashes, null bytes, '.', '..' and the empty string. -pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { - if name.is_empty() - || name == b".." - || name == b"." - || name.contains(&0x00) - || name.contains(&b'/') - { - Err(ValidateNodeError::InvalidName(name.to_owned())) - } else { - Ok(()) - } -} - -/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] -/// and [node::Node], so we can ask all of them for the name easily. 
-pub trait NamedNode { - fn get_name(&self) -> &[u8]; -} - -impl NamedNode for &FileNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &DirectoryNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for &SymlinkNode { - fn get_name(&self) -> &[u8] { - &self.name - } -} - -impl NamedNode for node::Node { - fn get_name(&self) -> &[u8] { - match self { - node::Node::File(node_file) => &node_file.name, - node::Node::Directory(node_directory) => &node_directory.name, - node::Node::Symlink(node_symlink) => &node_symlink.name, - } - } -} - -impl Node { - /// Ensures the node has a valid enum kind (is Some), and passes its - // per-enum validation. - // The inner root node is returned for easier consumption. - pub fn validate(&self) -> Result<&node::Node, ValidateNodeError> { - if let Some(node) = self.node.as_ref() { - node.validate()?; - Ok(node) - } else { - Err(ValidateNodeError::NoNodeSet) - } - } -} - -impl node::Node { - /// Returns the node with a new name. - pub fn rename(self, name: bytes::Bytes) -> Self { - match self { - node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }), - node::Node::File(n) => node::Node::File(FileNode { name, ..n }), - node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }), - } - } - - /// Ensures the node has a valid name, and checks the type-specific fields too. - pub fn validate(&self) -> Result<(), ValidateNodeError> { - match self { - // for a directory root node, ensure the digest has the appropriate size. - node::Node::Directory(directory_node) => { - if directory_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen( - directory_node.digest.len(), - ))?; - } - validate_node_name(&directory_node.name) - } - // for a file root node, ensure the digest has the appropriate size. 
- node::Node::File(file_node) => { - if file_node.digest.len() != B3_LEN { - Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?; - } - validate_node_name(&file_node.name) - } - // ensure the symlink target is not empty and doesn't contain null bytes. - node::Node::Symlink(symlink_node) => { - if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') { - Err(ValidateNodeError::InvalidSymlinkTarget( - symlink_node.target.to_vec(), - ))?; - } - validate_node_name(&symlink_node.name) - } - } - } -} - -impl PartialOrd for node::Node { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for node::Node { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for FileNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for FileNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for SymlinkNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for SymlinkNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -impl PartialOrd for DirectoryNode { - fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { - Some(self.cmp(other)) - } -} - -impl Ord for DirectoryNode { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.get_name().cmp(other.get_name()) - } -} - -/// Accepts a name, and a mutable reference to the previous name. -/// If the passed name is larger than the previous one, the reference is updated. -/// If it's not, an error is returned. 
-fn update_if_lt_prev<'n>( - prev_name: &mut &'n [u8], - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if *name < **prev_name { - return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); - } - *prev_name = name; - Ok(()) -} - -/// Inserts the given name into a HashSet if it's not already in there. -/// If it is, an error is returned. -fn insert_once<'n>( - seen_names: &mut HashSet<&'n [u8]>, - name: &'n [u8], -) -> Result<(), ValidateDirectoryError> { - if seen_names.get(name).is_some() { - return Err(ValidateDirectoryError::DuplicateName(name.to_vec())); - } - seen_names.insert(name); - Ok(()) -} - fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) } @@ -280,116 +66,213 @@ impl Directory { .as_bytes() .into() } +} + +/// Accepts a name, and a mutable reference to the previous name. +/// If the passed name is larger than the previous one, the reference is updated. +/// If it's not, an error is returned. 
+fn update_if_lt_prev<'n>( + prev_name: &mut &'n [u8], + name: &'n [u8], +) -> Result<(), ValidateDirectoryError> { + if *name < **prev_name { + return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); + } + *prev_name = name; + Ok(()) +} - /// validate checks the directory for invalid data, such as: - /// - violations of name restrictions - /// - invalid digest lengths - /// - not properly sorted lists - /// - duplicate names in the three lists - pub fn validate(&self) -> Result<(), ValidateDirectoryError> { - let mut seen_names: HashSet<&[u8]> = HashSet::new(); +impl TryFrom<&node::Node> for crate::directoryservice::Node { + type Error = ValidateNodeError; - let mut last_directory_name: &[u8] = b""; - let mut last_file_name: &[u8] = b""; - let mut last_symlink_name: &[u8] = b""; + fn try_from(node: &node::Node) -> Result<crate::directoryservice::Node, ValidateNodeError> { + Ok(match node { + node::Node::Directory(n) => crate::directoryservice::Node::Directory(n.try_into()?), + node::Node::File(n) => crate::directoryservice::Node::File(n.try_into()?), + node::Node::Symlink(n) => crate::directoryservice::Node::Symlink(n.try_into()?), + }) + } +} - // check directories - for directory_node in &self.directories { - node::Node::Directory(directory_node.clone()) - .validate() - .map_err(|e| { - ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e) - })?; +impl TryFrom<&Node> for crate::directoryservice::Node { + type Error = ValidateNodeError; - update_if_lt_prev(&mut last_directory_name, &directory_node.name)?; - insert_once(&mut seen_names, &directory_node.name)?; + fn try_from(node: &Node) -> Result<crate::directoryservice::Node, ValidateNodeError> { + match node { + Node { node: None } => Err(ValidateNodeError::NoNodeSet), + Node { node: Some(node) } => node.try_into(), } + } +} + +impl TryFrom<&DirectoryNode> for crate::directoryservice::DirectoryNode { + type Error = ValidateNodeError; + + fn try_from( + node: &DirectoryNode, + ) -> 
Result<crate::directoryservice::DirectoryNode, ValidateNodeError> { + crate::directoryservice::DirectoryNode::new( + node.name.clone(), + node.digest.clone().try_into()?, + node.size, + ) + } +} - // check files - for file_node in &self.files { - node::Node::File(file_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?; +impl TryFrom<&SymlinkNode> for crate::directoryservice::SymlinkNode { + type Error = ValidateNodeError; - update_if_lt_prev(&mut last_file_name, &file_node.name)?; - insert_once(&mut seen_names, &file_node.name)?; + fn try_from( + node: &SymlinkNode, + ) -> Result<crate::directoryservice::SymlinkNode, ValidateNodeError> { + crate::directoryservice::SymlinkNode::new(node.name.clone(), node.target.clone()) + } +} + +impl TryFrom<&FileNode> for crate::directoryservice::FileNode { + type Error = ValidateNodeError; + + fn try_from(node: &FileNode) -> Result<crate::directoryservice::FileNode, ValidateNodeError> { + crate::directoryservice::FileNode::new( + node.name.clone(), + node.digest.clone().try_into()?, + node.size, + node.executable, + ) + } +} + +impl TryFrom<Directory> for crate::directoryservice::Directory { + type Error = ValidateDirectoryError; + + fn try_from( + directory: Directory, + ) -> Result<crate::directoryservice::Directory, ValidateDirectoryError> { + (&directory).try_into() + } +} + +impl TryFrom<&Directory> for crate::directoryservice::Directory { + type Error = ValidateDirectoryError; + + fn try_from( + directory: &Directory, + ) -> Result<crate::directoryservice::Directory, ValidateDirectoryError> { + let mut dir = crate::directoryservice::Directory::new(); + let mut last_file_name: &[u8] = b""; + for file in directory.files.iter().map(move |file| { + update_if_lt_prev(&mut last_file_name, &file.name).map(|()| file.clone()) + }) { + let file = file?; + dir.add(crate::directoryservice::Node::File( + (&file) + .try_into() + .map_err(|e| 
ValidateDirectoryError::InvalidNode(file.name.into(), e))?, + ))?; + } + let mut last_directory_name: &[u8] = b""; + for directory in directory.directories.iter().map(move |directory| { + update_if_lt_prev(&mut last_directory_name, &directory.name).map(|()| directory.clone()) + }) { + let directory = directory?; + dir.add(crate::directoryservice::Node::Directory( + (&directory) + .try_into() + .map_err(|e| ValidateDirectoryError::InvalidNode(directory.name.into(), e))?, + ))?; } + let mut last_symlink_name: &[u8] = b""; + for symlink in directory.symlinks.iter().map(move |symlink| { + update_if_lt_prev(&mut last_symlink_name, &symlink.name).map(|()| symlink.clone()) + }) { + let symlink = symlink?; + dir.add(crate::directoryservice::Node::Symlink( + (&symlink) + .try_into() + .map_err(|e| ValidateDirectoryError::InvalidNode(symlink.name.into(), e))?, + ))?; + } + Ok(dir) + } +} - // check symlinks - for symlink_node in &self.symlinks { - node::Node::Symlink(symlink_node.clone()) - .validate() - .map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?; +impl From<&crate::directoryservice::Node> for node::Node { + fn from(node: &crate::directoryservice::Node) -> node::Node { + match node { + crate::directoryservice::Node::Directory(n) => node::Node::Directory(n.into()), + crate::directoryservice::Node::File(n) => node::Node::File(n.into()), + crate::directoryservice::Node::Symlink(n) => node::Node::Symlink(n.into()), + } + } +} - update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?; - insert_once(&mut seen_names, &symlink_node.name)?; +impl From<&crate::directoryservice::Node> for Node { + fn from(node: &crate::directoryservice::Node) -> Node { + Node { + node: Some(node.into()), } + } +} - self.size_checked() - .ok_or(ValidateDirectoryError::SizeOverflow)?; +impl From<&crate::directoryservice::DirectoryNode> for DirectoryNode { + fn from(node: &crate::directoryservice::DirectoryNode) -> DirectoryNode { + DirectoryNode { + digest: 
node.digest().clone().into(), + size: node.size(), + name: node.get_name().clone(), + } + } +} - Ok(()) +impl From<&crate::directoryservice::FileNode> for FileNode { + fn from(node: &crate::directoryservice::FileNode) -> FileNode { + FileNode { + digest: node.digest().clone().into(), + size: node.size(), + name: node.get_name().clone(), + executable: node.executable(), + } } +} - /// Allows iterating over all three nodes ([DirectoryNode], [FileNode], - /// [SymlinkNode]) in an ordered fashion, as long as the individual lists - /// are sorted (which can be checked by the [Directory::validate]). - pub fn nodes(&self) -> DirectoryNodesIterator { - return DirectoryNodesIterator { - i_directories: self.directories.iter().peekable(), - i_files: self.files.iter().peekable(), - i_symlinks: self.symlinks.iter().peekable(), - }; +impl From<&crate::directoryservice::SymlinkNode> for SymlinkNode { + fn from(node: &crate::directoryservice::SymlinkNode) -> SymlinkNode { + SymlinkNode { + name: node.get_name().clone(), + target: node.target().clone(), + } } +} - /// Adds the specified [node::Node] to the [Directory], preserving sorted entries. - /// This assumes the [Directory] to be sorted prior to adding the node. - /// - /// Inserting an element that already exists with the same name in the directory is not - /// supported. 
- pub fn add(&mut self, node: node::Node) { - debug_assert!( - !self.files.iter().any(|x| x.get_name() == node.get_name()), - "name already exists in files" - ); - debug_assert!( - !self - .directories - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in directories" - ); - debug_assert!( - !self - .symlinks - .iter() - .any(|x| x.get_name() == node.get_name()), - "name already exists in symlinks" - ); +impl From<crate::directoryservice::Directory> for Directory { + fn from(directory: crate::directoryservice::Directory) -> Directory { + (&directory).into() + } +} - match node { - node::Node::File(node) => { - let pos = self - .files - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.files.insert(pos, node); - } - node::Node::Directory(node) => { - let pos = self - .directories - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.directories.insert(pos, node); - } - node::Node::Symlink(node) => { - let pos = self - .symlinks - .binary_search(&node) - .expect_err("Tvix bug: dir entry with name already exists"); - self.symlinks.insert(pos, node); +impl From<&crate::directoryservice::Directory> for Directory { + fn from(directory: &crate::directoryservice::Directory) -> Directory { + let mut directories = vec![]; + let mut files = vec![]; + let mut symlinks = vec![]; + for node in directory.nodes() { + match node { + crate::directoryservice::Node::File(n) => { + files.push(n.into()); + } + crate::directoryservice::Node::Directory(n) => { + directories.push(n.into()); + } + crate::directoryservice::Node::Symlink(n) => { + symlinks.push(n.into()); + } } } + Directory { + directories, + files, + symlinks, + } } } @@ -409,65 +292,3 @@ impl StatBlobResponse { Ok(()) } } - -/// Struct to hold the state of an iterator over all nodes of a Directory. -/// -/// Internally, this keeps peekable Iterators over all three lists of a -/// Directory message. 
-pub struct DirectoryNodesIterator<'a> { - // directory: &Directory, - i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>, - i_files: Peekable<std::slice::Iter<'a, FileNode>>, - i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>, -} - -/// looks at two elements implementing NamedNode, and returns true if "left -/// is smaller / comes first". -/// -/// Some(_) is preferred over None. -fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool { - match left { - // if left is None, right always wins - None => false, - Some(left_inner) => { - // left is Some. - match right { - // left is Some, right is None - left wins. - None => true, - Some(right_inner) => { - // both are Some - compare the name. - return left_inner.get_name() < right_inner.get_name(); - } - } - } - } -} - -impl Iterator for DirectoryNodesIterator<'_> { - type Item = node::Node; - - // next returns the next node in the Directory. - // we peek at all three internal iterators, and pick the one with the - // smallest name, to ensure lexicographical ordering. - // The individual lists are already known to be sorted. 
- fn next(&mut self) -> Option<Self::Item> { - if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) { - // i_directories is still in the game, compare with symlinks - if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) { - self.i_directories - .next() - .cloned() - .map(node::Node::Directory) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } else { - // i_files is still in the game, compare with symlinks - if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) { - self.i_files.next().cloned().map(node::Node::File) - } else { - self.i_symlinks.next().cloned().map(node::Node::Symlink) - } - } - } -} diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs index 81b73a048d52..78b2cf7668e3 100644 --- a/tvix/castore/src/proto/tests/directory.rs +++ b/tvix/castore/src/proto/tests/directory.rs @@ -1,7 +1,5 @@ -use crate::proto::{ - node, Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError, - ValidateNodeError, -}; +use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}; +use crate::ValidateNodeError; use hex_literal::hex; @@ -149,7 +147,7 @@ fn digest() { #[test] fn validate_empty() { let d = Directory::default(); - assert_eq!(d.validate(), Ok(())); + assert!(crate::directoryservice::Directory::try_from(d).is_ok()); } #[test] @@ -163,7 +161,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"") } @@ -180,7 +178,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, 
ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b".") } @@ -198,7 +196,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"..") } @@ -214,7 +212,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"\x00") } @@ -230,7 +228,7 @@ fn validate_invalid_names() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { assert_eq!(n, b"foo/bar") } @@ -249,7 +247,7 @@ fn validate_invalid_digest() { }], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => { assert_eq!(n, 2) } @@ -276,7 +274,7 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::WrongSorting(s) => { assert_eq!(s, b"a"); } @@ -301,7 +299,7 @@ fn validate_sorting() { ], ..Default::default() }; - match d.validate().expect_err("must fail") { + match crate::directoryservice::Directory::try_from(d).expect_err("must fail") { ValidateDirectoryError::DuplicateName(s) => { assert_eq!(s, b"a"); } @@ -327,7 +325,7 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + 
crate::directoryservice::Directory::try_from(d).expect("validate shouldn't error"); } // [b, c] and [a] are both properly sorted. @@ -352,101 +350,6 @@ fn validate_sorting() { ..Default::default() }; - d.validate().expect("validate shouldn't error"); + crate::directoryservice::Directory::try_from(d).expect("validate shouldn't error"); } } - -#[test] -fn validate_overflow() { - let d = Directory { - directories: vec![DirectoryNode { - name: "foo".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: u64::MAX, - }], - ..Default::default() - }; - - match d.validate().expect_err("must fail") { - ValidateDirectoryError::SizeOverflow => {} - _ => panic!("unexpected error"), - } -} - -#[test] -fn add_nodes_to_directory() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "b".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::Directory(DirectoryNode { - name: "z".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - - d.add(node::Node::File(FileNode { - name: "f".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "c".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - d.add(node::Node::File(FileNode { - name: "g".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); - - d.add(node::Node::Symlink(SymlinkNode { - name: "t".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "o".into(), - target: "a".into(), - })); - d.add(node::Node::Symlink(SymlinkNode { - name: "e".into(), - target: "a".into(), - })); - - d.validate().expect("directory should be valid"); -} - -#[test] -#[cfg_attr(not(debug_assertions), ignore)] -#[should_panic = "name already exists in 
directories"] -fn add_duplicate_node_to_directory_panics() { - let mut d = Directory { - ..Default::default() - }; - - d.add(node::Node::Directory(DirectoryNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - })); - d.add(node::Node::File(FileNode { - name: "a".into(), - digest: DUMMY_DIGEST.to_vec().into(), - size: 1, - executable: true, - })); -} diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs deleted file mode 100644 index 68f147a33210..000000000000 --- a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs +++ /dev/null @@ -1,78 +0,0 @@ -use crate::proto::Directory; -use crate::proto::DirectoryNode; -use crate::proto::FileNode; -use crate::proto::NamedNode; -use crate::proto::SymlinkNode; - -#[test] -fn iterator() { - let d = Directory { - directories: vec![ - DirectoryNode { - name: "c".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "d".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "h".into(), - ..DirectoryNode::default() - }, - DirectoryNode { - name: "l".into(), - ..DirectoryNode::default() - }, - ], - files: vec![ - FileNode { - name: "b".into(), - ..FileNode::default() - }, - FileNode { - name: "e".into(), - ..FileNode::default() - }, - FileNode { - name: "g".into(), - ..FileNode::default() - }, - FileNode { - name: "j".into(), - ..FileNode::default() - }, - ], - symlinks: vec![ - SymlinkNode { - name: "a".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "f".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "i".into(), - ..SymlinkNode::default() - }, - SymlinkNode { - name: "k".into(), - ..SymlinkNode::default() - }, - ], - }; - - // We keep this strings here and convert to string to make the comparison - // less messy. 
- let mut node_names: Vec<String> = vec![]; - - for node in d.nodes() { - node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap()); - } - - assert_eq!( - vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"], - node_names - ); -} diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs index 8d903bacb6c5..74334029e84c 100644 --- a/tvix/castore/src/proto/tests/mod.rs +++ b/tvix/castore/src/proto/tests/mod.rs @@ -1,2 +1 @@ mod directory; -mod directory_nodes_iterator; diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs index 72fb06aea877..ba54078653e2 100644 --- a/tvix/castore/src/tests/import.rs +++ b/tvix/castore/src/tests/import.rs @@ -1,5 +1,6 @@ use crate::blobservice::{self, BlobService}; use crate::directoryservice; +use crate::directoryservice::{DirectoryNode, Node, SymlinkNode}; use crate::fixtures::*; use crate::import::fs::ingest_path; use crate::proto; @@ -33,10 +34,9 @@ async fn symlink() { .expect("must succeed"); assert_eq!( - proto::node::Node::Symlink(proto::SymlinkNode { - name: "doesntmatter".into(), - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink( + SymlinkNode::new("doesntmatter".into(), "/nix/store/somewhereelse".into(),).unwrap() + ), root_node, ) } @@ -65,7 +65,7 @@ async fn single_file() { size: HELLOWORLD_BLOB_CONTENTS.len() as u64, executable: false, }), - root_node, + (&root_node).into(), ); // ensure the blob has been uploaded @@ -95,17 +95,20 @@ async fn complicated() { // ensure root_node matched expectations assert_eq!( - proto::node::Node::Directory(proto::DirectoryNode { - name: tmpdir - .path() - .file_name() - .unwrap() - .as_bytes() - .to_owned() - .into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), + Node::Directory( + DirectoryNode::new( + tmpdir + .path() + .file_name() + .unwrap() + .as_bytes() + .to_owned() + .into(), + DIRECTORY_COMPLICATED.digest().clone(), + 
DIRECTORY_COMPLICATED.size(), + ) + .unwrap() + ), root_node, ); diff --git a/tvix/glue/src/builtins/derivation.rs b/tvix/glue/src/builtins/derivation.rs index b17b90466965..9074d418422e 100644 --- a/tvix/glue/src/builtins/derivation.rs +++ b/tvix/glue/src/builtins/derivation.rs @@ -179,9 +179,7 @@ pub(crate) mod derivation_builtins { use nix_compat::nixhash::CAHash; use nix_compat::store_path::{build_ca_path, hash_placeholder}; use sha2::Sha256; - use tvix_castore::proto as castorepb; - use tvix_castore::proto::node::Node; - use tvix_castore::proto::FileNode; + use tvix_castore::directoryservice::{FileNode, Node}; use tvix_eval::generators::Gen; use tvix_eval::{NixContext, NixContextElement, NixString}; use tvix_store::proto::{NarInfo, PathInfo}; @@ -579,12 +577,10 @@ pub(crate) mod derivation_builtins { }) .map_err(DerivationError::InvalidDerivation)?; - let root_node = Node::File(FileNode { - name: store_path.to_string().into(), - digest: blob_digest.into(), - size: blob_size, - executable: false, - }); + let root_node = Node::File( + FileNode::new(store_path.to_string().into(), blob_digest, blob_size, false) + .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?, + ); // calculate the nar hash let (nar_size, nar_sha256) = state @@ -604,9 +600,7 @@ pub(crate) mod derivation_builtins { state .path_info_service .put(PathInfo { - node: Some(castorepb::Node { - node: Some(root_node), - }), + node: Some((&root_node).into()), references: reference_paths .iter() .map(|x| bytes::Bytes::copy_from_slice(x.digest())) diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs index 273be08ef7b6..aa0d0d54fc8a 100644 --- a/tvix/glue/src/builtins/import.rs +++ b/tvix/glue/src/builtins/import.rs @@ -2,6 +2,8 @@ use crate::builtins::errors::ImportError; use std::path::Path; +use tvix_castore::directoryservice::FileNode; +use tvix_castore::directoryservice::Node; use tvix_castore::import::ingest_entries; use tvix_eval::{ builtin_macros::builtins, @@ -16,7 +18,7 
@@ async fn filtered_ingest( co: GenCo, path: &Path, filter: Option<&Value>, -) -> Result<tvix_castore::proto::node::Node, ErrorKind> { +) -> Result<Node, ErrorKind> { let mut entries: Vec<walkdir::DirEntry> = vec![]; let mut it = walkdir::WalkDir::new(path) .follow_links(false) @@ -114,8 +116,6 @@ mod import_builtins { use nix_compat::store_path::StorePath; use sha2::Digest; use tokio::io::AsyncWriteExt; - use tvix_castore::proto::node::Node; - use tvix_castore::proto::FileNode; use tvix_eval::builtins::coerce_value_to_path; use tvix_eval::generators::Gen; use tvix_eval::{generators::GenCo, ErrorKind, Value}; @@ -214,13 +214,16 @@ mod import_builtins { .tokio_handle .block_on(async { blob_writer.close().await })?; - let root_node = Node::File(FileNode { - // The name gets set further down, while constructing the PathInfo. - name: "".into(), - digest: blob_digest.into(), - size: blob_size, - executable: false, - }); + let root_node = Node::File( + FileNode::new( + // The name gets set further down, while constructing the PathInfo. + "".into(), + blob_digest, + blob_size, + false, + ) + .map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?, + ); let ca_hash = if recursive_ingestion { let (_nar_size, nar_sha256) = state diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs index eb035a5a905c..9cab1adc04a3 100644 --- a/tvix/glue/src/fetchers/mod.rs +++ b/tvix/glue/src/fetchers/mod.rs @@ -12,8 +12,8 @@ use tracing::{instrument, warn, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::{ blobservice::BlobService, - directoryservice::DirectoryService, - proto::{node::Node, FileNode}, + directoryservice::{DirectoryService, FileNode, Node}, + ValidateNodeError, }; use tvix_store::{nar::NarCalculationService, pathinfoservice::PathInfoService, proto::PathInfo}; use url::Url; @@ -331,12 +331,10 @@ where // Construct and return the FileNode describing the downloaded contents. 
Ok(( - Node::File(FileNode { - name: vec![].into(), - digest: blob_writer.close().await?.into(), - size: blob_size, - executable: false, - }), + Node::File( + FileNode::new(vec![].into(), blob_writer.close().await?, blob_size, false) + .map_err(|e| FetcherError::Io(std::io::Error::other(e.to_string())))?, + ), CAHash::Flat(actual_hash), blob_size, )) @@ -531,12 +529,13 @@ where // Construct and return the FileNode describing the downloaded contents, // make it executable. - let root_node = Node::File(FileNode { - name: vec![].into(), - digest: blob_digest.into(), - size: file_size, - executable: true, - }); + let root_node = Node::File( + FileNode::new(vec![].into(), blob_digest, file_size, true).map_err( + |e: ValidateNodeError| { + FetcherError::Io(std::io::Error::other(e.to_string())) + }, + )?, + ); Ok((root_node, CAHash::Nar(actual_hash), file_size)) } @@ -580,7 +579,7 @@ where // Construct the PathInfo and persist it. let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { node: Some(node) }), + node: Some((&node).into()), references: vec![], narinfo: Some(tvix_store::proto::NarInfo { nar_size, @@ -598,7 +597,14 @@ where .await .map_err(|e| FetcherError::Io(e.into()))?; - Ok((store_path, path_info.node.unwrap().node.unwrap())) + Ok(( + store_path, + (&path_info.node.unwrap().node.unwrap()) + .try_into() + .map_err(|e: ValidateNodeError| { + FetcherError::Io(std::io::Error::other(e.to_string())) + })?, + )) } } diff --git a/tvix/glue/src/tvix_build.rs b/tvix/glue/src/tvix_build.rs index e9eb1725ef3e..0f930bfe5099 100644 --- a/tvix/glue/src/tvix_build.rs +++ b/tvix/glue/src/tvix_build.rs @@ -10,7 +10,7 @@ use tvix_build::proto::{ build_request::{AdditionalFile, BuildConstraints, EnvVar}, BuildRequest, }; -use tvix_castore::proto::{self, node::Node}; +use tvix_castore::directoryservice::Node; /// These are the environment variables that Nix sets in its sandbox for every /// build. 
@@ -109,10 +109,7 @@ pub(crate) fn derivation_to_build_request( .into_iter() .map(|(key, value)| EnvVar { key, value }) .collect(), - inputs: inputs - .into_iter() - .map(|n| proto::Node { node: Some(n) }) - .collect(), + inputs: inputs.iter().map(Into::into).collect(), inputs_dir: nix_compat::store_path::STORE_DIR[1..].into(), constraints, working_dir: "build".into(), @@ -200,10 +197,8 @@ mod test { build_request::{AdditionalFile, BuildConstraints, EnvVar}, BuildRequest, }; - use tvix_castore::{ - fixtures::DUMMY_DIGEST, - proto::{self, node::Node, DirectoryNode}, - }; + use tvix_castore::directoryservice::{DirectoryNode, Node}; + use tvix_castore::fixtures::DUMMY_DIGEST; use crate::tvix_build::NIX_ENVIRONMENT_VARS; @@ -211,11 +206,14 @@ mod test { use lazy_static::lazy_static; lazy_static! { - static ref INPUT_NODE_FOO: Node = Node::Directory(DirectoryNode { - name: Bytes::from("mp57d33657rf34lzvlbpfa1gjfv5gmpg-bar"), - digest: DUMMY_DIGEST.clone().into(), - size: 42, - }); + static ref INPUT_NODE_FOO: Node = Node::Directory( + DirectoryNode::new( + Bytes::from("mp57d33657rf34lzvlbpfa1gjfv5gmpg-bar"), + DUMMY_DIGEST.clone(), + 42, + ) + .unwrap() + ); } #[test] @@ -263,9 +261,7 @@ mod test { command_args: vec![":".into()], outputs: vec!["nix/store/fhaj6gmwns62s6ypkcldbaj2ybvkhx3p-foo".into()], environment_vars: expected_environment_vars, - inputs: vec![proto::Node { - node: Some(INPUT_NODE_FOO.clone()) - }], + inputs: vec![(&*INPUT_NODE_FOO).into()], inputs_dir: "nix/store".into(), constraints: Some(BuildConstraints { system: derivation.system.clone(), diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs index 54e3550fc940..c92a9c57e0c7 100644 --- a/tvix/glue/src/tvix_store_io.rs +++ b/tvix/glue/src/tvix_store_io.rs @@ -15,15 +15,12 @@ use tokio_util::io::SyncIoBridge; use tracing::{error, instrument, warn, Level, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_build::buildservice::BuildService; -use 
tvix_castore::proto::node::Node; use tvix_eval::{EvalIO, FileType, StdIO}; use tvix_store::nar::NarCalculationService; use tvix_castore::{ blobservice::BlobService, - directoryservice::{self, DirectoryService}, - proto::NamedNode, - B3Digest, + directoryservice::{self, DirectoryService, NamedNode, Node}, }; use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; @@ -122,7 +119,12 @@ impl TvixStoreIO { .await? { // if we have a PathInfo, we know there will be a root_node (due to validation) - Some(path_info) => path_info.node.expect("no node").node.expect("no node"), + Some(path_info) => path_info + .node + .as_ref() + .expect("no node") + .try_into() + .expect("invalid node"), // If there's no PathInfo found, this normally means we have to // trigger the build (and insert into PathInfoService, after // reference scanning). @@ -284,19 +286,17 @@ impl TvixStoreIO { // For each output, insert a PathInfo. for output in &build_result.outputs { - let root_node = output.node.as_ref().expect("invalid root node"); + let root_node = output.try_into().expect("invalid root node"); // calculate the nar representation let (nar_size, nar_sha256) = self .nar_calculation_service - .calculate_nar(root_node) + .calculate_nar(&root_node) .await?; // assemble the PathInfo to persist let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(root_node.clone()), - }), + node: Some((&root_node).into()), references: vec![], // TODO: refscan narinfo: Some(tvix_store::proto::NarInfo { nar_size, @@ -332,13 +332,11 @@ impl TvixStoreIO { build_result .outputs .into_iter() + .map(|output_node| Node::try_from(&output_node).expect("invalid node")) .find(|output_node| { - output_node.node.as_ref().expect("invalid node").get_name() - == store_path.to_string().as_bytes() + output_node.get_name() == store_path.to_string().as_bytes() }) .expect("build didn't produce the store path") - .node - .expect("invalid node") } } } @@ -460,20 +458,12 @@ impl EvalIO for 
TvixStoreIO { )) } Node::File(file_node) => { - let digest: B3Digest = - file_node.digest.clone().try_into().map_err(|_e| { - error!( - file_node = ?file_node, - "invalid digest" - ); - io::Error::new( - io::ErrorKind::InvalidData, - format!("invalid digest length in file node: {:?}", file_node), - ) - })?; - self.tokio_handle.block_on(async { - let resp = self.blob_service.as_ref().open_read(&digest).await?; + let resp = self + .blob_service + .as_ref() + .open_read(file_node.digest()) + .await?; match resp { Some(blob_reader) => { // The VM Response needs a sync [std::io::Reader]. @@ -482,12 +472,12 @@ impl EvalIO for TvixStoreIO { } None => { error!( - blob.digest = %digest, + blob.digest = %file_node.digest(), "blob not found", ); Err(io::Error::new( io::ErrorKind::NotFound, - format!("blob {} not found", &digest), + format!("blob {} not found", &file_node.digest()), )) } } @@ -543,16 +533,7 @@ impl EvalIO for TvixStoreIO { match node { Node::Directory(directory_node) => { // fetch the Directory itself. 
- let digest: B3Digest = - directory_node.digest.clone().try_into().map_err(|_e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!( - "invalid digest length in directory node: {:?}", - directory_node - ), - ) - })?; + let digest = directory_node.digest().clone(); if let Some(directory) = self.tokio_handle.block_on(async { self.directory_service.as_ref().get(&digest).await @@ -560,9 +541,11 @@ impl EvalIO for TvixStoreIO { let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new(); for node in directory.nodes() { children.push(match node { - Node::Directory(e) => (e.name, FileType::Directory), - Node::File(e) => (e.name, FileType::Regular), - Node::Symlink(e) => (e.name, FileType::Symlink), + Node::Directory(e) => { + (e.get_name().clone(), FileType::Directory) + } + Node::File(e) => (e.get_name().clone(), FileType::Regular), + Node::Symlink(e) => (e.get_name().clone(), FileType::Symlink), }) } Ok(children) diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs index 9ee27c7df5ca..1dba05f5b38f 100644 --- a/tvix/nar-bridge/src/nar.rs +++ b/tvix/nar-bridge/src/nar.rs @@ -46,25 +46,20 @@ pub async fn get( } // parse the proto - let mut root_node: tvix_castore::proto::Node = Message::decode(Bytes::from(root_node_proto)) + let root_node: tvix_castore::proto::Node = Message::decode(Bytes::from(root_node_proto)) .map_err(|e| { warn!(err=%e, "unable to decode root node proto"); StatusCode::NOT_FOUND })?; + let root_node: tvix_castore::directoryservice::Node = (&root_node).try_into().map_err(|e| { + warn!(err=%e, "root node validation failed"); + StatusCode::BAD_REQUEST + })?; + // validate the node, but add a dummy node name, as we only send unnamed // nodes - if let Some(rn) = root_node.node { - root_node.node = Some(rn.rename("00000000000000000000000000000000-dummy".into())) - } - - let root_node = root_node - .validate() - .map_err(|e| { - warn!(err=%e, "root node validation failed"); - StatusCode::BAD_REQUEST - })? 
- .to_owned(); + let root_node = root_node.rename("00000000000000000000000000000000-dummy".into()); let (w, r) = tokio::io::duplex(1024 * 8); @@ -130,7 +125,7 @@ pub async fn put( // store mapping of narhash to root node into root_nodes. // we need it later to populate the root node when accepting the PathInfo. - root_nodes.write().put(nar_hash_actual, root_node); + root_nodes.write().put(nar_hash_actual, (&root_node).into()); Ok("") } diff --git a/tvix/nar-bridge/src/narinfo.rs b/tvix/nar-bridge/src/narinfo.rs index 4706be64d5b4..a883ac88ce18 100644 --- a/tvix/nar-bridge/src/narinfo.rs +++ b/tvix/nar-bridge/src/narinfo.rs @@ -61,22 +61,20 @@ pub async fn get( StatusCode::INTERNAL_SERVER_ERROR })?; - let mut narinfo = path_info.to_narinfo(store_path).ok_or_else(|| { + let mut narinfo = path_info.to_narinfo(store_path.as_ref()).ok_or_else(|| { warn!(path_info=?path_info, "PathInfo contained no NAR data"); StatusCode::INTERNAL_SERVER_ERROR })?; // encode the (unnamed) root node in the NAR url itself. - let root_node = path_info - .node - .as_ref() - .and_then(|n| n.node.as_ref()) - .expect("root node must not be none") - .clone() - .rename("".into()); + let root_node = tvix_castore::directoryservice::Node::try_from( + path_info.node.as_ref().expect("root node must not be none"), + ) + .unwrap() // PathInfo is validated + .rename("".into()); let mut buf = Vec::new(); - Node::encode(&root_node, &mut buf); + Node::encode(&(&root_node).into(), &mut buf); let url = format!( "nar/tvix-castore/{}?narsize={}", @@ -128,10 +126,10 @@ pub async fn put( // Lookup root node with peek, as we don't want to update the LRU list. // We need to be careful to not hold the RwLock across the await point. 
- let maybe_root_node = root_nodes + let maybe_root_node: Option<tvix_castore::directoryservice::Node> = root_nodes .read() .peek(&narinfo.nar_hash) - .map(|v| v.to_owned()); + .and_then(|v| v.try_into().ok()); match maybe_root_node { Some(root_node) => { @@ -139,7 +137,7 @@ pub async fn put( // We need to rename the node to the narinfo storepath basename, as // that's where it's stored in PathInfo. pathinfo.node = Some(castorepb::Node { - node: Some(root_node.rename(narinfo.store_path.to_string().into())), + node: Some((&root_node.rename(narinfo.store_path.to_string().into())).into()), }); // Persist the PathInfo. diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index d639a6e9d78d..2af8f0ee436b 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -352,7 +352,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync // annotated with information we have from the reference graph. let path_info = PathInfo { node: Some(tvix_castore::proto::Node { - node: Some(root_node), + node: Some((&root_node).into()), }), references: Vec::from_iter( elem.references.iter().map(|e| e.digest().to_vec().into()), diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index 70a97982e642..7dd4770f4b99 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,8 +1,9 @@ use std::path::Path; use tracing::{debug, instrument}; use tvix_castore::{ - blobservice::BlobService, directoryservice::DirectoryService, import::fs::ingest_path, - proto::node::Node, B3Digest, + blobservice::BlobService, + directoryservice::{DirectoryService, NamedNode, Node}, + import::fs::ingest_path, }; use nix_compat::{ @@ -32,24 +33,24 @@ pub fn log_node(node: &Node, path: &Path) { Node::Directory(directory_node) => { debug!( path = ?path, - name = ?directory_node.name, - digest = %B3Digest::try_from(directory_node.digest.clone()).unwrap(), + name = ?directory_node.get_name(), + digest = 
%directory_node.digest(), "import successful", ) } Node::File(file_node) => { debug!( path = ?path, - name = ?file_node.name, - digest = %B3Digest::try_from(file_node.digest.clone()).unwrap(), + name = ?file_node.get_name(), + digest = %file_node.digest(), "import successful" ) } Node::Symlink(symlink_node) => { debug!( path = ?path, - name = ?symlink_node.name, - target = ?symlink_node.target, + name = ?symlink_node.get_name(), + target = ?symlink_node.target(), "import successful" ) } @@ -87,7 +88,7 @@ pub fn derive_nar_ca_path_info( // assemble the [crate::proto::PathInfo] object. PathInfo { node: Some(tvix_castore::proto::Node { - node: Some(root_node), + node: Some((&root_node).into()), }), // There's no reference scanning on path contents ingested like this. references: vec![], diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 32c2f4e58061..9f3d0e0d74ea 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -7,12 +7,11 @@ use tokio::{ }; use tvix_castore::{ blobservice::BlobService, - directoryservice::DirectoryService, + directoryservice::{DirectoryService, NamedNode, Node}, import::{ blobs::{self, ConcurrentBlobUploader}, ingest_entries, IngestionEntry, IngestionError, }, - proto::{node::Node, NamedNode}, PathBuf, }; @@ -99,7 +98,7 @@ where let (_, node) = try_join!(produce, consume)?; // remove the fake "root" name again - debug_assert_eq!(&node.get_name(), b"root"); + debug_assert_eq!(&node.get_name()[..], b"root"); Ok(node.rename("".into())) } @@ -172,12 +171,13 @@ mod test { use rstest::*; use tokio_stream::StreamExt; use tvix_castore::blobservice::BlobService; - use tvix_castore::directoryservice::DirectoryService; + use tvix_castore::directoryservice::{ + Directory, DirectoryNode, DirectoryService, FileNode, Node, SymlinkNode, + }; use tvix_castore::fixtures::{ DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS, HELLOWORLD_BLOB_DIGEST, }; - use 
tvix_castore::proto as castorepb; use crate::tests::fixtures::{ blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD, @@ -199,10 +199,13 @@ mod test { .expect("must parse"); assert_eq!( - castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name: "".into(), // name must be empty - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink( + SymlinkNode::new( + "".into(), // name must be empty + "/nix/store/somewhereelse".into(), + ) + .unwrap() + ), root_node ); } @@ -222,12 +225,15 @@ mod test { .expect("must parse"); assert_eq!( - castorepb::node::Node::File(castorepb::FileNode { - name: "".into(), // name must be empty - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u64, - executable: false, - }), + Node::File( + FileNode::new( + "".into(), // name must be empty + HELLOWORLD_BLOB_DIGEST.clone(), + HELLOWORLD_BLOB_CONTENTS.len() as u64, + false, + ) + .unwrap() + ), root_node ); @@ -250,11 +256,14 @@ mod test { .expect("must parse"); assert_eq!( - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: "".into(), // name must be empty - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), + Node::Directory( + DirectoryNode::new( + "".into(), // name must be empty + DIRECTORY_COMPLICATED.digest(), + DIRECTORY_COMPLICATED.size(), + ) + .unwrap() + ), root_node, ); @@ -262,7 +271,7 @@ mod test { assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap()); // directoryservice must contain the directories, at least with get_recursive. 
- let resp: Result<Vec<castorepb::Directory>, _> = directory_service + let resp: Result<Vec<Directory>, _> = directory_service .get_recursive(&DIRECTORY_COMPLICATED.digest()) .collect() .await; diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs index 8cbb091f1aab..8a19f6bd6c47 100644 --- a/tvix/store/src/nar/mod.rs +++ b/tvix/store/src/nar/mod.rs @@ -8,16 +8,14 @@ pub use import::ingest_nar_and_hash; pub use renderer::calculate_size_and_sha256; pub use renderer::write_nar; pub use renderer::SimpleRenderer; -use tvix_castore::proto as castorepb; +use tvix_castore::directoryservice::Node; #[async_trait] pub trait NarCalculationService: Send + Sync { /// Return the nar size and nar sha256 digest for a given root node. /// This can be used to calculate NAR-based output paths. - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), tvix_castore::Error>; + async fn calculate_nar(&self, root_node: &Node) + -> Result<(u64, [u8; 32]), tvix_castore::Error>; } #[async_trait] @@ -27,7 +25,7 @@ where { async fn calculate_nar( &self, - root_node: &castorepb::node::Node, + root_node: &Node, ) -> Result<(u64, [u8; 32]), tvix_castore::Error> { self.as_ref().calculate_nar(root_node).await } diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index e3cb54dd229f..bb60f7835810 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -10,8 +10,7 @@ use tracing::{instrument, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::{ blobservice::BlobService, - directoryservice::DirectoryService, - proto::{self as castorepb, NamedNode}, + directoryservice::{DirectoryService, NamedNode, Node}, }; pub struct SimpleRenderer<BS, DS> { @@ -36,7 +35,7 @@ where { async fn calculate_nar( &self, - root_node: &castorepb::node::Node, + root_node: &Node, ) -> Result<(u64, [u8; 32]), tvix_castore::Error> { calculate_size_and_sha256( root_node, @@ -52,7 +51,7 @@ where 
/// NAR output. #[instrument(skip_all, fields(indicatif.pb_show=1))] pub async fn calculate_size_and_sha256<BS, DS>( - root_node: &castorepb::node::Node, + root_node: &Node, blob_service: BS, directory_service: DS, ) -> Result<(u64, [u8; 32]), RenderError> @@ -80,13 +79,13 @@ where Ok((cw.count(), h.finalize().into())) } -/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path, +/// Accepts a [Node] pointing to the root of a (store) path, /// and uses the passed blob_service and directory_service to perform the /// necessary lookups as it traverses the structure. /// The contents in NAR serialization are writen to the passed [AsyncWrite]. pub async fn write_nar<W, BS, DS>( mut w: W, - proto_root_node: &castorepb::node::Node, + proto_root_node: &Node, blob_service: BS, directory_service: DS, ) -> Result<(), RenderError> @@ -115,7 +114,7 @@ where /// This consumes the node. async fn walk_node<BS, DS>( nar_node: nar_writer::Node<'_, '_>, - proto_node: &castorepb::node::Node, + proto_node: &Node, blob_service: BS, directory_service: DS, ) -> Result<(BS, DS), RenderError> @@ -124,23 +123,17 @@ where DS: DirectoryService + Send, { match proto_node { - castorepb::node::Node::Symlink(proto_symlink_node) => { + Node::Symlink(proto_symlink_node) => { nar_node - .symlink(&proto_symlink_node.target) + .symlink(proto_symlink_node.target()) .await .map_err(RenderError::NARWriterError)?; } - castorepb::node::Node::File(proto_file_node) => { - let digest_len = proto_file_node.digest.len(); - let digest = proto_file_node.digest.clone().try_into().map_err(|_| { - RenderError::StoreError(io::Error::new( - io::ErrorKind::Other, - format!("invalid digest len {} in file node", digest_len), - )) - })?; + Node::File(proto_file_node) => { + let digest = proto_file_node.digest(); let mut blob_reader = match blob_service - .open_read(&digest) + .open_read(digest) .await .map_err(RenderError::StoreError)? 
{ @@ -153,36 +146,24 @@ where nar_node .file( - proto_file_node.executable, - proto_file_node.size, + proto_file_node.executable(), + proto_file_node.size(), &mut blob_reader, ) .await .map_err(RenderError::NARWriterError)?; } - castorepb::node::Node::Directory(proto_directory_node) => { - let digest_len = proto_directory_node.digest.len(); - let digest = proto_directory_node - .digest - .clone() - .try_into() - .map_err(|_| { - RenderError::StoreError(io::Error::new( - io::ErrorKind::InvalidData, - format!("invalid digest len {} in directory node", digest_len), - )) - })?; - + Node::Directory(proto_directory_node) => { // look it up with the directory service match directory_service - .get(&digest) + .get(proto_directory_node.digest()) .await .map_err(|e| RenderError::StoreError(e.into()))? { // if it's None, that's an error! None => Err(RenderError::DirectoryNotFound( - digest, - proto_directory_node.name.clone(), + proto_directory_node.digest().clone(), + proto_directory_node.get_name().clone(), ))?, Some(proto_directory) => { // start a directory node @@ -206,7 +187,7 @@ where (blob_service, directory_service) = Box::pin(walk_node( child_node, - &proto_node, + proto_node, blob_service, directory_service, )) diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs index aa64b1c01f16..664cb8bbd54e 100644 --- a/tvix/store/src/pathinfoservice/fs/mod.rs +++ b/tvix/store/src/pathinfoservice/fs/mod.rs @@ -1,10 +1,10 @@ use futures::stream::BoxStream; use futures::StreamExt; use tonic::async_trait; +use tvix_castore::directoryservice::Node; use tvix_castore::fs::{RootNodes, TvixStoreFs}; -use tvix_castore::proto as castorepb; -use tvix_castore::Error; use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; +use tvix_castore::{Error, ValidateNodeError}; use super::PathInfoService; @@ -48,7 +48,7 @@ impl<T> RootNodes for RootNodesWrapper<T> where T: AsRef<dyn PathInfoService> + Send + Sync, { - async fn 
get_by_basename(&self, name: &[u8]) -> Result<Option<castorepb::node::Node>, Error> { + async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> { let Ok(store_path) = nix_compat::store_path::StorePath::from_bytes(name) else { return Ok(None); }; @@ -61,20 +61,23 @@ where .map(|path_info| { path_info .node + .as_ref() .expect("missing root node") - .node - .expect("empty node") - })) + .try_into() + .map_err(|e: ValidateNodeError| Error::StorageError(e.to_string())) + }) + .transpose()?) } - fn list(&self) -> BoxStream<Result<castorepb::node::Node, Error>> { + fn list(&self) -> BoxStream<Result<Node, Error>> { Box::pin(self.0.as_ref().list().map(|result| { - result.map(|path_info| { + result.and_then(|path_info| { path_info .node + .as_ref() .expect("missing root node") - .node - .expect("empty node") + .try_into() + .map_err(|e: ValidateNodeError| Error::StorageError(e.to_string())) }) })) } diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 2ac0e43303cb..1ba1279ec626 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -11,7 +11,8 @@ use tonic::{async_trait, Code}; use tracing::{instrument, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::composition::{CompositionContext, ServiceBuilder}; -use tvix_castore::{proto as castorepb, Error}; +use tvix_castore::directoryservice::Node; +use tvix_castore::Error; /// Connects to a (remote) tvix-store PathInfoService over gRPC. 
#[derive(Clone)] @@ -123,10 +124,7 @@ where T::Future: Send, { #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=1))] - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { + async fn calculate_nar(&self, root_node: &Node) -> Result<(u64, [u8; 32]), Error> { let span = Span::current(); span.pb_set_message("Waiting for NAR calculation"); span.pb_start(); @@ -134,8 +132,8 @@ where let path_info = self .grpc_client .clone() - .calculate_nar(castorepb::Node { - node: Some(root_node.clone()), + .calculate_nar(tvix_castore::proto::Node { + node: Some(root_node.into()), }) .await .map_err(|e| Error::StorageError(e.to_string()))? diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs index 39c592bc96fb..5d808cd988aa 100644 --- a/tvix/store/src/pathinfoservice/lru.rs +++ b/tvix/store/src/pathinfoservice/lru.rs @@ -109,7 +109,10 @@ mod test { let root_node = p.node.as_mut().unwrap(); if let castorepb::Node { node: Some(node) } = root_node { let n = node.to_owned(); - *node = n.rename("11111111111111111111111111111111-dummy2".into()); + *node = (&tvix_castore::directoryservice::Node::try_from(&n) + .unwrap() + .rename("11111111111111111111111111111111-dummy2".into())) + .into(); } else { unreachable!() } diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index a2756033172f..c10b97857ea1 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -230,7 +230,7 @@ where Ok(Some(PathInfo { node: Some(castorepb::Node { // set the name of the root node to the digest-name of the store path. 
- node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())), + node: Some((&root_node.rename(narinfo.store_path.to_string().into())).into()), }), references: pathinfo.references, narinfo: pathinfo.narinfo, diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 68f557567629..e420801ce528 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -74,24 +74,19 @@ where &self, request: Request<castorepb::Node>, ) -> Result<Response<proto::CalculateNarResponse>> { - match request.into_inner().node { - None => Err(Status::invalid_argument("no root node sent")), - Some(root_node) => { - if let Err(e) = root_node.validate() { - warn!(err = %e, "invalid root node"); - Err(Status::invalid_argument("invalid root node"))? - } + let root_node = (&request.into_inner()).try_into().map_err(|e| { + warn!(err = %e, "invalid root node"); + Status::invalid_argument("invalid root node") + })?; - match self.nar_calculation_service.calculate_nar(&root_node).await { - Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse { - nar_size, - nar_sha256: nar_sha256.to_vec().into(), - })), - Err(e) => { - warn!(err = %e, "error during NAR calculation"); - Err(e.into()) - } - } + match self.nar_calculation_service.calculate_nar(&root_node).await { + Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse { + nar_size, + nar_sha256: nar_sha256.to_vec().into(), + })), + Err(e) => { + warn!(err = %e, "error during NAR calculation"); + Err(e.into()) } } } diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index b45e6fda46fe..5434df49daaa 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -9,7 +9,8 @@ use nix_compat::{ store_path::{self, StorePathRef}, }; use thiserror::Error; -use tvix_castore::proto::{self as castorepb, NamedNode, ValidateNodeError}; 
+use tvix_castore::directoryservice::NamedNode; +use tvix_castore::ValidateNodeError; mod grpc_pathinfoservice_wrapper; @@ -87,7 +88,7 @@ impl PathInfo { /// validate performs some checks on the PathInfo struct, /// Returning either a [store_path::StorePath] of the root node, or a /// [ValidatePathInfoError]. - pub fn validate(&self) -> Result<store_path::StorePathRef<'_>, ValidatePathInfoError> { + pub fn validate(&self) -> Result<store_path::StorePath, ValidatePathInfoError> { // ensure the references have the right number of bytes. for (i, reference) in self.references.iter().enumerate() { if reference.len() != store_path::DIGEST_SIZE { @@ -158,14 +159,15 @@ impl PathInfo { // Ensure there is a (root) node present, and it properly parses to a [store_path::StorePath]. let root_nix_path = match &self.node { - None | Some(castorepb::Node { node: None }) => { - Err(ValidatePathInfoError::NoNodePresent)? - } - Some(castorepb::Node { node: Some(node) }) => { - node.validate() + None => Err(ValidatePathInfoError::NoNodePresent)?, + Some(node) => { + // TODO save result somewhere + let node: tvix_castore::directoryservice::Node = node + .try_into() .map_err(ValidatePathInfoError::InvalidRootNode)?; // parse the name of the node itself and return parse_node_name_root(node.get_name(), ValidatePathInfoError::InvalidNodeName)? 
+ .to_owned() } }; diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs index 4d0834878d7c..1e4e7199049a 100644 --- a/tvix/store/src/proto/tests/pathinfo.rs +++ b/tvix/store/src/proto/tests/pathinfo.rs @@ -3,17 +3,18 @@ use crate::tests::fixtures::*; use bytes::Bytes; use data_encoding::BASE64; use nix_compat::nixbase32; -use nix_compat::store_path::{self, StorePathRef}; +use nix_compat::store_path::{self, StorePath, StorePathRef}; use rstest::rstest; use tvix_castore::proto as castorepb; +use tvix_castore::ValidateNodeError; #[rstest] #[case::no_node(None, Err(ValidatePathInfoError::NoNodePresent))] -#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::NoNodePresent))] +#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::InvalidRootNode(ValidateNodeError::NoNodeSet)))] fn validate_pathinfo( #[case] node: Option<castorepb::Node>, - #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, + #[case] exp_result: Result<StorePath, ValidatePathInfoError>, ) { // construct the PathInfo object let p = PathInfo { @@ -22,9 +23,6 @@ fn validate_pathinfo( }; assert_eq!(exp_result, p.validate()); - - let err = p.validate().expect_err("validation should fail"); - assert!(matches!(err, ValidatePathInfoError::NoNodePresent)); } #[rstest] @@ -32,12 +30,12 @@ fn validate_pathinfo( name: DUMMY_PATH.into(), digest: DUMMY_DIGEST.clone().into(), size: 0, -}, Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))] +}, Ok(StorePath::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))] #[case::invalid_digest_length(castorepb::DirectoryNode { name: DUMMY_PATH.into(), digest: Bytes::new(), size: 0, -}, Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))))] +}, Err(ValidatePathInfoError::InvalidRootNode(tvix_castore::ValidateNodeError::InvalidDigestLen(0))))] #[case::invalid_node_name_no_storepath(castorepb::DirectoryNode { name: 
"invalid".into(), digest: DUMMY_DIGEST.clone().into(), @@ -48,7 +46,7 @@ fn validate_pathinfo( )))] fn validate_directory( #[case] directory_node: castorepb::DirectoryNode, - #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, + #[case] exp_result: Result<StorePath, ValidatePathInfoError>, ) { // construct the PathInfo object let p = PathInfo { @@ -68,7 +66,7 @@ fn validate_directory( size: 0, executable: false, }, - Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()) + Ok(StorePath::from_bytes(DUMMY_PATH.as_bytes()).unwrap()) )] #[case::invalid_digest_len( castorepb::FileNode { @@ -76,7 +74,7 @@ fn validate_directory( digest: Bytes::new(), ..Default::default() }, - Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))) + Err(ValidatePathInfoError::InvalidRootNode(tvix_castore::ValidateNodeError::InvalidDigestLen(0))) )] #[case::invalid_node_name( castorepb::FileNode { @@ -91,7 +89,7 @@ fn validate_directory( )] fn validate_file( #[case] file_node: castorepb::FileNode, - #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, + #[case] exp_result: Result<StorePath, ValidatePathInfoError>, ) { // construct the PathInfo object let p = PathInfo { @@ -109,7 +107,7 @@ fn validate_file( name: DUMMY_PATH.into(), target: "foo".into(), }, - Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()) + Ok(StorePath::from_bytes(DUMMY_PATH.as_bytes()).unwrap()) )] #[case::invalid_node_name( castorepb::SymlinkNode { @@ -123,7 +121,7 @@ fn validate_file( )] fn validate_symlink( #[case] symlink_node: castorepb::SymlinkNode, - #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, + #[case] exp_result: Result<StorePath, ValidatePathInfoError>, ) { // construct the PathInfo object let p = PathInfo { @@ -233,7 +231,7 @@ fn validate_symlink_empty_target_invalid() { target: "".into(), }); - node.validate().expect_err("must fail validation"); + 
tvix_castore::directoryservice::Node::try_from(&node).expect_err("must fail validation"); } /// Create a node with a symlink target including null bytes, and ensure it @@ -245,7 +243,7 @@ fn validate_symlink_target_null_byte_invalid() { target: "foo\0".into(), }); - node.validate().expect_err("must fail validation"); + tvix_castore::directoryservice::Node::try_from(&node).expect_err("must fail validation"); } /// Create a PathInfo with a correct deriver field and ensure it succeeds. diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 8bfb5a72bb2f..d2ee42f8df18 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -9,8 +9,9 @@ use std::io; use std::sync::Arc; use tokio::io::sink; use tvix_castore::blobservice::BlobService; -use tvix_castore::directoryservice::DirectoryService; -use tvix_castore::proto as castorepb; +use tvix_castore::directoryservice::{ + DirectoryNode, DirectoryService, FileNode, Node, SymlinkNode, +}; #[rstest] #[tokio::test] @@ -22,10 +23,9 @@ async fn single_symlink( write_nar( &mut buf, - &castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name: "doesntmatter".into(), - target: "/nix/store/somewhereelse".into(), - }), + &Node::Symlink( + SymlinkNode::new("doesntmatter".into(), "/nix/store/somewhereelse".into()).unwrap(), + ), // don't put anything in the stores, as we don't actually do any requests. blob_service, directory_service, @@ -45,12 +45,15 @@ async fn single_file_missing_blob( ) { let e = write_nar( sink(), - &castorepb::node::Node::File(castorepb::FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u64, - executable: false, - }), + &Node::File( + FileNode::new( + "doesntmatter".into(), + HELLOWORLD_BLOB_DIGEST.clone(), + HELLOWORLD_BLOB_CONTENTS.len() as u64, + false, + ) + .unwrap(), + ), // the blobservice is empty intentionally, to provoke the error. 
blob_service, directory_service, @@ -90,12 +93,15 @@ async fn single_file_wrong_blob_size( // Test with a root FileNode of a too big size let e = write_nar( sink(), - &castorepb::node::Node::File(castorepb::FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: 42, // <- note the wrong size here! - executable: false, - }), + &Node::File( + FileNode::new( + "doesntmatter".into(), + HELLOWORLD_BLOB_DIGEST.clone(), + 42, // <- note the wrong size here! + false, + ) + .unwrap(), + ), blob_service.clone(), directory_service.clone(), ) @@ -112,12 +118,15 @@ async fn single_file_wrong_blob_size( // Test with a root FileNode of a too small size let e = write_nar( sink(), - &castorepb::node::Node::File(castorepb::FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: 2, // <- note the wrong size here! - executable: false, - }), + &Node::File( + FileNode::new( + "doesntmatter".into(), + HELLOWORLD_BLOB_DIGEST.clone(), + 2, // <- note the wrong size here! 
+ false, + ) + .unwrap(), + ), blob_service, directory_service, ) @@ -153,12 +162,15 @@ async fn single_file( write_nar( &mut buf, - &castorepb::node::Node::File(castorepb::FileNode { - name: "doesntmatter".into(), - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), - size: HELLOWORLD_BLOB_CONTENTS.len() as u64, - executable: false, - }), + &Node::File( + FileNode::new( + "doesntmatter".into(), + HELLOWORLD_BLOB_DIGEST.clone(), + HELLOWORLD_BLOB_CONTENTS.len() as u64, + false, + ) + .unwrap(), + ), blob_service, directory_service, ) @@ -196,11 +208,14 @@ async fn test_complicated( write_nar( &mut buf, - &castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), + &Node::Directory( + DirectoryNode::new( + "doesntmatter".into(), + DIRECTORY_COMPLICATED.digest(), + DIRECTORY_COMPLICATED.size(), + ) + .unwrap(), + ), blob_service.clone(), directory_service.clone(), ) @@ -211,11 +226,14 @@ async fn test_complicated( // ensure calculate_nar does return the correct sha256 digest and sum. let (nar_size, nar_digest) = calculate_size_and_sha256( - &castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: "doesntmatter".into(), - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), + &Node::Directory( + DirectoryNode::new( + "doesntmatter".into(), + DIRECTORY_COMPLICATED.digest(), + DIRECTORY_COMPLICATED.size(), + ) + .unwrap(), + ), blob_service, directory_service, ) |