about summary refs log tree commit diff
path: root/tvix/store/src/bin/tvix-store.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r-- tvix/store/src/bin/tvix-store.rs 378
1 files changed, 147 insertions, 231 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 906d0ab5206a..6da239a8fee3 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -9,19 +9,16 @@ use serde::Deserialize;
 use serde::Serialize;
 use std::path::PathBuf;
 use std::sync::Arc;
-use tokio_listener::Listener;
-use tokio_listener::SystemOptions;
-use tokio_listener::UserOptions;
 use tonic::transport::Server;
-use tracing::info;
-use tracing::Level;
-use tracing_subscriber::EnvFilter;
-use tracing_subscriber::Layer;
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+use tower::ServiceBuilder;
+use tower_http::trace::{DefaultMakeSpan, TraceLayer};
+use tracing::{info, info_span, instrument, Level, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt as _;
 use tvix_castore::import::fs::ingest_path;
 use tvix_store::nar::NarCalculationService;
 use tvix_store::proto::NarInfo;
 use tvix_store::proto::PathInfo;
+use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc};
 
 use tvix_castore::proto::blob_service_server::BlobServiceServer;
 use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
@@ -37,15 +34,6 @@ use tvix_store::pathinfoservice::make_fs;
 #[cfg(feature = "fuse")]
 use tvix_castore::fs::fuse::FuseDaemon;
 
-#[cfg(feature = "otlp")]
-use opentelemetry::KeyValue;
-#[cfg(feature = "otlp")]
-use opentelemetry_sdk::{
-    resource::{ResourceDetector, SdkProvidedResourceDetector},
-    trace::BatchConfig,
-    Resource,
-};
-
 #[cfg(feature = "virtiofs")]
 use tvix_castore::fs::virtiofs::start_virtiofs_daemon;
 
@@ -54,6 +42,11 @@ use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET;
 #[cfg(feature = "tonic-reflection")]
 use tvix_store::proto::FILE_DESCRIPTOR_SET;
 
+use mimalloc::MiMalloc;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
@@ -65,8 +58,8 @@ struct Cli {
     /// It's also possible to set `RUST_LOG` according to
     /// `tracing_subscriber::filter::EnvFilter`, which will always have
     /// priority.
-    #[arg(long)]
-    log_level: Option<Level>,
+    #[arg(long, default_value_t=Level::INFO)]
+    log_level: Level,
 
     #[command(subcommand)]
     command: Commands,
@@ -76,51 +69,26 @@ struct Cli {
 enum Commands {
     /// Runs the tvix-store daemon.
     Daemon {
-        #[arg(long, short = 'l')]
-        listen_address: Option<String>,
-
-        #[arg(
-            long,
-            env,
-            default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store"
-        )]
-        blob_service_addr: String,
-
-        #[arg(
-            long,
-            env,
-            default_value = "sled:///var/lib/tvix-store/directories.sled"
-        )]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
-        path_info_service_addr: String,
+        /// The address to listen on.
+        #[clap(flatten)]
+        listen_args: tokio_listener::ListenerAddressLFlag,
+
+        #[clap(flatten)]
+        service_addrs: ServiceUrls,
     },
     /// Imports a list of paths into the store, print the store path for each of them.
     Import {
         #[clap(value_name = "PATH")]
         paths: Vec<PathBuf>,
 
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
     },
 
     /// Copies a list of store paths on the system into tvix-store.
     Copy {
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
 
         /// A path pointing to a JSON file produced by the Nix
         /// `__structuredAttrs` containing reference graph information provided
@@ -140,14 +108,8 @@ enum Commands {
         #[clap(value_name = "PATH")]
         dest: PathBuf,
 
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
 
         /// Number of FUSE threads to spawn.
         #[arg(long, env, default_value_t = default_threads())]
@@ -176,14 +138,8 @@ enum Commands {
         #[clap(value_name = "PATH")]
         socket: PathBuf,
 
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
 
         /// Whether to list elements at the root of the mount point.
         /// This is useful if your PathInfoService doesn't provide an
@@ -197,113 +153,41 @@ enum Commands {
     },
 }
 
-#[cfg(all(feature = "fuse", not(target_os = "macos")))]
+#[cfg(feature = "fuse")]
 fn default_threads() -> usize {
     std::thread::available_parallelism()
         .map(|threads| threads.into())
         .unwrap_or(4)
 }
-// On MacFUSE only a single channel will receive ENODEV when the file system is
-// unmounted and so all the other channels will block forever.
-// See https://github.com/osxfuse/osxfuse/issues/974
-#[cfg(all(feature = "fuse", target_os = "macos"))]
-fn default_threads() -> usize {
-    1
-}
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let cli = Cli::parse();
-
-    // configure log settings
-    let level = cli.log_level.unwrap_or(Level::INFO);
-
-    // Set up the tracing subscriber.
-    let subscriber = tracing_subscriber::registry().with(
-        tracing_subscriber::fmt::Layer::new()
-            .with_writer(std::io::stderr)
-            .compact()
-            .with_filter(
-                EnvFilter::builder()
-                    .with_default_directive(level.into())
-                    .from_env()
-                    .expect("invalid RUST_LOG"),
-            ),
-    );
-
-    // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
-    // then init the registry.
-    // If the feature is feature-flagged out, just init without adding the layer.
-    // It's necessary to do this separately, as every with() call chains the
-    // layer into the type of the registry.
-    #[cfg(feature = "otlp")]
-    {
-        let subscriber = if cli.otlp {
-            let tracer = opentelemetry_otlp::new_pipeline()
-                .tracing()
-                .with_exporter(opentelemetry_otlp::new_exporter().tonic())
-                .with_batch_config(BatchConfig::default())
-                .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
-                    // use SdkProvidedResourceDetector.detect to detect resources,
-                    // but replace the default service name with our default.
-                    // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
-                    let resources =
-                        SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
-                    // SdkProvidedResourceDetector currently always sets
-                    // `service.name`, but we don't like its default.
-                    if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
-                        resources.merge(&Resource::new([KeyValue::new(
-                            "service.name",
-                            "tvix.store",
-                        )]))
-                    } else {
-                        resources
-                    }
-                }))
-                .install_batch(opentelemetry_sdk::runtime::Tokio)?;
-
-            // Create a tracing layer with the configured tracer
-            let layer = tracing_opentelemetry::layer().with_tracer(tracer);
-
-            subscriber.with(Some(layer))
-        } else {
-            subscriber.with(None)
-        };
-
-        subscriber.try_init()?;
-    }
-
-    // Init the registry (when otlp is not enabled)
-    #[cfg(not(feature = "otlp"))]
-    {
-        subscriber.try_init()?;
-    }
 
