about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/Cargo.lock10
-rw-r--r--tvix/Cargo.nix25
-rw-r--r--tvix/castore/Cargo.toml6
-rw-r--r--tvix/castore/src/fs/fuse/mod.rs56
-rw-r--r--tvix/castore/src/fs/fuse/tests.rs44
-rw-r--r--tvix/store/src/bin/tvix-store.rs28
6 files changed, 113 insertions, 56 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 7bc0eb232b..b4ee59031b 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -3694,6 +3694,15 @@ dependencies = [
 ]
 
 [[package]]
+name = "threadpool"
+version = "1.8.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
+dependencies = [
+ "num_cpus",
+]
+
+[[package]]
 name = "tikv-jemalloc-sys"
 version = "0.5.4+5.3.0-patched"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -4230,6 +4239,7 @@ dependencies = [
  "sled",
  "tempfile",
  "thiserror",
+ "threadpool",
  "tokio",
  "tokio-retry",
  "tokio-stream",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 3b8c45bf48..cd8ec667f3 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -11293,6 +11293,24 @@ rec {
         ];
         features = { };
       };
+      "threadpool" = rec {
+        crateName = "threadpool";
+        version = "1.8.1";
+        edition = "2015";
+        sha256 = "1amgfyzvynbm8pacniivzq9r0fh3chhs7kijic81j76l6c5ycl6h";
+        authors = [
+          "The Rust Project Developers"
+          "Corey Farwell <coreyf@rwell.org>"
+          "Stefan Schindler <dns2utf8@estada.ch>"
+        ];
+        dependencies = [
+          {
+            name = "num_cpus";
+            packageId = "num_cpus";
+          }
+        ];
+
+      };
       "tikv-jemalloc-sys" = rec {
         crateName = "tikv-jemalloc-sys";
         version = "0.5.4+5.3.0-patched";
@@ -13320,6 +13338,11 @@ rec {
             packageId = "thiserror";
           }
           {
+            name = "threadpool";
+            packageId = "threadpool";
+            optional = true;
+          }
+          {
             name = "tokio";
             packageId = "tokio";
             features = [ "fs" "macros" "net" "rt" "rt-multi-thread" "signal" ];
@@ -13445,7 +13468,7 @@ rec {
         features = {
           "cloud" = [ "dep:bigtable_rs" "object_store/aws" "object_store/azure" "object_store/gcp" ];
           "default" = [ "cloud" ];
-          "fs" = [ "dep:libc" "dep:fuse-backend-rs" ];
+          "fs" = [ "dep:fuse-backend-rs" "dep:threadpool" "dep:libc" ];
           "fuse" = [ "fs" ];
           "tonic-reflection" = [ "dep:tonic-reflection" ];
           "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ];
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index c03dca3ebb..e82a6d1242 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -51,6 +51,10 @@ version = "0.11.0"
 optional = true
 version = "0.2.144"
 
+[dependencies.threadpool]
+version = "1.8.1"
+optional = true
+
 [dependencies.tonic-reflection]
 optional = true
 version = "0.11.0"
@@ -100,7 +104,7 @@ cloud = [
   "object_store/azure",
   "object_store/gcp",
 ]
-fs = ["dep:libc", "dep:fuse-backend-rs"]
+fs = ["dep:fuse-backend-rs", "dep:threadpool", "dep:libc"]
 virtiofs = [
   "fs",
   "dep:vhost",
diff --git a/tvix/castore/src/fs/fuse/mod.rs b/tvix/castore/src/fs/fuse/mod.rs
index 94b73d422a..64ef29ed2a 100644
--- a/tvix/castore/src/fs/fuse/mod.rs
+++ b/tvix/castore/src/fs/fuse/mod.rs
@@ -1,6 +1,8 @@
-use std::{io, path::Path, sync::Arc, thread};
+use std::{io, path::Path, sync::Arc};
 
 use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
+use parking_lot::Mutex;
+use threadpool::ThreadPool;
 use tracing::{error, instrument};
 
 #[cfg(test)]
@@ -49,9 +51,12 @@ where
     }
 }
 
+/// Starts a [Filesystem] with the specified number of threads, and provides
+/// functions to unmount, and wait for it to have completed.
+#[derive(Clone)]
 pub struct FuseDaemon {
-    session: FuseSession,
-    threads: Vec<thread::JoinHandle<()>>,
+    session: Arc<Mutex<FuseSession>>,
+    threads: Arc<ThreadPool>,
 }
 
 impl FuseDaemon {
@@ -59,7 +64,7 @@ impl FuseDaemon {
     pub fn new<FS, P>(
         fs: FS,
         mountpoint: P,
-        threads: usize,
+        num_threads: usize,
         allow_other: bool,
     ) -> Result<Self, io::Error>
     where
@@ -76,40 +81,49 @@ impl FuseDaemon {
         session
             .mount()
             .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-        let mut join_handles = Vec::with_capacity(threads);
-        for _ in 0..threads {
+
+        // construct a thread pool
+        let threads = threadpool::Builder::new()
+            .num_threads(num_threads)
+            .thread_name("fuse_server".to_string())
+            .build();
+
+        for _ in 0..num_threads {
+            // for each thread requested, create and start a FuseServer accepting requests.
             let mut server = FuseServer {
                 server: server.clone(),
                 channel: session
                     .new_channel()
                     .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
             };
-            let join_handle = thread::Builder::new()
-                .name("fuse_server".to_string())
-                .spawn(move || {
-                    let _ = server.start();
-                })?;
-            join_handles.push(join_handle);
+
+            threads.execute(move || {
+                let _ = server.start();
+            });
         }
 
         Ok(FuseDaemon {
-            session,
-            threads: join_handles,
+            session: Arc::new(Mutex::new(session)),
+            threads: Arc::new(threads),
         })
     }
 
+    /// Waits for all threads to finish.
+    #[instrument(skip_all)]
+    pub fn wait(&self) {
+        self.threads.join()
+    }
+
+    /// Send the unmount command, and waits for all threads to finish.
     #[instrument(skip_all, err)]
-    pub fn unmount(&mut self) -> Result<(), io::Error> {
+    pub fn unmount(&self) -> Result<(), io::Error> {
+        // Send the unmount command.
         self.session
+            .lock()
             .umount()
             .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
 
-        for thread in self.threads.drain(..) {
-            thread.join().map_err(|_| {
-                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
-            })?;
-        }
-
+        self.wait();
         Ok(())
     }
 }
diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs
index bb321f5888..bcebcf4a72 100644
--- a/tvix/castore/src/fs/fuse/tests.rs
+++ b/tvix/castore/src/fs/fuse/tests.rs
@@ -248,7 +248,7 @@ async fn mount() {
 
     let (blob_service, directory_service) = gen_svcs();
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         BTreeMap::default(),
@@ -271,7 +271,7 @@ async fn root() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service) = gen_svcs();
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         BTreeMap::default(),
@@ -305,7 +305,7 @@ async fn root_with_listing() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -349,7 +349,7 @@ async fn stat_file_at_root() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -386,7 +386,7 @@ async fn read_file_at_root() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -423,7 +423,7 @@ async fn read_large_file_at_root() {
 
     populate_blob_b(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -468,7 +468,7 @@ async fn symlink_readlink() {
 
     populate_symlink(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -515,7 +515,7 @@ async fn read_stat_through_symlink() {
     populate_blob_a(&blob_service, &mut root_nodes).await;
     populate_symlink(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -560,7 +560,7 @@ async fn read_stat_directory() {
 
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -597,7 +597,7 @@ async fn xattr() {
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -680,7 +680,7 @@ async fn read_blob_inside_dir() {
 
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -720,7 +720,7 @@ async fn read_blob_deep_inside_dir() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -763,7 +763,7 @@ async fn readdir() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -823,7 +823,7 @@ async fn readdir_deep() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -873,7 +873,7 @@ async fn check_attributes() {
     populate_symlink(&mut root_nodes).await;
     populate_blob_helloworld(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -948,7 +948,7 @@ async fn compare_inodes_directories() {
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -992,7 +992,7 @@ async fn compare_inodes_files() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1041,7 +1041,7 @@ async fn compare_inodes_symlinks() {
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
     populate_symlink2(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1084,7 +1084,7 @@ async fn read_wrong_paths_in_root() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1139,7 +1139,7 @@ async fn disallow_writes() {
     let (blob_service, directory_service) = gen_svcs();
     let root_nodes = BTreeMap::default();
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1171,7 +1171,7 @@ async fn missing_directory() {
 
     populate_directorynode_without_directory(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1219,7 +1219,7 @@ async fn missing_blob() {
 
     populate_filenode_without_blob(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index dadfa114f7..4353cbdd03 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -452,7 +452,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
                 )
                 .await?;
 
-            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
+            let fuse_daemon = tokio::task::spawn_blocking(move || {
                 let fs = make_fs(
                     blob_service,
                     directory_service,
@@ -466,16 +466,22 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> {
             })
             .await??;
 
-            // grab a handle to unmount the file system, and register a signal
-            // handler.
-            tokio::spawn(async move {
-                tokio::signal::ctrl_c().await.unwrap();
-                info!("interrupt received, unmounting…");
-                tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
-                info!("unmount occured, terminating…");
-                Ok::<_, std::io::Error>(())
-            })
-            .await??;
+            // Wait for a ctrl_c and then call fuse_daemon.unmount().
+            tokio::spawn({
+                let fuse_daemon = fuse_daemon.clone();
+                async move {
+                    tokio::signal::ctrl_c().await.unwrap();
+                    info!("interrupt received, unmounting…");
+                    tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??;
+                    info!("unmount occured, terminating…");
+                    Ok::<_, std::io::Error>(())
+                }
+            });
+
+            // Wait for the server to finish, which can either happen through it
+            // being unmounted externally, or receiving a signal invoking the
+            // handler above.
+            tokio::task::spawn_blocking(move || fuse_daemon.wait()).await?
         }
         #[cfg(feature = "virtiofs")]
         Commands::VirtioFs {