diff options
Diffstat (limited to 'tvix/store/src/fs')
-rw-r--r-- | tvix/store/src/fs/mod.rs | 43 | ||||
-rw-r--r-- | tvix/store/src/fs/tests.rs | 231 |
2 files changed, 178 insertions, 96 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs index 48e605406331..02d3bb3221ad 100644 --- a/tvix/store/src/fs/mod.rs +++ b/tvix/store/src/fs/mod.rs @@ -16,6 +16,7 @@ use crate::{ B3Digest, Error, }; use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; +use futures::StreamExt; use nix_compat::store_path::StorePath; use parking_lot::RwLock; use std::{ @@ -26,7 +27,10 @@ use std::{ sync::{atomic::Ordering, Arc}, time::Duration, }; -use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; +use tokio::{ + io::{AsyncBufReadExt, AsyncSeekExt}, + sync::mpsc, +}; use tracing::{debug, info_span, warn}; use self::{ @@ -159,7 +163,11 @@ impl TvixStoreFs { ))) } else { // If we don't have it, look it up in PathInfoService. - match self.path_info_service.get(store_path.digest)? { + let path_info_service = self.path_info_service.clone(); + let task = self + .tokio_handle + .spawn(async move { path_info_service.get(store_path.digest).await }); + match self.tokio_handle.block_on(task).unwrap()? { // the pathinfo doesn't exist, so the file doesn't exist. None => Ok(None), Some(path_info) => { @@ -204,7 +212,12 @@ impl TvixStoreFs { /// This is both used to initially insert the root node of a store path, /// as well as when looking up an intermediate DirectoryNode. fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result<InodeData, Error> { - match self.directory_service.get(directory_digest) { + let directory_service = self.directory_service.clone(); + let directory_digest_clone = directory_digest.clone(); + let task = self + .tokio_handle + .spawn(async move { directory_service.get(&directory_digest_clone).await }); + match self.tokio_handle.block_on(task).unwrap() { Err(e) => { warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory"); Err(e) @@ -369,12 +382,23 @@ impl FileSystem for TvixStoreFs { if !self.list_root { return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } else { - for (i, path_info) in self - .path_info_service - .list() - .skip(offset as usize) - .enumerate() - { + let path_info_service = self.path_info_service.clone(); + let (tx, mut rx) = mpsc::channel(16); + + // This task will run in the background immediately and will exit + // after the stream ends or if we no longer want any more entries. + self.tokio_handle.spawn(async move { + let mut stream = path_info_service.list().skip(offset as usize).enumerate(); + while let Some(path_info) = stream.next().await { + if tx.send(path_info).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } + } + }); + + while let Some((i, path_info)) = rx.blocking_recv() { let path_info = match path_info { Err(e) => { warn!("failed to retrieve pathinfo: {}", e); @@ -421,6 +445,7 @@ impl FileSystem for TvixStoreFs { break; } } + return Ok(()); } } diff --git a/tvix/store/src/fs/tests.rs b/tvix/store/src/fs/tests.rs index 30f5ca3f40aa..6837f8aa293a 100644 --- a/tvix/store/src/fs/tests.rs +++ b/tvix/store/src/fs/tests.rs @@ -1,8 +1,10 @@ +use futures::StreamExt; use std::io::Cursor; use std::os::unix::prelude::MetadataExt; use std::path::Path; use std::sync::Arc; -use std::{fs, io}; +use tokio::{fs, io}; +use tokio_stream::wrappers::ReadDirStream; use tempfile::TempDir; @@ -75,7 +77,10 @@ async fn populate_blob_a( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } async fn populate_blob_b( @@ -102,7 +107,10 @@ async fn populate_blob_b( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// adds a blob containing helloworld and marks it as executable @@ -133,10 +141,13 @@ async fn populate_helloworld_blob( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } -fn populate_symlink( +async fn populate_symlink( _blob_service: &Arc<dyn BlobService>, _directory_service: &Arc<dyn DirectoryService>, path_info_service: &Arc<dyn PathInfoService>, @@ -151,12 +162,15 @@ fn populate_symlink( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// This writes a symlink pointing to /nix/store/somewhereelse, /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. -fn populate_symlink2( +async fn populate_symlink2( _blob_service: &Arc<dyn BlobService>, _directory_service: &Arc<dyn DirectoryService>, path_info_service: &Arc<dyn PathInfoService>, @@ -171,7 +185,10 @@ fn populate_symlink2( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } async fn populate_directory_with_keep( @@ -189,6 +206,7 @@ async fn populate_directory_with_keep( // upload directory directory_service .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await .expect("must succeed uploading"); // upload pathinfo @@ -202,12 +220,15 @@ async fn populate_directory_with_keep( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory /// itself. -fn populate_pathinfo_without_directory( +async fn populate_pathinfo_without_directory( _: &Arc<dyn BlobService>, _: &Arc<dyn DirectoryService>, path_info_service: &Arc<dyn PathInfoService>, @@ -223,11 +244,14 @@ fn populate_pathinfo_without_directory( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// Insert , but don't provide the blob .keep is pointing to -fn populate_blob_a_without_blob( +async fn populate_blob_a_without_blob( _: &Arc<dyn BlobService>, _: &Arc<dyn DirectoryService>, path_info_service: &Arc<dyn PathInfoService>, @@ -244,7 +268,10 @@ fn populate_blob_a_without_blob( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } async fn populate_directory_complicated( @@ -262,11 +289,13 @@ async fn populate_directory_complicated( // upload inner directory directory_service .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await .expect("must succeed uploading"); // uplodad parent directory directory_service .put(fixtures::DIRECTORY_COMPLICATED.clone()) + .await .expect("must succeed uploading"); // upload pathinfo @@ -280,7 +309,10 @@ async fn populate_directory_complicated( }), ..Default::default() }; - path_info_service.put(path_info).expect("must succeed"); + path_info_service + .put(path_info) + .await + .expect("must succeed"); } /// Ensure mounting itself doesn't fail @@ -329,9 +361,13 @@ async fn root() { { // read_dir succeeds, but getting the first element will fail. - let mut it = fs::read_dir(tmpdir).expect("must succeed"); + let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); - let err = it.next().expect("must be some").expect_err("must be err"); + let err = it + .next() + .await + .expect("must be some") + .expect_err("must be err"); assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); } @@ -362,11 +398,15 @@ async fn root_with_listing() { { // read_dir succeeds, but getting the first element will fail. - let mut it = fs::read_dir(tmpdir).expect("must succeed"); + let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); - let e = it.next().expect("must be some").expect("must succeed"); + let e = it + .next() + .await + .expect("must be some") + .expect("must succeed"); - let metadata = e.metadata().expect("must succeed"); + let metadata = e.metadata().await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); @@ -400,7 +440,7 @@ async fn stat_file_at_root() { let p = tmpdir.path().join(BLOB_A_NAME); // peek at the file metadata - let metadata = fs::metadata(p).expect("must succeed"); + let metadata = fs::metadata(p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); @@ -434,7 +474,7 @@ async fn read_file_at_root() { let p = tmpdir.path().join(BLOB_A_NAME); // read the file contents - let data = fs::read(p).expect("must succeed"); + let data = fs::read(p).await.expect("must succeed"); // ensure size and contents match assert_eq!(fixtures::BLOB_A.len(), data.len()); @@ -468,7 +508,7 @@ async fn read_large_file_at_root() { let p = tmpdir.path().join(BLOB_B_NAME); { // peek at the file metadata - let metadata = fs::metadata(&p).expect("must succeed"); + let metadata = fs::metadata(&p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); @@ -476,7 +516,7 @@ async fn read_large_file_at_root() { } // read the file contents - let data = fs::read(p).expect("must succeed"); + let data = fs::read(p).await.expect("must succeed"); // ensure size and contents match assert_eq!(fixtures::BLOB_B.len(), data.len()); @@ -496,7 +536,7 @@ async fn symlink_readlink() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_symlink(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -509,20 +549,20 @@ async fn symlink_readlink() { let p = tmpdir.path().join(SYMLINK_NAME); - let target = fs::read_link(&p).expect("must succeed"); + let target = fs::read_link(&p).await.expect("must succeed"); assert_eq!(BLOB_A_NAME, target.to_str().unwrap()); // peek at the file metadata, which follows symlinks. // this must fail, as we didn't populate the target. - let e = fs::metadata(&p).expect_err("must fail"); + let e = fs::metadata(&p).await.expect_err("must fail"); assert_eq!(std::io::ErrorKind::NotFound, e.kind()); // peeking at the file metadata without following symlinks will succeed. - let metadata = fs::symlink_metadata(&p).expect("must succeed"); + let metadata = fs::symlink_metadata(&p).await.expect("must succeed"); assert!(metadata.is_symlink()); // reading from the symlink (which follows) will fail, because the target doesn't exist. - let e = fs::read(p).expect_err("must fail"); + let e = fs::read(p).await.expect_err("must fail"); assert_eq!(std::io::ErrorKind::NotFound, e.kind()); fuse_daemon.unmount().expect("unmount"); @@ -540,7 +580,7 @@ async fn read_stat_through_symlink() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_symlink(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -556,16 +596,16 @@ async fn read_stat_through_symlink() { // peek at the file metadata, which follows symlinks. // this must now return the same metadata as when statting at the target directly. - let metadata_symlink = fs::metadata(&p_symlink).expect("must succeed"); - let metadata_blob = fs::metadata(&p_blob).expect("must succeed"); + let metadata_symlink = fs::metadata(&p_symlink).await.expect("must succeed"); + let metadata_blob = fs::metadata(&p_blob).await.expect("must succeed"); assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type()); assert_eq!(metadata_blob.len(), metadata_symlink.len()); // reading from the symlink (which follows) will return the same data as if // we were reading from the file directly. assert_eq!( - std::fs::read(p_blob).expect("must succeed"), - std::fs::read(p_symlink).expect("must succeed"), + fs::read(p_blob).await.expect("must succeed"), + fs::read(p_symlink).await.expect("must succeed"), ); fuse_daemon.unmount().expect("unmount"); @@ -596,7 +636,7 @@ async fn read_stat_directory() { let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); // peek at the metadata of the directory - let metadata = fs::metadata(p).expect("must succeed"); + let metadata = fs::metadata(p).await.expect("must succeed"); assert!(metadata.is_dir()); assert!(metadata.permissions().readonly()); @@ -628,12 +668,12 @@ async fn read_blob_inside_dir() { let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); // peek at metadata. - let metadata = fs::metadata(&p).expect("must succeed"); + let metadata = fs::metadata(&p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); // read from it - let data = fs::read(&p).expect("must succeed"); + let data = fs::read(&p).await.expect("must succeed"); assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); fuse_daemon.unmount().expect("unmount"); @@ -669,12 +709,12 @@ async fn read_blob_deep_inside_dir() { .join(".keep"); // peek at metadata. - let metadata = fs::metadata(&p).expect("must succeed"); + let metadata = fs::metadata(&p).await.expect("must succeed"); assert!(metadata.is_file()); assert!(metadata.permissions().readonly()); // read from it - let data = fs::read(&p).expect("must succeed"); + let data = fs::read(&p).await.expect("must succeed"); assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); fuse_daemon.unmount().expect("unmount"); @@ -706,10 +746,10 @@ async fn readdir() { { // read_dir should succeed. Collect all elements - let elements: Vec<_> = fs::read_dir(p) - .expect("must succeed") + let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) .map(|e| e.expect("must not be err")) - .collect(); + .collect() + .await; assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and .. @@ -719,18 +759,18 @@ async fn readdir() { // ".keep", 0 byte file. let e = &elements[0]; assert_eq!(".keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_file()); - assert_eq!(0, e.metadata().expect("must succeed").len()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); // "aa", symlink. let e = &elements[1]; assert_eq!("aa", e.file_name()); - assert!(e.file_type().expect("must succeed").is_symlink()); + assert!(e.file_type().await.expect("must succeed").is_symlink()); // "keep", directory let e = &elements[2]; assert_eq!("keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_dir()); + assert!(e.file_type().await.expect("must succeed").is_dir()); } fuse_daemon.unmount().expect("unmount"); @@ -762,18 +802,18 @@ async fn readdir_deep() { { // read_dir should succeed. Collect all elements - let elements: Vec<_> = fs::read_dir(p) - .expect("must succeed") + let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) .map(|e| e.expect("must not be err")) - .collect(); + .collect() + .await; assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and .. // ".keep", 0 byte file. let e = &elements[0]; assert_eq!(".keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_file()); - assert_eq!(0, e.metadata().expect("must succeed").len()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); } fuse_daemon.unmount().expect("unmount"); @@ -792,7 +832,7 @@ async fn check_attributes() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_symlink(&blob_service, &directory_service, &path_info_service).await; populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( @@ -810,10 +850,16 @@ async fn check_attributes() { let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME); // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident. - let metadata_file = fs::symlink_metadata(&p_file).expect("must succeed"); - let metadata_executable_file = fs::symlink_metadata(&p_executable_file).expect("must succeed"); - let metadata_directory = fs::symlink_metadata(&p_directory).expect("must succeed"); - let metadata_symlink = fs::symlink_metadata(&p_symlink).expect("must succeed"); + let metadata_file = fs::symlink_metadata(&p_file).await.expect("must succeed"); + let metadata_executable_file = fs::symlink_metadata(&p_executable_file) + .await + .expect("must succeed"); + let metadata_directory = fs::symlink_metadata(&p_directory) + .await + .expect("must succeed"); + let metadata_symlink = fs::symlink_metadata(&p_symlink) + .await + .expect("must succeed"); // modes should match. We & with 0o777 to remove any higher bits. assert_eq!(0o444, metadata_file.mode() & 0o777); @@ -873,8 +919,14 @@ async fn compare_inodes_directories() { // peek at metadata. assert_eq!( - fs::metadata(p_dir_with_keep).expect("must succeed").ino(), - fs::metadata(p_sibling_dir).expect("must succeed").ino() + fs::metadata(p_dir_with_keep) + .await + .expect("must succeed") + .ino(), + fs::metadata(p_sibling_dir) + .await + .expect("must succeed") + .ino() ); fuse_daemon.unmount().expect("unmount"); @@ -912,8 +964,8 @@ async fn compare_inodes_files() { // peek at metadata. assert_eq!( - fs::metadata(p_keep1).expect("must succeed").ino(), - fs::metadata(p_keep2).expect("must succeed").ino() + fs::metadata(p_keep1).await.expect("must succeed").ino(), + fs::metadata(p_keep2).await.expect("must succeed").ino() ); fuse_daemon.unmount().expect("unmount"); @@ -932,7 +984,7 @@ async fn compare_inodes_symlinks() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - populate_symlink2(&blob_service, &directory_service, &path_info_service); + populate_symlink2(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -948,8 +1000,8 @@ async fn compare_inodes_symlinks() { // peek at metadata. assert_eq!( - fs::symlink_metadata(p1).expect("must succeed").ino(), - fs::symlink_metadata(p2).expect("must succeed").ino() + fs::symlink_metadata(p1).await.expect("must succeed").ino(), + fs::symlink_metadata(p2).await.expect("must succeed").ino() ); fuse_daemon.unmount().expect("unmount"); @@ -978,28 +1030,32 @@ async fn read_wrong_paths_in_root() { .expect("must succeed"); // wrong name - assert!(!tmpdir - .path() - .join("00000000000000000000000000000000-tes") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); // invalid hash - assert!(!tmpdir - .path() - .join("0000000000000000000000000000000-test") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test")) + .await + .is_err() + ); // right name, must exist - assert!(tmpdir - .path() - .join("00000000000000000000000000000000-test") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test")) + .await + .is_ok() + ); // now wrong name with right hash still may not exist - assert!(!tmpdir - .path() - .join("00000000000000000000000000000000-tes") - .exists()); + assert!( + fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); fuse_daemon.unmount().expect("unmount"); } @@ -1027,7 +1083,7 @@ async fn disallow_writes() { .expect("must succeed"); let p = tmpdir.path().join(BLOB_A_NAME); - let e = std::fs::File::create(p).expect_err("must fail"); + let e = fs::File::create(p).await.expect_err("must fail"); assert_eq!(Some(libc::EROFS), e.raw_os_error()); @@ -1044,7 +1100,8 @@ async fn missing_directory() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service); + populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service) + .await; let mut fuse_daemon = do_mount( blob_service, @@ -1059,19 +1116,19 @@ async fn missing_directory() { { // `stat` on the path should succeed, because it doesn't trigger the directory request. - fs::metadata(&p).expect("must succeed"); + fs::metadata(&p).await.expect("must succeed"); // However, calling either `readdir` or `stat` on a child should fail with an IO error. // It fails when trying to pull the first entry, because we don't implement opendir separately - fs::read_dir(&p) - .unwrap() + ReadDirStream::new(fs::read_dir(&p).await.unwrap()) .next() + .await .expect("must be some") .expect_err("must be err"); // rust currently sets e.kind() to Uncategorized, which isn't very // helpful, so we don't look at the error more closely than that.. - fs::metadata(p.join(".keep")).expect_err("must fail"); + fs::metadata(p.join(".keep")).await.expect_err("must fail"); } fuse_daemon.unmount().expect("unmount"); @@ -1087,7 +1144,7 @@ async fn missing_blob() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service); + populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service).await; let mut fuse_daemon = do_mount( blob_service, @@ -1102,12 +1159,12 @@ async fn missing_blob() { { // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service. - fs::metadata(&p).expect("must succeed"); + fs::metadata(&p).await.expect("must succeed"); // However, calling read on the blob should fail. // rust currently sets e.kind() to Uncategorized, which isn't very // helpful, so we don't look at the error more closely than that.. - fs::read(p).expect_err("must fail"); + fs::read(p).await.expect_err("must fail"); } fuse_daemon.unmount().expect("unmount"); |