+#[instrument(skip_all)]
+async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     match cli.command {
         Commands::Daemon {
-            listen_address,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            listen_args,
+            service_addrs,
         } => {
             // initialize stores
             let (blob_service, directory_service, path_info_service, nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
-
-            let listen_address = listen_address
-                .unwrap_or_else(|| "[::]:8000".to_string())
-                .parse()
-                .unwrap();
+                tvix_store::utils::construct_services(service_addrs).await?;
+
+            let mut server = Server::builder().layer(
+                ServiceBuilder::new()
+                    .layer(
+                        TraceLayer::new_for_grpc().make_span_with(
+                            DefaultMakeSpan::new()
+                                .level(Level::INFO)
+                                .include_headers(true),
+                        ),
+                    )
+                    .map_request(tvix_tracing::propagate::tonic::accept_trace),
+            );
 
-            let mut server = Server::builder();
+            let (_health_reporter, health_service) = tonic_health::server::health_reporter();
 
             #[allow(unused_mut)]
             let mut router = server
+                .add_service(health_service)
                 .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
                     blob_service,
                 )))
@@ -311,47 +195,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                     GRPCDirectoryServiceWrapper::new(directory_service),
                 ))
                 .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
-                    Arc::from(path_info_service),
+                    path_info_service,
                     nar_calculation_service,
                 )));
 
             #[cfg(feature = "tonic-reflection")]
             {
-                let reflection_svc = tonic_reflection::server::Builder::configure()
-                    .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
-                    .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
-                    .build()?;
-                router = router.add_service(reflection_svc);
+                router = router.add_service(
+                    tonic_reflection::server::Builder::configure()
+                        .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                        .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                        .build_v1alpha()?,
+                );
+                router = router.add_service(
+                    tonic_reflection::server::Builder::configure()
+                        .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                        .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                        .build_v1()?,
+                );
             }
 
-            info!(listen_address=%listen_address, "starting daemon");
+            let listen_address = &listen_args.listen_address.unwrap_or_else(|| {
+                "[::]:8000"
+                    .parse()
+                    .expect("invalid fallback listen address")
+            });
 
