about summary refs log tree commit diff
path: root/tvix/castore/src/fs/fuse/mod.rs
use std::{io, path::Path, sync::Arc};

use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
use parking_lot::Mutex;
use threadpool::ThreadPool;
use tracing::{error, instrument};

#[cfg(test)]
mod tests;

/// Pairs a shared FUSE request handler with one channel to read requests from.
///
/// `FuseDaemon::new` builds one of these per worker thread: every instance
/// gets a clone of the same `server` handle, and its own `channel`.
struct FuseServer<FS>
where
    FS: FileSystem + Sync + Send,
{
    /// Handler that requests are dispatched to; shared between workers via `Arc`.
    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
    /// Channel this instance pulls requests from in `start`.
    channel: fuse_backend_rs::transport::FuseChannel,
}

// Errno value that `FuseServer::start` compares against when handling a
// message fails with an encoding error; a match is treated as "the session
// has been shut down". The libc constant used differs per platform, and no
// value is defined here for targets other than macOS and Linux.
#[cfg(target_os = "macos")]
const BADFD: libc::c_int = libc::EBADF;
#[cfg(target_os = "linux")]
const BADFD: libc::c_int = libc::EBADFD;

impl<FS> FuseServer<FS>
where
    FS: FileSystem + Sync + Send,
{
    fn start(&mut self) -> io::Result<()> {
        while let Some((reader, writer)) = self
            .channel
            .get_request()
            .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
        {
            if let Err(e) = self
                .server
                .handle_message(reader, writer.into(), None, None)
            {
                match e {
                    // This indicates the session has been shut down.
                    fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
                        break;
                    }
                    error => {
                        error!(?error, "failed to handle fuse request");
                        continue;
                    }
                }
            }
        }
        Ok(())
    }
}

/// Starts a [`FileSystem`] with the specified number of threads, and provides
/// functions to unmount it, and to wait for it to have completed.
///
/// Cloning is cheap: clones share the same session and thread pool, as both
/// fields are `Arc`s. Note that dropping a `FuseDaemon` (any clone of it)
/// attempts to unmount the filesystem.
#[derive(Clone)]
pub struct FuseDaemon {
    /// The FUSE session; locked when unmounting.
    session: Arc<Mutex<FuseSession>>,
    /// Pool running the worker threads that serve FUSE requests.
    threads: Arc<ThreadPool>,
}

impl FuseDaemon {
    /// Mounts `fs` at `mountpoint` and starts `num_threads` worker threads
    /// serving FUSE requests for it.
    ///
    /// `allow_other` is only forwarded to the session on Linux; on other
    /// targets it is not applied.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::InvalidInput`] error if `num_threads` is
    /// zero (checked before anything is mounted), and an
    /// [`io::ErrorKind::Other`] error if creating the session, mounting it, or
    /// opening a channel for one of the workers fails.
    #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
    pub fn new<FS, P>(
        fs: FS,
        mountpoint: P,
        num_threads: usize,
        allow_other: bool,
    ) -> Result<Self, io::Error>
    where
        FS: FileSystem + Sync + Send + 'static,
        P: AsRef<Path> + std::fmt::Debug,
    {
        // The thread pool builder panics when asked for zero threads, and
        // that would only happen after the filesystem has been mounted.
        // Reject the request up front instead, before touching the mountpoint.
        if num_threads == 0 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "num_threads must be greater than zero",
            ));
        }

        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));

        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;

        #[cfg(target_os = "linux")]
        session.set_allow_other(allow_other);
        session
            .mount()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;

        // construct a thread pool
        let threads = threadpool::Builder::new()
            .num_threads(num_threads)
            .thread_name("fuse_server".to_string())
            .build();

        for _ in 0..num_threads {
            // for each thread requested, create and start a FuseServer accepting requests.
            let mut server = FuseServer {
                server: server.clone(),
                channel: session
                    .new_channel()
                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
            };

            threads.execute(move || {
                // The worker's result is discarded; failures to handle
                // individual requests are already logged inside `start`.
                let _ = server.start();
            });
        }

        Ok(FuseDaemon {
            session: Arc::new(Mutex::new(session)),
            threads: Arc::new(threads),
        })
    }

    /// Waits for all threads to finish.
    #[instrument(skip_all)]
    pub fn wait(&self) {
        self.threads.join()
    }

    /// Sends the unmount command, and waits for all threads to finish.
    ///
    /// # Errors
    ///
    /// Returns an [`io::ErrorKind::Other`] error if unmounting the session
    /// fails; in that case the threads are not waited for.
    #[instrument(skip_all, err)]
    pub fn unmount(&self) -> Result<(), io::Error> {
        // Send the unmount command. The lock guard is a temporary, so the
        // session is unlocked again before waiting on the workers.
        self.session
            .lock()
            .umount()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;

        self.wait();
        Ok(())
    }
}

impl Drop for FuseDaemon {
    fn drop(&mut self) {
        if let Err(error) = self.unmount() {
            error!(?error, "failed to unmont fuse filesystem")
        }
    }
}