about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice/closure_validator.rs
use std::collections::{HashMap, HashSet};

use bstr::ByteSlice;

use petgraph::{
    graph::{DiGraph, NodeIndex},
    visit::Bfs,
};
use tracing::instrument;

use crate::{
    proto::{self, Directory},
    B3Digest, Error,
};

/// This can be used to validate a Directory closure (DAG of connected
/// Directories), and their insertion order.
///
/// Directories need to be inserted (via `add`), in an order from the leaves to
/// the root (DFS Post-Order).
/// During insertion, We validate as much as we can at that time:
///
///  - individual validation of Directory messages
///  - validation of insertion order (no upload of not-yet-known Directories)
///  - validation of size fields of referred Directories
///
/// Internally it keeps all received Directories in a directed graph,
/// with node weights being the Directories and edges pointing to child
/// directories.
///
/// Once all Directories have been inserted, a finalize function can be
/// called to get a (deduplicated and) validated list of directories, in
/// insertion order.
/// During finalize, a check for graph connectivity is performed too, to ensure
/// there's no disconnected components, and only one root.
#[derive(Default)]
pub struct ClosureValidator {
    // A directed graph, using Directory as node weight, without edge weights.
    // Edges point from parents to children.
    graph: DiGraph<Directory, ()>,

    // A lookup table from directory digest to node index.
    digest_to_node_ix: HashMap<B3Digest, NodeIndex>,

    /// Keeps track of the last-inserted directory graph node index.
    /// On a correct insert, this will be the root node, from which the DFS post
    /// order traversal will start from.
    last_directory_ix: Option<NodeIndex>,
}

impl ClosureValidator {
    /// Insert a new Directory into the closure.
    /// Perform individual Directory validation, validation of insertion order
    /// and size fields.
    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
        let digest = directory.digest();

        // If we already saw this node previously, it's already validated and in the graph.
        if self.digest_to_node_ix.contains_key(&digest) {
            return Ok(());
        }

        // Do some general validation
        directory
            .validate()
            .map_err(|e| Error::InvalidRequest(e.to_string()))?;

        // Ensure the directory only refers to directories which we already accepted.
        // We lookup their node indices and add them to a HashSet.
        let mut child_ixs = HashSet::new();
        for dir in &directory.directories {
            let child_digest = B3Digest::try_from(dir.digest.to_owned()).unwrap(); // validated

            // Ensure the digest has already been seen
            let child_ix = *self.digest_to_node_ix.get(&child_digest).ok_or_else(|| {
                Error::InvalidRequest(format!(
                    "'{}' refers to unseen child dir: {}",
                    dir.name.as_bstr(),
                    &child_digest
                ))
            })?;

            // Ensure the size specified in the child node matches the directory size itself.
            let recorded_child_size = self
                .graph
                .node_weight(child_ix)
                .expect("node not found")
                .size();

            // Ensure the size specified in the child node matches our records.
            if dir.size != recorded_child_size {
                return Err(Error::InvalidRequest(format!(
                    "'{}' has wrong size, specified {}, recorded {}",
                    dir.name.as_bstr(),
                    dir.size,
                    recorded_child_size
                )));
            }

            child_ixs.insert(child_ix);
        }

        // Insert node into the graph, and add edges to all children.
        let node_ix = self.graph.add_node(directory);
        for child_ix in child_ixs {
            self.graph.add_edge(node_ix, child_ix, ());
        }

        // Record the mapping from digest to node_ix in our lookup table.
        self.digest_to_node_ix.insert(digest, node_ix);

        // Update last_directory_ix.
        self.last_directory_ix = Some(node_ix);

