about summary refs log tree commit diff
path: root/tvix/store/src/proto/tests/grpc_blobservice.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/proto/tests/grpc_blobservice.rs')
-rw-r--r--tvix/store/src/proto/tests/grpc_blobservice.rs174
1 files changed, 23 insertions, 151 deletions
diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs
index 88eb21e302..80e2e71867 100644
--- a/tvix/store/src/proto/tests/grpc_blobservice.rs
+++ b/tvix/store/src/proto/tests/grpc_blobservice.rs
@@ -1,18 +1,14 @@
 use crate::blobservice::BlobService;
-use crate::chunkservice::ChunkService;
-use crate::proto::blob_meta::ChunkMeta;
 use crate::proto::blob_service_server::BlobService as GRPCBlobService;
 use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest};
-use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST};
-use crate::tests::utils::{gen_blob_service, gen_chunk_service};
+use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST};
+use crate::tests::utils::gen_blob_service;
+use tokio_stream::StreamExt;
 
-fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper<
-    impl BlobService + Send + Sync + Clone + 'static,
-    impl ChunkService + Send + Sync + Clone + 'static,
-> {
+fn gen_grpc_blob_service(
+) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> {
     let blob_service = gen_blob_service();
-    let chunk_service = gen_chunk_service();
-    GRPCBlobServiceWrapper::new(blob_service, chunk_service)
+    GRPCBlobServiceWrapper::from(blob_service)
 }
 
 /// Trying to read a non-existent blob should return a not found error.
@@ -26,8 +22,13 @@ async fn not_found_read() {
         }))
         .await;
 
-    let e = resp.expect_err("must_be_err");
-    assert_eq!(e.code(), tonic::Code::NotFound);
+    // We can't use unwrap_err here, because the Ok value doesn't implement
+    // debug.
+    if let Err(e) = resp {
+        assert_eq!(e.code(), tonic::Code::NotFound);
+    } else {
+        panic!("resp is not err")
+    }
 }
 
 /// Trying to stat a non-existent blob should return a not found error.
@@ -47,8 +48,7 @@ async fn not_found_stat() {
     assert_eq!(resp.code(), tonic::Code::NotFound);
 }
 
