about summary refs log tree commit diff
path: root/tvix/store/src/blobservice/tests.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-09-13T12·20+0200
committerflokli <flokli@flokli.de>2023-09-18T10·33+0000
commitda6cbb4a459d02111c44a67d3d0dd7e654abff23 (patch)
tree5efce82d3d9aea94cf6d3712a3fdbb7d168e4552 /tvix/store/src/blobservice/tests.rs
parent3de96017640b6dc25f1544a1bafd4b370bb1cea0 (diff)
refactor(tvix/store/blobsvc): make BlobStore async r/6606
We previously kept the trait of a BlobService sync.

This however had some annoying consequences:

 - It became more and more complicated to track when we're in a context
   with an async runtime in the context or not, producing bugs like
   https://b.tvl.fyi/issues/304
 - The sync trait shielded away async clients from async worloads,
   requiring manual block_on code inside the gRPC client code, and
   spawn_blocking calls in consumers of the trait, even if they were
   async (like the gRPC server)
 - We had to write our own custom glue code (SyncReadIntoAsyncRead)
   to convert a sync io::Read into a tokio::io::AsyncRead, which already
   existed in tokio internally, but upstream ia hesitant to expose.

This now makes the BlobService trait async (via the async_trait macro,
like we already do in various gRPC parts), and replaces the sync readers
and writers with their async counterparts.

Tests interacting with a BlobService now need to have an async runtime
available, the easiest way for this is to mark the test functions
with the tokio::test macro, allowing us to directly .await in the test
function.

In places where we don't have an async runtime available from context
(like tvix-cli), we can pass one down explicitly.

Now that we don't provide a sync interface anymore, the (sync) FUSE
library now holds a pointer to a tokio runtime handle, and needs to at
least have 2 threads available when talking to a blob service (which is
why some of the tests now use the multi_thread flavor).

The FUSE tests got a bit more verbose, as we couldn't use the
setup_and_mount function accepting a callback anymore. We can hopefully
move some of the test fixture setup to rstest in the future to make this
less repetitive.

Co-Authored-By: Connor Brewster <cbrewster@hey.com>
Change-Id: Ia0501b606e32c852d0108de9c9016b21c94a3c05
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9329
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Diffstat (limited to 'tvix/store/src/blobservice/tests.rs')
-rw-r--r--tvix/store/src/blobservice/tests.rs296
1 files changed, 166 insertions, 130 deletions
diff --git a/tvix/store/src/blobservice/tests.rs b/tvix/store/src/blobservice/tests.rs
index ec7a618fab58..501270780cf4 100644
--- a/tvix/store/src/blobservice/tests.rs
+++ b/tvix/store/src/blobservice/tests.rs
@@ -1,6 +1,9 @@
 use std::io;
+use std::pin::pin;
 
 use test_case::test_case;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncSeekExt;
 
 use super::B3Digest;
 use super::BlobService;
@@ -24,19 +27,25 @@ fn gen_sled_blob_service() -> impl BlobService {
 #[test_case(gen_memory_blob_service(); "memory")]
 #[test_case(gen_sled_blob_service(); "sled")]
 fn has_nonexistent_false(blob_service: impl BlobService) {
-    assert!(!blob_service
-        .has(&fixtures::BLOB_A_DIGEST)
-        .expect("must not fail"));
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        assert!(!blob_service
+            .has(&fixtures::BLOB_A_DIGEST)
+            .await
+            .expect("must not fail"));
+    })
 }
 
 /// Trying to read a non-existing blob should return a None instead of a reader.
 #[test_case(gen_memory_blob_service(); "memory")]
 #[test_case(gen_sled_blob_service(); "sled")]
 fn not_found_read(blob_service: impl BlobService) {
-    assert!(blob_service
-        .open_read(&fixtures::BLOB_A_DIGEST)
-        .expect("must not fail")
-        .is_none())
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        assert!(blob_service
+            .open_read(&fixtures::BLOB_A_DIGEST)
+            .await
+            .expect("must not fail")
+            .is_none())
+    })
 }
 
 /// Put a blob in the store, check has, get it back.
@@ -46,165 +55,192 @@ fn not_found_read(blob_service: impl BlobService) {
 #[test_case(gen_memory_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "memory-big")]
 #[test_case(gen_sled_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "sled-big")]
 fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest: &B3Digest) {
-    let mut w = blob_service.open_write();
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        let mut w = blob_service.open_write().await;
 
-    let l = io::copy(&mut io::Cursor::new(blob_contents), &mut w).expect("copy must succeed");
-    assert_eq!(
-        blob_contents.len(),
-        l as usize,
-        "written bytes must match blob length"
-    );
+        let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w)
+            .await
+            .expect("copy must succeed");
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "written bytes must match blob length"
+        );
 
