From 48b66a89820f170d89bb94a7a000bc8f73879aa4 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Thu, 25 May 2023 09:04:13 +0300 Subject: feat(tvix/store/bin/import): process all path imports concurrently Change-Id: I3e1428a4725fc2e552e8f37bc0550121117fcef6 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8633 Tested-by: BuildkiteCI Autosubmit: flokli Reviewed-by: tazjin --- tvix/store/src/bin/tvix-store.rs | 83 +++++++++++++++++++++++----------------- 1 file changed, 47 insertions(+), 36 deletions(-) diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 5912c6464bfe..1e1c96c3bce9 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -1,6 +1,8 @@ use clap::Subcommand; use data_encoding::BASE64; +use futures::future::try_join_all; use std::io; +use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use tracing_subscriber::prelude::*; @@ -10,6 +12,7 @@ use tvix_store::nar::NonCachingNARCalculationService; use tvix_store::pathinfoservice::SledPathInfoService; use tvix_store::proto::blob_service_server::BlobServiceServer; use tvix_store::proto::directory_service_server::DirectoryServiceServer; +use tvix_store::proto::node::Node; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCBlobServiceWrapper; use tvix_store::proto::GRPCDirectoryServiceWrapper; @@ -137,44 +140,52 @@ async fn main() -> Result<(), Box> { nar_calculation_service, )); - for path in paths { - let path_move = path.clone(); - let io_move = io.clone(); - let path_info = tokio::task::spawn_blocking(move || { - io_move - .import_path_with_pathinfo(&path_move) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) + let tasks = paths + .iter() + .map(|path| { + let io_move = io.clone(); + let path = path.clone(); + let task: tokio::task::JoinHandle> = + tokio::task::spawn_blocking(move || { + let path_info = io_move.import_path_with_pathinfo(&path)?; + print_node(&path_info.node.unwrap().node.unwrap(), &path); + Ok(()) + }); + task }) - .await??; - - match path_info.node.unwrap().node.unwrap() { - tvix_store::proto::node::Node::Directory(directory_node) => { - info!( - path = ?path, - name = directory_node.name, - digest = BASE64.encode(&directory_node.digest), - "import successful", - ) - } - tvix_store::proto::node::Node::File(file_node) => { - info!( - path = ?path, - name = file_node.name, - digest = BASE64.encode(&file_node.digest), - "import successful" - ) - } - tvix_store::proto::node::Node::Symlink(symlink_node) => { - info!( - path = ?path, - name = symlink_node.name, - target = symlink_node.target, - "import successful" - ) - } - } - } + .collect::>>>(); + + try_join_all(tasks).await?; } }; Ok(()) } + +fn print_node(node: &Node, path: &Path) { + match node { + Node::Directory(directory_node) => { + info!( + path = ?path, + name = directory_node.name, + digest = BASE64.encode(&directory_node.digest), + "import successful", + ) + } + Node::File(file_node) => { + info!( + path = ?path, + name = file_node.name, + digest = BASE64.encode(&file_node.digest), + "import successful" + ) + } + Node::Symlink(symlink_node) => { + info!( + path = ?path, + name = symlink_node.name, + target = symlink_node.target, + "import successful" + ) + } + } +} -- cgit 1.4.1