use super::RenderError;
use count_write::CountWrite;
use nix_compat::nar;
use sha2::{Digest, Sha256};
use std::{io, sync::Arc};
use tokio::{io::BufReader, task::spawn_blocking};
use tracing::warn;
use tvix_castore::{
    blobservice::BlobService,
    directoryservice::DirectoryService,
    proto::{self as castorepb, NamedNode},
    Error,
};

/// Render the NAR for `root_node` via [write_nar] and report the number of
/// bytes produced together with the SHA-256 digest of those bytes.
///
/// The NAR is never buffered: it is streamed into a hasher that is wrapped in
/// a byte counter, so size and digest are obtained in a single pass.
///
/// # Errors
/// Returns whatever [RenderError] [write_nar] reports.
///
/// # Panics
/// Panics under the same conditions as [write_nar].
pub async fn calculate_size_and_sha256(
    root_node: &castorepb::node::Node,
    blob_service: Arc<dyn BlobService>,
    directory_service: Arc<dyn DirectoryService>,
) -> Result<(u64, [u8; 32]), RenderError> {
    // The counter sits in front of the hasher and sees every byte written.
    let counting_hasher = CountWrite::from(Sha256::new());

    // write_nar takes ownership of the writer and hands it back when done.
    let counting_hasher =
        write_nar(counting_hasher, root_node, blob_service, directory_service).await?;

    // Read the count first; `into_inner` consumes the counter.
    let nar_size = counting_hasher.count();
    let nar_sha256: [u8; 32] = counting_hasher.into_inner().finalize().into();

    Ok((nar_size, nar_sha256))
}

/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
/// and uses the passed blob_service and directory_service to perform the
/// necessary lookups as it traverses the structure.
/// The contents in NAR serialization are written to the passed [std::io::Write].
///
/// The writer is handed back in the return value: the rendering runs inside a
/// blocking task, which needs to own the writer for its whole duration, so
/// ownership is moved in and returned on success.
///
/// # Errors
/// Returns a [RenderError] if opening the NAR writer fails or if walking the
/// node structure fails.
///
/// # Panics
/// This will panic if called outside the context of a Tokio runtime, or if
/// the blocking task itself panics (its join result is unwrapped).
pub async fn write_nar<W: std::io::Write + Send + 'static>(
    mut w: W,
    proto_root_node: &castorepb::node::Node,
    blob_service: Arc<dyn BlobService>,
    directory_service: Arc<dyn DirectoryService>,
) -> Result<W, RenderError> {
    // Captured up front so the blocking task can drive async lookups.
    let runtime_handle = tokio::runtime::Handle::current();

    // The blocking closure must be 'static, so it needs its own copy.
    let owned_root_node = proto_root_node.clone();

    let render_task = spawn_blocking(move || -> Result<W, RenderError> {
        // Initialize the NAR writer on top of the caller's writer.
        let nar_root_node = nar::writer::open(&mut w).map_err(RenderError::NARWriterError)?;

        walk_node(
            runtime_handle,
            nar_root_node,
            &owned_root_node,
            blob_service,
            directory_service,
        )?;

        Ok(w)
    });

    render_task.await.unwrap()
}

/// Process an intermediate node in the structure.
/// This consumes the node. fn walk_node( tokio_handle: tokio::runtime::Handle, nar_node: nar::writer::Node, proto_node: &castorepb::node::Node, blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) -> Result<(), RenderError> { match proto_node { castorepb::node::Node::Symlink(proto_symlink_node) => { nar_node .symlink(&proto_symlink_node.target) .map_err(RenderError::NARWriterError)?; } castorepb::node::Node::File(proto_file_node) => { let digest = proto_file_node.digest.clone().try_into().map_err(|_e| { warn!( file_node = ?proto_file_node, "invalid digest length in file node", ); RenderError::StoreError(Error::StorageError( "invalid digest len in file node".to_string(), )) })?; let blob_reader = match tokio_handle .block_on(async { blob_service.open_read(&digest).await }) .map_err(RenderError::StoreError)? { Some(blob_reader) => Ok(BufReader::new(blob_reader)), None => Err(RenderError::NARWriterError(io::Error::new( io::ErrorKind::NotFound, format!("blob with digest {} not found", &digest), ))), }?; nar_node .file( proto_file_node.executable, proto_file_node.size.into(), &mut tokio_util::io::SyncIoBridge::new(blob_reader), ) .map_err(RenderError::NARWriterError)?; } castorepb::node::Node::Directory(proto_directory_node) => { let digest = proto_directory_node .digest .clone() .try_into() .map_err(|_e| { RenderError::StoreError(Error::StorageError( "invalid digest len in directory node".to_string(), )) })?; // look it up with the directory service match tokio_handle .block_on(async { directory_service.get(&digest).await }) .map_err(RenderError::StoreError)? { // if it's None, that's an error! 
None => { return Err(RenderError::DirectoryNotFound( digest, proto_directory_node.name.clone(), )) } Some(proto_directory) => { // start a directory node let mut nar_node_directory = nar_node.directory().map_err(RenderError::NARWriterError)?; // for each node in the directory, create a new entry with its name, // and then invoke walk_node on that entry. for proto_node in proto_directory.nodes() { let child_node = nar_node_directory .entry(proto_node.get_name()) .map_err(RenderError::NARWriterError)?; walk_node( tokio_handle.clone(), child_node, &proto_node, blob_service.clone(), directory_service.clone(), )?; } // close the directory nar_node_directory .close() .map_err(RenderError::NARWriterError)?; } } } } Ok(()) }