about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/Cargo.toml6
-rw-r--r--tvix/castore/src/fs/fuse/mod.rs56
-rw-r--r--tvix/castore/src/fs/fuse/tests.rs44
3 files changed, 62 insertions, 44 deletions
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index c03dca3ebb34..e82a6d12421d 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -51,6 +51,10 @@ version = "0.11.0"
 optional = true
 version = "0.2.144"
 
+[dependencies.threadpool]
+version = "1.8.1"
+optional = true
+
 [dependencies.tonic-reflection]
 optional = true
 version = "0.11.0"
@@ -100,7 +104,7 @@ cloud = [
   "object_store/azure",
   "object_store/gcp",
 ]
-fs = ["dep:libc", "dep:fuse-backend-rs"]
+fs = ["dep:fuse-backend-rs", "dep:threadpool", "dep:libc"]
 virtiofs = [
   "fs",
   "dep:vhost",
diff --git a/tvix/castore/src/fs/fuse/mod.rs b/tvix/castore/src/fs/fuse/mod.rs
index 94b73d422a14..64ef29ed2aa1 100644
--- a/tvix/castore/src/fs/fuse/mod.rs
+++ b/tvix/castore/src/fs/fuse/mod.rs
@@ -1,6 +1,8 @@
-use std::{io, path::Path, sync::Arc, thread};
+use std::{io, path::Path, sync::Arc};
 
 use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
+use parking_lot::Mutex;
+use threadpool::ThreadPool;
 use tracing::{error, instrument};
 
 #[cfg(test)]
@@ -49,9 +51,12 @@ where
     }
 }
 
+/// Starts a [Filesystem] with the specified number of threads, and provides
+/// functions to unmount, and wait for it to have completed.
+#[derive(Clone)]
 pub struct FuseDaemon {
-    session: FuseSession,
-    threads: Vec<thread::JoinHandle<()>>,
+    session: Arc<Mutex<FuseSession>>,
+    threads: Arc<ThreadPool>,
 }
 
 impl FuseDaemon {
@@ -59,7 +64,7 @@ impl FuseDaemon {
     pub fn new<FS, P>(
         fs: FS,
         mountpoint: P,
-        threads: usize,
+        num_threads: usize,
         allow_other: bool,
     ) -> Result<Self, io::Error>
     where
@@ -76,40 +81,49 @@ impl FuseDaemon {
         session
             .mount()
             .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-        let mut join_handles = Vec::with_capacity(threads);
-        for _ in 0..threads {
+
+        // construct a thread pool
+        let threads = threadpool::Builder::new()
+            .num_threads(num_threads)
+            .thread_name("fuse_server".to_string())
+            .build();
+
+        for _ in 0..num_threads {
+            // for each thread requested, create and start a FuseServer accepting requests.
             let mut server = FuseServer {
                 server: server.clone(),
                 channel: session
                     .new_channel()
                     .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
             };
-            let join_handle = thread::Builder::new()
-                .name("fuse_server".to_string())
-                .spawn(move || {
-                    let _ = server.start();
-                })?;
-            join_handles.push(join_handle);
+
+            threads.execute(move || {
+                let _ = server.start();
+            });
         }
 
         Ok(FuseDaemon {
-            session,
-            threads: join_handles,
+            session: Arc::new(Mutex::new(session)),
+            threads: Arc::new(threads),
         })
     }
 
+    /// Waits for all threads to finish.
+    #[instrument(skip_all)]
+    pub fn wait(&self) {
+        self.threads.join()
+    }
+
+    /// Send the unmount command, and waits for all threads to finish.
     #[instrument(skip_all, err)]
-    pub fn unmount(&mut self) -> Result<(), io::Error> {
+    pub fn unmount(&self) -> Result<(), io::Error> {
+        // Send the unmount command.
         self.session
+            .lock()
             .umount()
             .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
 
-        for thread in self.threads.drain(..) {
-            thread.join().map_err(|_| {
-                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
-            })?;
-        }
-
+        self.wait();
         Ok(())
     }
 }
diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs
index bb321f5888f8..bcebcf4a7292 100644
--- a/tvix/castore/src/fs/fuse/tests.rs
+++ b/tvix/castore/src/fs/fuse/tests.rs
@@ -248,7 +248,7 @@ async fn mount() {
 
     let (blob_service, directory_service) = gen_svcs();
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         BTreeMap::default(),
@@ -271,7 +271,7 @@ async fn root() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service) = gen_svcs();
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         BTreeMap::default(),
@@ -305,7 +305,7 @@ async fn root_with_listing() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -349,7 +349,7 @@ async fn stat_file_at_root() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -386,7 +386,7 @@ async fn read_file_at_root() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -423,7 +423,7 @@ async fn read_large_file_at_root() {
 
     populate_blob_b(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -468,7 +468,7 @@ async fn symlink_readlink() {
 
     populate_symlink(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -515,7 +515,7 @@ async fn read_stat_through_symlink() {
     populate_blob_a(&blob_service, &mut root_nodes).await;
     populate_symlink(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -560,7 +560,7 @@ async fn read_stat_directory() {
 
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -597,7 +597,7 @@ async fn xattr() {
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -680,7 +680,7 @@ async fn read_blob_inside_dir() {
 
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -720,7 +720,7 @@ async fn read_blob_deep_inside_dir() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -763,7 +763,7 @@ async fn readdir() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -823,7 +823,7 @@ async fn readdir_deep() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -873,7 +873,7 @@ async fn check_attributes() {
     populate_symlink(&mut root_nodes).await;
     populate_blob_helloworld(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -948,7 +948,7 @@ async fn compare_inodes_directories() {
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -992,7 +992,7 @@ async fn compare_inodes_files() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1041,7 +1041,7 @@ async fn compare_inodes_symlinks() {
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
     populate_symlink2(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1084,7 +1084,7 @@ async fn read_wrong_paths_in_root() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1139,7 +1139,7 @@ async fn disallow_writes() {
     let (blob_service, directory_service) = gen_svcs();
     let root_nodes = BTreeMap::default();
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1171,7 +1171,7 @@ async fn missing_directory() {
 
     populate_directorynode_without_directory(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1219,7 +1219,7 @@ async fn missing_blob() {
 
     populate_filenode_without_blob(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,