From 37a348b4fae16b2b1c5ec12deaa085a049833d7f Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Tue, 19 Sep 2023 11:46:41 -0500 Subject: refactor(tvix/store): Asyncify PathInfoService and DirectoryService We've decided to asyncify all of the services to reduce some of the pains going back and for between sync<->async. The end goal will be for all the tvix-store internals to be async and then expose a sync interface for things like tvix eval io. Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369 Autosubmit: Connor Brewster Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/store/src/fs/tests.rs | 231 ++++++++++++++++++++++++++++----------------- 1 file changed, 144 insertions(+), 87 deletions(-) (limited to 'tvix/store/src/fs/tests.rs') diff --git a/tvix/store/src/fs/tests.rs b/tvix/store/src/fs/tests.rs index 30f5ca3f40..6837f8aa29 100644 --- a/tvix/store/src/fs/tests.rs +++ b/tvix/store/src/fs/tests.rs @@ -1,8 +1,10 @@ +use futures::StreamExt; use std::io::Cursor; use std::os::unix::prelude::MetadataExt; use std::path::Path; use std::sync::Arc; -use std::{fs, io}; +use tokio::{fs, io}; +use tokio_stream::wrappers::ReadDirStream; use tempfile::TempDir; @@ -75,7 +77,10 @@ async fn populate_blob_a( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } async fn populate_blob_b( @@ -102,7 +107,10 @@ async fn populate_blob_b( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// adds a blob containing helloworld and marks it as executable @@ -133,10 +141,13 @@ async fn populate_helloworld_blob( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } -fn populate_symlink( +async fn populate_symlink( _blob_service: &Arc, _directory_service: &Arc, path_info_service: &Arc, @@ -151,12 +162,15 @@ fn populate_symlink( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// This writes a symlink pointing to /nix/store/somewhereelse, /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. -fn populate_symlink2( +async fn populate_symlink2( _blob_service: &Arc, _directory_service: &Arc, path_info_service: &Arc, @@ -171,7 +185,10 @@ fn populate_symlink2( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } async fn populate_directory_with_keep( @@ -189,6 +206,7 @@ async fn populate_directory_with_keep( // upload directory directory_service .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await .expect("must succeed uploading"); // upload pathinfo @@ -202,12 +220,15 @@ async fn populate_directory_with_keep( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory /// itself. -fn populate_pathinfo_without_directory( +async fn populate_pathinfo_without_directory( _: &Arc, _: &Arc, path_info_service: &Arc, @@ -223,11 +244,14 @@ fn populate_pathinfo_without_directory( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// Insert , but don't provide the blob .keep is pointing to -fn populate_blob_a_without_blob( +async fn populate_blob_a_without_blob( _: &Arc, _: &Arc, path_info_service: &Arc, @@ -244,7 +268,10 @@ fn populate_blob_a_without_blob( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } async fn populate_directory_complicated( @@ -262,11 +289,13 @@ async fn populate_directory_complicated( // upload inner directory directory_service .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await .expect("must succeed uploading"); // uplodad parent directory directory_service .put(fixtures::DIRECTORY_COMPLICATED.clone()) + .await .expect("must succeed uploading"); // upload pathinfo @@ -280,7 +309,10 @@ async fn populate_directory_complicated( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// Ensure mounting itself doesn't fail @@ -329,9 +361,13 @@ async fn root() { { // read_dir succeeds, but getting the first element will fail. - let mut it = fs::read_dir(tmpdir).expect("must succeed"); + let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); - let err = it.next().expect("must be some").expect_err("must be err"); + let err = it + .next() + .await + .expect("must be some") + .expect_err("must be err"); assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); } @@ -362,11 +398,15 @@ async fn root_with_listing() { { // read_dir succeeds, but getting the first element will fail. - let mut it = fs::read_dir(tmpdir).expect("must succeed"); + let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); - let e = it.next().expect("must be some").expect("must succeed"); + let e = it + .next() + .await + .expect("must be some") + .expect("must succeed"); - let metadata = e.metadata().expect("must succeed"); + let metadata = e.metadata().await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); @@ -400,7 +440,7 @@ async fn stat_file_at_root() { let p = tmpdir.path().join(BLOB_A_NAME); // peek at the file metadata - let metadata = fs::metadata(p).expect("must succeed"); + let metadata = fs::metadata(p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); @@ -434,7 +474,7 @@ async fn read_file_at_root() { let p = tmpdir.path().join(BLOB_A_NAME); // read the file contents - let data = fs::read(p).expect("must succeed"); + let data = fs::read(p).await.expect("must succeed"); // ensure size and contents match assert_eq!(fixtures::BLOB_A.len(), data.len()); @@ -468,7 +508,7 @@ async fn read_large_file_at_root() { let p = tmpdir.path().join(BLOB_B_NAME); { // peek at the file metadata - let metadata = fs::metadata(&p).expect("must succeed"); + let metadata = fs::metadata(&p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); @@ -476,7 +516,7 @@ async fn read_large_file_at_root() { } // read the file contents - let data = fs::read(p).expect("must succeed"); + let data = fs::read(p).await.expect("must succeed"); // ensure size and contents match assert_eq!(fixtures::BLOB_B.len(), data.len()); @@ -496,7 +536,7 @@ async fn symlink_readlink() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_symlink(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -509,20 +549,20 @@ async fn symlink_readlink() { let p = tmpdir.path().join(SYMLINK_NAME); - let target = fs::read_link(&p).expect("must succeed"); + let target = fs::read_link(&p).await.expect("must succeed"); assert_eq!(BLOB_A_NAME, target.to_str().unwrap()); // peek at the file metadata, which follows symlinks. // this must fail, as we didn't populate the target. - let e = fs::metadata(&p).expect_err("must fail"); + let e = fs::metadata(&p).await.expect_err("must fail"); assert_eq!(std::io::ErrorKind::NotFound, e.kind()); // peeking at the file metadata without following symlinks will succeed. - let metadata = fs::symlink_metadata(&p).expect("must succeed"); + let metadata = fs::symlink_metadata(&p).await.expect("must succeed"); assert!(metadata.is_symlink()); // reading from the symlink (which follows) will fail, because the target doesn't exist. - let e = fs::read(p).expect_err("must fail"); + let e = fs::read(p).await.expect_err("must fail"); assert_eq!(std::io::ErrorKind::NotFound, e.kind()); fuse_daemon.unmount().expect("unmount"); @@ -540,7 +580,7 @@ async fn read_stat_through_symlink() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_symlink(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -556,16 +596,16 @@ async fn read_stat_through_symlink() { // peek at the file metadata, which follows symlinks. // this must now return the same metadata as when statting at the target directly. - let metadata_symlink = fs::metadata(&p_symlink).expect("must succeed"); - let metadata_blob = fs::metadata(&p_blob).expect("must succeed"); + let metadata_symlink = fs::metadata(&p_symlink).await.expect("must succeed"); + let metadata_blob = fs::metadata(&p_blob).await.expect("must succeed"); assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type()); assert_eq!(metadata_blob.len(), metadata_symlink.len()); // reading from the symlink (which follows) will return the same data as if // we were reading from the file directly. assert_eq!( - std::fs::read(p_blob).expect("must succeed"), - std::fs::read(p_symlink).expect("must succeed"), + fs::read(p_blob).await.expect("must succeed"), + fs::read(p_symlink).await.expect("must succeed"), ); fuse_daemon.unmount().expect("unmount"); @@ -596,7 +636,7 @@ async fn read_stat_directory() { let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); // peek at the metadata of the directory - let metadata = fs::metadata(p).expect("must succeed"); + let metadata = fs::metadata(p).await.expect("must succeed"); assert!(metadata.is_dir()); assert!(metadata.permissions().readonly()); @@ -628,12 +668,12 @@ async fn read_blob_inside_dir() { let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); // peek at metadata. - let metadata = fs::metadata(&p).expect("must succeed"); + let metadata = fs::metadata(&p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); // read from it - let data = fs::read(&p).expect("must succeed"); + let data = fs::read(&p).await.expect("must succeed"); assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); fuse_daemon.unmount().expect("unmount"); @@ -669,12 +709,12 @@ async fn read_blob_deep_inside_dir() { .join(".keep"); // peek at metadata. - let metadata = fs::metadata(&p).expect("must succeed"); + let metadata = fs::metadata(&p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); // read from it - let data = fs::read(&p).expect("must succeed"); + let data = fs::read(&p).await.expect("must succeed"); assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); fuse_daemon.unmount().expect("unmount"); @@ -706,10 +746,10 @@ async fn readdir() { { // read_dir should succeed. Collect all elements - let elements: Vec<_> = fs::read_dir(p) - .expect("must succeed") + let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) .map(|e| e.expect("must not be err")) - .collect(); + .collect() + .await; assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and .. @@ -719,18 +759,18 @@ async fn readdir() { // ".keep", 0 byte file. let e = &elements[0]; assert_eq!(".keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_file()); - assert_eq!(0, e.metadata().expect("must succeed").len()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); // "aa", symlink. let e = &elements[1]; assert_eq!("aa", e.file_name()); - assert!(e.file_type().expect("must succeed").is_symlink()); + assert!(e.file_type().await.expect("must succeed").is_symlink()); // "keep", directory let e = &elements[2]; assert_eq!("keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_dir()); + assert!(e.file_type().await.expect("must succeed").is_dir()); } fuse_daemon.unmount().expect("unmount"); @@ -762,18 +802,18 @@ async fn readdir_deep() { { // read_dir should succeed. Collect all elements - let elements: Vec<_> = fs::read_dir(p) - .expect("must succeed") + let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) .map(|e| e.expect("must not be err")) - .collect(); + .collect() + .await; assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and .. // ".keep", 0 byte file. let e = &elements[0]; assert_eq!(".keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_file()); - assert_eq!(0, e.metadata().expect("must succeed").len()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); } fuse_daemon.unmount().expect("unmount"); @@ -792,7 +832,7 @@ async fn check_attributes() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_symlink(&blob_service, &directory_service, &path_info_service).await; populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( @@ -810,10 +850,16 @@ async fn check_attributes() { let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME); // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident. - let metadata_file = fs::symlink_metadata(&p_file).expect("must succeed"); - let metadata_executable_file = fs::symlink_metadata(&p_executable_file).expect("must succeed"); - let metadata_directory = fs::symlink_metadata(&p_directory).expect("must succeed"); - let metadata_symlink = fs::symlink_metadata(&p_symlink).expect("must succeed"); + let metadata_file = fs::symlink_metadata(&p_file).await.expect("must succeed"); + let metadata_executable_file = fs::symlink_metadata(&p_executable_file) + .await + .expect("must succeed"); + let metadata_directory = fs::symlink_metadata(&p_directory) + .await + .expect("must succeed"); + let metadata_symlink = fs::symlink_metadata(&p_symlink) + .await + .expect("must succeed"); // modes should match. We & with 0o777 to remove any higher bits. assert_eq!(0o444, metadata_file.mode() & 0o777); @@ -873,8 +919,14 @@ async fn compare_inodes_directories() { // peek at metadata. assert_eq!( - fs::metadata(p_dir_with_keep).expect("must succeed").ino(), - fs::metadata(p_sibling_dir).expect("must succeed").ino() + fs::metadata(p_dir_with_keep) + .await + .expect("must succeed") + .ino(), + fs::metadata(p_sibling_dir) + .await + .expect("must succeed") + .ino() ); fuse_daemon.unmount().expect("unmount"); @@ -912,8 +964,8 @@ async fn compare_inodes_files() { // peek at metadata. assert_eq!( - fs::metadata(p_keep1).expect("must succeed").ino(), - fs::metadata(p_keep2).expect("must succeed").ino() + fs::metadata(p_keep1).await.expect("must succeed").ino(), + fs::metadata(p_keep2).await.expect("must succeed").ino() ); fuse_daemon.unmount().expect("unmount"); @@ -932,7 +984,7 @@ async fn compare_inodes_symlinks() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - populate_symlink2(&blob_service, &directory_service, &path_info_service); + populate_symlink2(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -948,8 +1000,8 @@ async fn compare_inodes_symlinks() { // peek at metadata. assert_eq!( - fs::symlink_metadata(p1).expect("must succeed").ino(), - fs::symlink_metadata(p2).expect("must succeed").ino() + fs::symlink_metadata(p1).await.expect("must succeed").ino(), + fs::symlink_metadata(p2).await.expect("must succeed").ino() ); fuse_daemon.unmount().expect("unmount"); @@ -978,28 +1030,32 @@ async fn read_wrong_paths_in_root() { .expect("must succeed"); // wrong name - assert!(!tmpdir - .path() - .join("00000000000000000000000000000000-tes") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); // invalid hash - assert!(!tmpdir - .path() - .join("0000000000000000000000000000000-test") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test")) + .await + .is_err() + ); // right name, must exist - assert!(tmpdir - .path() - .join("00000000000000000000000000000000-test") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test")) + .await + .is_ok() + ); // now wrong name with right hash still may not exist - assert!(!tmpdir - .path() - .join("00000000000000000000000000000000-tes") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); fuse_daemon.unmount().expect("unmount"); } @@ -1027,7 +1083,7 @@ async fn disallow_writes() { .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); - let e = std::fs::File::create(p).expect_err("must fail"); + let e = fs::File::create(p).await.expect_err("must fail"); assert_eq!(Some(libc::EROFS), e.raw_os_error()); @@ -1044,7 +1100,8 @@ async fn missing_directory() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service); + populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service) + .await; let mut fuse_daemon = do_mount( blob_service, @@ -1059,19 +1116,19 @@ async fn missing_directory() { { // `stat` on the path should succeed, because it doesn't trigger the directory request. - fs::metadata(&p).expect("must succeed"); + fs::metadata(&p).await.expect("must succeed"); // However, calling either `readdir` or `stat` on a child should fail with an IO error. // It fails when trying to pull the first entry, because we don't implement opendir separately - fs::read_dir(&p) - .unwrap() + ReadDirStream::new(fs::read_dir(&p).await.unwrap()) .next() + .await .expect("must be some") .expect_err("must be err"); // rust currently sets e.kind() to Uncategorized, which isn't very // helpful, so we don't look at the error more closely than that.. - fs::metadata(p.join(".keep")).expect_err("must fail"); + fs::metadata(p.join(".keep")).await.expect_err("must fail"); } fuse_daemon.unmount().expect("unmount"); @@ -1087,7 +1144,7 @@ async fn missing_blob() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service); + populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -1102,12 +1159,12 @@ async fn missing_blob() { { // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service. - fs::metadata(&p).expect("must succeed"); + fs::metadata(&p).await.expect("must succeed"); // However, calling read on the blob should fail. // rust currently sets e.kind() to Uncategorized, which isn't very // helpful, so we don't look at the error more closely than that.. - fs::read(p).expect_err("must fail"); + fs::read(p).await.expect_err("must fail"); } fuse_daemon.unmount().expect("unmount"); -- cgit 1.4.1