about summary refs log tree commit diff
path: root/tvix/store/src/bin/tvix-store.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r--tvix/store/src/bin/tvix-store.rs32
1 files changed, 18 insertions, 14 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 0da264808efb..f707e3850d4f 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -22,7 +22,7 @@ use tvix_store::proto::GRPCPathInfoServiceWrapper;
 use tvix_store::proto::NamedNode;
 use tvix_store::proto::NarInfo;
 use tvix_store::proto::PathInfo;
-use tvix_store::FUSE;
+use tvix_store::{FuseDaemon, FUSE};
 
 #[cfg(feature = "reflection")]
 use tvix_store::proto::FILE_DESCRIPTOR_SET;
@@ -94,6 +94,10 @@ enum Commands {
         #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
         path_info_service_addr: String,
 
+        /// Number of FUSE threads to spawn.
+        #[arg(long, env, default_value_t = default_threads())]
+        threads: usize,
+
         /// Whether to list elements at the root of the mount point.
         /// This is useful if your PathInfoService doesn't provide an
         /// (exhaustive) listing.
@@ -102,6 +106,13 @@ enum Commands {
     },
 }
 
+#[cfg(feature = "fuse")]
+fn default_threads() -> usize {
+    std::thread::available_parallelism()
+        .map(|threads| threads.into())
+        .unwrap_or(4)
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let cli = Cli::parse();
@@ -280,6 +291,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             directory_service_addr,
             path_info_service_addr,
             list_root,
+            threads,
         } => {
             let blob_service = blobservice::from_addr(&blob_service_addr)?;
             let directory_service = directoryservice::from_addr(&directory_service_addr)?;
@@ -289,35 +301,27 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 directory_service.clone(),
             )?;
 
-            let mut fuse_session = tokio::task::spawn_blocking(move || {
+            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
                 let f = FUSE::new(
                     blob_service,
                     directory_service,
                     path_info_service,
                     list_root,
                 );
+                info!("mounting tvix-store on {:?}", &dest);
 
-                fuser::Session::new(f, &dest, &[])
+                FuseDaemon::new(f, &dest, threads)
             })
             .await??;
 
             // grab a handle to unmount the file system, and register a signal
             // handler.
-            let mut fuse_unmounter = fuse_session.unmount_callable();
             tokio::spawn(async move {
                 tokio::signal::ctrl_c().await.unwrap();
                 info!("interrupt received, unmounting…");
-                fuse_unmounter.unmount().unwrap();
-            });
-
-            // Start the fuse filesystem and wait for its completion, which
-            // happens when it's unmounted externally, or via the signal handler
-            // task.
-            tokio::task::spawn_blocking(move || -> io::Result<()> {
-                info!("mounting tvix-store on {:?}", fuse_session.mountpoint());
-                fuse_session.run()?;
+                fuse_daemon.unmount()?;
                 info!("unmount occured, terminating…");
-                Ok(())
+                Ok::<_, io::Error>(())
             })
             .await??;
         }