about summary refs log tree commit diff
path: root/tvix/store/src/bin/tvix-store.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r--tvix/store/src/bin/tvix-store.rs401
1 files changed, 401 insertions, 0 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
new file mode 100644
index 000000000000..ecee8d78f3b7
--- /dev/null
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -0,0 +1,401 @@
+use clap::Parser;
+use clap::Subcommand;
+
+use futures::future::try_join_all;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio_listener::Listener;
+use tokio_listener::SystemOptions;
+use tokio_listener::UserOptions;
+use tonic::transport::Server;
+use tracing::info;
+use tracing::Level;
+use tracing_subscriber::fmt::writer::MakeWriterExt;
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+
+use tvix_castore::proto::blob_service_server::BlobServiceServer;
+use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
+use tvix_castore::proto::GRPCBlobServiceWrapper;
+use tvix_castore::proto::GRPCDirectoryServiceWrapper;
+use tvix_store::pathinfoservice::PathInfoService;
+use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
+use tvix_store::proto::GRPCPathInfoServiceWrapper;
+
+#[cfg(any(feature = "fuse", feature = "virtiofs"))]
+use tvix_store::pathinfoservice::make_fs;
+
+#[cfg(feature = "fuse")]
+use tvix_castore::fs::fuse::FuseDaemon;
+
+#[cfg(feature = "otlp")]
+use opentelemetry::KeyValue;
+#[cfg(feature = "otlp")]
+use opentelemetry_sdk::{
+    resource::{ResourceDetector, SdkProvidedResourceDetector},
+    trace::BatchConfig,
+    Resource,
+};
+
+#[cfg(feature = "virtiofs")]
+use tvix_castore::fs::virtiofs::start_virtiofs_daemon;
+
+#[cfg(feature = "tonic-reflection")]
+use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET;
+#[cfg(feature = "tonic-reflection")]
+use tvix_store::proto::FILE_DESCRIPTOR_SET;
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+    /// Whether to log in JSON
+    #[arg(long)]
+    json: bool,
+
+    #[arg(long)]
+    log_level: Option<Level>,
+
+    #[command(subcommand)]
+    command: Commands,
+}
+
+#[derive(Subcommand)]
+enum Commands {
+    /// Runs the tvix-store daemon.
+    Daemon {
+        #[arg(long, short = 'l')]
+        listen_address: Option<String>,
+
+        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")]
+        blob_service_addr: String,
+
+        #[arg(
+            long,
+            env,
+            default_value = "sled:///var/lib/tvix-store/directories.sled"
+        )]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
+        path_info_service_addr: String,
+    },
+    /// Imports a list of paths into the store, print the store path for each of them.
+    Import {
+        #[clap(value_name = "PATH")]
+        paths: Vec<PathBuf>,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+    },
+    /// Mounts a tvix-store at the given mountpoint
+    #[cfg(feature = "fuse")]
+    Mount {
+        #[clap(value_name = "PATH")]
+        dest: PathBuf,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+
+        /// Number of FUSE threads to spawn.
+        #[arg(long, env, default_value_t = default_threads())]
+        threads: usize,
+
+        /// Whether to list elements at the root of the mount point.
+        /// This is useful if your PathInfoService doesn't provide an
+        /// (exhaustive) listing.
+        #[clap(long, short, action)]
+        list_root: bool,
+    },
+    /// Starts a tvix-store virtiofs daemon at the given socket path.
+    #[cfg(feature = "virtiofs")]
+    #[command(name = "virtiofs")]
+    VirtioFs {
+        #[clap(value_name = "PATH")]
+        socket: PathBuf,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+
+        /// Whether to list elements at the root of the mount point.
+        /// This is useful if your PathInfoService doesn't provide an
+        /// (exhaustive) listing.
+        #[clap(long, short, action)]
+        list_root: bool,
+    },
+}
+
+#[cfg(all(feature = "fuse", not(target_os = "macos")))]
+fn default_threads() -> usize {
+    std::thread::available_parallelism()
+        .map(|threads| threads.into())
+        .unwrap_or(4)
+}
+// On MacFUSE only a single channel will receive ENODEV when the file system is
+// unmounted and so all the other channels will block forever.
+// See https://github.com/osxfuse/osxfuse/issues/974
+#[cfg(all(feature = "fuse", target_os = "macos"))]
+fn default_threads() -> usize {
+    1
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let cli = Cli::parse();
+
+    // configure log settings
+    let level = cli.log_level.unwrap_or(Level::INFO);
+
+    let subscriber = tracing_subscriber::registry()
+        .with(
+            cli.json.then_some(
+                tracing_subscriber::fmt::Layer::new()
+                    .with_writer(std::io::stderr.with_max_level(level))
+                    .json(),
+            ),
+        )
+        .with(
+            (!cli.json).then_some(
+                tracing_subscriber::fmt::Layer::new()
+                    .with_writer(std::io::stderr.with_max_level(level))
+                    .pretty(),
+            ),
+        );
+
+    // Add the otlp layer (when otlp is enabled), then init the registry.
+    // Or if the feature is disabled, just init without adding the layer.
+    // It's necessary to do this separately, as every with() call chains the
+    // layer into the type of the registry.
+    #[cfg(feature = "otlp")]
+    {
+        subscriber
+            .with({
+                let tracer = opentelemetry_otlp::new_pipeline()
+                    .tracing()
+                    .with_exporter(opentelemetry_otlp::new_exporter().tonic())
+                    .with_batch_config(BatchConfig::default())
+                    .with_trace_config(opentelemetry_sdk::trace::config().with_resource({
+                        // use SdkProvidedResourceDetector.detect to detect resources,
+                        // but replace the default service name with our default.
+                        // https://github.com/open-telemetry/opentelemetry-rust/issues/1298
+                        let resources =
+                            SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0));
+
+                        // SdkProvidedResourceDetector currently always sets
+                        // `service.name`, but we don't like its default.
+                        if resources.get("service.name".into()).unwrap() == "unknown_service".into()
+                        {
+                            resources.merge(&Resource::new([KeyValue::new(
+                                "service.name",
+                                "tvix.store",
+                            )]))
+                        } else {
+                            resources
+                        }
+                    }))
+                    .install_batch(opentelemetry_sdk::runtime::Tokio)?;
+
+                // Create a tracing layer with the configured tracer
+                tracing_opentelemetry::layer().with_tracer(tracer)
+            })
+            .try_init()?;
+    }
+
+    // Init the registry (when otlp is not enabled)
+    #[cfg(not(feature = "otlp"))]
+    {
+        subscriber.try_init()?;
+    }
+
+    match cli.command {
+        Commands::Daemon {
+            listen_address,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+        } => {
+            // initialize stores
+            let (blob_service, directory_service, path_info_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            let listen_address = listen_address
+                .unwrap_or_else(|| "[::]:8000".to_string())
+                .parse()
+                .unwrap();
+
+            let mut server = Server::builder();
+
+            #[allow(unused_mut)]
+            let mut router = server
+                .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
+                    blob_service,
+                )))
+                .add_service(DirectoryServiceServer::new(
+                    GRPCDirectoryServiceWrapper::new(directory_service),
+                ))
+                .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
+                    Arc::from(path_info_service),
+                )));
+
+            #[cfg(feature = "tonic-reflection")]
+            {
+                let reflection_svc = tonic_reflection::server::Builder::configure()
+                    .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                    .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                    .build()?;
+                router = router.add_service(reflection_svc);
+            }
+
+            info!(listen_address=%listen_address, "starting daemon");
+
+            let listener = Listener::bind(
+                &listen_address,
+                &SystemOptions::default(),
+                &UserOptions::default(),
+            )
+            .await?;
+
+            router.serve_with_incoming(listener).await?;
+        }
+        Commands::Import {
+            paths,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+        } => {
+            // FUTUREWORK: allow flat for single files?
+            let (blob_service, directory_service, path_info_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            // Arc the PathInfoService, as we clone it .
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+
+            let tasks = paths
+                .into_iter()
+                .map(|path| {
+                    tokio::task::spawn({
+                        let blob_service = blob_service.clone();
+                        let directory_service = directory_service.clone();
+                        let path_info_service = path_info_service.clone();
+
+                        async move {
+                            if let Ok(name) = tvix_store::import::path_to_name(&path) {
+                                let resp = tvix_store::import::import_path_as_nar_ca(
+                                    &path,
+                                    name,
+                                    blob_service,
+                                    directory_service,
+                                    path_info_service,
+                                )
+                                .await;
+                                if let Ok(output_path) = resp {
+                                    // If the import was successful, print the path to stdout.
+                                    println!("{}", output_path.to_absolute_path());
+                                }
+                            }
+                        }
+                    })
+                })
+                .collect::<Vec<_>>();
+
+            try_join_all(tasks).await?;
+        }
+        #[cfg(feature = "fuse")]
+        Commands::Mount {
+            dest,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            list_root,
+            threads,
+        } => {
+            let (blob_service, directory_service, path_info_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
+                let fs = make_fs(
+                    blob_service,
+                    directory_service,
+                    Arc::from(path_info_service),
+                    list_root,
+                );
+                info!(mount_path=?dest, "mounting");
+
+                FuseDaemon::new(fs, &dest, threads)
+            })
+            .await??;
+
+            // grab a handle to unmount the file system, and register a signal
+            // handler.
+            tokio::spawn(async move {
+                tokio::signal::ctrl_c().await.unwrap();
+                info!("interrupt received, unmounting…");
+                tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
+                info!("unmount occured, terminating…");
+                Ok::<_, std::io::Error>(())
+            })
+            .await??;
+        }
+        #[cfg(feature = "virtiofs")]
+        Commands::VirtioFs {
+            socket,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            list_root,
+        } => {
+            let (blob_service, directory_service, path_info_service) =
+                tvix_store::utils::construct_services(
+                    blob_service_addr,
+                    directory_service_addr,
+                    path_info_service_addr,
+                )
+                .await?;
+
+            tokio::task::spawn_blocking(move || {
+                let fs = make_fs(
+                    blob_service,
+                    directory_service,
+                    Arc::from(path_info_service),
+                    list_root,
+                );
+                info!(socket_path=?socket, "starting virtiofs-daemon");
+
+                start_virtiofs_daemon(fs, socket)
+            })
+            .await??;
+        }
+    };
+    Ok(())
+}