about summary refs log tree commit diff
path: root/tvix/store/src/fs/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/fs/mod.rs')
-rw-r--r--tvix/store/src/fs/mod.rs629
1 files changed, 0 insertions, 629 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs
deleted file mode 100644
index c11bd0a44c7e..000000000000
--- a/tvix/store/src/fs/mod.rs
+++ /dev/null
@@ -1,629 +0,0 @@
-mod file_attr;
-mod inode_tracker;
-mod inodes;
-mod root_nodes;
-
-#[cfg(feature = "fuse")]
-pub mod fuse;
-
-#[cfg(feature = "virtiofs")]
-pub mod virtiofs;
-
-#[cfg(test)]
-mod tests;
-
-use fuse_backend_rs::abi::fuse_abi::stat64;
-use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
-use futures::StreamExt;
-use parking_lot::RwLock;
-use std::ops::Deref;
-use std::{
-    collections::HashMap,
-    io,
-    sync::atomic::AtomicU64,
-    sync::{atomic::Ordering, Arc},
-    time::Duration,
-};
-use tokio::{
-    io::{AsyncReadExt, AsyncSeekExt},
-    sync::mpsc,
-};
-use tracing::{debug, info_span, instrument, warn};
-use tvix_castore::proto as castorepb;
-use tvix_castore::{
-    blobservice::{BlobReader, BlobService},
-    directoryservice::DirectoryService,
-    proto::{node::Node, NamedNode},
-    B3Digest,
-};
-
-use self::root_nodes::RootNodes;
-use self::{
-    file_attr::{gen_file_attr, ROOT_FILE_ATTR},
-    inode_tracker::InodeTracker,
-    inodes::{DirectoryInodeData, InodeData},
-};
-
-/// This implements a read-only FUSE filesystem for a tvix-store
-/// with the passed [BlobService], [DirectoryService] and [RootNodes].
-///
-/// Linux uses inodes in filesystems. When implementing FUSE, most calls are
-/// *for* a given inode.
-///
-/// This means, we need to have a stable mapping of inode numbers to the
-/// corresponding store nodes.
-///
-/// We internally delegate all inode allocation and state keeping to the
-/// inode tracker.
-/// We store a mapping from currently "explored" names in the root to their
-/// inode.
-///
-/// There's some places where inodes are allocated / data inserted into
-/// the inode tracker, if not allocated before already:
-///  - Processing a `lookup` request, either in the mount root, or somewhere
-///    deeper.
-///  - Processing a `readdir` request
-///
-///  Things pointing to the same contents get the same inodes, irrespective of
-///  their own location.
-///  This means:
-///  - Symlinks with the same target will get the same inode.
-///  - Regular/executable files with the same contents will get the same inode
-///  - Directories with the same contents will get the same inode.
-///
-/// Due to the above being valid across the whole store, and considering the
-/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
-/// allocation", aka reserve Directory.size inodes for each directory node we
-/// explore.
-pub struct TvixStoreFs<BS, DS, RN> {
-    blob_service: BS,
-    directory_service: DS,
-    root_nodes_provider: RN,
-
-    /// Whether to (try) listing elements in the root.
-    list_root: bool,
-
-    /// This maps a given basename in the root to the inode we allocated for the node.
-    root_nodes: RwLock<HashMap<Vec<u8>, u64>>,
-
-    /// This keeps track of inodes and data alongside them.
-    inode_tracker: RwLock<InodeTracker>,
-
-    /// This holds all open file handles
-    #[allow(clippy::type_complexity)]
-    file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>,
-
-    next_file_handle: AtomicU64,
-
-    tokio_handle: tokio::runtime::Handle,
-}
-
-impl<BS, DS, RN> TvixStoreFs<BS, DS, RN>
-where
-    BS: Deref<Target = dyn BlobService> + Clone + Send,
-    DS: Deref<Target = dyn DirectoryService> + Clone + Send + 'static,
-    RN: RootNodes + Clone + 'static,
-{
-    pub fn new(
-        blob_service: BS,
-        directory_service: DS,
-        root_nodes_provider: RN,
-        list_root: bool,
-    ) -> Self {
-        Self {
-            blob_service,
-            directory_service,
-            root_nodes_provider,
-
-            list_root,
-
-            root_nodes: RwLock::new(HashMap::default()),
-            inode_tracker: RwLock::new(Default::default()),
-
-            file_handles: RwLock::new(Default::default()),
-            next_file_handle: AtomicU64::new(1),
-            tokio_handle: tokio::runtime::Handle::current(),
-        }
-    }
-
-    /// Retrieves the inode for a given root node basename, if present.
-    /// This obtains a read lock on self.root_nodes.
-    fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> {
-        self.root_nodes.read().get(name).cloned()
-    }
-
-    /// For a given inode, look up the given directory behind it (from
-    /// self.inode_tracker), and return its children.
-    /// The inode_tracker MUST know about this inode already, and it MUST point
-    /// to a [InodeData::Directory].
-    /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
-    /// in self.directory_service is performed, and self.inode_tracker is updated with the
-    /// [DirectoryInodeData::Populated].
-    #[instrument(skip(self), err)]
-    fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> {
-        let data = self.inode_tracker.read().get(ino).unwrap();
-        match *data {
-            // if it's populated already, return children.
-            InodeData::Directory(DirectoryInodeData::Populated(
-                ref parent_digest,
-                ref children,
-            )) => Ok((parent_digest.clone(), children.clone())),
-            // if it's sparse, fetch data using directory_service, populate child nodes
-            // and update it in [self.inode_tracker].
-            InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
-                let directory = self
-                    .tokio_handle
-                    .block_on(self.tokio_handle.spawn({
-                        let directory_service = self.directory_service.clone();
-                        let parent_digest = parent_digest.to_owned();
-                        async move { directory_service.get(&parent_digest).await }
-                    }))
-                    .unwrap()?
-                    .ok_or_else(|| {
-                        warn!(directory.digest=%parent_digest, "directory not found");
-                        // If the Directory can't be found, this is a hole, bail out.
-                        io::Error::from_raw_os_error(libc::EIO)
-                    })?;
-
-                // Turn the retrieved directory into a InodeData::Directory(DirectoryInodeData::Populated(..)),
-                // allocating inodes for the children on the way.
-                let children = {
-                    let mut inode_tracker = self.inode_tracker.write();
-
-                    let children: Vec<(u64, castorepb::node::Node)> = directory
-                        .nodes()
-                        .map(|child_node| {
-                            let child_ino = inode_tracker.put((&child_node).into());
-                            (child_ino, child_node)
-                        })
-                        .collect();
-
-                    // replace.
-                    inode_tracker.replace(
-                        ino,
-                        Arc::new(InodeData::Directory(DirectoryInodeData::Populated(
-                            parent_digest.clone(),
-                            children.clone(),
-                        ))),
-                    );
-
-                    children
-                };
-
-                Ok((parent_digest.clone(), children))
-            }
-            // if the parent inode was not a directory, this doesn't make sense
-            InodeData::Regular(..) | InodeData::Symlink(_) => {
-                Err(io::Error::from_raw_os_error(libc::ENOTDIR))
-            }
-        }
-    }
-
-    /// This will turn a lookup request for a name in the root to a ino and
-    /// [InodeData].
-    /// It will peek in [self.root_nodes], and then either look it up from
-    /// [self.inode_tracker],
-    /// or otherwise fetch from [self.root_nodes], and then insert into
-    /// [self.inode_tracker].
-    /// In the case the name can't be found, a libc::ENOENT is returned.
-    fn name_in_root_to_ino_and_data(
-        &self,
-        name: &std::ffi::CStr,
-    ) -> io::Result<(u64, Arc<InodeData>)> {
-        // Look up the inode for that root node.
-        // If there's one, [self.inode_tracker] MUST also contain the data,
-        // which we can then return.
-        if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) {
-            return Ok((
-                inode,
-                self.inode_tracker
-                    .read()
-                    .get(inode)
-                    .expect("must exist")
-                    .to_owned(),
-            ));
-        }
-
-        // We don't have it yet, look it up in [self.root_nodes].
-        match self.tokio_handle.block_on({
-            let root_nodes_provider = self.root_nodes_provider.clone();
-            async move { root_nodes_provider.get_by_basename(name.to_bytes()).await }
-        }) {
-            // if there was an error looking up the root node, propagate up an IO error.
-            Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
-            // the root node doesn't exist, so the file doesn't exist.
-            Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
-            // The root node does exist
-            Ok(Some(root_node)) => {
-                // The name must match what's passed in the lookup, otherwise this is also a ENOENT.
-                if root_node.get_name() != name.to_bytes() {
-                    debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch");
-                    return Err(io::Error::from_raw_os_error(libc::ENOENT));
-                }
-
-                // Let's check if someone else beat us to updating the inode tracker and
-                // root_nodes map. This avoids locking inode_tracker for writing.
-                if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) {
-                    return Ok((
-                        *ino,
-                        self.inode_tracker.read().get(*ino).expect("must exist"),
-                    ));
-                }
-
-                // Only in case it doesn't, lock [self.root_nodes] and
-                // [self.inode_tracker] for writing.
-                let mut root_nodes = self.root_nodes.write();
-                let mut inode_tracker = self.inode_tracker.write();
-
-                // insert the (sparse) inode data and register in
-                // self.root_nodes.
-                let inode_data: InodeData = (&root_node).into();
-                let ino = inode_tracker.put(inode_data.clone());
-                root_nodes.insert(name.to_bytes().into(), ino);
-
-                Ok((ino, Arc::new(inode_data)))
-            }
-        }
-    }
-}
-
-impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN>
-where
-    BS: Deref<Target = dyn BlobService> + Clone + Send + 'static,
-    DS: Deref<Target = dyn DirectoryService> + Send + Clone + 'static,
-    RN: RootNodes + Clone + 'static,
-{
-    type Handle = u64;
-    type Inode = u64;
-
-    fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
-        Ok(FsOptions::empty())
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
-    fn getattr(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        _handle: Option<Self::Handle>,
-    ) -> io::Result<(stat64, Duration)> {
-        if inode == ROOT_ID {
-            return Ok((ROOT_FILE_ATTR.into(), Duration::MAX));
-        }
-
-        match self.inode_tracker.read().get(inode) {
-            None => Err(io::Error::from_raw_os_error(libc::ENOENT)),
-            Some(node) => {
-                debug!(node = ?node, "found node");
-                Ok((gen_file_attr(&node, inode).into(), Duration::MAX))
-            }
-        }
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))]
-    fn lookup(
-        &self,
-        _ctx: &Context,
-        parent: Self::Inode,
-        name: &std::ffi::CStr,
-    ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
-        debug!("lookup");
-
-        // This goes from a parent inode to a node.
-        // - If the parent is [ROOT_ID], we need to check
-        //   [self.root_nodes] (fetching from a [RootNode] provider if needed)
-        // - Otherwise, lookup the parent in [self.inode_tracker] (which must be
-        //   a [InodeData::Directory]), and find the child with that name.
-        if parent == ROOT_ID {
-            let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?;
-
-            debug!(inode_data=?&inode_data, ino=ino, "Some");
-            return Ok(fuse_backend_rs::api::filesystem::Entry {
-                inode: ino,
-                attr: gen_file_attr(&inode_data, ino).into(),
-                attr_timeout: Duration::MAX,
-                entry_timeout: Duration::MAX,
-                ..Default::default()
-            });
-        }
-        // This is the "lookup for "a" inside inode 42.
-        // We already know that inode 42 must be a directory.
-        let (parent_digest, children) = self.get_directory_children(parent)?;
-
-        let span = info_span!("lookup", directory.digest = %parent_digest);
-        let _enter = span.enter();
-
-        // Search for that name in the list of children and return the FileAttrs.
-
-        // in the children, find the one with the desired name.
-        if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
-            // lookup the child [InodeData] in [self.inode_tracker].
-            // We know the inodes for children have already been allocated.
-            let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap();
-
-            // Reply with the file attributes for the child.
-            // For child directories, we still have all data we need to reply.
-            Ok(fuse_backend_rs::api::filesystem::Entry {
-                inode: *child_ino,
-                attr: gen_file_attr(&child_inode_data, *child_ino).into(),
-                attr_timeout: Duration::MAX,
-                entry_timeout: Duration::MAX,
-                ..Default::default()
-            })
-        } else {
-            // Child not found, return ENOENT.
-            Err(io::Error::from_raw_os_error(libc::ENOENT))
-        }
-    }
-
-    // TODO: readdirplus?
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))]
-    fn readdir(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        _handle: Self::Handle,
-        _size: u32,
-        offset: u64,
-        add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
-    ) -> io::Result<()> {
-        debug!("readdir");
-
-        if inode == ROOT_ID {
-            if !self.list_root {
-                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
-            } else {
-                let root_nodes_provider = self.root_nodes_provider.clone();
-                let (tx, mut rx) = mpsc::channel(16);
-
-                // This task will run in the background immediately and will exit
-                // after the stream ends or if we no longer want any more entries.
-                self.tokio_handle.spawn(async move {
-                    let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate();
-                    while let Some(node) = stream.next().await {
-                        if tx.send(node).await.is_err() {
-                            // If we get a send error, it means the sync code
-                            // doesn't want any more entries.
-                            break;
-                        }
-                    }
-                });
-
-                while let Some((i, root_node)) = rx.blocking_recv() {
-                    let root_node = match root_node {
-                        Err(e) => {
-                            warn!("failed to retrieve pathinfo: {}", e);
-                            return Err(io::Error::from_raw_os_error(libc::EPERM));
-                        }
-                        Ok(root_node) => root_node,
-                    };
-
-                    let name = root_node.get_name();
-                    // obtain the inode, or allocate a new one.
-                    let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| {
-                        // insert the (sparse) inode data and register in
-                        // self.root_nodes.
-                        let ino = self.inode_tracker.write().put((&root_node).into());
-                        self.root_nodes.write().insert(name.into(), ino);
-                        ino
-                    });
-
-                    let ty = match root_node {
-                        Node::Directory(_) => libc::S_IFDIR,
-                        Node::File(_) => libc::S_IFREG,
-                        Node::Symlink(_) => libc::S_IFLNK,
-                    };
-
-                    let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
-                        ino,
-                        offset: offset + i as u64 + 1,
-                        type_: ty,
-                        name,
-                    })?;
-                    // If the buffer is full, add_entry will return `Ok(0)`.
-                    if written == 0 {
-                        break;
-                    }
-                }
-
-                return Ok(());
-            }
-        }
-
-        // lookup the children, or return an error if it's not a directory.
-        let (parent_digest, children) = self.get_directory_children(inode)?;
-
-        let span = info_span!("lookup", directory.digest = %parent_digest);
-        let _enter = span.enter();
-
-        for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() {
-            // the second parameter will become the "offset" parameter on the next call.
-            let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
-                ino: *ino,
-                offset: offset + i as u64 + 1,
-                type_: match child_node {
-                    #[allow(clippy::unnecessary_cast)]
-                    // libc::S_IFDIR is u32 on Linux and u16 on MacOS
-                    Node::Directory(_) => libc::S_IFDIR as u32,
-                    #[allow(clippy::unnecessary_cast)]
-                    // libc::S_IFDIR is u32 on Linux and u16 on MacOS
-                    Node::File(_) => libc::S_IFREG as u32,
-                    #[allow(clippy::unnecessary_cast)]
-                    // libc::S_IFDIR is u32 on Linux and u16 on MacOS
-                    Node::Symlink(_) => libc::S_IFLNK as u32,
-                },
-                name: child_node.get_name(),
-            })?;
-            // If the buffer is full, add_entry will return `Ok(0)`.
-            if written == 0 {
-                break;
-            }
-        }
-
-        Ok(())
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
-    fn open(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        _flags: u32,
-        _fuse_flags: u32,
-    ) -> io::Result<(
-        Option<Self::Handle>,
-        fuse_backend_rs::api::filesystem::OpenOptions,
-    )> {
-        if inode == ROOT_ID {
-            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
-        }
-
-        // lookup the inode
-        match *self.inode_tracker.read().get(inode).unwrap() {
-            // read is invalid on non-files.
-            InodeData::Directory(..) | InodeData::Symlink(_) => {
-                warn!("is directory");
-                Err(io::Error::from_raw_os_error(libc::EISDIR))
-            }
-            InodeData::Regular(ref blob_digest, _blob_size, _) => {
-                let span = info_span!("read", blob.digest = %blob_digest);
-                let _enter = span.enter();
-
-                let blob_service = self.blob_service.clone();
-                let blob_digest = blob_digest.clone();
-
-                let task = self
-                    .tokio_handle
-                    .spawn(async move { blob_service.open_read(&blob_digest).await });
-
-                let blob_reader = self.tokio_handle.block_on(task).unwrap();
-
-                match blob_reader {
-                    Ok(None) => {
-                        warn!("blob not found");
-                        Err(io::Error::from_raw_os_error(libc::EIO))
-                    }
-                    Err(e) => {
-                        warn!(e=?e, "error opening blob");
-                        Err(io::Error::from_raw_os_error(libc::EIO))
-                    }
-                    Ok(Some(blob_reader)) => {
-                        // get a new file handle
-                        // TODO: this will overflow after 2**64 operations,
-                        // which is fine for now.
-                        // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
-                        // for the discussion on alternatives.
-                        let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst);
-
-                        debug!("add file handle {}", fh);
-                        self.file_handles
-                            .write()
-                            .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader)));
-
-                        Ok((
-                            Some(fh),
-                            fuse_backend_rs::api::filesystem::OpenOptions::empty(),
-                        ))
-                    }
-                }
-            }
-        }
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))]
-    fn release(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        _flags: u32,
-        handle: Self::Handle,
-        _flush: bool,
-        _flock_release: bool,
-        _lock_owner: Option<u64>,
-    ) -> io::Result<()> {
-        // remove and get ownership on the blob reader
-        match self.file_handles.write().remove(&handle) {
-            // drop it, which will close it.
-            Some(blob_reader) => drop(blob_reader),
-            None => {
-                // These might already be dropped if a read error occured.
-                debug!("file_handle {} not found", handle);
-            }
-        }
-
-        Ok(())
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))]
-    fn read(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        handle: Self::Handle,
-        w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter,
-        size: u32,
-        offset: u64,
-        _lock_owner: Option<u64>,
-        _flags: u32,
-    ) -> io::Result<usize> {
-        debug!("read");
-
-        // We need to take out the blob reader from self.file_handles, so we can
-        // interact with it in the separate task.
-        // On success, we pass it back out of the task, so we can put it back in self.file_handles.
-        let blob_reader = match self.file_handles.read().get(&handle) {
-            Some(blob_reader) => blob_reader.clone(),
-            None => {
-                warn!("file handle {} unknown", handle);
-                return Err(io::Error::from_raw_os_error(libc::EIO));
-            }
-        };
-
-        let task = self.tokio_handle.spawn(async move {
-            let mut blob_reader = blob_reader.lock().await;
-
-            // seek to the offset specified, which is relative to the start of the file.
-            let resp = blob_reader.seek(io::SeekFrom::Start(offset)).await;
-
-            match resp {
-                Ok(pos) => {
-                    debug_assert_eq!(offset, pos);
-                }
-                Err(e) => {
-                    warn!("failed to seek to offset {}: {}", offset, e);
-                    return Err(io::Error::from_raw_os_error(libc::EIO));
-                }
-            }
-
-            // As written in the fuse docs, read should send exactly the number
-            // of bytes requested except on EOF or error.
-
-            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
-
-            // copy things from the internal buffer into buf to fill it till up until size
-            tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?;
-
-            Ok(buf)
-        });
-
-        let buf = self.tokio_handle.block_on(task).unwrap()?;
-
-        w.write(&buf)
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
-    fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
-        if inode == ROOT_ID {
-            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
-        }
-
-        // lookup the inode
-        match *self.inode_tracker.read().get(inode).unwrap() {
-            InodeData::Directory(..) | InodeData::Regular(..) => {
-                Err(io::Error::from_raw_os_error(libc::EINVAL))
-            }
-            InodeData::Symlink(ref target) => Ok(target.to_vec()),
-        }
-    }
-}