about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/directoryservice/mod.rs')
-rw-r--r--tvix/store/src/directoryservice/mod.rs113
1 files changed, 113 insertions, 0 deletions
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
index 1fc8fbc4c2..d7b143df3e 100644
--- a/tvix/store/src/directoryservice/mod.rs
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -1,3 +1,7 @@
+use std::collections::{HashSet, VecDeque};
+
+use tracing::{debug_span, instrument, warn};
+
 use crate::{proto, Error};
 mod grpc;
 mod memory;
@@ -11,10 +15,119 @@ pub use self::sled::SledDirectoryService;
 /// This is a simple get and put of [crate::proto::Directory], returning their
 /// digest.
 pub trait DirectoryService {
+    type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send;
+
     /// Get looks up a single Directory message by its digest.
     /// In case the directory is not found, Ok(None) is returned.
     fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error>;
     /// Get uploads a single Directory message, and returns the calculated
     /// digest, or an error.
     fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error>;
+
+    /// Looks up a closure of [proto::Directory].
+    /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`,
+    /// and we'd be able to add a default implementation for it here, but
+    /// we can't have that yet.
+    fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator;
+}
+
+/// Traverses a [proto::Directory] from the root to the children.
+///
+/// This is mostly BFS, but directories are only returned once.
+pub struct DirectoryTraverser<DS: DirectoryService> {
+    directory_service: DS,
+    /// The list of all directories that still need to be traversed. The next
+    /// element is picked from the front, new elements are enqueued at the
+    /// back.
+    worklist_directory_digests: VecDeque<[u8; 32]>,
+    /// The list of directory digests already sent to the consumer.
+    /// We omit sending the same directories multiple times.
+    sent_directory_digests: HashSet<[u8; 32]>,
+}
+
+impl<DS: DirectoryService> DirectoryTraverser<DS> {
+    pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self {
+        Self {
+            directory_service,
+            worklist_directory_digests: VecDeque::from([*root_directory_digest]),
+            sent_directory_digests: HashSet::new(),
+        }
+    }
+
+    // enqueue all child directory digests to the work queue, as
+    // long as they're not part of the worklist or already sent.
+    // This panics if the digest looks invalid, it's supposed to be checked first.
+    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
+        for child_directory_node in &directory.directories {
+            let child_digest: [u8; 32] = child_directory_node
+                .digest
+                .as_slice()
+                .try_into()
+                .map_err(|_e| Error::StorageError("invalid digest length".to_string()))
+                .unwrap();
+
+            if self.worklist_directory_digests.contains(&child_digest)
+                || self.sent_directory_digests.contains(&child_digest)
+            {
+                continue;
+            }
+            self.worklist_directory_digests.push_back(child_digest);
+        }
+    }
+}
+
+impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
+    type Item = Result<proto::Directory, Error>;
+
+    #[instrument(skip_all)]
+    fn next(&mut self) -> Option<Self::Item> {
+        // fetch the next directory digest from the top of the work queue.
+        match self.worklist_directory_digests.pop_front() {
+            None => None,
+            Some(current_directory_digest) => {
+                let current_directory_b64 = data_encoding::BASE64.encode(&current_directory_digest);
+                let span = debug_span!("directory.digest", current_directory_b64);
+                let _ = span.enter();
+
+                // look up the directory itself.
+                let current_directory = match self.directory_service.get(&current_directory_digest)
+                {
+                    // if we got it
+                    Ok(Some(current_directory)) => {
+                        // validate, we don't want to send invalid directories.
+                        if let Err(e) = current_directory.validate() {
+                            warn!("directory failed validation: {}", e.to_string());
+                            return Some(Err(Error::StorageError(format!(
+                                "invalid directory: {}",
+                                current_directory_b64
+                            ))));
+                        }
+                        current_directory
+                    }
+                    // if it's not there, we have an inconsistent store!
+                    Ok(None) => {
+                        warn!("directory {} does not exist", current_directory_b64);
+                        return Some(Err(Error::StorageError(format!(
+                            "directory {} does not exist",
+                            current_directory_b64
+                        ))));
+                    }
+                    Err(e) => {
+                        warn!("failed to look up directory");
+                        return Some(Err(Error::StorageError(format!(
+                            "unable to look up directory {}: {}",
+                            current_directory_b64, e
+                        ))));
+                    }
+                };
+
+                // All DirectoryServices MUST validate directory nodes, before returning them out, so we
+                // can be sure [enqueue_child_directories] doesn't panic.
+
+                // enqueue child directories
+                self.enqueue_child_directories(&current_directory);
+                Some(Ok(current_directory))
+            }
+        }
+    }
 }