diff options
-rw-r--r-- | tvix/castore/src/directoryservice/closure_validator.rs | 2 | ||||
-rw-r--r-- | tvix/castore/src/proto/grpc_directoryservice_wrapper.rs | 106 |
2 files changed, 19 insertions, 89 deletions
diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs index ecd9acfca123..cc421d4ab5ad 100644 --- a/tvix/castore/src/directoryservice/closure_validator.rs +++ b/tvix/castore/src/directoryservice/closure_validator.rs @@ -39,7 +39,7 @@ pub struct ClosureValidator { impl ClosureValidator { /// Insert a new Directory into the closure. /// Perform individual Directory validation, validation of insertion order - // and size fields. + /// and size fields. #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)] pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> { let digest = directory.digest(); diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs index 87eca3d14988..7d741a3f07bb 100644 --- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs @@ -1,7 +1,7 @@ +use crate::directoryservice::ClosureValidator; use crate::proto; use crate::{directoryservice::DirectoryService, B3Digest}; use futures::StreamExt; -use std::collections::HashMap; use std::ops::Deref; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; @@ -88,97 +88,27 @@ where request: Request<Streaming<proto::Directory>>, ) -> Result<Response<proto::PutDirectoryResponse>, Status> { let mut req_inner = request.into_inner(); - // TODO: let this use DirectoryPutter to the store it's connected to, - // and move the validation logic into [SimplePutter]. - - // This keeps track of the seen directory keys, and their size. - // This is used to validate the size field of a reference to a previously sent directory. - // We don't need to keep the contents around, they're stored in the DB. - // https://github.com/rust-lang/rust-clippy/issues/5812 - #[allow(clippy::mutable_key_type)] - let mut seen_directories_sizes: HashMap<B3Digest, u64> = HashMap::new(); - let mut last_directory_dgst: Option<B3Digest> = None; - - // Consume directories, and insert them into the store. - // Reject directory messages that refer to Directories not sent in the same stream. + + // We put all Directory messages we receive into ClosureValidator first. + let mut validator = ClosureValidator::default(); while let Some(directory) = req_inner.message().await? { - // validate the directory itself. - if let Err(e) = directory.validate() { - warn!(err = %e, "invalid directory"); - Err(Status::invalid_argument(format!( - "directory failed validation: {}", - e, - )))?; - } + validator.add(directory)?; + } - // for each child directory this directory refers to, we need - // to ensure it has been seen already in this stream, and that the size - // matches what we recorded. - for child_directory in &directory.directories { - let child_directory_digest: B3Digest = child_directory - .digest - .clone() - .try_into() - .map_err(|_e| Status::internal("invalid child directory digest len"))?; - - match seen_directories_sizes.get(&child_directory_digest) { - None => { - return Err(Status::invalid_argument(format!( - "child directory '{:?}' ({}) in directory '{}' not seen yet", - child_directory.name, - &child_directory_digest, - &directory.digest(), - ))); - } - Some(seen_child_directory_size) => { - if seen_child_directory_size != &child_directory.size { - return Err(Status::invalid_argument(format!( - "child directory '{:?}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}", - child_directory.name, - &child_directory_digest, - &directory.digest(), - seen_child_directory_size, - child_directory.size, - ))); - } - } - } - } + // drain, which validates connectivity too. + let directories = validator.finalize()?; - // NOTE: We can't know if a directory we're receiving actually is - // part of the closure, because we receive directories from the leaf nodes up to - // the root. - // The only thing we could to would be doing a final check when the - // last Directory was received, that all Directories received so far are - // reachable from that (root) node. - - let dgst = directory.digest(); - seen_directories_sizes.insert(dgst.clone(), directory.size()); - last_directory_dgst = Some(dgst.clone()); - - // check if the directory already exists in the database. We can skip - // inserting if it's already there, as that'd be a no-op. - match self.directory_service.get(&dgst).await { - Err(e) => { - warn!(err = %e, "failed to check if directory already exists"); - return Err(e.into()); - } - // skip if already exists - Ok(Some(_)) => {} - // insert if it doesn't already exist - Ok(None) => { - self.directory_service.put(directory).await?; - } - } + let mut directory_putter = self.directory_service.put_multiple_start(); + for directory in directories { + directory_putter.put(directory).await?; } - // We're done receiving. peek at last_directory_digest and either return the digest, - // or an error, if we received an empty stream. - match last_directory_dgst { - None => Err(Status::invalid_argument("no directories received")), - Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse { - root_digest: last_directory_dgst.into(), - })), - } + // Properly close the directory putter. Peek at last_directory_digest + // and return it, or propagate errors. + let last_directory_dgst = directory_putter.close().await?; + + Ok(Response::new(proto::PutDirectoryResponse { + root_digest: last_directory_dgst.into(), + })) } } |