about summary refs log tree commit diff
path: root/tvix/store/src/fuse/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/fuse/mod.rs')
-rw-r--r--tvix/store/src/fuse/mod.rs640
1 files changed, 402 insertions, 238 deletions
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs
index 978fd50e2634..1a5d884ef7a9 100644
--- a/tvix/store/src/fuse/mod.rs
+++ b/tvix/store/src/fuse/mod.rs
@@ -8,25 +8,34 @@ mod tests;
 use crate::{
     blobservice::{BlobReader, BlobService},
     directoryservice::DirectoryService,
-    fuse::{
-        file_attr::gen_file_attr,
-        inodes::{DirectoryInodeData, InodeData},
-    },
+    fuse::inodes::{DirectoryInodeData, InodeData},
     pathinfoservice::PathInfoService,
     proto::{node::Node, NamedNode},
     B3Digest, Error,
 };
-use fuser::{FileAttr, ReplyAttr, Request};
+use fuse_backend_rs::{
+    api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID},
+    transport::FuseSession,
+};
 use nix_compat::store_path::StorePath;
-use std::io;
-use std::os::unix::ffi::OsStrExt;
-use std::str::FromStr;
-use std::sync::Arc;
-use std::{collections::HashMap, time::Duration};
+use parking_lot::RwLock;
+use std::{
+    collections::HashMap,
+    io,
+    path::Path,
+    str::FromStr,
+    sync::atomic::AtomicU64,
+    sync::{atomic::Ordering, Arc},
+    thread,
+    time::Duration,
+};
 use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
-use tracing::{debug, info_span, warn};
+use tracing::{debug, error, info_span, warn};
 
-use self::inode_tracker::InodeTracker;
+use self::{
+    file_attr::{gen_file_attr, ROOT_FILE_ATTR},
+    inode_tracker::InodeTracker,
+};
 
 /// This implements a read-only FUSE filesystem for a tvix-store
 /// with the passed [BlobService], [DirectoryService] and [PathInfoService].
@@ -71,15 +80,15 @@ pub struct FUSE {
     list_root: bool,
 
     /// This maps a given StorePath to the inode we allocated for the root inode.
-    store_paths: HashMap<StorePath, u64>,
+    store_paths: RwLock<HashMap<StorePath, u64>>,
 
     /// This keeps track of inodes and data alongside them.
-    inode_tracker: InodeTracker,
+    inode_tracker: RwLock<InodeTracker>,
 
     /// This holds all open file handles
-    file_handles: HashMap<u64, Box<dyn BlobReader>>,
+    file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>,
 
-    next_file_handle: u64,
+    next_file_handle: AtomicU64,
 
     tokio_handle: tokio::runtime::Handle,
 }
@@ -98,11 +107,11 @@ impl FUSE {
 
             list_root,
 
-            store_paths: HashMap::default(),
-            inode_tracker: Default::default(),
+            store_paths: RwLock::new(HashMap::default()),
+            inode_tracker: RwLock::new(Default::default()),
 
-            file_handles: Default::default(),
-            next_file_handle: 1,
+            file_handles: RwLock::new(Default::default()),
+            next_file_handle: AtomicU64::new(1),
             tokio_handle: tokio::runtime::Handle::current(),
         }
     }
