diff options
author | Florian Klink <flokli@flokli.de> | 2023-05-11T12·49+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-05-11T14·27+0000 |
commit | 616fa4476f93e1782e68dc713e9e8cb77a426c7d (patch) | |
tree | f76a43e95c75d848d079706fbccfd442210ebebc /tvix/store/src/proto/tests | |
parent | b22b685f0b2524c088deacbf4e80e7b7c73b5afc (diff) |
refactor(tvix/store): remove ChunkService r/6133
Whether chunking is involved or not, is an implementation detail of each Blobstore. Consumers of a whole blob shouldn't need to worry about that. It currently is not visible in the gRPC interface either. It shouldn't bleed into everything. Let the BlobService trait provide `open_read` and `open_write` methods, which return handles providing io::Read or io::Write, and leave the details up to the implementation. This means, our custom BlobReader module can go away, and all the chunking bits in there, too. In the future, we might still want to add more chunking-aware syncing, but as a syncing strategy some stores can expose, not as a fundamental protocol component. This currently needs "SyncReadIntoAsyncRead", taken and vendored in from https://github.com/tokio-rs/tokio/pull/5669. It provides a AsyncRead for a sync Read, which is necessary to connect our (sync) BlobReader interface to a GRPC server implementation. As an alternative, we could also make the BlobReader itself async, and let consumers of the trait (EvalIO) deal with the async-ness, but this is less of a change for now. In terms of vendoring, I initially tried to move our tokio crate to these commits, but ended up in version incompatibilities, so let's vendor it in for now. Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551 Tested-by: BuildkiteCI Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/store/src/proto/tests')
-rw-r--r-- | tvix/store/src/proto/tests/grpc_blobservice.rs | 174 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_pathinfoservice.rs | 12 |
2 files changed, 26 insertions, 160 deletions
diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs index 88eb21e30241..80e2e71867a3 100644 --- a/tvix/store/src/proto/tests/grpc_blobservice.rs +++ b/tvix/store/src/proto/tests/grpc_blobservice.rs @@ -1,18 +1,14 @@ use crate::blobservice::BlobService; -use crate::chunkservice::ChunkService; -use crate::proto::blob_meta::ChunkMeta; use crate::proto::blob_service_server::BlobService as GRPCBlobService; use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest}; -use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST}; -use crate::tests::utils::{gen_blob_service, gen_chunk_service}; +use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST}; +use crate::tests::utils::gen_blob_service; +use tokio_stream::StreamExt; -fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper< - impl BlobService + Send + Sync + Clone + 'static, - impl ChunkService + Send + Sync + Clone + 'static, -> { +fn gen_grpc_blob_service( +) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> { let blob_service = gen_blob_service(); - let chunk_service = gen_chunk_service(); - GRPCBlobServiceWrapper::new(blob_service, chunk_service) + GRPCBlobServiceWrapper::from(blob_service) } /// Trying to read a non-existent blob should return a not found error. @@ -26,8 +22,13 @@ async fn not_found_read() { })) .await; - let e = resp.expect_err("must_be_err"); - assert_eq!(e.code(), tonic::Code::NotFound); + // We can't use unwrap_err here, because the Ok value doesn't implement + // debug. + if let Err(e) = resp { + assert_eq!(e.code(), tonic::Code::NotFound); + } else { + panic!("resp is not err") + } } /// Trying to stat a non-existent blob should return a not found error. @@ -47,8 +48,7 @@ async fn not_found_stat() { assert_eq!(resp.code(), tonic::Code::NotFound); } -/// Put a blob in the store, get it back. We send something small enough so it -/// won't get split into multiple chunks. +/// Put a blob in the store, get it back. #[tokio::test] async fn put_read_stat() { let service = gen_grpc_blob_service(); @@ -64,39 +64,30 @@ async fn put_read_stat() { assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest); - // Stat for the digest of A. It should return one chunk. + // Stat for the digest of A. + // We currently don't ask for more granular chunking data, as we don't + // expose it yet. let resp = service .stat(tonic::Request::new(StatBlobRequest { digest: BLOB_A_DIGEST.to_vec(), - include_chunks: true, ..Default::default() })) .await .expect("must succeed") .into_inner(); - assert_eq!(1, resp.chunks.len()); - // the `chunks` field should point to the single chunk. - assert_eq!( - vec![ChunkMeta { - digest: BLOB_A_DIGEST.to_vec(), - size: BLOB_A.len() as u32, - }], - resp.chunks, - ); - - // Read the chunk. It should return the same data. + // Read the blob. It should return the same data. let resp = service .read(tonic::Request::new(ReadBlobRequest { digest: BLOB_A_DIGEST.to_vec(), })) .await; - let mut rx = resp.expect("must succeed").into_inner().into_inner(); + let mut rx = resp.ok().unwrap().into_inner(); // the stream should contain one element, a BlobChunk with the same contents as BLOB_A. let item = rx - .recv() + .next() .await .expect("must be some") .expect("must succeed"); @@ -104,127 +95,8 @@ async fn put_read_stat() { assert_eq!(BLOB_A.to_vec(), item.data); // … and no more elements - assert!(rx.recv().await.is_none()); -} - -/// Put a bigger blob in the store, and get it back. -/// Assert the stat request actually returns more than one chunk, and -/// we can read each chunk individually, as well as the whole blob via the -/// `read()` method. -#[tokio::test] -async fn put_read_stat_large() { - let service = gen_grpc_blob_service(); - - // split up BLOB_B into BlobChunks containing 1K bytes each. - let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B - .chunks(1024) - .map(|x| BlobChunk { data: x.to_vec() }) - .collect(); - - assert!(blob_b_blobchunks.len() > 1); + assert!(rx.next().await.is_none()); - // Send blob B - let put_resp = service - .put(tonic_mock::streaming_request(blob_b_blobchunks)) - .await - .expect("must succeed") - .into_inner(); - - assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest); - - // Stat for the digest of B - let resp = service - .stat(tonic::Request::new(StatBlobRequest { - digest: BLOB_B_DIGEST.to_vec(), - include_chunks: true, - ..Default::default() - })) - .await - .expect("must succeed") - .into_inner(); - - // it should return more than one chunk. - assert_ne!(1, resp.chunks.len()); - - // The size added up should equal the size of BLOB_B. - let mut size_in_stat: u32 = 0; - for chunk in &resp.chunks { - size_in_stat += chunk.size - } - assert_eq!(BLOB_B.len() as u32, size_in_stat); - - // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values. - // TODO: make the chunker config better accessible, so we don't need to synchronize this. - { - let chunker_avg_size = 64 * 1024; - let chunker_min_size = chunker_avg_size / 4; - let chunker_max_size = chunker_avg_size * 4; - - // initialize a chunker with the current buffer - let blob_b = BLOB_B.to_vec(); - let chunker = fastcdc::v2020::FastCDC::new( - &blob_b, - chunker_min_size, - chunker_avg_size, - chunker_max_size, - ); - - let mut num_chunks = 0; - for (i, chunk) in chunker.enumerate() { - assert_eq!( - resp.chunks[i].size, chunk.length as u32, - "expected locally-chunked chunk length to match stat response" - ); - - num_chunks += 1; - } - - assert_eq!( - resp.chunks.len(), - num_chunks, - "expected number of chunks to match" - ); - } - - // Reading the whole blob by its digest via the read() interface should succeed. - { - let resp = service - .read(tonic::Request::new(ReadBlobRequest { - digest: BLOB_B_DIGEST.to_vec(), - })) - .await; - - let mut rx = resp.expect("must succeed").into_inner().into_inner(); - - let mut buf: Vec<u8> = Vec::new(); - while let Some(item) = rx.recv().await { - let mut blob_chunk = item.expect("must not be err"); - buf.append(&mut blob_chunk.data); - } - - assert_eq!(BLOB_B.to_vec(), buf); - } - - // Reading the whole blob by reading individual chunks should also succeed. - { - let mut buf: Vec<u8> = Vec::new(); - for chunk in &resp.chunks { - // request this individual chunk via read - let resp = service - .read(tonic::Request::new(ReadBlobRequest { - digest: chunk.digest.clone(), - })) - .await; - - let mut rx = resp.expect("must succeed").into_inner().into_inner(); - - // append all items from the stream to the buffer - while let Some(item) = rx.recv().await { - let mut blob_chunk = item.expect("must not be err"); - buf.append(&mut blob_chunk.data); - } - } - // finished looping over all chunks, compare - assert_eq!(BLOB_B.to_vec(), buf); - } + // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks. + // Test with some bigger blob too } diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index f74c3fda0213..11cab2c264cc 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -6,12 +6,10 @@ use crate::proto::GRPCPathInfoServiceWrapper; use crate::proto::PathInfo; use crate::proto::{GetPathInfoRequest, Node, SymlinkNode}; use crate::tests::fixtures::DUMMY_OUTPUT_HASH; -use crate::tests::utils::{ - gen_blob_service, gen_chunk_service, gen_directory_service, gen_pathinfo_service, -}; +use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; use tonic::Request; -/// generates a GRPCPathInfoService out of blob, chunk, directory and pathinfo services. +/// generates a GRPCPathInfoService out of blob, directory and pathinfo services. /// /// We only interact with it via the PathInfo GRPC interface. /// It uses the NonCachingNARCalculationService NARCalculationService to @@ -19,11 +17,7 @@ use tonic::Request; fn gen_grpc_service() -> impl GRPCPathInfoService { GRPCPathInfoServiceWrapper::new( gen_pathinfo_service(), - NonCachingNARCalculationService::new( - gen_blob_service(), - gen_chunk_service(), - gen_directory_service(), - ), + NonCachingNARCalculationService::new(gen_blob_service(), gen_directory_service()), ) } |