use std::{io, path::Path, sync::Arc}; use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; use parking_lot::Mutex; use threadpool::ThreadPool; use tracing::{error, instrument}; #[cfg(test)] mod tests; struct FuseServer<FS> where FS: FileSystem + Sync + Send, { server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, channel: fuse_backend_rs::transport::FuseChannel, } #[cfg(target_os = "macos")] const BADFD: libc::c_int = libc::EBADF; #[cfg(target_os = "linux")] const BADFD: libc::c_int = libc::EBADFD; impl<FS> FuseServer<FS> where FS: FileSystem + Sync + Send, { fn start(&mut self) -> io::Result<()> { while let Some((reader, writer)) = self .channel .get_request() .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? { if let Err(e) = self .server .handle_message(reader, writer.into(), None, None) { match e { // This indicates the session has been shut down. fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => { break; } error => { error!(?error, "failed to handle fuse request"); continue; } } } } Ok(()) } } /// Starts a [Filesystem] with the specified number of threads, and provides /// functions to unmount, and wait for it to have completed. #[derive(Clone)] pub struct FuseDaemon { session: Arc<Mutex<FuseSession>>, threads: Arc<ThreadPool>, } impl FuseDaemon { #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)] pub fn new<FS, P>( fs: FS, mountpoint: P, num_threads: usize, allow_other: bool, ) -> Result<Self, io::Error> where FS: FileSystem + Sync + Send + 'static, P: AsRef<Path> + std::fmt::Debug, { let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; #[cfg(target_os = "linux")] session.set_allow_other(allow_other); session .mount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; // construct a thread pool let threads = threadpool::Builder::new() .num_threads(num_threads) .thread_name("fuse_server".to_string()) .build(); for _ in 0..num_threads { // for each thread requested, create and start a FuseServer accepting requests. let mut server = FuseServer { server: server.clone(), channel: session .new_channel() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, }; threads.execute(move || { let _ = server.start(); }); } Ok(FuseDaemon { session: Arc::new(Mutex::new(session)), threads: Arc::new(threads), }) } /// Waits for all threads to finish. #[instrument(skip_all)] pub fn wait(&self) { self.threads.join() } /// Send the unmount command, and waits for all threads to finish. #[instrument(skip_all, err)] pub fn unmount(&self) -> Result<(), io::Error> { // Send the unmount command. self.session .lock() .umount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; self.wait(); Ok(()) } } impl Drop for FuseDaemon { fn drop(&mut self) { if let Err(error) = self.unmount() { error!(?error, "failed to unmont fuse filesystem") } } }