diff options
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 401 |
1 files changed, 401 insertions, 0 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs new file mode 100644 index 000000000000..ecee8d78f3b7 --- /dev/null +++ b/tvix/store/src/bin/tvix-store.rs @@ -0,0 +1,401 @@ +use clap::Parser; +use clap::Subcommand; + +use futures::future::try_join_all; +use std::path::PathBuf; +use std::sync::Arc; +use tokio_listener::Listener; +use tokio_listener::SystemOptions; +use tokio_listener::UserOptions; +use tonic::transport::Server; +use tracing::info; +use tracing::Level; +use tracing_subscriber::fmt::writer::MakeWriterExt; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +use tvix_castore::proto::blob_service_server::BlobServiceServer; +use tvix_castore::proto::directory_service_server::DirectoryServiceServer; +use tvix_castore::proto::GRPCBlobServiceWrapper; +use tvix_castore::proto::GRPCDirectoryServiceWrapper; +use tvix_store::pathinfoservice::PathInfoService; +use tvix_store::proto::path_info_service_server::PathInfoServiceServer; +use tvix_store::proto::GRPCPathInfoServiceWrapper; + +#[cfg(any(feature = "fuse", feature = "virtiofs"))] +use tvix_store::pathinfoservice::make_fs; + +#[cfg(feature = "fuse")] +use tvix_castore::fs::fuse::FuseDaemon; + +#[cfg(feature = "otlp")] +use opentelemetry::KeyValue; +#[cfg(feature = "otlp")] +use opentelemetry_sdk::{ + resource::{ResourceDetector, SdkProvidedResourceDetector}, + trace::BatchConfig, + Resource, +}; + +#[cfg(feature = "virtiofs")] +use tvix_castore::fs::virtiofs::start_virtiofs_daemon; + +#[cfg(feature = "tonic-reflection")] +use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET; +#[cfg(feature = "tonic-reflection")] +use tvix_store::proto::FILE_DESCRIPTOR_SET; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// Whether to log in JSON + #[arg(long)] + json: bool, + + #[arg(long)] + log_level: Option<Level>, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Runs the tvix-store daemon. + Daemon { + #[arg(long, short = 'l')] + listen_address: Option<String>, + + #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] + blob_service_addr: String, + + #[arg( + long, + env, + default_value = "sled:///var/lib/tvix-store/directories.sled" + )] + directory_service_addr: String, + + #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")] + path_info_service_addr: String, + }, + /// Imports a list of paths into the store, print the store path for each of them. + Import { + #[clap(value_name = "PATH")] + paths: Vec<PathBuf>, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + }, + /// Mounts a tvix-store at the given mountpoint + #[cfg(feature = "fuse")] + Mount { + #[clap(value_name = "PATH")] + dest: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// Number of FUSE threads to spawn. + #[arg(long, env, default_value_t = default_threads())] + threads: usize, + + /// Whether to list elements at the root of the mount point. + /// This is useful if your PathInfoService doesn't provide an + /// (exhaustive) listing. + #[clap(long, short, action)] + list_root: bool, + }, + /// Starts a tvix-store virtiofs daemon at the given socket path. + #[cfg(feature = "virtiofs")] + #[command(name = "virtiofs")] + VirtioFs { + #[clap(value_name = "PATH")] + socket: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// Whether to list elements at the root of the mount point. + /// This is useful if your PathInfoService doesn't provide an + /// (exhaustive) listing. + #[clap(long, short, action)] + list_root: bool, + }, +} + +#[cfg(all(feature = "fuse", not(target_os = "macos")))] +fn default_threads() -> usize { + std::thread::available_parallelism() + .map(|threads| threads.into()) + .unwrap_or(4) +} +// On MacFUSE only a single channel will receive ENODEV when the file system is +// unmounted and so all the other channels will block forever. +// See https://github.com/osxfuse/osxfuse/issues/974 +#[cfg(all(feature = "fuse", target_os = "macos"))] +fn default_threads() -> usize { + 1 +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let cli = Cli::parse(); + + // configure log settings + let level = cli.log_level.unwrap_or(Level::INFO); + + let subscriber = tracing_subscriber::registry() + .with( + cli.json.then_some( + tracing_subscriber::fmt::Layer::new() + .with_writer(std::io::stderr.with_max_level(level)) + .json(), + ), + ) + .with( + (!cli.json).then_some( + tracing_subscriber::fmt::Layer::new() + .with_writer(std::io::stderr.with_max_level(level)) + .pretty(), + ), + ); + + // Add the otlp layer (when otlp is enabled), then init the registry. + // Or if the feature is disabled, just init without adding the layer. + // It's necessary to do this separately, as every with() call chains the + // layer into the type of the registry. + #[cfg(feature = "otlp")] + { + subscriber + .with({ + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .with_batch_config(BatchConfig::default()) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ + // use SdkProvidedResourceDetector.detect to detect resources, + // but replace the default service name with our default. + // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 + let resources = + SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); + + // SdkProvidedResourceDetector currently always sets + // `service.name`, but we don't like its default. + if resources.get("service.name".into()).unwrap() == "unknown_service".into() + { + resources.merge(&Resource::new([KeyValue::new( + "service.name", + "tvix.store", + )])) + } else { + resources + } + })) + .install_batch(opentelemetry_sdk::runtime::Tokio)?; + + // Create a tracing layer with the configured tracer + tracing_opentelemetry::layer().with_tracer(tracer) + }) + .try_init()?; + } + + // Init the registry (when otlp is not enabled) + #[cfg(not(feature = "otlp"))] + { + subscriber.try_init()?; + } + + match cli.command { + Commands::Daemon { + listen_address, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + } => { + // initialize stores + let (blob_service, directory_service, path_info_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + let listen_address = listen_address + .unwrap_or_else(|| "[::]:8000".to_string()) + .parse() + .unwrap(); + + let mut server = Server::builder(); + + #[allow(unused_mut)] + let mut router = server + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new( + blob_service, + ))) + .add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::new(directory_service), + )) + .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( + Arc::from(path_info_service), + ))); + + #[cfg(feature = "tonic-reflection")] + { + let reflection_svc = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET) + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build()?; + router = router.add_service(reflection_svc); + } + + info!(listen_address=%listen_address, "starting daemon"); + + let listener = Listener::bind( + &listen_address, + &SystemOptions::default(), + &UserOptions::default(), + ) + .await?; + + router.serve_with_incoming(listener).await?; + } + Commands::Import { + paths, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + } => { + // FUTUREWORK: allow flat for single files? + let (blob_service, directory_service, path_info_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + // Arc the PathInfoService, as we clone it . + let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + + let tasks = paths + .into_iter() + .map(|path| { + tokio::task::spawn({ + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + let path_info_service = path_info_service.clone(); + + async move { + if let Ok(name) = tvix_store::import::path_to_name(&path) { + let resp = tvix_store::import::import_path_as_nar_ca( + &path, + name, + blob_service, + directory_service, + path_info_service, + ) + .await; + if let Ok(output_path) = resp { + // If the import was successful, print the path to stdout. + println!("{}", output_path.to_absolute_path()); + } + } + } + }) + }) + .collect::<Vec<_>>(); + + try_join_all(tasks).await?; + } + #[cfg(feature = "fuse")] + Commands::Mount { + dest, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + list_root, + threads, + } => { + let (blob_service, directory_service, path_info_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + let mut fuse_daemon = tokio::task::spawn_blocking(move || { + let fs = make_fs( + blob_service, + directory_service, + Arc::from(path_info_service), + list_root, + ); + info!(mount_path=?dest, "mounting"); + + FuseDaemon::new(fs, &dest, threads) + }) + .await??; + + // grab a handle to unmount the file system, and register a signal + // handler. + tokio::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("interrupt received, unmounting…"); + tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; + info!("unmount occured, terminating…"); + Ok::<_, std::io::Error>(()) + }) + .await??; + } + #[cfg(feature = "virtiofs")] + Commands::VirtioFs { + socket, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + list_root, + } => { + let (blob_service, directory_service, path_info_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + tokio::task::spawn_blocking(move || { + let fs = make_fs( + blob_service, + directory_service, + Arc::from(path_info_service), + list_root, + ); + info!(socket_path=?socket, "starting virtiofs-daemon"); + + start_virtiofs_daemon(fs, socket) + }) + .await??; + } + }; + Ok(()) +} |