about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice/mod.rs
blob: d7b143df3efe85a1bfe30c12e348b1da62e94ee2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use std::collections::{HashSet, VecDeque};

use tracing::{debug_span, instrument, warn};

use crate::{proto, Error};
mod grpc;
mod memory;
mod sled;

pub use self::grpc::GRPCDirectoryService;
pub use self::memory::MemoryDirectoryService;
pub use self::sled::SledDirectoryService;

/// The base trait all Directory services need to implement.
/// This is a simple get and put of [crate::proto::Directory], returning their
/// digest.
pub trait DirectoryService {
    /// The iterator type returned by [DirectoryService::get_recursive].
    /// Each item is either a [proto::Directory] or an [Error]; the iterator
    /// itself must be [Send].
    type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send;

    /// Get looks up a single Directory message by its digest.
    /// In case the directory is not found, Ok(None) is returned.
    fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error>;
    /// Put uploads a single Directory message, and returns the calculated
    /// digest, or an error.
    fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error>;

    /// Looks up a closure of [proto::Directory], starting at the directory
    /// identified by `root_directory_digest`.
    /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`,
    /// and we'd be able to add a default implementation for it here, but
    /// we can't have that yet.
    fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator;
}

/// Traverses a [proto::Directory] from the root to the children.
///
/// This is mostly BFS, but directories are only returned once.
/// Directories are fetched one at a time from the wrapped
/// [DirectoryService] as the traverser is iterated.
pub struct DirectoryTraverser<DS: DirectoryService> {
    /// The service used to look up each directory by its digest.
    directory_service: DS,
    /// The queue of all directories that still need to be traversed. The next
    /// element is picked from the front, new elements are enqueued at the
    /// back.
    worklist_directory_digests: VecDeque<[u8; 32]>,
    /// The set of directory digests already sent to the consumer.
    /// It is consulted when enqueueing child directories, so that we omit
    /// sending the same directories multiple times.
    sent_directory_digests: HashSet<[u8; 32]>,
}

impl<DS: DirectoryService> DirectoryTraverser<DS> {
    /// Creates a traverser that will start at `root_directory_digest` and
    /// look directories up through `directory_service`.
    ///
    /// No lookup happens here; the work queue is merely seeded with the root
    /// digest, and the set of already-sent digests starts out empty.
    pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self {
        Self {
            directory_service,
            worklist_directory_digests: VecDeque::from([*root_directory_digest]),
            sent_directory_digests: HashSet::new(),
        }
    }

    /// Enqueues the digests of all child directories of `directory` at the
    /// back of the work queue, skipping those that are already queued or
    /// have already been sent.
    ///
    /// # Panics
    ///
    /// Panics if a child directory node carries a digest that is not exactly
    /// 32 bytes long. Callers are expected to have validated `directory`
    /// before passing it in.
    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
        for child_directory_node in &directory.directories {
            // A wrong digest length is an invariant violation at this point,
            // so state the invariant directly instead of building an error
            // value only to unwrap it.
            let child_digest: [u8; 32] = child_directory_node
                .digest
                .as_slice()
                .try_into()
                .expect("child directory digest must be 32 bytes long (directory must be validated first)");

            let already_known = self.worklist_directory_digests.contains(&child_digest)
                || self.sent_directory_digests.contains(&child_digest);

            if !already_known {
                self.worklist_directory_digests.push_back(child_digest);
            }
        }
    }
}

impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
    type Item = Result<proto::Directory, Error>;

    /// Returns the next directory of the closure, or `None` once the work
    /// queue is empty.
    ///
    /// The digest at the front of the work queue is looked up in the
    /// directory service and validated. On success, the digest is recorded
    /// as sent, its child directories are enqueued, and the directory is
    /// yielded. A missing directory, a failed lookup, or a directory failing
    /// validation is yielded as `Some(Err(Error::StorageError(..)))`.
    #[instrument(skip_all)]
    fn next(&mut self) -> Option<Self::Item> {
        // fetch the next directory digest from the front of the work queue;
        // an empty queue ends the iteration.
        let current_directory_digest = self.worklist_directory_digests.pop_front()?;

        let current_directory_b64 = data_encoding::BASE64.encode(&current_directory_digest);
        let span = debug_span!("directory.digest", current_directory_b64);
        // The guard must be bound to a named variable: `let _ = span.enter()`
        // drops it immediately, which exits the span before any work is done.
        let _enter = span.enter();

        // look up the directory itself.
        let current_directory = match self.directory_service.get(&current_directory_digest) {
            // if we got it
            Ok(Some(current_directory)) => {
                // validate, we don't want to send invalid directories.
                if let Err(e) = current_directory.validate() {
                    warn!("directory failed validation: {}", e.to_string());
                    return Some(Err(Error::StorageError(format!(
                        "invalid directory: {}",
                        current_directory_b64
                    ))));
                }
                current_directory
            }
            // if it's not there, we have an inconsistent store!
            Ok(None) => {
                warn!("directory {} does not exist", current_directory_b64);
                return Some(Err(Error::StorageError(format!(
                    "directory {} does not exist",
                    current_directory_b64
                ))));
            }
            Err(e) => {
                warn!("failed to look up directory");
                return Some(Err(Error::StorageError(format!(
                    "unable to look up directory {}: {}",
                    current_directory_b64, e
                ))));
            }
        };

        // We're about to send this directory, so remember its digest. Without
        // this, a directory that is referenced again by a later descendant
        // (after it already left the work queue) would be enqueued and sent a
        // second time.
        self.sent_directory_digests.insert(current_directory_digest);

        // All DirectoryServices MUST validate directory nodes, before returning them out, so we
        // can be sure [enqueue_child_directories] doesn't panic.

        // enqueue child directories
        self.enqueue_child_directories(&current_directory);
        Some(Ok(current_directory))
    }
}