about summary refs log tree commit diff
path: root/tvix/store/src/bin/tvix-store.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r--tvix/store/src/bin/tvix-store.rs619
1 files changed, 312 insertions, 307 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 15f37d301f9d..0ae18ec2161a 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -1,30 +1,32 @@
 use clap::Parser;
 use clap::Subcommand;
 
-use futures::future::try_join_all;
-use nix_compat::path_info::ExportedPathInfo;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use nix_compat::nix_daemon::de::Error;
+use nix_compat::nixhash::CAHash;
+use nix_compat::nixhash::NixHash;
+use nix_compat::{path_info::ExportedPathInfo, store_path::StorePath};
 use serde::Deserialize;
 use serde::Serialize;
 use std::path::PathBuf;
 use std::sync::Arc;
-use tokio_listener::Listener;
-use tokio_listener::SystemOptions;
-use tokio_listener::UserOptions;
 use tonic::transport::Server;
-use tracing::info;
-use tracing::Level;
-use tracing_subscriber::EnvFilter;
-use tracing_subscriber::Layer;
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
-use tvix_castore::import::ingest_path;
-use tvix_store::proto::NarInfo;
-use tvix_store::proto::PathInfo;
+use tower::ServiceBuilder;
+use tower_http::trace::{DefaultMakeSpan, TraceLayer};
+use tracing::{debug, info, info_span, instrument, warn, Instrument, Level, Span};
+use tracing_indicatif::span_ext::IndicatifSpanExt;
+use tvix_castore::import::fs::ingest_path;
+use tvix_store::import::path_to_name;
+use tvix_store::nar::NarCalculationService;
+use tvix_store::utils::{ServiceUrls, ServiceUrlsGrpc};
+use tvix_tracing::TracingHandle;
 
 use tvix_castore::proto::blob_service_server::BlobServiceServer;
 use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
 use tvix_castore::proto::GRPCBlobServiceWrapper;
 use tvix_castore::proto::GRPCDirectoryServiceWrapper;
-use tvix_store::pathinfoservice::PathInfoService;
+use tvix_store::pathinfoservice::{PathInfo, PathInfoService};
 use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
 use tvix_store::proto::GRPCPathInfoServiceWrapper;
 
@@ -34,15 +36,6 @@ use tvix_store::pathinfoservice::make_fs;
 #[cfg(feature = "fuse")]
 use tvix_castore::fs::fuse::FuseDaemon;
 
-#[cfg(feature = "otlp")]
-use opentelemetry::KeyValue;
-#[cfg(feature = "otlp")]
-use opentelemetry_sdk::{
-    resource::{ResourceDetector, SdkProvidedResourceDetector},
-    trace::BatchConfig,
-    Resource,
-};
-
 #[cfg(feature = "virtiofs")]
 use tvix_castore::fs::virtiofs::start_virtiofs_daemon;
 
@@ -51,24 +44,18 @@ use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET;
 #[cfg(feature = "tonic-reflection")]
 use tvix_store::proto::FILE_DESCRIPTOR_SET;
 
+use mimalloc::MiMalloc;
+
+#[global_allocator]
+static GLOBAL: MiMalloc = MiMalloc;
+
 #[derive(Parser)]
 #[command(author, version, about, long_about = None)]
 struct Cli {
-    /// Whether to log in JSON
-    #[arg(long)]
-    json: bool,
-
     /// Whether to configure OTLP. Set --otlp=false to disable.
     #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))]
     otlp: bool,
 
-    /// A global log level to use when printing logs.
-    /// It's also possible to set `RUST_LOG` according to
-    /// `tracing_subscriber::filter::EnvFilter`, which will always have
-    /// priority.
-    #[arg(long)]
-    log_level: Option<Level>,
-
     #[command(subcommand)]
     command: Commands,
 }