-/// Put a blob in the store, get it back. We send something small enough so it
-/// won't get split into multiple chunks.
+/// Put a blob in the store, get it back.
 #[tokio::test]
 async fn put_read_stat() {
     let service = gen_grpc_blob_service();
@@ -64,39 +64,30 @@ async fn put_read_stat() {
 
     assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest);
 
-    // Stat for the digest of A. It should return one chunk.
+    // Stat for the digest of A.
+    // We currently don't ask for more granular chunking data, as we don't
+    // expose it yet.
     let resp = service
         .stat(tonic::Request::new(StatBlobRequest {
             digest: BLOB_A_DIGEST.to_vec(),
-            include_chunks: true,
             ..Default::default()
         }))
         .await
         .expect("must succeed")
         .into_inner();
 
-    assert_eq!(1, resp.chunks.len());
-    // the `chunks` field should point to the single chunk.
-    assert_eq!(
-        vec![ChunkMeta {
-            digest: BLOB_A_DIGEST.to_vec(),
-            size: BLOB_A.len() as u32,
-        }],
-        resp.chunks,
-    );
-
-    // Read the chunk. It should return the same data.
+    // Read the blob. It should return the same data.
     let resp = service
         .read(tonic::Request::new(ReadBlobRequest {
             digest: BLOB_A_DIGEST.to_vec(),
         }))
         .await;
 
-    let mut rx = resp.expect("must succeed").into_inner().into_inner();
+    let mut rx = resp.ok().unwrap().into_inner();
 
     // the stream should contain one element, a BlobChunk with the same contents as BLOB_A.
     let item = rx
-        .recv()
+        .next()
         .await
         .expect("must be some")
         .expect("must succeed");
@@ -104,127 +95,8 @@ async fn put_read_stat() {
     assert_eq!(BLOB_A.to_vec(), item.data);
 
     // … and no more elements
-    assert!(rx.recv().await.is_none());
-}
-
-/// Put a bigger blob in the store, and get it back.
-/// Assert the stat request actually returns more than one chunk, and
-/// we can read each chunk individually, as well as the whole blob via the
-/// `read()` method.
-#[tokio::test]
-async fn put_read_stat_large() {
-    let service = gen_grpc_blob_service();
-
-    // split up BLOB_B into BlobChunks containing 1K bytes each.
-    let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B
-        .chunks(1024)
-        .map(|x| BlobChunk { data: x.to_vec() })
-        .collect();
-
-    assert!(blob_b_blobchunks.len() > 1);
+    assert!(rx.next().await.is_none());
 
-    // Send blob B
-    let put_resp = service
-        .put(tonic_mock::streaming_request(blob_b_blobchunks))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest);
-
-    // Stat for the digest of B
-    let resp = service
-        .stat(tonic::Request::new(StatBlobRequest {
-            digest: BLOB_B_DIGEST.to_vec(),
-            include_chunks: true,
-            ..Default::default()
-        }))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    // it should return more than one chunk.
-    assert_ne!(1, resp.chunks.len());
-
-    // The size added up should equal the size of BLOB_B.
-    let mut size_in_stat: u32 = 0;
-    for chunk in &resp.chunks {
-        size_in_stat += chunk.size
-    }
-    assert_eq!(BLOB_B.len() as u32, size_in_stat);
-
-    // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values.
-    // TODO: make the chunker config better accessible, so we don't need to synchronize this.
-    {
-        let chunker_avg_size = 64 * 1024;
-        let chunker_min_size = chunker_avg_size / 4;
-        let chunker_max_size = chunker_avg_size * 4;
-
-        // initialize a chunker with the current buffer
-        let blob_b = BLOB_B.to_vec();
-        let chunker = fastcdc::v2020::FastCDC::new(
-            &blob_b,
-            chunker_min_size,
-            chunker_avg_size,
-            chunker_max_size,
-        );
-
-        let mut num_chunks = 0;
-        for (i, chunk) in chunker.enumerate() {
-            assert_eq!(
-                resp.chunks[i].size, chunk.length as u32,
-                "expected locally-chunked chunk length to match stat response"
-            );
-
-            num_chunks += 1;
-        }
-
-        assert_eq!(
-            resp.chunks.len(),
-            num_chunks,
-            "expected number of chunks to match"
-        );
-    }
-
-    // Reading the whole blob by its digest via the read() interface should succeed.
-    {
-        let resp = service
-            .read(tonic::Request::new(ReadBlobRequest {
-                digest: BLOB_B_DIGEST.to_vec(),
-            }))
-            .await;
-
-        let mut rx = resp.expect("must succeed").into_inner().into_inner();
-
-        let mut buf: Vec<u8> = Vec::new();
-        while let Some(item) = rx.recv().await {
-            let mut blob_chunk = item.expect("must not be err");
-            buf.append(&mut blob_chunk.data);
-        }
-
-        assert_eq!(BLOB_B.to_vec(), buf);
-    }
-
-    // Reading the whole blob by reading individual chunks should also succeed.
-    {
-        let mut buf: Vec<u8> = Vec::new();
-        for chunk in &resp.chunks {
-            // request this individual chunk via read
-            let resp = service
-                .read(tonic::Request::new(ReadBlobRequest {
-                    digest: chunk.digest.clone(),
-                }))
-                .await;
-
-            let mut rx = resp.expect("must succeed").into_inner().into_inner();
-
-            // append all items from the stream to the buffer
-            while let Some(item) = rx.recv().await {
-                let mut blob_chunk = item.expect("must not be err");
-                buf.append(&mut blob_chunk.data);
-            }
-        }
-        // finished looping over all chunks, compare
-        assert_eq!(BLOB_B.to_vec(), buf);
-    }
+    // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks.
+    // Test with some bigger blob too
 }