about summary refs log tree commit diff
path: root/tvix/store/src/fs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/fs')
-rw-r--r--tvix/store/src/fs/mod.rs43
-rw-r--r--tvix/store/src/fs/tests.rs231
2 files changed, 178 insertions, 96 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs
index 48e605406331..02d3bb3221ad 100644
--- a/tvix/store/src/fs/mod.rs
+++ b/tvix/store/src/fs/mod.rs
@@ -16,6 +16,7 @@ use crate::{
     B3Digest, Error,
 };
 use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
+use futures::StreamExt;
 use nix_compat::store_path::StorePath;
 use parking_lot::RwLock;
 use std::{
@@ -26,7 +27,10 @@ use std::{
     sync::{atomic::Ordering, Arc},
     time::Duration,
 };
-use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
+use tokio::{
+    io::{AsyncBufReadExt, AsyncSeekExt},
+    sync::mpsc,
+};
 use tracing::{debug, info_span, warn};
 
 use self::{
@@ -159,7 +163,11 @@ impl TvixStoreFs {
             )))
         } else {
             // If we don't have it, look it up in PathInfoService.
-            match self.path_info_service.get(store_path.digest)? {
+            let path_info_service = self.path_info_service.clone();
+            let task = self
+                .tokio_handle
+                .spawn(async move { path_info_service.get(store_path.digest).await });
+            match self.tokio_handle.block_on(task).unwrap()? {
                 // the pathinfo doesn't exist, so the file doesn't exist.
                 None => Ok(None),
                 Some(path_info) => {
@@ -204,7 +212,12 @@ impl TvixStoreFs {
     /// This is both used to initially insert the root node of a store path,
     /// as well as when looking up an intermediate DirectoryNode.
     fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> {
-        match self.directory_service.get(directory_digest) {
+        let directory_service = self.directory_service.clone();
+        let directory_digest_clone = directory_digest.clone();
+        let task = self
+            .tokio_handle
+            .spawn(async move { directory_service.get(&directory_digest_clone).await });
+        match self.tokio_handle.block_on(task).unwrap() {
             Err(e) => {
                 warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory");
                 Err(e)
@@ -369,12 +382,23 @@ impl FileSystem for TvixStoreFs {
             if !self.list_root {
                 return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
             } else {
-                for (i, path_info) in self
-                    .path_info_service
-                    .list()
-                    .skip(offset as usize)
-                    .enumerate()
-                {
+                let path_info_service = self.path_info_service.clone();
+                let (tx, mut rx) = mpsc::channel(16);
+
+                // This task will run in the background immediately and will exit
+                // after the stream ends or if we no longer want any more entries.
+                self.tokio_handle.spawn(async move {
+                    let mut stream = path_info_service.list().skip(offset as usize).enumerate();
+                    while let Some(path_info) = stream.next().await {
+                        if tx.send(path_info).await.is_err() {
+                            // If we get a send error, it means the sync code
+                            // doesn't want any more entries.
+                            break;
+                        }
+                    }
+                });
+
+                while let Some((i, path_info)) = rx.blocking_recv() {
                     let path_info = match path_info {
                         Err(e) => {
                             warn!("failed to retrieve pathinfo: {}", e);
@@ -421,6 +445,7 @@ impl FileSystem for TvixStoreFs {
                         break;
                     }
                 }
+
                 return Ok(());
             }
         }
diff --git a/tvix/store/src/fs/tests.rs b/tvix/store/src/fs/tests.rs
index 30f5ca3f40aa..6837f8aa293a 100644
--- a/tvix/store/src/fs/tests.rs
+++ b/tvix/store/src/fs/tests.rs
@@ -1,8 +1,10 @@
+use futures::StreamExt;
 use std::io::Cursor;
 use std::os::unix::prelude::MetadataExt;
 use std::path::Path;
 use std::sync::Arc;
-use std::{fs, io};
+use tokio::{fs, io};
+use tokio_stream::wrappers::ReadDirStream;
 
 use tempfile::TempDir;
 
@@ -75,7 +77,10 @@ async fn populate_blob_a(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 async fn populate_blob_b(
@@ -102,7 +107,10 @@ async fn populate_blob_b(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// adds a blob containing helloworld and marks it as executable
@@ -133,10 +141,13 @@ async fn populate_helloworld_blob(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
-fn populate_symlink(
+async fn populate_symlink(
     _blob_service: &Arc<dyn BlobService>,
     _directory_service: &Arc<dyn DirectoryService>,
     path_info_service: &Arc<dyn PathInfoService>,
@@ -151,12 +162,15 @@ fn populate_symlink(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// This writes a symlink pointing to /nix/store/somewhereelse,
 /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED.
-fn populate_symlink2(
+async fn populate_symlink2(
     _blob_service: &Arc<dyn BlobService>,
     _directory_service: &Arc<dyn DirectoryService>,
     path_info_service: &Arc<dyn PathInfoService>,
@@ -171,7 +185,10 @@ fn populate_symlink2(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 async fn populate_directory_with_keep(
@@ -189,6 +206,7 @@ async fn populate_directory_with_keep(
     // upload directory
     directory_service
         .put(fixtures::DIRECTORY_WITH_KEEP.clone())
+        .await
         .expect("must succeed uploading");
 
     // upload pathinfo
@@ -202,12 +220,15 @@ async fn populate_directory_with_keep(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory
 /// itself.
-fn populate_pathinfo_without_directory(
+async fn populate_pathinfo_without_directory(
     _: &Arc<dyn BlobService>,
     _: &Arc<dyn DirectoryService>,
     path_info_service: &Arc<dyn PathInfoService>,
@@ -223,11 +244,14 @@ fn populate_pathinfo_without_directory(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// Insert , but don't provide the blob .keep is pointing to
-fn populate_blob_a_without_blob(
+async fn populate_blob_a_without_blob(
     _: &Arc<dyn BlobService>,
     _: &Arc<dyn DirectoryService>,
     path_info_service: &Arc<dyn PathInfoService>,
@@ -244,7 +268,10 @@ fn populate_blob_a_without_blob(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 async fn populate_directory_complicated(
@@ -262,11 +289,13 @@ async fn populate_directory_complicated(
     // upload inner directory
     directory_service
         .put(fixtures::DIRECTORY_WITH_KEEP.clone())
+        .await
         .expect("must succeed uploading");
 
     // uplodad parent directory
     directory_service
         .put(fixtures::DIRECTORY_COMPLICATED.clone())
+        .await
         .expect("must succeed uploading");
 
     // upload pathinfo
@@ -280,7 +309,10 @@ async fn populate_directory_complicated(
         }),
         ..Default::default()
     };
-    path_info_service.put(path_info).expect("must succeed");
+    path_info_service
+        .put(path_info)
+        .await
+        .expect("must succeed");
 }
 
 /// Ensure mounting itself doesn't fail
@@ -329,9 +361,13 @@ async fn root() {
 
     {
         // read_dir succeeds, but getting the first element will fail.
-        let mut it = fs::read_dir(tmpdir).expect("must succeed");
+        let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed"));
 
-        let err = it.next().expect("must be some").expect_err("must be err");
+        let err = it
+            .next()
+            .await
+            .expect("must be some")
+            .expect_err("must be err");
         assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind());
     }
 
@@ -362,11 +398,15 @@ async fn root_with_listing() {
 
     {
         // read_dir succeeds, but getting the first element will fail.
-        let mut it = fs::read_dir(tmpdir).expect("must succeed");
+        let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed"));
 
-        let e = it.next().expect("must be some").expect("must succeed");
+        let e = it
+            .next()
+            .await
+            .expect("must be some")
+            .expect("must succeed");
 
-        let metadata = e.metadata().expect("must succeed");
+        let metadata = e.metadata().await.expect("must succeed");
         assert!(metadata.is_file());
         assert!(metadata.permissions().readonly());
         assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len());
@@ -400,7 +440,7 @@ async fn stat_file_at_root() {
     let p = tmpdir.path().join(BLOB_A_NAME);
 
     // peek at the file metadata
-    let metadata = fs::metadata(p).expect("must succeed");
+    let metadata = fs::metadata(p).await.expect("must succeed");
 
     assert!(metadata.is_file());
     assert!(metadata.permissions().readonly());
@@ -434,7 +474,7 @@ async fn read_file_at_root() {
     let p = tmpdir.path().join(BLOB_A_NAME);
 
     // read the file contents
-    let data = fs::read(p).expect("must succeed");
+    let data = fs::read(p).await.expect("must succeed");
 
     // ensure size and contents match
     assert_eq!(fixtures::BLOB_A.len(), data.len());
@@ -468,7 +508,7 @@ async fn read_large_file_at_root() {
     let p = tmpdir.path().join(BLOB_B_NAME);
     {
         // peek at the file metadata
-        let metadata = fs::metadata(&p).expect("must succeed");
+        let metadata = fs::metadata(&p).await.expect("must succeed");
 
         assert!(metadata.is_file());
         assert!(metadata.permissions().readonly());
@@ -476,7 +516,7 @@ async fn read_large_file_at_root() {
     }
 
     // read the file contents
-    let data = fs::read(p).expect("must succeed");
+    let data = fs::read(p).await.expect("must succeed");
 
     // ensure size and contents match
     assert_eq!(fixtures::BLOB_B.len(), data.len());
@@ -496,7 +536,7 @@ async fn symlink_readlink() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
-    populate_symlink(&blob_service, &directory_service, &path_info_service);
+    populate_symlink(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -509,20 +549,20 @@ async fn symlink_readlink() {
 
     let p = tmpdir.path().join(SYMLINK_NAME);
 
-    let target = fs::read_link(&p).expect("must succeed");
+    let target = fs::read_link(&p).await.expect("must succeed");
     assert_eq!(BLOB_A_NAME, target.to_str().unwrap());
 
     // peek at the file metadata, which follows symlinks.
     // this must fail, as we didn't populate the target.
-    let e = fs::metadata(&p).expect_err("must fail");
+    let e = fs::metadata(&p).await.expect_err("must fail");
     assert_eq!(std::io::ErrorKind::NotFound, e.kind());
 
     // peeking at the file metadata without following symlinks will succeed.
-    let metadata = fs::symlink_metadata(&p).expect("must succeed");
+    let metadata = fs::symlink_metadata(&p).await.expect("must succeed");
     assert!(metadata.is_symlink());
 
     // reading from the symlink (which follows) will fail, because the target doesn't exist.
-    let e = fs::read(p).expect_err("must fail");
+    let e = fs::read(p).await.expect_err("must fail");
     assert_eq!(std::io::ErrorKind::NotFound, e.kind());
 
     fuse_daemon.unmount().expect("unmount");
@@ -540,7 +580,7 @@ async fn read_stat_through_symlink() {
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
-    populate_symlink(&blob_service, &directory_service, &path_info_service);
+    populate_symlink(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -556,16 +596,16 @@ async fn read_stat_through_symlink() {
 
     // peek at the file metadata, which follows symlinks.
     // this must now return the same metadata as when statting at the target directly.
-    let metadata_symlink = fs::metadata(&p_symlink).expect("must succeed");
-    let metadata_blob = fs::metadata(&p_blob).expect("must succeed");
+    let metadata_symlink = fs::metadata(&p_symlink).await.expect("must succeed");
+    let metadata_blob = fs::metadata(&p_blob).await.expect("must succeed");
     assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type());
     assert_eq!(metadata_blob.len(), metadata_symlink.len());
 
     // reading from the symlink (which follows) will return the same data as if
     // we were reading from the file directly.
     assert_eq!(
-        std::fs::read(p_blob).expect("must succeed"),
-        std::fs::read(p_symlink).expect("must succeed"),
+        fs::read(p_blob).await.expect("must succeed"),
+        fs::read(p_symlink).await.expect("must succeed"),
     );
 
     fuse_daemon.unmount().expect("unmount");
@@ -596,7 +636,7 @@ async fn read_stat_directory() {
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
 
     // peek at the metadata of the directory
-    let metadata = fs::metadata(p).expect("must succeed");
+    let metadata = fs::metadata(p).await.expect("must succeed");
     assert!(metadata.is_dir());
     assert!(metadata.permissions().readonly());
 
@@ -628,12 +668,12 @@ async fn read_blob_inside_dir() {
     let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep");
 
     // peek at metadata.
-    let metadata = fs::metadata(&p).expect("must succeed");
+    let metadata = fs::metadata(&p).await.expect("must succeed");
     assert!(metadata.is_file());
     assert!(metadata.permissions().readonly());
 
     // read from it
-    let data = fs::read(&p).expect("must succeed");
+    let data = fs::read(&p).await.expect("must succeed");
     assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data);
 
     fuse_daemon.unmount().expect("unmount");
@@ -669,12 +709,12 @@ async fn read_blob_deep_inside_dir() {
         .join(".keep");
 
     // peek at metadata.
-    let metadata = fs::metadata(&p).expect("must succeed");
+    let metadata = fs::metadata(&p).await.expect("must succeed");
     assert!(metadata.is_file());
     assert!(metadata.permissions().readonly());
 
     // read from it
-    let data = fs::read(&p).expect("must succeed");
+    let data = fs::read(&p).await.expect("must succeed");
     assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data);
 
     fuse_daemon.unmount().expect("unmount");
@@ -706,10 +746,10 @@ async fn readdir() {
 
     {
         // read_dir should succeed. Collect all elements
-        let elements: Vec<_> = fs::read_dir(p)
-            .expect("must succeed")
+        let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed"))
             .map(|e| e.expect("must not be err"))
-            .collect();
+            .collect()
+            .await;
 
         assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and ..
 
@@ -719,18 +759,18 @@ async fn readdir() {
         // ".keep", 0 byte file.
         let e = &elements[0];
         assert_eq!(".keep", e.file_name());
-        assert!(e.file_type().expect("must succeed").is_file());
-        assert_eq!(0, e.metadata().expect("must succeed").len());
+        assert!(e.file_type().await.expect("must succeed").is_file());
+        assert_eq!(0, e.metadata().await.expect("must succeed").len());
 
         // "aa", symlink.
         let e = &elements[1];
         assert_eq!("aa", e.file_name());
-        assert!(e.file_type().expect("must succeed").is_symlink());
+        assert!(e.file_type().await.expect("must succeed").is_symlink());
 
         // "keep", directory
         let e = &elements[2];
         assert_eq!("keep", e.file_name());
-        assert!(e.file_type().expect("must succeed").is_dir());
+        assert!(e.file_type().await.expect("must succeed").is_dir());
     }
 
     fuse_daemon.unmount().expect("unmount");
@@ -762,18 +802,18 @@ async fn readdir_deep() {
 
     {
         // read_dir should succeed. Collect all elements
-        let elements: Vec<_> = fs::read_dir(p)
-            .expect("must succeed")
+        let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed"))
             .map(|e| e.expect("must not be err"))
-            .collect();
+            .collect()
+            .await;
 
         assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and ..
 
         // ".keep", 0 byte file.
         let e = &elements[0];
         assert_eq!(".keep", e.file_name());
-        assert!(e.file_type().expect("must succeed").is_file());
-        assert_eq!(0, e.metadata().expect("must succeed").len());
+        assert!(e.file_type().await.expect("must succeed").is_file());
+        assert_eq!(0, e.metadata().await.expect("must succeed").len());
     }
 
     fuse_daemon.unmount().expect("unmount");
@@ -792,7 +832,7 @@ async fn check_attributes() {
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_blob_a(&blob_service, &directory_service, &path_info_service).await;
     populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await;
-    populate_symlink(&blob_service, &directory_service, &path_info_service);
+    populate_symlink(&blob_service, &directory_service, &path_info_service).await;
     populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
@@ -810,10 +850,16 @@ async fn check_attributes() {
     let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME);
 
     // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident.
-    let metadata_file = fs::symlink_metadata(&p_file).expect("must succeed");
-    let metadata_executable_file = fs::symlink_metadata(&p_executable_file).expect("must succeed");
-    let metadata_directory = fs::symlink_metadata(&p_directory).expect("must succeed");
-    let metadata_symlink = fs::symlink_metadata(&p_symlink).expect("must succeed");
+    let metadata_file = fs::symlink_metadata(&p_file).await.expect("must succeed");
+    let metadata_executable_file = fs::symlink_metadata(&p_executable_file)
+        .await
+        .expect("must succeed");
+    let metadata_directory = fs::symlink_metadata(&p_directory)
+        .await
+        .expect("must succeed");
+    let metadata_symlink = fs::symlink_metadata(&p_symlink)
+        .await
+        .expect("must succeed");
 
     // modes should match. We & with 0o777 to remove any higher bits.
     assert_eq!(0o444, metadata_file.mode() & 0o777);
@@ -873,8 +919,14 @@ async fn compare_inodes_directories() {
 
     // peek at metadata.
     assert_eq!(
-        fs::metadata(p_dir_with_keep).expect("must succeed").ino(),
-        fs::metadata(p_sibling_dir).expect("must succeed").ino()
+        fs::metadata(p_dir_with_keep)
+            .await
+            .expect("must succeed")
+            .ino(),
+        fs::metadata(p_sibling_dir)
+            .await
+            .expect("must succeed")
+            .ino()
     );
 
     fuse_daemon.unmount().expect("unmount");
@@ -912,8 +964,8 @@ async fn compare_inodes_files() {
 
     // peek at metadata.
     assert_eq!(
-        fs::metadata(p_keep1).expect("must succeed").ino(),
-        fs::metadata(p_keep2).expect("must succeed").ino()
+        fs::metadata(p_keep1).await.expect("must succeed").ino(),
+        fs::metadata(p_keep2).await.expect("must succeed").ino()
     );
 
     fuse_daemon.unmount().expect("unmount");
@@ -932,7 +984,7 @@ async fn compare_inodes_symlinks() {
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
     populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await;
-    populate_symlink2(&blob_service, &directory_service, &path_info_service);
+    populate_symlink2(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -948,8 +1000,8 @@ async fn compare_inodes_symlinks() {
 
     // peek at metadata.
     assert_eq!(
-        fs::symlink_metadata(p1).expect("must succeed").ino(),
-        fs::symlink_metadata(p2).expect("must succeed").ino()
+        fs::symlink_metadata(p1).await.expect("must succeed").ino(),
+        fs::symlink_metadata(p2).await.expect("must succeed").ino()
     );
 
     fuse_daemon.unmount().expect("unmount");
@@ -978,28 +1030,32 @@ async fn read_wrong_paths_in_root() {
     .expect("must succeed");
 
     // wrong name
-    assert!(!tmpdir
-        .path()
-        .join("00000000000000000000000000000000-tes")
-        .exists());
+    assert!(
+        fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes"))
+            .await
+            .is_err()
+    );
 
     // invalid hash
-    assert!(!tmpdir
-        .path()
-        .join("0000000000000000000000000000000-test")
-        .exists());
+    assert!(
+        fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test"))
+            .await
+            .is_err()
+    );
 
     // right name, must exist
-    assert!(tmpdir
-        .path()
-        .join("00000000000000000000000000000000-test")
-        .exists());
+    assert!(
+        fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test"))
+            .await
+            .is_ok()
+    );
 
     // now wrong name with right hash still may not exist
-    assert!(!tmpdir
-        .path()
-        .join("00000000000000000000000000000000-tes")
-        .exists());
+    assert!(
+        fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes"))
+            .await
+            .is_err()
+    );
 
     fuse_daemon.unmount().expect("unmount");
 }
@@ -1027,7 +1083,7 @@ async fn disallow_writes() {
     .expect("must succeed");
 
     let p = tmpdir.path().join(BLOB_A_NAME);
-    let e = std::fs::File::create(p).expect_err("must fail");
+    let e = fs::File::create(p).await.expect_err("must fail");
 
     assert_eq!(Some(libc::EROFS), e.raw_os_error());
 
@@ -1044,7 +1100,8 @@ async fn missing_directory() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
-    populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service);
+    populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service)
+        .await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -1059,19 +1116,19 @@ async fn missing_directory() {
 
     {
         // `stat` on the path should succeed, because it doesn't trigger the directory request.
-        fs::metadata(&p).expect("must succeed");
+        fs::metadata(&p).await.expect("must succeed");
 
         // However, calling either `readdir` or `stat` on a child should fail with an IO error.
         // It fails when trying to pull the first entry, because we don't implement opendir separately
-        fs::read_dir(&p)
-            .unwrap()
+        ReadDirStream::new(fs::read_dir(&p).await.unwrap())
             .next()
+            .await
             .expect("must be some")
             .expect_err("must be err");
 
         // rust currently sets e.kind() to Uncategorized, which isn't very
         // helpful, so we don't look at the error more closely than that..
-        fs::metadata(p.join(".keep")).expect_err("must fail");
+        fs::metadata(p.join(".keep")).await.expect_err("must fail");
     }
 
     fuse_daemon.unmount().expect("unmount");
@@ -1087,7 +1144,7 @@ async fn missing_blob() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service, path_info_service) = gen_svcs();
-    populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service);
+    populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service).await;
 
     let mut fuse_daemon = do_mount(
         blob_service,
@@ -1102,12 +1159,12 @@ async fn missing_blob() {
 
     {
         // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service.
-        fs::metadata(&p).expect("must succeed");
+        fs::metadata(&p).await.expect("must succeed");
 
         // However, calling read on the blob should fail.
         // rust currently sets e.kind() to Uncategorized, which isn't very
         // helpful, so we don't look at the error more closely than that..
-        fs::read(p).expect_err("must fail");
+        fs::read(p).await.expect_err("must fail");
     }
 
     fuse_daemon.unmount().expect("unmount");