about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-03-27T12·47+0200
committerflokli <flokli@flokli.de>2023-03-31T10·54+0000
commit5bf913432424ad29e976f4acad535aa1340d84fe (patch)
tree14dbc2cc4bc5fa4b896cc745ac5c02555d60c774
parente9686f84d9be4aa5adace6ff531ff00558239f75 (diff)
refactor(tvix/store/directorysvc): move DirectoryTraverser to utils r/6065
Change-Id: Ie60a660e0fda7c80a6c7de20404c1965fe0e0d63
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8355
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
-rw-r--r--tvix/store/src/directoryservice/mod.rs107
-rw-r--r--tvix/store/src/directoryservice/utils.rs106
2 files changed, 108 insertions, 105 deletions
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
index d7b143df3e..53f6f08b14 100644
--- a/tvix/store/src/directoryservice/mod.rs
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -1,15 +1,13 @@
-use std::collections::{HashSet, VecDeque};
-
-use tracing::{debug_span, instrument, warn};
-
 use crate::{proto, Error};
 mod grpc;
 mod memory;
 mod sled;
+mod utils;
 
 pub use self::grpc::GRPCDirectoryService;
 pub use self::memory::MemoryDirectoryService;
 pub use self::sled::SledDirectoryService;
+pub use self::utils::DirectoryTraverser;
 
 /// The base trait all Directory services need to implement.
 /// This is a simple get and put of [crate::proto::Directory], returning their
@@ -30,104 +28,3 @@ pub trait DirectoryService {
     /// we can't have that yet.
     fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator;
 }