@@ -77,47 +64,26 @@ struct Cli {
 enum Commands {
     /// Runs the tvix-store daemon.
     Daemon {
-        #[arg(long, short = 'l')]
-        listen_address: Option<String>,
+        /// The address to listen on.
+        #[clap(flatten)]
+        listen_args: tokio_listener::ListenerAddressLFlag,
 
-        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")]
-        blob_service_addr: String,
-
-        #[arg(
-            long,
-            env,
-            default_value = "sled:///var/lib/tvix-store/directories.sled"
-        )]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrls,
     },
     /// Imports a list of paths into the store, print the store path for each of them.
     Import {
         #[clap(value_name = "PATH")]
         paths: Vec<PathBuf>,
 
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
     },
 
     /// Copies a list of store paths on the system into tvix-store.
     Copy {
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
 
         /// A path pointing to a JSON file produced by the Nix
         /// `__structuredAttrs` containing reference graph information provided
@@ -137,14 +103,8 @@ enum Commands {
         #[clap(value_name = "PATH")]
         dest: PathBuf,
 
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
 
         /// Number of FUSE threads to spawn.
         #[arg(long, env, default_value_t = default_threads())]
@@ -173,14 +133,8 @@ enum Commands {
         #[clap(value_name = "PATH")]
         socket: PathBuf,
 
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        blob_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        directory_service_addr: String,
-
-        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
-        path_info_service_addr: String,
+        #[clap(flatten)]
+        service_addrs: ServiceUrlsGrpc,
 
         /// Whether to list elements at the root of the mount point.
         /// This is useful if your PathInfoService doesn't provide an
@@ -194,129 +148,44 @@ enum Commands {
     },
 }
 
-#[cfg(all(feature = "fuse", not(target_os = "macos")))]
+#[cfg(feature = "fuse")]
 fn default_threads() -> usize {
     std::thread::available_parallelism()
         .map(|threads| threads.into())
         .unwrap_or(4)
 }
-// On MacFUSE only a single channel will receive ENODEV when the file system is
-// unmounted and so all the other channels will block forever.
-// See https://github.com/osxfuse/osxfuse/issues/974
-#[cfg(all(feature = "fuse", target_os = "macos"))]
-fn default_threads() -> usize {
-    1
-}
-
-#[tokio::main]
-async fn main() -> Result<(), Box<dyn std::error::Error>> {
-    let cli = Cli::parse();
-
-    // configure log settings
-    let level = cli.log_level.unwrap_or(Level::INFO);
-
-    // Set up the tracing subscriber.
-    let subscriber = tracing_subscriber::registry()
-        .with(
-            cli.json.then_some(
-                tracing_subscriber::fmt::Layer::new()
-                    .with_writer(std::io::stderr)
-                    .json()
-                    .with_filter(
-                        EnvFilter::builder()
-                            .with_default_directive(level.into())
-                            .from_env()
-                            .expect("invalid RUST_LOG"),
-                    ),
-            ),
-        )
-        .with(
-            (!cli.json).then_some(
-                tracing_subscriber::fmt::Layer::new()
-                    .with_writer(std::io::stderr)
-                    .pretty()
-                    .with_filter(
-                        EnvFilter::builder()
-                            .with_default_directive(level.into())
-                            .from_env()
-                            .expect("invalid RUST_LOG"),
-                    ),
-            ),
-        );
-
-    // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI)
-    // then init the registry.
-    // If the feature is feature-flagged out, just init without adding the layer.
-    // It's necessary to do this separately, as every with() call chains the
-    // layer into the type of the registry.
-    #[cfg(feature = "otlp")]
-    {
-        let subscriber = if cli.otlp {
-            let tracer = opentelemetry_otlp::new_pipeline()
-                .tracing()
-                .with_exporter(opentelemetry_otlp::new_exporter().tonic())
-                .with_batch_config(BatchConfig::default())
-                .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
-                    // use SdkProvidedResourceDetector.detect to detect resources,
-                    // but replace the default service name with our default.
-                    // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
-                    let resources =
-                        SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
-                    // SdkProvidedResourceDetector currently always sets
-                    // `service.name`, but we don't like its default.
-                    if resources.get("service.name".into()).unwrap() == "unknown_service".into() {
-                        resources.merge(&Resource::new([KeyValue::new(
-                            "service.name",
-                            "tvix.store",
-                        )]))
-                    } else {
-                        resources
-                    }
-                }))
-                .install_batch(opentelemetry_sdk::runtime::Tokio)?;
-
-            // Create a tracing layer with the configured tracer
-            let layer = tracing_opentelemetry::layer().with_tracer(tracer);
-
-            subscriber.with(Some(layer))
-        } else {
-            subscriber.with(None)
-        };
-
-        subscriber.try_init()?;
-    }
-
-    // Init the registry (when otlp is not enabled)
-    #[cfg(not(feature = "otlp"))]
-    {
-        subscriber.try_init()?;
-    }
 
