// path: tvix/glue/src/builtins/import.rs
//! Implements builtins used to import paths in the store.

use crate::builtins::errors::ImportError;
use futures::pin_mut;
use std::path::Path;
use tvix_eval::{
    builtin_macros::builtins,
    generators::{self, GenCo},
    ErrorKind, EvalIO, Value,
};

use std::rc::Rc;

/// Walks `path` on the local filesystem and ingests every entry accepted by
/// `filter`, returning the root node produced by the ingestion.
///
/// The walk does not follow symlinks (including a symlinked root), yields
/// entries sorted by file name and visits a directory before its contents.
///
/// The root entry is always kept; `filter` is never consulted for it. For
/// every other entry, when `filter` is `Some`, it is called with two string
/// arguments: the entry's path and its type (`"directory"`, `"regular"`,
/// `"symlink"` or `"unknown"`). The forced result must be a boolean. An entry
/// for which it is `false` is dropped, and a dropped directory is not
/// descended into.
///
/// Kept entries are grouped by their depth below the root and handed to
/// `state.ingest_entries` on the store's tokio handle.
///
/// # Errors
///
/// Returns `ErrorKind::IO` if the walk yields no root entry, if any entry
/// cannot be read, or if the ingestion fails. Returns the error of
/// `as_bool()` if the filter does not evaluate to a boolean.
async fn filtered_ingest(
    state: Rc<TvixStoreIO>,
    co: GenCo,
    path: &Path,
    filter: Option<&Value>,
) -> Result<tvix_castore::proto::node::Node, ErrorKind> {
    let mut walker = walkdir::WalkDir::new(path)
        .follow_links(false)
        .follow_root_links(false)
        .contents_first(false)
        .sort_by_file_name()
        .into_iter();

    // The first item of the walk is the root itself. It is stored at level 0
    // without being passed through the filter. Failing to read it (for
    // instance because `path` does not exist) is reported against `path`.
    let root_entry = match walker.next() {
        Some(Ok(entry)) => entry,
        Some(Err(err)) => {
            return Err(ErrorKind::IO {
                path: Some(path.to_path_buf()),
                error: std::io::Error::from(err).into(),
            })
        }
        None => {
            return Err(ErrorKind::IO {
                path: Some(path.to_path_buf()),
                error: std::io::Error::new(std::io::ErrorKind::NotFound, "No root node emitted")
                    .into(),
            })
        }
    };

    // `levels[d]` collects the kept entries found `d` directories below the root.
    let mut levels: Vec<Vec<walkdir::DirEntry>> = vec![vec![root_entry]];

    // A plain `for` loop cannot be used here: the body needs access to the
    // iterator itself in order to prune rejected directories.
    while let Some(walked) = walker.next() {
        let entry = match walked {
            Ok(entry) => entry,
            Err(err) => {
                return Err(ErrorKind::IO {
                    path: err.path().map(Path::to_path_buf),
                    error: std::io::Error::from(err).into(),
                })
            }
        };

        // Type names as per Nix documentation `:doc builtins.filterSource`.
        let kind = entry.file_type();
        let file_type = if kind.is_dir() {
            "directory"
        } else if kind.is_file() {
            "regular"
        } else if kind.is_symlink() {
            "symlink"
        } else {
            "unknown"
        };

        let should_keep: bool = match filter {
            None => true,
            Some(predicate) => {
                let verdict = generators::request_call_with(
                    &co,
                    predicate.clone(),
                    [
                        Value::String(entry.path().as_os_str().as_encoded_bytes().into()),
                        Value::String(file_type.into()),
                    ],
                )
                .await;

                generators::request_force(&co, verdict).await.as_bool()?
            }
        };

        if !should_keep {
            // Nothing below a rejected directory may be imported either.
            if file_type == "directory" {
                walker.skip_current_dir();
            }
            continue;
        }

        let depth = entry.depth();
        if depth < levels.len() {
            levels[depth].push(entry);
        } else {
            // Descending can only ever open exactly one new level at a time.
            debug_assert!(
                depth == levels.len(),
                "Received unexpected entry with depth {} during descent, previously at {}",
                depth,
                levels.len()
            );

            levels.push(vec![entry]);
        }

        // FUTUREWORK: determine when it's the right moment to flush a level to the ingester.
    }

    let entries_stream = tvix_castore::import::leveled_entries_to_stream(levels);

    pin_mut!(entries_stream);

    state
        .tokio_handle
        .block_on(async { state.ingest_entries(entries_stream).await })
        .map_err(|err| ErrorKind::IO {
            path: Some(path.to_path_buf()),
            error: err.into(),
        })
}

