about summary refs log tree commit diff
path: root/tvix/store/src/fuse/mod.rs
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-09-16T18·54-0500
committerclbot <clbot@tvl.fyi>2023-09-20T14·27+0000
commit7e737fde34260daa477794d63b0b3344b4a1d81b (patch)
tree51f7d4b3c7f18aa78e584b06840fcdd645474895 /tvix/store/src/fuse/mod.rs
parent6b7c936bc50934b45df132f659292e2c45256dea (diff)
refactor(tvix/store/fs): Separate FUSE and filesystem code r/6622
In prepration for adding virtiofs support, I thought it would make sense
to split out the filesystem implementation from FUSE itself.

The `fs` module holds the tvix-store filesystem implemetation and the
`fuse` module holds the code to spawn a FUSE daemon backed by multiple
threads.

Change-Id: I8c58447b8c3aa016a613068f8e7ec166554e237c
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9343
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Autosubmit: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix/store/src/fuse/mod.rs')
-rw-r--r--tvix/store/src/fuse/mod.rs771
1 files changed, 0 insertions, 771 deletions
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs
deleted file mode 100644
index 1a5d884ef7a9..000000000000
--- a/tvix/store/src/fuse/mod.rs
+++ /dev/null
@@ -1,771 +0,0 @@
-mod file_attr;
-mod inode_tracker;
-mod inodes;
-
-#[cfg(test)]
-mod tests;
-
-use crate::{
-    blobservice::{BlobReader, BlobService},
-    directoryservice::DirectoryService,
-    fuse::inodes::{DirectoryInodeData, InodeData},
-    pathinfoservice::PathInfoService,
-    proto::{node::Node, NamedNode},
-    B3Digest, Error,
-};
-use fuse_backend_rs::{
-    api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID},
-    transport::FuseSession,
-};
-use nix_compat::store_path::StorePath;
-use parking_lot::RwLock;
-use std::{
-    collections::HashMap,
-    io,
-    path::Path,
-    str::FromStr,
-    sync::atomic::AtomicU64,
-    sync::{atomic::Ordering, Arc},
-    thread,
-    time::Duration,
-};
-use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
-use tracing::{debug, error, info_span, warn};
-
-use self::{
-    file_attr::{gen_file_attr, ROOT_FILE_ATTR},
-    inode_tracker::InodeTracker,
-};
-
-/// This implements a read-only FUSE filesystem for a tvix-store
-/// with the passed [BlobService], [DirectoryService] and [PathInfoService].
-///
-/// We don't allow listing on the root mountpoint (inode 0).
-/// In the future, this might be made configurable once a listing method is
-/// added to [self.path_info_service], and then show all store paths in that
-/// store.
-///
-/// Linux uses inodes in filesystems. When implementing FUSE, most calls are
-/// *for* a given inode.
-///
-/// This means, we need to have a stable mapping of inode numbers to the
-/// corresponding store nodes.
-///
-/// We internally delegate all inode allocation and state keeping to the
-/// inode tracker, and store the currently "explored" store paths together with
-/// root inode of the root.
-///
-/// There's some places where inodes are allocated / data inserted into
-/// the inode tracker, if not allocated before already:
-///  - Processing a `lookup` request, either in the mount root, or somewhere
-///    deeper
-///  - Processing a `readdir` request
-///
-///  Things pointing to the same contents get the same inodes, irrespective of
-///  their own location.
-///  This means:
-///  - Symlinks with the same target will get the same inode.
-///  - Regular/executable files with the same contents will get the same inode
-///  - Directories with the same contents will get the same inode.
-///
-/// Due to the above being valid across the whole store, and considering the
-/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
-/// allocation", aka reserve Directory.size inodes for each PathInfo.
-pub struct FUSE {
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
-
-    /// Whether to (try) listing elements in the root.
-    list_root: bool,
-
-    /// This maps a given StorePath to the inode we allocated for the root inode.
-    store_paths: RwLock<HashMap<StorePath, u64>>,
-
-    /// This keeps track of inodes and data alongside them.
-    inode_tracker: RwLock<InodeTracker>,
-
-    /// This holds all open file handles
-    file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>,
-
-    next_file_handle: AtomicU64,
-
-    tokio_handle: tokio::runtime::Handle,
-}
-
-impl FUSE {
-    pub fn new(
-        blob_service: Arc<dyn BlobService>,
-        directory_service: Arc<dyn DirectoryService>,
-        path_info_service: Arc<dyn PathInfoService>,
-        list_root: bool,
-    ) -> Self {
-        Self {
-            blob_service,
-            directory_service,
-            path_info_service,
-
-            list_root,
-
-            store_paths: RwLock::new(HashMap::default()),
-            inode_tracker: RwLock::new(Default::default()),
-
-            file_handles: RwLock::new(Default::default()),
-            next_file_handle: AtomicU64::new(1),
-            tokio_handle: tokio::runtime::Handle::current(),
-        }
-    }
-
-    /// This will turn a lookup request for [std::ffi::OsStr] in the root to
-    /// a ino and [InodeData].
-    /// It will peek in [self.store_paths], and then either look it up from
-    /// [self.inode_tracker],
-    /// or otherwise fetch from [self.path_info_service], and then insert into
-    /// [self.inode_tracker].
-    fn name_in_root_to_ino_and_data(
-        &self,
-        name: &std::ffi::CStr,
-    ) -> Result<Option<(u64, Arc<InodeData>)>, Error> {
-        // parse the name into a [StorePath].
-        let store_path = if let Some(name) = name.to_str().ok() {
-            match StorePath::from_str(name) {
-                Ok(store_path) => store_path,
-                Err(e) => {
-                    debug!(e=?e, "unable to parse as store path");
-                    // This is not an error, but a "ENOENT", as someone can stat
-                    // a file inside the root that's no valid store path
-                    return Ok(None);
-                }
-            }
-        } else {
-            debug!("{name:?} is not a valid utf-8 string");
-            // same here.
-            return Ok(None);
-        };
-
-        let ino = {
-            // This extra scope makes sure we drop the read lock
-            // immediately after reading, to prevent deadlocks.
-            let store_paths = self.store_paths.read();
-            store_paths.get(&store_path).cloned()
-        };
-
-        if let Some(ino) = ino {
-            // If we already have that store path, lookup the inode from
-            // self.store_paths and then get the data from [self.inode_tracker],
-            // which in the case of a [InodeData::Directory] will be fully
-            // populated.
-            Ok(Some((
-                ino,
-                self.inode_tracker.read().get(ino).expect("must exist"),
-            )))
-        } else {
-            // If we don't have it, look it up in PathInfoService.
-            match self.path_info_service.get(store_path.digest)? {
-                // the pathinfo doesn't exist, so the file doesn't exist.
-                None => Ok(None),
-                Some(path_info) => {
-                    // The pathinfo does exist, so there must be a root node
-                    let root_node = path_info.node.unwrap().node.unwrap();
-
-                    // The name must match what's passed in the lookup, otherwise we return nothing.
-                    if root_node.get_name() != store_path.to_string().as_bytes() {
-                        return Ok(None);
-                    }
-
-                    // Let's check if someone else beat us to updating the inode tracker and
-                    // store_paths map.
-                    let mut store_paths = self.store_paths.write();
-                    if let Some(ino) = store_paths.get(&store_path).cloned() {
-                        return Ok(Some((
-                            ino,
-                            self.inode_tracker.read().get(ino).expect("must exist"),
-                        )));
-                    }
-
-                    // insert the (sparse) inode data and register in
-                    // self.store_paths.
-                    // FUTUREWORK: change put to return the data after
-                    // inserting, so we don't need to lookup a second
-                    // time?
-                    let (ino, inode) = {
-                        let mut inode_tracker = self.inode_tracker.write();
-                        let ino = inode_tracker.put((&root_node).into());
-                        (ino, inode_tracker.get(ino).unwrap())
-                    };
-                    store_paths.insert(store_path, ino);
-
-                    Ok(Some((ino, inode)))
-                }
-            }
-        }
-    }
-
-    /// This will lookup a directory by digest, and will turn it into a
-    /// [InodeData::Directory(DirectoryInodeData::Populated(..))].
-    /// This is both used to initially insert the root node of a store path,
-    /// as well as when looking up an intermediate DirectoryNode.
-    fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> {
-        match self.directory_service.get(directory_digest) {
-            Err(e) => {
-                warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory");
-                Err(e)
-            }
-            // If the Directory can't be found, this is a hole, bail out.
-            Ok(None) => {
-                tracing::error!(directory.digest=%directory_digest, "directory not found in directory service");
-                Err(Error::StorageError(format!(
-                    "directory {} not found",
-                    directory_digest
-                )))
-            }
-            Ok(Some(directory)) => Ok(directory.into()),
-        }
-    }
-}
-
-impl FileSystem for FUSE {
-    type Inode = u64;
-    type Handle = u64;
-
-    fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
-        Ok(FsOptions::empty())
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
-    fn getattr(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        _handle: Option<Self::Handle>,
-    ) -> io::Result<(libc::stat64, Duration)> {
-        if inode == ROOT_ID {
-            return Ok((ROOT_FILE_ATTR.into(), Duration::MAX));
-        }
-
-        match self.inode_tracker.read().get(inode) {
-            None => return Err(io::Error::from_raw_os_error(libc::ENOENT)),
-            Some(node) => {
-                debug!(node = ?node, "found node");
-                Ok((gen_file_attr(&node, inode).into(), Duration::MAX))
-            }
-        }
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))]
-    fn lookup(
-        &self,
-        _ctx: &Context,
-        parent: Self::Inode,
-        name: &std::ffi::CStr,
-    ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
-        debug!("lookup");
-
-        // This goes from a parent inode to a node.
-        // - If the parent is [ROOT_ID], we need to check
-        //   [self.store_paths] (fetching from PathInfoService if needed)
-        // - Otherwise, lookup the parent in [self.inode_tracker] (which must be
-        //   a [InodeData::Directory]), and find the child with that name.
-        if parent == ROOT_ID {
-            return match self.name_in_root_to_ino_and_data(name) {
-                Err(e) => {
-                    warn!("{}", e);
-                    Err(io::Error::from_raw_os_error(libc::ENOENT))
-                }
-                Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
-                Ok(Some((ino, inode_data))) => {
-                    debug!(inode_data=?&inode_data, ino=ino, "Some");
-                    Ok(fuse_backend_rs::api::filesystem::Entry {
-                        inode: ino,
-                        attr: gen_file_attr(&inode_data, ino).into(),
-                        attr_timeout: Duration::MAX,
-                        entry_timeout: Duration::MAX,
-                        ..Default::default()
-                    })
-                }
-            };
-        }
-
-        // This is the "lookup for "a" inside inode 42.
-        // We already know that inode 42 must be a directory.
-        // It might not be populated yet, so if it isn't, we do (by
-        // fetching from [self.directory_service]), and save the result in
-        // [self.inode_tracker].
-        // Now it for sure is populated, so we search for that name in the
-        // list of children and return the FileAttrs.
-
-        // TODO: Reduce the critical section of this write lock.
-        let mut inode_tracker = self.inode_tracker.write();
-        let parent_data = inode_tracker.get(parent).unwrap();
-        let parent_data = match *parent_data {
-            InodeData::Regular(..) | InodeData::Symlink(_) => {
-                // if the parent inode was not a directory, this doesn't make sense
-                return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
-            }
-            InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
-                match self.fetch_directory_inode_data(parent_digest) {
-                    Ok(new_data) => {
-                        // update data in [self.inode_tracker] with populated variant.
-                        // FUTUREWORK: change put to return the data after
-                        // inserting, so we don't need to lookup a second
-                        // time?
-                        let ino = inode_tracker.put(new_data);
-                        inode_tracker.get(ino).unwrap()
-                    }
-                    Err(_e) => {
-                        return Err(io::Error::from_raw_os_error(libc::EIO));
-                    }
-                }
-            }
-            InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data,
-        };
-
-        // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))].
-        let (parent_digest, children) = if let InodeData::Directory(
-            DirectoryInodeData::Populated(ref parent_digest, ref children),
-        ) = *parent_data
-        {
-            (parent_digest, children)
-        } else {
-            panic!("unexpected type")
-        };
-        let span = info_span!("lookup", directory.digest = %parent_digest);
-        let _enter = span.enter();
-
-        // in the children, find the one with the desired name.
-        if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
-            // lookup the child [InodeData] in [self.inode_tracker].
-            // We know the inodes for children have already been allocated.
-            let child_inode_data = inode_tracker.get(*child_ino).unwrap();
-
-            // Reply with the file attributes for the child.
-            // For child directories, we still have all data we need to reply.
-            Ok(fuse_backend_rs::api::filesystem::Entry {
-                inode: *child_ino,
-                attr: gen_file_attr(&child_inode_data, *child_ino).into(),
-                attr_timeout: Duration::MAX,
-                entry_timeout: Duration::MAX,
-                ..Default::default()
-            })
-        } else {
-            // Child not found, return ENOENT.
-            Err(io::Error::from_raw_os_error(libc::ENOENT))
-        }
-    }
-
-    // TODO: readdirplus?
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))]
-    fn readdir(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        _handle: Self::Handle,
-        _size: u32,
-        offset: u64,
-        add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
-    ) -> io::Result<()> {
-        debug!("readdir");
-
-        if inode == ROOT_ID {
-            if !self.list_root {
-                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
-            } else {
-                for (i, path_info) in self
-                    .path_info_service
-                    .list()
-                    .skip(offset as usize)
-                    .enumerate()
-                {
-                    let path_info = match path_info {
-                        Err(e) => {
-                            warn!("failed to retrieve pathinfo: {}", e);
-                            return Err(io::Error::from_raw_os_error(libc::EPERM));
-                        }
-                        Ok(path_info) => path_info,
-                    };
-
-                    // We know the root node exists and the store_path can be parsed because clients MUST validate.
-                    let root_node = path_info.node.unwrap().node.unwrap();
-                    let store_path = StorePath::from_bytes(root_node.get_name()).unwrap();
-
-                    let ino = {
-                        // This extra scope makes sure we drop the read lock
-                        // immediately after reading, to prevent deadlocks.
-                        let store_paths = self.store_paths.read();
-                        store_paths.get(&store_path).cloned()
-                    };
-                    let ino = match ino {
-                        Some(ino) => ino,
-                        None => {
-                            // insert the (sparse) inode data and register in
-                            // self.store_paths.
-                            let ino = self.inode_tracker.write().put((&root_node).into());
-                            self.store_paths.write().insert(store_path.clone(), ino);
-                            ino
-                        }
-                    };
-
-                    let ty = match root_node {
-                        Node::Directory(_) => libc::S_IFDIR,
-                        Node::File(_) => libc::S_IFREG,
-                        Node::Symlink(_) => libc::S_IFLNK,
-                    };
-
-                    let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
-                        ino,
-                        offset: offset + i as u64 + 1,
-                        type_: ty,
-                        name: store_path.to_string().as_bytes(),
-                    })?;
-                    // If the buffer is full, add_entry will return `Ok(0)`.
-                    if written == 0 {
-                        break;
-                    }
-                }
-                return Ok(());
-            }
-        }
-
-        // lookup the inode data.
-        let mut inode_tracker = self.inode_tracker.write();
-        let dir_inode_data = inode_tracker.get(inode).unwrap();
-        let dir_inode_data = match *dir_inode_data {
-            InodeData::Regular(..) | InodeData::Symlink(..) => {
-                warn!("Not a directory");
-                return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
-            }
-            InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => {
-                match self.fetch_directory_inode_data(directory_digest) {
-                    Ok(new_data) => {
-                        // update data in [self.inode_tracker] with populated variant.
-                        // FUTUREWORK: change put to return the data after
-                        // inserting, so we don't need to lookup a second
-                        // time?
-                        let ino = inode_tracker.put(new_data.clone());
-                        inode_tracker.get(ino).unwrap()
-                    }
-                    Err(_e) => {
-                        return Err(io::Error::from_raw_os_error(libc::EIO));
-                    }
-                }
-            }
-            InodeData::Directory(DirectoryInodeData::Populated(..)) => dir_inode_data,
-        };
-
-        // now parent_data can only be InodeData::Directory(DirectoryInodeData::Populated(..))
-        if let InodeData::Directory(DirectoryInodeData::Populated(ref _digest, ref children)) =
-            *dir_inode_data
-        {
-            for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() {
-                // the second parameter will become the "offset" parameter on the next call.
-                let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
-                    ino: *ino,
-                    offset: offset + i as u64 + 1,
-                    type_: match child_node {
-                        Node::Directory(_) => libc::S_IFDIR,
-                        Node::File(_) => libc::S_IFREG,
-                        Node::Symlink(_) => libc::S_IFLNK,
-                    },
-                    name: child_node.get_name(),
-                })?;
-                // If the buffer is full, add_entry will return `Ok(0)`.
-                if written == 0 {
-                    break;
-                }
-            }
-        } else {
-            panic!("unexpected type")
-        }
-
-        Ok(())
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
-    fn open(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        _flags: u32,
-        _fuse_flags: u32,
-    ) -> io::Result<(
-        Option<Self::Handle>,
-        fuse_backend_rs::api::filesystem::OpenOptions,
-    )> {
-        if inode == ROOT_ID {
-            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
-        }
-
-        // lookup the inode
-        match *self.inode_tracker.read().get(inode).unwrap() {
-            // read is invalid on non-files.
-            InodeData::Directory(..) | InodeData::Symlink(_) => {
-                warn!("is directory");
-                return Err(io::Error::from_raw_os_error(libc::EISDIR));
-            }
-            InodeData::Regular(ref blob_digest, _blob_size, _) => {
-                let span = info_span!("read", blob.digest = %blob_digest);
-                let _enter = span.enter();
-
-                let blob_service = self.blob_service.clone();
-                let blob_digest = blob_digest.clone();
-
-                let task = self
-                    .tokio_handle
-                    .spawn(async move { blob_service.open_read(&blob_digest).await });
-
-                let blob_reader = self.tokio_handle.block_on(task).unwrap();
-
-                match blob_reader {
-                    Ok(None) => {
-                        warn!("blob not found");
-                        return Err(io::Error::from_raw_os_error(libc::EIO));
-                    }
-                    Err(e) => {
-                        warn!(e=?e, "error opening blob");
-                        return Err(io::Error::from_raw_os_error(libc::EIO));
-                    }
-                    Ok(Some(blob_reader)) => {
-                        // get a new file handle
-                        // TODO: this will overflow after 2**64 operations,
-                        // which is fine for now.
-                        // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
-                        // for the discussion on alternatives.
-                        let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst);
-
-                        debug!("add file handle {}", fh);
-                        self.file_handles
-                            .write()
-                            .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader)));
-
-                        Ok((
-                            Some(fh),
-                            fuse_backend_rs::api::filesystem::OpenOptions::empty(),
-                        ))
-                    }
-                }
-            }
-        }
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))]
-    fn release(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        _flags: u32,
-        handle: Self::Handle,
-        _flush: bool,
-        _flock_release: bool,
-        _lock_owner: Option<u64>,
-    ) -> io::Result<()> {
-        // remove and get ownership on the blob reader
-        match self.file_handles.write().remove(&handle) {
-            // drop it, which will close it.
-            Some(blob_reader) => drop(blob_reader),
-            None => {
-                // These might already be dropped if a read error occured.
-                debug!("file_handle {} not found", handle);
-            }
-        }
-
-        Ok(())
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))]
-    fn read(
-        &self,
-        _ctx: &Context,
-        inode: Self::Inode,
-        handle: Self::Handle,
-        w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter,
-        size: u32,
-        offset: u64,
-        _lock_owner: Option<u64>,
-        _flags: u32,
-    ) -> io::Result<usize> {
-        debug!("read");
-
-        // We need to take out the blob reader from self.file_handles, so we can
-        // interact with it in the separate task.
-        // On success, we pass it back out of the task, so we can put it back in self.file_handles.
-        let blob_reader = match self.file_handles.read().get(&handle) {
-            Some(blob_reader) => blob_reader.clone(),
-            None => {
-                warn!("file handle {} unknown", handle);
-                return Err(io::Error::from_raw_os_error(libc::EIO));
-            }
-        };
-
-        let task = self.tokio_handle.spawn(async move {
-            let mut blob_reader = blob_reader.lock().await;
-
-            // seek to the offset specified, which is relative to the start of the file.
-            let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await;
-
-            match resp {
-                Ok(pos) => {
-                    debug_assert_eq!(offset as u64, pos);
-                }
-                Err(e) => {
-                    warn!("failed to seek to offset {}: {}", offset, e);
-                    return Err(io::Error::from_raw_os_error(libc::EIO));
-                }
-            }
-
-            // As written in the fuse docs, read should send exactly the number
-            // of bytes requested except on EOF or error.
-
-            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
-
-            while (buf.len() as u64) < size as u64 {
-                let int_buf = blob_reader.fill_buf().await?;
-                // copy things from the internal buffer into buf to fill it till up until size
-
-                // an empty buffer signals we reached EOF.
-                if int_buf.is_empty() {
-                    break;
-                }
-
-                // calculate how many bytes we can read from int_buf.
-                // It's either all of int_buf, or the number of bytes missing in buf to reach size.
-                let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len());
-
-                // copy these bytes into our buffer
-                buf.extend_from_slice(&int_buf[..len_to_copy]);
-                // and consume them in the buffered reader.
-                blob_reader.consume(len_to_copy);
-            }
-
-            Ok(buf)
-        });
-
-        let buf = self.tokio_handle.block_on(task).unwrap()?;
-
-        w.write(&buf)
-    }
-
-    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
-    fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
-        if inode == ROOT_ID {
-            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
-        }
-
-        // lookup the inode
-        match *self.inode_tracker.read().get(inode).unwrap() {
-            InodeData::Directory(..) | InodeData::Regular(..) => {
-                Err(io::Error::from_raw_os_error(libc::EINVAL))
-            }
-            InodeData::Symlink(ref target) => Ok(target.to_vec()),
-        }
-    }
-}
-
-struct FuseServer<FS>
-where
-    FS: FileSystem + Sync + Send,
-{
-    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
-    channel: fuse_backend_rs::transport::FuseChannel,
-}
-
-impl<FS> FuseServer<FS>
-where
-    FS: FileSystem + Sync + Send,
-{
-    fn start(&mut self) -> io::Result<()> {
-        loop {
-            if let Some((reader, writer)) = self
-                .channel
-                .get_request()
-                .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
-            {
-                if let Err(e) = self
-                    .server
-                    .handle_message(reader, writer.into(), None, None)
-                {
-                    match e {
-                        // This indicates the session has been shut down.
-                        fuse_backend_rs::Error::EncodeMessage(e)
-                            if e.raw_os_error() == Some(libc::EBADFD) =>
-                        {
-                            break;
-                        }
-                        error => {
-                            error!(?error, "failed to handle fuse request");
-                            continue;
-                        }
-                    }
-                }
-            } else {
-                break;
-            }
-        }
-        Ok(())
-    }
-}
-
-pub struct FuseDaemon {
-    session: FuseSession,
-    threads: Vec<thread::JoinHandle<()>>,
-}
-
-impl FuseDaemon {
-    pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
-    where
-        FS: FileSystem + Sync + Send + 'static,
-        P: AsRef<Path>,
-    {
-        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
-
-        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-
-        session.set_allow_other(false);
-        session
-            .mount()
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-        let mut join_handles = Vec::with_capacity(threads);
-        for _ in 0..threads {
-            let mut server = FuseServer {
-                server: server.clone(),
-                channel: session
-                    .new_channel()
-                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
-            };
-            let join_handle = thread::Builder::new()
-                .name("fuse_server".to_string())
-                .spawn(move || {
-                    let _ = server.start();
-                })?;
-            join_handles.push(join_handle);
-        }
-
-        Ok(FuseDaemon {
-            session,
-            threads: join_handles,
-        })
-    }
-
-    pub fn unmount(&mut self) -> Result<(), io::Error> {
-        self.session
-            .umount()
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-
-        for thread in self.threads.drain(..) {
-            thread.join().map_err(|_| {
-                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
-            })?;
-        }
-
-        Ok(())
-    }
-}
-
-impl Drop for FuseDaemon {
-    fn drop(&mut self) {
-        if let Err(error) = self.unmount() {
-            error!(?error, "failed to unmont fuse filesystem")
-        }
-    }
-}