diff options
-rw-r--r-- | tvix/Cargo.lock | 1 | ||||
-rw-r--r-- | tvix/Cargo.nix | 4 | ||||
-rw-r--r-- | tvix/store/Cargo.toml | 1 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 93 |
4 files changed, 99 insertions, 0 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 1fff246e86c5..d53ced76aeb1 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -4528,6 +4528,7 @@ dependencies = [ "rstest", "rstest_reuse", "serde", + "serde_json", "serde_qs", "serde_with", "sha2", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index f4bc00bb3c43..6886685b63aa 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -14550,6 +14550,10 @@ rec { features = [ "derive" ]; } { + name = "serde_json"; + packageId = "serde_json"; + } + { name = "serde_qs"; packageId = "serde_qs"; } diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index dee959b4de9d..a62d57837013 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -21,6 +21,7 @@ opentelemetry = { version = "0.21.0", optional = true} opentelemetry-otlp = { version = "0.14.0", optional = true } opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"], optional = true} serde = { version = "1.0.197", features = [ "derive" ] } +serde_json = "1.0" serde_with = "3.7.0" serde_qs = "0.12.0" sha2 = "0.10.6" diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 27a67b7c91c1..a5845c74aed0 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -2,6 +2,9 @@ use clap::Parser; use clap::Subcommand; use futures::future::try_join_all; +use nix_compat::path_info::ExportedPathInfo; +use serde::Deserialize; +use serde::Serialize; use std::path::PathBuf; use std::sync::Arc; use tokio_listener::Listener; @@ -13,6 +16,9 @@ use tracing::Level; use tracing_subscriber::EnvFilter; use tracing_subscriber::Layer; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tvix_castore::import::ingest_path; +use tvix_store::proto::NarInfo; +use tvix_store::proto::PathInfo; use tvix_castore::proto::blob_service_server::BlobServiceServer; use tvix_castore::proto::directory_service_server::DirectoryServiceServer; @@ -101,6 +107,30 @@ enum Commands { #[arg(long, env, default_value = "grpc+http://[::1]:8000")] path_info_service_addr: String, }, + + /// Copies a list of store paths on the system into tvix-store. + Copy { + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// A path pointing to a JSON file produced by the Nix + /// `__structuredAttrs` containing reference graph information provided + /// by the `exportReferencesGraph` feature. + /// + /// This can be used to invoke tvix-store inside a Nix derivation + /// copying to a Tvix store (or outside, if the JSON file is copied + /// out). + /// + /// Currently limited to the `closure` key inside that JSON file. + #[arg(value_name = "NIX_ATTRS_JSON_FILE", env = "NIX_ATTRS_JSON_FILE")] + reference_graph_path: PathBuf, + }, /// Mounts a tvix-store at the given mountpoint #[cfg(feature = "fuse")] Mount { @@ -357,6 +387,69 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { try_join_all(tasks).await?; } + Commands::Copy { + blob_service_addr, + directory_service_addr, + path_info_service_addr, + reference_graph_path, + } => { + let (blob_service, directory_service, path_info_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + // Parse the file at reference_graph_path. + let reference_graph_json = tokio::fs::read(&reference_graph_path).await?; + + #[derive(Deserialize, Serialize)] + struct ReferenceGraph<'a> { + #[serde(borrow)] + closure: Vec<ExportedPathInfo<'a>>, + } + + let reference_graph: ReferenceGraph<'_> = + serde_json::from_slice(reference_graph_json.as_slice())?; + + // We currently simply upload all store paths in linear order. + // FUTUREWORK: properly walk the reference graph from the leaves, and upload multiple in parallel. + for elem in reference_graph.closure { + // Skip if that store path already exists + if path_info_service.get(*elem.path.digest()).await?.is_some() { + continue; + } + + let path: PathBuf = elem.path.to_absolute_path().into(); + // Ingest the given path + let root_node = + ingest_path(blob_service.clone(), directory_service.clone(), path).await?; + + // Create and upload a PathInfo pointing to the root_node, + // annotated with information we have from the reference graph. + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { + node: Some(root_node), + }), + references: Vec::from_iter( + elem.references.iter().map(|e| e.digest().to_vec().into()), + ), + narinfo: Some(NarInfo { + nar_size: elem.nar_size, + nar_sha256: elem.nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: Vec::from_iter( + elem.references.iter().map(|e| e.to_string()), + ), + deriver: None, + ca: None, + }), + }; + + path_info_service.put(path_info).await?; + } + } #[cfg(feature = "fuse")] Commands::Mount { dest, |