#[builtins(state = "Rc<TvixStoreIO>")]
mod import_builtins {
    use std::rc::Rc;

    use super::*;

    use nix_compat::nixhash::{CAHash, NixHash};
    use tvix_eval::generators::Gen;
    use tvix_eval::{generators::GenCo, ErrorKind, Value};
    use tvix_eval::{NixContextElement, NixString};

    use tvix_castore::B3Digest;

    use crate::tvix_store_io::TvixStoreIO;

    /// Implements `builtins.path`: imports a filesystem path into the store
    /// and returns the resulting store path as a string carrying a plain
    /// context element for that path.
    ///
    /// Attributes read from `args`:
    ///
    /// - `path` (required): the path to import.
    /// - `name`: name used for the store path; derived from `path` when absent.
    /// - `filter`: optional function called for every entry below the root
    ///   with the entry's path and its type; entries for which it returns
    ///   `false` are not imported.
    /// - `recursive`: defaults to `true`, in which case the content hash is the
    ///   SHA-256 of the NAR of the imported node. When `false`, `path` must be
    ///   a readable file and the hash is the flat SHA-256 of its contents.
    /// - `sha256`: optional expected hash, compared against the obtained one.
    ///
    /// # Errors
    ///
    /// Fails if `path` is missing, if an attribute has the wrong type, if no
    /// default name can be derived from `path`, with
    /// `ImportError::FlatImportOfNonFile` when a flat import targets something
    /// other than a file, with `ImportError::HashMismatch` when `sha256` does
    /// not match, and with the underlying error when ingestion, hashing or
    /// registration in the path info service fails.
    #[builtin("path")]
    async fn builtin_path(
        state: Rc<TvixStoreIO>,
        co: GenCo,
        args: Value,
    ) -> Result<Value, ErrorKind> {
        let args = args.to_attrs()?;
        let path = args.select_required("path")?;
        let path = generators::request_force(&co, path.clone())
            .await
            .to_path()?;
        let name: String = match args.select("name") {
            Some(name) => generators::request_force(&co, name.clone())
                .await
                .to_str()?
                .as_bstr()
                .to_string(),
            // The path is user input, so failing to derive a name from it is
            // an evaluation error rather than a reason to abort the process.
            None => tvix_store::import::path_to_name(&path)?.to_string(),
        };
        let filter = args.select("filter");

        // Attribute values may still be unevaluated, so `recursive` and
        // `sha256` are forced before inspection, just like `path` and `name`.
        let recursive_ingestion = match args.select("recursive") {
            Some(recursive) => generators::request_force(&co, recursive.clone())
                .await
                .as_bool()?,
            // Yes, yes, Nix, by default, puts `recursive = true;`.
            None => true,
        };
        let expected_sha256 = match args.select("sha256") {
            Some(hash) => {
                let expected = generators::request_force(&co, hash.clone())
                    .await
                    .to_str()?
                    .into_bstring()
                    .to_string();
                // TODO: ensure that we fail if this is not a valid str.
                // NOTE(review): no algorithm hint is passed here, so a bare
                // (non-SRI, non-prefixed) sha256 may be rejected — confirm.
                let parsed = nix_compat::nixhash::from_str(&expected, None).map_err(|_err| {
                    // TODO: a better error would be nice, we use
                    // DerivationError::InvalidOutputHash usually for derivation construction.
                    // This is not a derivation construction, should we move it outside and
                    // generalize?
                    ErrorKind::TypeError {
                        expected: "sha256",
                        actual: "not a sha256",
                    }
                })?;
                Some(parsed)
            }
            None => None,
        };

        // A flat import only makes sense for something that can be read as a file.
        // FUTUREWORK(performance): this reads the file instead of using a stat-like
        // system call to the file, this degrades very badly on large files.
        if !recursive_ingestion && state.read_to_end(path.as_ref()).is_err() {
            Err(ImportError::FlatImportOfNonFile(
                path.to_string_lossy().to_string(),
            ))?;
        }

        let root_node = filtered_ingest(state.clone(), co, path.as_ref(), filter).await?;
        let ca: CAHash = if recursive_ingestion {
            // Recursive mode: content-address by the SHA-256 of the NAR.
            CAHash::Nar(NixHash::Sha256(state.tokio_handle.block_on(async {
                Ok::<_, tvix_eval::ErrorKind>(
                    state
                        .path_info_service
                        .as_ref()
                        .calculate_nar(&root_node)
                        .await
                        .map_err(|e| ErrorKind::TvixError(Rc::new(e)))?
                        .1,
                )
            })?))
        } else {
            let digest: B3Digest = match root_node {
                tvix_castore::proto::node::Node::File(ref fnode) => fnode
                    .digest
                    .clone()
                    .try_into()
                    .expect("Tvix bug: ingested file node carries an invalid digest"),
                // We cannot hash anything else than file in flat import mode.
                _ => {
                    return Err(ImportError::FlatImportOfNonFile(
                        path.to_string_lossy().to_string(),
                    )
                    .into())
                }
            };

            // FUTUREWORK: avoid hashing again.
            CAHash::Flat(NixHash::Sha256(
                state
                    .tokio_handle
                    .block_on(async { state.blob_to_sha256_hash(digest).await })?,
            ))
        };

        // Keep a copy of the hash: `ca` is consumed when building the path info.
        let obtained_hash = ca.hash().clone().into_owned();
        let (path_info, output_path) = state.tokio_handle.block_on(async {
            state
                .node_to_path_info(name.as_ref(), path.as_ref(), ca, root_node)
                .await
        })?;

        // Verify the caller's expectation before anything is registered.
        if let Some(expected_sha256) = expected_sha256 {
            if obtained_hash != expected_sha256 {
                Err(ImportError::HashMismatch(
                    path.to_string_lossy().to_string(),
                    expected_sha256,
                    obtained_hash,
                ))?;
            }
        }

        let _: tvix_store::proto::PathInfo = state.tokio_handle.block_on(async {
            // This is necessary to cause the coercion of the error type.
            Ok::<_, std::io::Error>(state.path_info_service.as_ref().put(path_info).await?)
        })?;

        // We need to attach context to the final output path.
        let outpath = output_path.to_absolute_path();

        Ok(
            NixString::new_context_from(NixContextElement::Plain(outpath.clone()).into(), outpath)
                .into(),
        )
    }

