diff options
Diffstat (limited to 'tvix/castore/src/proto')
-rw-r--r-- | tvix/castore/src/proto/grpc_blobservice_wrapper.rs | 175 | ||||
-rw-r--r-- | tvix/castore/src/proto/grpc_directoryservice_wrapper.rs | 103 | ||||
-rw-r--r-- | tvix/castore/src/proto/mod.rs | 471 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/directory.rs | 452 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/directory_nodes_iterator.rs | 78 | ||||
-rw-r--r-- | tvix/castore/src/proto/tests/mod.rs | 2 |
6 files changed, 1281 insertions, 0 deletions
diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs new file mode 100644 index 0000000000..41bd0698ec --- /dev/null +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -0,0 +1,175 @@ +use crate::blobservice::BlobService; +use core::pin::pin; +use data_encoding::BASE64; +use futures::{stream::BoxStream, TryFutureExt}; +use std::{ + collections::VecDeque, + ops::{Deref, DerefMut}, +}; +use tokio_stream::StreamExt; +use tokio_util::io::ReaderStream; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{instrument, warn}; + +pub struct GRPCBlobServiceWrapper<T> { + blob_service: T, +} + +impl<T> GRPCBlobServiceWrapper<T> { + pub fn new(blob_service: T) -> Self { + Self { blob_service } + } +} + +// This is necessary because bytes::BytesMut comes up with +// a default 64 bytes capacity that cannot be changed +// easily if you assume a bytes::BufMut trait implementation +// Therefore, we override the Default implementation here +// TODO(raitobezarius?): upstream me properly +struct BytesMutWithDefaultCapacity<const N: usize> { + inner: bytes::BytesMut, +} + +impl<const N: usize> Deref for BytesMutWithDefaultCapacity<N> { + type Target = bytes::BytesMut; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<const N: usize> DerefMut for BytesMutWithDefaultCapacity<N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +impl<const N: usize> Default for BytesMutWithDefaultCapacity<N> { + fn default() -> Self { + BytesMutWithDefaultCapacity { + inner: bytes::BytesMut::with_capacity(N), + } + } +} + +impl<const N: usize> bytes::Buf for BytesMutWithDefaultCapacity<N> { + fn remaining(&self) -> usize { + self.inner.remaining() + } + + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn advance(&mut self, cnt: usize) { + self.inner.advance(cnt); + } +} + +unsafe impl<const N: usize> bytes::BufMut for BytesMutWithDefaultCapacity<N> { + fn remaining_mut(&self) -> usize { + self.inner.remaining_mut() + } + + unsafe fn advance_mut(&mut self, cnt: usize) { + self.inner.advance_mut(cnt); + } + + fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice { + self.inner.chunk_mut() + } +} + +#[async_trait] +impl<T> super::blob_service_server::BlobService for GRPCBlobServiceWrapper<T> +where + T: Deref<Target = dyn BlobService> + Send + Sync + 'static, +{ + // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 + type ReadStream = BoxStream<'static, Result<super::BlobChunk, Status>>; + + #[instrument(skip_all, fields(blob.digest=format!("b3:{}", BASE64.encode(&request.get_ref().digest))))] + async fn stat( + &self, + request: Request<super::StatBlobRequest>, + ) -> Result<Response<super::StatBlobResponse>, Status> { + let rq = request.into_inner(); + let req_digest = rq + .digest + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.chunks(&req_digest).await { + Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), + Ok(Some(chunk_metas)) => Ok(Response::new(super::StatBlobResponse { + chunks: chunk_metas, + ..Default::default() + })), + Err(e) => { + warn!(err=%e, "failed to request chunks"); + Err(e.into()) + } + } + } + + #[instrument(skip_all, fields(blob.digest=format!("b3:{}", BASE64.encode(&request.get_ref().digest))))] + async fn read( + &self, + request: Request<super::ReadBlobRequest>, + ) -> Result<Response<Self::ReadStream>, Status> { + let rq = request.into_inner(); + + let req_digest = rq + .digest + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + match self.blob_service.open_read(&req_digest).await { + Ok(Some(r)) => { + let chunks_stream = + ReaderStream::new(r).map(|chunk| Ok(super::BlobChunk { data: chunk? })); + Ok(Response::new(Box::pin(chunks_stream))) + } + Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))), + Err(e) => { + warn!(err=%e, "failed to call open_read"); + Err(e.into()) + } + } + } + + #[instrument(skip_all)] + async fn put( + &self, + request: Request<Streaming<super::BlobChunk>>, + ) -> Result<Response<super::PutBlobResponse>, Status> { + let req_inner = request.into_inner(); + + let data_stream = req_inner.map(|x| { + x.map(|x| VecDeque::from(x.data.to_vec())) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) + }); + + let mut data_reader = tokio_util::io::StreamReader::new(data_stream); + + let mut blob_writer = pin!(self.blob_service.open_write().await); + + tokio::io::copy(&mut data_reader, &mut blob_writer) + .await + .map_err(|e| { + warn!("error copying: {}", e); + Status::internal("error copying") + })?; + + let digest = blob_writer + .close() + .map_err(|e| { + warn!("error closing stream: {}", e); + Status::internal("error closing stream") + }) + .await?; + + Ok(Response::new(super::PutBlobResponse { + digest: digest.into(), + })) + } +} diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs new file mode 100644 index 0000000000..5c1428690c --- /dev/null +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -0,0 +1,103 @@ +use crate::directoryservice::ClosureValidator; +use crate::proto; +use crate::{directoryservice::DirectoryService, B3Digest}; +use futures::stream::BoxStream; +use futures::TryStreamExt; +use std::ops::Deref; +use tokio_stream::once; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{instrument, warn}; + +pub struct GRPCDirectoryServiceWrapper<T> { + directory_service: T, +} + +impl<T> GRPCDirectoryServiceWrapper<T> { + pub fn new(directory_service: T) -> Self { + Self { directory_service } + } +} + +#[async_trait] +impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T> +where + T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static, +{ + type GetStream = BoxStream<'static, tonic::Result<proto::Directory, Status>>; + + #[instrument(skip_all)] + async fn get<'a>( + &'a self, + request: Request<proto::GetDirectoryRequest>, + ) -> Result<Response<Self::GetStream>, Status> { + let req_inner = request.into_inner(); + + let by_what = &req_inner + .by_what + .ok_or_else(|| Status::invalid_argument("invalid by_what"))?; + + match by_what { + proto::get_directory_request::ByWhat::Digest(ref digest) => { + let digest: B3Digest = digest + .clone() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + Ok(tonic::Response::new({ + if !req_inner.recursive { + let directory = self + .directory_service + .get(&digest) + .await + .map_err(|e| { + warn!(err = %e, directory.digest=%digest, "failed to get directory"); + tonic::Status::new(tonic::Code::Internal, e.to_string()) + })? + .ok_or_else(|| { + Status::not_found(format!("directory {} not found", digest)) + })?; + + Box::pin(once(Ok(directory))) + } else { + // If recursive was requested, traverse via get_recursive. + Box::pin( + self.directory_service.get_recursive(&digest).map_err(|e| { + tonic::Status::new(tonic::Code::Internal, e.to_string()) + }), + ) + } + })) + } + } + } + + #[instrument(skip_all)] + async fn put( + &self, + request: Request<Streaming<proto::Directory>>, + ) -> Result<Response<proto::PutDirectoryResponse>, Status> { + let mut req_inner = request.into_inner(); + + // We put all Directory messages we receive into ClosureValidator first. + let mut validator = ClosureValidator::default(); + while let Some(directory) = req_inner.message().await? { + validator.add(directory)?; + } + + // drain, which validates connectivity too. + let directories = validator.finalize()?; + + let mut directory_putter = self.directory_service.put_multiple_start(); + for directory in directories { + directory_putter.put(directory).await?; + } + + // Properly close the directory putter. Peek at last_directory_digest + // and return it, or propagate errors. + let last_directory_dgst = directory_putter.close().await?; + + Ok(Response::new(proto::PutDirectoryResponse { + root_digest: last_directory_dgst.into(), + })) + } +} diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs new file mode 100644 index 0000000000..5374e3ae5a --- /dev/null +++ b/tvix/castore/src/proto/mod.rs @@ -0,0 +1,471 @@ +#![allow(non_snake_case)] +// https://github.com/hyperium/tonic/issues/1056 +use bstr::ByteSlice; +use std::{collections::HashSet, iter::Peekable, str}; + +use prost::Message; + +mod grpc_blobservice_wrapper; +mod grpc_directoryservice_wrapper; + +pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; +pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; + +use crate::{B3Digest, B3_LEN}; + +tonic::include_proto!("tvix.castore.v1"); + +#[cfg(feature = "tonic-reflection")] +/// Compiled file descriptors for implementing [gRPC +/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g. +/// [`tonic_reflection`](https://docs.rs/tonic-reflection). +pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.castore.v1"); + +#[cfg(test)] +mod tests; + +/// Errors that can occur during the validation of [Directory] messages. +#[derive(Debug, PartialEq, Eq, thiserror::Error)] +pub enum ValidateDirectoryError { + /// Elements are not in sorted order + #[error("{:?} is not sorted", .0.as_bstr())] + WrongSorting(Vec<u8>), + /// Multiple elements with the same name encountered + #[error("{:?} is a duplicate name", .0.as_bstr())] + DuplicateName(Vec<u8>), + /// Invalid node + #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())] + InvalidNode(Vec<u8>, ValidateNodeError), + #[error("Total size exceeds u32::MAX")] + SizeOverflow, +} + +/// Errors that occur during Node validation +#[derive(Debug, PartialEq, Eq, thiserror::Error)] +pub enum ValidateNodeError { + #[error("No node set")] + NoNodeSet, + /// Invalid digest length encountered + #[error("Invalid Digest length: {0}")] + InvalidDigestLen(usize), + /// Invalid name encountered + #[error("Invalid name: {}", .0.as_bstr())] + InvalidName(Vec<u8>), + /// Invalid symlink target + #[error("Invalid symlink target: {}", .0.as_bstr())] + InvalidSymlinkTarget(Vec<u8>), +} + +/// Errors that occur during StatBlobResponse validation +#[derive(Debug, PartialEq, Eq, thiserror::Error)] +pub enum ValidateStatBlobResponseError { + /// Invalid digest length encountered + #[error("Invalid digest length {0} for chunk #{1}")] + InvalidDigestLen(usize, usize), +} + +/// Checks a Node name for validity as an intermediate node. +/// We disallow slashes, null bytes, '.', '..' and the empty string. +pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { + if name.is_empty() + || name == b".." + || name == b"." + || name.contains(&0x00) + || name.contains(&b'/') + { + Err(ValidateNodeError::InvalidName(name.to_owned())) + } else { + Ok(()) + } +} + +/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode] +/// and [node::Node], so we can ask all of them for the name easily. +pub trait NamedNode { + fn get_name(&self) -> &[u8]; +} + +impl NamedNode for &FileNode { + fn get_name(&self) -> &[u8] { + &self.name + } +} + +impl NamedNode for &DirectoryNode { + fn get_name(&self) -> &[u8] { + &self.name + } +} + +impl NamedNode for &SymlinkNode { + fn get_name(&self) -> &[u8] { + &self.name + } +} + +impl NamedNode for node::Node { + fn get_name(&self) -> &[u8] { + match self { + node::Node::File(node_file) => &node_file.name, + node::Node::Directory(node_directory) => &node_directory.name, + node::Node::Symlink(node_symlink) => &node_symlink.name, + } + } +} + +impl Node { + /// Ensures the node has a valid enum kind (is Some), and passes its + // per-enum validation. + pub fn validate(&self) -> Result<(), ValidateNodeError> { + if let Some(node) = self.node.as_ref() { + node.validate() + } else { + Err(ValidateNodeError::NoNodeSet) + } + } +} + +impl node::Node { + /// Returns the node with a new name. + pub fn rename(self, name: bytes::Bytes) -> Self { + match self { + node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }), + node::Node::File(n) => node::Node::File(FileNode { name, ..n }), + node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }), + } + } + + /// Ensures the node has a valid name, and checks the type-specific fields too. + pub fn validate(&self) -> Result<(), ValidateNodeError> { + match self { + // for a directory root node, ensure the digest has the appropriate size. + node::Node::Directory(directory_node) => { + if directory_node.digest.len() != B3_LEN { + Err(ValidateNodeError::InvalidDigestLen( + directory_node.digest.len(), + ))?; + } + validate_node_name(&directory_node.name) + } + // for a file root node, ensure the digest has the appropriate size. + node::Node::File(file_node) => { + if file_node.digest.len() != B3_LEN { + Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?; + } + validate_node_name(&file_node.name) + } + // ensure the symlink target is not empty and doesn't contain null bytes. + node::Node::Symlink(symlink_node) => { + if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') { + Err(ValidateNodeError::InvalidSymlinkTarget( + symlink_node.target.to_vec(), + ))?; + } + validate_node_name(&symlink_node.name) + } + } + } +} + +impl PartialOrd for node::Node { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for node::Node { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for FileNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for FileNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for SymlinkNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for SymlinkNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +impl PartialOrd for DirectoryNode { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +impl Ord for DirectoryNode { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.get_name().cmp(other.get_name()) + } +} + +/// Accepts a name, and a mutable reference to the previous name. +/// If the passed name is larger than the previous one, the reference is updated. +/// If it's not, an error is returned. +fn update_if_lt_prev<'n>( + prev_name: &mut &'n [u8], + name: &'n [u8], +) -> Result<(), ValidateDirectoryError> { + if *name < **prev_name { + return Err(ValidateDirectoryError::WrongSorting(name.to_vec())); + } + *prev_name = name; + Ok(()) +} + +/// Inserts the given name into a HashSet if it's not already in there. +/// If it is, an error is returned. +fn insert_once<'n>( + seen_names: &mut HashSet<&'n [u8]>, + name: &'n [u8], +) -> Result<(), ValidateDirectoryError> { + if seen_names.get(name).is_some() { + return Err(ValidateDirectoryError::DuplicateName(name.to_vec())); + } + seen_names.insert(name); + Ok(()) +} + +fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> { + iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i)) +} + +impl Directory { + /// The size of a directory is the number of all regular and symlink elements, + /// the number of directory elements, and their size fields. + pub fn size(&self) -> u64 { + if cfg!(debug_assertions) { + self.size_checked() + .expect("Directory::size exceeds u64::MAX") + } else { + self.size_checked().unwrap_or(u64::MAX) + } + } + + fn size_checked(&self) -> Option<u64> { + checked_sum([ + self.files.len().try_into().ok()?, + self.symlinks.len().try_into().ok()?, + self.directories.len().try_into().ok()?, + checked_sum(self.directories.iter().map(|e| e.size))?, + ]) + } + + /// Calculates the digest of a Directory, which is the blake3 hash of a + /// Directory protobuf message, serialized in protobuf canonical form. + pub fn digest(&self) -> B3Digest { + let mut hasher = blake3::Hasher::new(); + + hasher + .update(&self.encode_to_vec()) + .finalize() + .as_bytes() + .into() + } + + /// validate checks the directory for invalid data, such as: + /// - violations of name restrictions + /// - invalid digest lengths + /// - not properly sorted lists + /// - duplicate names in the three lists + pub fn validate(&self) -> Result<(), ValidateDirectoryError> { + let mut seen_names: HashSet<&[u8]> = HashSet::new(); + + let mut last_directory_name: &[u8] = b""; + let mut last_file_name: &[u8] = b""; + let mut last_symlink_name: &[u8] = b""; + + // check directories + for directory_node in &self.directories { + node::Node::Directory(directory_node.clone()) + .validate() + .map_err(|e| { + ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e) + })?; + + update_if_lt_prev(&mut last_directory_name, &directory_node.name)?; + insert_once(&mut seen_names, &directory_node.name)?; + } + + // check files + for file_node in &self.files { + node::Node::File(file_node.clone()) + .validate() + .map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?; + + update_if_lt_prev(&mut last_file_name, &file_node.name)?; + insert_once(&mut seen_names, &file_node.name)?; + } + + // check symlinks + for symlink_node in &self.symlinks { + node::Node::Symlink(symlink_node.clone()) + .validate() + .map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?; + + update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?; + insert_once(&mut seen_names, &symlink_node.name)?; + } + + self.size_checked() + .ok_or(ValidateDirectoryError::SizeOverflow)?; + + Ok(()) + } + + /// Allows iterating over all three nodes ([DirectoryNode], [FileNode], + /// [SymlinkNode]) in an ordered fashion, as long as the individual lists + /// are sorted (which can be checked by the [Directory::validate]). + pub fn nodes(&self) -> DirectoryNodesIterator { + return DirectoryNodesIterator { + i_directories: self.directories.iter().peekable(), + i_files: self.files.iter().peekable(), + i_symlinks: self.symlinks.iter().peekable(), + }; + } + + /// Adds the specified [node::Node] to the [Directory], preserving sorted entries. + /// This assumes the [Directory] to be sorted prior to adding the node. + /// + /// Inserting an element that already exists with the same name in the directory is not + /// supported. + pub fn add(&mut self, node: node::Node) { + debug_assert!( + !self.files.iter().any(|x| x.get_name() == node.get_name()), + "name already exists in files" + ); + debug_assert!( + !self + .directories + .iter() + .any(|x| x.get_name() == node.get_name()), + "name already exists in directories" + ); + debug_assert!( + !self + .symlinks + .iter() + .any(|x| x.get_name() == node.get_name()), + "name already exists in symlinks" + ); + + match node { + node::Node::File(node) => { + let pos = self + .files + .binary_search(&node) + .expect_err("Tvix bug: dir entry with name already exists"); + self.files.insert(pos, node); + } + node::Node::Directory(node) => { + let pos = self + .directories + .binary_search(&node) + .expect_err("Tvix bug: dir entry with name already exists"); + self.directories.insert(pos, node); + } + node::Node::Symlink(node) => { + let pos = self + .symlinks + .binary_search(&node) + .expect_err("Tvix bug: dir entry with name already exists"); + self.symlinks.insert(pos, node); + } + } + } +} + +impl StatBlobResponse { + /// Validates a StatBlobResponse. All chunks must have valid blake3 digests. + /// It is allowed to send an empty list, if no more granular chunking is + /// available. + pub fn validate(&self) -> Result<(), ValidateStatBlobResponseError> { + for (i, chunk) in self.chunks.iter().enumerate() { + if chunk.digest.len() != blake3::KEY_LEN { + return Err(ValidateStatBlobResponseError::InvalidDigestLen( + chunk.digest.len(), + i, + )); + } + } + Ok(()) + } +} + +/// Struct to hold the state of an iterator over all nodes of a Directory. +/// +/// Internally, this keeps peekable Iterators over all three lists of a +/// Directory message. +pub struct DirectoryNodesIterator<'a> { + // directory: &Directory, + i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>, + i_files: Peekable<std::slice::Iter<'a, FileNode>>, + i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>, +} + +/// looks at two elements implementing NamedNode, and returns true if "left +/// is smaller / comes first". +/// +/// Some(_) is preferred over None. +fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool { + match left { + // if left is None, right always wins + None => false, + Some(left_inner) => { + // left is Some. + match right { + // left is Some, right is None - left wins. + None => true, + Some(right_inner) => { + // both are Some - compare the name. + return left_inner.get_name() < right_inner.get_name(); + } + } + } + } +} + +impl Iterator for DirectoryNodesIterator<'_> { + type Item = node::Node; + + // next returns the next node in the Directory. + // we peek at all three internal iterators, and pick the one with the + // smallest name, to ensure lexicographical ordering. + // The individual lists are already known to be sorted. + fn next(&mut self) -> Option<Self::Item> { + if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) { + // i_directories is still in the game, compare with symlinks + if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) { + self.i_directories + .next() + .cloned() + .map(node::Node::Directory) + } else { + self.i_symlinks.next().cloned().map(node::Node::Symlink) + } + } else { + // i_files is still in the game, compare with symlinks + if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) { + self.i_files.next().cloned().map(node::Node::File) + } else { + self.i_symlinks.next().cloned().map(node::Node::Symlink) + } + } + } +} diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs new file mode 100644 index 0000000000..81b73a048d --- /dev/null +++ b/tvix/castore/src/proto/tests/directory.rs @@ -0,0 +1,452 @@ +use crate::proto::{ + node, Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError, + ValidateNodeError, +}; + +use hex_literal::hex; + +const DUMMY_DIGEST: [u8; 32] = [0; 32]; + +#[test] +fn size() { + { + let d = Directory::default(); + assert_eq!(d.size(), 0); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 0, + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 4, + }], + ..Default::default() + }; + assert_eq!(d.size(), 5); + } + { + let d = Directory { + files: vec![FileNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + executable: false, + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "foo".into(), + target: "bar".into(), + }], + ..Default::default() + }; + assert_eq!(d.size(), 1); + } +} + +#[test] +#[cfg_attr(not(debug_assertions), ignore)] +#[should_panic = "Directory::size exceeds u64::MAX"] +fn size_unchecked_panic() { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX, + }], + ..Default::default() + }; + + d.size(); +} + +#[test] +#[cfg_attr(debug_assertions, ignore)] +fn size_unchecked_saturate() { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX, + }], + ..Default::default() + }; + + assert_eq!(d.size(), u64::MAX); +} + +#[test] +fn size_checked() { + // We don't test the overflow cases that rely purely on immediate + // child count, since that would take an absurd amount of memory. + { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX - 1, + }], + ..Default::default() + }; + assert_eq!(d.size_checked(), Some(u64::MAX)); + } + { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX, + }], + ..Default::default() + }; + assert_eq!(d.size_checked(), None); + } + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX / 2, + }, + DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX / 2, + }, + ], + ..Default::default() + }; + assert_eq!(d.size_checked(), None); + } +} + +#[test] +fn digest() { + let d = Directory::default(); + + assert_eq!( + d.digest(), + (&hex!("af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262")).into() + ) +} + +#[test] +fn validate_empty() { + let d = Directory::default(); + assert_eq!(d.validate(), Ok(())); +} + +#[test] +fn validate_invalid_names() { + { + let d = Directory { + directories: vec![DirectoryNode { + name: "".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b"") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + directories: vec![DirectoryNode { + name: ".".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b".") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + files: vec![FileNode { + name: "..".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + executable: false, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b"..") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "\x00".into(), + target: "foo".into(), + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b"\x00") + } + _ => panic!("unexpected error"), + }; + } + + { + let d = Directory { + symlinks: vec![SymlinkNode { + name: "foo/bar".into(), + target: "foo".into(), + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => { + assert_eq!(n, b"foo/bar") + } + _ => panic!("unexpected error"), + }; + } +} + +#[test] +fn validate_invalid_digest() { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: vec![0x00, 0x42].into(), // invalid length + size: 42, + }], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => { + assert_eq!(n, 2) + } + _ => panic!("unexpected error"), + } +} + +#[test] +fn validate_sorting() { + // "b" comes before "a", bad. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "b".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + ], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::WrongSorting(s) => { + assert_eq!(s, b"a"); + } + _ => panic!("unexpected error"), + } + } + + // "a" exists twice, bad. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + ], + ..Default::default() + }; + match d.validate().expect_err("must fail") { + ValidateDirectoryError::DuplicateName(s) => { + assert_eq!(s, b"a"); + } + _ => panic!("unexpected error"), + } + } + + // "a" comes before "b", all good. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + DirectoryNode { + name: "b".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + ], + ..Default::default() + }; + + d.validate().expect("validate shouldn't error"); + } + + // [b, c] and [a] are both properly sorted. + { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "b".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + DirectoryNode { + name: "c".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 42, + }, + ], + symlinks: vec![SymlinkNode { + name: "a".into(), + target: "foo".into(), + }], + ..Default::default() + }; + + d.validate().expect("validate shouldn't error"); + } +} + +#[test] +fn validate_overflow() { + let d = Directory { + directories: vec![DirectoryNode { + name: "foo".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: u64::MAX, + }], + ..Default::default() + }; + + match d.validate().expect_err("must fail") { + ValidateDirectoryError::SizeOverflow => {} + _ => panic!("unexpected error"), + } +} + +#[test] +fn add_nodes_to_directory() { + let mut d = Directory { + ..Default::default() + }; + + d.add(node::Node::Directory(DirectoryNode { + name: "b".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 1, + })); + d.add(node::Node::Directory(DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 1, + })); + d.add(node::Node::Directory(DirectoryNode { + name: "z".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 1, + })); + + d.add(node::Node::File(FileNode { + name: "f".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 1, + executable: true, + })); + d.add(node::Node::File(FileNode { + name: "c".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 1, + executable: true, + })); + d.add(node::Node::File(FileNode { + name: "g".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 1, + executable: true, + })); + + d.add(node::Node::Symlink(SymlinkNode { + name: "t".into(), + target: "a".into(), + })); + d.add(node::Node::Symlink(SymlinkNode { + name: "o".into(), + target: "a".into(), + })); + d.add(node::Node::Symlink(SymlinkNode { + name: "e".into(), + target: "a".into(), + })); + + d.validate().expect("directory should be valid"); +} + +#[test] +#[cfg_attr(not(debug_assertions), ignore)] +#[should_panic = "name already exists in directories"] +fn add_duplicate_node_to_directory_panics() { + let mut d = Directory { + ..Default::default() + }; + + d.add(node::Node::Directory(DirectoryNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 1, + })); + d.add(node::Node::File(FileNode { + name: "a".into(), + digest: DUMMY_DIGEST.to_vec().into(), + size: 1, + executable: true, + })); +} diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs new file mode 100644 index 0000000000..68f147a332 --- /dev/null +++ b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs @@ -0,0 +1,78 @@ +use crate::proto::Directory; +use crate::proto::DirectoryNode; +use crate::proto::FileNode; +use crate::proto::NamedNode; +use crate::proto::SymlinkNode; + +#[test] +fn iterator() { + let d = Directory { + directories: vec![ + DirectoryNode { + name: "c".into(), + ..DirectoryNode::default() + }, + DirectoryNode { + name: "d".into(), + ..DirectoryNode::default() + }, + DirectoryNode { + name: "h".into(), + ..DirectoryNode::default() + }, + DirectoryNode { + name: "l".into(), + ..DirectoryNode::default() + }, + ], + files: vec![ + FileNode { + name: "b".into(), + ..FileNode::default() + }, + FileNode { + name: "e".into(), + ..FileNode::default() + }, + FileNode { + name: "g".into(), + ..FileNode::default() + }, + FileNode { + name: "j".into(), + ..FileNode::default() + }, + ], + symlinks: vec![ + SymlinkNode { + name: "a".into(), + ..SymlinkNode::default() + }, + SymlinkNode { + name: "f".into(), + ..SymlinkNode::default() + }, + SymlinkNode { + name: "i".into(), + ..SymlinkNode::default() + }, + SymlinkNode { + name: "k".into(), + ..SymlinkNode::default() + }, + ], + }; + + // We keep this strings here and convert to string to make the comparison + // less messy. + let mut node_names: Vec<String> = vec![]; + + for node in d.nodes() { + node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap()); + } + + assert_eq!( + vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"], + node_names + ); +} diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs new file mode 100644 index 0000000000..8d903bacb6 --- /dev/null +++ b/tvix/castore/src/proto/tests/mod.rs @@ -0,0 +1,2 @@ +mod directory; +mod directory_nodes_iterator; |