about summary refs log tree commit diff
path: root/tvix
diff options
Diffstat (limited to 'tvix')
8 files changed, 640 insertions, 328 deletions
diff --git a/tvix/castore/src/directoryservice/closure_validator.rs b/tvix/castore/src/directoryservice/closure_validator.rs
deleted file mode 100644
index b9746a5a0501..000000000000
--- a/tvix/castore/src/directoryservice/closure_validator.rs
+++ /dev/null
@@ -1,309 +0,0 @@
-use std::collections::{HashMap, HashSet};
-use bstr::ByteSlice;
-use petgraph::{
-    graph::{DiGraph, NodeIndex},
-    visit::{Bfs, Walker},
-use tracing::instrument;
-use crate::{
-    proto::{self, Directory},
-    B3Digest, Error,
-type DirectoryGraph = DiGraph<Directory, ()>;
-/// This can be used to validate a Directory closure (DAG of connected
-/// Directories), and their insertion order.
-/// Directories need to be inserted (via `add`), in an order from the leaves to
-/// the root (DFS Post-Order).
-/// During insertion, We validate as much as we can at that time:
-///  - individual validation of Directory messages
-///  - validation of insertion order (no upload of not-yet-known Directories)
-///  - validation of size fields of referred Directories
-/// Internally it keeps all received Directories in a directed graph,
-/// with node weights being the Directories and edges pointing to child
-/// directories.
-/// Once all Directories have been inserted, a finalize function can be
-/// called to get a (deduplicated and) validated list of directories, in
-/// insertion order.
-/// During finalize, a check for graph connectivity is performed too, to ensure
-/// there's no disconnected components, and only one root.
-pub struct ClosureValidator {
-    // A directed graph, using Directory as node weight, without edge weights.
-    // Edges point from parents to children.
-    graph: DirectoryGraph,
-    // A lookup table from directory digest to node index.
-    digest_to_node_ix: HashMap<B3Digest, NodeIndex>,
-    /// Keeps track of the last-inserted directory graph node index.
-    /// On a correct insert, this will be the root node, from which the DFS post
-    /// order traversal will start from.
-    last_directory_ix: Option<NodeIndex>,
-impl ClosureValidator {
-    /// Insert a new Directory into the closure.
-    /// Perform individual Directory validation, validation of insertion order
-    /// and size fields.
-    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
-    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
-        let digest = directory.digest();
-        // If we already saw this node previously, it's already validated and in the graph.
-        if self.digest_to_node_ix.contains_key(&digest) {
-            return Ok(());
-        }
-        // Do some general validation
-        directory
-            .validate()
-            .map_err(|e| Error::InvalidRequest(e.to_string()))?;
-        // Ensure the directory only refers to directories which we already accepted.
-        // We lookup their node indices and add them to a HashSet.
-        let mut child_ixs = HashSet::new();
-        for dir in &directory.directories {
-            let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated
-            // Ensure the digest has already been seen
-            let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| {
-                Error::InvalidRequest(format!(
-                    "'{}' refers to unseen child dir: {}",
-                    dir.name.as_bstr(),
-                    &child_digest
-                ))
-            })?;
-            // Ensure the size specified in the child node matches the directory size itself.
-            let recorded_child_size = self
-                .graph
-                .node_weight(child_ix)
-                .expect("node not found")
-                .size();
-            // Ensure the size specified in the child node matches our records.
-            if dir.size != recorded_child_size {
-                return Err(Error::InvalidRequest(format!(
-                    "'{}' has wrong size, specified {}, recorded {}",
-                    dir.name.as_bstr(),
-                    dir.size,
-                    recorded_child_size
-                )));
-            }
-            child_ixs.insert(child_ix);
-        }
-        // Insert node into the graph, and add edges to all children.
-        let node_ix = self.graph.add_node(directory);
-        for child_ix in child_ixs {
-            self.graph.add_edge(node_ix, child_ix, ());
-        }
-        // Record the mapping from digest to node_ix in our lookup table.
-        self.digest_to_node_ix.insert(digest, node_ix);
-        // Update last_directory_ix.
-        self.last_directory_ix = Some(node_ix);
-        Ok(())
-    }
-    /// Ensure that all inserted Directories are connected, then return a
-    /// (deduplicated) and validated list of directories, in from-leaves-to-root
-    /// order.
-    /// In case no elements have been inserted, returns an empty list.
-    #[instrument(level = "trace", skip_all, err)]
-    pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> {
-        let (graph, _) = match self.finalize_raw()? {
-            None => return Ok(vec![]),
-            Some(v) => v,
-        };
-        // Dissolve the graph, returning the nodes as a Vec.
-        // As the graph was populated in a valid DFS PostOrder, we can return
-        // nodes in that same order.
-        let (nodes, _edges) = graph.into_nodes_edges();
-        Ok(nodes.into_iter().map(|x| x.weight).collect())
-    }
-    /// Ensure that all inserted Directories are connected, then return a
-    /// (deduplicated) and validated list of directories, in from-root-to-leaves
-    /// order.
-    /// In case no elements have been inserted, returns an empty list.
-    #[instrument(level = "trace", skip_all, err)]
-    pub(crate) fn finalize_root_to_leaves(self) -> Result<Vec<Directory>, Error> {
-        let (graph, root) = match self.finalize_raw()? {
-            None => return Ok(vec![]),
-            Some(v) => v,
-        };
-        // do a BFS traversal of the graph, starting with the root node to get
-        // all nodes reachable from there.
-        let traversal = Bfs::new(&graph, root);
-        let order = traversal.iter(&graph).collect::<Vec<_>>();
-        let (nodes, _edges) = graph.into_nodes_edges();
-        // Convert to option, so that we can take individual nodes out without messing up the
-        // indices
-        let mut nodes = nodes.into_iter().map(Some).collect::<Vec<_>>();
-        Ok(order
-            .iter()
-            .map(|i| nodes[i.index()].take().unwrap().weight)
-            .collect())
-    }
-    /// Internal implementation of closure validation
-    #[instrument(level = "trace", skip_all, err)]
-    fn finalize_raw(self) -> Result<Option<(DirectoryGraph, NodeIndex)>, Error> {
-        // If no nodes were inserted, an empty list is returned.
-        let last_directory_ix = if let Some(x) = self.last_directory_ix {
-            x
-        } else {
-            return Ok(None);
-        };
-        // do a BFS traversal of the graph, starting with the root node to get
-        // (the count of) all nodes reachable from there.
-        let mut traversal = Bfs::new(&self.graph, last_directory_ix);
-        let mut visited_directory_count = 0;
-        #[cfg(debug_assertions)]
-        let mut visited_directory_ixs = HashSet::new();
-        #[cfg_attr(not(debug_assertions), allow(unused))]
-        while let Some(directory_ix) = traversal.next(&self.graph) {
-            #[cfg(debug_assertions)]
-            visited_directory_ixs.insert(directory_ix);
-            visited_directory_count += 1;
-        }
-        // If the number of nodes collected equals the total number of nodes in
-        // the graph, we know all nodes are connected.
-        if visited_directory_count != self.graph.node_count() {
-            // more or less exhaustive error reporting.
-            #[cfg(debug_assertions)]
-            {
-                let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect();
-                let unvisited_directories: HashSet<_> = all_directory_ixs
-                    .difference(&visited_directory_ixs)
-                    .map(|ix| self.graph.node_weight(*ix).expect("node not found"))
-                    .collect();
-                return Err(Error::InvalidRequest(format!(
-                    "found {} disconnected directories: {:?}",
-                    self.graph.node_count() - visited_directory_ixs.len(),
-                    unvisited_directories
-                )));
-            }
-            #[cfg(not(debug_assertions))]
-            {
-                return Err(Error::InvalidRequest(format!(
-                    "found {} disconnected directories",
-                    self.graph.node_count() - visited_directory_count
-                )));
-            }
-        }
-        Ok(Some((self.graph, last_directory_ix)))
-    }
-mod tests {
-    use crate::{
-        proto::{self, Directory},
-    };
-    use lazy_static::lazy_static;
-    use rstest::rstest;
-    lazy_static! {
-        pub static ref BROKEN_DIRECTORY : Directory = Directory {
-            symlinks: vec![proto::SymlinkNode {
-                name: "".into(), // invalid name!
-                target: "doesntmatter".into(),
-            }],
-            ..Default::default()
-        };
-        pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory {
-            directories: vec![proto::DirectoryNode {
-                name: "foo".into(),
-                digest: DIRECTORY_A.digest().into(),
-                size: DIRECTORY_A.size() + 42, // wrong!
-            }],
-            ..Default::default()
-        };
-    }
-    use super::ClosureValidator;
-    #[rstest]
-    /// Uploading an empty directory should succeed.
-    #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
-    /// Uploading A, then B (referring to A) should succeed.
-    #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))]
-    /// Uploading A, then A, then C (referring to A twice) should succeed.
-    /// We pretend to be a dumb client not deduping directories.
-    #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
-    /// Uploading A, then C (referring to A twice) should succeed.
-    #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
-    /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close,
-    /// as B itself would be left unconnected.
-    #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
-    /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
-    #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
-    /// Uploading a directory failing validation should fail immediately.
-    #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)]
-    /// Uploading a directory which refers to another Directory with a wrong size should fail.
-    #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
-    fn test_uploads(
-        #[case] directories_to_upload: &[&Directory],
-        #[case] exp_fail_upload_last: bool,
-        #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not.
-    ) {
-        let mut dcv = ClosureValidator::default();
-        let len_directories_to_upload = directories_to_upload.len();
-        for (i, d) in directories_to_upload.iter().enumerate() {
-            let resp = dcv.add((*d).clone());
-            if i == len_directories_to_upload - 1 && exp_fail_upload_last {
-                assert!(resp.is_err(), "expect last put to fail");
-                // We don't really care anymore what finalize() would return, as
-                // the add() failed.
-                return;
-            } else {
-                assert!(resp.is_ok(), "expect put to succeed");
-            }
-        }
-        // everything was uploaded successfully. Test finalize().
-        let resp = dcv.finalize();
-        match exp_finalize {
-            Some(directories) => {
-                assert_eq!(
-                    Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
-                    resp.expect("drain should succeed")
-                );
-            }
-            None => {
-                resp.expect_err("drain should fail");
-            }
-        }
-    }
diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs
new file mode 100644
index 000000000000..e6b9b163370c
--- /dev/null
+++ b/tvix/castore/src/directoryservice/directory_graph.rs
@@ -0,0 +1,413 @@
+use std::collections::HashMap;
+use bstr::ByteSlice;
+use petgraph::{
+    graph::{DiGraph, NodeIndex},
+    visit::{Bfs, DfsPostOrder, EdgeRef, IntoNodeIdentifiers, Walker},
+    Direction, Incoming,
+use tracing::instrument;
+use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
+use crate::{
+    proto::{self, Directory, DirectoryNode},
+    B3Digest,
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("{0}")]
+    ValidationError(String),
+/// This can be used to validate and/or re-order a Directory closure (DAG of
+/// connected Directories), and their insertion order.
+/// The DirectoryGraph is parametrized on the insertion order, and can be
+/// constructed using the Default trait, or using `with_order` if the
+/// OrderValidator needs to be customized.
+/// If the user is receiving directories from canonical protobuf encoding in
+/// root-to-leaves order, and parsing them, she can call `digest_allowed`
+/// _before_ parsing the protobuf record and then add it with `add_unchecked`.
+/// All other users insert the directories via `add`, in their specified order.
+/// During insertion, we validate as much as we can at that time:
+///  - individual validation of Directory messages
+///  - validation of insertion order
+///  - validation of size fields of referred Directories
+/// Internally it keeps all received Directories in a directed graph,
+/// with node weights being the Directories and edges pointing to child/parent
+/// directories.
+/// Once all Directories have been inserted, a validate function can be
+/// called to perform a check for graph connectivity and ensure there's no
+/// disconnected components or missing nodes.
+/// Finally, the `drain_leaves_to_root` or `drain_root_to_leaves` can be
+/// _chained_ on validate to get an iterator over the (deduplicated and)
+/// validated list of directories in either order.
+pub struct DirectoryGraph<O> {
+    // A directed graph, using Directory as node weight.
+    // Edges point from parents to children.
+    //
+    // Nodes with None weigths might exist when a digest has been referred to but the directory
+    // with this digest has not yet been sent.
+    //
+    // The option in the edge weight tracks the pending validation state of the respective edge, for example if
+    // the child has not been added yet.
+    graph: DiGraph<Option<Directory>, Option<DirectoryNode>>,
+    // A lookup table from directory digest to node index.
+    digest_to_node_ix: HashMap<B3Digest, NodeIndex>,
+    order_validator: O,
+pub struct ValidatedDirectoryGraph {
+    graph: DiGraph<Option<Directory>, Option<DirectoryNode>>,
+    root: Option<NodeIndex>,
+fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> {
+    // Ensure the size specified in the child node matches our records.
+    if dir.size != child.size() {
+        return Err(Error::ValidationError(format!(
+            "'{}' has wrong size, specified {}, recorded {}",
+            dir.name.as_bstr(),
+            dir.size,
+            child.size(),
+        )));
+    }
+    Ok(())
+impl DirectoryGraph<LeavesToRootValidator> {
+    /// Insert a new Directory into the closure
+    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
+    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        if !self.order_validator.add_directory(&directory) {
+            return Err(Error::ValidationError(
+                "unknown directory was referenced".into(),
+            ));
+        }
+        self.add_order_unchecked(directory)
+    }
+impl DirectoryGraph<RootToLeavesValidator> {
+    /// If the user is parsing directories from canonical protobuf encoding, she can
+    /// call `digest_allowed` _before_ parsing the protobuf record and then add it
+    /// with `add_unchecked`.
+    pub fn digest_allowed(&self, digest: B3Digest) -> bool {
+        self.order_validator.digest_allowed(&digest)
+    }
+    /// Insert a new Directory into the closure
+    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
+    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        let digest = directory.digest();
+        if !self.order_validator.digest_allowed(&digest) {
+            return Err(Error::ValidationError("unexpected digest".into()));
+        }
+        self.order_validator.add_directory_unchecked(&directory);
+        self.add_order_unchecked(directory)
+    }
+impl<O: OrderValidator> DirectoryGraph<O> {
+    /// Customize the ordering, i.e. for pre-setting the root of the RootToLeavesValidator
+    pub fn with_order(order_validator: O) -> Self {
+        Self {
+            graph: Default::default(),
+            digest_to_node_ix: Default::default(),
+            order_validator,
+        }
+    }
+    /// Adds a directory which has already been confirmed to be in-order to the graph
+    pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        // Do some basic validation
+        directory
+            .validate()
+            .map_err(|e| Error::ValidationError(e.to_string()))?;
+        let digest = directory.digest();
+        // Teach the graph about the existence of a node with this digest
+        let ix = *self
+            .digest_to_node_ix
+            .entry(digest)
+            .or_insert_with(|| self.graph.add_node(None));
+        if self.graph[ix].is_some() {
+            // The node is already in the graph, there is nothing to do here.
+            return Ok(());
+        }
+        // set up edges to all child directories
+        for subdir in &directory.directories {
+            let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap();
+            let child_ix = *self
+                .digest_to_node_ix
+                .entry(subdir_digest)
+                .or_insert_with(|| self.graph.add_node(None));
+            let pending_edge_check = match &self.graph[child_ix] {
+                Some(child) => {
+                    // child is already available, validate the edge now
+                    check_edge(subdir, child)?;
+                    None
+                }
+                None => Some(subdir.clone()), // pending validation
+            };
+            self.graph.add_edge(ix, child_ix, pending_edge_check);
+        }
+        // validate the edges from parents to this node
+        // this collects edge ids in a Vec because there is no edges_directed_mut :'c
+        for edge_id in self
+            .graph
+            .edges_directed(ix, Direction::Incoming)
+            .map(|edge_ref| edge_ref.id())
+            .collect::<Vec<_>>()
+            .into_iter()
+        {
+            let edge_weight = self
+                .graph
+                .edge_weight_mut(edge_id)
+                .expect("edge not found")
+                .take()
+                .expect("edge is already validated");
+            check_edge(&edge_weight, &directory)?;
+        }
+        // finally, store the directory information in the node weight
+        self.graph[ix] = Some(directory);
+        Ok(())
+    }
+    #[instrument(level = "trace", skip_all, err)]
+    pub fn validate(self) -> Result<ValidatedDirectoryGraph, Error> {
+        // find all initial nodes (nodes without incoming edges)
+        let mut roots = self
+            .graph
+            .node_identifiers()
+            .filter(|&a| self.graph.neighbors_directed(a, Incoming).next().is_none());
+        let root = roots.next();
+        if roots.next().is_some() {
+            return Err(Error::ValidationError(
+                "graph has disconnected roots".into(),
+            ));
+        }
+        // test that the graph is complete
+        if self.graph.raw_nodes().iter().any(|n| n.weight.is_none()) {
+            return Err(Error::ValidationError("graph is incomplete".into()));
+        }
+        Ok(ValidatedDirectoryGraph {
+            graph: self.graph,
+            root,
+        })
+    }
+impl ValidatedDirectoryGraph {
+    /// Return the list of directories in from-root-to-leaves order.
+    /// In case no elements have been inserted, returns an empty list.
+    ///
+    /// panics if the specified root is not in the graph
+    #[instrument(level = "trace", skip_all)]
+    pub fn drain_root_to_leaves(self) -> impl Iterator<Item = Directory> {
+        let order = match self.root {
+            Some(root) => {
+                // do a BFS traversal of the graph, starting with the root node
+                Bfs::new(&self.graph, root)
+                    .iter(&self.graph)
+                    .collect::<Vec<_>>()
+            }
+            None => vec![], // No nodes have been inserted, do not traverse
+        };
+        let (mut nodes, _edges) = self.graph.into_nodes_edges();
+        order
+            .into_iter()
+            .filter_map(move |i| nodes[i.index()].weight.take())
+    }
+    /// Return the list of directories in from-leaves-to-root order.
+    /// In case no elements have been inserted, returns an empty list.
+    ///
+    /// panics when the specified root is not in the graph
+    #[instrument(level = "trace", skip_all)]
+    pub fn drain_leaves_to_root(self) -> impl Iterator<Item = Directory> {
+        let order = match self.root {
+            Some(root) => {
+                // do a DFS Post-Order traversal of the graph, starting with the root node
+                DfsPostOrder::new(&self.graph, root)
+                    .iter(&self.graph)
+                    .collect::<Vec<_>>()
+            }
+            None => vec![], // No nodes have been inserted, do not traverse
+        };
+        let (mut nodes, _edges) = self.graph.into_nodes_edges();
+        order
+            .into_iter()
+            .filter_map(move |i| nodes[i.index()].weight.take())
+    }
+mod tests {
+    use crate::{
+        proto::{self, Directory},
+    };
+    use lazy_static::lazy_static;
+    use rstest::rstest;
+    lazy_static! {
+        pub static ref BROKEN_DIRECTORY : Directory = Directory {
+            symlinks: vec![proto::SymlinkNode {
+                name: "".into(), // invalid name!
+                target: "doesntmatter".into(),
+            }],
+            ..Default::default()
+        };
+        pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory {
+            directories: vec![proto::DirectoryNode {
+                name: "foo".into(),
+                digest: DIRECTORY_A.digest().into(),
+                size: DIRECTORY_A.size() + 42, // wrong!
+            }],
+            ..Default::default()
+        };
+    }
+    use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};
+    #[rstest]
+    /// Uploading an empty directory should succeed.
+    #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
+    /// Uploading A, then B (referring to A) should succeed.
+    #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))]
+    /// Uploading A, then A, then C (referring to A twice) should succeed.
+    /// We pretend to be a dumb client not deduping directories.
+    #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
+    /// Uploading A, then C (referring to A twice) should succeed.
+    #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
+    /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close,
+    /// as B itself would be left unconnected.
+    #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
+    /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
+    #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
+    /// Uploading a directory failing validation should fail immediately.
+    #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)]
+    /// Uploading a directory which refers to another Directory with a wrong size should fail.
+    #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
+    fn test_uploads(
+        #[case] directories_to_upload: &[&Directory],
+        #[case] exp_fail_upload_last: bool,
+        #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not.
+    ) {
+        let mut dcv = DirectoryGraph::<LeavesToRootValidator>::default();
+        let len_directories_to_upload = directories_to_upload.len();
+        for (i, d) in directories_to_upload.iter().enumerate() {
+            let resp = dcv.add((*d).clone());
+            if i == len_directories_to_upload - 1 && exp_fail_upload_last {
+                assert!(resp.is_err(), "expect last put to fail");
+                // We don't really care anymore what finalize() would return, as
+                // the add() failed.
+                return;
+            } else {
+                assert!(resp.is_ok(), "expect put to succeed");
+            }
+        }
+        // everything was uploaded successfully. Test finalize().
+        let resp = dcv
+            .validate()
+            .map(|validated| validated.drain_leaves_to_root().collect::<Vec<_>>());
+        match exp_finalize {
+            Some(directories) => {
+                assert_eq!(
+                    Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
+                    resp.expect("drain should succeed")
+                );
+            }
+            None => {
+                resp.expect_err("drain should fail");
+            }
+        }
+    }
+    #[rstest]
+    /// Downloading an empty directory should succeed.
+    #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
+    /// Downlading B, then A (referenced by B) should succeed.
+    #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))]
+    /// Downloading C (referring to A twice), then A should succeed.
+    #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
+    /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root)
+    #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)]
+    /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root).
+    #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)]
+    /// Downloading a directory failing validation should fail immediately.
+    #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)]
+    /// Downloading a directory which refers to another Directory with a wrong size should fail.
+    #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)]
+    fn test_downloads(
+        #[case] root: &Directory,
+        #[case] directories_to_upload: &[&Directory],
+        #[case] exp_fail_upload_last: bool,
+        #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not.
+    ) {
+        let mut dcv =
+            DirectoryGraph::with_order(RootToLeavesValidator::new_with_root_digest(root.digest()));
+        let len_directories_to_upload = directories_to_upload.len();
+        for (i, d) in directories_to_upload.iter().enumerate() {
+            let resp = dcv.add((*d).clone());
+            if i == len_directories_to_upload - 1 && exp_fail_upload_last {
+                assert!(resp.is_err(), "expect last put to fail");
+                // We don't really care anymore what finalize() would return, as
+                // the add() failed.
+                return;
+            } else {
+                assert!(resp.is_ok(), "expect put to succeed");
+            }
+        }
+        // everything was uploaded successfully. Test finalize().
+        let resp = dcv
+            .validate()
+            .map(|validated| validated.drain_leaves_to_root().collect::<Vec<_>>());
+        match exp_finalize {
+            Some(directories) => {
+                assert_eq!(
+                    Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
+                    resp.expect("drain should succeed")
+                );
+            }
+            None => {
+                resp.expect_err("drain should fail");
+            }
+        }
+    }
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index 3f180ef162d8..0d717a8a3f74 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -2,11 +2,12 @@ use crate::{proto, B3Digest, Error};
 use futures::stream::BoxStream;
 use tonic::async_trait;
-mod closure_validator;
+mod directory_graph;
 mod from_addr;
 mod grpc;
 mod memory;
 mod object_store;
+mod order_validator;
 mod simple_putter;
 mod sled;
@@ -14,11 +15,12 @@ pub mod tests;
 mod traverse;
 mod utils;
-pub use self::closure_validator::ClosureValidator;
+pub use self::directory_graph::DirectoryGraph;
 pub use self::from_addr::from_addr;
 pub use self::grpc::GRPCDirectoryService;
 pub use self::memory::MemoryDirectoryService;
 pub use self::object_store::ObjectStoreDirectoryService;
+pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
 pub use self::simple_putter::SimplePutter;
 pub use self::sled::SledDirectoryService;
 pub use self::traverse::descend_to;
diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs
index 64ce335edb86..90e53f9286a0 100644
--- a/tvix/castore/src/directoryservice/object_store.rs
+++ b/tvix/castore/src/directoryservice/object_store.rs
@@ -16,7 +16,7 @@ use tonic::async_trait;
 use tracing::{instrument, trace, warn, Level};
 use url::Url;
-use super::{ClosureValidator, DirectoryPutter, DirectoryService};
+use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
 use crate::{proto, B3Digest, Error};
 /// Stores directory closures in an object store.
@@ -177,7 +177,7 @@ struct ObjectStoreDirectoryPutter {
     object_store: Arc<dyn ObjectStore>,
     base_path: Path,
-    directory_validator: Option<ClosureValidator>,
+    directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>,
 impl ObjectStoreDirectoryPutter {
@@ -197,7 +197,9 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
-                validator.add(directory)?;
+                validator
+                    .add(directory)
+                    .map_err(|e| Error::StorageError(e.to_string()))?;
@@ -214,7 +216,11 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter {
         // retrieve the validated directories.
         // It is important that they are in topological order (root first),
         // since that's how we want to retrieve them from the object store in the end.
-        let directories = validator.finalize_root_to_leaves()?;
+        let directories = validator
+            .validate()
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .drain_root_to_leaves()
+            .collect::<Vec<_>>();
         // Get the root digest
         let root_digest = directories
diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs
new file mode 100644
index 000000000000..6045f5d24198
--- /dev/null
+++ b/tvix/castore/src/directoryservice/order_validator.rs
@@ -0,0 +1,181 @@
+use std::collections::HashSet;
+use tracing::warn;
+use crate::{proto::Directory, B3Digest};
+pub trait OrderValidator {
+    /// Update the order validator's state with the directory
+    /// Returns whether the directory was accepted
+    fn add_directory(&mut self, directory: &Directory) -> bool;
+/// Validates that newly introduced directories are already referenced from
+/// the root via existing directories.
+/// Commonly used when _receiving_ a directory closure _from_ a store.
+pub struct RootToLeavesValidator {
+    /// Only used to remember the root node, not for validation
+    expected_digests: HashSet<B3Digest>,
+impl RootToLeavesValidator {
+    /// Use to validate the root digest of the closure upon receiving the first
+    /// directory.
+    pub fn new_with_root_digest(root_digest: B3Digest) -> Self {
+        let mut this = Self::default();
+        this.expected_digests.insert(root_digest);
+        this
+    }
+    /// Checks if a directory is in-order based on its digest.
+    ///
+    /// Particularly useful when receiving directories in canonical protobuf
+    /// encoding, so that directories not connected to the root can be rejected
+    /// without parsing.
+    ///
+    /// After parsing, the directory must be passed to `add_directory_unchecked`
+    /// to add its children to the list of expected digests.
+    pub fn digest_allowed(&self, digest: &B3Digest) -> bool {
+        self.expected_digests.is_empty() // we don't know the root node; allow any
+            || self.expected_digests.contains(digest)
+    }
+    /// Update the order validator's state with the directory
+    pub fn add_directory_unchecked(&mut self, directory: &Directory) {
+        // No initial root was specified and this is the first directory
+        if self.expected_digests.is_empty() {
+            self.expected_digests.insert(directory.digest());
+        }
+        for subdir in &directory.directories {
+            // Allow the children to appear next
+            let subdir_digest = subdir.digest.clone().try_into().unwrap();
+            self.expected_digests.insert(subdir_digest);
+        }
+    }
+impl OrderValidator for RootToLeavesValidator {
+    fn add_directory(&mut self, directory: &Directory) -> bool {
+        if !self.digest_allowed(&directory.digest()) {
+            return false;
+        }
+        self.add_directory_unchecked(directory);
+        true
+    }
+/// Validates that newly uploaded directories only reference directories which
+/// have already been introduced.
+/// Commonly used when _uploading_ a directory closure _to_ a store.
+pub struct LeavesToRootValidator {
+    /// This is empty in the beginning, and gets filled as leaves and intermediates are
+    /// inserted
+    allowed_references: HashSet<B3Digest>,
+impl OrderValidator for LeavesToRootValidator {
+    fn add_directory(&mut self, directory: &Directory) -> bool {
+        let digest = directory.digest();
+        for subdir in &directory.directories {
+            let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory()
+            if !self.allowed_references.contains(&subdir_digest) {
+                warn!(
+                    directory.digest = %digest,
+                    subdirectory.digest = %subdir_digest,
+                    "unexpected directory reference"
+                );
+                return false;
+            }
+        }
+        self.allowed_references.insert(digest.clone());
+        true
+    }
+mod tests {
+    use super::{LeavesToRootValidator, RootToLeavesValidator};
+    use crate::directoryservice::order_validator::OrderValidator;
+    use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
+    use crate::proto::Directory;
+    use rstest::rstest;
+    #[rstest]
+    /// Uploading an empty directory should succeed.
+    #[case::empty_directory(&[&*DIRECTORY_A], false)]
+    /// Uploading A, then B (referring to A) should succeed.
+    #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false)]
+    /// Uploading A, then A, then C (referring to A twice) should succeed.
+    /// We pretend to be a dumb client not deduping directories.
+    #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false)]
+    /// Uploading A, then C (referring to A twice) should succeed.
+    #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false)]
+    /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close,
+    /// as B itself would be left unconnected.
+    #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false)]
+    /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
+    #[case::dangling_pointer(&[&*DIRECTORY_B], true)]
+    fn leaves_to_root(
+        #[case] directories_to_upload: &[&Directory],
+        #[case] exp_fail_upload_last: bool,
+    ) {
+        let mut validator = LeavesToRootValidator::default();
+        let len_directories_to_upload = directories_to_upload.len();
+        for (i, d) in directories_to_upload.iter().enumerate() {
+            let resp = validator.add_directory(d);
+            if i == len_directories_to_upload - 1 && exp_fail_upload_last {
+                assert!(!resp, "expect last put to fail");
+                // We don't really care anymore what finalize() would return, as
+                // the add() failed.
+                return;
+            } else {
+                assert!(resp, "expect put to succeed");
+            }
+        }
+    }
+    #[rstest]
+    /// Downloading an empty directory should succeed.
+    #[case::empty_directory(&*DIRECTORY_A, &[&*DIRECTORY_A], false)]
+    /// Downlading B, then A (referenced by B) should succeed.
+    #[case::simple_closure(&*DIRECTORY_B, &[&*DIRECTORY_B, &*DIRECTORY_A], false)]
+    /// Downloading C (referring to A twice), then A should succeed.
+    #[case::same_child_dedup(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_A], false)]
+    /// Downloading C, then B (both referring to A but not referring to each other) should fail immediately as B has no connection to C (the root)
+    #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true)]
+    /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root).
+    #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true)]
+    fn root_to_leaves(
+        #[case] root: &Directory,
+        #[case] directories_to_upload: &[&Directory],
+        #[case] exp_fail_upload_last: bool,
+    ) {
+        let mut validator = RootToLeavesValidator::new_with_root_digest(root.digest());
+        let len_directories_to_upload = directories_to_upload.len();
+        for (i, d) in directories_to_upload.iter().enumerate() {
+            let resp1 = validator.digest_allowed(&d.digest());
+            let resp = validator.add_directory(d);
+            assert_eq!(
+                resp1, resp,
+                "digest_allowed should return the same value as add_directory"
+            );
+            if i == len_directories_to_upload - 1 && exp_fail_upload_last {
+                assert!(!resp, "expect last put to fail");
+                // We don't really care anymore what finalize() would return, as
+                // the add() failed.
+                return;
+            } else {
+                assert!(resp, "expect put to succeed");
+            }
+        }
+    }
diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs
index 25617ebcac82..dc54e3d11d18 100644
--- a/tvix/castore/src/directoryservice/simple_putter.rs
+++ b/tvix/castore/src/directoryservice/simple_putter.rs
@@ -1,6 +1,6 @@
-use super::ClosureValidator;
 use super::DirectoryPutter;
 use super::DirectoryService;
+use super::{DirectoryGraph, LeavesToRootValidator};
 use crate::proto;
 use crate::B3Digest;
 use crate::Error;
@@ -14,7 +14,7 @@ use tracing::warn;
 pub struct SimplePutter<DS: DirectoryService> {
     directory_service: DS,
-    directory_validator: Option<ClosureValidator>,
+    directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>,
 impl<DS: DirectoryService> SimplePutter<DS> {
@@ -33,7 +33,9 @@ impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
-                validator.add(directory)?;
+                validator
+                    .add(directory)
+                    .map_err(|e| Error::StorageError(e.to_string()))?;
@@ -46,7 +48,11 @@ impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
             None => Err(Error::InvalidRequest("already closed".to_string())),
             Some(validator) => {
                 // retrieve the validated directories.
-                let directories = validator.finalize()?;
+                let directories = validator
+                    .validate()
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+                    .drain_leaves_to_root()
+                    .collect::<Vec<_>>();
                 // Get the root digest, which is at the end (cf. insertion order)
                 let root_digest = directories
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
index 9490a49c00ae..bd98ed6b1e01 100644
--- a/tvix/castore/src/directoryservice/sled.rs
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -8,7 +8,7 @@ use tonic::async_trait;
 use tracing::{instrument, warn};
 use super::utils::traverse_directory;
-use super::{ClosureValidator, DirectoryPutter, DirectoryService};
+use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
 pub struct SledDirectoryService {
@@ -135,7 +135,7 @@ pub struct SledDirectoryPutter {
     /// The directories (inside the directory validator) that we insert later,
     /// or None, if they were already inserted.
-    directory_validator: Option<ClosureValidator>,
+    directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>,
@@ -145,7 +145,9 @@ impl DirectoryPutter for SledDirectoryPutter {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
-                validator.add(directory)?;
+                validator
+                    .add(directory)
+                    .map_err(|e| Error::StorageError(e.to_string()))?;
@@ -162,7 +164,11 @@ impl DirectoryPutter for SledDirectoryPutter {
                     let tree = self.tree.clone();
                     move || {
                         // retrieve the validated directories.
-                        let directories = validator.finalize()?;
+                        let directories = validator
+                            .validate()
+                            .map_err(|e| Error::StorageError(e.to_string()))?
+                            .drain_leaves_to_root()
+                            .collect::<Vec<_>>();
                         // Get the root digest, which is at the end (cf. insertion order)
                         let root_digest = directories
diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
index 5c1428690cb4..ce1d2bcd244a 100644
--- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
+++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
@@ -1,4 +1,5 @@
-use crate::directoryservice::ClosureValidator;
+use crate::directoryservice::DirectoryGraph;
+use crate::directoryservice::LeavesToRootValidator;
 use crate::proto;
 use crate::{directoryservice::DirectoryService, B3Digest};
 use futures::stream::BoxStream;
@@ -78,14 +79,20 @@ where
     ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
         let mut req_inner = request.into_inner();
-        // We put all Directory messages we receive into ClosureValidator first.
-        let mut validator = ClosureValidator::default();
+        // We put all Directory messages we receive into DirectoryGraph.
+        let mut validator = DirectoryGraph::<LeavesToRootValidator>::default();
         while let Some(directory) = req_inner.message().await? {
-            validator.add(directory)?;
+            validator
+                .add(directory)
+                .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
         // drain, which validates connectivity too.
-        let directories = validator.finalize()?;
+        let directories = validator
+            .validate()
+            .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?
+            .drain_leaves_to_root()
+            .collect::<Vec<_>>();
         let mut directory_putter = self.directory_service.put_multiple_start();
         for directory in directories {