about summary refs log tree commit diff
path: root/tvix/store/src/fuse
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-09-13T12·20+0200
committerflokli <flokli@flokli.de>2023-09-18T10·33+0000
commitda6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch)
tree5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/store/src/fuse
parent3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff)
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync.

This however had some annoying consequences:

 - It became more and more complicated to track when we're in a context
   with an async runtime in the context or not, producing bugs like
   https://b.tvl.fyi/issues/304
 - The sync trait shielded away async clients from async worloads,
   requiring manual block_on code inside the gRPC client code, and
   spawn_blocking calls in consumers of the trait, even if they were
   async (like the gRPC server)
 - We had to write our own custom glue code (SyncReadIntoAsyncRead)
   to convert a sync io::Read into a tokio::io::AsyncRead, which already
   existed in tokio internally, but upstream ia hesitant to expose.

This now makes the BlobService trait async (via the async_trait macro,
like we already do in various gRPC parts), and replaces the sync readers
and writers with their async counterparts.

Tests interacting with a BlobService now need to have an async runtime
available, the easiest way for this is to mark the test functions
with the tokio::test macro, allowing us to directly .await in the test
function.

In places where we don't have an async runtime available from context
(like tvix-cli), we can pass one down explicitly.

Now that we don't provide a sync interface anymore, the (sync) FUSE
library now holds a pointer to a tokio runtime handle, and needs to at
least have 2 threads available when talking to a blob service (which is
why some of the tests now use the multi_thread flavor).

The FUSE tests got a bit more verbose, as we couldn't use the
setup_and_mount function accepting a callback anymore. We can hopefully
move some of the test fixture setup to rstest in the future to make this
less repetitive.

Co-Authored-By: Connor Brewster <cbrewster@hey.com>
Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/fuse')
-rw-r--r--tvix/store/src/fuse/mod.rs107
-rw-r--r--tvix/store/src/fuse/tests.rs472
2 files changed, 409 insertions, 170 deletions
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs
index 0015abb9d557..978fd50e2634 100644
--- a/tvix/store/src/fuse/mod.rs
+++ b/tvix/store/src/fuse/mod.rs
@@ -18,11 +18,12 @@ use crate::{
 };
 use fuser::{FileAttr, ReplyAttr, Request};
 use nix_compat::store_path::StorePath;
-use std::io::{self, Read, Seek};
+use std::io;
 use std::os::unix::ffi::OsStrExt;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::{collections::HashMap, time::Duration};
+use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
 use tracing::{debug, info_span, warn};
 
 use self::inode_tracker::InodeTracker;
@@ -79,6 +80,8 @@ pub struct FUSE {
     file_handles: HashMap<u64, Box<dyn BlobReader>>,
 
     next_file_handle: u64,
+
+    tokio_handle: tokio::runtime::Handle,
 }
 
 impl FUSE {
@@ -100,6 +103,7 @@ impl FUSE {
 
             file_handles: Default::default(),
             next_file_handle: 1,
+            tokio_handle: tokio::runtime::Handle::current(),
         }
     }
 
@@ -430,6 +434,7 @@ impl fuser::Filesystem for FUSE {
             reply.error(libc::ENOSYS);
             return;
         }
