about summary refs log tree commit diff
path: root/tvix/castore/src/blobservice
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-03-23T22·37+0100
committerclbot <clbot@tvl.fyi>2024-03-24T20·03+0000
commitfe6ae58ba5c16bbf8348c083a0b1b79635b39c53 (patch)
tree132150f2a1e89a4bacaf30846c9f43fc3f2a0776 /tvix/castore/src/blobservice
parentf5cf6592455d5178e829984c76f2df5037319272 (diff)
feat(tvix/castore): add rstest-based BlobService tests r/7779
Change-Id: Ifae9c41e1e3fa5e96f9b7e188181a44650ff8a04
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11250
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r--tvix/castore/src/blobservice/tests.rs243
-rw-r--r--tvix/castore/src/blobservice/tests/mod.rs253
-rw-r--r--tvix/castore/src/blobservice/tests/utils.rs41
3 files changed, 294 insertions, 243 deletions
diff --git a/tvix/castore/src/blobservice/tests.rs b/tvix/castore/src/blobservice/tests.rs
deleted file mode 100644
index 7480ca808225..000000000000
--- a/tvix/castore/src/blobservice/tests.rs
+++ /dev/null
@@ -1,243 +0,0 @@
-use std::io;
-use std::pin::pin;
-
-use test_case::test_case;
-use tokio::io::AsyncReadExt;
-use tokio::io::AsyncSeekExt;
-
-use super::B3Digest;
-use super::BlobService;
-use super::MemoryBlobService;
-use super::SledBlobService;
-use crate::fixtures;
-
-// TODO: avoid having to define all different services we test against for all functions.
-// maybe something like rstest can be used?
-
-fn gen_memory_blob_service() -> impl BlobService {
-    MemoryBlobService::default()
-}
-fn gen_sled_blob_service() -> impl BlobService {
-    SledBlobService::new_temporary().unwrap()
-}
-
-// TODO: add GRPC blob service here.
-
-/// Using [BlobService::has] on a non-existing blob should return false
-#[test_case(gen_memory_blob_service(); "memory")]
-#[test_case(gen_sled_blob_service(); "sled")]
-fn has_nonexistent_false(blob_service: impl BlobService) {
-    tokio::runtime::Runtime::new().unwrap().block_on(async {
-        assert!(!blob_service
-            .has(&fixtures::BLOB_A_DIGEST)
-            .await
-            .expect("must not fail"));
-    })
-}
-
-/// Trying to read a non-existing blob should return a None instead of a reader.
-#[test_case(gen_memory_blob_service(); "memory")]
-#[test_case(gen_sled_blob_service(); "sled")]
-fn not_found_read(blob_service: impl BlobService) {
-    tokio::runtime::Runtime::new().unwrap().block_on(async {
-        assert!(blob_service
-            .open_read(&fixtures::BLOB_A_DIGEST)
-            .await
-            .expect("must not fail")
-            .is_none())
-    })
-}
-
-/// Put a blob in the store, check has, get it back.
-/// We test both with small and big blobs.
-#[test_case(gen_memory_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "memory-small")]
-#[test_case(gen_sled_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "sled-small")]
-#[test_case(gen_memory_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "memory-big")]
-#[test_case(gen_sled_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "sled-big")]
-fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest: &B3Digest) {
-    tokio::runtime::Runtime::new().unwrap().block_on(async {
-        let mut w = blob_service.open_write().await;
-
-        let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w)
-            .await
-            .expect("copy must succeed");
-        assert_eq!(
-            blob_contents.len(),
-            l as usize,
-            "written bytes must match blob length"
-        );
-
-        let digest = w.close().await.expect("close must succeed");
-
-        assert_eq!(*blob_digest, digest, "returned digest must be correct");
-
-        assert!(
-            blob_service.has(blob_digest).await.expect("must not fail"),
-            "blob service should now have the blob"
-        );
-
-        let mut r = blob_service
-            .open_read(blob_digest)
-            .await
-            .expect("open_read must succeed")
-            .expect("must be some");
-
-        let mut buf: Vec<u8> = Vec::new();
-        let mut pinned_reader = pin!(r);
-        let l = tokio::io::copy(&mut pinned_reader, &mut buf)
-            .await
-            .expect("copy must succeed");
-        // let l = io::copy(&mut r, &mut buf).expect("copy must succeed");
-
-        assert_eq!(
-            blob_contents.len(),
-            l as usize,
-            "read bytes must match blob length"
-        );
-
-        assert_eq!(blob_contents, buf, "read blob contents must match");
-    })
-}
-
-/// Put a blob in the store, and seek inside it a bit.
-#[test_case(gen_memory_blob_service(); "memory")]
-#[test_case(gen_sled_blob_service(); "sled")]
-fn put_seek(blob_service: impl BlobService) {
-    tokio::runtime::Runtime::new().unwrap().block_on(async {
-        let mut w = blob_service.open_write().await;
-
-        tokio::io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w)
-            .await
-            .expect("copy must succeed");
-        w.close().await.expect("close must succeed");
-
-        // open a blob for reading
-        let mut r = blob_service
-            .open_read(&fixtures::BLOB_B_DIGEST)
-            .await
-            .expect("open_read must succeed")
-            .expect("must be some");
-
-        let mut pos: u64 = 0;
-
-        // read the first 10 bytes, they must match the data in the fixture.
-        {
-            let mut buf = [0; 10];
-            r.read_exact(&mut buf).await.expect("must succeed");
-
-            assert_eq!(
-                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-                buf,
-                "expected first 10 bytes to match"
-            );
-
-            pos += buf.len() as u64;
-        }
-        // seek by 0 bytes, using SeekFrom::Start.
-        let p = r
-            .seek(io::SeekFrom::Start(pos))
-            .await
-            .expect("must not fail");
-        assert_eq!(pos, p);
-
-        // read the next 10 bytes, they must match the data in the fixture.
-        {
-            let mut buf = [0; 10];
-            r.read_exact(&mut buf).await.expect("must succeed");
-
-            assert_eq!(
-                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-                buf,
-                "expected data to match"
-            );
-
-            pos += buf.len() as u64;
-        }
-
-        // seek by 5 bytes, using SeekFrom::Start.
-        let p = r
-            .seek(io::SeekFrom::Start(pos + 5))
-            .await
-            .expect("must not fail");
-        pos += 5;
-        assert_eq!(pos, p);
-
-        // read the next 10 bytes, they must match the data in the fixture.
-        {
-            let mut buf = [0; 10];
-            r.read_exact(&mut buf).await.expect("must succeed");
-
-            assert_eq!(
-                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-                buf,
-                "expected data to match"
-            );
-
-            pos += buf.len() as u64;
-        }
-
-        // seek by 12345 bytes, using SeekFrom::
-        let p = r
-            .seek(io::SeekFrom::Current(12345))
-            .await
-            .expect("must not fail");
-        pos += 12345;
-        assert_eq!(pos, p);
-
-        // read the next 10 bytes, they must match the data in the fixture.
-        {
-            let mut buf = [0; 10];
-            r.read_exact(&mut buf).await.expect("must succeed");
-
-            assert_eq!(
-                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
-                buf,
-                "expected data to match"
-            );
-
-            #[allow(unused_assignments)]
-            {
-                pos += buf.len() as u64;
-            }
-        }
-
-        // seeking to the end is okay…
-        let p = r
-            .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64))
-            .await
-            .expect("must not fail");
-        pos = fixtures::BLOB_B.len() as u64;
-        assert_eq!(pos, p);
-
-        {
-            // but it returns no more data.
-            let mut buf: Vec<u8> = Vec::new();
-            r.read_to_end(&mut buf).await.expect("must not fail");
-            assert!(buf.is_empty(), "expected no more data to be read");
-        }
-
-        // seeking past the end…
-        // should either be ok, but then return 0 bytes.
-        // this matches the behaviour or a Cursor<Vec<u8>>.
-        if let Ok(_pos) = r
-            .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1))
-            .await
-        {
-            let mut buf: Vec<u8> = Vec::new();
-            r.read_to_end(&mut buf).await.expect("must not fail");
-            assert!(buf.is_empty(), "expected no more data to be read");
-        }
-        // or not be okay.
-
-        // TODO: this is only broken for the gRPC version
-        // We expect seeking backwards or relative to the end to fail.
-        // r.seek(io::SeekFrom::Current(-1))
-        //     .expect_err("SeekFrom::Current(-1) expected to fail");
-
-        // r.seek(io::SeekFrom::Start(pos - 1))
-        //     .expect_err("SeekFrom::Start(pos-1) expected to fail");
-
-        // r.seek(io::SeekFrom::End(0))
-        //     .expect_err("SeekFrom::End(_) expected to fail");
-    })
-}
diff --git a/tvix/castore/src/blobservice/tests/mod.rs b/tvix/castore/src/blobservice/tests/mod.rs
new file mode 100644
index 000000000000..b1ae66bac40d
--- /dev/null
+++ b/tvix/castore/src/blobservice/tests/mod.rs
@@ -0,0 +1,253 @@
+//! This contains test scenarios that a given [BlobService] needs to pass.
+//! We use [rstest] and [rstest_reuse] to provide all services we want to test
+//! against, and then apply this template to all test functions.
+
+use rstest::*;
+use rstest_reuse::{self, *};
+use std::io;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncSeekExt;
+
+use super::BlobService;
+use crate::blobservice;
+use crate::fixtures::BLOB_A;
+use crate::fixtures::BLOB_A_DIGEST;
+use crate::fixtures::BLOB_B;
+use crate::fixtures::BLOB_B_DIGEST;
+
+mod utils;
+use self::utils::make_grpc_blob_service_client;
+
+/// This produces a template, which will be applied to all individual test functions.
+/// See https://github.com/la10736/rstest/issues/130#issuecomment-968864832
+#[template]
+#[rstest]
+#[case::memory(blobservice::from_addr("memory://").await.unwrap())]
+#[case::grpc(make_grpc_blob_service_client().await)]
+#[case::sled(blobservice::from_addr("sled://").await.unwrap())]
+pub fn blob_services(#[case] blob_service: impl BlobService) {}
+
+/// Using [BlobService::has] on a non-existing blob should return false.
+#[apply(blob_services)]
+#[tokio::test]
+async fn has_nonexistent_false(blob_service: impl BlobService) {
+    assert!(!blob_service
+        .has(&BLOB_A_DIGEST)
+        .await
+        .expect("must not fail"));
+}
+
+/// Using [BlobService::chunks] on a non-existing blob should return Ok(None)
+#[apply(blob_services)]
+#[tokio::test]
+async fn chunks_nonexistent_false(blob_service: impl BlobService) {
+    assert!(blob_service
+        .chunks(&BLOB_A_DIGEST)
+        .await
+        .expect("must be ok")
+        .is_none());
+}
+
+// TODO: do tests with `chunks`
+
+/// Trying to read a non-existing blob should return a None instead of a reader.
+#[apply(blob_services)]
+#[tokio::test]
+async fn not_found_read(blob_service: impl BlobService) {
+    assert!(blob_service
+        .open_read(&BLOB_A_DIGEST)
+        .await
+        .expect("must not fail")
+        .is_none())
+}
+
+/// Put a blob in the store, check has, get it back.
+#[apply(blob_services)]
+// #[case::small(&fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST)]
+// #[case::big(&fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST)]
+#[tokio::test]
+async fn put_has_get(blob_service: impl BlobService) {
+    // TODO: figure out how to instantiate this with BLOB_A and BLOB_B, as two separate cases
+    for (blob_contents, blob_digest) in &[
+        (&*BLOB_A, BLOB_A_DIGEST.clone()),
+        (&*BLOB_B, BLOB_B_DIGEST.clone()),
+    ] {
+        let mut w = blob_service.open_write().await;
+
+        let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w)
+            .await
+            .expect("copy must succeed");
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "written bytes must match blob length"
+        );
+
+        let digest = w.close().await.expect("close must succeed");
+
+        assert_eq!(*blob_digest, digest, "returned digest must be correct");
+
+        assert!(
+            blob_service.has(blob_digest).await.expect("must not fail"),
+            "blob service should now have the blob"
+        );
+
+        let mut r = blob_service
+            .open_read(blob_digest)
+            .await
+            .expect("open_read must succeed")
+            .expect("must be some");
+
+        let mut buf: Vec<u8> = Vec::new();
+        let mut pinned_reader = std::pin::pin!(r);
+        let l = tokio::io::copy(&mut pinned_reader, &mut buf)
+            .await
+            .expect("copy must succeed");
+
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "read bytes must match blob length"
+        );
+
+        assert_eq!(&blob_contents[..], &buf, "read blob contents must match");
+    }
+}
+
+/// Put a blob in the store, and seek inside it a bit.
+#[apply(blob_services)]
+#[tokio::test]
+async fn put_seek(blob_service: impl BlobService) {
+    let mut w = blob_service.open_write().await;
+
+    tokio::io::copy(&mut io::Cursor::new(&BLOB_B.to_vec()), &mut w)
+        .await
+        .expect("copy must succeed");
+    w.close().await.expect("close must succeed");
+
+    // open a blob for reading
+    let mut r = blob_service
+        .open_read(&BLOB_B_DIGEST)
+        .await
+        .expect("open_read must succeed")
+        .expect("must be some");
+
+    let mut pos: u64 = 0;
+
+    // read the first 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).await.expect("must succeed");
+
+        assert_eq!(
+            &BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected first 10 bytes to match"
+        );
+
+        pos += buf.len() as u64;
+    }
+    // seek by 0 bytes, using SeekFrom::Start.
+    let p = r
+        .seek(io::SeekFrom::Start(pos))
+        .await
+        .expect("must not fail");
+    assert_eq!(pos, p);
+
+    // read the next 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).await.expect("must succeed");
+
+        assert_eq!(
+            &BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected data to match"
+        );
+
+        pos += buf.len() as u64;
+    }
+
+    // seek by 5 bytes, using SeekFrom::Start.
+    let p = r
+        .seek(io::SeekFrom::Start(pos + 5))
+        .await
+        .expect("must not fail");
+    pos += 5;
+    assert_eq!(pos, p);
+
+    // read the next 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).await.expect("must succeed");
+
+        assert_eq!(
+            &BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected data to match"
+        );
+
+        pos += buf.len() as u64;
+    }
+
+    // seek by 12345 bytes, using SeekFrom::
+    let p = r
+        .seek(io::SeekFrom::Current(12345))
+        .await
+        .expect("must not fail");
+    pos += 12345;
+    assert_eq!(pos, p);
+
+    // read the next 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).await.expect("must succeed");
+
+        assert_eq!(
+            &BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected data to match"
+        );
+
+        #[allow(unused_assignments)]
+        {
+            pos += buf.len() as u64;
+        }
+    }
+
+    // seeking to the end is okay…
+    let p = r
+        .seek(io::SeekFrom::Start(BLOB_B.len() as u64))
+        .await
+        .expect("must not fail");
+    pos = BLOB_B.len() as u64;
+    assert_eq!(pos, p);
+
+    {
+        // but it returns no more data.
+        let mut buf: Vec<u8> = Vec::new();
+        r.read_to_end(&mut buf).await.expect("must not fail");
+        assert!(buf.is_empty(), "expected no more data to be read");
+    }
+
+    // seeking past the end…
+    // should either be ok, but then return 0 bytes.
+    // this matches the behaviour or a Cursor<Vec<u8>>.
+    if let Ok(_pos) = r.seek(io::SeekFrom::Start(BLOB_B.len() as u64 + 1)).await {
+        let mut buf: Vec<u8> = Vec::new();
+        r.read_to_end(&mut buf).await.expect("must not fail");
+        assert!(buf.is_empty(), "expected no more data to be read");
+    }
+    // or not be okay.
+
+    // TODO: this is only broken for the gRPC version
+    // We expect seeking backwards or relative to the end to fail.
+    // r.seek(io::SeekFrom::Current(-1))
+    //     .expect_err("SeekFrom::Current(-1) expected to fail");
+
+    // r.seek(io::SeekFrom::Start(pos - 1))
+    //     .expect_err("SeekFrom::Start(pos-1) expected to fail");
+
+    // r.seek(io::SeekFrom::End(0))
+    //     .expect_err("SeekFrom::End(_) expected to fail");
+}
diff --git a/tvix/castore/src/blobservice/tests/utils.rs b/tvix/castore/src/blobservice/tests/utils.rs
new file mode 100644
index 000000000000..706c4b5e4319
--- /dev/null
+++ b/tvix/castore/src/blobservice/tests/utils.rs
@@ -0,0 +1,41 @@
+use crate::blobservice::{BlobService, MemoryBlobService};
+use crate::proto::blob_service_client::BlobServiceClient;
+use crate::proto::GRPCBlobServiceWrapper;
+use crate::{blobservice::GRPCBlobService, proto::blob_service_server::BlobServiceServer};
+use tonic::transport::{Endpoint, Server, Uri};
+
+/// Constructs and returns a gRPC BlobService.
+/// The server part is a [MemoryBlobService], exposed via the
+/// [GRPCBlobServiceWrapper], and connected through a DuplexStream
+pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> {
+    let (left, right) = tokio::io::duplex(64);
+
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn(async {
+        let blob_service = Box::<MemoryBlobService>::default() as Box<dyn BlobService>;
+
+        // spin up a new DirectoryService
+        let mut server = Server::builder();
+        let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
+            blob_service,
+        )));
+
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
+    });
+
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+
+    Box::new(GRPCBlobService::from_client(BlobServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    )))
+}