about summary refs log tree commit diff
path: root/tvix/castore/src/fs/fuse/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/fs/fuse/mod.rs')
-rw-r--r--tvix/castore/src/fs/fuse/mod.rs137
1 files changed, 137 insertions, 0 deletions
diff --git a/tvix/castore/src/fs/fuse/mod.rs b/tvix/castore/src/fs/fuse/mod.rs
new file mode 100644
index 000000000000..64ef29ed2aa1
--- /dev/null
+++ b/tvix/castore/src/fs/fuse/mod.rs
@@ -0,0 +1,137 @@
+use std::{io, path::Path, sync::Arc};
+
+use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
+use parking_lot::Mutex;
+use threadpool::ThreadPool;
+use tracing::{error, instrument};
+
+#[cfg(test)]
+mod tests;
+
+struct FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
+    channel: fuse_backend_rs::transport::FuseChannel,
+}
+
+#[cfg(target_os = "macos")]
+const BADFD: libc::c_int = libc::EBADF;
+#[cfg(target_os = "linux")]
+const BADFD: libc::c_int = libc::EBADFD;
+
+impl<FS> FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    fn start(&mut self) -> io::Result<()> {
+        while let Some((reader, writer)) = self
+            .channel
+            .get_request()
+            .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
+        {
+            if let Err(e) = self
+                .server
+                .handle_message(reader, writer.into(), None, None)
+            {
+                match e {
+                    // This indicates the session has been shut down.
+                    fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
+                        break;
+                    }
+                    error => {
+                        error!(?error, "failed to handle fuse request");
+                        continue;
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+/// Starts a [Filesystem] with the specified number of threads, and provides
+/// functions to unmount, and wait for it to have completed.
+#[derive(Clone)]
+pub struct FuseDaemon {
+    session: Arc<Mutex<FuseSession>>,
+    threads: Arc<ThreadPool>,
+}
+
+impl FuseDaemon {
+    #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
+    pub fn new<FS, P>(
+        fs: FS,
+        mountpoint: P,
+        num_threads: usize,
+        allow_other: bool,
+    ) -> Result<Self, io::Error>
+    where
+        FS: FileSystem + Sync + Send + 'static,
+        P: AsRef<Path> + std::fmt::Debug,
+    {
+        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
+
+        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        #[cfg(target_os = "linux")]
+        session.set_allow_other(allow_other);
+        session
+            .mount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        // construct a thread pool
+        let threads = threadpool::Builder::new()
+            .num_threads(num_threads)
+            .thread_name("fuse_server".to_string())
+            .build();
+
+        for _ in 0..num_threads {
+            // for each thread requested, create and start a FuseServer accepting requests.
+            let mut server = FuseServer {
+                server: server.clone(),
+                channel: session
+                    .new_channel()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
+            };
+
+            threads.execute(move || {
+                let _ = server.start();
+            });
+        }
+
+        Ok(FuseDaemon {
+            session: Arc::new(Mutex::new(session)),
+            threads: Arc::new(threads),
+        })
+    }
+
+    /// Waits for all threads to finish.
+    #[instrument(skip_all)]
+    pub fn wait(&self) {
+        self.threads.join()
+    }
+
+    /// Send the unmount command, and waits for all threads to finish.
+    #[instrument(skip_all, err)]
+    pub fn unmount(&self) -> Result<(), io::Error> {
+        // Send the unmount command.
+        self.session
+            .lock()
+            .umount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        self.wait();
+        Ok(())
+    }
+}
+
+impl Drop for FuseDaemon {
+    fn drop(&mut self) {
+        if let Err(error) = self.unmount() {
+            error!(?error, "failed to unmont fuse filesystem")
+        }
+    }
+}