about summary refs log tree commit diff
path: root/tvix/castore/src/fs/fuse/mod.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-05-26T14·21+0200
committerclbot <clbot@tvl.fyi>2024-05-26T19·46+0000
commit9586e5c30da23dd25d404d075a64161689f8cc02 (patch)
tree97410fd75187730bb9eecafbbbb2b5b29732afaa /tvix/castore/src/fs/fuse/mod.rs
parent61cf4905fe4ad01b87c0251d7a90426b7b0ea3bc (diff)
refactor(tvix/castore): move src/fs/test into fuse mod r/8172
These tests only interact with the FUSE layer, and
import super::fuse to do its work.

However, this only works if the `fuse` feature is enabled, which we
don't do if we enable the `virtiofs` feature only, causing the tests
to fail:

```
❯ cargo test --no-default-features --features virtiofs
   Compiling tvix-castore v0.1.0 (/home/flokli/dev/nixos/code.tvl.fyi-submit2/tvix/castore)
error[E0432]: unresolved import `super::fuse`
  --> castore/src/fs/tests.rs:14:13
   |
14 | use super::{fuse::FuseDaemon, TvixStoreFs};
   |             ^^^^ could not find `fuse` in `super`
```

We move src/fs/tests.rs to src/fs/fuse/tests.rs
(and src/fs/fuse.rs to src/fs/fuse/mod.rs) to better structure this,
which will automatically cause both tests and code to only be built if
we have the `fuse` feature enabled.

Change-Id: I8fbbad3e4457e326bdfd171aa5c43d25d3187b5b
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11715
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/castore/src/fs/fuse/mod.rs')
-rw-r--r--tvix/castore/src/fs/fuse/mod.rs123
1 files changed, 123 insertions, 0 deletions
diff --git a/tvix/castore/src/fs/fuse/mod.rs b/tvix/castore/src/fs/fuse/mod.rs
new file mode 100644
index 000000000000..94b73d422a14
--- /dev/null
+++ b/tvix/castore/src/fs/fuse/mod.rs
@@ -0,0 +1,123 @@
+use std::{io, path::Path, sync::Arc, thread};
+
+use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
+use tracing::{error, instrument};
+
+#[cfg(test)]
+mod tests;
+
+struct FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
+    channel: fuse_backend_rs::transport::FuseChannel,
+}
+
+#[cfg(target_os = "macos")]
+const BADFD: libc::c_int = libc::EBADF;
+#[cfg(target_os = "linux")]
+const BADFD: libc::c_int = libc::EBADFD;
+
+impl<FS> FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    fn start(&mut self) -> io::Result<()> {
+        while let Some((reader, writer)) = self
+            .channel
+            .get_request()
+            .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
+        {
+            if let Err(e) = self
+                .server
+                .handle_message(reader, writer.into(), None, None)
+            {
+                match e {
+                    // This indicates the session has been shut down.
+                    fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
+                        break;
+                    }
+                    error => {
+                        error!(?error, "failed to handle fuse request");
+                        continue;
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
+pub struct FuseDaemon {
+    session: FuseSession,
+    threads: Vec<thread::JoinHandle<()>>,
+}
+
+impl FuseDaemon {
+    #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
+    pub fn new<FS, P>(
+        fs: FS,
+        mountpoint: P,
+        threads: usize,
+        allow_other: bool,
+    ) -> Result<Self, io::Error>
+    where
+        FS: FileSystem + Sync + Send + 'static,
+        P: AsRef<Path> + std::fmt::Debug,
+    {
+        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
+
+        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        #[cfg(target_os = "linux")]
+        session.set_allow_other(allow_other);
+        session
+            .mount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+        let mut join_handles = Vec::with_capacity(threads);
+        for _ in 0..threads {
+            let mut server = FuseServer {
+                server: server.clone(),
+                channel: session
+                    .new_channel()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
+            };
+            let join_handle = thread::Builder::new()
+                .name("fuse_server".to_string())
+                .spawn(move || {
+                    let _ = server.start();
+                })?;
+            join_handles.push(join_handle);
+        }
+
+        Ok(FuseDaemon {
+            session,
+            threads: join_handles,
+        })
+    }
+
+    #[instrument(skip_all, err)]
+    pub fn unmount(&mut self) -> Result<(), io::Error> {
+        self.session
+            .umount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        for thread in self.threads.drain(..) {
+            thread.join().map_err(|_| {
+                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
+            })?;
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for FuseDaemon {
+    fn drop(&mut self) {
+        if let Err(error) = self.unmount() {
+            error!(?error, "failed to unmont fuse filesystem")
+        }
+    }
+}