-    let digest = w.close().expect("close must succeed");
+        let digest = w.close().await.expect("close must succeed");
 
-    assert_eq!(*blob_digest, digest, "returned digest must be correct");
+        assert_eq!(*blob_digest, digest, "returned digest must be correct");
 
-    assert!(
-        blob_service.has(blob_digest).expect("must not fail"),
-        "blob service should now have the blob"
-    );
+        assert!(
+            blob_service.has(blob_digest).await.expect("must not fail"),
+            "blob service should now have the blob"
+        );
 
-    let mut r = blob_service
-        .open_read(blob_digest)
-        .expect("open_read must succeed")
-        .expect("must be some");
+        let mut r = blob_service
+            .open_read(blob_digest)
+            .await
+            .expect("open_read must succeed")
+            .expect("must be some");
 
-    let mut buf: Vec<u8> = Vec::new();
-    let l = io::copy(&mut r, &mut buf).expect("copy must succeed");
+        let mut buf: Vec<u8> = Vec::new();
+        let mut pinned_reader = pin!(r);
+        let l = tokio::io::copy(&mut pinned_reader, &mut buf)
+            .await
+            .expect("copy must succeed");
+        // let l = io::copy(&mut r, &mut buf).expect("copy must succeed");
 
-    assert_eq!(
-        blob_contents.len(),
-        l as usize,
-        "read bytes must match blob length"
-    );
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "read bytes must match blob length"
+        );
 
-    assert_eq!(blob_contents, buf, "read blob contents must match");
+        assert_eq!(blob_contents, buf, "read blob contents must match");
+    })
 }
 
 /// Put a blob in the store, and seek inside it a bit.
 #[test_case(gen_memory_blob_service(); "memory")]
 #[test_case(gen_sled_blob_service(); "sled")]
 fn put_seek(blob_service: impl BlobService) {
-    let mut w = blob_service.open_write();
-
-    io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w).expect("copy must succeed");
-    w.close().expect("close must succeed");
-
-    // open a blob for reading
-    let mut r = blob_service
-        .open_read(&fixtures::BLOB_B_DIGEST)
-        .expect("open_read must succeed")
-        .expect("must be some");
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        let mut w = blob_service.open_write().await;
 
-    let mut pos: u64 = 0;
+        tokio::io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w)
+            .await
+            .expect("copy must succeed");
+        w.close().await.expect("close must succeed");
 
-    // read the first 10 bytes, they must match the data in the fixture.
-    {
-        let mut buf = [0; 10];
-        r.read_exact(&mut buf).expect("must succeed");
+        // open a blob for reading
+        let mut r = blob_service
+            .open_read(&fixtures::BLOB_B_DIGEST)
+            .await
+            .expect("open_read must succeed")
+            .expect("must be some");
 
-        assert_eq!(
-            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-            buf,
-            "expected first 10 bytes to match"
-        );
+        let mut pos: u64 = 0;
 
-        pos += buf.len() as u64;
-    }
-    // seek by 0 bytes, using SeekFrom::Start.
-    let p = r.seek(io::SeekFrom::Start(pos)).expect("must not fail");
-    assert_eq!(pos, p);
-
-    // read the next 10 bytes, they must match the data in the fixture.
-    {
-        let mut buf = [0; 10];
-        r.read_exact(&mut buf).expect("must succeed");
+        // read the first 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
 
-        assert_eq!(
-            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-            buf,
-            "expected data to match"
-        );
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected first 10 bytes to match"
+            );
 
-        pos += buf.len() as u64;
-    }
+            pos += buf.len() as u64;
+        }
+        // seek by 0 bytes, using SeekFrom::Start.
+        let p = r
+            .seek(io::SeekFrom::Start(pos))
+            .await
+            .expect("must not fail");
+        assert_eq!(pos, p);
+
+        // read the next 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
 
-    // seek by 5 bytes, using SeekFrom::Start.
-    let p = r.seek(io::SeekFrom::Start(pos + 5)).expect("must not fail");
-    pos += 5;
-    assert_eq!(pos, p);
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected data to match"
+            );
 
