about summary refs log tree commit diff
path: root/tvix/castore/src/import/fs.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/import/fs.rs')
-rw-r--r-- tvix/castore/src/import/fs.rs 72
1 files changed, 54 insertions, 18 deletions
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index dc7821b8101e..78eac6f6128d 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -8,7 +8,9 @@ use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
 use tokio::io::BufReader;
 use tokio_util::io::InspectReader;
+use tracing::info_span;
 use tracing::instrument;
+use tracing::Instrument;
 use tracing::Span;
 use tracing_indicatif::span_ext::IndicatifSpanExt;
 use walkdir::DirEntry;
@@ -16,8 +18,8 @@ use walkdir::WalkDir;
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
-use crate::proto::node::Node;
-use crate::B3Digest;
+use crate::refscan::{ReferenceReader, ReferenceScanner};
+use crate::{B3Digest, Node};
 
 use super::ingest_entries;
 use super::IngestionEntry;
@@ -30,20 +32,24 @@ use super::IngestionError;
 ///
 /// This function will walk the filesystem using `walkdir` and will consume
 /// `O(#number of entries)` space.
-#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)]
-pub async fn ingest_path<BS, DS, P>(
+#[instrument(
+    skip(blob_service, directory_service, reference_scanner),
+    fields(path),
+    err
+)]
+pub async fn ingest_path<BS, DS, P, P2>(
     blob_service: BS,
     directory_service: DS,
     path: P,
+    reference_scanner: Option<&ReferenceScanner<P2>>,
 ) -> Result<Node, IngestionError<Error>>
 where
     P: AsRef<std::path::Path> + std::fmt::Debug,
     BS: BlobService + Clone,
     DS: DirectoryService,
+    P2: AsRef<[u8]> + Send + Sync,
 {
     let span = Span::current();
-    span.pb_set_message(&format!("Ingesting {:?}", path));
-    span.pb_start();
 
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
@@ -51,7 +57,8 @@ where
         .contents_first(true)
         .into_iter();
 
-    let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref());
+    let entries =
+        dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner);
     ingest_entries(
         directory_service,
         entries.inspect({
@@ -72,14 +79,16 @@ where
 /// The produced stream is buffered, so uploads can happen concurrently.
 ///
 /// The root is the [Path] in the filesystem that is being ingested into the castore.
-pub fn dir_entries_to_ingestion_stream<'a, BS, I>(
+pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>(
     blob_service: BS,
     iter: I,
     root: &'a std::path::Path,
+    reference_scanner: Option<&'a ReferenceScanner<P>>,
 ) -> BoxStream<'a, Result<IngestionEntry, Error>>
 where
     BS: BlobService + Clone + 'a,
     I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
+    P: AsRef<[u8]> + Send + Sync,
 {
     let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
 
@@ -90,7 +99,13 @@ where
                 async move {
                     match x {
                         Ok(dir_entry) => {
-                            dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await
+                            dir_entry_to_ingestion_entry(
+                                blob_service,
+                                &dir_entry,
+                                prefix,
+                                reference_scanner,
+                            )
+                            .await
                         }
                         Err(e) => Err(Error::Stat(
                             prefix.to_path_buf(),
@@ -108,13 +123,15 @@ where
 ///
 /// The prefix path is stripped from the path of each entry. This is usually the parent path
 /// of the path being ingested so that the last element of the stream only has one component.
-pub async fn dir_entry_to_ingestion_entry<BS>(
+pub async fn dir_entry_to_ingestion_entry<BS, P>(
     blob_service: BS,
     entry: &DirEntry,
     prefix: &std::path::Path,
+    reference_scanner: Option<&ReferenceScanner<P>>,
 ) -> Result<IngestionEntry, Error>
 where
     BS: BlobService,
+    P: AsRef<[u8]>,
 {
     let file_type = entry.file_type();
 
@@ -135,13 +152,25 @@ where
             .into_os_string()
             .into_vec();
 
+        if let Some(reference_scanner) = &reference_scanner {
+            reference_scanner.scan(&target);
+        }
+
         Ok(IngestionEntry::Symlink { path, target })
     } else if file_type.is_file() {
         let metadata = entry
             .metadata()
             .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
-        let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?;
+        let digest = upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner)
+            .instrument({
+                let span = info_span!("upload_blob", "indicatif.pb_show" = tracing::field::Empty);
+                span.pb_set_message(&format!("Uploading blob for {:?}", fs_path));
+                span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
+
+                span
+            })
+            .await?;
 
         Ok(IngestionEntry::Regular {
             path,
@@ -157,17 +186,17 @@ where
 }
 
 /// Uploads the file at the provided [Path] to the [BlobService].
-#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)]
-async fn upload_blob<BS>(
+#[instrument(skip(blob_service, reference_scanner), fields(path), err)]
+async fn upload_blob<BS, P>(
     blob_service: BS,
     path: impl AsRef<std::path::Path>,
+    reference_scanner: Option<&ReferenceScanner<P>>,
 ) -> Result<B3Digest, Error>
 where
     BS: BlobService,
+    P: AsRef<[u8]>,
 {
     let span = Span::current();
-    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
-    span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref()));
     span.pb_start();
 
     let file = tokio::fs::File::open(path.as_ref())
@@ -185,9 +214,16 @@ where
     });
 
     let mut writer = blob_service.open_write().await;
-    tokio::io::copy(&mut BufReader::new(reader), &mut writer)
-        .await
-        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
+    if let Some(reference_scanner) = reference_scanner {
+        let mut reader = ReferenceReader::new(reference_scanner, BufReader::new(reader));
+        tokio::io::copy(&mut reader, &mut writer)
+            .await
+            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
+    } else {
+        tokio::io::copy(&mut BufReader::new(reader), &mut writer)
+            .await
+            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
+    }
 
     let digest = writer
         .close()