diff options
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 212 |
1 files changed, 102 insertions, 110 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 15f37d301f..03c699b893 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -2,6 +2,8 @@ use clap::Parser; use clap::Subcommand; use futures::future::try_join_all; +use futures::StreamExt; +use futures::TryStreamExt; use nix_compat::path_info::ExportedPathInfo; use serde::Deserialize; use serde::Serialize; @@ -11,12 +13,10 @@ use tokio_listener::Listener; use tokio_listener::SystemOptions; use tokio_listener::UserOptions; use tonic::transport::Server; -use tracing::info; -use tracing::Level; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::Layer; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use tvix_castore::import::ingest_path; +use tracing::{info, info_span, instrument, Level, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt as _; +use tvix_castore::import::fs::ingest_path; +use tvix_store::nar::NarCalculationService; use tvix_store::proto::NarInfo; use tvix_store::proto::PathInfo; @@ -34,15 +34,6 @@ use tvix_store::pathinfoservice::make_fs; #[cfg(feature = "fuse")] use tvix_castore::fs::fuse::FuseDaemon; -#[cfg(feature = "otlp")] -use opentelemetry::KeyValue; -#[cfg(feature = "otlp")] -use opentelemetry_sdk::{ - resource::{ResourceDetector, SdkProvidedResourceDetector}, - trace::BatchConfig, - Resource, -}; - #[cfg(feature = "virtiofs")] use tvix_castore::fs::virtiofs::start_virtiofs_daemon; @@ -54,10 +45,6 @@ use tvix_store::proto::FILE_DESCRIPTOR_SET; #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { - /// Whether to log in JSON - #[arg(long)] - json: bool, - /// Whether to configure OTLP. Set --otlp=false to disable. #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))] otlp: bool, @@ -66,8 +53,8 @@ struct Cli { /// It's also possible to set `RUST_LOG` according to /// `tracing_subscriber::filter::EnvFilter`, which will always have /// priority. - #[arg(long)] - log_level: Option<Level>, + #[arg(long, default_value_t=Level::INFO)] + log_level: Level, #[command(subcommand)] command: Commands, @@ -80,7 +67,11 @@ enum Commands { #[arg(long, short = 'l')] listen_address: Option<String>, - #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] + #[arg( + long, + env, + default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store" + )] blob_service_addr: String, #[arg( @@ -209,87 +200,22 @@ fn default_threads() -> usize { } #[tokio::main] +#[instrument(fields(indicatif.pb_show=1))] async fn main() -> Result<(), Box<dyn std::error::Error>> { let cli = Cli::parse(); - // configure log settings - let level = cli.log_level.unwrap_or(Level::INFO); - - // Set up the tracing subscriber. - let subscriber = tracing_subscriber::registry() - .with( - cli.json.then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .json() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ), - ) - .with( - (!cli.json).then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .pretty() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ), - ); - - // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI) - // then init the registry. - // If the feature is feature-flagged out, just init without adding the layer. - // It's necessary to do this separately, as every with() call chains the - // layer into the type of the registry. #[cfg(feature = "otlp")] { - let subscriber = if cli.otlp { - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().tonic()) - .with_batch_config(BatchConfig::default()) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ - // use SdkProvidedResourceDetector.detect to detect resources, - // but replace the default service name with our default. - // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 - let resources = - SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); - // SdkProvidedResourceDetector currently always sets - // `service.name`, but we don't like its default. - if resources.get("service.name".into()).unwrap() == "unknown_service".into() { - resources.merge(&Resource::new([KeyValue::new( - "service.name", - "tvix.store", - )])) - } else { - resources - } - })) - .install_batch(opentelemetry_sdk::runtime::Tokio)?; - - // Create a tracing layer with the configured tracer - let layer = tracing_opentelemetry::layer().with_tracer(tracer); - - subscriber.with(Some(layer)) + if cli.otlp { + tvix_tracing::init_with_otlp(cli.log_level, "tvix.store")?; } else { - subscriber.with(None) - }; - - subscriber.try_init()?; + tvix_tracing::init(cli.log_level)?; + } } - // Init the registry (when otlp is not enabled) #[cfg(not(feature = "otlp"))] { - subscriber.try_init()?; + tvix_tracing::init(cli.log_level)?; } match cli.command { @@ -300,7 +226,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { path_info_service_addr, } => { // initialize stores - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -325,6 +251,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { )) .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( Arc::from(path_info_service), + nar_calculation_service, ))); #[cfg(feature = "tonic-reflection")] @@ -354,7 +281,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { path_info_service_addr, } => { // FUTUREWORK: allow flat for single files? - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -362,16 +289,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ) .await?; - // Arc the PathInfoService, as we clone it . + // Arc PathInfoService and NarCalculationService, as we clone it . let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + let nar_calculation_service: Arc<dyn NarCalculationService> = + nar_calculation_service.into(); + + let root_span = { + let s = Span::current(); + s.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + s.pb_set_message("Importing paths"); + s.pb_set_length(paths.len() as u64); + s.pb_start(); + s + }; let tasks = paths .into_iter() .map(|path| { + let paths_span = root_span.clone(); tokio::task::spawn({ let blob_service = blob_service.clone(); let directory_service = directory_service.clone(); let path_info_service = path_info_service.clone(); + let nar_calculation_service = nar_calculation_service.clone(); async move { if let Ok(name) = tvix_store::import::path_to_name(&path) { @@ -381,6 +321,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { blob_service, directory_service, path_info_service, + nar_calculation_service, ) .await; if let Ok(output_path) = resp { @@ -388,6 +329,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { println!("{}", output_path.to_absolute_path()); } } + paths_span.pb_inc(1); } }) }) @@ -401,7 +343,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { path_info_service_addr, reference_graph_path, } => { - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -421,19 +363,69 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let reference_graph: ReferenceGraph<'_> = serde_json::from_slice(reference_graph_json.as_slice())?; - // We currently simply upload all store paths in linear order. - // FUTUREWORK: properly walk the reference graph from the leaves, and upload multiple in parallel. - for elem in reference_graph.closure { - // Skip if that store path already exists - if path_info_service.get(*elem.path.digest()).await?.is_some() { - continue; - } + // Arc the PathInfoService, as we clone it . + let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); - let path: PathBuf = elem.path.to_absolute_path().into(); - // Ingest the given path - let root_node = - ingest_path(blob_service.clone(), directory_service.clone(), path).await?; + let lookups_span = info_span!( + "lookup pathinfos", + "indicatif.pb_show" = tracing::field::Empty + ); + lookups_span.pb_set_length(reference_graph.closure.len() as u64); + lookups_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + lookups_span.pb_start(); + + // From our reference graph, lookup all pathinfos that might exist. + let elems: Vec<_> = futures::stream::iter(reference_graph.closure) + .map(|elem| { + let path_info_service = path_info_service.clone(); + async move { + let resp = path_info_service + .get(*elem.path.digest()) + .await + .map(|resp| (elem, resp)); + + Span::current().pb_inc(1); + resp + } + }) + .buffer_unordered(50) + // Filter out all that are already uploaded. + // TODO: check if there's a better combinator for this + .try_filter_map(|(elem, path_info)| { + std::future::ready(if path_info.is_none() { + Ok(Some(elem)) + } else { + Ok(None) + }) + }) + .try_collect() + .await?; + + // Run ingest_path on all of them. + let uploads: Vec<_> = futures::stream::iter(elems) + .map(|elem| { + // Map to a future returning the root node, alongside with the closure info. + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + async move { + // Ingest the given path. + + ingest_path( + blob_service, + directory_service, + PathBuf::from(elem.path.to_absolute_path()), + ) + .await + .map(|root_node| (elem, root_node)) + } + }) + .buffer_unordered(10) + .try_collect() + .await?; + // Insert them into the PathInfoService. + // FUTUREWORK: do this properly respecting the reference graph. + for (elem, root_node) in uploads { // Create and upload a PathInfo pointing to the root_node, // annotated with information we have from the reference graph. let path_info = PathInfo { @@ -469,7 +461,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { allow_other, show_xattr, } => { - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -511,7 +503,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { list_root, show_xattr, } => { - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, |