        Ok(())
    }

    /// Ensure that all inserted Directories are connected, then return a
    /// (deduplicated) and validated list of directories, in from-leaves-to-root
    /// order.
    /// In case no elements have been inserted, returns an empty list.
    #[instrument(level = "trace", skip_all, err)]
    pub(crate) fn finalize(self) -> Result<Vec<Directory>, Error> {
        // If no nodes were inserted, an empty list is returned.
        let last_directory_ix = if let Some(x) = self.last_directory_ix {
            x
        } else {
            return Ok(vec![]);
        };

        // do a BFS traversal of the graph, starting with the root node to get
        // (the count of) all nodes reachable from there.
        let mut traversal = Bfs::new(&self.graph, last_directory_ix);

        let mut visited_directory_count = 0;
        #[cfg(debug_assertions)]
        let mut visited_directory_ixs = HashSet::new();
        #[cfg_attr(not(debug_assertions), allow(unused))]
        while let Some(directory_ix) = traversal.next(&self.graph) {
            #[cfg(debug_assertions)]
            visited_directory_ixs.insert(directory_ix);

            visited_directory_count += 1;
        }

        // If the number of nodes collected equals the total number of nodes in
        // the graph, we know all nodes are connected.
        if visited_directory_count != self.graph.node_count() {
            // more or less exhaustive error reporting.
            #[cfg(debug_assertions)]
            {
                let all_directory_ixs: HashSet<_> = self.graph.node_indices().collect();

                let unvisited_directories: HashSet<_> = all_directory_ixs
                    .difference(&visited_directory_ixs)
                    .map(|ix| self.graph.node_weight(*ix).expect("node not found"))
                    .collect();

                return Err(Error::InvalidRequest(format!(
                    "found {} disconnected directories: {:?}",
                    self.graph.node_count() - visited_directory_ixs.len(),
                    unvisited_directories
                )));
            }
            #[cfg(not(debug_assertions))]
            {
                return Err(Error::InvalidRequest(format!(
                    "found {} disconnected directories",
                    self.graph.node_count() - visited_directory_count
                )));
            }
        }

        // Dissolve the graph, returning the nodes as a Vec.
        // As the graph was populated in a valid DFS PostOrder, we can return
        // nodes in that same order.
        let (nodes, _edges) = self.graph.into_nodes_edges();
        Ok(nodes.into_iter().map(|x| x.weight).collect())
    }
}

#[cfg(test)]
mod tests {
    use crate::{
        fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C},
        proto::{self, Directory},
    };
    use lazy_static::lazy_static;
    use rstest::rstest;

    lazy_static! {
        pub static ref BROKEN_DIRECTORY : Directory = Directory {
            symlinks: vec![proto::SymlinkNode {
                name: "".into(), // invalid name!
                target: "doesntmatter".into(),
            }],
            ..Default::default()
        };

        pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory {
            directories: vec![proto::DirectoryNode {
                name: "foo".into(),
                digest: DIRECTORY_A.digest().into(),
                size: DIRECTORY_A.size() + 42, // wrong!
            }],
            ..Default::default()
        };
    }

    use super::ClosureValidator;

    #[rstest]
    /// Uploading an empty directory should succeed.
    #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
    /// Uploading A, then B (referring to A) should succeed.
    #[case::simple_closure(&[&*DIRECTORY_A, &*DIRECTORY_B], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_B]))]
    /// Uploading A, then A, then C (referring to A twice) should succeed.
    /// We pretend to be a dumb client not deduping directories.
    #[case::same_child(&[&*DIRECTORY_A, &*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    /// Uploading A, then C (referring to A twice) should succeed.
    #[case::same_child_dedup(&[&*DIRECTORY_A, &*DIRECTORY_C], false, Some(vec![&*DIRECTORY_A, &*DIRECTORY_C]))]
    /// Uploading A, then C (referring to A twice), then B (itself referring to A) should fail during close,
    /// as B itself would be left unconnected.
    #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
    /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
    #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
    /// Uploading a directory failing validation should fail immediately.
    #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)]
    /// Uploading a directory which refers to another Directory with a wrong size should fail.
    #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
    fn test_uploads(
        #[case] directories_to_upload: &[&Directory],
        #[case] exp_fail_upload_last: bool,
        #[case] exp_finalize: Option<Vec<&Directory>>, // Some(_) if finalize successful, None if not.
    ) {
        let mut dcv = ClosureValidator::default();
        let len_directories_to_upload = directories_to_upload.len();

        for (i, d) in directories_to_upload.iter().enumerate() {
            let resp = dcv.add((*d).clone());
            if i == len_directories_to_upload - 1 && exp_fail_upload_last {
                assert!(resp.is_err(), "expect last put to fail");

                // We don't really care anymore what finalize() would return, as
                // the add() failed.
                return;
            } else {
                assert!(resp.is_ok(), "expect put to succeed");
            }
        }

        // everything was uploaded successfully. Test finalize().
        let resp = dcv.finalize();

        match exp_finalize {
            Some(directories) => {
                assert_eq!(
                    Vec::from_iter(directories.iter().map(|e| (*e).to_owned())),
                    resp.expect("drain should succeed")
                );
            }
            None => {
                resp.expect_err("drain should fail");
            }
        }
    }
}