diff options
author | Florian Klink <flokli@flokli.de> | 2023-09-13T12·20+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-09-18T10·33+0000 |
commit | da6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch) | |
tree | 5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/store/src/fuse | |
parent | 3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff) |
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync. This however had some annoying consequences: - It became more and more complicated to track when we're in a context with an async runtime in the context or not, producing bugs like https://b.tvl.fyi/issues/304 - The sync trait shielded away async clients from async worloads, requiring manual block_on code inside the gRPC client code, and spawn_blocking calls in consumers of the trait, even if they were async (like the gRPC server) - We had to write our own custom glue code (SyncReadIntoAsyncRead) to convert a sync io::Read into a tokio::io::AsyncRead, which already existed in tokio internally, but upstream ia hesitant to expose. This now makes the BlobService trait async (via the async_trait macro, like we already do in various gRPC parts), and replaces the sync readers and writers with their async counterparts. Tests interacting with a BlobService now need to have an async runtime available, the easiest way for this is to mark the test functions with the tokio::test macro, allowing us to directly .await in the test function. In places where we don't have an async runtime available from context (like tvix-cli), we can pass one down explicitly. Now that we don't provide a sync interface anymore, the (sync) FUSE library now holds a pointer to a tokio runtime handle, and needs to at least have 2 threads available when talking to a blob service (which is why some of the tests now use the multi_thread flavor). The FUSE tests got a bit more verbose, as we couldn't use the setup_and_mount function accepting a callback anymore. We can hopefully move some of the test fixture setup to rstest in the future to make this less repetitive. Co-Authored-By: Connor Brewster <cbrewster@hey.com> Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329 Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/fuse')
-rw-r--r-- | tvix/store/src/fuse/mod.rs | 107 | ||||
-rw-r--r-- | tvix/store/src/fuse/tests.rs | 472 |
2 files changed, 409 insertions, 170 deletions
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs index 0015abb9d557..978fd50e2634 100644 --- a/tvix/store/src/fuse/mod.rs +++ b/tvix/store/src/fuse/mod.rs @@ -18,11 +18,12 @@ use crate::{ }; use fuser::{FileAttr, ReplyAttr, Request}; use nix_compat::store_path::StorePath; -use std::io::{self, Read, Seek}; +use std::io; use std::os::unix::ffi::OsStrExt; use std::str::FromStr; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; +use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; use tracing::{debug, info_span, warn}; use self::inode_tracker::InodeTracker; @@ -79,6 +80,8 @@ pub struct FUSE { file_handles: HashMap<u64, Box<dyn BlobReader>>, next_file_handle: u64, + + tokio_handle: tokio::runtime::Handle, } impl FUSE { @@ -100,6 +103,7 @@ impl FUSE { file_handles: Default::default(), next_file_handle: 1, + tokio_handle: tokio::runtime::Handle::current(), } } @@ -430,6 +434,7 @@ impl fuser::Filesystem for FUSE { reply.error(libc::ENOSYS); return; } + // lookup the inode match *self.inode_tracker.get(ino).unwrap() { // read is invalid on non-files. @@ -441,7 +446,16 @@ impl fuser::Filesystem for FUSE { let span = info_span!("read", blob.digest = %blob_digest); let _enter = span.enter(); - match self.blob_service.open_read(blob_digest) { + let blob_service = self.blob_service.clone(); + let blob_digest = blob_digest.clone(); + + let task = self + .tokio_handle + .spawn(async move { blob_service.open_read(&blob_digest).await }); + + let blob_reader = self.tokio_handle.block_on(task).unwrap(); + + match blob_reader { Ok(None) => { warn!("blob not found"); reply.error(libc::EIO); @@ -451,6 +465,7 @@ impl fuser::Filesystem for FUSE { reply.error(libc::EIO); } Ok(Some(blob_reader)) => { + debug!("add file handle {}", fh); self.file_handles.insert(fh, blob_reader); reply.opened(fh, 0); @@ -477,9 +492,14 @@ impl fuser::Filesystem for FUSE { reply: fuser::ReplyEmpty, ) { // remove and get ownership on the blob reader - let blob_reader = self.file_handles.remove(&fh).unwrap(); - // drop it, which will close it. - drop(blob_reader); + match self.file_handles.remove(&fh) { + // drop it, which will close it. + Some(blob_reader) => drop(blob_reader), + None => { + // These might already be dropped if a read error occured. + debug!("file_handle {} not found", fh); + } + } reply.ok(); } @@ -498,29 +518,70 @@ impl fuser::Filesystem for FUSE { ) { debug!("read"); - let blob_reader = self.file_handles.get_mut(&fh).unwrap(); - - // seek to the offset specified, which is relative to the start of the file. - let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)); - match resp { - Ok(pos) => { - debug_assert_eq!(offset as u64, pos); - } - Err(e) => { - warn!("failed to seek to offset {}: {}", offset, e); + // We need to take out the blob reader from self.file_handles, so we can + // interact with it in the separate task. + // On success, we pass it back out of the task, so we can put it back in self.file_handles. + let mut blob_reader = match self.file_handles.remove(&fh) { + Some(blob_reader) => blob_reader, + None => { + warn!("file handle {} unknown", fh); reply.error(libc::EIO); return; } - } + }; - // now with the blobreader seeked to this location, read size of data - let data: std::io::Result<Vec<u8>> = - blob_reader.bytes().take(size.try_into().unwrap()).collect(); + let task = self.tokio_handle.spawn(async move { + // seek to the offset specified, which is relative to the start of the file. + let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await; - match data { - // respond with the requested data - Ok(data) => reply.data(&data), - Err(e) => reply.error(e.raw_os_error().unwrap()), + match resp { + Ok(pos) => { + debug_assert_eq!(offset as u64, pos); + } + Err(e) => { + warn!("failed to seek to offset {}: {}", offset, e); + return Err(libc::EIO); + } + } + + // As written in the fuser docs, read should send exactly the number + // of bytes requested except on EOF or error. + + let mut buf: Vec<u8> = Vec::with_capacity(size as usize); + + while (buf.len() as u64) < size as u64 { + match blob_reader.fill_buf().await { + Ok(int_buf) => { + // copy things from the internal buffer into buf to fill it till up until size + + // an empty buffer signals we reached EOF. + if int_buf.is_empty() { + break; + } + + // calculate how many bytes we can read from int_buf. + // It's either all of int_buf, or the number of bytes missing in buf to reach size. + let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); + + // copy these bytes into our buffer + buf.extend_from_slice(&int_buf[..len_to_copy]); + // and consume them in the buffered reader. + blob_reader.consume(len_to_copy); + } + Err(e) => return Err(e.raw_os_error().unwrap()), + } + } + Ok((buf, blob_reader)) + }); + + let resp = self.tokio_handle.block_on(task).unwrap(); + + match resp { + Err(e) => reply.error(e), + Ok((buf, blob_reader)) => { + reply.data(&buf); + self.file_handles.insert(fh, blob_reader); + } } } diff --git a/tvix/store/src/fuse/tests.rs b/tvix/store/src/fuse/tests.rs index 2c99f75471a5..29856433b38c 100644 --- a/tvix/store/src/fuse/tests.rs +++ b/tvix/store/src/fuse/tests.rs @@ -1,8 +1,8 @@ -use std::fs; use std::io::Cursor; use std::os::unix::prelude::MetadataExt; use std::path::Path; use std::sync::Arc; +use std::{fs, io}; use tempfile::TempDir; @@ -21,34 +21,25 @@ const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test"; const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test"; const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test"; -fn setup_and_mount<P: AsRef<Path>, F>( - mountpoint: P, - setup_fn: F, -) -> Result<fuser::BackgroundSession, std::io::Error> -where - F: Fn(Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>), -{ - setup_and_mount_with_listing(mountpoint, setup_fn, false) -} - -fn setup_and_mount_with_listing<P: AsRef<Path>, F>( - mountpoint: P, - setup_fn: F, - list_root: bool, -) -> Result<fuser::BackgroundSession, std::io::Error> -where - F: Fn(Arc<dyn BlobService>, Arc<dyn DirectoryService>, Arc<dyn PathInfoService>), -{ +fn gen_svcs() -> ( + Arc<dyn BlobService>, + Arc<dyn DirectoryService>, + Arc<dyn PathInfoService>, +) { let blob_service = gen_blob_service(); let directory_service = gen_directory_service(); let path_info_service = gen_pathinfo_service(blob_service.clone(), directory_service.clone()); - setup_fn( - blob_service.clone(), - directory_service.clone(), - path_info_service.clone(), - ); + (blob_service, directory_service, path_info_service) +} +fn do_mount<P: AsRef<Path>>( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + path_info_service: Arc<dyn PathInfoService>, + mountpoint: P, + list_root: bool, +) -> io::Result<fuser::BackgroundSession> { let fs = FUSE::new( blob_service, directory_service, @@ -58,16 +49,17 @@ where fuser::spawn_mount2(fs, mountpoint, &[]) } -fn populate_blob_a( - blob_service: Arc<dyn BlobService>, - _directory_service: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, +async fn populate_blob_a( + blob_service: &Arc<dyn BlobService>, + _directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, ) { // Upload BLOB_A - let mut bw = blob_service.open_write(); - std::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) + .await .expect("must succeed uploading"); - bw.close().expect("must succeed closing"); + bw.close().await.expect("must succeed closing"); // Create a PathInfo for it let path_info = PathInfo { @@ -84,16 +76,17 @@ fn populate_blob_a( path_info_service.put(path_info).expect("must succeed"); } -fn populate_blob_b( - blob_service: Arc<dyn BlobService>, - _directory_service: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, +async fn populate_blob_b( + blob_service: &Arc<dyn BlobService>, + _directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, ) { // Upload BLOB_B - let mut bw = blob_service.open_write(); - std::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) + .await .expect("must succeed uploading"); - bw.close().expect("must succeed closing"); + bw.close().await.expect("must succeed closing"); // Create a PathInfo for it let path_info = PathInfo { @@ -111,9 +104,9 @@ fn populate_blob_b( } fn populate_symlink( - _blob_service: Arc<dyn BlobService>, - _directory_service: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, + _blob_service: &Arc<dyn BlobService>, + _directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, ) { // Create a PathInfo for it let path_info = PathInfo { @@ -131,9 +124,9 @@ fn populate_symlink( /// This writes a symlink pointing to /nix/store/somewhereelse, /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. fn populate_symlink2( - _blob_service: Arc<dyn BlobService>, - _directory_service: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, + _blob_service: &Arc<dyn BlobService>, + _directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, ) { // Create a PathInfo for it let path_info = PathInfo { @@ -148,16 +141,16 @@ fn populate_symlink2( path_info_service.put(path_info).expect("must succeed"); } -fn populate_directory_with_keep( - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, +async fn populate_directory_with_keep( + blob_service: &Arc<dyn BlobService>, + directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, ) { // upload empty blob - let mut bw = blob_service.open_write(); + let mut bw = blob_service.open_write().await; assert_eq!( fixtures::EMPTY_BLOB_DIGEST.to_vec(), - bw.close().expect("must succeed closing").to_vec(), + bw.close().await.expect("must succeed closing").to_vec(), ); // upload directory @@ -182,9 +175,9 @@ fn populate_directory_with_keep( /// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory /// itself. fn populate_pathinfo_without_directory( - _: Arc<dyn BlobService>, - _: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, + _: &Arc<dyn BlobService>, + _: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, ) { // upload pathinfo let path_info = PathInfo { @@ -202,9 +195,9 @@ fn populate_pathinfo_without_directory( /// Insert , but don't provide the blob .keep is pointing to fn populate_blob_a_without_blob( - _: Arc<dyn BlobService>, - _: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, + _: &Arc<dyn BlobService>, + _: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, ) { // Create a PathInfo for blob A let path_info = PathInfo { @@ -221,16 +214,16 @@ fn populate_blob_a_without_blob( path_info_service.put(path_info).expect("must succeed"); } -fn populate_directory_complicated( - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, +async fn populate_directory_complicated( + blob_service: &Arc<dyn BlobService>, + directory_service: &Arc<dyn DirectoryService>, + path_info_service: &Arc<dyn PathInfoService>, ) { // upload empty blob - let mut bw = blob_service.open_write(); + let mut bw = blob_service.open_write().await; assert_eq!( fixtures::EMPTY_BLOB_DIGEST.to_vec(), - bw.close().expect("must succeed closing").to_vec(), + bw.close().await.expect("must succeed closing").to_vec(), ); // upload inner directory @@ -258,8 +251,8 @@ fn populate_directory_complicated( } /// Ensure mounting itself doesn't fail -#[test] -fn mount() { +#[tokio::test] +async fn mount() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -268,14 +261,22 @@ fn mount() { let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); fuser_session.join() } /// Ensure listing the root isn't allowed -#[test] -fn root() { +#[tokio::test] +async fn root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -283,7 +284,15 @@ fn root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); { // read_dir succeeds, but getting the first element will fail. @@ -297,8 +306,8 @@ fn root() { } /// Ensure listing the root is allowed if configured explicitly -#[test] -fn root_with_listing() { +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn root_with_listing() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -306,8 +315,17 @@ fn root_with_listing() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount_with_listing(tmpdir.path(), populate_blob_a, true).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + true, /* allow listing */ + ) + .expect("must succeed"); { // read_dir succeeds, but getting the first element will fail. @@ -325,8 +343,8 @@ fn root_with_listing() { } /// Ensure we can stat a file at the root -#[test] -fn stat_file_at_root() { +#[tokio::test] +async fn stat_file_at_root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -334,7 +352,17 @@ fn stat_file_at_root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); @@ -349,8 +377,8 @@ fn stat_file_at_root() { } /// Ensure we can read a file at the root -#[test] -fn read_file_at_root() { +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_file_at_root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -358,7 +386,17 @@ fn read_file_at_root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); @@ -373,8 +411,8 @@ fn read_file_at_root() { } /// Ensure we can read a large file at the root -#[test] -fn read_large_file_at_root() { +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_large_file_at_root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -382,7 +420,17 @@ fn read_large_file_at_root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_b).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_b(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_B_NAME); { @@ -405,8 +453,8 @@ fn read_large_file_at_root() { } /// Read the target of a symlink -#[test] -fn symlink_readlink() { +#[tokio::test] +async fn symlink_readlink() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -414,7 +462,18 @@ fn symlink_readlink() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_symlink).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_symlink(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + let p = tmpdir.path().join(SYMLINK_NAME); let target = fs::read_link(&p).expect("must succeed"); @@ -437,8 +496,8 @@ fn symlink_readlink() { } /// Read and stat a regular file through a symlink pointing to it. -#[test] -fn read_stat_through_symlink() { +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_stat_through_symlink() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -446,10 +505,17 @@ fn read_stat_through_symlink() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| { - populate_blob_a(bs.clone(), ds.clone(), ps.clone()); - populate_symlink(bs, ds, ps); - }) + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + populate_symlink(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) .expect("must succeed"); let p_symlink = tmpdir.path().join(SYMLINK_NAME); @@ -473,8 +539,8 @@ fn read_stat_through_symlink() { } /// Read a directory in the root, and validate some attributes. -#[test] -fn read_stat_directory() { +#[tokio::test] +async fn read_stat_directory() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -482,8 +548,17 @@ fn read_stat_directory() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_with_keep).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); @@ -495,9 +570,9 @@ fn read_stat_directory() { fuser_session.join() } -#[test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// Read a blob inside a directory. This ensures we successfully populate directory data. -fn read_blob_inside_dir() { +async fn read_blob_inside_dir() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -505,8 +580,17 @@ fn read_blob_inside_dir() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_with_keep).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); @@ -522,10 +606,10 @@ fn read_blob_inside_dir() { fuser_session.join() } -#[test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// Read a blob inside a directory inside a directory. This ensures we properly /// populate directories as we traverse down the structure. -fn read_blob_deep_inside_dir() { +async fn read_blob_deep_inside_dir() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -533,8 +617,17 @@ fn read_blob_deep_inside_dir() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir .path() @@ -555,8 +648,8 @@ fn read_blob_deep_inside_dir() { } /// Ensure readdir works. -#[test] -fn readdir() { +#[tokio::test] +async fn readdir() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -564,8 +657,17 @@ fn readdir() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME); @@ -601,9 +703,9 @@ fn readdir() { fuser_session.join() } -#[test] +#[tokio::test] /// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory. -fn readdir_deep() { +async fn readdir_deep() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -611,8 +713,17 @@ fn readdir_deep() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); @@ -636,8 +747,8 @@ fn readdir_deep() { } /// Check attributes match how they show up in /nix/store normally. -#[test] -fn check_attributes() { +#[tokio::test] +async fn check_attributes() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -645,11 +756,18 @@ fn check_attributes() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| { - populate_blob_a(bs.clone(), ds.clone(), ps.clone()); - populate_directory_with_keep(bs.clone(), ds.clone(), ps.clone()); - populate_symlink(bs, ds, ps); - }) + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + populate_symlink(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) .expect("must succeed"); let p_file = tmpdir.path().join(BLOB_A_NAME); @@ -689,10 +807,10 @@ fn check_attributes() { fuser_session.join() } -#[test] +#[tokio::test] /// Ensure we allocate the same inodes for the same directory contents. /// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP. -fn compare_inodes_directories() { +async fn compare_inodes_directories() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -700,10 +818,17 @@ fn compare_inodes_directories() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| { - populate_directory_with_keep(bs.clone(), ds.clone(), ps.clone()); - populate_directory_complicated(bs, ds, ps); - }) + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) .expect("must succeed"); let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); @@ -720,8 +845,8 @@ fn compare_inodes_directories() { /// Ensure we allocate the same inodes for the same directory contents. /// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep -#[test] -fn compare_inodes_files() { +#[tokio::test] +async fn compare_inodes_files() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -729,8 +854,17 @@ fn compare_inodes_files() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_directory_complicated).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep"); let p_keep2 = tmpdir @@ -750,8 +884,8 @@ fn compare_inodes_files() { /// Ensure we allocate the same inode for symlinks pointing to the same targets. /// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2. -#[test] -fn compare_inodes_symlinks() { +#[tokio::test] +async fn compare_inodes_symlinks() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -759,10 +893,17 @@ fn compare_inodes_symlinks() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |bs: Arc<_>, ds: Arc<_>, ps: Arc<_>| { - populate_directory_complicated(bs.clone(), ds.clone(), ps.clone()); - populate_symlink2(bs, ds, ps); - }) + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + populate_symlink2(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) .expect("must succeed"); let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa"); @@ -778,8 +919,8 @@ fn compare_inodes_symlinks() { } /// Check we match paths exactly. -#[test] -fn read_wrong_paths_in_root() { +#[tokio::test] +async fn read_wrong_paths_in_root() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -787,7 +928,17 @@ fn read_wrong_paths_in_root() { } let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), populate_blob_a).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); // wrong name assert!(!tmpdir @@ -817,8 +968,8 @@ fn read_wrong_paths_in_root() { } /// Make sure writes are not allowed -#[test] -fn disallow_writes() { +#[tokio::test] +async fn disallow_writes() { // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); @@ -827,7 +978,16 @@ fn disallow_writes() { let tmpdir = TempDir::new().unwrap(); - let fuser_session = setup_and_mount(tmpdir.path(), |_, _, _| {}).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); let e = std::fs::File::create(p).expect_err("must fail"); @@ -837,17 +997,26 @@ fn disallow_writes() { fuser_session.join() } -#[test] +#[tokio::test] /// Ensure we get an IO error if the directory service does not have the Directory object. -fn missing_directory() { +async fn missing_directory() { if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); return; } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_pathinfo_without_directory).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); @@ -871,17 +1040,26 @@ fn missing_directory() { fuser_session.join() } -#[test] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// Ensure we get an IO error if the blob service does not have the blob -fn missing_blob() { +async fn missing_blob() { if !std::path::Path::new("/dev/fuse").exists() { eprintln!("skipping test"); return; } let tmpdir = TempDir::new().unwrap(); - let fuser_session = - setup_and_mount(tmpdir.path(), populate_blob_a_without_blob).expect("must succeed"); + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service); + + let fuser_session = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); |