diff options
author | Connor Brewster <cbrewster@hey.com> | 2023-09-16T18·54-0500 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-09-20T14·27+0000 |
commit | 7e737fde34260daa477794d63b0b3344b4a1d81b (patch) | |
tree | 51f7d4b3c7f18aa78e584b06840fcdd645474895 /tvix/store/src/fs/fuse.rs | |
parent | 6b7c936bc50934b45df132f659292e2c45256dea (diff) |
refactor(tvix/store/fs): Separate FUSE and filesystem code r/6622
In prepration for adding virtiofs support, I thought it would make sense to split out the filesystem implementation from FUSE itself. The `fs` module holds the tvix-store filesystem implemetation and the `fuse` module holds the code to spawn a FUSE daemon backed by multiple threads. Change-Id: I8c58447b8c3aa016a613068f8e7ec166554e237c Reviewed-on: https://cl.tvl.fyi/c/depot/+/9343 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Autosubmit: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix/store/src/fs/fuse.rs')
-rw-r--r-- | tvix/store/src/fs/fuse.rs | 113 |
1 files changed, 113 insertions, 0 deletions
diff --git a/tvix/store/src/fs/fuse.rs b/tvix/store/src/fs/fuse.rs new file mode 100644 index 000000000000..8535c7858450 --- /dev/null +++ b/tvix/store/src/fs/fuse.rs @@ -0,0 +1,113 @@ +use std::{io, path::Path, sync::Arc, thread}; + +use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use tracing::error; + +struct FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, + channel: fuse_backend_rs::transport::FuseChannel, +} + +impl<FS> FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + fn start(&mut self) -> io::Result<()> { + loop { + if let Some((reader, writer)) = self + .channel + .get_request() + .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? + { + if let Err(e) = self + .server + .handle_message(reader, writer.into(), None, None) + { + match e { + // This indicates the session has been shut down. + fuse_backend_rs::Error::EncodeMessage(e) + if e.raw_os_error() == Some(libc::EBADFD) => + { + break; + } + error => { + error!(?error, "failed to handle fuse request"); + continue; + } + } + } + } else { + break; + } + } + Ok(()) + } +} + +pub struct FuseDaemon { + session: FuseSession, + threads: Vec<thread::JoinHandle<()>>, +} + +impl FuseDaemon { + pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error> + where + FS: FileSystem + Sync + Send + 'static, + P: AsRef<Path>, + { + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + session.set_allow_other(false); + session + .mount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + let mut join_handles = Vec::with_capacity(threads); + for _ in 0..threads { + let mut server = FuseServer { + server: server.clone(), + channel: session + .new_channel() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + }; + let join_handle = thread::Builder::new() + .name("fuse_server".to_string()) + .spawn(move || { + let _ = server.start(); + })?; + join_handles.push(join_handle); + } + + Ok(FuseDaemon { + session, + threads: join_handles, + }) + } + + pub fn unmount(&mut self) -> Result<(), io::Error> { + self.session + .umount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + for thread in self.threads.drain(..) { + thread.join().map_err(|_| { + io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") + })?; + } + + Ok(()) + } +} + +impl Drop for FuseDaemon { + fn drop(&mut self) { + if let Err(error) = self.unmount() { + error!(?error, "failed to unmont fuse filesystem") + } + } +} |