+#[instrument(skip_all)]
+async fn run_cli(
+    cli: Cli,
+    tracing_handle: TracingHandle,
+) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     match cli.command {
         Commands::Daemon {
-            listen_address,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            listen_args,
+            service_addrs,
         } => {
             // initialize stores
-            let (blob_service, directory_service, path_info_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
-
-            let listen_address = listen_address
-                .unwrap_or_else(|| "[::]:8000".to_string())
-                .parse()
-                .unwrap();
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
+                tvix_store::utils::construct_services(service_addrs).await?;
+
+            let mut server = Server::builder().layer(
+                ServiceBuilder::new()
+                    .layer(
+                        TraceLayer::new_for_grpc().make_span_with(
+                            DefaultMakeSpan::new()
+                                .level(Level::INFO)
+                                .include_headers(true),
+                        ),
+                    )
+                    .map_request(tvix_tracing::propagate::tonic::accept_trace),
+            );
 
-            let mut server = Server::builder();
+            let (_health_reporter, health_service) = tonic_health::server::health_reporter();
 
             #[allow(unused_mut)]
             let mut router = server
+                .add_service(health_service)
                 .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
                     blob_service,
                 )))
@@ -324,90 +193,161 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                     GRPCDirectoryServiceWrapper::new(directory_service),
                 ))
                 .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
-                    Arc::from(path_info_service),
+                    path_info_service,
+                    nar_calculation_service,
                 )));
 
             #[cfg(feature = "tonic-reflection")]
             {
-                let reflection_svc = tonic_reflection::server::Builder::configure()
-                    .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
-                    .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
-                    .build()?;
-                router = router.add_service(reflection_svc);
+                router = router.add_service(
+                    tonic_reflection::server::Builder::configure()
+                        .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                        .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                        .build_v1alpha()?,
+                );
+                router = router.add_service(
+                    tonic_reflection::server::Builder::configure()
+                        .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                        .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                        .build_v1()?,
+                );
             }
 
-            info!(listen_address=%listen_address, "starting daemon");
+            let listen_address = &listen_args.listen_address.unwrap_or_else(|| {
+                "[::]:8000"
+                    .parse()
+                    .expect("invalid fallback listen address")
+            });
 