+
         // lookup the inode
         match *self.inode_tracker.get(ino).unwrap() {
             // read is invalid on non-files.
@@ -441,7 +446,16 @@ impl fuser::Filesystem for FUSE {
                 let span = info_span!("read", blob.digest = %blob_digest);
                 let _enter = span.enter();
 
-                match self.blob_service.open_read(blob_digest) {
+                let blob_service = self.blob_service.clone();
+                let blob_digest = blob_digest.clone();
+
+                let task = self
+                    .tokio_handle
+                    .spawn(async move { blob_service.open_read(&blob_digest).await });
+
+                let blob_reader = self.tokio_handle.block_on(task).unwrap();
+
+                match blob_reader {
                     Ok(None) => {
                         warn!("blob not found");
                         reply.error(libc::EIO);
@@ -451,6 +465,7 @@ impl fuser::Filesystem for FUSE {
                         reply.error(libc::EIO);
                     }
                     Ok(Some(blob_reader)) => {
+                        debug!("add file handle {}", fh);
                         self.file_handles.insert(fh, blob_reader);
                         reply.opened(fh, 0);
 
@@ -477,9 +492,14 @@ impl fuser::Filesystem for FUSE {
         reply: fuser::ReplyEmpty,
     ) {
         // remove and get ownership on the blob reader
-        let blob_reader = self.file_handles.remove(&fh).unwrap();
-        // drop it, which will close it.
-        drop(blob_reader);
+        match self.file_handles.remove(&fh) {
+            // drop it, which will close it.
+            Some(blob_reader) => drop(blob_reader),
+            None => {
+                // These might already be dropped if a read error occured.
+                debug!("file_handle {} not found", fh);
+            }
+        }
 
         reply.ok();
     }
@@ -498,29 +518,70 @@ impl fuser::Filesystem for FUSE {
     ) {
         debug!("read");
 
-        let blob_reader = self.file_handles.get_mut(&fh).unwrap();
-
-        // seek to the offset specified, which is relative to the start of the file.
-        let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64));
-        match resp {
-            Ok(pos) => {
-                debug_assert_eq!(offset as u64, pos);
-            }
-            Err(e) => {
-                warn!("failed to seek to offset {}: {}", offset, e);
+        // We need to take out the blob reader from self.file_handles, so we can
+        // interact with it in the separate task.
+        // On success, we pass it back out of the task, so we can put it back in self.file_handles.
+        let mut blob_reader = match self.file_handles.remove(&fh) {
+            Some(blob_reader) => blob_reader,
+            None => {
+                warn!("file handle {} unknown", fh);
                 reply.error(libc::EIO);
                 return;
             }
-        }
+        };
 
-        // now with the blobreader seeked to this location, read size of data
-        let data: std::io::Result<Vec<u8>> =
-            blob_reader.bytes().take(size.try_into().unwrap()).collect();
+        let task = self.tokio_handle.spawn(async move {
+            // seek to the offset specified, which is relative to the start of the file.
+            let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await;
 
-        match data {
-            // respond with the requested data
-            Ok(data) => reply.data(&data),
-            Err(e) => reply.error(e.raw_os_error().unwrap()),
+            match resp {
+                Ok(pos) => {
+                    debug_assert_eq!(offset as u64, pos);
+                }
+                Err(e) => {
+                    warn!("failed to seek to offset {}: {}", offset, e);
+                    return Err(libc::EIO);
+                }
+            }
+
+            // As written in the fuser docs, read should send exactly the number
+            // of bytes requested except on EOF or error.
+
+            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
+
+            while (buf.len() as u64) < size as u64 {
+                match blob_reader.fill_buf().await {
+                    Ok(int_buf) => {
+                        // copy things from the internal buffer into buf to fill it till up until size
+
+                        // an empty buffer signals we reached EOF.
+                        if int_buf.is_empty() {
+                            break;
+                        }
+
+                        // calculate how many bytes we can read from int_buf.
+                        // It's either all of int_buf, or the number of bytes missing in buf to reach size.
+                        let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len());
+
+                        // copy these bytes into our buffer
+                        buf.extend_from_slice(&int_buf[..len_to_copy]);
+                        // and consume them in the buffered reader.
+                        blob_reader.consume(len_to_copy);
+                    }
+                    Err(e) => return Err(e.raw_os_error().unwrap()),
+                }
+            }
+            Ok((buf, blob_reader))
+        });
+
+        let resp = self.tokio_handle.block_on(task).unwrap();
+
+        match resp {
+            Err(e) => reply.error(e),
+            Ok((buf, blob_reader)) => {
+                reply.data(&buf);
+                self.file_handles.insert(fh, blob_reader);
+            }
         }
     }
 
diff --git a/tvix/store/src/fuse/tests.rs b/tvix/store/src/fuse/tests.rs
index 2c99f75471a5..29856433b38c 100644
--- a/tvix/store/src/fuse/tests.rs
+++ b/tvix/store/src/fuse/tests.rs
@@ -1,8 +1,8 @@
-use std::fs;
 use std::io::Cursor;
 use std::os::unix::prelude::MetadataExt;
 use std::path::Path;
 use std::sync::Arc;
+use std::{fs, io};
 
 use tempfile::TempDir;
 
@@ -21,34 +21,25 @@ const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test";
 const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test";
 const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test";
 
-fn setup_and_mount<P: AsRef<Path>, F>(
-    mountpoint: P,
-    setup_fn: F,
-) -> Result<fuser::BackgroundSession, std::io::Error>
-where
-    F: Fn(Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>),
-{
-    setup_and_mount_with_listing(mountpoint, setup_fn, false)
-}
-
-fn setup_and_mount_with_listing<P: AsRef<Path>, F>(
-    mountpoint: P,
-    setup_fn: F,
-    list_root: bool,
-) -> Result<fuser::BackgroundSession, std::io::Error>
-where
-    F: Fn(Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>),
-{
+fn gen_svcs() -> (
+    Arc<dyn BlobService>,
+    Arc<dyn DirectoryService>,
+    Arc<dyn PathInfoService>,
+) {
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
     let path_info_service = gen_pathinfo_service(blob_service.clone(), directory_service.clone());
 
-    setup_fn(
-        blob_service.clone(),
-        directory_service.clone(),
-        path_info_service.clone(),
-    );
+    (blob_service, directory_service, path_info_service)
+}
 
+fn do_mount<P: AsRef<Path>>(
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+    path_info_service: Arc<dyn PathInfoService>,
+    mountpoint: P,
+    list_root: bool,
+) -> io::Result<fuser::BackgroundSession> {
     let fs = FUSE::new(
         blob_service,
         directory_service,
@@ -58,16 +49,17 @@ where
     fuser::spawn_mount2(fs, mountpoint, &[])
 }
 
-fn populate_blob_a(
-    blob_service: Arc<dyn BlobService>,
-    _directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+async fn populate_blob_a(
+    blob_service: &Arc<dyn BlobService>,
+    _directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Upload BLOB_A
-    let mut bw = blob_service.open_write();
-    std::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw)
+    let mut bw = blob_service.open_write().await;
+    tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw)
+        .await
         .expect("must succeed uploading");
-    bw.close().expect("must succeed closing");
+    bw.close().await.expect("must succeed closing");
 
     // Create a PathInfo for it
     let path_info = PathInfo {
@@ -84,16 +76,17 @@ fn populate_blob_a(
     path_info_service.put(path_info).expect("must succeed");
 }
 
-fn populate_blob_b(
-    blob_service: Arc<dyn BlobService>,
-    _directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+async fn populate_blob_b(
+    blob_service: &Arc<dyn BlobService>,
+    _directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Upload BLOB_B
-    let mut bw = blob_service.open_write();
-    std::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw)
+    let mut bw = blob_service.open_write().await;
+    tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw)
+        .await
         .expect("must succeed uploading");
-    bw.close().expect("must succeed closing");
+    bw.close().await.expect("must succeed closing");
 
     // Create a PathInfo for it
     let path_info = PathInfo {
@@ -111,9 +104,9 @@ fn populate_blob_b(
 }
 
 fn populate_symlink(
-    _blob_service: Arc<dyn BlobService>,
-    _directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+    _blob_service: &Arc<dyn BlobService>,
+    _directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Create a PathInfo for it
     let path_info = PathInfo {
@@ -131,9 +124,9 @@ fn populate_symlink(
 /// This writes a symlink pointing to /nix/store/somewhereelse,
 /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED.
 fn populate_symlink2(
-    _blob_service: Arc<dyn BlobService>,
-    _directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+    _blob_service: &Arc<dyn BlobService>,
+    _directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Create a PathInfo for it
     let path_info = PathInfo {
@@ -148,16 +141,16 @@ fn populate_symlink2(
     path_info_service.put(path_info).expect("must succeed");
 }
 
-fn populate_directory_with_keep(
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+async fn populate_directory_with_keep(
+    blob_service: &Arc<dyn BlobService>,
+    directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // upload empty blob
-    let mut bw = blob_service.open_write();
+    let mut bw = blob_service.open_write().await;
     assert_eq!(
         fixtures::EMPTY_BLOB_DIGEST.to_vec(),
-        bw.close().expect("must succeed closing").to_vec(),
+        bw.close().await.expect("must succeed closing").to_vec(),
     );
 
     // upload directory
@@ -182,9 +175,9 @@ fn populate_directory_with_keep(
 /// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory
 /// itself.
 fn populate_pathinfo_without_directory(
-    _: Arc<dyn BlobService>,
-    _: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+    _: &Arc<dyn BlobService>,
+    _: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // upload pathinfo
     let path_info = PathInfo {
@@ -202,9 +195,9 @@ fn populate_pathinfo_without_directory(
 
 /// Insert , but don't provide the blob .keep is pointing to
 fn populate_blob_a_without_blob(
-    _: Arc<dyn BlobService>,
-    _: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+    _: &Arc<dyn BlobService>,
+    _: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // Create a PathInfo for blob A
     let path_info = PathInfo {
@@ -221,16 +214,16 @@ fn populate_blob_a_without_blob(
     path_info_service.put(path_info).expect("must succeed");
 }
 
-fn populate_directory_complicated(
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-    path_info_service: Arc<dyn PathInfoService>,
+async fn populate_directory_complicated(
+    blob_service: &Arc<dyn BlobService>,
+    directory_service: &Arc<dyn DirectoryService>,
+    path_info_service: &Arc<dyn PathInfoService>,
 ) {
     // upload empty blob
-    let mut bw = blob_service.open_write();
+    let mut bw = blob_service.open_write().await;
     assert_eq!(
         fixtures::EMPTY_BLOB_DIGEST.to_vec(),
-        bw.close().expect("must succeed closing").to_vec(),
+        bw.close().await.expect("must succeed closing").to_vec(),
     );
 
     // upload inner directory
@@ -258,8 +251,8 @@ fn populate_directory_complicated(
 }
 
 /// Ensure mounting itself doesn't fail
-#[test]
-fn mount() {
+#[tokio::test]
+async fn mount() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -268,14 +261,22 @@ fn mount() {
 
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     fuser_session.join()
 }
 
 /// Ensure listing the root isn't allowed
-#[test]
-fn root() {
+#[tokio::test]
+async fn root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -283,7 +284,15 @@ fn root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     {
         // read_dir succeeds, but getting the first element will fail.
@@ -297,8 +306,8 @@ fn root() {
 }
 
 /// Ensure listing the root is allowed if configured explicitly
-#[test]
-fn root_with_listing() {
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn root_with_listing() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -306,8 +315,17 @@ fn root_with_listing() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount_with_listing(tmpdir.path(), populate_blob_a, true).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        true, /* allow listing */
+    )
+    .expect("must succeed");
 
     {
         // read_dir succeeds, but getting the first element will fail.
@@ -325,8 +343,8 @@ fn root_with_listing() {
 }
 
 /// Ensure we can stat a file at the root
-#[test]
-fn stat_file_at_root() {
+#[tokio::test]
+async fn stat_file_at_root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -334,7 +352,17 @@ fn stat_file_at_root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);
 
@@ -349,8 +377,8 @@ fn stat_file_at_root() {
 }
 
 /// Ensure we can read a file at the root
-#[test]
-fn read_file_at_root() {
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn read_file_at_root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -358,7 +386,17 @@ fn read_file_at_root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);
 
@@ -373,8 +411,8 @@ fn read_file_at_root() {
 }
 
 /// Ensure we can read a large file at the root
-#[test]
-fn read_large_file_at_root() {
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn read_large_file_at_root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -382,7 +420,17 @@ fn read_large_file_at_root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_b).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_b(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_B_NAME);
     {
@@ -405,8 +453,8 @@ fn read_large_file_at_root() {
 }
 
 /// Read the target of a symlink
-#[test]
-fn symlink_readlink() {
+#[tokio::test]
+async fn symlink_readlink() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -414,7 +462,18 @@ fn symlink_readlink() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_symlink).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_symlink(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
     let p = tmpdir.path().join(SYMLINK_NAME);
 
     let target = fs::read_link(&p).expect("must succeed");
@@ -437,8 +496,8 @@ fn symlink_readlink() {
 }
 
 /// Read and stat a regular file through a symlink pointing to it.
-#[test]
-fn read_stat_through_symlink() {
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn read_stat_through_symlink() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -446,10 +505,17 @@ fn read_stat_through_symlink() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| {
-        populate_blob_a(bs.clone(), ds.clone(), ps.clone());
-        populate_symlink(bs, ds, ps);
-    })
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+    populate_symlink(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
     .expect("must succeed");
 
     let p_symlink = tmpdir.path().join(SYMLINK_NAME);
@@ -473,8 +539,8 @@ fn read_stat_through_symlink() {
 }
 
 /// Read a directory in the root, and validate some attributes.
-#[test]
-fn read_stat_directory() {
+#[tokio::test]
+async fn read_stat_directory() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -482,8 +548,17 @@ fn read_stat_directory() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_with_keep).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
 
@@ -495,9 +570,9 @@ fn read_stat_directory() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 /// Read a blob inside a directory. This ensures we successfully populate directory data.
-fn read_blob_inside_dir() {
+async fn read_blob_inside_dir() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -505,8 +580,17 @@ fn read_blob_inside_dir() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_with_keep).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep");
 
@@ -522,10 +606,10 @@ fn read_blob_inside_dir() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 /// Read a blob inside a directory inside a directory. This ensures we properly
 /// populate directories as we traverse down the structure.
-fn read_blob_deep_inside_dir() {
+async fn read_blob_deep_inside_dir() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -533,8 +617,17 @@ fn read_blob_deep_inside_dir() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir
         .path()
@@ -555,8 +648,8 @@ fn read_blob_deep_inside_dir() {
 }
 
 /// Ensure readdir works.
-#[test]
-fn readdir() {
+#[tokio::test]
+async fn readdir() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -564,8 +657,17 @@ fn readdir() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME);
 
@@ -601,9 +703,9 @@ fn readdir() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test]
 /// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory.
-fn readdir_deep() {
+async fn readdir_deep() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -611,8 +713,17 @@ fn readdir_deep() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep");
 
@@ -636,8 +747,8 @@ fn readdir_deep() {
 }
 
 /// Check attributes match how they show up in /nix/store normally.
-#[test]
-fn check_attributes() {
+#[tokio::test]
+async fn check_attributes() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -645,11 +756,18 @@ fn check_attributes() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| {
-        populate_blob_a(bs.clone(), ds.clone(), ps.clone());
-        populate_directory_with_keep(bs.clone(), ds.clone(), ps.clone());
-        populate_symlink(bs, ds, ps);
-    })
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+    populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
+    populate_symlink(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
     .expect("must succeed");
 
     let p_file = tmpdir.path().join(BLOB_A_NAME);
@@ -689,10 +807,10 @@ fn check_attributes() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test]
 /// Ensure we allocate the same inodes for the same directory contents.
 /// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP.
-fn compare_inodes_directories() {
+async fn compare_inodes_directories() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -700,10 +818,17 @@ fn compare_inodes_directories() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| {
-        populate_directory_with_keep(bs.clone(), ds.clone(), ps.clone());
-        populate_directory_complicated(bs, ds, ps);
-    })
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
     .expect("must succeed");
 
     let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
@@ -720,8 +845,8 @@ fn compare_inodes_directories() {
 
 /// Ensure we allocate the same inodes for the same directory contents.
 /// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep
-#[test]
-fn compare_inodes_files() {
+#[tokio::test]
+async fn compare_inodes_files() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -729,8 +854,17 @@ fn compare_inodes_files() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep");
     let p_keep2 = tmpdir
@@ -750,8 +884,8 @@ fn compare_inodes_files() {
 
 /// Ensure we allocate the same inode for symlinks pointing to the same targets.
 /// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2.
-#[test]
-fn compare_inodes_symlinks() {
+#[tokio::test]
+async fn compare_inodes_symlinks() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -759,10 +893,17 @@ fn compare_inodes_symlinks() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| {
-        populate_directory_complicated(bs.clone(), ds.clone(), ps.clone());
-        populate_symlink2(bs, ds, ps);
-    })
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
+    populate_symlink2(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
     .expect("must succeed");
 
     let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa");
@@ -778,8 +919,8 @@ fn compare_inodes_symlinks() {
 }
 
 /// Check we match paths exactly.
-#[test]
-fn read_wrong_paths_in_root() {
+#[tokio::test]
+async fn read_wrong_paths_in_root() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -787,7 +928,17 @@ fn read_wrong_paths_in_root() {
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     // wrong name
     assert!(!tmpdir
@@ -817,8 +968,8 @@ fn read_wrong_paths_in_root() {
 }
 
 /// Make sure writes are not allowed
-#[test]
-fn disallow_writes() {
+#[tokio::test]
+async fn disallow_writes() {
     // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
@@ -827,7 +978,16 @@ fn disallow_writes() {
 
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);
     let e = std::fs::File::create(p).expect_err("must fail");
@@ -837,17 +997,26 @@ fn disallow_writes() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test]
 /// Ensure we get an IO error if the directory service does not have the Directory object.
-fn missing_directory() {
+async fn missing_directory() {
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
         return;
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_pathinfo_without_directory).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
 
@@ -871,17 +1040,26 @@ fn missing_directory() {
     fuser_session.join()
 }
 
-#[test]
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
 /// Ensure we get an IO error if the blob service does not have the blob
-fn missing_blob() {
+async fn missing_blob() {
     if !std::path::Path::new("/dev/fuse").exists() {
         eprintln!("skipping test");
         return;
     }
     let tmpdir = TempDir::new().unwrap();
 
-    let fuser_session =
-        setup_and_mount(tmpdir.path(), populate_blob_a_without_blob).expect("must succeed");
+    let (blob_service, directory_service, path_info_service) = gen_svcs();
+    populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service);
+
+    let fuser_session = do_mount(
+        blob_service,
+        directory_service,
+        path_info_service,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);