use std::str;
use prost::Message;
mod grpc_blobservice_wrapper;
mod grpc_directoryservice_wrapper;
use crate::{B3Digest, DirectoryError};
pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
tonic::include_proto!("tvix.castore.v1");
#[cfg(feature = "tonic-reflection")]
/// Compiled file descriptors for implementing [gRPC
/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g.
/// [`tonic_reflection`](https://docs.rs/tonic-reflection).
pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.castore.v1");
#[cfg(test)]
mod tests;
/// Errors that can occur while validating a `StatBlobResponse`.
#[derive(Debug, PartialEq, Eq, thiserror::Error)]
pub enum ValidateStatBlobResponseError {
/// A chunk carries a digest of unexpected length.
///
/// The first field is the digest length that was encountered, the second
/// field is the zero-based index of the offending chunk in the response.
#[error("Invalid digest length {0} for chunk #{1}")]
InvalidDigestLen(usize, usize),
}
/// Adds up all values yielded by `iter`, starting from zero.
///
/// Returns `None` as soon as an addition would overflow `u64`; the remaining
/// items are not consumed in that case. An empty input yields `Some(0)`.
fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
    let mut total: u64 = 0;
    for value in iter {
        total = total.checked_add(value)?;
    }
    Some(total)
}
impl Directory {
    /// Returns the size of this directory.
    ///
    /// The size is the number of file, symlink and directory entries, plus
    /// the `size` field of every directory entry.
    ///
    /// # Panics
    ///
    /// When debug assertions are enabled, this panics if the total does not
    /// fit into a `u64`. Without debug assertions the result saturates to
    /// `u64::MAX` instead.
    pub fn size(&self) -> u64 {
        let total = self.size_checked();

        if cfg!(debug_assertions) {
            total.expect("Directory::size exceeds u64::MAX")
        } else {
            total.unwrap_or(u64::MAX)
        }
    }

    /// Overflow-aware computation backing [`Directory::size`].
    ///
    /// Returns `None` if an entry count cannot be represented as `u64`, or if
    /// any of the additions overflows.
    fn size_checked(&self) -> Option<u64> {
        let file_count = u64::try_from(self.files.len()).ok()?;
        let symlink_count = u64::try_from(self.symlinks.len()).ok()?;
        let directory_count = u64::try_from(self.directories.len()).ok()?;
        let nested_sizes = checked_sum(self.directories.iter().map(|entry| entry.size))?;

        checked_sum([file_count, symlink_count, directory_count, nested_sizes])
    }

    /// Calculates the digest of this Directory: the blake3 hash over the
    /// bytes produced by encoding the protobuf message with `encode_to_vec`.
    pub fn digest(&self) -> B3Digest {
        let encoded = self.encode_to_vec();

        let mut hasher = blake3::Hasher::new();
        hasher.update(&encoded);

        let hash = hasher.finalize();
        hash.as_bytes().into()
    }
}
/// Checks `name` against the previously seen name and remembers it.
///
/// If `name` sorts before `*prev_name`, [`DirectoryError::WrongSorting`] is
/// returned (carrying a copy of `name`) and `prev_name` is left untouched.
/// Otherwise `prev_name` is updated to point at `name`. Note that a name equal
/// to the previous one is accepted by this function.
fn update_if_lt_prev<'n>(prev_name: &mut &'n [u8], name: &'n [u8]) -> Result<(), DirectoryError> {
    if name >= *prev_name {
        *prev_name = name;
        Ok(())
    } else {
        Err(DirectoryError::WrongSorting(bytes::Bytes::copy_from_slice(
            name,
        )))
    }
}
// TODO: add a proper owned version here that moves various fields
impl TryFrom<Directory> for crate::Directory {
type Error = DirectoryError;
fn try_from(value: Directory) -> Result<Self, Self::Error> {
(&value).try_into()
}
}
impl TryFrom<&Directory> for crate::Directory {
    type Error = DirectoryError;

