From 99f675ecef846ce2cbba70ac41b685f37c1a1103 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 9 Jan 2024 11:25:12 +0200 Subject: refactor(tvix/store/nar/import): use AsRef We need to be a bit careful and pass the BlobService around (similar to how we already do with the directory_putter), but that allows getting rid of a bunch of annoying trait bounds. We also stop spawning additional tasks where we can just use block_on. Change-Id: If36de0ee947d2c779d20a384308241d2262d4764 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10580 Reviewed-by: raitobezarius Tested-by: BuildkiteCI Autosubmit: flokli --- tvix/store/src/nar/import.rs | 51 ++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 25 deletions(-) (limited to 'tvix') diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index cda5b4e05a61..20b922a61f2e 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,10 +1,6 @@ -use std::{ - io::{self, BufRead}, - ops::Deref, -}; - use bytes::Bytes; use nix_compat::nar; +use std::io::{self, BufRead}; use tokio_util::io::SyncIoBridge; use tracing::warn; use tvix_castore::{ @@ -28,19 +24,19 @@ pub fn read_nar( ) -> io::Result where R: BufRead + Send, - BS: Deref + Clone + Send + 'static, - DS: Deref, + BS: AsRef, + DS: AsRef, { let handle = tokio::runtime::Handle::current(); - let directory_putter = directory_service.put_multiple_start(); + let directory_putter = directory_service.as_ref().put_multiple_start(); let node = nix_compat::nar::reader::open(r)?; - let (root_node, mut directory_putter) = process_node( + let (root_node, mut directory_putter, _) = process_node( handle.clone(), "".into(), // this is the root node, it has an empty name node, - blob_service, + &blob_service, directory_putter, )?; @@ -84,9 +80,9 @@ fn process_node( node: nar::reader::Node, blob_service: BS, directory_putter: Box, -) -> io::Result<(castorepb::node::Node, Box)> +) -> io::Result<(castorepb::node::Node, Box, BS)> where - BS: Deref + Clone + Send + 'static, + BS: AsRef, { Ok(match node { nar::reader::Node::Symlink { target } => ( @@ -95,6 +91,7 @@ where target: target.into(), }), directory_putter, + blob_service, ), nar::reader::Node::File { executable, reader } => ( castorepb::node::Node::File(process_file_reader( @@ -102,17 +99,19 @@ where name, reader, executable, - blob_service, + &blob_service, )?), directory_putter, + blob_service, ), nar::reader::Node::Directory(dir_reader) => { - let (directory_node, directory_putter) = + let (directory_node, directory_putter, blob_service_back) = process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; ( castorepb::node::Node::Directory(directory_node), directory_putter, + blob_service_back, ) } }) @@ -128,14 +127,13 @@ fn process_file_reader( blob_service: BS, ) -> io::Result where - BS: Deref + Clone + Send + 'static, + BS: AsRef, { // store the length. If we read any other length, reading will fail. let expected_len = file_reader.len(); // prepare writing a new blob. - let blob_writer = - handle.block_on(handle.spawn(async move { blob_service.open_write().await }))?; + let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await }); // write the blob. let mut blob_writer = { @@ -149,7 +147,7 @@ where }; // close the blob_writer, retrieve the digest. - let blob_digest = handle.block_on(handle.spawn(async move { blob_writer.close().await }))??; + let blob_digest = handle.block_on(async { blob_writer.close().await })?; Ok(castorepb::FileNode { name, @@ -170,22 +168,24 @@ fn process_dir_reader( mut dir_reader: nar::reader::DirReader, blob_service: BS, directory_putter: Box, -) -> io::Result<(castorepb::DirectoryNode, Box)> +) -> io::Result<(castorepb::DirectoryNode, Box, BS)> where - BS: Deref + Clone + Send + 'static, + BS: AsRef, { let mut directory = castorepb::Directory::default(); let mut directory_putter = directory_putter; + let mut blob_service = blob_service; while let Some(entry) = dir_reader.next()? { - let (node, directory_putter_back) = process_node( + let (node, directory_putter_back, blob_service_back) = process_node( handle.clone(), entry.name.into(), entry.node, - blob_service.clone(), + blob_service, directory_putter, )?; + blob_service = blob_service_back; directory_putter = directory_putter_back; match node { @@ -213,6 +213,7 @@ where size: directory_size, }, directory_putter, + blob_service, )) } @@ -238,8 +239,8 @@ mod test { #[tokio::test] async fn single_symlink() { - let blob_service: Arc = gen_blob_service().into(); - let directory_service: Arc = gen_directory_service().into(); + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); let handle = tokio::runtime::Handle::current(); @@ -267,7 +268,7 @@ mod test { #[tokio::test] async fn single_file() { let blob_service: Arc = gen_blob_service().into(); - let directory_service: Arc = gen_directory_service().into(); + let directory_service = gen_directory_service(); let handle = tokio::runtime::Handle::current(); -- cgit 1.4.1