diff options
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 3 | ||||
-rw-r--r-- | tvix/store/src/digests.rs | 49 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 79 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/memory.rs | 32 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 10 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/sled.rs | 32 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/traverse.rs | 13 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/utils.rs | 36 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/nar/mod.rs | 15 | ||||
-rw-r--r-- | tvix/store/src/nar/non_caching_calculation_service.rs | 10 | ||||
-rw-r--r-- | tvix/store/src/nar/renderer.rs | 19 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_directoryservice_wrapper.rs | 36 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/proto/mod.rs | 11 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/directory.rs | 10 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_directoryservice.rs | 2 |
17 files changed, 199 insertions, 165 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 807cc07643d1..a6bda8c1873b 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -141,7 +141,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { HashAlgo::Sha256, nar_calculation_service .calculate_nar(&root_node)? - .nar_sha256, + .1 + .to_vec(), )); let mut drv = Derivation::default(); diff --git a/tvix/store/src/digests.rs b/tvix/store/src/digests.rs new file mode 100644 index 000000000000..85664cc1667e --- /dev/null +++ b/tvix/store/src/digests.rs @@ -0,0 +1,49 @@ +use data_encoding::BASE64; +use thiserror::Error; + +// FUTUREWORK: make generic + +#[derive(PartialEq, Eq, Hash, Debug)] +pub struct B3Digest(Vec<u8>); + +// TODO: allow converting these errors to crate::Error +#[derive(Error, Debug)] +pub enum Error { + #[error("invalid digest length: {0}")] + InvalidDigestLen(usize), +} + +impl B3Digest { + // constructs a [B3Digest] from a [Vec<u8>]. + // Returns an error if the digest has the wrong length. + pub fn from_vec(value: Vec<u8>) -> Result<Self, Error> { + if value.len() != 32 { + Err(Error::InvalidDigestLen(value.len())) + } else { + Ok(Self(Vec::from(value))) + } + } + + // returns a copy of the inner [Vec<u8>]. + pub fn to_vec(&self) -> Vec<u8> { + self.0.to_vec() + } +} + +impl From<&[u8; 32]> for B3Digest { + fn from(value: &[u8; 32]) -> Self { + Self(value.to_vec()) + } +} + +impl Clone for B3Digest { + fn clone(&self) -> Self { + Self(self.0.to_owned()) + } +} + +impl std::fmt::Display for B3Digest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "b3:{}", BASE64.encode(self.0.as_slice())) + } +} diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index d4ac9fd925fd..46e19224c759 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -2,8 +2,7 @@ use std::collections::HashSet; use super::{DirectoryPutter, DirectoryService}; use crate::proto::{self, get_directory_request::ByWhat}; -use crate::Error; -use data_encoding::BASE64; +use crate::{B3Digest, Error}; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{transport::Channel, Status}; @@ -38,16 +37,16 @@ impl GRPCDirectoryService { impl DirectoryService for GRPCDirectoryService { type DirectoriesIterator = StreamIterator; - fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> { + fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); - let digest = digest.to_owned(); + let digest_as_vec = digest.to_vec(); let task = self.tokio_handle.spawn(async move { let mut s = grpc_client .get(proto::GetDirectoryRequest { recursive: false, - by_what: Some(ByWhat::Digest(digest.to_vec())), + by_what: Some(ByWhat::Digest(digest_as_vec)), }) .await? .into_inner(); @@ -56,6 +55,7 @@ impl DirectoryService for GRPCDirectoryService { s.message().await }); + let digest = digest.clone(); match self.tokio_handle.block_on(task)? { Ok(Some(directory)) => { // Validate the retrieved Directory indeed has the @@ -64,16 +64,14 @@ impl DirectoryService for GRPCDirectoryService { if actual_digest != digest { Err(crate::Error::StorageError(format!( "requested directory with digest {}, but got {}", - BASE64.encode(&digest), - BASE64.encode(&actual_digest) + digest, actual_digest ))) } else if let Err(e) = directory.validate() { // Validate the Directory itself is valid. warn!("directory failed validation: {}", e.to_string()); Err(crate::Error::StorageError(format!( "directory {} failed validation: {}", - BASE64.encode(&digest), - e, + digest, e, ))) } else { Ok(Some(directory)) @@ -85,7 +83,7 @@ impl DirectoryService for GRPCDirectoryService { } } - fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> { + fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> { let mut grpc_client = self.grpc_client.clone(); let task = self @@ -93,29 +91,27 @@ impl DirectoryService for GRPCDirectoryService { .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); match self.tokio_handle.block_on(task)? { - Ok(put_directory_resp) => Ok(put_directory_resp - .into_inner() - .root_digest - .as_slice() - .try_into() - .map_err(|_| { - Error::StorageError("invalid root digest length in response".to_string()) - })?), + Ok(put_directory_resp) => Ok(B3Digest::from_vec( + put_directory_resp.into_inner().root_digest, + ) + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), Err(e) => Err(crate::Error::StorageError(e.to_string())), } } - #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))] - fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { let mut grpc_client = self.grpc_client.clone(); - let root_directory_digest = root_directory_digest.to_owned(); + let root_directory_digest_as_vec = root_directory_digest.to_vec(); let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> = self.tokio_handle.spawn(async move { let s = grpc_client .get(proto::GetDirectoryRequest { recursive: true, - by_what: Some(ByWhat::Digest(root_directory_digest.to_vec())), + by_what: Some(ByWhat::Digest(root_directory_digest_as_vec)), }) .await? .into_inner(); @@ -125,7 +121,11 @@ impl DirectoryService for GRPCDirectoryService { let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); - StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream) + StreamIterator::new( + self.tokio_handle.clone(), + root_directory_digest.clone(), + stream, + ) } type DirectoryPutter = GRPCPutter; @@ -159,22 +159,22 @@ pub struct StreamIterator { // A stream of [proto::Directory] stream: Streaming<proto::Directory>, // The Directory digests we received so far - received_directory_digests: HashSet<[u8; 32]>, + received_directory_digests: HashSet<B3Digest>, // The Directory digests we're still expecting to get sent. - expected_directory_digests: HashSet<[u8; 32]>, + expected_directory_digests: HashSet<B3Digest>, } impl StreamIterator { pub fn new( tokio_handle: tokio::runtime::Handle, - root_digest: &[u8; 32], + root_digest: B3Digest, stream: Streaming<proto::Directory>, ) -> Self { Self { tokio_handle, stream, received_directory_digests: HashSet::new(), - expected_directory_digests: HashSet::from([*root_digest]), + expected_directory_digests: HashSet::from([root_digest]), } } } @@ -190,7 +190,7 @@ impl Iterator for StreamIterator { if let Err(e) = directory.validate() { return Some(Err(crate::Error::StorageError(format!( "directory {} failed validation: {}", - BASE64.encode(&directory.digest()), + directory.digest(), e, )))); } @@ -204,16 +204,19 @@ impl Iterator for StreamIterator { // means it once was in expected_directory_digests) return Some(Err(crate::Error::StorageError(format!( "received unexpected directory {}", - BASE64.encode(&directory_digest) + directory_digest )))); } self.received_directory_digests.insert(directory_digest); // register all children in expected_directory_digests. - // We ran validate() above, so we know these digests must be correct. for child_directory in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + let child_directory_digest = + B3Digest::from_vec(child_directory.digest.clone()).unwrap(); + self.expected_directory_digests - .insert(child_directory.digest.clone().try_into().unwrap()); + .insert(child_directory_digest); } Some(Ok(directory)) @@ -294,7 +297,7 @@ impl DirectoryPutter for GRPCPutter { } /// Closes the stream for sending, and returns the value - fn close(&mut self) -> Result<[u8; 32], crate::Error> { + fn close(&mut self) -> Result<B3Digest, crate::Error> { // get self.rq, and replace it with None. // This ensures we can only close it once. match std::mem::take(&mut self.rq) { @@ -303,15 +306,15 @@ impl DirectoryPutter for GRPCPutter { // close directory_sender, so blocking on task will finish. drop(directory_sender); - Ok(self + let root_digest = self .tokio_handle .block_on(task)? .map_err(|e| Error::StorageError(e.to_string()))? - .root_digest - .try_into() - .map_err(|_| { - Error::StorageError("invalid root digest length in response".to_string()) - })?) + .root_digest; + + B3Digest::from_vec(root_digest).map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + }) } } } diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 2b4668a15ecf..524c7664c651 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -1,5 +1,4 @@ -use crate::{proto, Error}; -use data_encoding::BASE64; +use crate::{proto, B3Digest, Error}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use tracing::{instrument, warn}; @@ -9,17 +8,17 @@ use super::{DirectoryService, DirectoryTraverser}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { - db: Arc<RwLock<HashMap<[u8; 32], proto::Directory>>>, + db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>, } impl DirectoryService for MemoryDirectoryService { type DirectoriesIterator = DirectoryTraverser<Self>; - #[instrument(skip(self, digest), fields(directory.digest = BASE64.encode(digest)))] - fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error> { + #[instrument(skip(self, digest), fields(directory.digest = %digest))] + fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { let db = self.db.read()?; - match db.get(digest) { + match db.get(&digest) { // The directory was not found, return None => Ok(None), @@ -28,11 +27,10 @@ impl DirectoryService for MemoryDirectoryService { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. let actual_digest = directory.digest(); - if actual_digest.as_slice() != digest { + if actual_digest != *digest { return Err(Error::StorageError(format!( "requested directory with digest {}, but got {}", - BASE64.encode(digest), - BASE64.encode(&actual_digest) + digest, actual_digest ))); } @@ -41,8 +39,7 @@ impl DirectoryService for MemoryDirectoryService { warn!("directory failed validation: {}", e.to_string()); return Err(Error::StorageError(format!( "directory {} failed validation: {}", - BASE64.encode(&actual_digest), - e, + actual_digest, e, ))); } @@ -51,28 +48,27 @@ impl DirectoryService for MemoryDirectoryService { } } - #[instrument(skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))] - fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error> { + #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] + fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); // validate the directory itself. if let Err(e) = directory.validate() { return Err(Error::InvalidRequest(format!( "directory {} failed validation: {}", - BASE64.encode(&digest), - e, + digest, e, ))); } // store it let mut db = self.db.write()?; - db.insert(digest, directory); + db.insert(digest.clone(), directory); Ok(digest) } - #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))] - fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { DirectoryTraverser::with(self.clone(), root_directory_digest) } diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index e8f11269ec1d..f387d28948f0 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -1,4 +1,4 @@ -use crate::{proto, Error}; +use crate::{proto, B3Digest, Error}; mod grpc; mod memory; mod sled; @@ -20,16 +20,16 @@ pub trait DirectoryService { /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. - fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error>; + fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; /// Get uploads a single Directory message, and returns the calculated /// digest, or an error. - fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error>; + fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>; /// Looks up a closure of [proto::Directory]. /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. - fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator; + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator; /// Allows persisting a closure of [proto::Directory], which is a graph of /// connected Directory messages. @@ -50,5 +50,5 @@ pub trait DirectoryPutter { fn put(&mut self, directory: proto::Directory) -> Result<(), Error>; /// Close the stream, and wait for any errors. - fn close(&mut self) -> Result<[u8; 32], Error>; + fn close(&mut self) -> Result<B3Digest, Error>; } diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs index d060232307b1..e737ff9b8c99 100644 --- a/tvix/store/src/directoryservice/sled.rs +++ b/tvix/store/src/directoryservice/sled.rs @@ -1,6 +1,5 @@ use crate::proto::Directory; -use crate::{proto, Error}; -use data_encoding::BASE64; +use crate::{proto, B3Digest, Error}; use prost::Message; use std::path::PathBuf; use tracing::{instrument, warn}; @@ -32,9 +31,9 @@ impl SledDirectoryService { impl DirectoryService for SledDirectoryService { type DirectoriesIterator = DirectoryTraverser<Self>; - #[instrument(name = "SledDirectoryService::get", skip(self, digest), fields(directory.digest = BASE64.encode(digest)))] - fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error> { - match self.db.get(digest) { + #[instrument(name = "SledDirectoryService::get", skip(self, digest), fields(directory.digest = %digest))] + fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { + match self.db.get(digest.to_vec()) { // The directory was not found, return Ok(None) => Ok(None), @@ -44,11 +43,10 @@ impl DirectoryService for SledDirectoryService { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. let actual_digest = directory.digest(); - if actual_digest.as_slice() != digest { + if actual_digest != *digest { return Err(Error::StorageError(format!( "requested directory with digest {}, but got {}", - BASE64.encode(digest), - BASE64.encode(&actual_digest) + digest, actual_digest ))); } @@ -57,15 +55,14 @@ impl DirectoryService for SledDirectoryService { warn!("directory failed validation: {}", e.to_string()); return Err(Error::StorageError(format!( "directory {} failed validation: {}", - BASE64.encode(&actual_digest), - e, + actual_digest, e, ))); } Ok(Some(directory)) } Err(e) => { - warn!("unable to parse directory {}: {}", BASE64.encode(digest), e); + warn!("unable to parse directory {}: {}", digest, e); Err(Error::StorageError(e.to_string())) } }, @@ -74,28 +71,27 @@ impl DirectoryService for SledDirectoryService { } } - #[instrument(name = "SledDirectoryService::put", skip(self, directory), fields(directory.digest = BASE64.encode(&directory.digest())))] - fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error> { + #[instrument(name = "SledDirectoryService::put", skip(self, directory), fields(directory.digest = %directory.digest()))] + fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> { let digest = directory.digest(); // validate the directory itself. if let Err(e) = directory.validate() { return Err(Error::InvalidRequest(format!( "directory {} failed validation: {}", - BASE64.encode(&digest), - e, + digest, e, ))); } // store it - let result = self.db.insert(digest, directory.encode_to_vec()); + let result = self.db.insert(digest.to_vec(), directory.encode_to_vec()); if let Err(e) = result { return Err(Error::StorageError(e.to_string())); } Ok(digest) } - #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))] - fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { + #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] + fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { DirectoryTraverser::with(self.clone(), root_directory_digest) } diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs index 46bf0c4b6329..f6193f091817 100644 --- a/tvix/store/src/directoryservice/traverse.rs +++ b/tvix/store/src/directoryservice/traverse.rs @@ -1,5 +1,5 @@ use super::DirectoryService; -use crate::{proto::NamedNode, Error}; +use crate::{proto::NamedNode, B3Digest, Error}; use tracing::{instrument, warn}; /// This traverses from a (root) node to the given (sub)path, returning the Node @@ -39,21 +39,18 @@ pub fn traverse_to<DS: DirectoryService>( Ok(None) } crate::proto::node::Node::Directory(directory_node) => { - // fetch the linked node from the directory_service - let digest: [u8; 32] = directory_node - .digest - .try_into() + let digest = B3Digest::from_vec(directory_node.digest) .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + // fetch the linked node from the directory_service match directory_service.get(&digest)? { // If we didn't get the directory node that's linked, that's a store inconsistency, bail out! None => { - let digest_b64 = data_encoding::BASE64.encode(&digest); - warn!("directory {} does not exist", digest_b64); + warn!("directory {} does not exist", digest); Err(Error::StorageError(format!( "directory {} does not exist", - digest_b64 + digest ))) } Some(directory) => { diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs index 7d41e8d3ba01..3661808734f3 100644 --- a/tvix/store/src/directoryservice/utils.rs +++ b/tvix/store/src/directoryservice/utils.rs @@ -1,6 +1,7 @@ use super::DirectoryPutter; use super::DirectoryService; use crate::proto; +use crate::B3Digest; use crate::Error; use std::collections::{HashSet, VecDeque}; use tracing::{debug_span, instrument, warn}; @@ -13,17 +14,17 @@ pub struct DirectoryTraverser<DS: DirectoryService> { /// The list of all directories that still need to be traversed. The next /// element is picked from the front, new elements are enqueued at the /// back. - worklist_directory_digests: VecDeque<[u8; 32]>, + worklist_directory_digests: VecDeque<B3Digest>, /// The list of directory digests already sent to the consumer. /// We omit sending the same directories multiple times. - sent_directory_digests: HashSet<[u8; 32]>, + sent_directory_digests: HashSet<B3Digest>, } impl<DS: DirectoryService> DirectoryTraverser<DS> { - pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self { + pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self { Self { directory_service, - worklist_directory_digests: VecDeque::from([*root_directory_digest]), + worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]), sent_directory_digests: HashSet::new(), } } @@ -33,12 +34,8 @@ impl<DS: DirectoryService> DirectoryTraverser<DS> { // This panics if the digest looks invalid, it's supposed to be checked first. fn enqueue_child_directories(&mut self, directory: &proto::Directory) { for child_directory_node in &directory.directories { - let child_digest: [u8; 32] = child_directory_node - .digest - .as_slice() - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string())) - .unwrap(); + // TODO: propagate error + let child_digest = B3Digest::from_vec(child_directory_node.digest.clone()).unwrap(); if self.worklist_directory_digests.contains(&child_digest) || self.sent_directory_digests.contains(&child_digest) @@ -59,8 +56,7 @@ impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { match self.worklist_directory_digests.pop_front() { None => None, Some(current_directory_digest) => { - let current_directory_b64 = data_encoding::BASE64.encode(¤t_directory_digest); - let span = debug_span!("directory.digest", current_directory_b64); + let span = debug_span!("directory.digest", "{}", current_directory_digest); let _ = span.enter(); // look up the directory itself. @@ -73,24 +69,24 @@ impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { warn!("directory failed validation: {}", e.to_string()); return Some(Err(Error::StorageError(format!( "invalid directory: {}", - current_directory_b64 + current_directory_digest )))); } current_directory } // if it's not there, we have an inconsistent store! Ok(None) => { - warn!("directory {} does not exist", current_directory_b64); + warn!("directory {} does not exist", current_directory_digest); return Some(Err(Error::StorageError(format!( "directory {} does not exist", - current_directory_b64 + current_directory_digest )))); } Err(e) => { warn!("failed to look up directory"); return Some(Err(Error::StorageError(format!( "unable to look up directory {}: {}", - current_directory_b64, e + current_directory_digest, e )))); } }; @@ -110,7 +106,7 @@ impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { /// TODO: verify connectivity? Factor out these checks into generic helpers? pub struct SimplePutter<DS: DirectoryService> { directory_service: DS, - last_directory_digest: Option<[u8; 32]>, + last_directory_digest: Option<B3Digest>, } impl<DS: DirectoryService> SimplePutter<DS> { @@ -133,9 +129,9 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { } /// We need to be mutable here, as that's the signature of the trait. - fn close(&mut self) -> Result<[u8; 32], Error> { - match self.last_directory_digest { - Some(last_digest) => Ok(last_digest), + fn close(&mut self) -> Result<B3Digest, Error> { + match &self.last_directory_digest { + Some(last_digest) => Ok(last_digest.clone()), None => Err(Error::InvalidRequest( "no directories sent, can't show root digest".to_string(), )), diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index 3ce826f98ba6..da0b6400a10b 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,3 +1,4 @@ +mod digests; mod errors; pub mod blobservice; @@ -7,6 +8,7 @@ pub mod nar; pub mod pathinfoservice; pub mod proto; +pub use digests::B3Digest; pub use errors::Error; #[cfg(test)] diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs index d6a2f0889a5f..2bfb541733be 100644 --- a/tvix/store/src/nar/mod.rs +++ b/tvix/store/src/nar/mod.rs @@ -1,4 +1,4 @@ -use crate::proto; +use crate::{proto, B3Digest}; use data_encoding::BASE64; use thiserror::Error; @@ -14,14 +14,14 @@ pub enum RenderError { #[error("failure talking to a backing store client: {0}")] StoreError(crate::Error), - #[error("unable to find directory {}, referred from {}", BASE64.encode(.0), .1)] - DirectoryNotFound(Vec<u8>, String), + #[error("unable to find directory {}, referred from {}", .0, .1)] + DirectoryNotFound(B3Digest, String), #[error("unable to find blob {}, referred from {}", BASE64.encode(.0), .1)] - BlobNotFound(Vec<u8>, String), + BlobNotFound([u8; 32], String), #[error("unexpected size in metadata for blob {}, referred from {} returned, expected {}, got {}", BASE64.encode(.0), .1, .2, .3)] - UnexpectedBlobMeta(Vec<u8>, String, u32, u32), + UnexpectedBlobMeta([u8; 32], String, u32, u32), #[error("failure using the NAR writer: {0}")] NARWriterError(std::io::Error), @@ -29,8 +29,5 @@ pub enum RenderError { /// The base trait for something calculating NARs, and returning their size and sha256. pub trait NARCalculationService { - fn calculate_nar( - &self, - root_node: &proto::node::Node, - ) -> Result<proto::CalculateNarResponse, RenderError>; + fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), RenderError>; } diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs index ff6eb4d5d057..8a080cb4df5e 100644 --- a/tvix/store/src/nar/non_caching_calculation_service.rs +++ b/tvix/store/src/nar/non_caching_calculation_service.rs @@ -26,18 +26,12 @@ impl<BS: BlobService, DS: DirectoryService> NonCachingNARCalculationService<BS, impl<BS: BlobService, DS: DirectoryService> NARCalculationService for NonCachingNARCalculationService<BS, DS> { - fn calculate_nar( - &self, - root_node: &proto::node::Node, - ) -> Result<proto::CalculateNarResponse, RenderError> { + fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), RenderError> { let h = Sha256::new(); let mut cw = CountWrite::from(h); self.nar_renderer.write_nar(&mut cw, root_node)?; - Ok(proto::CalculateNarResponse { - nar_size: cw.count() as u64, - nar_sha256: cw.into_inner().finalize().to_vec(), - }) + Ok((cw.count(), cw.into_inner().finalize().into())) } } diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index b080a713ec0a..9b71d24ac2a9 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -4,6 +4,7 @@ use crate::{ blobservice::BlobService, directoryservice::DirectoryService, proto::{self, NamedNode}, + B3Digest, }; use data_encoding::BASE64; use nix_compat::nar; @@ -82,16 +83,12 @@ impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> { .map_err(RenderError::NARWriterError)?; } proto::node::Node::Directory(proto_directory_node) => { - let digest: [u8; 32] = - proto_directory_node - .digest - .to_owned() - .try_into() - .map_err(|_e| { - RenderError::StoreError(crate::Error::StorageError( - "invalid digest len in directory node".to_string(), - )) - })?; + let digest = + B3Digest::from_vec(proto_directory_node.digest.to_vec()).map_err(|_e| { + RenderError::StoreError(crate::Error::StorageError( + "invalid digest len in directory node".to_string(), + )) + })?; // look it up with the directory service let resp = self @@ -103,7 +100,7 @@ impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> { // if it's None, that's an error! None => { return Err(RenderError::DirectoryNotFound( - digest.to_vec(), + digest, proto_directory_node.name.to_owned(), )) } diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs index 35af132215e9..6d2df310137f 100644 --- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -1,6 +1,5 @@ -use crate::directoryservice::DirectoryService; use crate::proto; -use data_encoding::BASE64; +use crate::{directoryservice::DirectoryService, B3Digest}; use std::collections::HashMap; use tokio::{sync::mpsc::channel, task}; use tokio_stream::wrappers::ReceiverStream; @@ -41,13 +40,9 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> match &req_inner.by_what { None => return Err(Status::invalid_argument("by_what needs to be specified")), Some(proto::get_directory_request::ByWhat::Digest(digest)) => { - let digest: [u8; 32] = digest - .as_slice() - .try_into() + let digest = B3Digest::from_vec(digest.to_vec()) .map_err(|_e| Status::invalid_argument("invalid digest length"))?; - let digest_b64: String = BASE64.encode(&digest); - task::spawn(async move { if !req_inner.recursive { let e: Result<proto::Directory, Status> = @@ -55,7 +50,7 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> Ok(Some(directory)) => Ok(directory), Ok(None) => Err(Status::not_found(format!( "directory {} not found", - digest_b64 + digest ))), Err(e) => Err(e.into()), }; @@ -97,8 +92,8 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> // This keeps track of the seen directory keys, and their size. // This is used to validate the size field of a reference to a previously sent directory. // We don't need to keep the contents around, they're stored in the DB. - let mut seen_directories_sizes: HashMap<[u8; 32], u32> = HashMap::new(); - let mut last_directory_dgst: Option<[u8; 32]> = None; + let mut seen_directories_sizes: HashMap<B3Digest, u32> = HashMap::new(); + let mut last_directory_dgst: Option<B3Digest> = None; // Consume directories, and insert them into the store. // Reject directory messages that refer to Directories not sent in the same stream. @@ -107,7 +102,7 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> if let Err(e) = directory.validate() { return Err(Status::invalid_argument(format!( "directory {} failed validation: {}", - BASE64.encode(&directory.digest()), + directory.digest(), e, ))); } @@ -116,10 +111,7 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> // to ensure it has been seen already in this stream, and that the size // matches what we recorded. for child_directory in &directory.directories { - let child_directory_digest: [u8; 32] = child_directory - .digest - .clone() - .try_into() + let child_directory_digest = B3Digest::from_vec(child_directory.digest.to_vec()) .map_err(|_e| Status::internal("invalid child directory digest len"))?; match seen_directories_sizes.get(&child_directory_digest) { @@ -127,8 +119,8 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> return Err(Status::invalid_argument(format!( "child directory '{}' ({}) in directory '{}' not seen yet", child_directory.name, - BASE64.encode(&child_directory_digest), - BASE64.encode(&directory.digest()), + &child_directory_digest, + &directory.digest(), ))); } Some(seen_child_directory_size) => { @@ -136,11 +128,11 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> return Err(Status::invalid_argument(format!( "child directory '{}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}", child_directory.name, - BASE64.encode(&child_directory_digest), - BASE64.encode(&directory.digest()), + &child_directory_digest, + &directory.digest(), seen_child_directory_size, child_directory.size, - ))); + ))); } } } @@ -154,8 +146,8 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> // reachable from that (root) node. let dgst = directory.digest(); - seen_directories_sizes.insert(dgst, directory.size()); - last_directory_dgst = Some(dgst); + seen_directories_sizes.insert(dgst.clone(), directory.size()); + last_directory_dgst = Some(dgst.clone()); // check if the directory already exists in the database. We can skip // inserting if it's already there, as that'd be a no-op. diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 8050ce10cc54..e82557b3a06c 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -70,7 +70,10 @@ impl< match request.into_inner().node { None => Err(Status::invalid_argument("no root node sent")), Some(root_node) => match self.nar_calculation_service.calculate_nar(&root_node) { - Ok(resp) => Ok(Response::new(resp)), + Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse { + nar_size, + nar_sha256: nar_sha256.to_vec(), + })), Err(e) => Err(e.into()), }, } diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index 528d0fb061fd..4db0b9731edc 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -17,6 +17,8 @@ pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper; +use crate::B3Digest; + tonic::include_proto!("tvix.store.v1"); #[cfg(feature = "reflection")] @@ -238,10 +240,15 @@ impl Directory { /// Calculates the digest of a Directory, which is the blake3 hash of a /// Directory protobuf message, serialized in protobuf canonical form. - pub fn digest(&self) -> [u8; 32] { + pub fn digest(&self) -> B3Digest { let mut hasher = blake3::Hasher::new(); - *hasher.update(&self.encode_to_vec()).finalize().as_bytes() + let vec = hasher + .update(&self.encode_to_vec()) + .finalize() + .as_bytes() + .to_vec(); + B3Digest::from_vec(vec).unwrap() } /// validate checks the directory for invalid data, such as: diff --git a/tvix/store/src/proto/tests/directory.rs b/tvix/store/src/proto/tests/directory.rs index 03572629e606..8d6ca7241d7a 100644 --- a/tvix/store/src/proto/tests/directory.rs +++ b/tvix/store/src/proto/tests/directory.rs @@ -1,4 +1,7 @@ -use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}; +use crate::{ + proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}, + B3Digest, +}; use lazy_static::lazy_static; lazy_static! { @@ -66,11 +69,12 @@ fn digest() { assert_eq!( d.digest(), - [ + B3Digest::from_vec(vec![ 0xaf, 0x13, 0x49, 0xb9, 0xf5, 0xf9, 0xa1, 0xa6, 0xa0, 0x40, 0x4d, 0xea, 0x36, 0xdc, 0xc9, 0x49, 0x9b, 0xcb, 0x25, 0xc9, 0xad, 0xc1, 0x12, 0xb7, 0xcc, 0x9a, 0x93, 0xca, 0xe4, 0x1f, 0x32, 0x62 - ] + ]) + .unwrap() ) } diff --git a/tvix/store/src/proto/tests/grpc_directoryservice.rs b/tvix/store/src/proto/tests/grpc_directoryservice.rs index 37428fbfa339..069e82f6463e 100644 --- a/tvix/store/src/proto/tests/grpc_directoryservice.rs +++ b/tvix/store/src/proto/tests/grpc_directoryservice.rs @@ -76,7 +76,7 @@ async fn put_get() { .into_inner(); // the sent root_digest should match the calculated digest - assert_eq!(put_resp.root_digest, DIRECTORY_A.digest()); + assert_eq!(put_resp.root_digest, DIRECTORY_A.digest().to_vec()); // get it back let items = get_directories( |