diff options
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 378 |
1 files changed, 147 insertions, 231 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 906d0ab5206a..6da239a8fee3 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -9,19 +9,16 @@ use serde::Deserialize; use serde::Serialize; use std::path::PathBuf; use std::sync::Arc; -use tokio_listener::Listener; -use tokio_listener::SystemOptions; -use tokio_listener::UserOptions; use tonic::transport::Server; -use tracing::info; -use tracing::Level; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::Layer; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tower::ServiceBuilder; +use tower_http::trace::{DefaultMakeSpan, TraceLayer}; +use tracing::{info, info_span, instrument, Level, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt as _; use tvix_castore::import::fs::ingest_path; use tvix_store::nar::NarCalculationService; use tvix_store::proto::NarInfo; use tvix_store::proto::PathInfo; +use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc}; use tvix_castore::proto::blob_service_server::BlobServiceServer; use tvix_castore::proto::directory_service_server::DirectoryServiceServer; @@ -37,15 +34,6 @@ use tvix_store::pathinfoservice::make_fs; #[cfg(feature = "fuse")] use tvix_castore::fs::fuse::FuseDaemon; -#[cfg(feature = "otlp")] -use opentelemetry::KeyValue; -#[cfg(feature = "otlp")] -use opentelemetry_sdk::{ - resource::{ResourceDetector, SdkProvidedResourceDetector}, - trace::BatchConfig, - Resource, -}; - #[cfg(feature = "virtiofs")] use tvix_castore::fs::virtiofs::start_virtiofs_daemon; @@ -54,6 +42,11 @@ use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET; #[cfg(feature = "tonic-reflection")] use tvix_store::proto::FILE_DESCRIPTOR_SET; +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { @@ -65,8 +58,8 @@ struct Cli { /// It's also possible to set `RUST_LOG` according to /// `tracing_subscriber::filter::EnvFilter`, which will always have /// priority. - #[arg(long)] - log_level: Option<Level>, + #[arg(long, default_value_t=Level::INFO)] + log_level: Level, #[command(subcommand)] command: Commands, @@ -76,51 +69,26 @@ struct Cli { enum Commands { /// Runs the tvix-store daemon. Daemon { - #[arg(long, short = 'l')] - listen_address: Option<String>, - - #[arg( - long, - env, - default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store" - )] - blob_service_addr: String, - - #[arg( - long, - env, - default_value = "sled:///var/lib/tvix-store/directories.sled" - )] - directory_service_addr: String, - - #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")] - path_info_service_addr: String, + /// The address to listen on. + #[clap(flatten)] + listen_args: tokio_listener::ListenerAddressLFlag, + + #[clap(flatten)] + service_addrs: ServiceUrls, }, /// Imports a list of paths into the store, print the store path for each of them. Import { #[clap(value_name = "PATH")] paths: Vec<PathBuf>, - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - blob_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - directory_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - path_info_service_addr: String, + #[clap(flatten)] + service_addrs: ServiceUrlsGrpc, }, /// Copies a list of store paths on the system into tvix-store. Copy { - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - blob_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - directory_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - path_info_service_addr: String, + #[clap(flatten)] + service_addrs: ServiceUrlsGrpc, /// A path pointing to a JSON file produced by the Nix /// `__structuredAttrs` containing reference graph information provided @@ -140,14 +108,8 @@ enum Commands { #[clap(value_name = "PATH")] dest: PathBuf, - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - blob_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - directory_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - path_info_service_addr: String, + #[clap(flatten)] + service_addrs: ServiceUrlsGrpc, /// Number of FUSE threads to spawn. #[arg(long, env, default_value_t = default_threads())] @@ -176,14 +138,8 @@ enum Commands { #[clap(value_name = "PATH")] socket: PathBuf, - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - blob_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - directory_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - path_info_service_addr: String, + #[clap(flatten)] + service_addrs: ServiceUrlsGrpc, /// Whether to list elements at the root of the mount point. /// This is useful if your PathInfoService doesn't provide an @@ -197,113 +153,41 @@ enum Commands { }, } -#[cfg(all(feature = "fuse", not(target_os = "macos")))] +#[cfg(feature = "fuse")] fn default_threads() -> usize { std::thread::available_parallelism() .map(|threads| threads.into()) .unwrap_or(4) } -// On MacFUSE only a single channel will receive ENODEV when the file system is -// unmounted and so all the other channels will block forever. -// See https://github.com/osxfuse/osxfuse/issues/974 -#[cfg(all(feature = "fuse", target_os = "macos"))] -fn default_threads() -> usize { - 1 -} - -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { - let cli = Cli::parse(); - - // configure log settings - let level = cli.log_level.unwrap_or(Level::INFO); - - // Set up the tracing subscriber. - let subscriber = tracing_subscriber::registry().with( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .compact() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ); - - // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI) - // then init the registry. - // If the feature is feature-flagged out, just init without adding the layer. - // It's necessary to do this separately, as every with() call chains the - // layer into the type of the registry. - #[cfg(feature = "otlp")] - { - let subscriber = if cli.otlp { - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().tonic()) - .with_batch_config(BatchConfig::default()) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ - // use SdkProvidedResourceDetector.detect to detect resources, - // but replace the default service name with our default. - // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 - let resources = - SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); - // SdkProvidedResourceDetector currently always sets - // `service.name`, but we don't like its default. - if resources.get("service.name".into()).unwrap() == "unknown_service".into() { - resources.merge(&Resource::new([KeyValue::new( - "service.name", - "tvix.store", - )])) - } else { - resources - } - })) - .install_batch(opentelemetry_sdk::runtime::Tokio)?; - - // Create a tracing layer with the configured tracer - let layer = tracing_opentelemetry::layer().with_tracer(tracer); - - subscriber.with(Some(layer)) - } else { - subscriber.with(None) - }; - - subscriber.try_init()?; - } - - // Init the registry (when otlp is not enabled) - #[cfg(not(feature = "otlp"))] - { - subscriber.try_init()?; - } +#[instrument(skip_all)] +async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { match cli.command { Commands::Daemon { - listen_address, - blob_service_addr, - directory_service_addr, - path_info_service_addr, + listen_args, + service_addrs, } => { // initialize stores let (blob_service, directory_service, path_info_service, nar_calculation_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; - - let listen_address = listen_address - .unwrap_or_else(|| "[::]:8000".to_string()) - .parse() - .unwrap(); + tvix_store::utils::construct_services(service_addrs).await?; + + let mut server = Server::builder().layer( + ServiceBuilder::new() + .layer( + TraceLayer::new_for_grpc().make_span_with( + DefaultMakeSpan::new() + .level(Level::INFO) + .include_headers(true), + ), + ) + .map_request(tvix_tracing::propagate::tonic::accept_trace), + ); - let mut server = Server::builder(); + let (_health_reporter, health_service) = tonic_health::server::health_reporter(); #[allow(unused_mut)] let mut router = server + .add_service(health_service) .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( blob_service, ))) @@ -311,47 +195,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { GRPCDirectoryServiceWrapper::new(directory_service), )) .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( - Arc::from(path_info_service), + path_info_service, nar_calculation_service, ))); #[cfg(feature = "tonic-reflection")] { - let reflection_svc = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) - .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) - .build()?; - router = router.add_service(reflection_svc); + router = router.add_service( + tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build_v1alpha()?, + ); + router = router.add_service( + tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build_v1()?, + ); } - info!(listen_address=%listen_address, "starting daemon"); + let listen_address = &listen_args.listen_address.unwrap_or_else(|| { + "[::]:8000" + .parse() + .expect("invalid fallback listen address") + }); - let listener = Listener::bind( - &listen_address, - &SystemOptions::default(), - &UserOptions::default(), + let listener = tokio_listener::Listener::bind( + listen_address, + &Default::default(), + &listen_args.listener_options, ) .await?; + info!(listen_address=%listen_address, "starting daemon"); + router.serve_with_incoming(listener).await?; } Commands::Import { paths, - blob_service_addr, - directory_service_addr, - path_info_service_addr, + service_addrs, } => { // FUTUREWORK: allow flat for single files? let (blob_service, directory_service, path_info_service, nar_calculation_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; + tvix_store::utils::construct_services(service_addrs).await?; - // Arc PathInfoService and NarCalculationService, as we clone it . - let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + // Arc NarCalculationService, as we clone it . let nar_calculation_service: Arc<dyn NarCalculationService> = nar_calculation_service.into(); @@ -388,18 +277,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { try_join_all(tasks).await?; } Commands::Copy { - blob_service_addr, - directory_service_addr, - path_info_service_addr, + service_addrs, reference_graph_path, } => { let (blob_service, directory_service, path_info_service, _nar_calculation_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; + tvix_store::utils::construct_services(service_addrs).await?; // Parse the file at reference_graph_path. let reference_graph_json = tokio::fs::read(&reference_graph_path).await?; @@ -413,18 +295,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let reference_graph: ReferenceGraph<'_> = serde_json::from_slice(reference_graph_json.as_slice())?; - // Arc the PathInfoService, as we clone it . - let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + let lookups_span = info_span!( + "lookup pathinfos", + "indicatif.pb_show" = tracing::field::Empty + ); + lookups_span.pb_set_length(reference_graph.closure.len() as u64); + lookups_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + lookups_span.pb_start(); // From our reference graph, lookup all pathinfos that might exist. let elems: Vec<_> = futures::stream::iter(reference_graph.closure) .map(|elem| { let path_info_service = path_info_service.clone(); async move { - path_info_service + let resp = path_info_service .get(*elem.path.digest()) .await - .map(|resp| (elem, resp)) + .map(|resp| (elem, resp)); + + Span::current().pb_inc(1); + resp } }) .buffer_unordered(50) @@ -468,9 +358,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // Create and upload a PathInfo pointing to the root_node, // annotated with information we have from the reference graph. let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(root_node), - }), + node: Some(tvix_castore::proto::Node::from_name_and_node( + elem.path.to_string().into(), + root_node, + )), references: Vec::from_iter( elem.references.iter().map(|e| e.digest().to_vec().into()), ), @@ -492,27 +383,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { #[cfg(feature = "fuse")] Commands::Mount { dest, - blob_service_addr, - directory_service_addr, - path_info_service_addr, + service_addrs, list_root, threads, allow_other, show_xattr, } => { let (blob_service, directory_service, path_info_service, _nar_calculation_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; + tvix_store::utils::construct_services(service_addrs).await?; - let mut fuse_daemon = tokio::task::spawn_blocking(move || { + let fuse_daemon = tokio::task::spawn_blocking(move || { let fs = make_fs( blob_service, directory_service, - Arc::from(path_info_service), + path_info_service, list_root, show_xattr, ); @@ -522,39 +406,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { }) .await??; - // grab a handle to unmount the file system, and register a signal - // handler. - tokio::spawn(async move { - tokio::signal::ctrl_c().await.unwrap(); - info!("interrupt received, unmounting…"); - tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; - info!("unmount occured, terminating…"); - Ok::<_, std::io::Error>(()) - }) - .await??; + // Wait for a ctrl_c and then call fuse_daemon.unmount(). + tokio::spawn({ + let fuse_daemon = fuse_daemon.clone(); + async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("interrupt received, unmounting…"); + tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; + info!("unmount occured, terminating…"); + Ok::<_, std::io::Error>(()) + } + }); + + // Wait for the server to finish, which can either happen through it + // being unmounted externally, or receiving a signal invoking the + // handler above. + tokio::task::spawn_blocking(move || fuse_daemon.wait()).await? } #[cfg(feature = "virtiofs")] Commands::VirtioFs { socket, - blob_service_addr, - directory_service_addr, - path_info_service_addr, + service_addrs, list_root, show_xattr, } => { let (blob_service, directory_service, path_info_service, _nar_calculation_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; + tvix_store::utils::construct_services(service_addrs).await?; tokio::task::spawn_blocking(move || { let fs = make_fs( blob_service, directory_service, - Arc::from(path_info_service), + path_info_service, list_root, show_xattr, ); @@ -567,3 +450,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { }; Ok(()) } + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + let cli = Cli::parse(); + + let tracing_handle = { + let mut builder = tvix_tracing::TracingBuilder::default(); + builder = builder.level(cli.log_level).enable_progressbar(); + #[cfg(feature = "otlp")] + { + if cli.otlp { + builder = builder.enable_otlp("tvix.store"); + } + } + builder.build()? + }; + + tokio::select! { + res = tokio::signal::ctrl_c() => { + res?; + if let Err(e) = tracing_handle.force_shutdown().await { + eprintln!("failed to shutdown tracing: {e}"); + } + Ok(()) + }, + res = run_cli(cli) => { + if let Err(e) = tracing_handle.shutdown().await { + eprintln!("failed to shutdown tracing: {e}"); + } + res + } + } +} |