about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice/utils.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-03-27T12·47+0200
committerflokli <flokli@flokli.de>2023-03-31T10·54+0000
commit5bf913432424ad29e976f4acad535aa1340d84fe (patch)
tree14dbc2cc4bc5fa4b896cc745ac5c02555d60c774 /tvix/store/src/directoryservice/utils.rs
parente9686f84d9be4aa5adace6ff531ff00558239f75 (diff)
refactor(tvix/store/directorysvc): move DirectoryTraverser to utils r/6065
Change-Id: Ie60a660e0fda7c80a6c7de20404c1965fe0e0d63
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8355
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/directoryservice/utils.rs')
-rw-r--r--tvix/store/src/directoryservice/utils.rs106
1 files changed, 106 insertions, 0 deletions
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs
new file mode 100644
index 000000000000..10edb9ba74dc
--- /dev/null
+++ b/tvix/store/src/directoryservice/utils.rs
@@ -0,0 +1,106 @@
+use super::DirectoryService;
+use crate::proto;
+use crate::Error;
+use std::collections::{HashSet, VecDeque};
+use tracing::{debug_span, instrument, warn};
+
+/// Traverses a [proto::Directory] from the root to the children.
+///
+/// This is mostly BFS, but directories are only returned once.
+pub struct DirectoryTraverser<DS: DirectoryService> {
+    directory_service: DS,
+    /// The list of all directories that still need to be traversed. The next
+    /// element is picked from the front, new elements are enqueued at the
+    /// back.
+    worklist_directory_digests: VecDeque<[u8; 32]>,
+    /// The list of directory digests already sent to the consumer.
+    /// We omit sending the same directories multiple times.
+    sent_directory_digests: HashSet<[u8; 32]>,
+}
+
+impl<DS: DirectoryService> DirectoryTraverser<DS> {
+    pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self {
+        Self {
+            directory_service,
+            worklist_directory_digests: VecDeque::from([*root_directory_digest]),
+            sent_directory_digests: HashSet::new(),
+        }
+    }
+
+    // enqueue all child directory digests to the work queue, as
+    // long as they're not part of the worklist or already sent.
+    // This panics if the digest looks invalid, it's supposed to be checked first.
+    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
+        for child_directory_node in &directory.directories {
+            let child_digest: [u8; 32] = child_directory_node
+                .digest
+                .as_slice()
+                .try_into()
+                .map_err(|_e| Error::StorageError("invalid digest length".to_string()))
+                .unwrap();
+
+            if self.worklist_directory_digests.contains(&child_digest)
+                || self.sent_directory_digests.contains(&child_digest)
+            {
+                continue;
+            }
+            self.worklist_directory_digests.push_back(child_digest);
+        }
+    }
+}
+
+impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
+    type Item = Result<proto::Directory, Error>;
+
+    #[instrument(skip_all)]
+    fn next(&mut self) -> Option<Self::Item> {
+        // fetch the next directory digest from the top of the work queue.
+        match self.worklist_directory_digests.pop_front() {
+            None => None,
+            Some(current_directory_digest) => {
+                let current_directory_b64 = data_encoding::BASE64.encode(&current_directory_digest);
+                let span = debug_span!("directory.digest", current_directory_b64);
+                let _ = span.enter();
+
+                // look up the directory itself.
+                let current_directory = match self.directory_service.get(&current_directory_digest)
+                {
+                    // if we got it
+                    Ok(Some(current_directory)) => {
+                        // validate, we don't want to send invalid directories.
+                        if let Err(e) = current_directory.validate() {
+                            warn!("directory failed validation: {}", e.to_string());
+                            return Some(Err(Error::StorageError(format!(
+                                "invalid directory: {}",
+                                current_directory_b64
+                            ))));
+                        }
+                        current_directory
+                    }
+                    // if it's not there, we have an inconsistent store!
+                    Ok(None) => {
+                        warn!("directory {} does not exist", current_directory_b64);
+                        return Some(Err(Error::StorageError(format!(
+                            "directory {} does not exist",
+                            current_directory_b64
+                        ))));
+                    }
+                    Err(e) => {
+                        warn!("failed to look up directory");
+                        return Some(Err(Error::StorageError(format!(
+                            "unable to look up directory {}: {}",
+                            current_directory_b64, e
+                        ))));
+                    }
+                };
+
+                // All DirectoryServices MUST validate directory nodes, before returning them out, so we
+                // can be sure [enqueue_child_directories] doesn't panic.
+
+                // enqueue child directories
+                self.enqueue_child_directories(&current_directory);
+                Some(Ok(current_directory))
+            }
+        }
+    }
+}