about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-02-13T18·33+0100
committerflokli <flokli@flokli.de>2023-03-10T10·58+0000
commit35ea0b0d2ed2d72fdafbb0b64756578a389ec484 (patch)
tree968032157e12a06f15a4869b588e8299c4a6fd20 /tvix
parent69516f1f6877a4de7dd92a758c83dd7ffc3254e3 (diff)
feat(tvix/store): add tests for GRPCBlobServiceWrapper r/5911
Change-Id: I0c9a2d887b5b94c43afee97a8a1e701cc9f74f23
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8098
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix')
-rw-r--r--tvix/store/src/proto/tests/grpc_blobservice.rs210
-rw-r--r--tvix/store/src/proto/tests/mod.rs1
2 files changed, 211 insertions, 0 deletions
diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs
new file mode 100644
index 0000000000..e616d141f1
--- /dev/null
+++ b/tvix/store/src/proto/tests/grpc_blobservice.rs
@@ -0,0 +1,210 @@
+use crate::blobservice::{BlobService, SledBlobService};
+use crate::chunkservice::{ChunkService, SledChunkService};
+use crate::proto::blob_meta::ChunkMeta;
+use crate::proto::blob_service_server::BlobService as GRPCBlobService;
+use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest};
+use lazy_static::lazy_static;
+use std::path::Path;
+use tempfile::TempDir;
+
+lazy_static! {
+    // 2 bytes
+    static ref BLOB_A: Vec<u8> = vec![0x00, 0x01];
+    static ref BLOB_A_DIGEST: Vec<u8> = blake3::hash(&BLOB_A).as_bytes().to_vec();
+
+    // 1MB
+    static ref BLOB_B: Vec<u8> = (0..255).collect::<Vec<u8>>().repeat(4 * 1024);
+    static ref BLOB_B_DIGEST: Vec<u8> = blake3::hash(&BLOB_B).as_bytes().to_vec();
+}
+
+fn gen_grpc_blob_service(
+    p: &Path,
+) -> GRPCBlobServiceWrapper<
+    impl BlobService + Send + Sync + Clone + 'static,
+    impl ChunkService + Send + Sync + Clone + 'static,
+> {
+    let blob_service = SledBlobService::new(p.join("blobs")).unwrap();
+    let chunk_service = SledChunkService::new(p.join("chunks")).unwrap();
+    GRPCBlobServiceWrapper::new(blob_service, chunk_service)
+}
+
+/// Trying to read a non-existent blob should return a not found error.
+#[tokio::test]
+async fn not_found_read() {
+    let service = gen_grpc_blob_service(TempDir::new().unwrap().path());
+
+    let resp = service
+        .read(tonic::Request::new(ReadBlobRequest {
+            digest: BLOB_A_DIGEST.to_vec(),
+        }))
+        .await;
+
+    let e = resp.expect_err("must_be_err");
+    assert_eq!(e.code(), tonic::Code::NotFound);
+}
+
+/// Trying to stat a non-existent blob should return a not found error.
+#[tokio::test]
+async fn not_found_stat() {
+    let service = gen_grpc_blob_service(TempDir::new().unwrap().path());
+
+    let resp = service
+        .stat(tonic::Request::new(StatBlobRequest {
+            digest: BLOB_A_DIGEST.to_vec(),
+            ..Default::default()
+        }))
+        .await
+        .expect_err("must fail");
+
+    // The resp should be a status with Code::NotFound
+    assert_eq!(resp.code(), tonic::Code::NotFound);
+}
+
+/// Put a blob in the store, get it back. We send something small enough so it
+/// won't get split into multiple chunks.
+#[tokio::test]
+async fn put_read_stat() {
+    let service = gen_grpc_blob_service(TempDir::new().unwrap().path());
+
+    // Send blob A.
+    let put_resp = service
+        .put(tonic_mock::streaming_request(vec![BlobChunk {
+            data: BLOB_A.clone(),
+        }]))
+        .await
+        .expect("must succeed")
+        .into_inner();
+
+    assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest);
+
+    // Stat for the digest of A. It should return one chunk.
+    let resp = service
+        .stat(tonic::Request::new(StatBlobRequest {
+            digest: BLOB_A_DIGEST.to_vec(),
+            include_chunks: true,
+            ..Default::default()
+        }))
+        .await
+        .expect("must succeed")
+        .into_inner();
+
+    assert_eq!(1, resp.chunks.len());
+    // the `chunks` field should point to the single chunk.
+    assert_eq!(
+        vec![ChunkMeta {
+            digest: BLOB_A_DIGEST.to_vec(),
+            size: BLOB_A.len() as u32,
+        }],
+        resp.chunks,
+    );
+
+    // Read the chunk. It should return the same data.
+    let resp = service
+        .read(tonic::Request::new(ReadBlobRequest {
+            digest: BLOB_A_DIGEST.to_vec(),
+        }))
+        .await;
+
+    let mut rx = resp.expect("must succeed").into_inner().into_inner();
+
+    // the stream should contain one element, a BlobChunk with the same contents as BLOB_A.
+    let item = rx
+        .recv()
+        .await
+        .expect("must be some")
+        .expect("must succeed");
+
+    assert_eq!(BLOB_A.to_vec(), item.data);
+
+    // … and no more elements
+    assert!(rx.recv().await.is_none());
+}
+
+/// Put a bigger blob in the store, and get it back.
+/// Assert the stat request actually returns more than one chunk, and
+/// we can read each chunk individually, as well as the whole blob via the
+/// `read()` method.
+#[tokio::test]
+async fn put_read_stat_large() {
+    let service = gen_grpc_blob_service(TempDir::new().unwrap().path());
+
+    // split up BLOB_B into BlobChunks containing 1K bytes each.
+    let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B
+        .chunks(1024)
+        .map(|x| BlobChunk { data: x.to_vec() })
+        .collect();
+
+    assert!(blob_b_blobchunks.len() > 1);
+
+    // Send blob B
+    let put_resp = service
+        .put(tonic_mock::streaming_request(blob_b_blobchunks))
+        .await
+        .expect("must succeed")
+        .into_inner();
+
+    assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest);
+
+    // Stat for the digest of B
+    let resp = service
+        .stat(tonic::Request::new(StatBlobRequest {
+            digest: BLOB_B_DIGEST.to_vec(),
+            include_chunks: true,
+            ..Default::default()
+        }))
+        .await
+        .expect("must succeed")
+        .into_inner();
+
+    // it should return more than one chunk.
+    assert_ne!(1, resp.chunks.len());
+
+    // The size added up should equal the size of BLOB_B.
+    let mut size_in_stat: u32 = 0;
+    for chunk in &resp.chunks {
+        size_in_stat += chunk.size
+    }
+    assert_eq!(BLOB_B.len() as u32, size_in_stat);
+
+    // Reading the whole blob by its digest via the read() interface should succeed.
+    {
+        let resp = service
+            .read(tonic::Request::new(ReadBlobRequest {
+                digest: BLOB_B_DIGEST.to_vec(),
+            }))
+            .await;
+
+        let mut rx = resp.expect("must succeed").into_inner().into_inner();
+
+        let mut buf: Vec<u8> = Vec::new();
+        while let Some(item) = rx.recv().await {
+            let mut blob_chunk = item.expect("must not be err");
+            buf.append(&mut blob_chunk.data);
+        }
+
+        assert_eq!(BLOB_B.to_vec(), buf);
+    }
+
+    // Reading the whole blob by reading individual chunks should also succeed.
+    {
+        let mut buf: Vec<u8> = Vec::new();
+        for chunk in &resp.chunks {
+            // request this individual chunk via read
+            let resp = service
+                .read(tonic::Request::new(ReadBlobRequest {
+                    digest: chunk.digest.clone(),
+                }))
+                .await;
+
+            let mut rx = resp.expect("must succeed").into_inner().into_inner();
+
+            // append all items from the stream to the buffer
+            while let Some(item) = rx.recv().await {
+                let mut blob_chunk = item.expect("must not be err");
+                buf.append(&mut blob_chunk.data);
+            }
+        }
+        // finished looping over all chunks, compare
+        assert_eq!(BLOB_B.to_vec(), buf);
+    }
+}
diff --git a/tvix/store/src/proto/tests/mod.rs b/tvix/store/src/proto/tests/mod.rs
index 04631afe40..9995be9d37 100644
--- a/tvix/store/src/proto/tests/mod.rs
+++ b/tvix/store/src/proto/tests/mod.rs
@@ -1,3 +1,4 @@
 mod directory;
 mod directory_nodes_iterator;
+mod grpc_blobservice;
 mod pathinfo;