@@ -114,11 +123,11 @@ impl FUSE {
     /// or otherwise fetch from [self.path_info_service], and then insert into
     /// [self.inode_tracker].
     fn name_in_root_to_ino_and_data(
-        &mut self,
-        name: &std::ffi::OsStr,
+        &self,
+        name: &std::ffi::CStr,
     ) -> Result<Option<(u64, Arc<InodeData>)>, Error> {
         // parse the name into a [StorePath].
-        let store_path = if let Some(name) = name.to_str() {
+        let store_path = if let Some(name) = name.to_str().ok() {
             match StorePath::from_str(name) {
                 Ok(store_path) => store_path,
                 Err(e) => {
@@ -129,19 +138,26 @@ impl FUSE {
                 }
             }
         } else {
-            debug!("{name:?} is no string");
+            debug!("{name:?} is not a valid utf-8 string");
             // same here.
             return Ok(None);
         };
 
-        if let Some(ino) = self.store_paths.get(&store_path) {
+        let ino = {
+            // This extra scope makes sure we drop the read lock
+            // immediately after reading, to prevent deadlocks.
+            let store_paths = self.store_paths.read();
+            store_paths.get(&store_path).cloned()
+        };
+
+        if let Some(ino) = ino {
             // If we already have that store path, lookup the inode from
             // self.store_paths and then get the data from [self.inode_tracker],
             // which in the case of a [InodeData::Directory] will be fully
             // populated.
             Ok(Some((
-                *ino,
-                self.inode_tracker.get(*ino).expect("must exist"),
+                ino,
+                self.inode_tracker.read().get(ino).expect("must exist"),
             )))
         } else {
             // If we don't have it, look it up in PathInfoService.
@@ -157,15 +173,29 @@ impl FUSE {
                         return Ok(None);
                     }
 
+                    // Let's check if someone else beat us to updating the inode tracker and
+                    // store_paths map.
+                    let mut store_paths = self.store_paths.write();
+                    if let Some(ino) = store_paths.get(&store_path).cloned() {
+                        return Ok(Some((
+                            ino,
+                            self.inode_tracker.read().get(ino).expect("must exist"),
+                        )));
+                    }
+
                     // insert the (sparse) inode data and register in
                     // self.store_paths.
                     // FUTUREWORK: change put to return the data after
                     // inserting, so we don't need to lookup a second
                     // time?
-                    let ino = self.inode_tracker.put((&root_node).into());
-                    self.store_paths.insert(store_path, ino);
+                    let (ino, inode) = {
+                        let mut inode_tracker = self.inode_tracker.write();
+                        let ino = inode_tracker.put((&root_node).into());
+                        (ino, inode_tracker.get(ino).unwrap())
+                    };
+                    store_paths.insert(store_path, ino);
 
-                    Ok(Some((ino, self.inode_tracker.get(ino).unwrap())))
+                    Ok(Some((ino, inode)))
                 }
             }
         }
@@ -194,136 +224,152 @@ impl FUSE {
     }
 }
 
-impl fuser::Filesystem for FUSE {
-    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
-    fn getattr(&mut self, _req: &Request, ino: u64, reply: ReplyAttr) {
-        debug!("getattr");
+impl FileSystem for FUSE {
+    type Inode = u64;
+    type Handle = u64;
+
+    fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
+        Ok(FsOptions::empty())
+    }
 
-        if ino == fuser::FUSE_ROOT_ID {
-            reply.attr(&Duration::MAX, &file_attr::ROOT_FILE_ATTR);
-            return;
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn getattr(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _handle: Option<Self::Handle>,
+    ) -> io::Result<(libc::stat64, Duration)> {
+        if inode == ROOT_ID {
+            return Ok((ROOT_FILE_ATTR.into(), Duration::MAX));
         }
 
-        match self.inode_tracker.get(ino) {
-            None => reply.error(libc::ENOENT),
+        match self.inode_tracker.read().get(inode) {
+            None => return Err(io::Error::from_raw_os_error(libc::ENOENT)),
             Some(node) => {
                 debug!(node = ?node, "found node");
-                reply.attr(&Duration::MAX, &file_attr::gen_file_attr(&node, ino));
+                Ok((gen_file_attr(&node, inode).into(), Duration::MAX))
             }
         }
     }
 
-    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent_ino, rq.name = ?name))]
+    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))]
     fn lookup(
-        &mut self,
-        _req: &Request,
-        parent_ino: u64,
-        name: &std::ffi::OsStr,
-        reply: fuser::ReplyEntry,
-    ) {
+        &self,
+        _ctx: &Context,
+        parent: Self::Inode,
+        name: &std::ffi::CStr,
+    ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
         debug!("lookup");
 
         // This goes from a parent inode to a node.
-        // - If the parent is [fuser::FUSE_ROOT_ID], we need to check
+        // - If the parent is [ROOT_ID], we need to check
         //   [self.store_paths] (fetching from PathInfoService if needed)
         // - Otherwise, lookup the parent in [self.inode_tracker] (which must be
         //   a [InodeData::Directory]), and find the child with that name.
-        if parent_ino == fuser::FUSE_ROOT_ID {
-            match self.name_in_root_to_ino_and_data(name) {
+        if parent == ROOT_ID {
+            return match self.name_in_root_to_ino_and_data(name) {
                 Err(e) => {
                     warn!("{}", e);
-                    reply.error(libc::EIO);
-                }
-                Ok(None) => {
-                    reply.error(libc::ENOENT);
+                    Err(io::Error::from_raw_os_error(libc::ENOENT))
                 }
+                Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
                 Ok(Some((ino, inode_data))) => {
                     debug!(inode_data=?&inode_data, ino=ino, "Some");
-                    reply_with_entry(reply, &gen_file_attr(&inode_data, ino));
+                    Ok(fuse_backend_rs::api::filesystem::Entry {
+                        inode: ino,
+                        attr: gen_file_attr(&inode_data, ino).into(),
+                        attr_timeout: Duration::MAX,
+                        entry_timeout: Duration::MAX,
+                        ..Default::default()
+                    })
                 }
+            };
+        }
+
+        // This is the "lookup for "a" inside inode 42.
+        // We already know that inode 42 must be a directory.
+        // It might not be populated yet, so if it isn't, we do (by
+        // fetching from [self.directory_service]), and save the result in
+        // [self.inode_tracker].
+        // Now it for sure is populated, so we search for that name in the
+        // list of children and return the FileAttrs.
+
+        // TODO: Reduce the critical section of this write lock.
+        let mut inode_tracker = self.inode_tracker.write();
+        let parent_data = inode_tracker.get(parent).unwrap();
+        let parent_data = match *parent_data {
+            InodeData::Regular(..) | InodeData::Symlink(_) => {
+                // if the parent inode was not a directory, this doesn't make sense
+                return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
             }
-        } else {
-            // This is the "lookup for "a" inside inode 42.
-            // We already know that inode 42 must be a directory.
-            // It might not be populated yet, so if it isn't, we do (by
-            // fetching from [self.directory_service]), and save the result in
-            // [self.inode_tracker].
-            // Now it for sure is populated, so we search for that name in the
-            // list of children and return the FileAttrs.
-
-            let parent_data = self.inode_tracker.get(parent_ino).unwrap();
-            let parent_data = match *parent_data {
-                InodeData::Regular(..) | InodeData::Symlink(_) => {
-                    // if the parent inode was not a directory, this doesn't make sense
-                    reply.error(libc::ENOTDIR);
-                    return;
-                }
-                InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
-                    match self.fetch_directory_inode_data(parent_digest) {
-                        Ok(new_data) => {
-                            // update data in [self.inode_tracker] with populated variant.
-                            // FUTUREWORK: change put to return the data after
-                            // inserting, so we don't need to lookup a second
-                            // time?
-                            let ino = self.inode_tracker.put(new_data);
-                            self.inode_tracker.get(ino).unwrap()
-                        }
-                        Err(_e) => {
-                            reply.error(libc::EIO);
-                            return;
-                        }
+            InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
+                match self.fetch_directory_inode_data(parent_digest) {
+                    Ok(new_data) => {
+                        // update data in [self.inode_tracker] with populated variant.
+                        // FUTUREWORK: change put to return the data after
+                        // inserting, so we don't need to lookup a second
+                        // time?
+                        let ino = inode_tracker.put(new_data);
+                        inode_tracker.get(ino).unwrap()
+                    }
+                    Err(_e) => {
+                        return Err(io::Error::from_raw_os_error(libc::EIO));
                     }
                 }
-                InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data,
-            };
-
-            // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))].
-            let (parent_digest, children) = if let InodeData::Directory(
-                DirectoryInodeData::Populated(ref parent_digest, ref children),
-            ) = *parent_data
-            {
-                (parent_digest, children)
-            } else {
-                panic!("unexpected type")
-            };
-            let span = info_span!("lookup", directory.digest = %parent_digest);
-            let _enter = span.enter();
-
-            // in the children, find the one with the desired name.
-            if let Some((child_ino, _)) =
-                children.iter().find(|e| e.1.get_name() == name.as_bytes())
-            {
-                // lookup the child [InodeData] in [self.inode_tracker].
-                // We know the inodes for children have already been allocated.
-                let child_inode_data = self.inode_tracker.get(*child_ino).unwrap();
-
-                // Reply with the file attributes for the child.
-                // For child directories, we still have all data we need to reply.
-                reply_with_entry(reply, &gen_file_attr(&child_inode_data, *child_ino));
-            } else {
-                // Child not found, return ENOENT.
-                reply.error(libc::ENOENT);
             }
+            InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data,
+        };
+
+        // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))].
+        let (parent_digest, children) = if let InodeData::Directory(
+            DirectoryInodeData::Populated(ref parent_digest, ref children),
+        ) = *parent_data
+        {
+            (parent_digest, children)
+        } else {
+            panic!("unexpected type")
+        };
+        let span = info_span!("lookup", directory.digest = %parent_digest);
+        let _enter = span.enter();
+
+        // in the children, find the one with the desired name.
+        if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
+            // lookup the child [InodeData] in [self.inode_tracker].
+            // We know the inodes for children have already been allocated.
+            let child_inode_data = inode_tracker.get(*child_ino).unwrap();
+
+            // Reply with the file attributes for the child.
+            // For child directories, we still have all data we need to reply.
+            Ok(fuse_backend_rs::api::filesystem::Entry {
+                inode: *child_ino,
+                attr: gen_file_attr(&child_inode_data, *child_ino).into(),
+                attr_timeout: Duration::MAX,
+                entry_timeout: Duration::MAX,
+                ..Default::default()
+            })
+        } else {
+            // Child not found, return ENOENT.
+            Err(io::Error::from_raw_os_error(libc::ENOENT))
         }
     }
 
     // TODO: readdirplus?
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset))]
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))]
     fn readdir(
-        &mut self,
-        _req: &Request<'_>,
-        ino: u64,
-        _fh: u64,
-        offset: i64,
-        mut reply: fuser::ReplyDirectory,
-    ) {
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _handle: Self::Handle,
+        _size: u32,
+        offset: u64,
+        add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
+    ) -> io::Result<()> {
         debug!("readdir");
 
-        if ino == fuser::FUSE_ROOT_ID {
+        if inode == ROOT_ID {
             if !self.list_root {
-                reply.error(libc::EPERM); // same error code as ipfs/kubo
-                return;
+                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
             } else {
                 for (i, path_info) in self
                     .path_info_service
@@ -334,8 +380,7 @@ impl fuser::Filesystem for FUSE {
                     let path_info = match path_info {
                         Err(e) => {
                             warn!("failed to retrieve pathinfo: {}", e);
-                            reply.error(libc::EPERM);
-                            return;
+                            return Err(io::Error::from_raw_os_error(libc::EPERM));
                         }
                         Ok(path_info) => path_info,
                     };
@@ -344,41 +389,51 @@ impl fuser::Filesystem for FUSE {
                     let root_node = path_info.node.unwrap().node.unwrap();
                     let store_path = StorePath::from_bytes(root_node.get_name()).unwrap();
 
-                    let ino = match self.store_paths.get(&store_path) {
-                        Some(ino) => *ino,
+                    let ino = {
+                        // This extra scope makes sure we drop the read lock
+                        // immediately after reading, to prevent deadlocks.
+                        let store_paths = self.store_paths.read();
+                        store_paths.get(&store_path).cloned()
+                    };
+                    let ino = match ino {
+                        Some(ino) => ino,
                         None => {
                             // insert the (sparse) inode data and register in
                             // self.store_paths.
-                            let ino = self.inode_tracker.put((&root_node).into());
-                            self.store_paths.insert(store_path.clone(), ino);
+                            let ino = self.inode_tracker.write().put((&root_node).into());
+                            self.store_paths.write().insert(store_path.clone(), ino);
                             ino
                         }
                     };
 
                     let ty = match root_node {
-                        Node::Directory(_) => fuser::FileType::Directory,
-                        Node::File(_) => fuser::FileType::RegularFile,
-                        Node::Symlink(_) => fuser::FileType::Symlink,
+                        Node::Directory(_) => libc::S_IFDIR,
+                        Node::File(_) => libc::S_IFREG,
+                        Node::Symlink(_) => libc::S_IFLNK,
                     };
 
-                    let full =
-                        reply.add(ino, offset + i as i64 + 1_i64, ty, store_path.to_string());
-                    if full {
+                    let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
+                        ino,
+                        offset: offset + i as u64 + 1,
+                        type_: ty,
+                        name: store_path.to_string().as_bytes(),
+                    })?;
+                    // If the buffer is full, add_entry will return `Ok(0)`.
+                    if written == 0 {
                         break;
                     }
                 }
-                reply.ok();
-                return;
+                return Ok(());
             }
         }
 
         // lookup the inode data.
-        let dir_inode_data = self.inode_tracker.get(ino).unwrap();
+        let mut inode_tracker = self.inode_tracker.write();
+        let dir_inode_data = inode_tracker.get(inode).unwrap();
         let dir_inode_data = match *dir_inode_data {
             InodeData::Regular(..) | InodeData::Symlink(..) => {
                 warn!("Not a directory");
-                reply.error(libc::ENOTDIR);
-                return;
+                return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
             }
             InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => {
                 match self.fetch_directory_inode_data(directory_digest) {
@@ -387,12 +442,11 @@ impl fuser::Filesystem for FUSE {
                         // FUTUREWORK: change put to return the data after
                         // inserting, so we don't need to lookup a second
                         // time?
-                        let ino = self.inode_tracker.put(new_data);
-                        self.inode_tracker.get(ino).unwrap()
+                        let ino = inode_tracker.put(new_data.clone());
+                        inode_tracker.get(ino).unwrap()
                     }
                     Err(_e) => {
-                        reply.error(libc::EIO);
-                        return;
+                        return Err(io::Error::from_raw_os_error(libc::EIO));
                     }
                 }
             }
@@ -405,42 +459,49 @@ impl fuser::Filesystem for FUSE {
         {
             for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() {
                 // the second parameter will become the "offset" parameter on the next call.
-                let full = reply.add(
-                    *ino,
-                    offset + i as i64 + 1_i64,
-                    match child_node {
-                        Node::Directory(_) => fuser::FileType::Directory,
-                        Node::File(_) => fuser::FileType::RegularFile,
-                        Node::Symlink(_) => fuser::FileType::Symlink,
+                let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
+                    ino: *ino,
+                    offset: offset + i as u64 + 1,
+                    type_: match child_node {
+                        Node::Directory(_) => libc::S_IFDIR,
+                        Node::File(_) => libc::S_IFREG,
+                        Node::Symlink(_) => libc::S_IFLNK,
                     },
-                    std::ffi::OsStr::from_bytes(child_node.get_name()),
-                );
-                if full {
+                    name: child_node.get_name(),
+                })?;
+                // If the buffer is full, add_entry will return `Ok(0)`.
+                if written == 0 {
                     break;
                 }
             }
-            reply.ok();
         } else {
             panic!("unexpected type")
         }
-    }
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
-    fn open(&mut self, _req: &Request<'_>, ino: u64, _flags: i32, reply: fuser::ReplyOpen) {
-        // get a new file handle
-        let fh = self.next_file_handle;
+        Ok(())
+    }
 
-        if ino == fuser::FUSE_ROOT_ID {
-            reply.error(libc::ENOSYS);
-            return;
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn open(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _flags: u32,
+        _fuse_flags: u32,
+    ) -> io::Result<(
+        Option<Self::Handle>,
+        fuse_backend_rs::api::filesystem::OpenOptions,
+    )> {
+        if inode == ROOT_ID {
+            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
         }
 
         // lookup the inode
-        match *self.inode_tracker.get(ino).unwrap() {
+        match *self.inode_tracker.read().get(inode).unwrap() {
             // read is invalid on non-files.
             InodeData::Directory(..) | InodeData::Symlink(_) => {
                 warn!("is directory");
-                reply.error(libc::EISDIR);
+                return Err(io::Error::from_raw_os_error(libc::EISDIR));
             }
             InodeData::Regular(ref blob_digest, _blob_size, _) => {
                 let span = info_span!("read", blob.digest = %blob_digest);
@@ -458,79 +519,87 @@ impl fuser::Filesystem for FUSE {
                 match blob_reader {
                     Ok(None) => {
                         warn!("blob not found");
-                        reply.error(libc::EIO);
+                        return Err(io::Error::from_raw_os_error(libc::EIO));
                     }
                     Err(e) => {
                         warn!(e=?e, "error opening blob");
-                        reply.error(libc::EIO);
+                        return Err(io::Error::from_raw_os_error(libc::EIO));
                     }
                     Ok(Some(blob_reader)) => {
-                        debug!("add file handle {}", fh);
-                        self.file_handles.insert(fh, blob_reader);
-                        reply.opened(fh, 0);
-
+                        // get a new file handle
                         // TODO: this will overflow after 2**64 operations,
                         // which is fine for now.
                         // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
                         // for the discussion on alternatives.
-                        self.next_file_handle += 1;
+                        let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst);
+
+                        debug!("add file handle {}", fh);
+                        self.file_handles
+                            .write()
+                            .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader)));
+
+                        Ok((
+                            Some(fh),
+                            fuse_backend_rs::api::filesystem::OpenOptions::empty(),
+                        ))
                     }
                 }
             }
         }
     }
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino, fh = fh))]
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))]
     fn release(
-        &mut self,
-        _req: &Request<'_>,
-        ino: u64,
-        fh: u64,
-        _flags: i32,
-        _lock_owner: Option<u64>,
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _flags: u32,
+        handle: Self::Handle,
         _flush: bool,
-        reply: fuser::ReplyEmpty,
-    ) {
+        _flock_release: bool,
+        _lock_owner: Option<u64>,
+    ) -> io::Result<()> {
         // remove and get ownership on the blob reader
-        match self.file_handles.remove(&fh) {
+        match self.file_handles.write().remove(&handle) {
             // drop it, which will close it.
             Some(blob_reader) => drop(blob_reader),
             None => {
                 // These might already be dropped if a read error occured.
-                debug!("file_handle {} not found", fh);
+                debug!("file_handle {} not found", handle);
             }
         }
 
-        reply.ok();
+        Ok(())
     }
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset, rq.size = size))]
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))]
     fn read(
-        &mut self,
-        _req: &Request<'_>,
-        ino: u64,
-        fh: u64,
-        offset: i64,
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        handle: Self::Handle,
+        w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter,
         size: u32,
-        _flags: i32,
+        offset: u64,
         _lock_owner: Option<u64>,
-        reply: fuser::ReplyData,
-    ) {
+        _flags: u32,
+    ) -> io::Result<usize> {
         debug!("read");
 
         // We need to take out the blob reader from self.file_handles, so we can
         // interact with it in the separate task.
         // On success, we pass it back out of the task, so we can put it back in self.file_handles.
-        let mut blob_reader = match self.file_handles.remove(&fh) {
-            Some(blob_reader) => blob_reader,
+        let blob_reader = match self.file_handles.read().get(&handle) {
+            Some(blob_reader) => blob_reader.clone(),
             None => {
-                warn!("file handle {} unknown", fh);
-                reply.error(libc::EIO);
-                return;
+                warn!("file handle {} unknown", handle);
+                return Err(io::Error::from_raw_os_error(libc::EIO));
             }
         };
 
         let task = self.tokio_handle.spawn(async move {
+            let mut blob_reader = blob_reader.lock().await;
+
             // seek to the offset specified, which is relative to the start of the file.
             let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await;
 
@@ -540,68 +609,163 @@ impl fuser::Filesystem for FUSE {
                 }
                 Err(e) => {
                     warn!("failed to seek to offset {}: {}", offset, e);
-                    return Err(libc::EIO);
+                    return Err(io::Error::from_raw_os_error(libc::EIO));
                 }
             }
 
-            // As written in the fuser docs, read should send exactly the number
+            // As written in the fuse docs, read should send exactly the number
             // of bytes requested except on EOF or error.
 
             let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
 
             while (buf.len() as u64) < size as u64 {
-                match blob_reader.fill_buf().await {
-                    Ok(int_buf) => {
-                        // copy things from the internal buffer into buf to fill it till up until size
+                let int_buf = blob_reader.fill_buf().await?;
+                // copy things from the internal buffer into buf to fill it till up until size
 
-                        // an empty buffer signals we reached EOF.
-                        if int_buf.is_empty() {
-                            break;
-                        }
+                // an empty buffer signals we reached EOF.
+                if int_buf.is_empty() {
+                    break;
+                }
 
-                        // calculate how many bytes we can read from int_buf.
-                        // It's either all of int_buf, or the number of bytes missing in buf to reach size.
-                        let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len());
+                // calculate how many bytes we can read from int_buf.
+                // It's either all of int_buf, or the number of bytes missing in buf to reach size.
+                let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len());
 
-                        // copy these bytes into our buffer
-                        buf.extend_from_slice(&int_buf[..len_to_copy]);
-                        // and consume them in the buffered reader.
-                        blob_reader.consume(len_to_copy);
-                    }
-                    Err(e) => return Err(e.raw_os_error().unwrap()),
-                }
+                // copy these bytes into our buffer
+                buf.extend_from_slice(&int_buf[..len_to_copy]);
+                // and consume them in the buffered reader.
+                blob_reader.consume(len_to_copy);
             }
-            Ok((buf, blob_reader))
+
+            Ok(buf)
         });
 
