From 09a92b78d27dbf25210169a35c30bcff0b01b2ec Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Mon, 1 Jan 2024 03:50:51 +0200 Subject: refactor(tvix/store/nar/import): be a bit more generic Change-Id: If9a536949f36f428abea1a893f937fe7063e2f41 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10517 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: raitobezarius --- tvix/store/src/nar/import.rs | 69 +++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 29 deletions(-) (limited to 'tvix') diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index e9065a670d21..9be42aeafa0f 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,6 +1,6 @@ use std::{ io::{self, BufRead}, - sync::Arc, + ops::Deref, }; use bytes::Bytes; @@ -21,11 +21,16 @@ use tvix_castore::{ /// This function is not async (because the NAR reader is not) /// and calls [tokio::task::block_in_place] when interacting with backing /// services, so make sure to only call this with spawn_blocking. -pub fn read_nar( +pub fn read_nar( r: &mut R, - blob_service: Arc, - directory_service: Arc, -) -> io::Result { + blob_service: BS, + directory_service: DS, +) -> io::Result +where + R: BufRead + Send, + BS: Deref + Clone + Send + 'static, + DS: Deref, +{ let handle = tokio::runtime::Handle::current(); let directory_putter = directory_service.put_multiple_start(); @@ -73,13 +78,16 @@ pub fn read_nar( /// /// [DirectoryPutter] is passed around, so a single instance of it can be used, /// which is sufficient, as this reads through the whole NAR linerarly. -fn process_node( +fn process_node( handle: tokio::runtime::Handle, name: bytes::Bytes, node: nar::reader::Node, - blob_service: Arc, + blob_service: BS, directory_putter: Box, -) -> io::Result<(castorepb::node::Node, Box)> { +) -> io::Result<(castorepb::node::Node, Box)> +where + BS: Deref + Clone + Send + 'static, +{ Ok(match node { nar::reader::Node::Symlink { target } => ( castorepb::node::Node::Symlink(castorepb::SymlinkNode { @@ -99,13 +107,8 @@ fn process_node( directory_putter, ), nar::reader::Node::Directory(dir_reader) => { - let (directory_node, directory_putter) = process_dir_reader( - handle, - name, - dir_reader, - blob_service.clone(), - directory_putter, - )?; + let (directory_node, directory_putter) = + process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; ( castorepb::node::Node::Directory(directory_node), @@ -117,21 +120,22 @@ fn process_node( /// Given a name and [nar::reader::FileReader], this ingests the file into the /// passed [BlobService] and returns a [castorepb::FileNode]. -fn process_file_reader( +fn process_file_reader( handle: tokio::runtime::Handle, name: Bytes, mut file_reader: nar::reader::FileReader, executable: bool, - blob_service: Arc, -) -> io::Result { + blob_service: BS, +) -> io::Result +where + BS: Deref + Clone + Send + 'static, +{ // store the length. If we read any other length, reading will fail. let expected_len = file_reader.len(); // prepare writing a new blob. - let blob_writer = handle.block_on(handle.spawn({ - let blob_service = blob_service.clone(); - async move { blob_service.open_write().await } - }))?; + let blob_writer = + handle.block_on(handle.spawn(async move { blob_service.open_write().await }))?; // write the blob. let mut blob_writer = { @@ -160,13 +164,16 @@ fn process_file_reader( /// /// [DirectoryPutter] is passed around, so a single instance of it can be used, /// which is sufficient, as this reads through the whole NAR linerarly. -fn process_dir_reader( +fn process_dir_reader( handle: tokio::runtime::Handle, name: Bytes, mut dir_reader: nar::reader::DirReader, - blob_service: Arc, + blob_service: BS, directory_putter: Box, -) -> io::Result<(castorepb::DirectoryNode, Box)> { +) -> io::Result<(castorepb::DirectoryNode, Box)> +where + BS: Deref + Clone + Send + 'static, +{ let mut directory = castorepb::Directory::default(); let mut directory_putter = directory_putter; @@ -228,14 +235,17 @@ mod test { #[tokio::test] async fn single_symlink() { + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); + let handle = tokio::runtime::Handle::current(); let root_node = handle .spawn_blocking(|| { read_nar( &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), - gen_blob_service(), - gen_directory_service(), + blob_service, + directory_service, ) }) .await @@ -254,17 +264,18 @@ mod test { #[tokio::test] async fn single_file() { let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); let handle = tokio::runtime::Handle::current(); let root_node = handle .spawn_blocking({ let blob_service = blob_service.clone(); - || { + move || { read_nar( &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), blob_service, - gen_directory_service(), + directory_service, ) } }) -- cgit 1.4.1