diff options
Diffstat (limited to 'tvix/store/src')
-rw-r--r-- | tvix/store/src/nar/renderer.rs | 73 |
1 files changed, 45 insertions, 28 deletions
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index f510a9c76e60..313397dcf31b 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -5,7 +5,6 @@ use nix_compat::nar::writer::r#async as nar_writer; use sha2::{Digest, Sha256}; use std::{ pin::Pin, - sync::Arc, task::{self, Poll}, }; use tokio::io::{self, AsyncWrite, BufReader}; @@ -18,11 +17,15 @@ use tvix_castore::{ /// Invoke [write_nar], and return the size and sha256 digest of the produced /// NAR output. -pub async fn calculate_size_and_sha256( +pub async fn calculate_size_and_sha256<BS, DS>( root_node: &castorepb::node::Node, - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, -) -> Result<(u64, [u8; 32]), RenderError> { + blob_service: BS, + directory_service: DS, +) -> Result<(u64, [u8; 32]), RenderError> +where + BS: AsRef<dyn BlobService> + Send, + DS: AsRef<dyn DirectoryService> + Send, +{ let mut h = Sha256::new(); let mut cw = CountWrite::from(&mut h); @@ -68,12 +71,17 @@ impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> { /// and uses the passed blob_service and directory_service to perform the /// necessary lookups as it traverses the structure. /// The contents in NAR serialization are writen to the passed [AsyncWrite]. -pub async fn write_nar<W: AsyncWrite + Unpin + Send>( +pub async fn write_nar<W, BS, DS>( w: W, proto_root_node: &castorepb::node::Node, - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, -) -> Result<(), RenderError> { + blob_service: BS, + directory_service: DS, +) -> Result<(), RenderError> +where + W: AsyncWrite + Unpin + Send, + BS: AsRef<dyn BlobService> + Send, + DS: AsRef<dyn DirectoryService> + Send, +{ // Initialize NAR writer let mut w = w.compat_write(); let nar_root_node = nar_writer::open(&mut w) @@ -94,12 +102,16 @@ pub async fn write_nar<W: AsyncWrite + Unpin + Send>( /// Process an intermediate node in the structure. /// This consumes the node. #[async_recursion] -async fn walk_node( +async fn walk_node<BS, DS>( nar_node: nar_writer::Node<'async_recursion, '_>, proto_node: &castorepb::node::Node, - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, -) -> Result<(), RenderError> { + blob_service: BS, + directory_service: DS, +) -> Result<(BS, DS), RenderError> +where + BS: AsRef<dyn BlobService> + Send, + DS: AsRef<dyn DirectoryService> + Send, +{ match proto_node { castorepb::node::Node::Symlink(proto_symlink_node) => { nar_node @@ -117,6 +129,7 @@ async fn walk_node( })?; let blob_reader = match blob_service + .as_ref() .open_read(&digest) .await .map_err(RenderError::StoreError)? @@ -152,17 +165,16 @@ async fn walk_node( // look it up with the directory service match directory_service + .as_ref() .get(&digest) .await .map_err(|e| RenderError::StoreError(e.into()))? { // if it's None, that's an error! - None => { - return Err(RenderError::DirectoryNotFound( - digest, - proto_directory_node.name.clone(), - )) - } + None => Err(RenderError::DirectoryNotFound( + digest, + proto_directory_node.name.clone(), + ))?, Some(proto_directory) => { // start a directory node let mut nar_node_directory = nar_node @@ -170,20 +182,22 @@ async fn walk_node( .await .map_err(RenderError::NARWriterError)?; + // We put blob_service, directory_service back here whenever we come up from + // the recursion. + let mut blob_service = blob_service; + let mut directory_service = directory_service; + // for each node in the directory, create a new entry with its name, - // and then invoke walk_node on that entry. + // and then recurse on that entry. for proto_node in proto_directory.nodes() { let child_node = nar_node_directory .entry(proto_node.get_name()) .await .map_err(RenderError::NARWriterError)?; - walk_node( - child_node, - &proto_node, - blob_service.clone(), - directory_service.clone(), - ) - .await?; + + (blob_service, directory_service) = + walk_node(child_node, &proto_node, blob_service, directory_service) + .await?; } // close the directory @@ -191,9 +205,12 @@ async fn walk_node( .close() .await .map_err(RenderError::NARWriterError)?; + + return Ok((blob_service, directory_service)); } } } } - Ok(()) + + Ok((blob_service, directory_service)) } |