diff options
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 206 |
1 files changed, 206 insertions, 0 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs new file mode 100644 index 000000000000..8c278c433997 --- /dev/null +++ b/tvix/store/src/bin/tvix-store.rs @@ -0,0 +1,206 @@ +use clap::Subcommand; +use data_encoding::BASE64; +use futures::future::try_join_all; +use std::io; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use tracing_subscriber::prelude::*; +use tvix_store::blobservice::GRPCBlobService; +use tvix_store::blobservice::SledBlobService; +use tvix_store::directoryservice::GRPCDirectoryService; +use tvix_store::directoryservice::SledDirectoryService; +use tvix_store::nar::GRPCNARCalculationService; +use tvix_store::nar::NonCachingNARCalculationService; +use tvix_store::pathinfoservice::GRPCPathInfoService; +use tvix_store::pathinfoservice::SledPathInfoService; +use tvix_store::proto::blob_service_client::BlobServiceClient; +use tvix_store::proto::blob_service_server::BlobServiceServer; +use tvix_store::proto::directory_service_client::DirectoryServiceClient; +use tvix_store::proto::directory_service_server::DirectoryServiceServer; +use tvix_store::proto::node::Node; +use tvix_store::proto::path_info_service_client::PathInfoServiceClient; +use tvix_store::proto::path_info_service_server::PathInfoServiceServer; +use tvix_store::proto::GRPCBlobServiceWrapper; +use tvix_store::proto::GRPCDirectoryServiceWrapper; +use tvix_store::proto::GRPCPathInfoServiceWrapper; +use tvix_store::TvixStoreIO; + +#[cfg(feature = "reflection")] +use tvix_store::proto::FILE_DESCRIPTOR_SET; + +use clap::Parser; +use tonic::{transport::Server, Result}; +use tracing::{info, Level}; + +#[derive(Parser)] +#[command(author, version, about, long_about = None)] +struct Cli { + /// Whether to log in JSON + #[arg(long)] + json: bool, + + #[arg(long)] + log_level: Option<Level>, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand)] +enum Commands { + /// Runs the tvix-store daemon. + Daemon { + #[arg(long, short = 'l')] + listen_address: Option<String>, + }, + /// Imports a list of paths into the store (not using the daemon) + Import { + #[clap(value_name = "PATH")] + paths: Vec<PathBuf>, + }, +} + +#[tokio::main] +async fn main() -> Result<(), Box<dyn std::error::Error>> { + let cli = Cli::parse(); + + // configure log settings + let level = cli.log_level.unwrap_or(Level::INFO); + + let subscriber = tracing_subscriber::registry() + .with(if cli.json { + Some( + tracing_subscriber::fmt::Layer::new() + .with_writer(io::stdout.with_max_level(level)) + .json(), + ) + } else { + None + }) + .with(if !cli.json { + Some( + tracing_subscriber::fmt::Layer::new() + .with_writer(io::stdout.with_max_level(level)) + .pretty(), + ) + } else { + None + }); + + tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber"); + + match cli.command { + Commands::Daemon { listen_address } => { + // initialize stores + let blob_service = SledBlobService::new("blobs.sled".into())?; + let directory_service = SledDirectoryService::new("directories.sled".into())?; + let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?; + + let listen_address = listen_address + .unwrap_or_else(|| "[::]:8000".to_string()) + .parse() + .unwrap(); + + let mut server = Server::builder(); + + let nar_calculation_service = NonCachingNARCalculationService::new( + blob_service.clone(), + directory_service.clone(), + ); + + #[allow(unused_mut)] + let mut router = server + .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( + blob_service, + ))) + .add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(directory_service), + )) + .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( + path_info_service, + nar_calculation_service, + ))); + + #[cfg(feature = "reflection")] + { + let reflection_svc = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET) + .build()?; + router = router.add_service(reflection_svc); + } + + info!("tvix-store listening on {}", listen_address); + + router.serve(listen_address).await?; + } + Commands::Import { paths } => { + let blob_service = GRPCBlobService::from_client( + BlobServiceClient::connect("http://[::1]:8000").await?, + ); + let directory_service = GRPCDirectoryService::from_client( + DirectoryServiceClient::connect("http://[::1]:8000").await?, + ); + let path_info_service_client = + PathInfoServiceClient::connect("http://[::1]:8000").await?; + let path_info_service = + GRPCPathInfoService::from_client(path_info_service_client.clone()); + let nar_calculation_service = + GRPCNARCalculationService::from_client(path_info_service_client); + + let io = Arc::new(TvixStoreIO::new( + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + )); + + let tasks = paths + .iter() + .map(|path| { + let io_move = io.clone(); + let path = path.clone(); + let task: tokio::task::JoinHandle<Result<(), io::Error>> = + tokio::task::spawn_blocking(move || { + let path_info = io_move.import_path_with_pathinfo(&path)?; + print_node(&path_info.node.unwrap().node.unwrap(), &path); + Ok(()) + }); + task + }) + .collect::<Vec<tokio::task::JoinHandle<Result<(), io::Error>>>>(); + + try_join_all(tasks).await?; + } + }; + Ok(()) +} + +fn print_node(node: &Node, path: &Path) { + match node { + Node::Directory(directory_node) => { + info!( + path = ?path, + name = directory_node.name, + digest = BASE64.encode(&directory_node.digest), + "import successful", + ) + } + Node::File(file_node) => { + info!( + path = ?path, + name = file_node.name, + digest = BASE64.encode(&file_node.digest), + "import successful" + ) + } + Node::Symlink(symlink_node) => { + info!( + path = ?path, + name = symlink_node.name, + target = symlink_node.target, + "import successful" + ) + } + } +} |