    /// Implements `builtins.filterSource`: imports `path` into the store,
    /// keeping only the entries accepted by `filter`, and returns the
    /// resulting store path as a string carrying a plain context element for
    /// that path.
    ///
    /// `filter` is taken lazily and is passed to `filtered_ingest`, which
    /// calls it for every entry below the root. The store path name is derived
    /// from `path`, and the content hash is the SHA-256 of the NAR of the
    /// imported node.
    ///
    /// # Errors
    ///
    /// Fails if `path` is not a path, if ingestion fails, if no name can be
    /// derived from `path`, or with `ErrorKind::IO` if computing the NAR hash
    /// or registering the node in the path info service fails.
    #[builtin("filterSource")]
    async fn builtin_filter_source(
        state: Rc<TvixStoreIO>,
        co: GenCo,
        #[lazy] filter: Value,
        path: Value,
    ) -> Result<Value, ErrorKind> {
        let source_path = path.to_path()?;
        let root_node =
            filtered_ingest(Rc::clone(&state), co, &source_path, Some(&filter)).await?;
        let name = tvix_store::import::path_to_name(&source_path)?;

        // Hash the ingested node and register it, all on the store's runtime.
        let registration = state.tokio_handle.block_on(async {
            let (_, nar_sha256) = state
                .path_info_service
                .as_ref()
                .calculate_nar(&root_node)
                .await?;

            state
                .register_node_in_path_info_service(
                    name,
                    &source_path,
                    CAHash::Nar(NixHash::Sha256(nar_sha256)),
                    root_node,
                )
                .await
        });

        let store_path = registration.map_err(|err| ErrorKind::IO {
            path: Some(source_path.to_path_buf()),
            error: err.into(),
        })?;
        let outpath = store_path.to_absolute_path();

        // The returned string must carry the store path as context.
        Ok(
            NixString::new_context_from(NixContextElement::Plain(outpath.clone()).into(), outpath)
                .into(),
        )
    }
}

pub use import_builtins::builtins as import_builtins;

use crate::tvix_store_io::TvixStoreIO;