diff options
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 619 |
1 files changed, 312 insertions, 307 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 15f37d301f9d..0ae18ec2161a 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -1,30 +1,32 @@ use clap::Parser; use clap::Subcommand; -use futures::future::try_join_all; -use nix_compat::path_info::ExportedPathInfo; +use futures::StreamExt; +use futures::TryStreamExt; +use nix_compat::nix_daemon::de::Error; +use nix_compat::nixhash::CAHash; +use nix_compat::nixhash::NixHash; +use nix_compat::{path_info::ExportedPathInfo, store_path::StorePath}; use serde::Deserialize; use serde::Serialize; use std::path::PathBuf; use std::sync::Arc; -use tokio_listener::Listener; -use tokio_listener::SystemOptions; -use tokio_listener::UserOptions; use tonic::transport::Server; -use tracing::info; -use tracing::Level; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::Layer; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; -use tvix_castore::import::ingest_path; -use tvix_store::proto::NarInfo; -use tvix_store::proto::PathInfo; +use tower::ServiceBuilder; +use tower_http::trace::{DefaultMakeSpan, TraceLayer}; +use tracing::{debug, info, info_span, instrument, warn, Instrument, Level, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; +use tvix_castore::import::fs::ingest_path; +use tvix_store::import::path_to_name; +use tvix_store::nar::NarCalculationService; +use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc}; +use tvix_tracing::TracingHandle; use tvix_castore::proto::blob_service_server::BlobServiceServer; use tvix_castore::proto::directory_service_server::DirectoryServiceServer; use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; -use tvix_store::pathinfoservice::PathInfoService; +use tvix_store::pathinfoservice::{PathInfo, PathInfoService}; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; @@ -34,15 +36,6 @@ use tvix_store::pathinfoservice::make_fs; #[cfg(feature = "fuse")] use tvix_castore::fs::fuse::FuseDaemon; -#[cfg(feature = "otlp")] -use opentelemetry::KeyValue; -#[cfg(feature = "otlp")] -use opentelemetry_sdk::{ - resource::{ResourceDetector, SdkProvidedResourceDetector}, - trace::BatchConfig, - Resource, -}; - #[cfg(feature = "virtiofs")] use tvix_castore::fs::virtiofs::start_virtiofs_daemon; @@ -51,24 +44,18 @@ use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET; #[cfg(feature = "tonic-reflection")] use tvix_store::proto::FILE_DESCRIPTOR_SET; +use mimalloc::MiMalloc; + +#[global_allocator] +static GLOBAL: MiMalloc = MiMalloc; + #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { - /// Whether to log in JSON - #[arg(long)] - json: bool, - /// Whether to configure OTLP. Set --otlp=false to disable. #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))] otlp: bool, - /// A global log level to use when printing logs. - /// It's also possible to set `RUST_LOG` according to - /// `tracing_subscriber::filter::EnvFilter`, which will always have - /// priority. - #[arg(long)] - log_level: Option<Level>, - #[command(subcommand)] command: Commands, } @@ -77,47 +64,26 @@ struct Cli { enum Commands { /// Runs the tvix-store daemon. Daemon { - #[arg(long, short = 'l')] - listen_address: Option<String>, + /// The address to listen on. + #[clap(flatten)] + listen_args: tokio_listener::ListenerAddressLFlag, - #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] - blob_service_addr: String, - - #[arg( - long, - env, - default_value = "sled:///var/lib/tvix-store/directories.sled" - )] - directory_service_addr: String, - - #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")] - path_info_service_addr: String, + #[clap(flatten)] + service_addrs: ServiceUrls, }, /// Imports a list of paths into the store, print the store path for each of them. Import { #[clap(value_name = "PATH")] paths: Vec<PathBuf>, - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - blob_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - directory_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - path_info_service_addr: String, + #[clap(flatten)] + service_addrs: ServiceUrlsGrpc, }, /// Copies a list of store paths on the system into tvix-store. Copy { - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - blob_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - directory_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - path_info_service_addr: String, + #[clap(flatten)] + service_addrs: ServiceUrlsGrpc, /// A path pointing to a JSON file produced by the Nix /// `__structuredAttrs` containing reference graph information provided @@ -137,14 +103,8 @@ enum Commands { #[clap(value_name = "PATH")] dest: PathBuf, - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - blob_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - directory_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - path_info_service_addr: String, + #[clap(flatten)] + service_addrs: ServiceUrlsGrpc, /// Number of FUSE threads to spawn. #[arg(long, env, default_value_t = default_threads())] @@ -173,14 +133,8 @@ enum Commands { #[clap(value_name = "PATH")] socket: PathBuf, - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - blob_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - directory_service_addr: String, - - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - path_info_service_addr: String, + #[clap(flatten)] + service_addrs: ServiceUrlsGrpc, /// Whether to list elements at the root of the mount point. /// This is useful if your PathInfoService doesn't provide an @@ -194,129 +148,44 @@ enum Commands { }, } -#[cfg(all(feature = "fuse", not(target_os = "macos")))] +#[cfg(feature = "fuse")] fn default_threads() -> usize { std::thread::available_parallelism() .map(|threads| threads.into()) .unwrap_or(4) } -// On MacFUSE only a single channel will receive ENODEV when the file system is -// unmounted and so all the other channels will block forever. -// See https://github.com/osxfuse/osxfuse/issues/974 -#[cfg(all(feature = "fuse", target_os = "macos"))] -fn default_threads() -> usize { - 1 -} - -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { - let cli = Cli::parse(); - - // configure log settings - let level = cli.log_level.unwrap_or(Level::INFO); - - // Set up the tracing subscriber. - let subscriber = tracing_subscriber::registry() - .with( - cli.json.then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .json() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ), - ) - .with( - (!cli.json).then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .pretty() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ), - ); - - // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI) - // then init the registry. - // If the feature is feature-flagged out, just init without adding the layer. - // It's necessary to do this separately, as every with() call chains the - // layer into the type of the registry. - #[cfg(feature = "otlp")] - { - let subscriber = if cli.otlp { - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().tonic()) - .with_batch_config(BatchConfig::default()) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ - // use SdkProvidedResourceDetector.detect to detect resources, - // but replace the default service name with our default. - // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 - let resources = - SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); - // SdkProvidedResourceDetector currently always sets - // `service.name`, but we don't like its default. - if resources.get("service.name".into()).unwrap() == "unknown_service".into() { - resources.merge(&Resource::new([KeyValue::new( - "service.name", - "tvix.store", - )])) - } else { - resources - } - })) - .install_batch(opentelemetry_sdk::runtime::Tokio)?; - - // Create a tracing layer with the configured tracer - let layer = tracing_opentelemetry::layer().with_tracer(tracer); - - subscriber.with(Some(layer)) - } else { - subscriber.with(None) - }; - - subscriber.try_init()?; - } - - // Init the registry (when otlp is not enabled) - #[cfg(not(feature = "otlp"))] - { - subscriber.try_init()?; - } +#[instrument(skip_all)] +async fn run_cli( + cli: Cli, + tracing_handle: TracingHandle, +) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { match cli.command { Commands::Daemon { - listen_address, - blob_service_addr, - directory_service_addr, - path_info_service_addr, + listen_args, + service_addrs, } => { // initialize stores - let (blob_service, directory_service, path_info_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; - - let listen_address = listen_address - .unwrap_or_else(|| "[::]:8000".to_string()) - .parse() - .unwrap(); + let (blob_service, directory_service, path_info_service, nar_calculation_service) = + tvix_store::utils::construct_services(service_addrs).await?; + + let mut server = Server::builder().layer( + ServiceBuilder::new() + .layer( + TraceLayer::new_for_grpc().make_span_with( + DefaultMakeSpan::new() + .level(Level::INFO) + .include_headers(true), + ), + ) + .map_request(tvix_tracing::propagate::tonic::accept_trace), + ); - let mut server = Server::builder(); + let (_health_reporter, health_service) = tonic_health::server::health_reporter(); #[allow(unused_mut)] let mut router = server + .add_service(health_service) .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( blob_service, ))) @@ -324,90 +193,161 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { GRPCDirectoryServiceWrapper::new(directory_service), )) .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( - Arc::from(path_info_service), + path_info_service, + nar_calculation_service, ))); #[cfg(feature = "tonic-reflection")] { - let reflection_svc = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) - .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) - .build()?; - router = router.add_service(reflection_svc); + router = router.add_service( + tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build_v1alpha()?, + ); + router = router.add_service( + tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build_v1()?, + ); } - info!(listen_address=%listen_address, "starting daemon"); + let listen_address = &listen_args.listen_address.unwrap_or_else(|| { + "[::]:8000" + .parse() + .expect("invalid fallback listen address") + }); - let listener = Listener::bind( - &listen_address, - &SystemOptions::default(), - &UserOptions::default(), + let listener = tokio_listener::Listener::bind( + listen_address, + &Default::default(), + &listen_args.listener_options, ) .await?; + info!(listen_address=%listen_address, "starting daemon"); + router.serve_with_incoming(listener).await?; } Commands::Import { paths, - blob_service_addr, - directory_service_addr, - path_info_service_addr, + service_addrs, } => { // FUTUREWORK: allow flat for single files? - let (blob_service, directory_service, path_info_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; + let (blob_service, directory_service, path_info_service, nar_calculation_service) = + tvix_store::utils::construct_services(service_addrs).await?; - // Arc the PathInfoService, as we clone it . - let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + // Arc NarCalculationService, as we clone it . + let nar_calculation_service: Arc<dyn NarCalculationService> = + nar_calculation_service.into(); - let tasks = paths + // For each path passed, construct the name, or bail out if it's invalid. + let paths_and_names = paths .into_iter() - .map(|path| { - tokio::task::spawn({ - let blob_service = blob_service.clone(); - let directory_service = directory_service.clone(); - let path_info_service = path_info_service.clone(); - - async move { - if let Ok(name) = tvix_store::import::path_to_name(&path) { - let resp = tvix_store::import::import_path_as_nar_ca( - &path, - name, - blob_service, - directory_service, - path_info_service, - ) - .await; - if let Ok(output_path) = resp { - // If the import was successful, print the path to stdout. - println!("{}", output_path.to_absolute_path()); - } + .map(|p| match path_to_name(&p) { + Ok(name) => { + let name = name.to_owned(); + Ok((p, name)) + } + Err(e) => Err(e), + }) + .collect::<Result<Vec<_>, _>>()?; + + let imports_span = + info_span!("import paths", "indicatif.pb_show" = tracing::field::Empty); + imports_span.pb_set_message("Importing"); + imports_span.pb_set_length(paths_and_names.len() as u64); + imports_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + imports_span.pb_start(); + + futures::stream::iter(paths_and_names) + .map(|(path, name)| { + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + let path_info_service = path_info_service.clone(); + let nar_calculation_service = nar_calculation_service.clone(); + let imports_span = imports_span.clone(); + let tracing_handle = tracing_handle.clone(); + + async move { + let span = Span::current(); + span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE); + span.pb_set_message(&format!("Ingesting {:?}", path)); + span.pb_start(); + + // Ingest the contents at the given path into castore. + let root_node = ingest_path::<_, _, _, &[u8]>( + blob_service, + directory_service, + &path, + None, + ) + .await + .map_err(std::io::Error::custom)?; + + span.pb_set_message(&format!("NAR Calculation for {:?}", path)); + + // Ask for the NAR size and sha256 + let (nar_size, nar_sha256) = + nar_calculation_service.calculate_nar(&root_node).await?; + + // Calculate the output path. This might still fail, as some names are illegal. + // FUTUREWORK: express the `name` at the type level to be valid and check for this earlier. + let ca = CAHash::Nar(NixHash::Sha256(nar_sha256)); + let output_path: StorePath<String> = + nix_compat::store_path::build_ca_path::<&str, _, _>( + &name, + &ca, + [], + false, + ) + .map_err(|e| { + warn!(err=%e, "unable to build CA path"); + std::io::Error::custom(e) + })?; + + // Construct and insert PathInfo + match path_info_service + .as_ref() + .put(PathInfo { + store_path: output_path.to_owned(), + node: root_node, + // There's no reference scanning on imported paths + references: vec![], + nar_size, + nar_sha256, + signatures: vec![], + deriver: None, + ca: Some(ca), + }) + .await + { + // If the import was successful, print the path to stdout. + Ok(path_info) => { + use std::io::Write; + debug!(store_path=%path_info.store_path.to_absolute_path(), "imported path"); + writeln!(&mut tracing_handle.get_stdout_writer(), "{}", path_info.store_path.to_absolute_path())?; + imports_span.pb_inc(1); + Ok(()) + } + Err(e) => { + warn!(?path, err=%e, "failed to import"); + Err(e) } } - }) + }.instrument(info_span!("import path", "indicatif.pb_show" = tracing::field::Empty)) }) - .collect::<Vec<_>>(); - - try_join_all(tasks).await?; + .buffer_unordered(50) + .try_collect::<()>() + .await?; } Commands::Copy { - blob_service_addr, - directory_service_addr, - path_info_service_addr, + service_addrs, reference_graph_path, } => { - let (blob_service, directory_service, path_info_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = + tvix_store::utils::construct_services(service_addrs).await?; // Parse the file at reference_graph_path. let reference_graph_json = tokio::fs::read(&reference_graph_path).await?; @@ -421,38 +361,78 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let reference_graph: ReferenceGraph<'_> = serde_json::from_slice(reference_graph_json.as_slice())?; - // We currently simply upload all store paths in linear order. - // FUTUREWORK: properly walk the reference graph from the leaves, and upload multiple in parallel. - for elem in reference_graph.closure { - // Skip if that store path already exists - if path_info_service.get(*elem.path.digest()).await?.is_some() { - continue; - } + let lookups_span = info_span!( + "lookup pathinfos", + "indicatif.pb_show" = tracing::field::Empty + ); + lookups_span.pb_set_length(reference_graph.closure.len() as u64); + lookups_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + lookups_span.pb_start(); + + // From our reference graph, lookup all pathinfos that might exist. + let elems: Vec<_> = futures::stream::iter(reference_graph.closure) + .map(|elem| { + let path_info_service = path_info_service.clone(); + async move { + let resp = path_info_service + .get(*elem.path.digest()) + .await + .map(|resp| (elem, resp)); + + Span::current().pb_inc(1); + resp + } + }) + .buffer_unordered(50) + // Filter out all that are already uploaded. + // TODO: check if there's a better combinator for this + .try_filter_map(|(elem, path_info)| { + std::future::ready(if path_info.is_none() { + Ok(Some(elem)) + } else { + Ok(None) + }) + }) + .try_collect() + .await?; - let path: PathBuf = elem.path.to_absolute_path().into(); - // Ingest the given path - let root_node = - ingest_path(blob_service.clone(), directory_service.clone(), path).await?; + // Run ingest_path on all of them. + let uploads: Vec<_> = futures::stream::iter(elems) + .map(|elem| { + // Map to a future returning the root node, alongside with the closure info. + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + async move { + // Ingest the given path. + + ingest_path::<_, _, _, &[u8]>( + blob_service, + directory_service, + PathBuf::from(elem.path.to_absolute_path()), + None, + ) + .await + .map(|root_node| (elem, root_node)) + } + }) + .buffer_unordered(10) + .try_collect() + .await?; + // Insert them into the PathInfoService. + // FUTUREWORK: do this properly respecting the reference graph. + for (elem, root_node) in uploads { // Create and upload a PathInfo pointing to the root_node, // annotated with information we have from the reference graph. let path_info = PathInfo { - node: Some(tvix_castore::proto::Node { - node: Some(root_node), - }), - references: Vec::from_iter( - elem.references.iter().map(|e| e.digest().to_vec().into()), - ), - narinfo: Some(NarInfo { - nar_size: elem.nar_size, - nar_sha256: elem.nar_sha256.to_vec().into(), - signatures: vec![], - reference_names: Vec::from_iter( - elem.references.iter().map(|e| e.to_string()), - ), - deriver: None, - ca: None, - }), + store_path: elem.path.to_owned(), + node: root_node, + references: elem.references.iter().map(StorePath::to_owned).collect(), + nar_size: elem.nar_size, + nar_sha256: elem.nar_sha256, + signatures: vec![], + deriver: None, + ca: None, }; path_info_service.put(path_info).await?; @@ -461,27 +441,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { #[cfg(feature = "fuse")] Commands::Mount { dest, - blob_service_addr, - directory_service_addr, - path_info_service_addr, + service_addrs, list_root, threads, allow_other, show_xattr, } => { - let (blob_service, directory_service, path_info_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = + tvix_store::utils::construct_services(service_addrs).await?; - let mut fuse_daemon = tokio::task::spawn_blocking(move || { + let fuse_daemon = tokio::task::spawn_blocking(move || { let fs = make_fs( blob_service, directory_service, - Arc::from(path_info_service), + path_info_service, list_root, show_xattr, ); @@ -491,39 +464,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { }) .await??; - // grab a handle to unmount the file system, and register a signal - // handler. - tokio::spawn(async move { - tokio::signal::ctrl_c().await.unwrap(); - info!("interrupt received, unmounting…"); - tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; - info!("unmount occured, terminating…"); - Ok::<_, std::io::Error>(()) - }) - .await??; + // Wait for a ctrl_c and then call fuse_daemon.unmount(). + tokio::spawn({ + let fuse_daemon = fuse_daemon.clone(); + async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("interrupt received, unmounting…"); + tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; + info!("unmount occured, terminating…"); + Ok::<_, std::io::Error>(()) + } + }); + + // Wait for the server to finish, which can either happen through it + // being unmounted externally, or receiving a signal invoking the + // handler above. + tokio::task::spawn_blocking(move || fuse_daemon.wait()).await? } #[cfg(feature = "virtiofs")] Commands::VirtioFs { socket, - blob_service_addr, - directory_service_addr, - path_info_service_addr, + service_addrs, list_root, show_xattr, } => { - let (blob_service, directory_service, path_info_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = + tvix_store::utils::construct_services(service_addrs).await?; tokio::task::spawn_blocking(move || { let fs = make_fs( blob_service, directory_service, - Arc::from(path_info_service), + path_info_service, list_root, show_xattr, ); @@ -536,3 +508,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { }; Ok(()) } + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + let cli = Cli::parse(); + + let tracing_handle = { + let mut builder = tvix_tracing::TracingBuilder::default(); + builder = builder.enable_progressbar(); + #[cfg(feature = "otlp")] + { + if cli.otlp { + builder = builder.enable_otlp("tvix.store"); + } + } + builder.build()? + }; + + tokio::select! { + res = tokio::signal::ctrl_c() => { + res?; + if let Err(e) = tracing_handle.force_shutdown().await { + eprintln!("failed to shutdown tracing: {e}"); + } + Ok(()) + }, + res = run_cli(cli, tracing_handle.clone()) => { + if let Err(e) = tracing_handle.shutdown().await { + eprintln!("failed to shutdown tracing: {e}"); + } + res + } + } +} |