about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-09-16T15·20-0500
committerConnor Brewster <cbrewster@hey.com>2023-09-20T14·21+0000
commit6b7c936bc50934b45df132f659292e2c45256dea (patch)
tree74b369688e63ed4ff34c9270584ba30b91d7c979 /tvix/store/src
parent237c0eb415c632ec3480d890084c3d815298ac1f (diff)
refactor(tvix/store/fuse): Switch from fuser to fuse-backend-rs r/6621
This switches the FUSE implementation from fuser to fuse-backend-rs.
fuse-backend-rs is designed to work with both FUSE and virtiofs.
Virtiofs support will make it possible to plug the tvix-store into a
microvm and have `/nix/store` access without having to setup FUSE inside
the guest.

Additionally fuse-backend-rs has nice support for running multiple FUSE
threads and has some async support.

The goal of this commit is to mechanically switch over to
fuse-backend-rs with minimal changes. I did have to add some locks here
and there because fuse-backend-rs uses `&self` on all methods whereas
fuser uses `&mut self`. `&self` is required for concurrent access to the
FUSE server, so this makes sense.

We can consider switching to concurrent maps and use some other
techniques to reduce lock contention and critical section size.

Issue: https://b.tvl.fyi/issues/305

Change-Id: Icde5a58c6eef98f8984c1e04e980b756dfb76b47
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9341
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src')
-rw-r--r--tvix/store/src/bin/tvix-store.rs32
-rw-r--r--tvix/store/src/fuse/file_attr.rs55
-rw-r--r--tvix/store/src/fuse/inodes.rs10
-rw-r--r--tvix/store/src/fuse/mod.rs640
-rw-r--r--tvix/store/src/fuse/tests.rs92
-rw-r--r--tvix/store/src/lib.rs2
6 files changed, 489 insertions, 342 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 0da264808efb..f707e3850d4f 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -22,7 +22,7 @@ use tvix_store::proto::GRPCPathInfoServiceWrapper;
 use tvix_store::proto::NamedNode;
 use tvix_store::proto::NarInfo;
 use tvix_store::proto::PathInfo;
-use tvix_store::FUSE;
+use tvix_store::{FuseDaemon, FUSE};
 
 #[cfg(feature = "reflection")]
 use tvix_store::proto::FILE_DESCRIPTOR_SET;
@@ -94,6 +94,10 @@ enum Commands {
         #[arg(long, env, default_value = "grpc+http://[::1]:8000")]
         path_info_service_addr: String,
 
+        /// Number of FUSE threads to spawn.
+        #[arg(long, env, default_value_t = default_threads())]
+        threads: usize,
+
         /// Whether to list elements at the root of the mount point.
         /// This is useful if your PathInfoService doesn't provide an
         /// (exhaustive) listing.
@@ -102,6 +106,13 @@ enum Commands {
     },
 }
 
+#[cfg(feature = "fuse")]
+fn default_threads() -> usize {
+    std::thread::available_parallelism()
+        .map(|threads| threads.into())
+        .unwrap_or(4)
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
     let cli = Cli::parse();
@@ -280,6 +291,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             directory_service_addr,
             path_info_service_addr,
             list_root,
+            threads,
         } => {
             let blob_service = blobservice::from_addr(&blob_service_addr)?;
             let directory_service = directoryservice::from_addr(&directory_service_addr)?;
@@ -289,35 +301,27 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 directory_service.clone(),
             )?;
 