-            let listener = Listener::bind(
-                &listen_address,
-                &SystemOptions::default(),
-                &UserOptions::default(),
+            let listener = tokio_listener::Listener::bind(
+                listen_address,
+                &Default::default(),
+                &listen_args.listener_options,
             )
             .await?;
 
+            info!(listen_address=%listen_address, "starting daemon");
+
             router.serve_with_incoming(listener).await?;
         }
         Commands::Import {
             paths,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
         } => {
             // FUTUREWORK: allow flat for single files?
-            let (blob_service, directory_service, path_info_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+            let (blob_service, directory_service, path_info_service, nar_calculation_service) =
+                tvix_store::utils::construct_services(service_addrs).await?;
 
-            // Arc the PathInfoService, as we clone it .
-            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+            // Arc NarCalculationService, as we clone it .
+            let nar_calculation_service: Arc<dyn NarCalculationService> =
+                nar_calculation_service.into();
 
-            let tasks = paths
+            // For each path passed, construct the name, or bail out if it's invalid.
+            let paths_and_names = paths
                 .into_iter()
-                .map(|path| {
-                    tokio::task::spawn({
-                        let blob_service = blob_service.clone();
-                        let directory_service = directory_service.clone();
-                        let path_info_service = path_info_service.clone();
-
-                        async move {
-                            if let Ok(name) = tvix_store::import::path_to_name(&path) {
-                                let resp = tvix_store::import::import_path_as_nar_ca(
-                                    &path,
-                                    name,
-                                    blob_service,
-                                    directory_service,
-                                    path_info_service,
-                                )
-                                .await;
-                                if let Ok(output_path) = resp {
-                                    // If the import was successful, print the path to stdout.
-                                    println!("{}", output_path.to_absolute_path());
-                                }
+                .map(|p| match path_to_name(&p) {
+                    Ok(name) => {
+                        let name = name.to_owned();
+                        Ok((p, name))
+                    }
+                    Err(e) => Err(e),
+                })
+                .collect::<Result<Vec<_>, _>>()?;
+
+            let imports_span =
+                info_span!("import paths", "indicatif.pb_show" = tracing::field::Empty);
+            imports_span.pb_set_message("Importing");
+            imports_span.pb_set_length(paths_and_names.len() as u64);
+            imports_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+            imports_span.pb_start();
+
+            futures::stream::iter(paths_and_names)
+                .map(|(path, name)| {
+                    let blob_service = blob_service.clone();
+                    let directory_service = directory_service.clone();
+                    let path_info_service = path_info_service.clone();
+                    let nar_calculation_service = nar_calculation_service.clone();
+                    let imports_span = imports_span.clone();
+                    let tracing_handle = tracing_handle.clone();
+
+                    async move {
+                        let span = Span::current();
+                        span.pb_set_style(&tvix_tracing::PB_SPINNER_STYLE);
+                        span.pb_set_message(&format!("Ingesting {:?}", path));
+                        span.pb_start();
+
+                        // Ingest the contents at the given path into castore.
+                        let root_node = ingest_path::<_, _, _, &[u8]>(
+                            blob_service,
+                            directory_service,
+                            &path,
+                            None,
+                        )
+                        .await
+                        .map_err(std::io::Error::custom)?;
+
+                        span.pb_set_message(&format!("NAR Calculation for {:?}", path));
+
+                        // Ask for the NAR size and sha256
+                        let (nar_size, nar_sha256) =
+                            nar_calculation_service.calculate_nar(&root_node).await?;
+
+                        // Calculate the output path. This might still fail, as some names are illegal.
+                        // FUTUREWORK: express the `name` at the type level to be valid and check for this earlier.
+                        let ca = CAHash::Nar(NixHash::Sha256(nar_sha256));
+                        let output_path: StorePath<String> =
+                            nix_compat::store_path::build_ca_path::<&str, _, _>(
+                                &name,
+                                &ca,
+                                [],
+                                false,
+                            )
+                            .map_err(|e| {
+                                warn!(err=%e, "unable to build CA path");
+                                std::io::Error::custom(e)
+                            })?;
+
+                        // Construct and insert PathInfo
+                        match path_info_service
+                            .as_ref()
+                            .put(PathInfo {
+                                store_path: output_path.to_owned(),
+                                node: root_node,
+                                // There's no reference scanning on imported paths
+                                references: vec![],
+                                nar_size,
+                                nar_sha256,
+                                signatures: vec![],
+                                deriver: None,
+                                ca: Some(ca),
+                            })
+                            .await
+                        {
+                            // If the import was successful, print the path to stdout.
+                            Ok(path_info) => {
+                                use std::io::Write;
+                                debug!(store_path=%path_info.store_path.to_absolute_path(), "imported path");
+                                writeln!(&mut tracing_handle.get_stdout_writer(), "{}", path_info.store_path.to_absolute_path())?;
+                                imports_span.pb_inc(1);
+                                Ok(())
+                            }
+                            Err(e) => {
+                                warn!(?path, err=%e, "failed to import");
+                                Err(e)
                             }
                         }
-                    })
+                    }.instrument(info_span!("import path", "indicatif.pb_show" = tracing::field::Empty))
                 })
-                .collect::<Vec<_>>();
-
-            try_join_all(tasks).await?;
+                .buffer_unordered(50)
+                .try_collect::<()>()
+                .await?;
         }
         Commands::Copy {
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
             reference_graph_path,
         } => {
-            let (blob_service, directory_service, path_info_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(service_addrs).await?;
 
             // Parse the file at reference_graph_path.
             let reference_graph_json = tokio::fs::read(&reference_graph_path).await?;
@@ -421,38 +361,78 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             let reference_graph: ReferenceGraph<'_> =
                 serde_json::from_slice(reference_graph_json.as_slice())?;
 
-            // We currently simply upload all store paths in linear order.
-            // FUTUREWORK: properly walk the reference graph from the leaves, and upload multiple in parallel.
-            for elem in reference_graph.closure {
-                // Skip if that store path already exists
-                if path_info_service.get(*elem.path.digest()).await?.is_some() {
-                    continue;
-                }
+            let lookups_span = info_span!(
+                "lookup pathinfos",
+                "indicatif.pb_show" = tracing::field::Empty
+            );
+            lookups_span.pb_set_length(reference_graph.closure.len() as u64);
+            lookups_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE);
+            lookups_span.pb_start();
+
+            // From our reference graph, lookup all pathinfos that might exist.
+            let elems: Vec<_> = futures::stream::iter(reference_graph.closure)
+                .map(|elem| {
+                    let path_info_service = path_info_service.clone();
+                    async move {
+                        let resp = path_info_service
+                            .get(*elem.path.digest())
+                            .await
+                            .map(|resp| (elem, resp));
+
+                        Span::current().pb_inc(1);
+                        resp
+                    }
+                })
+                .buffer_unordered(50)
+                // Filter out all that are already uploaded.
+                // TODO: check if there's a better combinator for this
+                .try_filter_map(|(elem, path_info)| {
+                    std::future::ready(if path_info.is_none() {
+                        Ok(Some(elem))
+                    } else {
+                        Ok(None)
+                    })
+                })
+                .try_collect()
+                .await?;
 
-                let path: PathBuf = elem.path.to_absolute_path().into();
-                // Ingest the given path
-                let root_node =
-                    ingest_path(blob_service.clone(), directory_service.clone(), path).await?;
+            // Run ingest_path on all of them.
+            let uploads: Vec<_> = futures::stream::iter(elems)
+                .map(|elem| {
+                    // Map to a future returning the root node, alongside with the closure info.
+                    let blob_service = blob_service.clone();
+                    let directory_service = directory_service.clone();
+                    async move {
+                        // Ingest the given path.
+
+                        ingest_path::<_, _, _, &[u8]>(
+                            blob_service,
+                            directory_service,
+                            PathBuf::from(elem.path.to_absolute_path()),
+                            None,
+                        )
+                        .await
+                        .map(|root_node| (elem, root_node))
+                    }
+                })
+                .buffer_unordered(10)
+                .try_collect()
+                .await?;
 
+            // Insert them into the PathInfoService.
+            // FUTUREWORK: do this properly respecting the reference graph.
+            for (elem, root_node) in uploads {
                 // Create and upload a PathInfo pointing to the root_node,
                 // annotated with information we have from the reference graph.
                 let path_info = PathInfo {
-                    node: Some(tvix_castore::proto::Node {
-                        node: Some(root_node),
-                    }),
-                    references: Vec::from_iter(
-                        elem.references.iter().map(|e| e.digest().to_vec().into()),
-                    ),
-                    narinfo: Some(NarInfo {
-                        nar_size: elem.nar_size,
-                        nar_sha256: elem.nar_sha256.to_vec().into(),
-                        signatures: vec![],
-                        reference_names: Vec::from_iter(
-                            elem.references.iter().map(|e| e.to_string()),
-                        ),
-                        deriver: None,
-                        ca: None,
-                    }),
+                    store_path: elem.path.to_owned(),
+                    node: root_node,
+                    references: elem.references.iter().map(StorePath::to_owned).collect(),
+                    nar_size: elem.nar_size,
+                    nar_sha256: elem.nar_sha256,
+                    signatures: vec![],
+                    deriver: None,
+                    ca: None,
                 };
 
                 path_info_service.put(path_info).await?;
@@ -461,27 +441,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         #[cfg(feature = "fuse")]
         Commands::Mount {
             dest,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
             list_root,
             threads,
             allow_other,
             show_xattr,
         } => {
-            let (blob_service, directory_service, path_info_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(service_addrs).await?;
 
-            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
+            let fuse_daemon = tokio::task::spawn_blocking(move || {
                 let fs = make_fs(
                     blob_service,
                     directory_service,
-                    Arc::from(path_info_service),
+                    path_info_service,
                     list_root,
                     show_xattr,
                 );
@@ -491,39 +464,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             })
             .await??;
 
-            // grab a handle to unmount the file system, and register a signal
-            // handler.
-            tokio::spawn(async move {
-                tokio::signal::ctrl_c().await.unwrap();
-                info!("interrupt received, unmounting…");
-                tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
-                info!("unmount occured, terminating…");
-                Ok::<_, std::io::Error>(())
-            })
-            .await??;
+            // Wait for a ctrl_c and then call fuse_daemon.unmount().
+            tokio::spawn({
+                let fuse_daemon = fuse_daemon.clone();
+                async move {
+                    tokio::signal::ctrl_c().await.unwrap();
+                    info!("interrupt received, unmounting…");
+                    tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
+                    info!("unmount occured, terminating…");
+                    Ok::<_, std::io::Error>(())
+                }
+            });
+
+            // Wait for the server to finish, which can either happen through it
+            // being unmounted externally, or receiving a signal invoking the
+            // handler above.
+            tokio::task::spawn_blocking(move || fuse_daemon.wait()).await?
         }
         #[cfg(feature = "virtiofs")]
         Commands::VirtioFs {
             socket,
-            blob_service_addr,
-            directory_service_addr,
-            path_info_service_addr,
+            service_addrs,
             list_root,
             show_xattr,
         } => {
-            let (blob_service, directory_service, path_info_service) =
-                tvix_store::utils::construct_services(
-                    blob_service_addr,
-                    directory_service_addr,
-                    path_info_service_addr,
-                )
-                .await?;
+            let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
+                tvix_store::utils::construct_services(service_addrs).await?;
 
             tokio::task::spawn_blocking(move || {
                 let fs = make_fs(
                     blob_service,
                     directory_service,
-                    Arc::from(path_info_service),
+                    path_info_service,
                     list_root,
                     show_xattr,
                 );
@@ -536,3 +508,36 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     };
     Ok(())
 }
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+    let cli = Cli::parse();
+
+    let tracing_handle = {
+        let mut builder = tvix_tracing::TracingBuilder::default();
+        builder = builder.enable_progressbar();
+        #[cfg(feature = "otlp")]
+        {
+            if cli.otlp {
+                builder = builder.enable_otlp("tvix.store");
+            }
+        }
+        builder.build()?
+    };
+
+    tokio::select! {
+        res = tokio::signal::ctrl_c() => {
+            res?;
+            if let Err(e) = tracing_handle.force_shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            Ok(())
+        },
+        res = run_cli(cli, tracing_handle.clone()) => {
+            if let Err(e) = tracing_handle.shutdown().await {
+                eprintln!("failed to shutdown tracing: {e}");
+            }
+            res
+        }
+    }
+}