about summary refs log tree commit diff
path: root/tvix/store/src/fuse/mod.rs
mod file_attr;
mod inode_tracker;
mod inodes;

#[cfg(test)]
mod tests;

use crate::{
    blobservice::{BlobReader, BlobService},
    directoryservice::DirectoryService,
    fuse::{
        file_attr::gen_file_attr,
        inodes::{DirectoryInodeData, InodeData},
    },
    pathinfoservice::PathInfoService,
    proto::{node::Node, NamedNode},
    B3Digest, Error,
};
use fuser::{FileAttr, ReplyAttr, Request};
use nix_compat::store_path::StorePath;
use std::io;
use std::os::unix::ffi::OsStrExt;
use std::str::FromStr;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
use tracing::{debug, info_span, warn};

use self::inode_tracker::InodeTracker;

/// This implements a read-only FUSE filesystem for a tvix-store
/// with the passed [BlobService], [DirectoryService] and [PathInfoService].
///
/// Listing the mount root (inode [fuser::FUSE_ROOT_ID]) is only attempted
/// when `list_root` is set. In that case all store paths returned by the
/// `PathInfoService` listing are shown. Otherwise `readdir` on the root
/// fails with `EPERM`.
///
/// Linux uses inodes in filesystems. When implementing FUSE, most calls are
/// *for* a given inode.
///
/// This means we need a stable mapping of inode numbers to the
/// corresponding store nodes.
///
/// We internally delegate all inode allocation and state keeping to the
/// inode tracker. We also keep track of the currently "explored" store paths,
/// together with the inode allocated for each store path's root node.
///
/// There are some places where inodes are allocated / data inserted into
/// the inode tracker, if not allocated before already:
///  - Processing a `lookup` request, either in the mount root, or somewhere
///    deeper
///  - Processing a `readdir` request
///
///  Things pointing to the same contents get the same inodes, irrespective of
///  their own location.
///  This means:
///  - Symlinks with the same target will get the same inode.
///  - Regular/executable files with the same contents will get the same inode
///  - Directories with the same contents will get the same inode.
///
/// Due to the above being valid across the whole store, and considering the
/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
/// allocation", aka reserve Directory.size inodes for each PathInfo.
pub struct FUSE {
    blob_service: Arc<dyn BlobService>,
    directory_service: Arc<dyn DirectoryService>,
    path_info_service: Arc<dyn PathInfoService>,

    /// Whether to (try) listing elements in the root.
    list_root: bool,

    /// This maps a given StorePath to the inode we allocated for the root inode.
    store_paths: HashMap<StorePath, u64>,

    /// This keeps track of inodes and data alongside them.
    inode_tracker: InodeTracker,

    /// This holds all open file handles
    file_handles: HashMap<u64, Box<dyn BlobReader>>,

    /// The file handle number handed out by the next successful `open`.
    next_file_handle: u64,

    /// Handle to the tokio runtime, used to drive the async blob reader
    /// calls from the synchronous FUSE callbacks (via `spawn` + `block_on`).
    tokio_handle: tokio::runtime::Handle,
}

impl FUSE {
    pub fn new(
        blob_service: Arc<dyn BlobService>,
        directory_service: Arc<dyn DirectoryService>,
        path_info_service: Arc<dyn PathInfoService>,
        list_root: bool,
    ) -> Self {
        Self {
            blob_service,
            directory_service,
            path_info_service,

            list_root,

            store_paths: HashMap::default(),
            inode_tracker: Default::default(),

            file_handles: Default::default(),
            next_file_handle: 1,
            tokio_handle: tokio::runtime::Handle::current(),
        }
    }