-            let mut fuse_session = tokio::task::spawn_blocking(move || {
+            let mut fuse_daemon = tokio::task::spawn_blocking(move || {
                 let f = FUSE::new(
                     blob_service,
                     directory_service,
                     path_info_service,
                     list_root,
                 );
+                info!("mounting tvix-store on {:?}", &dest);
 
-                fuser::Session::new(f, &dest, &[])
+                FuseDaemon::new(f, &dest, threads)
             })
             .await??;
 
             // grab a handle to unmount the file system, and register a signal
             // handler.
-            let mut fuse_unmounter = fuse_session.unmount_callable();
             tokio::spawn(async move {
                 tokio::signal::ctrl_c().await.unwrap();
                 info!("interrupt received, unmounting…");
-                fuse_unmounter.unmount().unwrap();
-            });
-
-            // Start the fuse filesystem and wait for its completion, which
-            // happens when it's unmounted externally, or via the signal handler
-            // task.
-            tokio::task::spawn_blocking(move || -> io::Result<()> {
-                info!("mounting tvix-store on {:?}", fuse_session.mountpoint());
-                fuse_session.run()?;
+                fuse_daemon.unmount()?;
                 info!("unmount occured, terminating…");
-                Ok(())
+                Ok::<_, io::Error>(())
             })
             .await??;
         }
diff --git a/tvix/store/src/fuse/file_attr.rs b/tvix/store/src/fuse/file_attr.rs
index 25cfd28dd1f9..b946aa977a0a 100644
--- a/tvix/store/src/fuse/file_attr.rs
+++ b/tvix/store/src/fuse/file_attr.rs
@@ -1,20 +1,19 @@
-use std::time::SystemTime;
-
 use super::inodes::{DirectoryInodeData, InodeData};
-use fuser::FileAttr;
+use fuse_backend_rs::abi::fuse_abi::Attr;
 
-/// The [FileAttr] describing the root
-pub const ROOT_FILE_ATTR: FileAttr = FileAttr {
-    ino: fuser::FUSE_ROOT_ID,
+/// The [Attr] describing the root
+pub const ROOT_FILE_ATTR: Attr = Attr {
+    ino: fuse_backend_rs::api::filesystem::ROOT_ID,
     size: 0,
     blksize: 1024,
     blocks: 0,
-    atime: SystemTime::UNIX_EPOCH,
-    mtime: SystemTime::UNIX_EPOCH,
-    ctime: SystemTime::UNIX_EPOCH,
-    crtime: SystemTime::UNIX_EPOCH,
-    kind: fuser::FileType::Directory,
-    perm: 0o555,
+    mode: libc::S_IFDIR | 0o555,
+    atime: 0,
+    mtime: 0,
+    ctime: 0,
+    atimensec: 0,
+    mtimensec: 0,
+    ctimensec: 0,
     nlink: 0,
     uid: 0,
     gid: 0,
@@ -22,10 +21,12 @@ pub const ROOT_FILE_ATTR: FileAttr = FileAttr {
     flags: 0,
 };
 
-/// for given &Node and inode, construct a [FileAttr]
-pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> FileAttr {
-    FileAttr {
+/// for given &Node and inode, construct an [Attr]
+pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> Attr {
+    Attr {
         ino: inode,
+        // FUTUREWORK: play with this numbers, as it affects read sizes for client applications.
+        blocks: 1024,
         size: match inode_data {
             InodeData::Regular(_, size, _) => *size as u64,
             InodeData::Symlink(target) => target.len() as u64,
@@ -34,24 +35,12 @@ pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> FileAttr {
                 children.len() as u64
             }
         },
-        // FUTUREWORK: play with this numbers, as it affects read sizes for client applications.
-        blksize: 1024,
-        blocks: 0,
-        atime: SystemTime::UNIX_EPOCH,
-        mtime: SystemTime::UNIX_EPOCH,
-        ctime: SystemTime::UNIX_EPOCH,
-        crtime: SystemTime::UNIX_EPOCH,
-        kind: inode_data.into(),
-        perm: match inode_data {
-            InodeData::Regular(_, _, false) => 0o444, // no-executable files
-            InodeData::Regular(_, _, true) => 0o555,  // executable files
-            InodeData::Symlink(_) => 0o444,
-            InodeData::Directory(..) => 0o555,
+        mode: match inode_data {
+            InodeData::Regular(_, _, false) => libc::S_IFREG | 0o444, // no-executable files
+            InodeData::Regular(_, _, true) => libc::S_IFREG | 0o555,  // executable files
+            InodeData::Symlink(_) => libc::S_IFLNK | 0o444,
+            InodeData::Directory(_) => libc::S_IFDIR | 0o555,
         },
-        nlink: 0,
-        uid: 0,
-        gid: 0,
-        rdev: 0,
-        flags: 0,
+        ..Default::default()
     }
 }
diff --git a/tvix/store/src/fuse/inodes.rs b/tvix/store/src/fuse/inodes.rs
index f44dde7b804f..e8959ce3629b 100644
--- a/tvix/store/src/fuse/inodes.rs
+++ b/tvix/store/src/fuse/inodes.rs
@@ -66,13 +66,3 @@ impl From<proto::Directory> for InodeData {
         InodeData::Directory(DirectoryInodeData::Populated(digest, children))
     }
 }
-
-impl From<&InodeData> for fuser::FileType {
-    fn from(val: &InodeData) -> Self {
-        match val {
-            InodeData::Regular(..) => fuser::FileType::RegularFile,
-            InodeData::Symlink(_) => fuser::FileType::Symlink,
-            InodeData::Directory(..) => fuser::FileType::Directory,
-        }
-    }
-}
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs
index 978fd50e2634..1a5d884ef7a9 100644
--- a/tvix/store/src/fuse/mod.rs
+++ b/tvix/store/src/fuse/mod.rs
@@ -8,25 +8,34 @@ mod tests;
 use crate::{
     blobservice::{BlobReader, BlobService},
     directoryservice::DirectoryService,
-    fuse::{
-        file_attr::gen_file_attr,
-        inodes::{DirectoryInodeData, InodeData},
-    },
+    fuse::inodes::{DirectoryInodeData, InodeData},
     pathinfoservice::PathInfoService,
     proto::{node::Node, NamedNode},
     B3Digest, Error,
 };
-use fuser::{FileAttr, ReplyAttr, Request};
+use fuse_backend_rs::{
+    api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID},
+    transport::FuseSession,
+};
 use nix_compat::store_path::StorePath;
-use std::io;
-use std::os::unix::ffi::OsStrExt;
-use std::str::FromStr;
-use std::sync::Arc;
-use std::{collections::HashMap, time::Duration};
+use parking_lot::RwLock;
+use std::{
+    collections::HashMap,
+    io,
+    path::Path,
+    str::FromStr,
+    sync::atomic::AtomicU64,
+    sync::{atomic::Ordering, Arc},
+    thread,
+    time::Duration,
+};
 use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
-use tracing::{debug, info_span, warn};
+use tracing::{debug, error, info_span, warn};
 
-use self::inode_tracker::InodeTracker;
+use self::{
+    file_attr::{gen_file_attr, ROOT_FILE_ATTR},
+    inode_tracker::InodeTracker,
+};
 
 /// This implements a read-only FUSE filesystem for a tvix-store
 /// with the passed [BlobService], [DirectoryService] and [PathInfoService].
@@ -71,15 +80,15 @@ pub struct FUSE {
     list_root: bool,
 
     /// This maps a given StorePath to the inode we allocated for the root inode.
-    store_paths: HashMap<StorePath, u64>,
+    store_paths: RwLock<HashMap<StorePath, u64>>,
 
     /// This keeps track of inodes and data alongside them.
-    inode_tracker: InodeTracker,
+    inode_tracker: RwLock<InodeTracker>,
 
     /// This holds all open file handles
-    file_handles: HashMap<u64, Box<dyn BlobReader>>,
+    file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>,
 
-    next_file_handle: u64,
+    next_file_handle: AtomicU64,
 
     tokio_handle: tokio::runtime::Handle,
 }
@@ -98,11 +107,11 @@ impl FUSE {
 
             list_root,
 
-            store_paths: HashMap::default(),
-            inode_tracker: Default::default(),
+            store_paths: RwLock::new(HashMap::default()),
+            inode_tracker: RwLock::new(Default::default()),
 
-            file_handles: Default::default(),
-            next_file_handle: 1,
+            file_handles: RwLock::new(Default::default()),
+            next_file_handle: AtomicU64::new(1),
             tokio_handle: tokio::runtime::Handle::current(),
         }
     }
@@ -114,11 +123,11 @@ impl FUSE {
     /// or otherwise fetch from [self.path_info_service], and then insert into
     /// [self.inode_tracker].
     fn name_in_root_to_ino_and_data(
-        &mut self,
-        name: &std::ffi::OsStr,
+        &self,
+        name: &std::ffi::CStr,
     ) -> Result<Option<(u64, Arc<InodeData>)>, Error> {
         // parse the name into a [StorePath].
-        let store_path = if let Some(name) = name.to_str() {
+        let store_path = if let Some(name) = name.to_str().ok() {
             match StorePath::from_str(name) {
                 Ok(store_path) => store_path,
                 Err(e) => {
@@ -129,19 +138,26 @@ impl FUSE {
                 }
             }
         } else {
-            debug!("{name:?} is no string");
+            debug!("{name:?} is not a valid utf-8 string");
             // same here.
             return Ok(None);
         };
 
-        if let Some(ino) = self.store_paths.get(&store_path) {
+        let ino = {
+            // This extra scope makes sure we drop the read lock
+            // immediately after reading, to prevent deadlocks.
+            let store_paths = self.store_paths.read();
+            store_paths.get(&store_path).cloned()
+        };
+
+        if let Some(ino) = ino {
             // If we already have that store path, lookup the inode from
             // self.store_paths and then get the data from [self.inode_tracker],
             // which in the case of a [InodeData::Directory] will be fully
             // populated.
             Ok(Some((
-                *ino,
-                self.inode_tracker.get(*ino).expect("must exist"),
+                ino,
+                self.inode_tracker.read().get(ino).expect("must exist"),
             )))
         } else {
             // If we don't have it, look it up in PathInfoService.
@@ -157,15 +173,29 @@ impl FUSE {
                         return Ok(None);
                     }
 
+                    // Let's check if someone else beat us to updating the inode tracker and
+                    // store_paths map.
+                    let mut store_paths = self.store_paths.write();
+                    if let Some(ino) = store_paths.get(&store_path).cloned() {
+                        return Ok(Some((
+                            ino,
+                            self.inode_tracker.read().get(ino).expect("must exist"),
+                        )));
+                    }
+
                     // insert the (sparse) inode data and register in
                     // self.store_paths.
                     // FUTUREWORK: change put to return the data after
                     // inserting, so we don't need to lookup a second
                     // time?
-                    let ino = self.inode_tracker.put((&root_node).into());
-                    self.store_paths.insert(store_path, ino);
+                    let (ino, inode) = {
+                        let mut inode_tracker = self.inode_tracker.write();
+                        let ino = inode_tracker.put((&root_node).into());
+                        (ino, inode_tracker.get(ino).unwrap())
+                    };
+                    store_paths.insert(store_path, ino);
 
-                    Ok(Some((ino, self.inode_tracker.get(ino).unwrap())))
+                    Ok(Some((ino, inode)))
                 }
             }
         }
@@ -194,136 +224,152 @@ impl FUSE {
     }
 }
 
-impl fuser::Filesystem for FUSE {
-    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
-    fn getattr(&mut self, _req: &Request, ino: u64, reply: ReplyAttr) {
-        debug!("getattr");
+impl FileSystem for FUSE {
+    type Inode = u64;
+    type Handle = u64;
+
+    fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
+        Ok(FsOptions::empty())
+    }
 
-        if ino == fuser::FUSE_ROOT_ID {
-            reply.attr(&Duration::MAX, &file_attr::ROOT_FILE_ATTR);
-            return;
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn getattr(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _handle: Option<Self::Handle>,
+    ) -> io::Result<(libc::stat64, Duration)> {
+        if inode == ROOT_ID {
+            return Ok((ROOT_FILE_ATTR.into(), Duration::MAX));
         }
 
-        match self.inode_tracker.get(ino) {
-            None => reply.error(libc::ENOENT),
+        match self.inode_tracker.read().get(inode) {
+            None => return Err(io::Error::from_raw_os_error(libc::ENOENT)),
             Some(node) => {
                 debug!(node = ?node, "found node");
-                reply.attr(&Duration::MAX, &file_attr::gen_file_attr(&node, ino));
+                Ok((gen_file_attr(&node, inode).into(), Duration::MAX))
             }
         }
     }
 
-    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent_ino, rq.name = ?name))]
+    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))]
     fn lookup(
-        &mut self,
-        _req: &Request,
-        parent_ino: u64,
-        name: &std::ffi::OsStr,
-        reply: fuser::ReplyEntry,
-    ) {
+        &self,
+        _ctx: &Context,
+        parent: Self::Inode,
+        name: &std::ffi::CStr,
+    ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
         debug!("lookup");
 
         // This goes from a parent inode to a node.
-        // - If the parent is [fuser::FUSE_ROOT_ID], we need to check
+        // - If the parent is [ROOT_ID], we need to check
         //   [self.store_paths] (fetching from PathInfoService if needed)
         // - Otherwise, lookup the parent in [self.inode_tracker] (which must be
         //   a [InodeData::Directory]), and find the child with that name.
-        if parent_ino == fuser::FUSE_ROOT_ID {
-            match self.name_in_root_to_ino_and_data(name) {
+        if parent == ROOT_ID {
+            return match self.name_in_root_to_ino_and_data(name) {
                 Err(e) => {
                     warn!("{}", e);
-                    reply.error(libc::EIO);
-                }
-                Ok(None) => {
-                    reply.error(libc::ENOENT);
+                    Err(io::Error::from_raw_os_error(libc::ENOENT))
                 }
+                Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
                 Ok(Some((ino, inode_data))) => {
                     debug!(inode_data=?&inode_data, ino=ino, "Some");
-                    reply_with_entry(reply, &gen_file_attr(&inode_data, ino));
+                    Ok(fuse_backend_rs::api::filesystem::Entry {
+                        inode: ino,
+                        attr: gen_file_attr(&inode_data, ino).into(),
+                        attr_timeout: Duration::MAX,
+                        entry_timeout: Duration::MAX,
+                        ..Default::default()
+                    })
                 }
+            };
+        }
+
+        // This is the "lookup for "a" inside inode 42.
+        // We already know that inode 42 must be a directory.
+        // It might not be populated yet, so if it isn't, we do (by
+        // fetching from [self.directory_service]), and save the result in
+        // [self.inode_tracker].
+        // Now it for sure is populated, so we search for that name in the
+        // list of children and return the FileAttrs.
+
+        // TODO: Reduce the critical section of this write lock.
+        let mut inode_tracker = self.inode_tracker.write();
+        let parent_data = inode_tracker.get(parent).unwrap();
+        let parent_data = match *parent_data {
+            InodeData::Regular(..) | InodeData::Symlink(_) => {
+                // if the parent inode was not a directory, this doesn't make sense
+                return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
             }
-        } else {
-            // This is the "lookup for "a" inside inode 42.
-            // We already know that inode 42 must be a directory.
-            // It might not be populated yet, so if it isn't, we do (by
-            // fetching from [self.directory_service]), and save the result in
-            // [self.inode_tracker].
-            // Now it for sure is populated, so we search for that name in the
-            // list of children and return the FileAttrs.
-
-            let parent_data = self.inode_tracker.get(parent_ino).unwrap();
-            let parent_data = match *parent_data {
-                InodeData::Regular(..) | InodeData::Symlink(_) => {
-                    // if the parent inode was not a directory, this doesn't make sense
-                    reply.error(libc::ENOTDIR);
-                    return;
-                }
-                InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
-                    match self.fetch_directory_inode_data(parent_digest) {
-                        Ok(new_data) => {
-                            // update data in [self.inode_tracker] with populated variant.
-                            // FUTUREWORK: change put to return the data after
-                            // inserting, so we don't need to lookup a second
-                            // time?
-                            let ino = self.inode_tracker.put(new_data);
-                            self.inode_tracker.get(ino).unwrap()
-                        }
-                        Err(_e) => {
-                            reply.error(libc::EIO);
-                            return;
-                        }
+            InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
+                match self.fetch_directory_inode_data(parent_digest) {
+                    Ok(new_data) => {
+                        // update data in [self.inode_tracker] with populated variant.
+                        // FUTUREWORK: change put to return the data after
+                        // inserting, so we don't need to lookup a second
+                        // time?
+                        let ino = inode_tracker.put(new_data);
+                        inode_tracker.get(ino).unwrap()
+                    }
+                    Err(_e) => {
+                        return Err(io::Error::from_raw_os_error(libc::EIO));
                     }
                 }
-                InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data,
-            };
-
-            // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))].
-            let (parent_digest, children) = if let InodeData::Directory(
-                DirectoryInodeData::Populated(ref parent_digest, ref children),
-            ) = *parent_data
-            {
-                (parent_digest, children)
-            } else {
-                panic!("unexpected type")
-            };
-            let span = info_span!("lookup", directory.digest = %parent_digest);
-            let _enter = span.enter();
-
-            // in the children, find the one with the desired name.
-            if let Some((child_ino, _)) =
-                children.iter().find(|e| e.1.get_name() == name.as_bytes())
-            {
-                // lookup the child [InodeData] in [self.inode_tracker].
-                // We know the inodes for children have already been allocated.
-                let child_inode_data = self.inode_tracker.get(*child_ino).unwrap();
-
-                // Reply with the file attributes for the child.
-                // For child directories, we still have all data we need to reply.
-                reply_with_entry(reply, &gen_file_attr(&child_inode_data, *child_ino));
-            } else {
-                // Child not found, return ENOENT.
-                reply.error(libc::ENOENT);
             }
+            InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data,
+        };
+
+        // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))].
+        let (parent_digest, children) = if let InodeData::Directory(
+            DirectoryInodeData::Populated(ref parent_digest, ref children),
+        ) = *parent_data
+        {
+            (parent_digest, children)
+        } else {
+            panic!("unexpected type")
+        };
+        let span = info_span!("lookup", directory.digest = %parent_digest);
+        let _enter = span.enter();
+
+        // in the children, find the one with the desired name.
+        if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
+            // lookup the child [InodeData] in [self.inode_tracker].
+            // We know the inodes for children have already been allocated.
+            let child_inode_data = inode_tracker.get(*child_ino).unwrap();
+
+            // Reply with the file attributes for the child.
+            // For child directories, we still have all data we need to reply.
+            Ok(fuse_backend_rs::api::filesystem::Entry {
+                inode: *child_ino,
+                attr: gen_file_attr(&child_inode_data, *child_ino).into(),
+                attr_timeout: Duration::MAX,
+                entry_timeout: Duration::MAX,
+                ..Default::default()
+            })
+        } else {
+            // Child not found, return ENOENT.
+            Err(io::Error::from_raw_os_error(libc::ENOENT))
         }
     }
 
     // TODO: readdirplus?
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset))]
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))]
     fn readdir(
-        &mut self,
-        _req: &Request<'_>,
-        ino: u64,
-        _fh: u64,
-        offset: i64,
-        mut reply: fuser::ReplyDirectory,
-    ) {
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _handle: Self::Handle,
+        _size: u32,
+        offset: u64,
+        add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
+    ) -> io::Result<()> {
         debug!("readdir");
 
-        if ino == fuser::FUSE_ROOT_ID {
+        if inode == ROOT_ID {
             if !self.list_root {
-                reply.error(libc::EPERM); // same error code as ipfs/kubo
-                return;
+                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
             } else {
                 for (i, path_info) in self
                     .path_info_service
@@ -334,8 +380,7 @@ impl fuser::Filesystem for FUSE {
                     let path_info = match path_info {
                         Err(e) => {
                             warn!("failed to retrieve pathinfo: {}", e);
-                            reply.error(libc::EPERM);
-                            return;
+                            return Err(io::Error::from_raw_os_error(libc::EPERM));
                         }
                         Ok(path_info) => path_info,
                     };
@@ -344,41 +389,51 @@ impl fuser::Filesystem for FUSE {
                     let root_node = path_info.node.unwrap().node.unwrap();
                     let store_path = StorePath::from_bytes(root_node.get_name()).unwrap();
 
-                    let ino = match self.store_paths.get(&store_path) {
-                        Some(ino) => *ino,
+                    let ino = {
+                        // This extra scope makes sure we drop the read lock
+                        // immediately after reading, to prevent deadlocks.
+                        let store_paths = self.store_paths.read();
+                        store_paths.get(&store_path).cloned()
+                    };
+                    let ino = match ino {
+                        Some(ino) => ino,
                         None => {
                             // insert the (sparse) inode data and register in
                             // self.store_paths.
-                            let ino = self.inode_tracker.put((&root_node).into());
-                            self.store_paths.insert(store_path.clone(), ino);
+                            let ino = self.inode_tracker.write().put((&root_node).into());
+                            self.store_paths.write().insert(store_path.clone(), ino);
                             ino
                         }
                     };
 
                     let ty = match root_node {
-                        Node::Directory(_) => fuser::FileType::Directory,
-                        Node::File(_) => fuser::FileType::RegularFile,
-                        Node::Symlink(_) => fuser::FileType::Symlink,
+                        Node::Directory(_) => libc::S_IFDIR,
+                        Node::File(_) => libc::S_IFREG,
+                        Node::Symlink(_) => libc::S_IFLNK,
                     };
 
-                    let full =
-                        reply.add(ino, offset + i as i64 + 1_i64, ty, store_path.to_string());
-                    if full {
+                    let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
+                        ino,
+                        offset: offset + i as u64 + 1,
+                        type_: ty,
+                        name: store_path.to_string().as_bytes(),
+                    })?;
+                    // If the buffer is full, add_entry will return `Ok(0)`.
+                    if written == 0 {
                         break;
                     }
                 }
-                reply.ok();
-                return;
+                return Ok(());
             }
         }
 
         // lookup the inode data.
-        let dir_inode_data = self.inode_tracker.get(ino).unwrap();
+        let mut inode_tracker = self.inode_tracker.write();
+        let dir_inode_data = inode_tracker.get(inode).unwrap();
         let dir_inode_data = match *dir_inode_data {
             InodeData::Regular(..) | InodeData::Symlink(..) => {
                 warn!("Not a directory");
-                reply.error(libc::ENOTDIR);
-                return;
+                return Err(io::Error::from_raw_os_error(libc::ENOTDIR));
             }
             InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => {
                 match self.fetch_directory_inode_data(directory_digest) {
@@ -387,12 +442,11 @@ impl fuser::Filesystem for FUSE {
                         // FUTUREWORK: change put to return the data after
                         // inserting, so we don't need to lookup a second
                         // time?
-                        let ino = self.inode_tracker.put(new_data);
-                        self.inode_tracker.get(ino).unwrap()
+                        let ino = inode_tracker.put(new_data.clone());
+                        inode_tracker.get(ino).unwrap()
                     }
                     Err(_e) => {
-                        reply.error(libc::EIO);
-                        return;
+                        return Err(io::Error::from_raw_os_error(libc::EIO));
                     }
                 }
             }
@@ -405,42 +459,49 @@ impl fuser::Filesystem for FUSE {
         {
             for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() {
                 // the second parameter will become the "offset" parameter on the next call.
-                let full = reply.add(
-                    *ino,
-                    offset + i as i64 + 1_i64,
-                    match child_node {
-                        Node::Directory(_) => fuser::FileType::Directory,
-                        Node::File(_) => fuser::FileType::RegularFile,
-                        Node::Symlink(_) => fuser::FileType::Symlink,
+                let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
+                    ino: *ino,
+                    offset: offset + i as u64 + 1,
+                    type_: match child_node {
+                        Node::Directory(_) => libc::S_IFDIR,
+                        Node::File(_) => libc::S_IFREG,
+                        Node::Symlink(_) => libc::S_IFLNK,
                     },
-                    std::ffi::OsStr::from_bytes(child_node.get_name()),
-                );
-                if full {
+                    name: child_node.get_name(),
+                })?;
+                // If the buffer is full, add_entry will return `Ok(0)`.
+                if written == 0 {
                     break;
                 }
             }
-            reply.ok();
         } else {
             panic!("unexpected type")
         }
-    }
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
-    fn open(&mut self, _req: &Request<'_>, ino: u64, _flags: i32, reply: fuser::ReplyOpen) {
-        // get a new file handle
-        let fh = self.next_file_handle;
+        Ok(())
+    }
 
-        if ino == fuser::FUSE_ROOT_ID {
-            reply.error(libc::ENOSYS);
-            return;
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn open(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _flags: u32,
+        _fuse_flags: u32,
+    ) -> io::Result<(
+        Option<Self::Handle>,
+        fuse_backend_rs::api::filesystem::OpenOptions,
+    )> {
+        if inode == ROOT_ID {
+            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
         }
 
         // lookup the inode
-        match *self.inode_tracker.get(ino).unwrap() {
+        match *self.inode_tracker.read().get(inode).unwrap() {
             // read is invalid on non-files.
             InodeData::Directory(..) | InodeData::Symlink(_) => {
                 warn!("is directory");
-                reply.error(libc::EISDIR);
+                return Err(io::Error::from_raw_os_error(libc::EISDIR));
             }
             InodeData::Regular(ref blob_digest, _blob_size, _) => {
                 let span = info_span!("read", blob.digest = %blob_digest);
@@ -458,79 +519,87 @@ impl fuser::Filesystem for FUSE {
                 match blob_reader {
                     Ok(None) => {
                         warn!("blob not found");
-                        reply.error(libc::EIO);
+                        return Err(io::Error::from_raw_os_error(libc::EIO));
                     }
                     Err(e) => {
                         warn!(e=?e, "error opening blob");
-                        reply.error(libc::EIO);
+                        return Err(io::Error::from_raw_os_error(libc::EIO));
                     }
                     Ok(Some(blob_reader)) => {
-                        debug!("add file handle {}", fh);
-                        self.file_handles.insert(fh, blob_reader);
-                        reply.opened(fh, 0);
-
+                        // get a new file handle
                         // TODO: this will overflow after 2**64 operations,
                         // which is fine for now.
                         // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
                         // for the discussion on alternatives.
-                        self.next_file_handle += 1;
+                        let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst);
+
+                        debug!("add file handle {}", fh);
+                        self.file_handles
+                            .write()
+                            .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader)));
+
+                        Ok((
+                            Some(fh),
+                            fuse_backend_rs::api::filesystem::OpenOptions::empty(),
+                        ))
                     }
                 }
             }
         }
     }
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino, fh = fh))]
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))]
     fn release(
-        &mut self,
-        _req: &Request<'_>,
-        ino: u64,
-        fh: u64,
-        _flags: i32,
-        _lock_owner: Option<u64>,
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _flags: u32,
+        handle: Self::Handle,
         _flush: bool,
-        reply: fuser::ReplyEmpty,
-    ) {
+        _flock_release: bool,
+        _lock_owner: Option<u64>,
+    ) -> io::Result<()> {
         // remove and get ownership on the blob reader
-        match self.file_handles.remove(&fh) {
+        match self.file_handles.write().remove(&handle) {
             // drop it, which will close it.
             Some(blob_reader) => drop(blob_reader),
             None => {
                 // These might already be dropped if a read error occured.
-                debug!("file_handle {} not found", fh);
+                debug!("file_handle {} not found", handle);
             }
         }
 
-        reply.ok();
+        Ok(())
     }
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset, rq.size = size))]
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))]
     fn read(
-        &mut self,
-        _req: &Request<'_>,
-        ino: u64,
-        fh: u64,
-        offset: i64,
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        handle: Self::Handle,
+        w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter,
         size: u32,
-        _flags: i32,
+        offset: u64,
         _lock_owner: Option<u64>,
-        reply: fuser::ReplyData,
-    ) {
+        _flags: u32,
+    ) -> io::Result<usize> {
         debug!("read");
 
         // We need to take out the blob reader from self.file_handles, so we can
         // interact with it in the separate task.
         // On success, we pass it back out of the task, so we can put it back in self.file_handles.
-        let mut blob_reader = match self.file_handles.remove(&fh) {
-            Some(blob_reader) => blob_reader,
+        let blob_reader = match self.file_handles.read().get(&handle) {
+            Some(blob_reader) => blob_reader.clone(),
             None => {
-                warn!("file handle {} unknown", fh);
-                reply.error(libc::EIO);
-                return;
+                warn!("file handle {} unknown", handle);
+                return Err(io::Error::from_raw_os_error(libc::EIO));
             }
         };
 
         let task = self.tokio_handle.spawn(async move {
+            let mut blob_reader = blob_reader.lock().await;
+
             // seek to the offset specified, which is relative to the start of the file.
             let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await;
 
@@ -540,68 +609,163 @@ impl fuser::Filesystem for FUSE {
                 }
                 Err(e) => {
                     warn!("failed to seek to offset {}: {}", offset, e);
-                    return Err(libc::EIO);
+                    return Err(io::Error::from_raw_os_error(libc::EIO));
                 }
             }
 
-            // As written in the fuser docs, read should send exactly the number
+            // As written in the fuse docs, read should send exactly the number
             // of bytes requested except on EOF or error.
 
             let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
 
             while (buf.len() as u64) < size as u64 {
-                match blob_reader.fill_buf().await {
-                    Ok(int_buf) => {
-                        // copy things from the internal buffer into buf to fill it till up until size
+                let int_buf = blob_reader.fill_buf().await?;
+                // copy things from the internal buffer into buf to fill it till up until size
 
-                        // an empty buffer signals we reached EOF.
-                        if int_buf.is_empty() {
-                            break;
-                        }
+                // an empty buffer signals we reached EOF.
+                if int_buf.is_empty() {
+                    break;
+                }
 
-                        // calculate how many bytes we can read from int_buf.
-                        // It's either all of int_buf, or the number of bytes missing in buf to reach size.
-                        let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len());
+                // calculate how many bytes we can read from int_buf.
+                // It's either all of int_buf, or the number of bytes missing in buf to reach size.
+                let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len());
 
-                        // copy these bytes into our buffer
-                        buf.extend_from_slice(&int_buf[..len_to_copy]);
-                        // and consume them in the buffered reader.
-                        blob_reader.consume(len_to_copy);
-                    }
-                    Err(e) => return Err(e.raw_os_error().unwrap()),
-                }
+                // copy these bytes into our buffer
+                buf.extend_from_slice(&int_buf[..len_to_copy]);
+                // and consume them in the buffered reader.
+                blob_reader.consume(len_to_copy);
             }
-            Ok((buf, blob_reader))
+
+            Ok(buf)
         });
 