    /// Builds a [crate::Directory] from a proto [Directory].
    ///
    /// Files, directories and symlinks are processed in that order. Within
    /// each list, every name is first checked against its predecessor with
    /// `update_if_lt_prev`, then the entry is cloned, converted via
    /// [Node::into_name_and_node] and added to the result. The first error
    /// encountered aborts the conversion.
    fn try_from(directory: &Directory) -> Result<crate::Directory, DirectoryError> {
        let mut converted = crate::Directory::new();

        // TODO: this currently loops over all three types separately, rather
        // than peeking and picking from where would be the next.

        let mut prev_file_name: &[u8] = b"";
        for file in &directory.files {
            update_if_lt_prev(&mut prev_file_name, &file.name)?;

            let wrapped = Node {
                node: Some(node::Node::File(file.clone())),
            };
            let (name, node) = wrapped.into_name_and_node()?;
            converted.add(name, node)?;
        }

        let mut prev_directory_name: &[u8] = b"";
        for subdir in &directory.directories {
            update_if_lt_prev(&mut prev_directory_name, &subdir.name)?;

            let wrapped = Node {
                node: Some(node::Node::Directory(subdir.clone())),
            };
            let (name, node) = wrapped.into_name_and_node()?;
            converted.add(name, node)?;
        }

        let mut prev_symlink_name: &[u8] = b"";
        for symlink in &directory.symlinks {
            update_if_lt_prev(&mut prev_symlink_name, &symlink.name)?;

            let wrapped = Node {
                node: Some(node::Node::Symlink(symlink.clone())),
            };
            let (name, node) = wrapped.into_name_and_node()?;
            converted.add(name, node)?;
        }

        Ok(converted)
    }
}
// TODO: add a proper owned version here that moves various fields
impl From<crate::Directory> for Directory {
fn from(value: crate::Directory) -> Self {
(&value).into()
}
}
impl From<&crate::Directory> for Directory {
    /// Builds a proto [Directory] from a [crate::Directory].
    ///
    /// Every node returned by `nodes()` is copied into the `files`,
    /// `directories` or `symlinks` list matching its variant, in the order in
    /// which `nodes()` yields them.
    fn from(directory: &crate::Directory) -> Directory {
        let mut proto = Directory {
            directories: Vec::new(),
            files: Vec::new(),
            symlinks: Vec::new(),
        };

        for (name, node) in directory.nodes() {
            let name = name.clone();

            match node {
                crate::Node::Directory { digest, size } => {
                    proto.directories.push(DirectoryNode {
                        name,
                        digest: digest.to_owned().into(),
                        size: *size,
                    });
                }
                crate::Node::File {
                    digest,
                    size,
                    executable,
                } => {
                    proto.files.push(FileNode {
                        name,
                        digest: digest.to_owned().into(),
                        size: *size,
                        executable: *executable,
                    });
                }
                crate::Node::Symlink { target } => {
                    proto.symlinks.push(SymlinkNode {
                        name,
                        target: target.to_owned().into(),
                    });
                }
            }
        }

        proto
    }
}
impl Node {
    /// Converts a proto [Node] to a [crate::Node], and splits off the name.
    ///
    /// # Errors
    ///
    /// - [DirectoryError::NoNodeSet] if the inner `node` field is `None`.
    /// - [DirectoryError::InvalidNode] (carrying the node's name) if the
    ///   digest of a file or directory node cannot be converted to a
    ///   [B3Digest], or if the target of a symlink node fails conversion.
    pub fn into_name_and_node(self) -> Result<(bytes::Bytes, crate::Node), DirectoryError> {
        // The error value is a plain variant, so there is nothing to gain
        // from constructing it lazily.
        match self.node.ok_or(DirectoryError::NoNodeSet)? {
            node::Node::Directory(DirectoryNode { name, digest, size }) => {
                let digest = B3Digest::try_from(digest)
                    .map_err(|e| DirectoryError::InvalidNode(name.clone(), e.into()))?;

                Ok((name, crate::Node::Directory { digest, size }))
            }
            node::Node::File(FileNode {
                name,
                digest,
                size,
                executable,
            }) => {
                let digest = B3Digest::try_from(digest)
                    .map_err(|e| DirectoryError::InvalidNode(name.clone(), e.into()))?;

                Ok((
                    name,
                    crate::Node::File {
                        digest,
                        size,
                        executable,
                    },
                ))
            }
            node::Node::Symlink(SymlinkNode { name, target }) => {
                let node = crate::Node::Symlink {
                    target: target
                        .try_into()
                        .map_err(|e| DirectoryError::InvalidNode(name.clone(), e))?,
                };

                Ok((name, node))
            }
        }
    }

    /// Constructs a [Node] from a name and [crate::Node].
    ///
    /// The name is stored inside the proto node matching the variant of `n`;
    /// all remaining fields are taken over from `n`.
    pub fn from_name_and_node(name: bytes::Bytes, n: crate::Node) -> Self {
        let inner = match n {
            crate::Node::Directory { digest, size } => node::Node::Directory(DirectoryNode {
                name,
                digest: digest.into(),
                size,
            }),
            crate::Node::File {
                digest,
                size,
                executable,
            } => node::Node::File(FileNode {
                name,
                digest: digest.into(),
                size,
                executable,
            }),
            crate::Node::Symlink { target } => node::Node::Symlink(SymlinkNode {
                name,
                target: target.into(),
            }),
        };

        Self { node: Some(inner) }
    }
}
impl StatBlobResponse {
    /// Validates a StatBlobResponse.
    ///
    /// Every chunk must carry a digest whose length equals the blake3 output
    /// length; only the length is checked here. An empty chunk list is
    /// valid, which is allowed if no more granular chunking is available.
    ///
    /// # Errors
    ///
    /// Returns [ValidateStatBlobResponseError::InvalidDigestLen] with the
    /// encountered length and the index of the first offending chunk.
    pub fn validate(&self) -> Result<(), ValidateStatBlobResponseError> {
        // Compare against the hash *output* length. `blake3::KEY_LEN` has the
        // same value, but describes the key size of the keyed hash mode.
        let first_invalid = self
            .chunks
            .iter()
            .enumerate()
            .find(|(_, chunk)| chunk.digest.len() != blake3::OUT_LEN);

        match first_invalid {
            Some((i, chunk)) => Err(ValidateStatBlobResponseError::InvalidDigestLen(
                chunk.digest.len(),
                i,
            )),
            None => Ok(()),
        }
    }
}