    /// This will turn a lookup request for [std::ffi::OsStr] in the root to
    /// a ino and [InodeData].
    /// It will peek in [self.store_paths], and then either look it up from
    /// [self.inode_tracker],
    /// or otherwise fetch from [self.path_info_service], and then insert into
    /// [self.inode_tracker].
    fn name_in_root_to_ino_and_data(
        &mut self,
        name: &std::ffi::OsStr,
    ) -> Result<Option<(u64, Arc<InodeData>)>, Error> {
        // parse the name into a [StorePath].
        let store_path = if let Some(name) = name.to_str() {
            match StorePath::from_str(name) {
                Ok(store_path) => store_path,
                Err(e) => {
                    debug!(e=?e, "unable to parse as store path");
                    // This is not an error, but a "ENOENT", as someone can stat
                    // a file inside the root that's no valid store path
                    return Ok(None);
                }
            }
        } else {
            debug!("{name:?} is no string");
            // same here.
            return Ok(None);
        };

        if let Some(ino) = self.store_paths.get(&store_path) {
            // If we already have that store path, lookup the inode from
            // self.store_paths and then get the data from [self.inode_tracker],
            // which in the case of a [InodeData::Directory] will be fully
            // populated.
            Ok(Some((
                *ino,
                self.inode_tracker.get(*ino).expect("must exist"),
            )))
        } else {
            // If we don't have it, look it up in PathInfoService.
            match self.path_info_service.get(store_path.digest)? {
                // the pathinfo doesn't exist, so the file doesn't exist.
                None => Ok(None),
                Some(path_info) => {
                    // The pathinfo does exist, so there must be a root node
                    let root_node = path_info.node.unwrap().node.unwrap();

                    // The name must match what's passed in the lookup, otherwise we return nothing.
                    if root_node.get_name() != store_path.to_string().as_bytes() {
                        return Ok(None);
                    }

                    // insert the (sparse) inode data and register in
                    // self.store_paths.
                    // FUTUREWORK: change put to return the data after
                    // inserting, so we don't need to lookup a second
                    // time?
                    let ino = self.inode_tracker.put((&root_node).into());
                    self.store_paths.insert(store_path, ino);

                    Ok(Some((ino, self.inode_tracker.get(ino).unwrap())))
                }
            }
        }
    }

    /// This will lookup a directory by digest, and will turn it into a
    /// [InodeData::Directory(DirectoryInodeData::Populated(..))].
    /// This is both used to initially insert the root node of a store path,
    /// as well as when looking up an intermediate DirectoryNode.
    fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> {
        match self.directory_service.get(directory_digest) {
            Err(e) => {
                warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory");
                Err(e)
            }
            // If the Directory can't be found, this is a hole, bail out.
            Ok(None) => {
                tracing::error!(directory.digest=%directory_digest, "directory not found in directory service");
                Err(Error::StorageError(format!(
                    "directory {} not found",
                    directory_digest
                )))
            }
            Ok(Some(directory)) => Ok(directory.into()),
        }
    }
}

impl fuser::Filesystem for FUSE {
    /// Reports the attributes of inode `ino`.
    ///
    /// The mount root gets the static `file_attr::ROOT_FILE_ATTR`. Any other
    /// inode is looked up in the inode tracker, replying `ENOENT` if unknown.
    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
    fn getattr(&mut self, _req: &Request, ino: u64, reply: ReplyAttr) {
        debug!("getattr");

        if ino == fuser::FUSE_ROOT_ID {
            reply.attr(&Duration::MAX, &file_attr::ROOT_FILE_ATTR);
            return;
        }

        match self.inode_tracker.get(ino) {
            None => reply.error(libc::ENOENT),
            Some(node) => {
                debug!(node = ?node, "found node");
                reply.attr(&Duration::MAX, &file_attr::gen_file_attr(&node, ino));
            }
        }
    }