-        let resp = self.tokio_handle.block_on(task).unwrap();
+        let buf = self.tokio_handle.block_on(task).unwrap()?;
 
-        match resp {
-            Err(e) => reply.error(e),
-            Ok((buf, blob_reader)) => {
-                reply.data(&buf);
-                self.file_handles.insert(fh, blob_reader);
-            }
-        }
+        w.write(&buf)
     }
 
-    #[tracing::instrument(skip_all, fields(rq.inode = ino))]
-    fn readlink(&mut self, _req: &Request<'_>, ino: u64, reply: fuser::ReplyData) {
-        if ino == fuser::FUSE_ROOT_ID {
-            reply.error(libc::ENOSYS);
-            return;
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
+        if inode == ROOT_ID {
+            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
         }
 
         // lookup the inode
-        match *self.inode_tracker.get(ino).unwrap() {
+        match *self.inode_tracker.read().get(inode).unwrap() {
             InodeData::Directory(..) | InodeData::Regular(..) => {
-                reply.error(libc::EINVAL);
+                Err(io::Error::from_raw_os_error(libc::EINVAL))
             }
-            InodeData::Symlink(ref target) => reply.data(target),
+            InodeData::Symlink(ref target) => Ok(target.to_vec()),
         }
     }
 }
 
-fn reply_with_entry(reply: fuser::ReplyEntry, file_attr: &FileAttr) {
-    reply.entry(&Duration::MAX, file_attr, 1 /* TODO: generation */);
+struct FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
+    channel: fuse_backend_rs::transport::FuseChannel,
+}
+
+impl<FS> FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    fn start(&mut self) -> io::Result<()> {
+        loop {
+            if let Some((reader, writer)) = self
+                .channel
+                .get_request()
+                .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
+            {
+                if let Err(e) = self
+                    .server
+                    .handle_message(reader, writer.into(), None, None)
+                {
+                    match e {
+                        // This indicates the session has been shut down.
+                        fuse_backend_rs::Error::EncodeMessage(e)
+                            if e.raw_os_error() == Some(libc::EBADFD) =>
+                        {
+                            break;
+                        }
+                        error => {
+                            error!(?error, "failed to handle fuse request");
+                            continue;
+                        }
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+        Ok(())
+    }
+}
+
+pub struct FuseDaemon {
+    session: FuseSession,
+    threads: Vec<thread::JoinHandle<()>>,
+}
+
+impl FuseDaemon {
+    pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
+    where
+        FS: FileSystem + Sync + Send + 'static,
+        P: AsRef<Path>,
+    {
+        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
+
+        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        session.set_allow_other(false);
+        session
+            .mount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+        let mut join_handles = Vec::with_capacity(threads);
+        for _ in 0..threads {
+            let mut server = FuseServer {
+                server: server.clone(),
+                channel: session
+                    .new_channel()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
+            };
+            let join_handle = thread::Builder::new()
+                .name("fuse_server".to_string())
+                .spawn(move || {
+                    let _ = server.start();
+                })?;
+            join_handles.push(join_handle);
+        }
+
+        Ok(FuseDaemon {
+            session,
+            threads: join_handles,
+        })
+    }
+
+    pub fn unmount(&mut self) -> Result<(), io::Error> {
+        self.session
+            .umount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        for thread in self.threads.drain(..) {
+            thread.join().map_err(|_| {
+                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
+            })?;
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for FuseDaemon {
+    fn drop(&mut self) {
+        if let Err(error) = self.unmount() {
+            error!(?error, "failed to unmont fuse filesystem")
+        }
+    }
 }
diff --git a/tvix/store/src/fuse/tests.rs b/tvix/store/src/fuse/tests.rs
index 015e27ee9988..81de8b13de58 100644
--- a/tvix/store/src/fuse/tests.rs
+++ b/tvix/store/src/fuse/tests.rs
@@ -12,7 +12,7 @@ use crate::pathinfoservice::PathInfoService;
 use crate::proto::{DirectoryNode, FileNode, PathInfo};
 use crate::tests::fixtures;
 use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service};
-use crate::{proto, FUSE};
+use crate::{proto, FuseDaemon, FUSE};
 
 const BLOB_A_NAME: &str = "00000000000000000000000000000000-test";
 const BLOB_B_NAME: &str = "55555555555555555555555555555555-test";
@@ -40,14 +40,14 @@ fn do_mount<P: AsRef<Path>>(
     path_info_service: Arc<dyn PathInfoService>,
     mountpoint: P,
     list_root: bool,
-) -> io::Result<fuser::BackgroundSession> {
+) -> io::Result<FuseDaemon> {
     let fs = FUSE::new(
         blob_service,
         directory_service,
         path_info_service,
         list_root,
     );
-    fuser::spawn_mount2(fs, mountpoint, &[])
+    FuseDaemon::new(fs, mountpoint.as_ref(), 4)
 }
 
 async fn populate_blob_a(
@@ -294,7 +294,7 @@ async fn mount() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -303,7 +303,7 @@ async fn mount() {
     )
     .expect("must succeed");
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Ensure listing the root isn't allowed
@@ -317,7 +317,7 @@ async fn root() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -334,7 +334,7 @@ async fn root() {
         assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind());
     }
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Ensure listing the root is allowed if configured explicitly
@@ -350,7 +350,7 @@ async fn root_with_listing() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -371,7 +371,7 @@ async fn root_with_listing() {
         assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len());
     }
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Ensure we can stat a file at the root
@@ -387,7 +387,7 @@ async fn stat_file_at_root() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -405,7 +405,7 @@ async fn stat_file_at_root() {
     assert!(metadata.permissions().readonly());
     assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len());
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Ensure we can read a file at the root
@@ -421,7 +421,7 @@ async fn read_file_at_root() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -439,7 +439,7 @@ async fn read_file_at_root() {
     assert_eq!(fixtures::BLOB_A.len(), data.len());
     assert_eq!(fixtures::BLOB_A.to_vec(), data);
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Ensure we can read a large file at the root
@@ -455,7 +455,7 @@ async fn read_large_file_at_root() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_b(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -481,7 +481,7 @@ async fn read_large_file_at_root() {
     assert_eq!(fixtures::BLOB_B.len(), data.len());
     assert_eq!(fixtures::BLOB_B.to_vec(), data);
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Read the target of a symlink
@@ -497,7 +497,7 @@ async fn symlink_readlink() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_symlink(&blob_service, &directory_service, &path_info_service);
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -524,7 +524,7 @@ async fn symlink_readlink() {
     let e = fs::read(p).expect_err("must fail");
     assert_eq!(std::io::ErrorKind::NotFound, e.kind());
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Read and stat a regular file through a symlink pointing to it.
@@ -541,7 +541,7 @@ async fn read_stat_through_symlink() {
     populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
     populate_symlink(&blob_service, &directory_service, &path_info_service);
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -567,7 +567,7 @@ async fn read_stat_through_symlink() {
         std::fs::read(p_symlink).expect("must succeed"),
     );
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Read a directory in the root, and validate some attributes.
@@ -583,7 +583,7 @@ async fn read_stat_directory() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -599,7 +599,7 @@ async fn read_stat_directory() {
     assert!(metadata.is_dir());
     assert!(metadata.permissions().readonly());
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -615,7 +615,7 @@ async fn read_blob_inside_dir() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -635,7 +635,7 @@ async fn read_blob_inside_dir() {
     let data = fs::read(&p).expect("must succeed");
     assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data);
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -652,7 +652,7 @@ async fn read_blob_deep_inside_dir() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -676,7 +676,7 @@ async fn read_blob_deep_inside_dir() {
     let data = fs::read(&p).expect("must succeed");
     assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data);
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Ensure readdir works.
@@ -692,7 +692,7 @@ async fn readdir() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -732,7 +732,7 @@ async fn readdir() {
         assert!(e.file_type().expect("must succeed").is_dir());
     }
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 #[tokio::test]
@@ -748,7 +748,7 @@ async fn readdir_deep() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -775,7 +775,7 @@ async fn readdir_deep() {
         assert_eq!(0, e.metadata().expect("must succeed").len());
     }
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Check attributes match how they show up in /nix/store normally.
@@ -794,7 +794,7 @@ async fn check_attributes() {
     populate_symlink(&blob_service, &directory_service, &path_info_service);
     populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -840,7 +840,7 @@ async fn check_attributes() {
         // crtime seems MacOS only
     }
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 #[tokio::test]
@@ -858,7 +858,7 @@ async fn compare_inodes_directories() {
     populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
     populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -876,7 +876,7 @@ async fn compare_inodes_directories() {
         fs::metadata(p_sibling_dir).expect("must succeed").ino()
     );
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Ensure we allocate the same inodes for the same directory contents.
@@ -893,7 +893,7 @@ async fn compare_inodes_files() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -915,7 +915,7 @@ async fn compare_inodes_files() {
         fs::metadata(p_keep2).expect("must succeed").ino()
     );
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Ensure we allocate the same inode for symlinks pointing to the same targets.
@@ -933,7 +933,7 @@ async fn compare_inodes_symlinks() {
     populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
     populate_symlink2(&blob_service, &directory_service, &path_info_service);
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -951,7 +951,7 @@ async fn compare_inodes_symlinks() {
         fs::symlink_metadata(p2).expect("must succeed").ino()
     );
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Check we match paths exactly.
@@ -967,7 +967,7 @@ async fn read_wrong_paths_in_root() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -1000,7 +1000,7 @@ async fn read_wrong_paths_in_root() {
         .join("00000000000000000000000000000000-tes")
         .exists());
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 /// Make sure writes are not allowed
@@ -1016,7 +1016,7 @@ async fn disallow_writes() {
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -1028,9 +1028,9 @@ async fn disallow_writes() {
     let p = tmpdir.path().join(BLOB_A_NAME);
     let e = std::fs::File::create(p).expect_err("must fail");
 
-    assert_eq!(std::io::ErrorKind::Unsupported, e.kind());
+    assert_eq!(Some(libc::EROFS), e.raw_os_error());
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 #[tokio::test]
@@ -1045,7 +1045,7 @@ async fn missing_directory() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service);
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -1073,7 +1073,7 @@ async fn missing_directory() {
         fs::metadata(p.join(".keep")).expect_err("must fail");
     }
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
 
 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -1088,7 +1088,7 @@ async fn missing_blob() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service);
 
-    let fuser_session = do_mount(
+    let mut fuse_daemon = do_mount(
         blob_service,
         directory_service,
         path_info_service,
@@ -1109,5 +1109,5 @@ async fn missing_blob() {
         fs::read(p).expect_err("must fail");
     }
 
-    fuser_session.join()
+    fuse_daemon.unmount().expect("unmount");
 }
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index 417faa39237b..252506de5977 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -14,7 +14,7 @@ pub use digests::B3Digest;
 pub use errors::Error;
 
 #[cfg(feature = "fuse")]
-pub use fuse::FUSE;
+pub use fuse::{FuseDaemon, FUSE};
 
 #[cfg(test)]
 mod tests;