| author    | Florian Klink <flokli@flokli.de> | 2023-09-13T12·20+0200 |
| committer | flokli <flokli@flokli.de>        | 2023-09-18T10·33+0000 |
| commit    | da6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch) | |
| tree      | 5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/store/src/tests | |
| parent    | 3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff) | |
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync.

This however had some annoying consequences:

- It became more and more complicated to track whether we're in a
  context with an async runtime available or not, producing bugs like
  https://b.tvl.fyi/issues/304
- The sync trait shielded async clients away from async workloads,
  requiring manual block_on code inside the gRPC client code, and
  spawn_blocking calls in consumers of the trait, even if they were
  async (like the gRPC server)
- We had to write our own custom glue code (SyncReadIntoAsyncRead) to
  convert a sync io::Read into a tokio::io::AsyncRead, which already
  existed inside tokio internally, but upstream is hesitant to expose.

This now makes the BlobService trait async (via the async_trait macro,
like we already do in various gRPC parts), and replaces the sync
readers and writers with their async counterparts.

Tests interacting with a BlobService now need to have an async runtime
available; the easiest way for this is to mark the test functions with
the tokio::test macro, allowing us to directly .await in the test
function.

In places where we don't have an async runtime available from context
(like tvix-cli), we can pass one down explicitly.

Now that we don't provide a sync interface anymore, the (sync) FUSE
library holds a handle to a tokio runtime, and needs at least 2 threads
available when talking to a blob service (which is why some of the
tests now use the multi_thread flavor).

The FUSE tests got a bit more verbose, as we couldn't use the
setup_and_mount function accepting a callback anymore. We can hopefully
move some of the test fixture setup to rstest in the future to make
this less repetitive.

Co-Authored-By: Connor Brewster <cbrewster@hey.com>
Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
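The shape of the change — an async_trait-based service trait, #[tokio::test] tests that .await directly, and an explicit runtime handle passed down to sync callers — can be sketched roughly as follows. This is a minimal illustration under assumptions, not the actual tvix BlobService API: the trait methods, the MemoryBlobService type, the raw [u8; 32] digest and has_blocking helper are all simplified stand-ins, and it assumes the tokio and async-trait crates as dependencies.

```rust
use std::collections::HashMap;
use std::io;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;

/// A simplified stand-in for an async blob service trait.
/// The real BlobService hands out async readers/writers; here we just
/// move whole buffers around to keep the sketch short.
#[async_trait]
pub trait BlobService: Send + Sync {
    async fn has(&self, digest: &[u8; 32]) -> io::Result<bool>;
    async fn put(&self, digest: [u8; 32], data: Vec<u8>) -> io::Result<()>;
}

/// Toy in-memory implementation, only here to make the example runnable.
#[derive(Default, Clone)]
pub struct MemoryBlobService {
    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
}

#[async_trait]
impl BlobService for MemoryBlobService {
    async fn has(&self, digest: &[u8; 32]) -> io::Result<bool> {
        Ok(self.db.read().unwrap().contains_key(digest))
    }

    async fn put(&self, digest: [u8; 32], data: Vec<u8>) -> io::Result<()> {
        self.db.write().unwrap().insert(digest, data);
        Ok(())
    }
}

/// A sync context (think of the FUSE layer or tvix-cli) gets an explicit
/// tokio runtime handle passed down and blocks on the async call.
pub fn has_blocking(
    handle: &tokio::runtime::Handle,
    svc: &dyn BlobService,
    digest: &[u8; 32],
) -> io::Result<bool> {
    handle.block_on(svc.has(digest))
}

#[cfg(test)]
mod tests {
    use super::*;

    // Tests get their runtime from the tokio::test macro and can .await
    // directly, mirroring the test changes in this diff.
    #[tokio::test]
    async fn put_then_has() {
        let svc = MemoryBlobService::default();
        let digest = [0u8; 32];

        svc.put(digest, b"hello world".to_vec()).await.unwrap();
        assert!(svc.has(&digest).await.unwrap());
    }
}
```

The inverse case also shows up in the diff below: helpers that are still synchronous (write_nar, calculate_size_and_sha256) are wrapped in tokio::task::spawn_blocking when called from the now-async tests, so they don't block the runtime's worker threads.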
Diffstat (limited to 'tvix/store/src/tests')
-rw-r--r-- | tvix/store/src/tests/import.rs       |  19 |
-rw-r--r-- | tvix/store/src/tests/nar_renderer.rs | 212 |
2 files changed, 139 insertions(+), 92 deletions(-)
diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs
index ccaa4f4e4244..45b9c3440d89 100644
--- a/tvix/store/src/tests/import.rs
+++ b/tvix/store/src/tests/import.rs
@@ -9,8 +9,8 @@ use tempfile::TempDir;
 use std::os::unix::ffi::OsStrExt;
 
 #[cfg(target_family = "unix")]
-#[test]
-fn symlink() {
+#[tokio::test]
+async fn symlink() {
     let tmpdir = TempDir::new().unwrap();
 
     std::fs::create_dir_all(&tmpdir).unwrap();
@@ -25,6 +25,7 @@ fn symlink() {
         gen_directory_service(),
         tmpdir.path().join("doesntmatter"),
     )
+    .await
     .expect("must succeed");
 
     assert_eq!(
@@ -36,8 +37,8 @@ fn symlink() {
     )
 }
 
-#[test]
-fn single_file() {
+#[tokio::test]
+async fn single_file() {
     let tmpdir = TempDir::new().unwrap();
 
     std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap();
@@ -49,6 +50,7 @@ fn single_file() {
         gen_directory_service(),
         tmpdir.path().join("root"),
     )
+    .await
     .expect("must succeed");
 
     assert_eq!(
@@ -62,12 +64,12 @@ fn single_file() {
     );
 
     // ensure the blob has been uploaded
-    assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).unwrap());
+    assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
 }
 
 #[cfg(target_family = "unix")]
-#[test]
-fn complicated() {
+#[tokio::test]
+async fn complicated() {
     let tmpdir = TempDir::new().unwrap();
 
     // File `.keep`
@@ -87,6 +89,7 @@ fn complicated() {
         directory_service.clone(),
         tmpdir.path(),
     )
+    .await
     .expect("must succeed");
 
     // ensure root_node matched expectations
@@ -116,5 +119,5 @@ fn complicated() {
         .is_some());
 
     // ensure EMPTY_BLOB_CONTENTS has been uploaded
-    assert!(blob_service.has(&EMPTY_BLOB_DIGEST).unwrap());
+    assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
 }
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
index 102bf5e7ce24..22dbd7bcba9e 100644
--- a/tvix/store/src/tests/nar_renderer.rs
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -28,22 +28,26 @@ fn single_symlink() {
 }
 
 /// Make sure the NARRenderer fails if a referred blob doesn't exist.
-#[test]
-fn single_file_missing_blob() {
+#[tokio::test]
+async fn single_file_missing_blob() {
     let mut buf: Vec<u8> = vec![];
 
-    let e = write_nar(
-        &mut buf,
-        &crate::proto::node::Node::File(FileNode {
-            name: "doesntmatter".into(),
-            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-            size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
-            executable: false,
-        }),
-        // the blobservice is empty intentionally, to provoke the error.
-        gen_blob_service(),
-        gen_directory_service(),
-    )
+    let e = tokio::task::spawn_blocking(move || {
+        write_nar(
+            &mut buf,
+            &crate::proto::node::Node::File(FileNode {
+                name: "doesntmatter".into(),
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+                executable: false,
+            }),
+            // the blobservice is empty intentionally, to provoke the error.
+            gen_blob_service(),
+            gen_directory_service(),
+        )
+    })
+    .await
+    .unwrap()
     .expect_err("must fail");
 
     match e {
@@ -56,34 +60,43 @@ fn single_file_missing_blob() {
 
 /// Make sure the NAR Renderer fails if the returned blob meta has another size
 /// than specified in the proto node.
-#[test]
-fn single_file_wrong_blob_size() {
+#[tokio::test]
+async fn single_file_wrong_blob_size() {
     let blob_service = gen_blob_service();
 
     // insert blob into the store
-    let mut writer = blob_service.open_write();
-    io::copy(
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(
         &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
         &mut writer,
     )
+    .await
     .unwrap();
-    assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap());
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
 
+    let bs = blob_service.clone();
     // Test with a root FileNode of a too big size
     {
         let mut buf: Vec<u8> = vec![];
 
-        let e = write_nar(
-            &mut buf,
-            &crate::proto::node::Node::File(FileNode {
-                name: "doesntmatter".into(),
-                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-                size: 42, // <- note the wrong size here!
-                executable: false,
-            }),
-            blob_service.clone(),
-            gen_directory_service(),
-        )
+        let e = tokio::task::spawn_blocking(move || {
+            write_nar(
+                &mut buf,
+                &crate::proto::node::Node::File(FileNode {
+                    name: "doesntmatter".into(),
+                    digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                    size: 42, // <- note the wrong size here!
+                    executable: false,
+                }),
+                bs,
+                gen_directory_service(),
+            )
+        })
+        .await
+        .unwrap()
        .expect_err("must fail");
 
         match e {
@@ -94,22 +107,27 @@ fn single_file_wrong_blob_size() {
         }
     }
 
+    let bs = blob_service.clone();
     // Test with a root FileNode of a too small size
    {
        let mut buf: Vec<u8> = vec![];
 
-        let e = write_nar(
-            &mut buf,
-            &crate::proto::node::Node::File(FileNode {
-                name: "doesntmatter".into(),
-                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-                size: 2, // <- note the wrong size here!
-                executable: false,
-            }),
-            blob_service,
-            gen_directory_service(),
-        )
-        .expect_err("must fail");
+        let e = tokio::task::spawn_blocking(move || {
+            write_nar(
+                &mut buf,
+                &crate::proto::node::Node::File(FileNode {
+                    name: "doesntmatter".into(),
+                    digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                    size: 2, // <- note the wrong size here!
+                    executable: false,
+                }),
+                bs,
+                gen_directory_service(),
+            )
+            .expect_err("must fail")
+        })
+        .await
+        .unwrap();
 
         match e {
             crate::nar::RenderError::NARWriterError(e) => {
@@ -120,51 +138,63 @@ fn single_file_wrong_blob_size() {
     }
 }
 
-#[test]
-fn single_file() {
+#[tokio::test]
+async fn single_file() {
     let blob_service = gen_blob_service();
 
     // insert blob into the store
-    let mut writer = blob_service.open_write();
-    io::copy(
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(
         &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.clone()),
         &mut writer,
     )
+    .await
     .unwrap();
-    assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap());
+
+    assert_eq!(
+        HELLOWORLD_BLOB_DIGEST.clone(),
+        writer.close().await.unwrap()
+    );
 
     let mut buf: Vec<u8> = vec![];
 
-    write_nar(
-        &mut buf,
-        &crate::proto::node::Node::File(FileNode {
-            name: "doesntmatter".into(),
-            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
-            size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
-            executable: false,
-        }),
-        blob_service,
-        gen_directory_service(),
-    )
-    .expect("must succeed");
+    let buf = tokio::task::spawn_blocking(move || {
+        write_nar(
+            &mut buf,
+            &crate::proto::node::Node::File(FileNode {
+                name: "doesntmatter".into(),
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
+                executable: false,
+            }),
+            blob_service,
+            gen_directory_service(),
+        )
+        .expect("must succeed");
+
+        buf
+    })
+    .await
+    .unwrap();
 
     assert_eq!(buf, NAR_CONTENTS_HELLOWORLD.to_vec());
 }
 
-#[test]
-fn test_complicated() {
+#[tokio::test]
+async fn test_complicated() {
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
 
     // put all data into the stores.
     // insert blob into the store
-    let mut writer = blob_service.open_write();
-    io::copy(
+    let mut writer = blob_service.open_write().await;
+    tokio::io::copy(
         &mut io::Cursor::new(EMPTY_BLOB_CONTENTS.clone()),
         &mut writer,
     )
+    .await
     .unwrap();
-    assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().unwrap());
+    assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().await.unwrap());
 
     directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap();
     directory_service
@@ -173,30 +203,44 @@ fn test_complicated() {
 
     let mut buf: Vec<u8> = vec![];
 
-    write_nar(
-        &mut buf,
-        &crate::proto::node::Node::Directory(DirectoryNode {
-            name: "doesntmatter".into(),
-            digest: DIRECTORY_COMPLICATED.digest().into(),
-            size: DIRECTORY_COMPLICATED.size(),
-        }),
-        blob_service.clone(),
-        directory_service.clone(),
-    )
-    .expect("must succeed");
+    let bs = blob_service.clone();
+    let ds = directory_service.clone();
+
+    let buf = tokio::task::spawn_blocking(move || {
+        write_nar(
+            &mut buf,
+            &crate::proto::node::Node::Directory(DirectoryNode {
+                name: "doesntmatter".into(),
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            bs,
+            ds,
+        )
+        .expect("must succeed");
+        buf
+    })
+    .await
+    .unwrap();
 
     assert_eq!(buf, NAR_CONTENTS_COMPLICATED.to_vec());
 
     // ensure calculate_nar does return the correct sha256 digest and sum.
-    let (nar_size, nar_digest) = calculate_size_and_sha256(
-        &crate::proto::node::Node::Directory(DirectoryNode {
-            name: "doesntmatter".into(),
-            digest: DIRECTORY_COMPLICATED.digest().into(),
-            size: DIRECTORY_COMPLICATED.size(),
-        }),
-        blob_service,
-        directory_service,
-    )
+    let bs = blob_service.clone();
+    let ds = directory_service.clone();
+    let (nar_size, nar_digest) = tokio::task::spawn_blocking(move || {
+        calculate_size_and_sha256(
+            &crate::proto::node::Node::Directory(DirectoryNode {
+                name: "doesntmatter".into(),
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            bs,
+            ds,
+        )
+    })
+    .await
+    .unwrap()
     .expect("must succeed");
 
     assert_eq!(NAR_CONTENTS_COMPLICATED.len() as u64, nar_size);