about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
Diffstat (limited to 'tvix')
-rw-r--r--tvix/store/src/bin/tvix-store.rs68
1 files changed, 53 insertions, 15 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 7e362576a1dc..8262a9e98cd1 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -2,10 +2,11 @@ use clap::Parser;
 use clap::Subcommand;
 
 use futures::future::try_join_all;
+use futures::StreamExt;
+use futures::TryStreamExt;
 use nix_compat::path_info::ExportedPathInfo;
 use serde::Deserialize;
 use serde::Serialize;
-use std::os::unix::ffi::OsStrExt;
 use std::path::PathBuf;
 use std::sync::Arc;
 use tokio_listener::Listener;
@@ -422,21 +423,58 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             let reference_graph: ReferenceGraph<'_> =
                 serde_json::from_slice(reference_graph_json.as_slice())?;
 
-            // We currently simply upload all store paths in linear order.
-            // FUTUREWORK: properly walk the reference graph from the leaves, and upload multiple in parallel.
-            for elem in reference_graph.closure {
-                // Skip if that store path already exists
-                if path_info_service.get(*elem.path.digest()).await?.is_some() {
-                    continue;
-                }
-
-                let path: PathBuf = elem.path.to_absolute_path().into();
-                let root_name = path.file_name().unwrap().as_bytes().to_vec().into();
-                // Ingest the given path
-                let root_node = ingest_path(blob_service.clone(), directory_service.clone(), &path)
-                    .await?
-                    .rename(root_name);
+            // Arc the PathInfoService, as we clone it .
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+
+            // From our reference graph, lookup all pathinfos that might exist.
+            let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
+                .map(|elem| {
+                    let path_info_service = path_info_service.clone();
+                    async move {
+                        path_info_service
+                            .get(*elem.path.digest())
+                            .await
+                            .map(|resp| (elem, resp))
+                    }
+                })
+                .buffer_unordered(50)
+                // Filter out all that are already uploaded.
+                // TODO: check if there's a better combinator for this
+                .try_filter_map(|(elem, path_info)| {
+                    std::future::ready(if path_info.is_none() {
+                        Ok(Some(elem))
+                    } else {
+                        Ok(None)
+                    })
+                })
+                .try_collect()
+                .await?;
+
+            // Run ingest_path on all of them.
+            let uploads: Vec<_> = futures::stream::iter(elems)
+                .map(|elem| {
+                    // Map to a future returning the root node, alongside with the closure info.
+                    let blob_service = blob_service.clone();
+                    let directory_service = directory_service.clone();
+                    async move {
+                        // Ingest the given path.
+
+                        ingest_path(
+                            blob_service,
+                            directory_service,
+                            PathBuf::from(elem.path.to_absolute_path()),
+                        )
+                        .await
+                        .map(|root_node| (elem, root_node))
+                    }
+                })
+                .buffer_unordered(10)
+                .try_collect()
+                .await?;
 
+            // Insert them into the PathInfoService.
+            // FUTUREWORK: do this properly respecting the reference graph.
+            for (elem, root_node) in uploads {
                 // Create and upload a PathInfo pointing to the root_node,
                 // annotated with information we have from the reference graph.
                 let path_info = PathInfo {