-    // read the next 10 bytes, they must match the data in the fixture.
-    {
-        let mut buf = [0; 10];
-        r.read_exact(&mut buf).expect("must succeed");
+            pos += buf.len() as u64;
+        }
 
-        assert_eq!(
-            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-            buf,
-            "expected data to match"
-        );
+        // seek by 5 bytes, using SeekFrom::Start.
+        let p = r
+            .seek(io::SeekFrom::Start(pos + 5))
+            .await
+            .expect("must not fail");
+        pos += 5;
+        assert_eq!(pos, p);
 
-        pos += buf.len() as u64;
-    }
+        // read the next 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
 
-    // seek by 12345 bytes, using SeekFrom::
-    let p = r.seek(io::SeekFrom::Current(12345)).expect("must not fail");
-    pos += 12345;
-    assert_eq!(pos, p);
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected data to match"
+            );
 
-    // read the next 10 bytes, they must match the data in the fixture.
-    {
-        let mut buf = [0; 10];
-        r.read_exact(&mut buf).expect("must succeed");
+            pos += buf.len() as u64;
+        }
 
-        assert_eq!(
-            &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-            buf,
-            "expected data to match"
-        );
+        // seek by 12345 bytes, using SeekFrom::
+        let p = r
+            .seek(io::SeekFrom::Current(12345))
+            .await
+            .expect("must not fail");
+        pos += 12345;
+        assert_eq!(pos, p);
 
-        #[allow(unused_assignments)]
+        // read the next 10 bytes, they must match the data in the fixture.
         {
-            pos += buf.len() as u64;
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
+
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected data to match"
+            );
+
+            #[allow(unused_assignments)]
+            {
+                pos += buf.len() as u64;
+            }
         }
-    }
 
-    // seeking to the end is okay…
-    let p = r
-        .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64))
-        .expect("must not fail");
-    pos = fixtures::BLOB_B.len() as u64;
-    assert_eq!(pos, p);
+        // seeking to the end is okay…
+        let p = r
+            .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64))
+            .await
+            .expect("must not fail");
+        pos = fixtures::BLOB_B.len() as u64;
+        assert_eq!(pos, p);
 
-    {
-        // but it returns no more data.
-        let mut buf: Vec<u8> = Vec::new();
-        r.read_to_end(&mut buf).expect("must not fail");
-        assert!(buf.is_empty(), "expected no more data to be read");
-    }
-
-    // seeking past the end…
-    match r.seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1)) {
-        // should either be ok, but then return 0 bytes.
-        // this matches the behaviour or a Cursor<Vec<u8>>.
-        Ok(_pos) => {
+        {
+            // but it returns no more data.
             let mut buf: Vec<u8> = Vec::new();
-            r.read_to_end(&mut buf).expect("must not fail");
+            r.read_to_end(&mut buf).await.expect("must not fail");
             assert!(buf.is_empty(), "expected no more data to be read");
         }
-        // or not be okay.
-        Err(_) => {}
-    }
 
-    // TODO: this is only broken for the gRPC version
-    // We expect seeking backwards or relative to the end to fail.
-    // r.seek(io::SeekFrom::Current(-1))
-    //     .expect_err("SeekFrom::Current(-1) expected to fail");
+        // seeking past the end…
+        match r
+            .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1))
+            .await
+        {
+            // should either be ok, but then return 0 bytes.
+            // this matches the behaviour or a Cursor<Vec<u8>>.
+            Ok(_pos) => {
+                let mut buf: Vec<u8> = Vec::new();
+                r.read_to_end(&mut buf).await.expect("must not fail");
+                assert!(buf.is_empty(), "expected no more data to be read");
+            }
+            // or not be okay.
+            Err(_) => {}
+        }
+
+        // TODO: this is only broken for the gRPC version
+        // We expect seeking backwards or relative to the end to fail.
+        // r.seek(io::SeekFrom::Current(-1))
+        //     .expect_err("SeekFrom::Current(-1) expected to fail");
 
-    // r.seek(io::SeekFrom::Start(pos - 1))
-    //     .expect_err("SeekFrom::Start(pos-1) expected to fail");
+        // r.seek(io::SeekFrom::Start(pos - 1))
+        //     .expect_err("SeekFrom::Start(pos-1) expected to fail");
 
-    // r.seek(io::SeekFrom::End(0))
-    //     .expect_err("SeekFrom::End(_) expected to fail");
+        // r.seek(io::SeekFrom::End(0))
+        //     .expect_err("SeekFrom::End(_) expected to fail");
+    })
 }