about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-05-25T06·04+0300
committerclbot <clbot@tvl.fyi>2023-05-30T10·13+0000
commit48b66a89820f170d89bb94a7a000bc8f73879aa4 (patch)
tree06d12f90252aa2b27576484fcf35ba084b2401d9
parentdbff1289b8e5aa83413226411a9d9d2ce44721e9 (diff)
feat(tvix/store/bin/import): process all path imports concurrently r/6221
Change-Id: I3e1428a4725fc2e552e8f37bc0550121117fcef6
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8633
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
-rw-r--r--tvix/store/src/bin/tvix-store.rs83
1 files changed, 47 insertions, 36 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 5912c6464b..1e1c96c3bc 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -1,6 +1,8 @@
 use clap::Subcommand;
 use data_encoding::BASE64;
+use futures::future::try_join_all;
 use std::io;
+use std::path::Path;
 use std::path::PathBuf;
 use std::sync::Arc;
 use tracing_subscriber::prelude::*;
@@ -10,6 +12,7 @@ use tvix_store::nar::NonCachingNARCalculationService;
 use tvix_store::pathinfoservice::SledPathInfoService;
 use tvix_store::proto::blob_service_server::BlobServiceServer;
 use tvix_store::proto::directory_service_server::DirectoryServiceServer;
+use tvix_store::proto::node::Node;
 use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
 use tvix_store::proto::GRPCBlobServiceWrapper;
 use tvix_store::proto::GRPCDirectoryServiceWrapper;
@@ -137,44 +140,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 nar_calculation_service,
             ));
 
-            for path in paths {
-                let path_move = path.clone();
-                let io_move = io.clone();
-                let path_info = tokio::task::spawn_blocking(move || {
-                    io_move
-                        .import_path_with_pathinfo(&path_move)
-                        .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
+            let tasks = paths
+                .iter()
+                .map(|path| {
+                    let io_move = io.clone();
+                    let path = path.clone();
+                    let task: tokio::task::JoinHandle<Result<(), io::Error>> =
+                        tokio::task::spawn_blocking(move || {
+                            let path_info = io_move.import_path_with_pathinfo(&path)?;
+                            print_node(&path_info.node.unwrap().node.unwrap(), &path);
+                            Ok(())
+                        });
+                    task
                 })
-                .await??;
-
-                match path_info.node.unwrap().node.unwrap() {
-                    tvix_store::proto::node::Node::Directory(directory_node) => {
-                        info!(
-                            path = ?path,
-                            name = directory_node.name,
-                            digest = BASE64.encode(&directory_node.digest),
-                            "import successful",
-                        )
-                    }
-                    tvix_store::proto::node::Node::File(file_node) => {
-                        info!(
-                            path = ?path,
-                            name = file_node.name,
-                            digest = BASE64.encode(&file_node.digest),
-                            "import successful"
-                        )
-                    }
-                    tvix_store::proto::node::Node::Symlink(symlink_node) => {
-                        info!(
-                            path = ?path,
-                            name = symlink_node.name,
-                            target = symlink_node.target,
-                            "import successful"
-                        )
-                    }
-                }
-            }
+                .collect::<Vec<tokio::task::JoinHandle<Result<(), io::Error>>>>();
+
+            try_join_all(tasks).await?;
         }
     };
     Ok(())
 }
+
+fn print_node(node: &Node, path: &Path) {
+    match node {
+        Node::Directory(directory_node) => {
+            info!(
+                path = ?path,
+                name = directory_node.name,
+                digest = BASE64.encode(&directory_node.digest),
+                "import successful",
+            )
+        }
+        Node::File(file_node) => {
+            info!(
+                path = ?path,
+                name = file_node.name,
+                digest = BASE64.encode(&file_node.digest),
+                "import successful"
+            )
+        }
+        Node::Symlink(symlink_node) => {
+            info!(
+                path = ?path,
+                name = symlink_node.name,
+                target = symlink_node.target,
+                "import successful"
+            )
+        }
+    }
+}