// path: root/tvix/store/src/nar/renderer.rs
use super::RenderError;
use count_write::CountWrite;
use nix_compat::nar;
use sha2::{Digest, Sha256};
use std::{io, sync::Arc};
use tokio::{io::BufReader, task::spawn_blocking};
use tracing::warn;
use tvix_castore::{
    blobservice::BlobService,
    directoryservice::DirectoryService,
    proto::{self as castorepb, NamedNode},
    Error,
};

/// Renders the NAR representation of `root_node` via [write_nar] and reports
/// how large it is and what it hashes to.
///
/// The NAR bytes are never kept around: they are fed straight into a SHA-256
/// hasher wrapped in a byte counter.
///
/// Returns a `(size, sha256)` tuple, where `size` is the byte count reported
/// by the counter and `sha256` is the finalized 32-byte digest.
///
/// # Errors
/// Any [RenderError] produced by [write_nar] is passed through unchanged.
pub async fn calculate_size_and_sha256(
    root_node: &castorepb::node::Node,
    blob_service: Arc<dyn BlobService>,
    directory_service: Arc<dyn DirectoryService>,
) -> Result<(u64, [u8; 32]), RenderError> {
    // The sink hashes everything written to it while counting the bytes.
    let sink = CountWrite::from(Sha256::new());

    // write_nar takes ownership of the sink and hands it back when it is done.
    let sink = write_nar(sink, root_node, blob_service, directory_service).await?;

    // Read the counter first; into_inner() consumes the wrapper.
    let nar_size = sink.count();
    let nar_sha256: [u8; 32] = sink.into_inner().finalize().into();

    Ok((nar_size, nar_sha256))
}

/// Serializes the (store) path rooted at the given [castorepb::node::Node]
/// as a NAR and writes it to the passed [std::io::Write].
///
/// While traversing the structure, the passed `blob_service` and
/// `directory_service` are used to perform the necessary lookups.
///
/// The writer is handed back in the return value. Async Rust has no scoped
/// blocking tasks, so ownership of the writer has to be moved into the
/// blocking task that does the rendering, and moved back out once it is done.
///
/// # Errors
/// Returns [RenderError::NARWriterError] if the NAR writer cannot be opened,
/// and otherwise whatever error the traversal in `walk_node` reports.
///
/// # Panics
/// This will panic if called outside the context of a Tokio runtime. It also
/// panics if the blocking task cannot be joined (for example because it
/// panicked itself), as the join result is unwrapped.
pub async fn write_nar<W: std::io::Write + Send + 'static>(
    mut w: W,
    proto_root_node: &castorepb::node::Node,
    blob_service: Arc<dyn BlobService>,
    directory_service: Arc<dyn DirectoryService>,
) -> Result<W, RenderError> {
    // The blocking task must own everything it touches, so take an owned copy
    // of the root node and a handle to the current runtime.
    let proto_root_node = proto_root_node.clone();
    let tokio_handle = tokio::runtime::Handle::current();

    let render_task = spawn_blocking(move || -> Result<W, RenderError> {
        // Initialize the NAR writer on top of the caller's writer.
        let nar_root_node = nar::writer::open(&mut w).map_err(RenderError::NARWriterError)?;

        walk_node(
            tokio_handle,
            nar_root_node,
            &proto_root_node,
            blob_service,
            directory_service,
        )?;

        // Give the writer back to the caller.
        Ok(w)
    });

    render_task.await.unwrap()
}

/// Process an intermediate node in the structure.
/// This consumes the node.
fn walk_node(
    tokio_handle: tokio::runtime::Handle,
    nar_node: nar::writer::Node,
    proto_node: &castorepb::node::Node,
    blob_service: Arc<dyn BlobService>,
    directory_service: Arc<dyn DirectoryService>,
) -> Result<(), RenderError> {
    match proto_node {
        castorepb::node::Node::Symlink(proto_symlink_node) => {
            nar_node
                .symlink(&proto_symlink_node.target)
                .map_err(RenderError::NARWriterError)?;
        }
        castorepb::node::Node::File(proto_file_node) => {
            let digest = proto_file_node.digest.clone().try_into().map_err(|_e| {
                warn!(
                    file_node = ?proto_file_node,
                    "invalid digest length in file node",
                );

                RenderError::StoreError(Error::StorageError(
                    "invalid digest len in file node".to_string(),
                ))
            })?;

            let blob_reader = match tokio_handle
                .block_on(async { blob_service.open_read(&digest).await })
                .map_err(RenderError::StoreError)?
            {
                Some(blob_reader) => Ok(BufReader::new(blob_reader)),
                None => Err(RenderError::NARWriterError(io::Error::new(
                    io::ErrorKind::NotFound,
                    format!("blob with digest {} not found", &digest),
                ))),
            }?;

            nar_node
                .file(
                    proto_file_node.executable,
                    proto_file_node.size.into(),
                    &mut tokio_util::io::SyncIoBridge::new(blob_reader),
                )
                .map_err(RenderError::NARWriterError)?;
        }
        castorepb::node::Node::Directory(proto_directory_node) => {
            let digest = proto_directory_node
                .digest
                .clone()
                .try_into()
                .map_err(|_e| {
                    RenderError::StoreError(Error::StorageError(
                        "invalid digest len in directory node".to_string(),
                    ))
                })?;

            // look it up with the directory service
            match tokio_handle
                .block_on(async { directory_service.get(&digest).await })
                .map_err(RenderError::StoreError)?
            {
                // if it's None, that's an error!
                None => {
                    return Err(RenderError::DirectoryNotFound(
                        digest,
                        proto_directory_node.name.clone(),
                    ))
                }
                Some(proto_directory) => {
                    // start a directory node
                    let mut nar_node_directory =
                        nar_node.directory().map_err(RenderError::NARWriterError)?;

                    // for each node in the directory, create a new entry with its name,
                    // and then invoke walk_node on that entry.
                    for proto_node in proto_directory.nodes() {
                        let child_node = nar_node_directory
                            .entry(proto_node.get_name())
                            .map_err(RenderError::NARWriterError)?;
                        walk_node(
                            tokio_handle.clone(),
                            child_node,
                            &proto_node,
                            blob_service.clone(),
                            directory_service.clone(),
                        )?;
                    }

                    // close the directory
                    nar_node_directory
                        .close()
                        .map_err(RenderError::NARWriterError)?;
                }
            }
        }
    }
    Ok(())
}