about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorYureka <tvl@yuka.dev>2024-09-27T19·32+0200
committerclbot <clbot@tvl.fyi>2024-10-01T13·40+0000
commitb82cacb449c71684aab533cb1f27ddfd48bf78da (patch)
treefebc5cf7e8d49a1ff02dd5900b500082b46b1fd4 /tvix
parentd277bd9fbf17d579b9c51c12a4126f67a9e9db6c (diff)
feat(castore/fs): optional refscanner for ingest r/8741
Change-Id: Ieca06de4c2e2680d89fe05a380079fafa5454837
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12529
Autosubmit: yuka <yuka@yuka.dev>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix')
-rw-r--r--tvix/castore/src/import/fs.rs52
-rw-r--r--tvix/castore/src/tests/import.rs17
-rw-r--r--tvix/glue/src/builtins/import.rs3
-rw-r--r--tvix/store/src/bin/tvix-store.rs3
-rw-r--r--tvix/store/src/import.rs7
5 files changed, 60 insertions, 22 deletions
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index 1332fdfe57b5..5708579baad3 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -16,6 +16,7 @@ use walkdir::WalkDir;
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
+use crate::refscan::{ReferenceReader, ReferenceScanner};
 use crate::{B3Digest, Node};
 
 use super::ingest_entries;
@@ -29,16 +30,18 @@ use super::IngestionError;
 ///
 /// This function will walk the filesystem using `walkdir` and will consume
 /// `O(#number of entries)` space.
