diff options
Diffstat (limited to 'tvix/castore/src/fs/fuse/mod.rs')
-rw-r--r-- | tvix/castore/src/fs/fuse/mod.rs | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/tvix/castore/src/fs/fuse/mod.rs b/tvix/castore/src/fs/fuse/mod.rs new file mode 100644 index 000000000000..64ef29ed2aa1 --- /dev/null +++ b/tvix/castore/src/fs/fuse/mod.rs @@ -0,0 +1,137 @@ +use std::{io, path::Path, sync::Arc}; + +use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use parking_lot::Mutex; +use threadpool::ThreadPool; +use tracing::{error, instrument}; + +#[cfg(test)] +mod tests; + +struct FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, + channel: fuse_backend_rs::transport::FuseChannel, +} + +#[cfg(target_os = "macos")] +const BADFD: libc::c_int = libc::EBADF; +#[cfg(target_os = "linux")] +const BADFD: libc::c_int = libc::EBADFD; + +impl<FS> FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + fn start(&mut self) -> io::Result<()> { + while let Some((reader, writer)) = self + .channel + .get_request() + .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? + { + if let Err(e) = self + .server + .handle_message(reader, writer.into(), None, None) + { + match e { + // This indicates the session has been shut down. + fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => { + break; + } + error => { + error!(?error, "failed to handle fuse request"); + continue; + } + } + } + } + Ok(()) + } +} + +/// Starts a [Filesystem] with the specified number of threads, and provides +/// functions to unmount, and wait for it to have completed. +#[derive(Clone)] +pub struct FuseDaemon { + session: Arc<Mutex<FuseSession>>, + threads: Arc<ThreadPool>, +} + +impl FuseDaemon { + #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)] + pub fn new<FS, P>( + fs: FS, + mountpoint: P, + num_threads: usize, + allow_other: bool, + ) -> Result<Self, io::Error> + where + FS: FileSystem + Sync + Send + 'static, + P: AsRef<Path> + std::fmt::Debug, + { + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + #[cfg(target_os = "linux")] + session.set_allow_other(allow_other); + session + .mount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + // construct a thread pool + let threads = threadpool::Builder::new() + .num_threads(num_threads) + .thread_name("fuse_server".to_string()) + .build(); + + for _ in 0..num_threads { + // for each thread requested, create and start a FuseServer accepting requests. + let mut server = FuseServer { + server: server.clone(), + channel: session + .new_channel() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + }; + + threads.execute(move || { + let _ = server.start(); + }); + } + + Ok(FuseDaemon { + session: Arc::new(Mutex::new(session)), + threads: Arc::new(threads), + }) + } + + /// Waits for all threads to finish. + #[instrument(skip_all)] + pub fn wait(&self) { + self.threads.join() + } + + /// Send the unmount command, and waits for all threads to finish. + #[instrument(skip_all, err)] + pub fn unmount(&self) -> Result<(), io::Error> { + // Send the unmount command. + self.session + .lock() + .umount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + self.wait(); + Ok(()) + } +} + +impl Drop for FuseDaemon { + fn drop(&mut self) { + if let Err(error) = self.unmount() { + error!(?error, "failed to unmont fuse filesystem") + } + } +} |