about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src')
-rw-r--r--tvix/store/src/nar/import.rs69
1 files changed, 40 insertions, 29 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index e9065a670d21..9be42aeafa0f 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,6 +1,6 @@
 use std::{
     io::{self, BufRead},
-    sync::Arc,
+    ops::Deref,
 };
 
 use bytes::Bytes;
@@ -21,11 +21,16 @@ use tvix_castore::{
 /// This function is not async (because the NAR reader is not)
 /// and calls [tokio::task::block_in_place] when interacting with backing
 /// services, so make sure to only call this with spawn_blocking.
-pub fn read_nar<R: BufRead + Send>(
+pub fn read_nar<R, BS, DS>(
     r: &mut R,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> io::Result<castorepb::node::Node> {
+    blob_service: BS,
+    directory_service: DS,
+) -> io::Result<castorepb::node::Node>
+where
+    R: BufRead + Send,
+    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+    DS: Deref<Target = dyn DirectoryService>,
+{
     let handle = tokio::runtime::Handle::current();
 
     let directory_putter = directory_service.put_multiple_start();
@@ -73,13 +78,16 @@ pub fn read_nar<R: BufRead + Send>(
 ///
 /// [DirectoryPutter] is passed around, so a single instance of it can be used,
 /// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_node(
+fn process_node<BS>(
     handle: tokio::runtime::Handle,
     name: bytes::Bytes,
     node: nar::reader::Node,
-    blob_service: Arc<dyn BlobService>,
+    blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)> {
+) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)>
+where
+    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+{
     Ok(match node {
         nar::reader::Node::Symlink { target } => (
             castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@@ -99,13 +107,8 @@ fn process_node(
             directory_putter,
         ),
         nar::reader::Node::Directory(dir_reader) => {
-            let (directory_node, directory_putter) = process_dir_reader(
-                handle,
-                name,
-                dir_reader,
-                blob_service.clone(),
-                directory_putter,
-            )?;
+            let (directory_node, directory_putter) =
+                process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
 
             (
                 castorepb::node::Node::Directory(directory_node),
@@ -117,21 +120,22 @@ fn process_node(
 
 /// Given a name and [nar::reader::FileReader], this ingests the file into the
 /// passed [BlobService] and returns a [castorepb::FileNode].
-fn process_file_reader(
+fn process_file_reader<BS>(
     handle: tokio::runtime::Handle,
     name: Bytes,
     mut file_reader: nar::reader::FileReader,
     executable: bool,
-    blob_service: Arc<dyn BlobService>,
-) -> io::Result<castorepb::FileNode> {
+    blob_service: BS,
+) -> io::Result<castorepb::FileNode>
+where
+    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+{
     // store the length. If we read any other length, reading will fail.
     let expected_len = file_reader.len();
 
     // prepare writing a new blob.
-    let blob_writer = handle.block_on(handle.spawn({
-        let blob_service = blob_service.clone();
-        async move { blob_service.open_write().await }
-    }))?;
+    let blob_writer =
+        handle.block_on(handle.spawn(async move { blob_service.open_write().await }))?;
 
     // write the blob.
     let mut blob_writer = {
@@ -160,13 +164,16 @@ fn process_file_reader(
 ///
 /// [DirectoryPutter] is passed around, so a single instance of it can be used,
 /// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_dir_reader(
+fn process_dir_reader<BS>(
     handle: tokio::runtime::Handle,
     name: Bytes,
     mut dir_reader: nar::reader::DirReader,
-    blob_service: Arc<dyn BlobService>,
+    blob_service: BS,
     directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)> {
+) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)>
+where
+    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
+{
     let mut directory = castorepb::Directory::default();
 
     let mut directory_putter = directory_putter;
@@ -228,14 +235,17 @@ mod test {
 
     #[tokio::test]
     async fn single_symlink() {
+        let blob_service = gen_blob_service();
+        let directory_service = gen_directory_service();
+
         let handle = tokio::runtime::Handle::current();
 
         let root_node = handle
             .spawn_blocking(|| {
                 read_nar(
                     &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
-                    gen_blob_service(),
-                    gen_directory_service(),
+                    blob_service,
+                    directory_service,
                 )
             })
             .await
@@ -254,17 +264,18 @@ mod test {
     #[tokio::test]
     async fn single_file() {
         let blob_service = gen_blob_service();
+        let directory_service = gen_directory_service();
 
         let handle = tokio::runtime::Handle::current();
 
         let root_node = handle
             .spawn_blocking({
                 let blob_service = blob_service.clone();
-                || {
+                move || {
                     read_nar(
                         &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
                         blob_service,
-                        gen_directory_service(),
+                        directory_service,
                     )
                 }
             })