-#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)]
-pub async fn ingest_path<BS, DS, P>(
+#[instrument(skip(blob_service, directory_service, reference_scanner), fields(path, indicatif.pb_show=1), err)]
+pub async fn ingest_path<BS, DS, P, P2>(
     blob_service: BS,
     directory_service: DS,
     path: P,
+    reference_scanner: Option<&ReferenceScanner<P2>>,
 ) -> Result<Node, IngestionError<Error>>
 where
     P: AsRef<std::path::Path> + std::fmt::Debug,
     BS: BlobService + Clone,
     DS: DirectoryService,
+    P2: AsRef<[u8]> + Send + Sync,
 {
     let span = Span::current();
     span.pb_set_message(&format!("Ingesting {:?}", path));
@@ -50,7 +53,8 @@ where
         .contents_first(true)
         .into_iter();
 
-    let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref());
+    let entries =
+        dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner);
     ingest_entries(
         directory_service,
         entries.inspect({
@@ -71,14 +75,16 @@ where
 /// The produced stream is buffered, so uploads can happen concurrently.
 ///
 /// The root is the [Path] in the filesystem that is being ingested into the castore.
-pub fn dir_entries_to_ingestion_stream<'a, BS, I>(
+pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>(
     blob_service: BS,
     iter: I,
     root: &'a std::path::Path,
+    reference_scanner: Option<&'a ReferenceScanner<P>>,
 ) -> BoxStream<'a, Result<IngestionEntry, Error>>
 where
     BS: BlobService + Clone + 'a,
     I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
+    P: AsRef<[u8]> + Send + Sync,
 {
     let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
 
@@ -89,7 +95,13 @@ where
                 async move {
                     match x {
                         Ok(dir_entry) => {
-                            dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await
+                            dir_entry_to_ingestion_entry(
+                                blob_service,
+                                &dir_entry,
+                                prefix,
+                                reference_scanner,
+                            )
+                            .await
                         }
                         Err(e) => Err(Error::Stat(
                             prefix.to_path_buf(),
@@ -107,13 +119,15 @@ where
 ///
 /// The prefix path is stripped from the path of each entry. This is usually the parent path
 /// of the path being ingested so that the last element of the stream only has one component.
-pub async fn dir_entry_to_ingestion_entry<BS>(
+pub async fn dir_entry_to_ingestion_entry<BS, P>(
     blob_service: BS,
     entry: &DirEntry,
     prefix: &std::path::Path,
+    reference_scanner: Option<&ReferenceScanner<P>>,
 ) -> Result<IngestionEntry, Error>
 where
     BS: BlobService,
+    P: AsRef<[u8]>,
 {
     let file_type = entry.file_type();
 
@@ -134,13 +148,18 @@ where
             .into_os_string()
             .into_vec();
 
+        if let Some(reference_scanner) = &reference_scanner {
+            reference_scanner.scan(&target);
+        }
+
         Ok(IngestionEntry::Symlink { path, target })
     } else if file_type.is_file() {
         let metadata = entry
             .metadata()
             .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
-        let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?;
+        let digest =
+            upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner).await?;
 
         Ok(IngestionEntry::Regular {
             path,
@@ -156,13 +175,15 @@ where
 }
 
 /// Uploads the file at the provided [Path] the the [BlobService].
-#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)]
-async fn upload_blob<BS>(
+#[instrument(skip(blob_service, reference_scanner), fields(path, indicatif.pb_show=1), err)]
+async fn upload_blob<BS, P>(
     blob_service: BS,
     path: impl AsRef<std::path::Path>,
+    reference_scanner: Option<&ReferenceScanner<P>>,
 ) -> Result<B3Digest, Error>
 where
     BS: BlobService,
+    P: AsRef<[u8]>,
 {
     let span = Span::current();
     span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
@@ -184,9 +205,16 @@ where
     });
 
     let mut writer = blob_service.open_write().await;
-    tokio::io::copy(&mut BufReader::new(reader), &mut writer)
-        .await
-        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
+    if let Some(reference_scanner) = reference_scanner {
+        let mut reader = ReferenceReader::new(reference_scanner, BufReader::new(reader));
+        tokio::io::copy(&mut reader, &mut writer)
+            .await
+            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
+    } else {
+        tokio::io::copy(&mut BufReader::new(reader), &mut writer)
+            .await
+            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
+    }
 
     let digest = writer
         .close()
diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs
index 32c2c363689f..51d1b68a4ec9 100644
--- a/tvix/castore/src/tests/import.rs
+++ b/tvix/castore/src/tests/import.rs
@@ -21,10 +21,11 @@ async fn symlink() {
     )
     .unwrap();
 
-    let root_node = ingest_path(
+    let root_node = ingest_path::<_, _, _, &[u8]>(
         blob_service,
         directory_service,
         tmpdir.path().join("doesntmatter"),
+        None,
     )
     .await
     .expect("must succeed");
@@ -46,10 +47,11 @@ async fn single_file() {
 
     std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap();
 
-    let root_node = ingest_path(
+    let root_node = ingest_path::<_, _, _, &[u8]>(
         blob_service.clone(),
         directory_service,
         tmpdir.path().join("root"),
+        None,
     )
     .await
     .expect("must succeed");
@@ -84,9 +86,14 @@ async fn complicated() {
     // File ``keep/.keep`
     std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap();
 
-    let root_node = ingest_path(blob_service.clone(), &directory_service, tmpdir.path())
-        .await
-        .expect("must succeed");
+    let root_node = ingest_path::<_, _, _, &[u8]>(
+        blob_service.clone(),
+        &directory_service,
+        tmpdir.path(),
+        None,
+    )
+    .await
+    .expect("must succeed");
 
     // ensure root_node matched expectations
     assert_eq!(
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs
index 34ae2778ecdd..53a86b52394f 100644
--- a/tvix/glue/src/builtins/import.rs
+++ b/tvix/glue/src/builtins/import.rs
@@ -89,10 +89,11 @@ async fn filtered_ingest(
     let dir_entries = entries.into_iter().rev().map(Ok);
 
     state.tokio_handle.block_on(async {
-        let entries = tvix_castore::import::fs::dir_entries_to_ingestion_stream(
+        let entries = tvix_castore::import::fs::dir_entries_to_ingestion_stream::<'_, _, _, &[u8]>(
             &state.blob_service,
             dir_entries,
             path,
+            None, // TODO re-scan
         );
         ingest_entries(&state.directory_service, entries)
             .await
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 6da239a8fee3..b77a46dace7c 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -339,10 +339,11 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
                     async move {
                         // Ingest the given path.
 
-                        ingest_path(
+                        ingest_path::<_, _, _, &[u8]>(
                             blob_service,
                             directory_service,
                             PathBuf::from(elem.path.to_absolute_path()),
+                            None,
                         )
                         .await
                         .map(|root_node| (elem, root_node))
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index 1719669a4285..2c2b205016a1 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -123,9 +123,10 @@ where
     PS: AsRef<dyn PathInfoService>,
     NS: NarCalculationService,
 {
-    let root_node = ingest_path(blob_service, directory_service, path.as_ref())
-        .await
-        .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
+    let root_node =
+        ingest_path::<_, _, _, &[u8]>(blob_service, directory_service, path.as_ref(), None)
+            .await
+            .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
 
     // Ask for the NAR size and sha256
     let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?;