diff options
Diffstat (limited to 'tvix/store/src/nar')
-rw-r--r-- | tvix/store/src/nar/import.rs | 69 |
1 files changed, 40 insertions, 29 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index e9065a670d21..9be42aeafa0f 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,6 +1,6 @@ use std::{ io::{self, BufRead}, - sync::Arc, + ops::Deref, }; use bytes::Bytes; @@ -21,11 +21,16 @@ use tvix_castore::{ /// This function is not async (because the NAR reader is not) /// and calls [tokio::task::block_in_place] when interacting with backing /// services, so make sure to only call this with spawn_blocking. -pub fn read_nar<R: BufRead + Send>( +pub fn read_nar<R, BS, DS>( r: &mut R, - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, -) -> io::Result<castorepb::node::Node> { + blob_service: BS, + directory_service: DS, +) -> io::Result<castorepb::node::Node> +where + R: BufRead + Send, + BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, + DS: Deref<Target = dyn DirectoryService>, +{ let handle = tokio::runtime::Handle::current(); let directory_putter = directory_service.put_multiple_start(); @@ -73,13 +78,16 @@ pub fn read_nar<R: BufRead + Send>( /// /// [DirectoryPutter] is passed around, so a single instance of it can be used, /// which is sufficient, as this reads through the whole NAR linerarly. -fn process_node( +fn process_node<BS>( handle: tokio::runtime::Handle, name: bytes::Bytes, node: nar::reader::Node, - blob_service: Arc<dyn BlobService>, + blob_service: BS, directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)> { +) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)> +where + BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, +{ Ok(match node { nar::reader::Node::Symlink { target } => ( castorepb::node::Node::Symlink(castorepb::SymlinkNode { @@ -99,13 +107,8 @@ fn process_node( directory_putter, ), nar::reader::Node::Directory(dir_reader) => { - let (directory_node, directory_putter) = process_dir_reader( - handle, - name, - dir_reader, - blob_service.clone(), - directory_putter, - )?; + let (directory_node, directory_putter) = + process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; ( castorepb::node::Node::Directory(directory_node), @@ -117,21 +120,22 @@ fn process_node( /// Given a name and [nar::reader::FileReader], this ingests the file into the /// passed [BlobService] and returns a [castorepb::FileNode]. -fn process_file_reader( +fn process_file_reader<BS>( handle: tokio::runtime::Handle, name: Bytes, mut file_reader: nar::reader::FileReader, executable: bool, - blob_service: Arc<dyn BlobService>, -) -> io::Result<castorepb::FileNode> { + blob_service: BS, +) -> io::Result<castorepb::FileNode> +where + BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, +{ // store the length. If we read any other length, reading will fail. let expected_len = file_reader.len(); // prepare writing a new blob. - let blob_writer = handle.block_on(handle.spawn({ - let blob_service = blob_service.clone(); - async move { blob_service.open_write().await } - }))?; + let blob_writer = + handle.block_on(handle.spawn(async move { blob_service.open_write().await }))?; // write the blob. let mut blob_writer = { @@ -160,13 +164,16 @@ fn process_file_reader( /// /// [DirectoryPutter] is passed around, so a single instance of it can be used, /// which is sufficient, as this reads through the whole NAR linerarly. -fn process_dir_reader( +fn process_dir_reader<BS>( handle: tokio::runtime::Handle, name: Bytes, mut dir_reader: nar::reader::DirReader, - blob_service: Arc<dyn BlobService>, + blob_service: BS, directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)> { +) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)> +where + BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, +{ let mut directory = castorepb::Directory::default(); let mut directory_putter = directory_putter; @@ -228,14 +235,17 @@ mod test { #[tokio::test] async fn single_symlink() { + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); + let handle = tokio::runtime::Handle::current(); let root_node = handle .spawn_blocking(|| { read_nar( &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), - gen_blob_service(), - gen_directory_service(), + blob_service, + directory_service, ) }) .await @@ -254,17 +264,18 @@ mod test { #[tokio::test] async fn single_file() { let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); let handle = tokio::runtime::Handle::current(); let root_node = handle .spawn_blocking({ let blob_service = blob_service.clone(); - || { + move || { read_nar( &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), blob_service, - gen_directory_service(), + directory_service, ) } }) |