diff options
author | Florian Klink <flokli@flokli.de> | 2023-09-13T12·20+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-09-18T10·33+0000 |
commit | da6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch) | |
tree | 5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/store/src/blobservice/tests.rs | |
parent | 3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff) |
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync. This however had some annoying consequences: - It became more and more complicated to track when we're in a context with an async runtime in the context or not, producing bugs like https://b.tvl.fyi/issues/304 - The sync trait shielded away async clients from async worloads, requiring manual block_on code inside the gRPC client code, and spawn_blocking calls in consumers of the trait, even if they were async (like the gRPC server) - We had to write our own custom glue code (SyncReadIntoAsyncRead) to convert a sync io::Read into a tokio::io::AsyncRead, which already existed in tokio internally, but upstream ia hesitant to expose. This now makes the BlobService trait async (via the async_trait macro, like we already do in various gRPC parts), and replaces the sync readers and writers with their async counterparts. Tests interacting with a BlobService now need to have an async runtime available, the easiest way for this is to mark the test functions with the tokio::test macro, allowing us to directly .await in the test function. In places where we don't have an async runtime available from context (like tvix-cli), we can pass one down explicitly. Now that we don't provide a sync interface anymore, the (sync) FUSE library now holds a pointer to a tokio runtime handle, and needs to at least have 2 threads available when talking to a blob service (which is why some of the tests now use the multi_thread flavor). The FUSE tests got a bit more verbose, as we couldn't use the setup_and_mount function accepting a callback anymore. We can hopefully move some of the test fixture setup to rstest in the future to make this less repetitive. Co-Authored-By: Connor Brewster <cbrewster@hey.com> Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329 Reviewed-by: Connor Brewster <cbrewster@hey.com> Tested-by: BuildkiteCI Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/blobservice/tests.rs')
-rw-r--r-- | tvix/store/src/blobservice/tests.rs | 296 |
1 files changed, 166 insertions, 130 deletions
diff --git a/tvix/store/src/blobservice/tests.rs b/tvix/store/src/blobservice/tests.rs index ec7a618fab58..501270780cf4 100644 --- a/tvix/store/src/blobservice/tests.rs +++ b/tvix/store/src/blobservice/tests.rs @@ -1,6 +1,9 @@ use std::io; +use std::pin::pin; use test_case::test_case; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncSeekExt; use super::B3Digest; use super::BlobService; @@ -24,19 +27,25 @@ fn gen_sled_blob_service() -> impl BlobService { #[test_case(gen_memory_blob_service(); "memory")] #[test_case(gen_sled_blob_service(); "sled")] fn has_nonexistent_false(blob_service: impl BlobService) { - assert!(!blob_service - .has(&fixtures::BLOB_A_DIGEST) - .expect("must not fail")); + tokio::runtime::Runtime::new().unwrap().block_on(async { + assert!(!blob_service + .has(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not fail")); + }) } /// Trying to read a non-existing blob should return a None instead of a reader. #[test_case(gen_memory_blob_service(); "memory")] #[test_case(gen_sled_blob_service(); "sled")] fn not_found_read(blob_service: impl BlobService) { - assert!(blob_service - .open_read(&fixtures::BLOB_A_DIGEST) - .expect("must not fail") - .is_none()) + tokio::runtime::Runtime::new().unwrap().block_on(async { + assert!(blob_service + .open_read(&fixtures::BLOB_A_DIGEST) + .await + .expect("must not fail") + .is_none()) + }) } /// Put a blob in the store, check has, get it back. @@ -46,165 +55,192 @@ fn not_found_read(blob_service: impl BlobService) { #[test_case(gen_memory_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "memory-big")] #[test_case(gen_sled_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "sled-big")] fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest: &B3Digest) { - let mut w = blob_service.open_write(); + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut w = blob_service.open_write().await; - let l = io::copy(&mut io::Cursor::new(blob_contents), &mut w).expect("copy must succeed"); - assert_eq!( - blob_contents.len(), - l as usize, - "written bytes must match blob length" - ); + let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w) + .await + .expect("copy must succeed"); + assert_eq!( + blob_contents.len(), + l as usize, + "written bytes must match blob length" + ); - let digest = w.close().expect("close must succeed"); + let digest = w.close().await.expect("close must succeed"); - assert_eq!(*blob_digest, digest, "returned digest must be correct"); + assert_eq!(*blob_digest, digest, "returned digest must be correct"); - assert!( - blob_service.has(blob_digest).expect("must not fail"), - "blob service should now have the blob" - ); + assert!( + blob_service.has(blob_digest).await.expect("must not fail"), + "blob service should now have the blob" + ); - let mut r = blob_service - .open_read(blob_digest) - .expect("open_read must succeed") - .expect("must be some"); + let mut r = blob_service + .open_read(blob_digest) + .await + .expect("open_read must succeed") + .expect("must be some"); - let mut buf: Vec<u8> = Vec::new(); - let l = io::copy(&mut r, &mut buf).expect("copy must succeed"); + let mut buf: Vec<u8> = Vec::new(); + let mut pinned_reader = pin!(r); + let l = tokio::io::copy(&mut pinned_reader, &mut buf) + .await + .expect("copy must succeed"); + // let l = io::copy(&mut r, &mut buf).expect("copy must succeed"); - assert_eq!( - blob_contents.len(), - l as usize, - "read bytes must match blob length" - ); + assert_eq!( + blob_contents.len(), + l as usize, + "read bytes must match blob length" + ); - assert_eq!(blob_contents, buf, "read blob contents must match"); + assert_eq!(blob_contents, buf, "read blob contents must match"); + }) } /// Put a blob in the store, and seek inside it a bit. #[test_case(gen_memory_blob_service(); "memory")] #[test_case(gen_sled_blob_service(); "sled")] fn put_seek(blob_service: impl BlobService) { - let mut w = blob_service.open_write(); - - io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w).expect("copy must succeed"); - w.close().expect("close must succeed"); - - // open a blob for reading - let mut r = blob_service - .open_read(&fixtures::BLOB_B_DIGEST) - .expect("open_read must succeed") - .expect("must be some"); + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut w = blob_service.open_write().await; - let mut pos: u64 = 0; + tokio::io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w) + .await + .expect("copy must succeed"); + w.close().await.expect("close must succeed"); - // read the first 10 bytes, they must match the data in the fixture. - { - let mut buf = [0; 10]; - r.read_exact(&mut buf).expect("must succeed"); + // open a blob for reading + let mut r = blob_service + .open_read(&fixtures::BLOB_B_DIGEST) + .await + .expect("open_read must succeed") + .expect("must be some"); - assert_eq!( - &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], - buf, - "expected first 10 bytes to match" - ); + let mut pos: u64 = 0; - pos += buf.len() as u64; - } - // seek by 0 bytes, using SeekFrom::Start. - let p = r.seek(io::SeekFrom::Start(pos)).expect("must not fail"); - assert_eq!(pos, p); - - // read the next 10 bytes, they must match the data in the fixture. - { - let mut buf = [0; 10]; - r.read_exact(&mut buf).expect("must succeed"); + // read the first 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); - assert_eq!( - &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], - buf, - "expected data to match" - ); + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected first 10 bytes to match" + ); - pos += buf.len() as u64; - } + pos += buf.len() as u64; + } + // seek by 0 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos)) + .await + .expect("must not fail"); + assert_eq!(pos, p); + + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); - // seek by 5 bytes, using SeekFrom::Start. - let p = r.seek(io::SeekFrom::Start(pos + 5)).expect("must not fail"); - pos += 5; - assert_eq!(pos, p); + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); - // read the next 10 bytes, they must match the data in the fixture. - { - let mut buf = [0; 10]; - r.read_exact(&mut buf).expect("must succeed"); + pos += buf.len() as u64; + } - assert_eq!( - &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], - buf, - "expected data to match" - ); + // seek by 5 bytes, using SeekFrom::Start. + let p = r + .seek(io::SeekFrom::Start(pos + 5)) + .await + .expect("must not fail"); + pos += 5; + assert_eq!(pos, p); - pos += buf.len() as u64; - } + // read the next 10 bytes, they must match the data in the fixture. + { + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); - // seek by 12345 bytes, using SeekFrom:: - let p = r.seek(io::SeekFrom::Current(12345)).expect("must not fail"); - pos += 12345; - assert_eq!(pos, p); + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); - // read the next 10 bytes, they must match the data in the fixture. - { - let mut buf = [0; 10]; - r.read_exact(&mut buf).expect("must succeed"); + pos += buf.len() as u64; + } - assert_eq!( - &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], - buf, - "expected data to match" - ); + // seek by 12345 bytes, using SeekFrom:: + let p = r + .seek(io::SeekFrom::Current(12345)) + .await + .expect("must not fail"); + pos += 12345; + assert_eq!(pos, p); - #[allow(unused_assignments)] + // read the next 10 bytes, they must match the data in the fixture. { - pos += buf.len() as u64; + let mut buf = [0; 10]; + r.read_exact(&mut buf).await.expect("must succeed"); + + assert_eq!( + &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()], + buf, + "expected data to match" + ); + + #[allow(unused_assignments)] + { + pos += buf.len() as u64; + } } - } - // seeking to the end is okay… - let p = r - .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64)) - .expect("must not fail"); - pos = fixtures::BLOB_B.len() as u64; - assert_eq!(pos, p); + // seeking to the end is okay… + let p = r + .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64)) + .await + .expect("must not fail"); + pos = fixtures::BLOB_B.len() as u64; + assert_eq!(pos, p); - { - // but it returns no more data. - let mut buf: Vec<u8> = Vec::new(); - r.read_to_end(&mut buf).expect("must not fail"); - assert!(buf.is_empty(), "expected no more data to be read"); - } - - // seeking past the end… - match r.seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) { - // should either be ok, but then return 0 bytes. - // this matches the behaviour or a Cursor<Vec<u8>>. - Ok(_pos) => { + { + // but it returns no more data. let mut buf: Vec<u8> = Vec::new(); - r.read_to_end(&mut buf).expect("must not fail"); + r.read_to_end(&mut buf).await.expect("must not fail"); assert!(buf.is_empty(), "expected no more data to be read"); } - // or not be okay. - Err(_) => {} - } - // TODO: this is only broken for the gRPC version - // We expect seeking backwards or relative to the end to fail. - // r.seek(io::SeekFrom::Current(-1)) - // .expect_err("SeekFrom::Current(-1) expected to fail"); + // seeking past the end… + match r + .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) + .await + { + // should either be ok, but then return 0 bytes. + // this matches the behaviour or a Cursor<Vec<u8>>. + Ok(_pos) => { + let mut buf: Vec<u8> = Vec::new(); + r.read_to_end(&mut buf).await.expect("must not fail"); + assert!(buf.is_empty(), "expected no more data to be read"); + } + // or not be okay. + Err(_) => {} + } + + // TODO: this is only broken for the gRPC version + // We expect seeking backwards or relative to the end to fail. + // r.seek(io::SeekFrom::Current(-1)) + // .expect_err("SeekFrom::Current(-1) expected to fail"); - // r.seek(io::SeekFrom::Start(pos - 1)) - // .expect_err("SeekFrom::Start(pos-1) expected to fail"); + // r.seek(io::SeekFrom::Start(pos - 1)) + // .expect_err("SeekFrom::Start(pos-1) expected to fail"); - // r.seek(io::SeekFrom::End(0)) - // .expect_err("SeekFrom::End(_) expected to fail"); + // r.seek(io::SeekFrom::End(0)) + // .expect_err("SeekFrom::End(_) expected to fail"); + }) } |