about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-04-13T16·30+0300
committerclbot <clbot@tvl.fyi>2024-04-15T14·37+0000
commitc088123d4e8c0b49cf0af568426d6d146a07f44c (patch)
tree9bd9bc85fee95a597d152807a80a9f46562d924c /tvix/castore
parentb70744fda627594cd42f8b6ae2d0dd1d7d037d61 (diff)
refactor(tvix/castore/import): put invariant checker into a .inspect() r/7926
Separate this a bit stronger from the main application flow.

Change-Id: I2e9bd3ec47cc6e37256ba6afc6e0586ddc9a051f
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11416
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: Brian Olsen <me@griff.name>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/import.rs36
1 files changed, 18 insertions, 18 deletions
diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs
index acae8299cc27..520ea18a5ac0 100644
--- a/tvix/castore/src/import.rs
+++ b/tvix/castore/src/import.rs
@@ -8,7 +8,7 @@ use crate::proto::SymlinkNode;
 use crate::Error as CastoreError;
 use async_stream::stream;
 use futures::pin_mut;
-use futures::Stream;
+use futures::{Stream, StreamExt};
 use std::fs::FileType;
 use tracing::Level;
 
@@ -21,7 +21,6 @@ use std::{
     os::unix::prelude::PermissionsExt,
     path::{Path, PathBuf},
 };
-use tokio_stream::StreamExt;
 use tracing::instrument;
 use walkdir::DirEntry;
 use walkdir::WalkDir;
@@ -201,7 +200,7 @@ impl MerkleInvariantChecker {
 pub async fn ingest_entries<'a, BS, DS, S>(
     blob_service: BS,
     directory_service: DS,
-    mut direntry_stream: S,
+    #[allow(unused_mut)] mut direntry_stream: S,
 ) -> Result<Node, Error>
 where
     BS: AsRef<dyn BlobService> + Clone,
@@ -215,25 +214,26 @@ where
     #[cfg(debug_assertions)]
     let mut invariant_checker: MerkleInvariantChecker = Default::default();
 
+    #[cfg(debug_assertions)]
+    let mut direntry_stream = direntry_stream.inspect(|e| {
+        // If we find an ancestor before we see this entry, this means that the caller
+        // broke the contract, refer to the documentation of the invariant checker to
+        // understand the reasoning here.
+        if let Some(ancestor) = invariant_checker.find_ancestor(e) {
+            panic!(
+                "Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
+                ancestor.display(),
+                e.path().display()
+            );
+        }
+
+        invariant_checker.see(e);
+    });
+
     // We need to process a directory's children before processing
     // the directory itself in order to have all the data needed
     // to compute the hash.
     while let Some(entry) = direntry_stream.next().await {
-        #[cfg(debug_assertions)]
-        {
-            // If we find an ancestor before we see this entry, this means that the caller
-            // broke the contract, refer to the documentation of the invariant checker to
-            // understand the reasoning here.
-            if let Some(ancestor) = invariant_checker.find_ancestor(&entry) {
-                panic!("Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
-                    ancestor.display(),
-                    entry.path().display()
-                );
-            }
-
-            invariant_checker.see(&entry);
-        }
-
         let file_type = entry.file_type();
 
         let node = if file_type.is_dir() {