about summary refs log tree commit diff
path: root/tvix/glue/src/builtins/import.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue/src/builtins/import.rs')
-rw-r--r--tvix/glue/src/builtins/import.rs428
1 files changed, 223 insertions, 205 deletions
diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs
index 273be08ef7b6..83b91165c09e 100644
--- a/tvix/glue/src/builtins/import.rs
+++ b/tvix/glue/src/builtins/import.rs
@@ -1,8 +1,9 @@
 //! Implements builtins used to import paths in the store.
 
-use crate::builtins::errors::ImportError;
+use crate::tvix_store_io::TvixStoreIO;
 use std::path::Path;
 use tvix_castore::import::ingest_entries;
+use tvix_castore::Node;
 use tvix_eval::{
     builtin_macros::builtins,
     generators::{self, GenCo},
@@ -16,7 +17,7 @@ async fn filtered_ingest(
     co: GenCo,
     path: &Path,
     filter: Option<&Value>,
-) -> Result<tvix_castore::proto::node::Node, ErrorKind> {
+) -> Result<Node, ErrorKind> {
     let mut entries: Vec<walkdir::DirEntry> = vec![];
     let mut it = walkdir::WalkDir::new(path)
         .follow_links(false)
@@ -88,10 +89,11 @@ async fn filtered_ingest(
     let dir_entries = entries.into_iter().rev().map(Ok);
 
     state.tokio_handle.block_on(async {
-        let entries = tvix_castore::import::fs::dir_entries_to_ingestion_stream(
+        let entries = tvix_castore::import::fs::dir_entries_to_ingestion_stream::<'_, _, _, &[u8]>(
             &state.blob_service,
             dir_entries,
             path,
+            None, // TODO re-scan
         );
         ingest_entries(&state.directory_service, entries)
             .await
@@ -104,174 +106,134 @@ async fn filtered_ingest(
 
 #[builtins(state = "Rc<TvixStoreIO>")]
 mod import_builtins {
-    use std::os::unix::ffi::OsStrExt;
-    use std::rc::Rc;
-
     use super::*;
 
+    use crate::builtins::ImportError;
     use crate::tvix_store_io::TvixStoreIO;
+    use bstr::ByteSlice;
     use nix_compat::nixhash::{CAHash, NixHash};
-    use nix_compat::store_path::StorePath;
+    use nix_compat::store_path::{build_ca_path, StorePathRef};
     use sha2::Digest;
+    use std::rc::Rc;
     use tokio::io::AsyncWriteExt;
-    use tvix_castore::proto::node::Node;
-    use tvix_castore::proto::FileNode;
     use tvix_eval::builtins::coerce_value_to_path;
     use tvix_eval::generators::Gen;
     use tvix_eval::{generators::GenCo, ErrorKind, Value};
     use tvix_eval::{FileType, NixContextElement, NixString};
+    use tvix_store::path_info::PathInfo;
 
-    #[builtin("path")]
-    async fn builtin_path(
+    // This is a helper used by both builtins.path and builtins.filterSource.
+    async fn import_helper(
         state: Rc<TvixStoreIO>,
         co: GenCo,
-        args: Value,
+        path: std::path::PathBuf,
+        name: Option<&Value>,
+        filter: Option<&Value>,
+        recursive_ingestion: bool,
+        expected_sha256: Option<[u8; 32]>,
     ) -> Result<Value, ErrorKind> {
-        let args = args.to_attrs()?;
-        let path = args.select_required("path")?;
-        let path =
-            match coerce_value_to_path(&co, generators::request_force(&co, path.clone()).await)
-                .await?
-            {
-                Ok(path) => path,
-                Err(cek) => return Ok(cek.into()),
-            };
-        let name: String = if let Some(name) = args.select("name") {
-            generators::request_force(&co, name.clone())
+        let name: String = match name {
+            Some(name) => generators::request_force(&co, name.clone())
                 .await
                 .to_str()?
                 .as_bstr()
-                .to_string()
-        } else {
-            tvix_store::import::path_to_name(&path)
+                .to_string(),
+            None => tvix_store::import::path_to_name(&path)
                 .expect("Failed to derive the default name out of the path")
-                .to_string()
+                .to_string(),
         };
-        let filter = args.select("filter");
-        let recursive_ingestion = args
-            .select("recursive")
-            .map(|r| r.as_bool())
-            .transpose()?
-            .unwrap_or(true); // Yes, yes, Nix, by default, puts `recursive = true;`.
-        let expected_sha256 = args
-            .select("sha256")
-            .map(|h| {
-                h.to_str().and_then(|expected| {
-                    let expected = expected.into_bstring().to_string();
-                    // TODO: ensure that we fail if this is not a valid str.
-                    nix_compat::nixhash::from_str(&expected, None).map_err(|_err| {
-                        // TODO: a better error would be nice, we use
-                        // DerivationError::InvalidOutputHash usually for derivation construction.
-                        // This is not a derivation construction, should we move it outside and
-                        // generalize?
-                        ErrorKind::TypeError {
-                            expected: "sha256",
-                            actual: "not a sha256",
-                        }
-                    })
-                })
-            })
-            .transpose()?;
-
-        // Check if the path points to a regular file.
-        // If it does, the filter function is never executed.
-        // TODO: follow symlinks and check their type instead
-        let (root_node, ca_hash) = match state.file_type(path.as_ref())? {
+        // As a first step, we ingest the contents, and get back a root node,
+        // and optionally the sha256 of a flat file.
+        let (root_node, ca) = match std::fs::metadata(&path)?.file_type().into() {
+            // Check if the path points to a regular file.
+            // If it does, the filter function is never executed, and we copy to the blobservice directly.
+            // If recursive is false, we need to calculate the sha256 digest of the raw contents,
+            // as that affects the output path calculation.
             FileType::Regular => {
-                let mut file = state.open(path.as_ref())?;
-                // This is a single file, copy it to the blobservice directly.
-                let mut hash = sha2::Sha256::new();
+                let mut file = state.open(&path)?;
+
+                let mut flat_sha256 = (!recursive_ingestion).then(sha2::Sha256::new);
                 let mut blob_size = 0;
+
                 let mut blob_writer = state
                     .tokio_handle
                     .block_on(async { state.blob_service.open_write().await });
 
-                let mut buf = [0u8; 4096];
+                // read piece by piece and write to blob_writer.
+                // This is a bit manual due to EvalIO being sync, while everything else async.
+                {
+                    let mut buf = [0u8; 4096];
 
-                loop {
-                    // read bytes into buffer, break out if EOF
-                    let len = file.read(&mut buf)?;
-                    if len == 0 {
-                        break;
-                    }
-                    blob_size += len as u64;
+                    loop {
+                        // read bytes into buffer, break out if EOF
+                        let len = file.read(&mut buf)?;
+                        if len == 0 {
+                            break;
+                        }
+                        blob_size += len as u64;
 
-                    let data = &buf[0..len];
+                        let data = &buf[0..len];
 
-                    // add to blobwriter
-                    state
-                        .tokio_handle
-                        .block_on(async { blob_writer.write_all(data).await })?;
+                        // add to blobwriter
+                        state
+                            .tokio_handle
+                            .block_on(async { blob_writer.write_all(data).await })?;
 
-                    // update the sha256 hash function. We can skip that if we're not using it.
-                    if !recursive_ingestion {
-                        hash.update(data);
+                        // update flat_sha256 if needed.
+                        if let Some(h) = flat_sha256.as_mut() {
+                            h.update(data)
+                        }
                     }
                 }
 
-                // close the blob writer, get back the b3 digest.
-                let blob_digest = state
-                    .tokio_handle
-                    .block_on(async { blob_writer.close().await })?;
-
-                let root_node = Node::File(FileNode {
-                    // The name gets set further down, while constructing the PathInfo.
-                    name: "".into(),
-                    digest: blob_digest.into(),
-                    size: blob_size,
-                    executable: false,
-                });
-
-                let ca_hash = if recursive_ingestion {
-                    let (_nar_size, nar_sha256) = state
-                        .tokio_handle
-                        .block_on(async {
-                            state
-                                .nar_calculation_service
-                                .as_ref()
-                                .calculate_nar(&root_node)
-                                .await
-                        })
-                        .map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?;
-                    CAHash::Nar(NixHash::Sha256(nar_sha256))
-                } else {
-                    CAHash::Flat(NixHash::Sha256(hash.finalize().into()))
-                };
-
-                (root_node, ca_hash)
+                // close the blob writer, construct the root node and, for flat ingestion, the flat sha256 (later used for output path calculation)
+                (
+                    Node::File {
+                        digest: state
+                            .tokio_handle
+                            .block_on(async { blob_writer.close().await })?,
+                        size: blob_size,
+                        executable: false,
+                    },
+                    {
+                        // If non-recursive ingestion is requested…
+                        if let Some(flat_sha256) = flat_sha256 {
+                            let actual_sha256 = flat_sha256.finalize().into();
+
+                            // compare the recorded flat hash with an upfront one if provided.
+                            if let Some(expected_sha256) = expected_sha256 {
+                                if actual_sha256 != expected_sha256 {
+                                    return Err(ImportError::HashMismatch(
+                                        path,
+                                        NixHash::Sha256(expected_sha256),
+                                        NixHash::Sha256(actual_sha256),
+                                    )
+                                    .into());
+                                }
+                            }
+
+                            Some(CAHash::Flat(NixHash::Sha256(actual_sha256)))
+                        } else {
+                            None
+                        }
+                    },
+                )
             }
 
-            FileType::Directory => {
-                if !recursive_ingestion {
-                    return Err(ImportError::FlatImportOfNonFile(
-                        path.to_string_lossy().to_string(),
-                    ))?;
-                }
-
-                // do the filtered ingest
-                let root_node = filtered_ingest(state.clone(), co, path.as_ref(), filter).await?;
-
-                // calculate the NAR sha256
-                let (_nar_size, nar_sha256) = state
-                    .tokio_handle
-                    .block_on(async {
-                        state
-                            .nar_calculation_service
-                            .as_ref()
-                            .calculate_nar(&root_node)
-                            .await
-                    })
-                    .map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?;
-
-                let ca_hash = CAHash::Nar(NixHash::Sha256(nar_sha256));
-
-                (root_node, ca_hash)
+            FileType::Directory if !recursive_ingestion => {
+                return Err(ImportError::FlatImportOfNonFile(path))?
             }
+
+            // do the filtered ingest
+            FileType::Directory => (
+                filtered_ingest(state.clone(), co, path.as_ref(), filter).await?,
+                None,
+            ),
             FileType::Symlink => {
                 // FUTUREWORK: Nix follows a symlink if it's at the root,
                 // except if it's not resolve-able (NixOS/nix#7761).i
                 return Err(tvix_eval::ErrorKind::IO {
-                    path: Some(path.to_path_buf()),
+                    path: Some(path),
                     error: Rc::new(std::io::Error::new(
                         std::io::ErrorKind::Unsupported,
                         "builtins.path pointing to a symlink is ill-defined.",
@@ -280,7 +242,7 @@ mod import_builtins {
             }
             FileType::Unknown => {
                 return Err(tvix_eval::ErrorKind::IO {
-                    path: Some(path.to_path_buf()),
+                    path: Some(path),
                     error: Rc::new(std::io::Error::new(
                         std::io::ErrorKind::Unsupported,
                         "unsupported file type",
@@ -289,32 +251,67 @@ mod import_builtins {
             }
         };
 
-        let (path_info, _hash, output_path) = state.tokio_handle.block_on(async {
-            state
-                .node_to_path_info(name.as_ref(), path.as_ref(), &ca_hash, root_node)
-                .await
-        })?;
-
-        if let Some(expected_sha256) = expected_sha256 {
-            if *ca_hash.hash() != expected_sha256 {
-                Err(ImportError::HashMismatch(
-                    path.to_string_lossy().to_string(),
-                    expected_sha256,
-                    ca_hash.hash().into_owned(),
-                ))?;
+        // Calculate the NAR sha256.
+        let (nar_size, nar_sha256) = state
+            .tokio_handle
+            .block_on(async {
+                state
+                    .nar_calculation_service
+                    .as_ref()
+                    .calculate_nar(&root_node)
+                    .await
+            })
+            .map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?;
+
+        // Calculate the CA hash for the recursive cases, this is only already
+        // `Some(_)` for flat ingestion.
+        let ca = match ca {
+            None => {
+                // If an upfront-expected NAR hash was specified, compare.
+                if let Some(expected_nar_sha256) = expected_sha256 {
+                    if expected_nar_sha256 != nar_sha256 {
+                        return Err(ImportError::HashMismatch(
+                            path,
+                            NixHash::Sha256(expected_nar_sha256),
+                            NixHash::Sha256(nar_sha256),
+                        )
+                        .into());
+                    }
+                }
+                CAHash::Nar(NixHash::Sha256(nar_sha256))
             }
-        }
+            Some(ca) => ca,
+        };
+
+        let store_path = build_ca_path(&name, &ca, Vec::<&str>::new(), false)
+            .map_err(|e| tvix_eval::ErrorKind::TvixError(Rc::new(e)))?;
 
-        state
+        let path_info = state
             .tokio_handle
-            .block_on(async { state.path_info_service.as_ref().put(path_info).await })
+            .block_on(async {
+                state
+                    .path_info_service
+                    .as_ref()
+                    .put(PathInfo {
+                        store_path,
+                        node: root_node,
+                        // There's no reference scanning on path contents ingested like this.
+                        references: vec![],
+                        nar_size,
+                        nar_sha256,
+                        signatures: vec![],
+                        deriver: None,
+                        ca: Some(ca),
+                    })
+                    .await
+            })
             .map_err(|e| tvix_eval::ErrorKind::IO {
-                path: Some(path.to_path_buf()),
+                path: Some(path),
                 error: Rc::new(e.into()),
             })?;
 
         // We need to attach context to the final output path.
-        let outpath = output_path.to_absolute_path();
+        let outpath = path_info.store_path.to_absolute_path();
 
         Ok(
             NixString::new_context_from(NixContextElement::Plain(outpath.clone()).into(), outpath)
@@ -322,45 +319,72 @@ mod import_builtins {
         )
     }
 
-    #[builtin("filterSource")]
-    async fn builtin_filter_source(
+    #[builtin("path")]
+    async fn builtin_path(
         state: Rc<TvixStoreIO>,
         co: GenCo,
-        #[lazy] filter: Value,
-        path: Value,
+        args: Value,
     ) -> Result<Value, ErrorKind> {
-        let p = path.to_path()?;
-        let root_node = filtered_ingest(Rc::clone(&state), co, &p, Some(&filter)).await?;
-        let name = tvix_store::import::path_to_name(&p)?;
+        let args = args.to_attrs()?;
 
-        let outpath = state
-            .tokio_handle
-            .block_on(async {
-                let (_, nar_sha256) = state
-                    .nar_calculation_service
-                    .as_ref()
-                    .calculate_nar(&root_node)
-                    .await?;
+        let path = match coerce_value_to_path(
+            &co,
+            generators::request_force(&co, args.select_required("path")?.clone()).await,
+        )
+        .await?
+        {
+            Ok(path) => path,
+            Err(cek) => return Ok(cek.into()),
+        };
 
-                state
-                    .register_node_in_path_info_service(
-                        name,
-                        &p,
-                        &CAHash::Nar(NixHash::Sha256(nar_sha256)),
-                        root_node,
-                    )
-                    .await
+        let filter = args.select("filter");
+
+        // Whether to ingest recursively; `recursive = false` requests flat ingestion.
+        let recursive_ingestion = args
+            .select("recursive")
+            .map(|r| r.as_bool())
+            .transpose()?
+            .unwrap_or(true); // Yes, yes, Nix, by default, puts `recursive = true;`.
+
+        let expected_sha256 = args
+            .select("sha256")
+            .map(|h| {
+                h.to_str().and_then(|expected| {
+                    match nix_compat::nixhash::from_str(expected.to_str()?, Some("sha256")) {
+                        Ok(NixHash::Sha256(digest)) => Ok(digest),
+                        Ok(_) => unreachable!(),
+                        Err(e) => Err(ErrorKind::InvalidHash(e.to_string())),
+                    }
+                })
             })
-            .map_err(|err| ErrorKind::IO {
-                path: Some(p.to_path_buf()),
-                error: err.into(),
-            })?
-            .to_absolute_path();
+            .transpose()?;
 
-        Ok(
-            NixString::new_context_from(NixContextElement::Plain(outpath.clone()).into(), outpath)
-                .into(),
+        import_helper(
+            state,
+            co,
+            path,
+            args.select("name"),
+            filter,
+            recursive_ingestion,
+            expected_sha256,
         )
+        .await
+    }
+
+    #[builtin("filterSource")]
+    async fn builtin_filter_source(
+        state: Rc<TvixStoreIO>,
+        co: GenCo,
+        #[lazy] filter: Value,
+        path: Value,
+    ) -> Result<Value, ErrorKind> {
+        let path =
+            match coerce_value_to_path(&co, generators::request_force(&co, path).await).await? {
+                Ok(path) => path,
+                Err(cek) => return Ok(cek.into()),
+            };
+
+        import_helper(state, co, path, None, Some(&filter), true, None).await
     }
 
     #[builtin("storePath")]
@@ -369,39 +393,33 @@ mod import_builtins {
         co: GenCo,
         path: Value,
     ) -> Result<Value, ErrorKind> {
-        let p = std::str::from_utf8(match &path {
-            Value::String(s) => s.as_bytes(),
-            Value::Path(p) => p.as_os_str().as_bytes(),
+        let p = match &path {
+            Value::String(s) => Path::new(s.as_bytes().to_os_str()?),
+            Value::Path(p) => p.as_path(),
             _ => {
                 return Err(ErrorKind::TypeError {
                     expected: "string or path",
                     actual: path.type_of(),
                 })
             }
-        })?;
-
-        let path_exists = if let Ok((store_path, sub_path)) = StorePath::from_absolute_path_full(p)
-        {
-            if !sub_path.as_os_str().is_empty() {
-                false
-            } else {
-                state.store_path_exists(store_path.as_ref()).await?
-            }
-        } else {
-            false
         };
 
-        if !path_exists {
-            return Err(ImportError::PathNotInStore(p.into()).into());
-        }
+        // For this builtin, the path needs to start with an absolute store path.
+        let (store_path, _sub_path) = StorePathRef::from_absolute_path_full(p)
+            .map_err(|_e| ImportError::PathNotAbsoluteOrInvalid(p.to_path_buf()))?;
 
-        Ok(Value::String(NixString::new_context_from(
-            [NixContextElement::Plain(p.into())].into(),
-            p,
-        )))
+        if state.path_exists(p)? {
+            Ok(Value::String(NixString::new_context_from(
+                [NixContextElement::Plain(store_path.to_absolute_path())].into(),
+                p.as_os_str().as_encoded_bytes(),
+            )))
+        } else {
+            Err(ErrorKind::IO {
+                path: Some(p.to_path_buf()),
+                error: Rc::new(std::io::ErrorKind::NotFound.into()),
+            })
+        }
     }
 }
 
 pub use import_builtins::builtins as import_builtins;
-
-use crate::tvix_store_io::TvixStoreIO;