-        let resp = self.tokio_handle.block_on(task).unwrap();
+        let buf = self.tokio_handle.block_on(task).unwrap()?;
 
-        match resp {
-            Err(e) => reply.error(e),
-            Ok((buf, blob_reader)) => {
-                reply.data(&buf);
-                self.file_handles.insert(fh, blob_reader);
-            }
-        }
+        w.write(&buf)
     }
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
-    fn readlink(&mut self, _req: &Request<'_>, ino: u64, reply: fuser::ReplyData) {
-        if ino == fuser::FUSE_ROOT_ID {
-            reply.error(libc::ENOSYS);
-            return;
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
+        if inode == ROOT_ID {
+            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
         }
 
         // lookup the inode
-        match *self.inode_tracker.get(ino).unwrap() {
+        match *self.inode_tracker.read().get(inode).unwrap() {
             InodeData::Directory(..) | InodeData::Regular(..) => {
-                reply.error(libc::EINVAL);
+                Err(io::Error::from_raw_os_error(libc::EINVAL))
             }
-            InodeData::Symlink(ref target) => reply.data(target),
+            InodeData::Symlink(ref target) => Ok(target.to_vec()),
         }
     }
 }
 
-fn reply_with_entry(reply: fuser::ReplyEntry, file_attr: &FileAttr) {
-    reply.entry(&Duration::MAX, file_attr, 1 /* TODO: generation */);
+struct FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
+    channel: fuse_backend_rs::transport::FuseChannel,
+}
+
+impl<FS> FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    fn start(&mut self) -> io::Result<()> {
+        loop {
+            if let Some((reader, writer)) = self
+                .channel
+                .get_request()
+                .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
+            {
+                if let Err(e) = self
+                    .server
+                    .handle_message(reader, writer.into(), None, None)
+                {
+                    match e {
+                        // This indicates the session has been shut down.
+                        fuse_backend_rs::Error::EncodeMessage(e)
+                            if e.raw_os_error() == Some(libc::EBADFD) =>
+                        {
+                            break;
+                        }
+                        error => {
+                            error!(?error, "failed to handle fuse request");
+                            continue;
+                        }
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+        Ok(())
+    }
+}
+
+pub struct FuseDaemon {
+    session: FuseSession,
+    threads: Vec<thread::JoinHandle<()>>,
+}
+
+impl FuseDaemon {
+    pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
+    where
+        FS: FileSystem + Sync + Send + 'static,
+        P: AsRef<Path>,
+    {
+        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
+
+        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        session.set_allow_other(false);
+        session
+            .mount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+        let mut join_handles = Vec::with_capacity(threads);
+        for _ in 0..threads {
+            let mut server = FuseServer {
+                server: server.clone(),
+                channel: session
+                    .new_channel()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
+            };
+            let join_handle = thread::Builder::new()
+                .name("fuse_server".to_string())
+                .spawn(move || {
+                    let _ = server.start();
+                })?;
+            join_handles.push(join_handle);
+        }
+
+        Ok(FuseDaemon {
+            session,
+            threads: join_handles,
+        })
+    }
+
+    pub fn unmount(&mut self) -> Result<(), io::Error> {
+        self.session
+            .umount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        for thread in self.threads.drain(..) {
+            thread.join().map_err(|_| {
+                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
+            })?;
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for FuseDaemon {
+    fn drop(&mut self) {
+        if let Err(error) = self.unmount() {
+            error!(?error, "failed to unmont fuse filesystem")
+        }
+    }
 }