-
-/// Traverses a [proto::Directory] from the root to the children.
-///
-/// This is mostly BFS, but directories are only returned once.
-pub struct DirectoryTraverser<DS: DirectoryService> {
-    directory_service: DS,
-    /// The list of all directories that still need to be traversed. The next
-    /// element is picked from the front, new elements are enqueued at the
-    /// back.
-    worklist_directory_digests: VecDeque<[u8; 32]>,
-    /// The list of directory digests already sent to the consumer.
-    /// We omit sending the same directories multiple times.
-    sent_directory_digests: HashSet<[u8; 32]>,
-}
-
-impl<DS: DirectoryService> DirectoryTraverser<DS> {
-    pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self {
-        Self {
-            directory_service,
-            worklist_directory_digests: VecDeque::from([*root_directory_digest]),
-            sent_directory_digests: HashSet::new(),
-        }
-    }
-
-    // enqueue all child directory digests to the work queue, as
-    // long as they're not part of the worklist or already sent.
-    // This panics if the digest looks invalid, it's supposed to be checked first.
-    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
-        for child_directory_node in &directory.directories {
-            let child_digest: [u8; 32] = child_directory_node
-                .digest
-                .as_slice()
-                .try_into()
-                .map_err(|_e| Error::StorageError("invalid digest length".to_string()))
-                .unwrap();
-
-            if self.worklist_directory_digests.contains(&child_digest)
-                || self.sent_directory_digests.contains(&child_digest)
-            {
-                continue;
-            }
-            self.worklist_directory_digests.push_back(child_digest);
-        }
-    }
-}
-
-impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
-    type Item = Result<proto::Directory, Error>;
-
-    #[instrument(skip_all)]
-    fn next(&mut self) -> Option<Self::Item> {
-        // fetch the next directory digest from the top of the work queue.
-        match self.worklist_directory_digests.pop_front() {
-            None => None,
-            Some(current_directory_digest) => {
-                let current_directory_b64 = data_encoding::BASE64.encode(&current_directory_digest);
-                let span = debug_span!("directory.digest", current_directory_b64);
-                let _ = span.enter();
-
-                // look up the directory itself.
-                let current_directory = match self.directory_service.get(&current_directory_digest)
-                {
-                    // if we got it
-                    Ok(Some(current_directory)) => {
-                        // validate, we don't want to send invalid directories.
-                        if let Err(e) = current_directory.validate() {
-                            warn!("directory failed validation: {}", e.to_string());
-                            return Some(Err(Error::StorageError(format!(
-                                "invalid directory: {}",
-                                current_directory_b64
-                            ))));
-                        }
-                        current_directory
-                    }
-                    // if it's not there, we have an inconsistent store!
-                    Ok(None) => {
-                        warn!("directory {} does not exist", current_directory_b64);
-                        return Some(Err(Error::StorageError(format!(
-                            "directory {} does not exist",
-                            current_directory_b64
-                        ))));
-                    }
-                    Err(e) => {
-                        warn!("failed to look up directory");
-                        return Some(Err(Error::StorageError(format!(
-                            "unable to look up directory {}: {}",
-                            current_directory_b64, e
-                        ))));
-                    }
-                };
-
-                // All DirectoryServices MUST validate directory nodes, before returning them out, so we
-                // can be sure [enqueue_child_directories] doesn't panic.
-
-                // enqueue child directories
-                self.enqueue_child_directories(&current_directory);
-                Some(Ok(current_directory))
-            }
-        }
-    }
-}
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs
new file mode 100644
index 0000000000..10edb9ba74
--- /dev/null
+++ b/tvix/store/src/directoryservice/utils.rs
@@ -0,0 +1,106 @@
+use super::DirectoryService;
+use crate::proto;
+use crate::Error;
+use std::collections::{HashSet, VecDeque};
+use tracing::{debug_span, instrument, warn};
+
+/// Traverses a [proto::Directory] from the root to the children.
+///
+/// This is mostly BFS, but directories are only returned once.
+pub struct DirectoryTraverser<DS: DirectoryService> {
+    directory_service: DS,
+    /// The list of all directories that still need to be traversed. The next
+    /// element is picked from the front, new elements are enqueued at the
+    /// back.
+    worklist_directory_digests: VecDeque<[u8; 32]>,
+    /// The list of directory digests already sent to the consumer.
+    /// We omit sending the same directories multiple times.
+    sent_directory_digests: HashSet<[u8; 32]>,
+}
+
+impl<DS: DirectoryService> DirectoryTraverser<DS> {
+    pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self {
+        Self {
+            directory_service,
+            worklist_directory_digests: VecDeque::from([*root_directory_digest]),
+            sent_directory_digests: HashSet::new(),
+        }
+    }
+
+    // enqueue all child directory digests to the work queue, as
+    // long as they're not part of the worklist or already sent.
+    // This panics if the digest looks invalid, it's supposed to be checked first.
+    fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
+        for child_directory_node in &directory.directories {
+            let child_digest: [u8; 32] = child_directory_node
+                .digest
+                .as_slice()
+                .try_into()
+                .map_err(|_e| Error::StorageError("invalid digest length".to_string()))
+                .unwrap();
+
+            if self.worklist_directory_digests.contains(&child_digest)
+                || self.sent_directory_digests.contains(&child_digest)
+            {
+                continue;
+            }
+            self.worklist_directory_digests.push_back(child_digest);
+        }
+    }
+}
+
+impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> {
+    type Item = Result<proto::Directory, Error>;
+
+    #[instrument(skip_all)]
+    fn next(&mut self) -> Option<Self::Item> {
+        // fetch the next directory digest from the top of the work queue.
+        match self.worklist_directory_digests.pop_front() {
+            None => None,
+            Some(current_directory_digest) => {
+                let current_directory_b64 = data_encoding::BASE64.encode(&current_directory_digest);
+                let span = debug_span!("directory.digest", current_directory_b64);
+                let _ = span.enter();
+
+                // look up the directory itself.
+                let current_directory = match self.directory_service.get(&current_directory_digest)
+                {
+                    // if we got it
+                    Ok(Some(current_directory)) => {
+                        // validate, we don't want to send invalid directories.
+                        if let Err(e) = current_directory.validate() {
+                            warn!("directory failed validation: {}", e.to_string());
+                            return Some(Err(Error::StorageError(format!(
+                                "invalid directory: {}",
+                                current_directory_b64
+                            ))));
+                        }
+                        current_directory
+                    }
+                    // if it's not there, we have an inconsistent store!
+                    Ok(None) => {
+                        warn!("directory {} does not exist", current_directory_b64);
+                        return Some(Err(Error::StorageError(format!(
+                            "directory {} does not exist",
+                            current_directory_b64
+                        ))));
+                    }
+                    Err(e) => {
+                        warn!("failed to look up directory");
+                        return Some(Err(Error::StorageError(format!(
+                            "unable to look up directory {}: {}",
+                            current_directory_b64, e
+                        ))));
+                    }
+                };
+
+                // All DirectoryServices MUST validate directory nodes, before returning them out, so we
+                // can be sure [enqueue_child_directories] doesn't panic.
+
+                // enqueue child directories
+                self.enqueue_child_directories(&current_directory);
+                Some(Ok(current_directory))
+            }
+        }
+    }
+}