about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice/utils.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/directoryservice/utils.rs')
-rw-r--r--tvix/store/src/directoryservice/utils.rs140
1 files changed, 140 insertions, 0 deletions
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs
new file mode 100644
index 000000000000..3661808734f3
--- /dev/null
+++ b/tvix/store/src/directoryservice/utils.rs
@@ -0,0 +1,140 @@
+use super::DirectoryPutter;
+use super::DirectoryService;
+use crate::proto;
+use crate::B3Digest;
+use crate::Error;
+use std::collections::{HashSet, VecDeque};
+use tracing::{debug_span, instrument, warn};
+
+/// Traverses a [proto::Directory] from the root to the children.
+///
+/// This is mostly BFS, but directories are only returned once.
+pub struct DirectoryTraverser<DS: DirectoryService> {
+    directory_service: DS,
+    /// The list of all directories that still need to be traversed. The next
+    /// element is picked from the front, new elements are enqueued at the
+    /// back.
+    worklist_directory_digests: VecDeque<B3Digest>,
+    /// The list of directory digests already sent to the consumer.
+    /// We omit sending the same directories multiple times.
+    sent_directory_digests: HashSet<B3Digest>,
+}
+
+impl<DS: DirectoryService> DirectoryTraverser<DS> {
+    pub fn with(directory_service: DS, root_directory_digest: &B3Digest) -> Self {
+        Self {
+            directory_service,
+            worklist_directory_digests: VecDeque::from([root_directory_digest.clone()]),
+            sent_directory_digests: HashSet::new(),
+        }
+    }
+
+    // enqueue all child directory digests to the work queue, as
+    // long as they're not part of the worklist or already sent.
+    // This panics if the digest looks invalid, it's supposed to be checked first.
+    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
+        for child_directory_node in &directory.directories {
+            // TODO: propagate error
+            let child_digest = B3Digest::from_vec(child_directory_node.digest.clone()).unwrap();
+
+            if self.worklist_directory_digests.contains(&child_digest)
+                || self.sent_directory_digests.contains(&child_digest)
+            {
+                continue;
+            }
+            self.worklist_directory_digests.push_back(child_digest);
+        }
+    }
+}
+
+impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
+    type Item = Result<proto::Directory, Error>;
+
+    #[instrument(skip_all)]
+    fn next(&mut self) -> Option<Self::Item> {
+        // fetch the next directory digest from the top of the work queue.
+        match self.worklist_directory_digests.pop_front() {
+            None => None,
+            Some(current_directory_digest) => {
+                let span = debug_span!("directory.digest", "{}", current_directory_digest);
+                let _ = span.enter();
+
+                // look up the directory itself.
+                let current_directory = match self.directory_service.get(&current_directory_digest)
+                {
+                    // if we got it
+                    Ok(Some(current_directory)) => {
+                        // validate, we don't want to send invalid directories.
+                        if let Err(e) = current_directory.validate() {
+                            warn!("directory failed validation: {}", e.to_string());
+                            return Some(Err(Error::StorageError(format!(
+                                "invalid directory: {}",
+                                current_directory_digest
+                            ))));
+                        }
+                        current_directory
+                    }
+                    // if it's not there, we have an inconsistent store!
+                    Ok(None) => {
+                        warn!("directory {} does not exist", current_directory_digest);
+                        return Some(Err(Error::StorageError(format!(
+                            "directory {} does not exist",
+                            current_directory_digest
+                        ))));
+                    }
+                    Err(e) => {
+                        warn!("failed to look up directory");
+                        return Some(Err(Error::StorageError(format!(
+                            "unable to look up directory {}: {}",
+                            current_directory_digest, e
+                        ))));
+                    }
+                };
+
+                // All DirectoryServices MUST validate directory nodes, before returning them out, so we
+                // can be sure [enqueue_child_directories] doesn't panic.
+
+                // enqueue child directories
+                self.enqueue_child_directories(&current_directory);
+                Some(Ok(current_directory))
+            }
+        }
+    }
+}
+
+/// This is a simple implementation of a Directory uploader.
+/// TODO: verify connectivity? Factor out these checks into generic helpers?
+pub struct SimplePutter<DS: DirectoryService> {
+    directory_service: DS,
+    last_directory_digest: Option<B3Digest>,
+}
+
+impl<DS: DirectoryService> SimplePutter<DS> {
+    pub fn new(directory_service: DS) -> Self {
+        Self {
+            directory_service,
+            last_directory_digest: None,
+        }
+    }
+}
+
+impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
+    fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        let digest = self.directory_service.put(directory)?;
+
+        // track the last directory digest
+        self.last_directory_digest = Some(digest);
+
+        Ok(())
+    }
+
+    /// We need to be mutable here, as that's the signature of the trait.
+    fn close(&mut self) -> Result<B3Digest, Error> {
+        match &self.last_directory_digest {
+            Some(last_digest) => Ok(last_digest.clone()),
+            None => Err(Error::InvalidRequest(
+                "no directories sent, can't show root digest".to_string(),
+            )),
+        }
+    }
+}