about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src')
-rw-r--r--tvix/store/src/nar/renderer.rs73
1 files changed, 45 insertions, 28 deletions
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index f510a9c76e60..313397dcf31b 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -5,7 +5,6 @@ use nix_compat::nar::writer::r#async as nar_writer;
 use sha2::{Digest, Sha256};
 use std::{
     pin::Pin,
-    sync::Arc,
     task::{self, Poll},
 };
 use tokio::io::{self, AsyncWrite, BufReader};
@@ -18,11 +17,15 @@ use tvix_castore::{
 
 /// Invoke [write_nar], and return the size and sha256 digest of the produced
 /// NAR output.
-pub async fn calculate_size_and_sha256(
+pub async fn calculate_size_and_sha256<BS, DS>(
     root_node: &castorepb::node::Node,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> Result<(u64, [u8; 32]), RenderError> {
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(u64, [u8; 32]), RenderError>
+where
+    BS: AsRef<dyn BlobService> + Send,
+    DS: AsRef<dyn DirectoryService> + Send,
+{
     let mut h = Sha256::new();
     let mut cw = CountWrite::from(&mut h);
 
@@ -68,12 +71,17 @@ impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
 /// and uses the passed blob_service and directory_service to perform the
 /// necessary lookups as it traverses the structure.
 /// The contents in NAR serialization are writen to the passed [AsyncWrite].
-pub async fn write_nar<W: AsyncWrite + Unpin + Send>(
+pub async fn write_nar<W, BS, DS>(
     w: W,
     proto_root_node: &castorepb::node::Node,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> Result<(), RenderError> {
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(), RenderError>
+where
+    W: AsyncWrite + Unpin + Send,
+    BS: AsRef<dyn BlobService> + Send,
+    DS: AsRef<dyn DirectoryService> + Send,
+{
     // Initialize NAR writer
     let mut w = w.compat_write();
     let nar_root_node = nar_writer::open(&mut w)
@@ -94,12 +102,16 @@ pub async fn write_nar<W: AsyncWrite + Unpin + Send>(
 /// Process an intermediate node in the structure.
 /// This consumes the node.
 #[async_recursion]
-async fn walk_node(
+async fn walk_node<BS, DS>(
     nar_node: nar_writer::Node<'async_recursion, '_>,
     proto_node: &castorepb::node::Node,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> Result<(), RenderError> {
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(BS, DS), RenderError>
+where
+    BS: AsRef<dyn BlobService> + Send,
+    DS: AsRef<dyn DirectoryService> + Send,
+{
     match proto_node {
         castorepb::node::Node::Symlink(proto_symlink_node) => {
             nar_node
@@ -117,6 +129,7 @@ async fn walk_node(
             })?;
 
             let blob_reader = match blob_service
+                .as_ref()
                 .open_read(&digest)
                 .await
                 .map_err(RenderError::StoreError)?
@@ -152,17 +165,16 @@ async fn walk_node(
 
             // look it up with the directory service
             match directory_service
+                .as_ref()
                 .get(&digest)
                 .await
                 .map_err(|e| RenderError::StoreError(e.into()))?
             {
                 // if it's None, that's an error!
-                None => {
-                    return Err(RenderError::DirectoryNotFound(
-                        digest,
-                        proto_directory_node.name.clone(),
-                    ))
-                }
+                None => Err(RenderError::DirectoryNotFound(
+                    digest,
+                    proto_directory_node.name.clone(),
+                ))?,
                 Some(proto_directory) => {
                     // start a directory node
                     let mut nar_node_directory = nar_node
@@ -170,20 +182,22 @@ async fn walk_node(
                         .await
                         .map_err(RenderError::NARWriterError)?;
 
+                    // We put blob_service, directory_service back here whenever we come up from
+                    // the recursion.
+                    let mut blob_service = blob_service;
+                    let mut directory_service = directory_service;
+
                     // for each node in the directory, create a new entry with its name,
-                    // and then invoke walk_node on that entry.
+                    // and then recurse on that entry.
                     for proto_node in proto_directory.nodes() {
                         let child_node = nar_node_directory
                             .entry(proto_node.get_name())
                             .await
                             .map_err(RenderError::NARWriterError)?;
-                        walk_node(
-                            child_node,
-                            &proto_node,
-                            blob_service.clone(),
-                            directory_service.clone(),
-                        )
-                        .await?;
+
+                        (blob_service, directory_service) =
+                            walk_node(child_node, &proto_node, blob_service, directory_service)
+                                .await?;
                     }
 
                     // close the directory
@@ -191,9 +205,12 @@ async fn walk_node(
                         .close()
                         .await
                         .map_err(RenderError::NARWriterError)?;
+
+                    return Ok((blob_service, directory_service));
                 }
             }
         }
     }
-    Ok(())
+
+    Ok((blob_service, directory_service))
 }