From c088123d4e8c0b49cf0af568426d6d146a07f44c Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sat, 13 Apr 2024 19:30:47 +0300 Subject: refactor(tvix/castore/import): put invariant checker into a .inspect() Separate this a bit stronger from the main application flow. Change-Id: I2e9bd3ec47cc6e37256ba6afc6e0586ddc9a051f Reviewed-on: https://cl.tvl.fyi/c/depot/+/11416 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: Brian Olsen Reviewed-by: Connor Brewster --- tvix/castore/src/import.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) (limited to 'tvix/castore/src/import.rs') diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs index acae8299cc27..520ea18a5ac0 100644 --- a/tvix/castore/src/import.rs +++ b/tvix/castore/src/import.rs @@ -8,7 +8,7 @@ use crate::proto::SymlinkNode; use crate::Error as CastoreError; use async_stream::stream; use futures::pin_mut; -use futures::Stream; +use futures::{Stream, StreamExt}; use std::fs::FileType; use tracing::Level; @@ -21,7 +21,6 @@ use std::{ os::unix::prelude::PermissionsExt, path::{Path, PathBuf}, }; -use tokio_stream::StreamExt; use tracing::instrument; use walkdir::DirEntry; use walkdir::WalkDir; @@ -201,7 +200,7 @@ impl MerkleInvariantChecker { pub async fn ingest_entries<'a, BS, DS, S>( blob_service: BS, directory_service: DS, - mut direntry_stream: S, + #[allow(unused_mut)] mut direntry_stream: S, ) -> Result where BS: AsRef + Clone, @@ -215,25 +214,26 @@ where #[cfg(debug_assertions)] let mut invariant_checker: MerkleInvariantChecker = Default::default(); + #[cfg(debug_assertions)] + let mut direntry_stream = direntry_stream.inspect(|e| { + // If we find an ancestor before we see this entry, this means that the caller + // broke the contract, refer to the documentation of the invariant checker to + // understand the reasoning here. + if let Some(ancestor) = invariant_checker.find_ancestor(e) { + panic!( + "Tvix bug: merkle invariant checker discovered that {} was processed before {}!", + ancestor.display(), + e.path().display() + ); + } + + invariant_checker.see(e); + }); + // We need to process a directory's children before processing // the directory itself in order to have all the data needed // to compute the hash. while let Some(entry) = direntry_stream.next().await { - #[cfg(debug_assertions)] - { - // If we find an ancestor before we see this entry, this means that the caller - // broke the contract, refer to the documentation of the invariant checker to - // understand the reasoning here. - if let Some(ancestor) = invariant_checker.find_ancestor(&entry) { - panic!("Tvix bug: merkle invariant checker discovered that {} was processed before {}!", - ancestor.display(), - entry.path().display() - ); - } - - invariant_checker.see(&entry); - } - let file_type = entry.file_type(); let node = if file_type.is_dir() { -- cgit 1.4.1