-            let listener = Listener::bind(
-                &listen_address,
-                &SystemOptions::default(),
-                &UserOptions::default(),
+            let listener = tokio_listener::Listener::bind(
+                listen_address,
+                &Default::default(),
+                &listen_args.listener_options,
             )
             .await?;
 
+            info!(listen_address=%listen_address, "starting daemon");
+
             router.serve_with_incoming(listener).await?;
         }
         Commands::Import {
             paths,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
         } => {
             // FUTUREWORK: allow flat for single files?
             let (blob_service, directory_service, path_info_service, nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+                tvix_store::utils::construct_services(service_addrs).await?;
 
-            // Arc PathInfoService and NarCalculationService, as we clone it .
-            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            // Arc NarCalculationService, as we clone it .
             let nar_calculation_service: Arc<dyn NarCalculationService> =
                 nar_calculation_service.into();
 
@@ -388,18 +277,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             try_join_all(tasks).await?;
         }
         Commands::Copy {
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
             reference_graph_path,
         } => {
             let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+                tvix_store::utils::construct_services(service_addrs).await?;
 
             // Parse the file at reference_graph_path.
             let reference_graph_json = tokio::fs::read(&reference_graph_path).await?;
@@ -413,18 +295,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             let reference_graph: ReferenceGraph<'_> =
                 serde_json::from_slice(reference_graph_json.as_slice())?;
 
-            // Arc the PathInfoService, as we clone it .
-            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            let lookups_span = info_span!(
+                "lookup pathinfos",
+                "indicatif.pb_show" = tracing::field::Empty
+            );
+            lookups_span.pb_set_length(reference_graph.closure.len() as u64);
+            lookups_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+            lookups_span.pb_start();
 
             // From our reference graph, lookup all pathinfos that might exist.
             let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
                 .map(|elem| {
                     let path_info_service = path_info_service.clone();
                     async move {
-                        path_info_service
+                        let resp = path_info_service
                             .get(*elem.path.digest())
                             .await
-                            .map(|resp| (elem, resp))
+                            .map(|resp| (elem, resp));
+
+                        Span::current().pb_inc(1);
+                        resp
                     }
                 })
                 .buffer_unordered(50)
@@ -468,9 +358,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 // Create and upload a PathInfo pointing to the root_node,
                 // annotated with information we have from the reference graph.
                 let path_info = PathInfo {
-                    node: Some(tvix_castore::proto::Node {
-                        node: Some(root_node),
-                    }),
+                    node: Some(tvix_castore::proto::Node::from_name_and_node(
+                        elem.path.to_string().into(),
+                        root_node,
+                    )),
                     references: Vec::from_iter(
                         elem.references.iter().map(|e| e.digest().to_vec().into()),
                     ),
@@ -492,27 +383,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         #[cfg(feature = "fuse")]
         Commands::Mount {
             dest,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
             list_root,
             threads,
             allow_other,
             show_xattr,
         } => {
             let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+                tvix_store::utils::construct_services(service_addrs).await?;
 
-            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
+            let fuse_daemon = tokio::task::spawn_blocking(move || {
                 let fs = make_fs(
                     blob_service,
                     directory_service,
-                    Arc::from(path_info_service),
+                    path_info_service,
                     list_root,
                     show_xattr,
                 );
@@ -522,39 +406,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             })
             .await??;
 
-            // grab a handle to unmount the file system, and register a signal
-            // handler.
-            tokio::spawn(async move {
-                tokio::signal::ctrl_c().await.unwrap();
-                info!("interrupt received, unmounting…");
-                tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
-                info!("unmount occured, terminating…");
-                Ok::<_, std::io::Error>(())
-            })
-            .await??;
+            // Wait for a ctrl_c and then call fuse_daemon.unmount().
+            tokio::spawn({
+                let fuse_daemon = fuse_daemon.clone();
+                async move {
+                    tokio::signal::ctrl_c().await.unwrap();
+                    info!("interrupt received, unmounting…");
+                    tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
+                    info!("unmount occured, terminating…");
+                    Ok::<_, std::io::Error>(())
+                }
+            });
+
+            // Wait for the server to finish, which can either happen through it
+            // being unmounted externally, or receiving a signal invoking the
+            // handler above.
+            tokio::task::spawn_blocking(move || fuse_daemon.wait()).await?
         }
         #[cfg(feature = "virtiofs")]
         Commands::VirtioFs {
             socket,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
             list_root,
             show_xattr,
         } => {
             let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+                tvix_store::utils::construct_services(service_addrs).await?;
 
             tokio::task::spawn_blocking(move || {
                 let fs = make_fs(
                     blob_service,
                     directory_service,
-                    Arc::from(path_info_service),
+                    path_info_service,
                     list_root,
                     show_xattr,
                 );
@@ -567,3 +450,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     };
     Ok(())
 }
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    let cli = Cli::parse();
+
+    let tracing_handle = {
+        let mut builder = tvix_tracing::TracingBuilder::default();
+        builder = builder.level(cli.log_level).enable_progressbar();
+        #[cfg(feature = "otlp")]
+        {
+            if cli.otlp {
+                builder = builder.enable_otlp("tvix.store");
+            }
+        }
+        builder.build()?
+    };
+
+    tokio::select! {
+        res = tokio::signal::ctrl_c() => {
+            res?;
+            if let Err(e) = tracing_handle.force_shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            Ok(())
+        },
+        res = run_cli(cli) => {
+            if let Err(e) = tracing_handle.shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            res
+        }
+    }
+}