-rw-r--r--  tvix/store/src/nar/import.rs | 51
1 file changed, 26 insertions(+), 25 deletions(-)
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index cda5b4e05a61..20b922a61f2e 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,10 +1,6 @@
-use std::{
-    io::{self, BufRead},
-    ops::Deref,
-};
-
 use bytes::Bytes;
 use nix_compat::nar;
+use std::io::{self, BufRead};
 use tokio_util::io::SyncIoBridge;
 use tracing::warn;
 use tvix_castore::{
@@ -28,19 +24,19 @@ pub fn read_nar<R, BS, DS>(
 ) -> io::Result<castorepb::node::Node>
 where
     R: BufRead + Send,
-    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
-    DS: Deref<Target = dyn DirectoryService>,
+    BS: AsRef<dyn BlobService>,
+    DS: AsRef<dyn DirectoryService>,
 {
     let handle = tokio::runtime::Handle::current();
 
-    let directory_putter = directory_service.put_multiple_start();
+    let directory_putter = directory_service.as_ref().put_multiple_start();
 
     let node = nix_compat::nar::reader::open(r)?;
-    let (root_node, mut directory_putter) = process_node(
+    let (root_node, mut directory_putter, _) = process_node(
         handle.clone(),
         "".into(), // this is the root node, it has an empty name
         node,
-        blob_service,
+        &blob_service,
         directory_putter,
     )?;
 
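
The interesting part of this hunk is the bound change on read_nar: instead of requiring Deref<Target = dyn BlobService> + Clone + Send + 'static, the function now only asks for AsRef<dyn BlobService> (and likewise AsRef<dyn DirectoryService>), and calls go through .as_ref(). Both Arc<dyn BlobService> and Box<dyn BlobService> satisfy that bound out of the box. A minimal, self-contained sketch of the pattern, using hypothetical stand-in types rather than the tvix ones:

use std::sync::Arc;

// Hypothetical stand-in for the BlobService trait object used in the diff.
trait BlobService {
    fn name(&self) -> &'static str;
}

struct MemoryBlobService;

impl BlobService for MemoryBlobService {
    fn name(&self) -> &'static str {
        "memory"
    }
}

// Generic over "anything that can be viewed as a dyn BlobService":
// Arc<dyn BlobService> and Box<dyn BlobService> both qualify, and no
// Clone/Send/'static bounds are needed just to call trait methods.
fn describe<BS: AsRef<dyn BlobService>>(blob_service: BS) -> &'static str {
    blob_service.as_ref().name()
}

fn main() {
    let arc: Arc<dyn BlobService> = Arc::new(MemoryBlobService);
    let boxed: Box<dyn BlobService> = Box::new(MemoryBlobService);
    assert_eq!(describe(arc), "memory");
    assert_eq!(describe(boxed), "memory");
}
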
@@ -84,9 +80,9 @@ fn process_node<BS>(
     node: nar::reader::Node,
     blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)>
+) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)>
 where
-    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+    BS: AsRef<dyn BlobService>,
 {
     Ok(match node {
         nar::reader::Node::Symlink { target } => (
@@ -95,6 +91,7 @@ where
                 target: target.into(),
             }),
             directory_putter,
+            blob_service,
         ),
         nar::reader::Node::File { executable, reader } => (
             castorepb::node::Node::File(process_file_reader(
@@ -102,17 +99,19 @@ where
                 name,
                 reader,
                 executable,
-                blob_service,
+                &blob_service,
             )?),
             directory_putter,
+            blob_service,
         ),
         nar::reader::Node::Directory(dir_reader) => {
-            let (directory_node, directory_putter) =
+            let (directory_node, directory_putter, blob_service_back) =
                 process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
 
             (
                 castorepb::node::Node::Directory(directory_node),
                 directory_putter,
+                blob_service_back,
             )
         }
     })
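
With the Clone bound gone, process_node can no longer copy blob_service for each child; instead it takes it by value and hands it back as the third tuple element, so the directory loop further down can reuse it on the next iteration. A reduced sketch of that ownership-threading pattern (hypothetical types, not the tvix ones):

// A value we want to reuse across loop iterations but are not allowed to clone.
struct ServiceHandle;

// Takes ownership of the handle and gives it back alongside the result,
// mirroring the extra tuple element added to process_node above.
fn use_once(handle: ServiceHandle, input: u32) -> (u32, ServiceHandle) {
    (input * 2, handle)
}

fn main() {
    let mut handle = ServiceHandle;
    let mut total = 0;
    for i in 0..3 {
        // Without Clone, the only way to keep `handle` available for the next
        // iteration is to receive it back from the callee.
        let (out, handle_back) = use_once(handle, i);
        handle = handle_back;
        total += out;
    }
    assert_eq!(total, 6);
}
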
@@ -128,14 +127,13 @@ fn process_file_reader<BS>(
     blob_service: BS,
 ) -> io::Result<castorepb::FileNode>
 where
-    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+    BS: AsRef<dyn BlobService>,
 {
     // store the length. If we read any other length, reading will fail.
     let expected_len = file_reader.len();
 
     // prepare writing a new blob.
-    let blob_writer =
-        handle.block_on(handle.spawn(async move { blob_service.open_write().await }))?;
+    let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await });
 
     // write the blob.
     let mut blob_writer = {
@@ -149,7 +147,7 @@ where
     };
 
     // close the blob_writer, retrieve the digest.
-    let blob_digest = handle.block_on(handle.spawn(async move { blob_writer.close().await }))??;
+    let blob_digest = handle.block_on(async { blob_writer.close().await })?;
 
     Ok(castorepb::FileNode {
         name,
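
Dropping the Send + 'static bounds also means the write future can no longer be shipped to the runtime via spawn; it is now driven in place with block_on, which removes one layer of JoinHandle error handling (hence ?? becoming ?). A small illustration of the difference, assuming a plain tokio dependency rather than the tvix code:

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let rt = tokio::runtime::Runtime::new()?;
    let handle = rt.handle().clone();

    // spawn requires the future to be Send + 'static and yields a JoinHandle,
    // so the result has to be unwrapped twice (join error, then the inner value).
    let via_spawn: Result<u32, String> =
        handle.block_on(handle.spawn(async { Ok::<u32, String>(21) }))?;

    // Driving the future directly only needs it to live for this call,
    // which is what lets the Send + 'static bounds on BS be dropped.
    let via_block_on: Result<u32, String> = handle.block_on(async { Ok(21) });

    assert_eq!(via_spawn? + via_block_on?, 42);
    Ok(())
}
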
@@ -170,22 +168,24 @@ fn process_dir_reader<BS>(
     mut dir_reader: nar::reader::DirReader,
     blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)>
+) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)>
 where
-    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+    BS: AsRef<dyn BlobService>,
 {
     let mut directory = castorepb::Directory::default();
 
     let mut directory_putter = directory_putter;
+    let mut blob_service = blob_service;
     while let Some(entry) = dir_reader.next()? {
-        let (node, directory_putter_back) = process_node(
+        let (node, directory_putter_back, blob_service_back) = process_node(
             handle.clone(),
             entry.name.into(),
             entry.node,
-            blob_service.clone(),
+            blob_service,
             directory_putter,
         )?;
 
+        blob_service = blob_service_back;
         directory_putter = directory_putter_back;
 
         match node {
@@ -213,6 +213,7 @@ where
             size: directory_size,
         },
         directory_putter,
+        blob_service,
     ))
 }
 
@@ -238,8 +239,8 @@ mod test {
 
     #[tokio::test]
     async fn single_symlink() {
-        let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
-        let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into();
+        let blob_service = gen_blob_service();
+        let directory_service = gen_directory_service();
 
         let handle = tokio::runtime::Handle::current();
 
@@ -267,7 +268,7 @@ mod test {
     #[tokio::test]
     async fn single_file() {
         let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
-        let directory_service: Arc<dyn DirectoryService> = gen_directory_service().into();
+        let directory_service = gen_directory_service();
 
         let handle = tokio::runtime::Handle::current();
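
The test hunks follow from the same relaxation: the helpers' return values can be passed to read_nar directly, without first coercing them into Arc<dyn BlobService> / Arc<dyn DirectoryService>. A hedged sketch of that call shape, again with hypothetical stand-ins for the tvix helpers:

use std::sync::Arc;

// Hypothetical stand-ins; the real BlobService and gen_blob_service live in tvix_castore.
trait BlobService {
    fn has(&self, digest: &[u8]) -> bool;
}

struct MemoryBlobService;

impl BlobService for MemoryBlobService {
    fn has(&self, _digest: &[u8]) -> bool {
        false
    }
}

// If the test helper hands out an Arc<dyn BlobService>, it already satisfies
// AsRef<dyn BlobService>, so the caller no longer needs an explicit coercion.
fn gen_blob_service() -> Arc<dyn BlobService> {
    Arc::new(MemoryBlobService)
}

fn read_nar_like<BS: AsRef<dyn BlobService>>(blob_service: BS) -> bool {
    blob_service.as_ref().has(b"nar")
}

fn main() {
    // Before: let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
    // After:  the plain helper result is enough.
    let blob_service = gen_blob_service();
    assert!(!read_nar_like(blob_service));
}
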