diff options
author | Florian Klink <flokli@flokli.de> | 2024-06-16T08·43+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2024-06-16T12·02+0000 |
commit | 7e42b4f314c2136366c4456f2af894108e765abf (patch) | |
tree | cee3fb45a378c76ff9d62bf18ce0118e9d3cfbe1 /tvix/castore/src/fs/fuse/mod.rs | |
parent | 452299dcd205847477874698740626af041705df (diff) |
fix(tvix/store/bin): fix shutdown behaviour for FUSE r/8285
Both umounts happening from another process, as well as tvix-store itself calling umount() on FuseDaemon will cause the FUSE worker threads to terminate. So far there was no nice way to wait on these threads to be terminated from multiple places, causing the `tvix-store mount` command to only be terminated if interrupted via ctrl-c, not via an external umount. Update FuseDaemon to use a ThreadPool, which gives us a join primitive over all threads, that can also be called from multiple places. Await on a join() from there to end the program, not the ctrl-c signal handler as it was before. Using FuseDaemon from multiple tasks requires Arc<>-ing both the ThreadPool as well as the inner FuseSession (which also needs to be inside a Mutex if we want to unmount), but now we can clone FuseDaemon around and use it in two places. We could probably also have used an Option and drop the FuseSession after the first umount, but this looks cleaner. Change-Id: Id635ef59b560c111db52ad0b3ca3d12bc7ae28ca Reviewed-on: https://cl.tvl.fyi/c/depot/+/11825 Reviewed-by: Brian Olsen <me@griff.name> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/fs/fuse/mod.rs')
-rw-r--r-- | tvix/castore/src/fs/fuse/mod.rs | 56 |
1 files changed, 35 insertions, 21 deletions
diff --git a/tvix/castore/src/fs/fuse/mod.rs b/tvix/castore/src/fs/fuse/mod.rs index 94b73d422a14..64ef29ed2aa1 100644 --- a/tvix/castore/src/fs/fuse/mod.rs +++ b/tvix/castore/src/fs/fuse/mod.rs @@ -1,6 +1,8 @@ -use std::{io, path::Path, sync::Arc, thread}; +use std::{io, path::Path, sync::Arc}; use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use parking_lot::Mutex; +use threadpool::ThreadPool; use tracing::{error, instrument}; #[cfg(test)] @@ -49,9 +51,12 @@ where } } +/// Starts a [Filesystem] with the specified number of threads, and provides +/// functions to unmount, and wait for it to have completed. +#[derive(Clone)] pub struct FuseDaemon { - session: FuseSession, - threads: Vec<thread::JoinHandle<()>>, + session: Arc<Mutex<FuseSession>>, + threads: Arc<ThreadPool>, } impl FuseDaemon { @@ -59,7 +64,7 @@ impl FuseDaemon { pub fn new<FS, P>( fs: FS, mountpoint: P, - threads: usize, + num_threads: usize, allow_other: bool, ) -> Result<Self, io::Error> where @@ -76,40 +81,49 @@ impl FuseDaemon { session .mount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - let mut join_handles = Vec::with_capacity(threads); - for _ in 0..threads { + + // construct a thread pool + let threads = threadpool::Builder::new() + .num_threads(num_threads) + .thread_name("fuse_server".to_string()) + .build(); + + for _ in 0..num_threads { + // for each thread requested, create and start a FuseServer accepting requests. let mut server = FuseServer { server: server.clone(), channel: session .new_channel() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, }; - let join_handle = thread::Builder::new() - .name("fuse_server".to_string()) - .spawn(move || { - let _ = server.start(); - })?; - join_handles.push(join_handle); + + threads.execute(move || { + let _ = server.start(); + }); } Ok(FuseDaemon { - session, - threads: join_handles, + session: Arc::new(Mutex::new(session)), + threads: Arc::new(threads), }) } + /// Waits for all threads to finish. + #[instrument(skip_all)] + pub fn wait(&self) { + self.threads.join() + } + + /// Send the unmount command, and waits for all threads to finish. #[instrument(skip_all, err)] - pub fn unmount(&mut self) -> Result<(), io::Error> { + pub fn unmount(&self) -> Result<(), io::Error> { + // Send the unmount command. self.session + .lock() .umount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - for thread in self.threads.drain(..) { - thread.join().map_err(|_| { - io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") - })?; - } - + self.wait(); Ok(()) } } |