about summary refs log tree commit diff
path: root/tvix/store/src/bin/tvix-store.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r--tvix/store/src/bin/tvix-store.rs425
1 files changed, 425 insertions, 0 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
new file mode 100644
index 000000000000..11f19857dd6f
--- /dev/null
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -0,0 +1,425 @@
+use clap::Subcommand;
+use data_encoding::BASE64;
+use futures::future::try_join_all;
+use nix_compat::store_path;
+use nix_compat::store_path::StorePath;
+use std::io;
+use std::path::Path;
+use std::path::PathBuf;
+use tokio::task::JoinHandle;
+use tracing_subscriber::prelude::*;
+use tvix_castore::blobservice;
+use tvix_castore::directoryservice;
+use tvix_castore::import;
+use tvix_castore::proto::blob_service_server::BlobServiceServer;
+use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
+use tvix_castore::proto::node::Node;
+use tvix_castore::proto::GRPCBlobServiceWrapper;
+use tvix_castore::proto::GRPCDirectoryServiceWrapper;
+use tvix_castore::proto::NamedNode;
+use tvix_store::listener::ListenerStream;
+use tvix_store::pathinfoservice;
+use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
+use tvix_store::proto::GRPCPathInfoServiceWrapper;
+use tvix_store::proto::NarInfo;
+use tvix_store::proto::PathInfo;
+
+#[cfg(feature = "fs")]
+use tvix_store::fs::TvixStoreFs;
+
+#[cfg(feature = "fuse")]
+use tvix_store::fs::fuse::FuseDaemon;
+
+#[cfg(feature = "virtiofs")]
+use tvix_store::fs::virtiofs::start_virtiofs_daemon;
+
+#[cfg(feature = "tonic-reflection")]
+use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET;
+#[cfg(feature = "tonic-reflection")]
+use tvix_store::proto::FILE_DESCRIPTOR_SET;
+
+use clap::Parser;
+use tonic::{transport::Server, Result};
+use tracing::{info, Level};
+
+#[derive(Parser)]
+#[command(author, version, about, long_about = None)]
+struct Cli {
+    /// Whether to log in JSON
+    #[arg(long)]
+    json: bool,
+
+    #[arg(long)]
+    log_level: Option<Level>,
+
+    #[command(subcommand)]
+    command: Commands,
+}
+
+#[derive(Subcommand)]
+enum Commands {
+    /// Runs the tvix-store daemon.
+    Daemon {
+        #[arg(long, short = 'l')]
+        listen_address: Option<String>,
+
+        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")]
+        blob_service_addr: String,
+
+        #[arg(
+            long,
+            env,
+            default_value = "sled:///var/lib/tvix-store/directories.sled"
+        )]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "sled:///var/lib/tvix-store/pathinfo.sled")]
+        path_info_service_addr: String,
+    },
+    /// Imports a list of paths into the store, print the store path for each of them.
+    Import {
+        #[clap(value_name = "PATH")]
+        paths: Vec<PathBuf>,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+    },
+    /// Mounts a tvix-store at the given mountpoint
+    #[cfg(feature = "fuse")]
+    Mount {
+        #[clap(value_name = "PATH")]
+        dest: PathBuf,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+
+        /// Number of FUSE threads to spawn.
+        #[arg(long, env, default_value_t = default_threads())]
+        threads: usize,
+
+        /// Whether to list elements at the root of the mount point.
+        /// This is useful if your PathInfoService doesn't provide an
+        /// (exhaustive) listing.
+        #[clap(long, short, action)]
+        list_root: bool,
+    },
+    /// Starts a tvix-store virtiofs daemon at the given socket path.
+    #[cfg(feature = "virtiofs")]
+    #[command(name = "virtiofs")]
+    VirtioFs {
+        #[clap(value_name = "PATH")]
+        socket: PathBuf,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        blob_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        directory_service_addr: String,
+
+        #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
+        path_info_service_addr: String,
+
+        /// Whether to list elements at the root of the mount point.
+        /// This is useful if your PathInfoService doesn't provide an
+        /// (exhaustive) listing.
+        #[clap(long, short, action)]
+        list_root: bool,
+    },
+}
+
+#[cfg(all(feature = "fuse", not(target_os = "macos")))]
+fn default_threads() -> usize {
+    std::thread::available_parallelism()
+        .map(|threads| threads.into())
+        .unwrap_or(4)
+}
+// On MacFUSE only a single channel will receive ENODEV when the file system is
+// unmounted and so all the other channels will block forever.
+// See https://github.com/osxfuse/osxfuse/issues/974
+#[cfg(all(feature = "fuse", target_os = "macos"))]
+fn default_threads() -> usize {
+    1
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let cli = Cli::parse();
+
+    // configure log settings
+    let level = cli.log_level.unwrap_or(Level::INFO);
+
+    let subscriber = tracing_subscriber::registry()
+        .with(if cli.json {
+            Some(
+                tracing_subscriber::fmt::Layer::new()
+                    .with_writer(io::stderr.with_max_level(level))
+                    .json(),
+            )
+        } else {
+            None
+        })
+        .with(if !cli.json {
+            Some(
+                tracing_subscriber::fmt::Layer::new()
+                    .with_writer(io::stderr.with_max_level(level))
+                    .pretty(),
+            )
+        } else {
+            None
+        });
+
+    tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber");
+
+    match cli.command {
+        Commands::Daemon {
+            listen_address,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+        } => {
+            // initialize stores
+            let blob_service = blobservice::from_addr(&blob_service_addr)?;
+            let directory_service = directoryservice::from_addr(&directory_service_addr)?;
+            let path_info_service = pathinfoservice::from_addr(
+                &path_info_service_addr,
+                blob_service.clone(),
+                directory_service.clone(),
+            )?;
+
+            let listen_address = listen_address
+                .unwrap_or_else(|| "[::]:8000".to_string())
+                .parse()
+                .unwrap();
+
+            let mut server = Server::builder();
+
+            #[allow(unused_mut)]
+            let mut router = server
+                .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
+                    blob_service,
+                )))
+                .add_service(DirectoryServiceServer::new(
+                    GRPCDirectoryServiceWrapper::from(directory_service),
+                ))
+                .add_service(PathInfoServiceServer::new(
+                    GRPCPathInfoServiceWrapper::from(path_info_service),
+                ));
+
+            #[cfg(feature = "tonic-reflection")]
+            {
+                let reflection_svc = tonic_reflection::server::Builder::configure()
+                    .register_encoded_file_descriptor_set(CASTORE_FILE_DESCRIPTOR_SET)
+                    .register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET)
+                    .build()?;
+                router = router.add_service(reflection_svc);
+            }
+
+            info!("tvix-store listening on {}", listen_address);
+
+            let listener = ListenerStream::bind(&listen_address).await?;
+
+            router.serve_with_incoming(listener).await?;
+        }
+        Commands::Import {
+            paths,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+        } => {
+            // FUTUREWORK: allow flat for single files?
+            let blob_service = blobservice::from_addr(&blob_service_addr)?;
+            let directory_service = directoryservice::from_addr(&directory_service_addr)?;
+            let path_info_service = pathinfoservice::from_addr(
+                &path_info_service_addr,
+                blob_service.clone(),
+                directory_service.clone(),
+            )?;
+
+            let tasks = paths
+                .into_iter()
+                .map(|path| {
+                    let blob_service = blob_service.clone();
+                    let directory_service = directory_service.clone();
+                    let path_info_service = path_info_service.clone();
+
+                    let task: JoinHandle<io::Result<()>> = tokio::task::spawn(async move {
+                        // Ingest the path into blob and directory service.
+                        let root_node = import::ingest_path(
+                            blob_service.clone(),
+                            directory_service.clone(),
+                            &path,
+                        )
+                        .await
+                        .expect("failed to ingest path");
+
+                        // Ask the PathInfoService for the NAR size and sha256
+                        let root_node_copy = root_node.clone();
+                        let path_info_service_clone = path_info_service.clone();
+                        let (nar_size, nar_sha256) = path_info_service_clone
+                            .calculate_nar(&root_node_copy)
+                            .await?;
+
+                        let name = path
+                            .file_name()
+                            .expect("path must not be ..")
+                            .to_str()
+                            .expect("path must be valid unicode");
+
+                        let output_path = store_path::build_nar_based_store_path(&nar_sha256, name);
+
+                        // assemble a new root_node with a name that is derived from the nar hash.
+                        let root_node =
+                            root_node.rename(output_path.to_string().into_bytes().into());
+
+                        // assemble the [crate::proto::PathInfo] object.
+                        let path_info = PathInfo {
+                            node: Some(tvix_castore::proto::Node {
+                                node: Some(root_node),
+                            }),
+                            // There's no reference scanning on path contents ingested like this.
+                            references: vec![],
+                            narinfo: Some(NarInfo {
+                                nar_size,
+                                nar_sha256: nar_sha256.to_vec().into(),
+                                signatures: vec![],
+                                reference_names: vec![],
+                            }),
+                        };
+
+                        // put into [PathInfoService], and return the PathInfo that we get back
+                        // from there (it might contain additional signatures).
+                        let path_info = path_info_service.put(path_info).await?;
+
+                        let node = path_info.node.unwrap().node.unwrap();
+
+                        log_node(&node, &path);
+
+                        println!(
+                            "{}",
+                            StorePath::from_bytes(node.get_name())
+                                .unwrap()
+                                .to_absolute_path()
+                        );
+
+                        Ok(())
+                    });
+                    task
+                })
+                .collect::<Vec<_>>();
+
+            try_join_all(tasks).await?;
+        }
+        #[cfg(feature = "fuse")]
+        Commands::Mount {
+            dest,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            list_root,
+            threads,
+        } => {
+            let blob_service = blobservice::from_addr(&blob_service_addr)?;
+            let directory_service = directoryservice::from_addr(&directory_service_addr)?;
+            let path_info_service = pathinfoservice::from_addr(
+                &path_info_service_addr,
+                blob_service.clone(),
+                directory_service.clone(),
+            )?;
+
+            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
+                let f = TvixStoreFs::new(
+                    blob_service,
+                    directory_service,
+                    path_info_service,
+                    list_root,
+                );
+                info!("mounting tvix-store on {:?}", &dest);
+
+                FuseDaemon::new(f, &dest, threads)
+            })
+            .await??;
+
+            // grab a handle to unmount the file system, and register a signal
+            // handler.
+            tokio::spawn(async move {
+                tokio::signal::ctrl_c().await.unwrap();
+                info!("interrupt received, unmounting…");
+                tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
+                info!("unmount occured, terminating…");
+                Ok::<_, io::Error>(())
+            })
+            .await??;
+        }
+        #[cfg(feature = "virtiofs")]
+        Commands::VirtioFs {
+            socket,
+            blob_service_addr,
+            directory_service_addr,
+            path_info_service_addr,
+            list_root,
+        } => {
+            let blob_service = blobservice::from_addr(&blob_service_addr)?;
+            let directory_service = directoryservice::from_addr(&directory_service_addr)?;
+            let path_info_service = pathinfoservice::from_addr(
+                &path_info_service_addr,
+                blob_service.clone(),
+                directory_service.clone(),
+            )?;
+
+            tokio::task::spawn_blocking(move || {
+                let fs = TvixStoreFs::new(
+                    blob_service,
+                    directory_service,
+                    path_info_service,
+                    list_root,
+                );
+                info!("starting tvix-store virtiofs daemon on {:?}", &socket);
+
+                start_virtiofs_daemon(fs, socket)
+            })
+            .await??;
+        }
+    };
+    Ok(())
+}
+
+fn log_node(node: &Node, path: &Path) {
+    match node {
+        Node::Directory(directory_node) => {
+            info!(
+                path = ?path,
+                name = ?directory_node.name,
+                digest = BASE64.encode(&directory_node.digest),
+                "import successful",
+            )
+        }
+        Node::File(file_node) => {
+            info!(
+                path = ?path,
+                name = ?file_node.name,
+                digest = BASE64.encode(&file_node.digest),
+                "import successful"
+            )
+        }
+        Node::Symlink(symlink_node) => {
+            info!(
+                path = ?path,
+                name = ?symlink_node.name,
+                target = ?symlink_node.target,
+                "import successful"
+            )
+        }
+    }
+}