diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 68 |
1 files changed, 53 insertions, 15 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 7e362576a1dc..8262a9e98cd1 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -2,10 +2,11 @@ use clap::Parser; use clap::Subcommand; use futures::future::try_join_all; +use futures::StreamExt; +use futures::TryStreamExt; use nix_compat::path_info::ExportedPathInfo; use serde::Deserialize; use serde::Serialize; -use std::os::unix::ffi::OsStrExt; use std::path::PathBuf; use std::sync::Arc; use tokio_listener::Listener; @@ -422,21 +423,58 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let reference_graph: ReferenceGraph<'_> = serde_json::from_slice(reference_graph_json.as_slice())?; - // We currently simply upload all store paths in linear order. - // FUTUREWORK: properly walk the reference graph from the leaves, and upload multiple in parallel. - for elem in reference_graph.closure { - // Skip if that store path already exists - if path_info_service.get(*elem.path.digest()).await?.is_some() { - continue; - } - - let path: PathBuf = elem.path.to_absolute_path().into(); - let root_name = path.file_name().unwrap().as_bytes().to_vec().into(); - // Ingest the given path - let root_node = ingest_path(blob_service.clone(), directory_service.clone(), &path) - .await? - .rename(root_name); + // Arc the PathInfoService, as we clone it . + let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + + // From our reference graph, lookup all pathinfos that might exist. + let elems: Vec<_> = futures::stream::iter(reference_graph.closure) + .map(|elem| { + let path_info_service = path_info_service.clone(); + async move { + path_info_service + .get(*elem.path.digest()) + .await + .map(|resp| (elem, resp)) + } + }) + .buffer_unordered(50) + // Filter out all that are already uploaded. + // TODO: check if there's a better combinator for this + .try_filter_map(|(elem, path_info)| { + std::future::ready(if path_info.is_none() { + Ok(Some(elem)) + } else { + Ok(None) + }) + }) + .try_collect() + .await?; + + // Run ingest_path on all of them. + let uploads: Vec<_> = futures::stream::iter(elems) + .map(|elem| { + // Map to a future returning the root node, alongside with the closure info. + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + async move { + // Ingest the given path. + + ingest_path( + blob_service, + directory_service, + PathBuf::from(elem.path.to_absolute_path()), + ) + .await + .map(|root_node| (elem, root_node)) + } + }) + .buffer_unordered(10) + .try_collect() + .await?; + // Insert them into the PathInfoService. + // FUTUREWORK: do this properly respecting the reference graph. + for (elem, root_node) in uploads { // Create and upload a PathInfo pointing to the root_node, // annotated with information we have from the reference graph. let path_info = PathInfo { |