    /// Resolves `name` inside directory `parent_ino`.
    ///
    /// Names in the mount root are resolved as store paths. For any other
    /// parent, a sparse directory inode is populated from the
    /// [DirectoryService] first, then its children are searched for `name`.
    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent_ino, rq.name = ?name))]
    fn lookup(
        &mut self,
        _req: &Request,
        parent_ino: u64,
        name: &std::ffi::OsStr,
        reply: fuser::ReplyEntry,
    ) {
        debug!("lookup");

        // This goes from a parent inode to a node.
        // - If the parent is fuser::FUSE_ROOT_ID, we need to check
        //   self.store_paths (fetching from PathInfoService if needed)
        // - Otherwise, lookup the parent in self.inode_tracker (which must be
        //   a InodeData::Directory), and find the child with that name.
        if parent_ino == fuser::FUSE_ROOT_ID {
            match self.name_in_root_to_ino_and_data(name) {
                Err(e) => {
                    warn!("{}", e);
                    reply.error(libc::EIO);
                }
                Ok(None) => {
                    reply.error(libc::ENOENT);
                }
                Ok(Some((ino, inode_data))) => {
                    debug!(inode_data=?&inode_data, ino=ino, "Some");
                    reply_with_entry(reply, &gen_file_attr(&inode_data, ino));
                }
            }
        } else {
            // This is the "lookup for "a" inside inode 42.
            // We already know that inode 42 must be a directory.
            // It might not be populated yet, so if it isn't, we do (by
            // fetching from self.directory_service), and save the result in
            // self.inode_tracker.
            // Now it for sure is populated, so we search for that name in the
            // list of children and return the FileAttrs.

            // An unknown parent inode is answered with ENOENT (like getattr
            // does) rather than panicking the FUSE session.
            let parent_data = match self.inode_tracker.get(parent_ino) {
                Some(parent_data) => parent_data,
                None => {
                    warn!("parent inode not found");
                    reply.error(libc::ENOENT);
                    return;
                }
            };
            let parent_data = match *parent_data {
                InodeData::Regular(..) | InodeData::Symlink(_) => {
                    // if the parent inode was not a directory, this doesn't make sense
                    reply.error(libc::ENOTDIR);
                    return;
                }
                InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
                    match self.fetch_directory_inode_data(parent_digest) {
                        Ok(new_data) => {
                            // update data in self.inode_tracker with populated variant.
                            // FUTUREWORK: change put to return the data after
                            // inserting, so we don't need to lookup a second
                            // time?
                            let ino = self.inode_tracker.put(new_data);
                            self.inode_tracker.get(ino).unwrap()
                        }
                        Err(_e) => {
                            // fetch_directory_inode_data already logged the error.
                            reply.error(libc::EIO);
                            return;
                        }
                    }
                }
                InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data,
            };

            // now parent_data can only be a InodeData::Directory(DirectoryInodeData::Populated(..)).
            let (parent_digest, children) = if let InodeData::Directory(
                DirectoryInodeData::Populated(ref parent_digest, ref children),
            ) = *parent_data
            {
                (parent_digest, children)
            } else {
                panic!("unexpected type")
            };
            let span = info_span!("lookup", directory.digest = %parent_digest);
            let _enter = span.enter();

            // in the children, find the one with the desired name.
            if let Some((child_ino, _)) =
                children.iter().find(|e| e.1.get_name() == name.as_bytes())
            {
                // lookup the child InodeData in self.inode_tracker.
                // We know the inodes for children have already been allocated.
                let child_inode_data = self.inode_tracker.get(*child_ino).unwrap();

                // Reply with the file attributes for the child.
                // For child directories, we still have all data we need to reply.
                reply_with_entry(reply, &gen_file_attr(&child_inode_data, *child_ino));
            } else {
                // Child not found, return ENOENT.
                reply.error(libc::ENOENT);
            }
        }
    }

    // TODO: readdirplus?

    /// Lists the entries of directory `ino`, starting after `offset`.
    ///
    /// Each entry is added with offset `offset + i + 1`, which the kernel
    /// passes back as `offset` on the next call. Listing the mount root
    /// requires `list_root`; otherwise it fails with `EPERM`.
    #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset))]
    fn readdir(
        &mut self,
        _req: &Request<'_>,
        ino: u64,
        _fh: u64,
        offset: i64,
        mut reply: fuser::ReplyDirectory,
    ) {
        debug!("readdir");

        if ino == fuser::FUSE_ROOT_ID {
            if !self.list_root {
                reply.error(libc::EPERM); // same error code as ipfs/kubo
                return;
            } else {
                for (i, path_info) in self
                    .path_info_service
                    .list()
                    .skip(offset as usize)
                    .enumerate()
                {
                    let path_info = match path_info {
                        Err(e) => {
                            warn!("failed to retrieve pathinfo: {}", e);
                            reply.error(libc::EPERM);
                            return;
                        }
                        Ok(path_info) => path_info,
                    };

                    // Clients are expected to validate PathInfos, but a
                    // malformed entry must not panic the FUSE session.
                    let root_node = match path_info.node.and_then(|n| n.node) {
                        Some(root_node) => root_node,
                        None => {
                            warn!("pathinfo has no root node");
                            reply.error(libc::EIO);
                            return;
                        }
                    };
                    let store_path = match StorePath::from_bytes(root_node.get_name()) {
                        Ok(store_path) => store_path,
                        Err(e) => {
                            warn!(e=?e, "unable to parse root node name as store path");
                            reply.error(libc::EIO);
                            return;
                        }
                    };

                    let ino = match self.store_paths.get(&store_path) {
                        Some(ino) => *ino,
                        None => {
                            // insert the (sparse) inode data and register in
                            // self.store_paths.
                            let ino = self.inode_tracker.put((&root_node).into());
                            self.store_paths.insert(store_path.clone(), ino);
                            ino
                        }
                    };

                    let ty = match root_node {
                        Node::Directory(_) => fuser::FileType::Directory,
                        Node::File(_) => fuser::FileType::RegularFile,
                        Node::Symlink(_) => fuser::FileType::Symlink,
                    };

                    let full =
                        reply.add(ino, offset + i as i64 + 1_i64, ty, store_path.to_string());
                    if full {
                        break;
                    }
                }
                reply.ok();
                return;
            }
        }

        // lookup the inode data. Unknown inodes are answered with ENOENT.
        let dir_inode_data = match self.inode_tracker.get(ino) {
            Some(dir_inode_data) => dir_inode_data,
            None => {
                warn!("inode not found");
                reply.error(libc::ENOENT);
                return;
            }
        };
        let dir_inode_data = match *dir_inode_data {
            InodeData::Regular(..) | InodeData::Symlink(..) => {
                warn!("Not a directory");
                reply.error(libc::ENOTDIR);
                return;
            }
            InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => {
                match self.fetch_directory_inode_data(directory_digest) {
                    Ok(new_data) => {
                        // update data in self.inode_tracker with populated variant.
                        // FUTUREWORK: change put to return the data after
                        // inserting, so we don't need to lookup a second
                        // time?
                        let ino = self.inode_tracker.put(new_data);
                        self.inode_tracker.get(ino).unwrap()
                    }
                    Err(_e) => {
                        // fetch_directory_inode_data already logged the error.
                        reply.error(libc::EIO);
                        return;
                    }
                }
            }
            InodeData::Directory(DirectoryInodeData::Populated(..)) => dir_inode_data,
        };

        // now parent_data can only be InodeData::Directory(DirectoryInodeData::Populated(..))
        if let InodeData::Directory(DirectoryInodeData::Populated(ref _digest, ref children)) =
            *dir_inode_data
        {
            for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() {
                // the second parameter will become the "offset" parameter on the next call.
                let full = reply.add(
                    *ino,
                    offset + i as i64 + 1_i64,
                    match child_node {
                        Node::Directory(_) => fuser::FileType::Directory,
                        Node::File(_) => fuser::FileType::RegularFile,
                        Node::Symlink(_) => fuser::FileType::Symlink,
                    },
                    std::ffi::OsStr::from_bytes(child_node.get_name()),
                );
                if full {
                    break;
                }
            }
            reply.ok();
        } else {
            panic!("unexpected type")
        }
    }

    /// Opens a regular file inode for reading and hands out a new file handle
    /// that maps to a blob reader.
    ///
    /// Directories and symlinks are rejected with `EISDIR`, and unknown
    /// inodes with `ENOENT`.
    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
    fn open(&mut self, _req: &Request<'_>, ino: u64, _flags: i32, reply: fuser::ReplyOpen) {
        // get a new file handle
        let fh = self.next_file_handle;

        if ino == fuser::FUSE_ROOT_ID {
            reply.error(libc::ENOSYS);
            return;
        }

        // lookup the inode. Unknown inodes are answered with ENOENT.
        let inode_data = match self.inode_tracker.get(ino) {
            Some(inode_data) => inode_data,
            None => {
                warn!("inode not found");
                reply.error(libc::ENOENT);
                return;
            }
        };

        match *inode_data {
            // read is invalid on non-files.
            InodeData::Directory(..) | InodeData::Symlink(_) => {
                warn!("is directory");
                reply.error(libc::EISDIR);
            }
            InodeData::Regular(ref blob_digest, _blob_size, _) => {
                let span = info_span!("read", blob.digest = %blob_digest);
                let _enter = span.enter();

                let blob_service = self.blob_service.clone();
                let blob_digest = blob_digest.clone();

                let task = self
                    .tokio_handle
                    .spawn(async move { blob_service.open_read(&blob_digest).await });

                let blob_reader = self.tokio_handle.block_on(task).unwrap();

                match blob_reader {
                    Ok(None) => {
                        warn!("blob not found");
                        reply.error(libc::EIO);
                    }
                    Err(e) => {
                        warn!(e=?e, "error opening blob");
                        reply.error(libc::EIO);
                    }
                    Ok(Some(blob_reader)) => {
                        debug!("add file handle {}", fh);
                        self.file_handles.insert(fh, blob_reader);
                        reply.opened(fh, 0);

                        // TODO: this will overflow after 2**64 operations,
                        // which is fine for now.
                        // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
                        // for the discussion on alternatives.
                        self.next_file_handle += 1;
                    }
                }
            }
        }
    }

    /// Drops the blob reader associated with file handle `fh`, if any.
    #[tracing::instrument(skip_all, fields(rq.inode = ino, fh = fh))]
    fn release(
        &mut self,
        _req: &Request<'_>,
        ino: u64,
        fh: u64,
        _flags: i32,
        _lock_owner: Option<u64>,
        _flush: bool,
        reply: fuser::ReplyEmpty,
    ) {
        // remove and get ownership on the blob reader
        match self.file_handles.remove(&fh) {
            // drop it, which will close it.
            Some(blob_reader) => drop(blob_reader),
            None => {
                // These might already be dropped if a read error occured.
                debug!("file_handle {} not found", fh);
            }
        }

        reply.ok();
    }

    /// Reads up to `size` bytes at `offset` from the blob behind file handle
    /// `fh`.
    ///
    /// Fewer bytes are only returned on EOF. On error the blob reader is
    /// dropped (not returned to `self.file_handles`), and the OS error code
    /// is replied, or `EIO` if the error carries none.
    #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset, rq.size = size))]
    fn read(
        &mut self,
        _req: &Request<'_>,
        ino: u64,
        fh: u64,
        offset: i64,
        size: u32,
        _flags: i32,
        _lock_owner: Option<u64>,
        reply: fuser::ReplyData,
    ) {
        debug!("read");

        // We need to take out the blob reader from self.file_handles, so we can
        // interact with it in the separate task.
        // On success, we pass it back out of the task, so we can put it back in self.file_handles.
        let mut blob_reader = match self.file_handles.remove(&fh) {
            Some(blob_reader) => blob_reader,
            None => {
                warn!("file handle {} unknown", fh);
                reply.error(libc::EIO);
                return;
            }
        };

        let task = self.tokio_handle.spawn(async move {
            // seek to the offset specified, which is relative to the start of the file.
            let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await;

            match resp {
                Ok(pos) => {
                    debug_assert_eq!(offset as u64, pos);
                }
                Err(e) => {
                    warn!("failed to seek to offset {}: {}", offset, e);
                    return Err(libc::EIO);
                }
            }

            // As written in the fuser docs, read should send exactly the number
            // of bytes requested except on EOF or error.

            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);

            while (buf.len() as u64) < size as u64 {
                match blob_reader.fill_buf().await {
                    Ok(int_buf) => {
                        // copy things from the internal buffer into buf to fill it till up until size

                        // an empty buffer signals we reached EOF.
                        if int_buf.is_empty() {
                            break;
                        }

                        // calculate how many bytes we can read from int_buf.
                        // It's either all of int_buf, or the number of bytes missing in buf to reach size.
                        let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len());

                        // copy these bytes into our buffer
                        buf.extend_from_slice(&int_buf[..len_to_copy]);
                        // and consume them in the buffered reader.
                        blob_reader.consume(len_to_copy);
                    }
                    Err(e) => {
                        // Errors not originating from a syscall (e.g. custom
                        // or network errors) carry no raw OS error code.
                        // Fall back to EIO instead of panicking.
                        warn!("failed to read from blob: {}", e);
                        return Err(e.raw_os_error().unwrap_or(libc::EIO));
                    }
                }
            }
            Ok((buf, blob_reader))
        });

        let resp = self.tokio_handle.block_on(task).unwrap();

        match resp {
            Err(e) => reply.error(e),
            Ok((buf, blob_reader)) => {
                reply.data(&buf);
                self.file_handles.insert(fh, blob_reader);
            }
        }
    }

    /// Returns the target of symlink inode `ino`.
    ///
    /// Non-symlinks get `EINVAL`, unknown inodes `ENOENT`.
    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
    fn readlink(&mut self, _req: &Request<'_>, ino: u64, reply: fuser::ReplyData) {
        if ino == fuser::FUSE_ROOT_ID {
            reply.error(libc::ENOSYS);
            return;
        }

        // lookup the inode. Unknown inodes are answered with ENOENT.
        match self.inode_tracker.get(ino) {
            None => {
                warn!("inode not found");
                reply.error(libc::ENOENT);
            }
            Some(inode_data) => match *inode_data {
                InodeData::Directory(..) | InodeData::Regular(..) => {
                    reply.error(libc::EINVAL);
                }
                InodeData::Symlink(ref target) => reply.data(target),
            },
        }
    }
}

/// Answers a `lookup` request with `file_attr`.
///
/// The filesystem is read-only, so the entry is handed out with the maximum
/// cache TTL.
fn reply_with_entry(reply: fuser::ReplyEntry, file_attr: &FileAttr) {
    let ttl = Duration::MAX;
    // TODO: generation
    let generation = 1;
    reply.entry(&ttl, file_attr, generation);
}