Diffstat (limited to 'tvix/store/src/proto')
 tvix/store/src/proto/grpc_blobservice_wrapper.rs   | 242
 tvix/store/src/proto/mod.rs                        |   2
 tvix/store/src/proto/sync_read_into_async_read.rs  | 158
 tvix/store/src/proto/tests/grpc_blobservice.rs     | 174
 tvix/store/src/proto/tests/grpc_pathinfoservice.rs |  12
 5 files changed, 270 insertions(+), 318 deletions(-)
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index e891caa0268e..d1e98a5b7953 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,54 +1,34 @@
-use std::collections::VecDeque;
-
 use crate::{
-    blobservice::BlobService,
-    chunkservice::{read_all_and_chunk, update_hasher, ChunkService},
-    Error,
+    blobservice::{BlobService, BlobWriter},
+    proto::sync_read_into_async_read::SyncReadIntoAsyncRead,
 };
 use data_encoding::BASE64;
-use tokio::{sync::mpsc::channel, task};
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use std::{collections::VecDeque, io, pin::Pin};
+use tokio::task;
+use tokio_stream::StreamExt;
+use tokio_util::io::ReaderStream;
 use tonic::{async_trait, Request, Response, Status, Streaming};
-use tracing::{debug, instrument, warn};
+use tracing::{instrument, warn};
 
-pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
+pub struct GRPCBlobServiceWrapper<BS: BlobService> {
     blob_service: BS,
-    chunk_service: CS,
 }
 
-impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
-    pub fn new(blob_service: BS, chunk_service: CS) -> Self {
+impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> {
+    fn from(value: BS) -> Self {
         Self {
-            blob_service,
-            chunk_service,
-        }
-    }
-
-    // upload the chunk to the chunk service, and return its digest (or an error) when done.
-    #[instrument(skip(chunk_service))]
-    fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
-        let mut hasher = blake3::Hasher::new();
-        update_hasher(&mut hasher, &chunk_data);
-        let digest = hasher.finalize();
-
-        if chunk_service.has(digest.as_bytes())? {
-            debug!("already has chunk, skipping");
+            blob_service: value,
         }
-        let digest_resp = chunk_service.put(chunk_data)?;
-
-        assert_eq!(&digest_resp, digest.as_bytes());
-
-        Ok(digest.as_bytes().to_vec())
     }
 }
 
 #[async_trait]
-impl<
-        BS: BlobService + Send + Sync + Clone + 'static,
-        CS: ChunkService + Send + Sync + Clone + 'static,
-    > super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS>
+impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService
+    for GRPCBlobServiceWrapper<BS>
 {
-    type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>;
+    // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
+    type ReadStream =
+        Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
 
     #[instrument(skip(self))]
     async fn stat(
@@ -56,12 +36,22 @@ impl<
         request: Request<super::StatBlobRequest>,
     ) -> Result<Response<super::BlobMeta>, Status> {
         let rq = request.into_inner();
-        match self.blob_service.stat(&rq) {
-            Ok(None) => Err(Status::not_found(format!(
+        let req_digest: [u8; 32] = rq
+            .digest
+            .clone()
+            .try_into()
+            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+        if rq.include_chunks || rq.include_bao {
+            return Err(Status::internal("not implemented"));
+        }
+
+        match self.blob_service.has(&req_digest) {
+            Ok(true) => Ok(Response::new(super::BlobMeta::default())),
+            Ok(false) => Err(Status::not_found(format!(
                 "blob {} not found",
-                BASE64.encode(&rq.digest)
+                BASE64.encode(&req_digest)
             ))),
-            Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)),
             Err(e) => Err(e.into()),
         }
     }
@@ -71,99 +61,38 @@ impl<
         &self,
         request: Request<super::ReadBlobRequest>,
     ) -> Result<Response<Self::ReadStream>, Status> {
-        let req = request.into_inner();
-        let (tx, rx) = channel(5);
+        let rq = request.into_inner();
 
-        let req_digest: [u8; 32] = req
+        let req_digest: [u8; 32] = rq
             .digest
+            .clone()
             .try_into()
-            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
-
-        // query the blob service for more detailed blob info
-        let stat_resp = self.blob_service.stat(&super::StatBlobRequest {
-            digest: req_digest.to_vec(),
-            include_chunks: true,
-            ..Default::default()
-        })?;
-
-        match stat_resp {
-            None => {
-                // If the stat didn't return any blobmeta, the client might
-                // still have asked for a single chunk to be read.
-                // Check the chunkstore.
-                if let Some(data) = self.chunk_service.get(&req_digest)? {
-                    // We already know the hash matches, and contrary to
-                    // iterating over a blobmeta, we can't know the size,
-                    // so send the contents of that chunk over,
-                    // as the first (and only) element of the stream.
-                    task::spawn(async move {
-                        let res = Ok(super::BlobChunk { data });
-                        // send the result to the client. If the client already left, that's also fine.
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                        }
-                    });
-                } else {
-                    return Err(Status::not_found(format!(
-                        "blob {} not found",
-                        BASE64.encode(&req_digest),
-                    )));
-                }
-            }
-            Some(blobmeta) => {
-                let chunk_client = self.chunk_service.clone();
-
-                // TODO: use BlobReader?
-                // But then we might not be able to send compressed chunks as-is.
-                // Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it
-                // first, so we can .next().await in here.
-
-                task::spawn(async move {
-                    for chunkmeta in blobmeta.chunks {
-                        // request chunk.
-                        // We don't need to validate the digest again, as
-                        // that's required for all implementations of ChunkService.
-                        // TODO: handle error
-                        let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap();
-                        let res = match chunk_client.get(chunkmeta_digest) {
-                            Err(e) => Err(e.into()),
-                            // TODO: make this a separate error type
-                            Ok(None) => Err(Error::StorageError(format!(
-                                "consistency error: chunk {} for blob {} not found",
-                                BASE64.encode(chunkmeta_digest),
-                                BASE64.encode(&req_digest),
-                            ))
-                            .into()),
-                            Ok(Some(data)) => {
-                                // We already know the hash matches, but also
-                                // check the size matches what chunkmeta said.
-                                if data.len() as u32 != chunkmeta.size {
-                                    Err(Error::StorageError(format!(
-                                        "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}",
-                                        BASE64.encode(chunkmeta_digest),
-                                        BASE64.encode(&req_digest),
-                                        chunkmeta.size,
-                                        data.len(),
-                                    )).into())
-                                } else {
-                                    // send out the current chunk
-                                    // TODO: we might want to break this up further if too big?
-                                    Ok(super::BlobChunk { data })
-                                }
-                            }
-                        };
-                        // send the result to the client
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                            break;
-                        }
+            .map_err(|_| Status::invalid_argument("invalid digest length"))?;
+
+        match self.blob_service.open_read(&req_digest) {
+            Ok(Some(reader)) => {
+                let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into();
+
+                fn stream_mapper(
+                    x: Result<bytes::Bytes, io::Error>,
+                ) -> Result<super::BlobChunk, Status> {
+                    match x {
+                        Ok(bytes) => Ok(super::BlobChunk {
+                            data: bytes.to_vec(),
+                        }),
+                        Err(e) => Err(Status::from(e)),
                     }
-                });
+                }
+
+                let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper);
+                Ok(Response::new(Box::pin(chunks_stream)))
             }
+            Ok(None) => Err(Status::not_found(format!(
+                "blob {} not found",
+                BASE64.encode(&rq.digest)
+            ))),
+            Err(e) => Err(e.into()),
         }
-
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
     }
 
     #[instrument(skip(self))]
@@ -180,37 +109,34 @@ impl<
 
         let data_reader = tokio_util::io::StreamReader::new(data_stream);
 
-        // TODO: can we get rid of this clone?
-        let chunk_service = self.chunk_service.clone();
-
-        let (blob_digest, blob_meta) =
-            task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> {
-                // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream.
-                read_all_and_chunk(
-                    &chunk_service,
-                    tokio_util::io::SyncIoBridge::new(data_reader),
-                )
-            })
-            .await
-            .map_err(|e| Status::internal(e.to_string()))??;
-
-        // upload blobmeta if not there yet
-        if self
+        // prepare a writer, which we'll use in the blocking task below.
+        let mut writer = self
             .blob_service
-            .stat(&super::StatBlobRequest {
-                digest: blob_digest.to_vec(),
-                include_chunks: false,
-                include_bao: false,
-            })?
-            .is_none()
-        {
-            // upload blobmeta
-            self.blob_service.put(&blob_digest, blob_meta)?;
-        }
-
-        // return to client.
-        Ok(Response::new(super::PutBlobResponse {
-            digest: blob_digest,
-        }))
+            .open_write()
+            .map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?;
+
+        let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> {
+            // construct a sync reader to the data
+            let mut reader = tokio_util::io::SyncIoBridge::new(data_reader);
+
+            io::copy(&mut reader, &mut writer).map_err(|e| {
+                warn!("error copying: {}", e);
+                Status::internal("error copying")
+            })?;
+
+            let digest = writer
+                .close()
+                .map_err(|e| {
+                    warn!("error closing stream: {}", e);
+                    Status::internal("error closing stream")
+                })?
+                .to_vec();
+
+            Ok(super::PutBlobResponse { digest })
+        })
+        .await
+        .map_err(|_| Status::internal("failed to wait for task"))??;
+
+        Ok(Response::new(result))
     }
 }
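
For orientation, a minimal client-side sketch of the reworked read path follows. The wrapper is now built via `From`, and `ReadStream` is a pinned, boxed stream, so callers drain it with `StreamExt::next()`. This is an illustration under assumptions, not part of the commit: `gen_blob_service()` stands in for any concrete `BlobService`, and the digest parameter is hypothetical.

    use crate::proto::blob_service_server::BlobService as GRPCBlobService;
    use crate::proto::{GRPCBlobServiceWrapper, ReadBlobRequest};
    use crate::tests::utils::gen_blob_service;
    use tokio_stream::StreamExt;

    async fn read_whole_blob(digest: [u8; 32]) -> Result<Vec<u8>, tonic::Status> {
        // Construction no longer needs a ChunkService: any BlobService
        // converts into the wrapper via From.
        let service = GRPCBlobServiceWrapper::from(gen_blob_service());

        let mut stream = service
            .read(tonic::Request::new(ReadBlobRequest {
                digest: digest.to_vec(),
            }))
            .await?
            .into_inner();

        // Concatenate however many BlobChunks the server decides to send.
        let mut contents: Vec<u8> = Vec::new();
        while let Some(chunk) = stream.next().await {
            contents.extend_from_slice(&chunk?.data);
        }
        Ok(contents)
    }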
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
index 5002d7e77a96..62741a3ad508 100644
--- a/tvix/store/src/proto/mod.rs
+++ b/tvix/store/src/proto/mod.rs
@@ -11,6 +11,8 @@ mod grpc_blobservice_wrapper;
 mod grpc_directoryservice_wrapper;
 mod grpc_pathinfoservice_wrapper;
 
+mod sync_read_into_async_read;
+
 pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
 pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
 pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper;
diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs
new file mode 100644
index 000000000000..0a0ef019781c
--- /dev/null
+++ b/tvix/store/src/proto/sync_read_into_async_read.rs
@@ -0,0 +1,158 @@
+use bytes::Buf;
+use core::task::Poll::Ready;
+use futures::ready;
+use futures::Future;
+use std::io;
+use std::io::Read;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use tokio::io::AsyncRead;
+use tokio::runtime::Handle;
+use tokio::sync::Mutex;
+use tokio::task::JoinHandle;
+
+#[derive(Debug)]
+enum State<Buf: bytes::Buf + bytes::BufMut> {
+    Idle(Option<Buf>),
+    Busy(JoinHandle<(io::Result<usize>, Buf)>),
+}
+
+use State::{Busy, Idle};
+
+/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a
+/// synchronous API.
+#[derive(Debug)]
+pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> {
+    state: Mutex<State<Buf>>,
+    reader: Arc<Mutex<R>>,
+    rt: Handle,
+}
+
+impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> {
+    /// This must be called from within a Tokio runtime context, or else it will panic.
+    #[track_caller]
+    pub fn new(rt: Handle, reader: R) -> Self {
+        Self {
+            rt,
+            state: State::Idle(None).into(),
+            reader: Arc::new(reader.into()),
+        }
+    }
+
+    /// This must be called from within a Tokio runtime context, or else it will panic.
+    pub fn new_with_reader(readable: R) -> Self {
+        Self::new(Handle::current(), readable)
+    }
+}
+
+/// Repeats operations that are interrupted.
+macro_rules! uninterruptibly {
+    ($e:expr) => {{
+        loop {
+            match $e {
+                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+                res => break res,
+            }
+        }
+    }};
+}
+
+impl<
+        R: Read + Send + 'static + std::marker::Unpin,
+        Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static,
+    > AsyncRead for SyncReadIntoAsyncRead<R, Buf>
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        dst: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        let me = self.get_mut();
+        // Do we need this mutex?
+        let state = me.state.get_mut();
+
+        loop {
+            match state {
+                Idle(ref mut buf_cell) => {
+                    let mut buf = buf_cell.take().unwrap_or_default();
+
+                    if buf.has_remaining() {
+                        // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]`
+                        // The `rest` is stuffed into the `buf_cell` for further poll_read.
+                        // The other is completely consumed into the unfilled destination.
+                        // `rest` can be empty.
+                        let mut adjusted_src =
+                            buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining()));
+                        let copied_size = adjusted_src.remaining();
+                        adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size));
+                        dst.set_filled(copied_size);
+                        *buf_cell = Some(buf);
+                        return Ready(Ok(()));
+                    }
+
+                    let reader = me.reader.clone();
+                    *state = Busy(me.rt.spawn_blocking(move || {
+                        let result = uninterruptibly!(reader.blocking_lock().read(
+                            // SAFETY: `reader.read` will *ONLY* write initialized bytes
+                            // and never *READ* uninitialized bytes
+                            // inside this buffer.
+                            //
+                            // Furthermore, casting the slice as `*mut [u8]`
+                            // is safe because it has the same layout.
+                            //
+                            // Finally, the pointer obtained is valid and owned
+                            // by `buf` alone: since we hold a valid mutable
+                            // reference to it, it is valid for writes.
+                            //
+                            // Here, we copy a nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998
+                            unsafe {
+                                &mut *(buf.chunk_mut().as_uninit_slice_mut()
+                                    as *mut [std::mem::MaybeUninit<u8>]
+                                    as *mut [u8])
+                            }
+                        ));
+
+                        if let Ok(n) = result {
+                            // SAFETY: given we initialize `n` bytes, we can move `n` bytes
+                            // forward.
+                            unsafe {
+                                buf.advance_mut(n);
+                            }
+                        }
+
+                        (result, buf)
+                    }));
+                }
+                Busy(ref mut rx) => {
+                    let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?;
+
+                    match result {
+                        Ok(n) => {
+                            if n > 0 {
+                                let remaining = std::cmp::min(n, dst.remaining());
+                                let mut adjusted_src = buf.copy_to_bytes(remaining);
+                                adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining));
+                                dst.advance(remaining);
+                            }
+                            *state = Idle(Some(buf));
+                            return Ready(Ok(()));
+                        }
+                        Err(e) => {
+                            *state = Idle(None);
+                            return Ready(Err(e));
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> {
+    /// This must be called from within a Tokio runtime context, or else it will panic.
+    fn from(value: R) -> Self {
+        Self::new_with_reader(value)
+    }
+}
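
As a usage illustration of the new adapter (a sketch under assumptions, not part of the commit; the module is private, so this would have to live inside `crate::proto`), wrap a blocking `std::io::Cursor` and read it back asynchronously, using `bytes::BytesMut` as the intermediate buffer just like the wrapper code above:

    use super::sync_read_into_async_read::SyncReadIntoAsyncRead;
    use tokio::io::AsyncReadExt;

    #[tokio::test]
    async fn sync_cursor_reads_async() {
        // Any blocking Read works; a Cursor keeps the sketch self-contained.
        let sync_reader = std::io::Cursor::new(b"hello from a sync reader".to_vec());

        // The From impl grabs the current runtime's Handle, so this must run
        // inside a Tokio runtime (#[tokio::test] provides one).
        let mut async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = sync_reader.into();

        let mut out: Vec<u8> = Vec::new();
        async_reader
            .read_to_end(&mut out)
            .await
            .expect("read must succeed");
        assert_eq!(out, b"hello from a sync reader".to_vec());
    }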
diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs
index 88eb21e30241..80e2e71867a3 100644
--- a/tvix/store/src/proto/tests/grpc_blobservice.rs
+++ b/tvix/store/src/proto/tests/grpc_blobservice.rs
@@ -1,18 +1,14 @@
 use crate::blobservice::BlobService;
-use crate::chunkservice::ChunkService;
-use crate::proto::blob_meta::ChunkMeta;
 use crate::proto::blob_service_server::BlobService as GRPCBlobService;
 use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest};
-use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST};
-use crate::tests::utils::{gen_blob_service, gen_chunk_service};
+use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST};
+use crate::tests::utils::gen_blob_service;
+use tokio_stream::StreamExt;
 
-fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper<
-    impl BlobService + Send + Sync + Clone + 'static,
-    impl ChunkService + Send + Sync + Clone + 'static,
-> {
+fn gen_grpc_blob_service(
+) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> {
     let blob_service = gen_blob_service();
-    let chunk_service = gen_chunk_service();
-    GRPCBlobServiceWrapper::new(blob_service, chunk_service)
+    GRPCBlobServiceWrapper::from(blob_service)
 }
 
 /// Trying to read a non-existent blob should return a not found error.
@@ -26,8 +22,13 @@ async fn not_found_read() {
         }))
         .await;
 
-    let e = resp.expect_err("must_be_err");
-    assert_eq!(e.code(), tonic::Code::NotFound);
+    // We can't use unwrap_err here, because the Ok value doesn't implement
+    // Debug.
+    if let Err(e) = resp {
+        assert_eq!(e.code(), tonic::Code::NotFound);
+    } else {
+        panic!("resp is not err")
+    }
 }
 
 /// Trying to stat a non-existent blob should return a not found error.
@@ -47,8 +48,7 @@ async fn not_found_stat() {
     assert_eq!(resp.code(), tonic::Code::NotFound);
 }
 
-/// Put a blob in the store, get it back. We send something small enough so it
-/// won't get split into multiple chunks.
+/// Put a blob in the store, get it back.
 #[tokio::test]
 async fn put_read_stat() {
     let service = gen_grpc_blob_service();
@@ -64,39 +64,30 @@ async fn put_read_stat() {
 
     assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest);
 
-    // Stat for the digest of A. It should return one chunk.
+    // Stat for the digest of A.
+    // We currently don't ask for more granular chunking data, as we don't
+    // expose it yet.
     let resp = service
         .stat(tonic::Request::new(StatBlobRequest {
             digest: BLOB_A_DIGEST.to_vec(),
-            include_chunks: true,
             ..Default::default()
         }))
         .await
         .expect("must succeed")
         .into_inner();
 
-    assert_eq!(1, resp.chunks.len());
-    // the `chunks` field should point to the single chunk.
-    assert_eq!(
-        vec![ChunkMeta {
-            digest: BLOB_A_DIGEST.to_vec(),
-            size: BLOB_A.len() as u32,
-        }],
-        resp.chunks,
-    );
-
-    // Read the chunk. It should return the same data.
+    // Read the blob. It should return the same data.
     let resp = service
         .read(tonic::Request::new(ReadBlobRequest {
             digest: BLOB_A_DIGEST.to_vec(),
         }))
         .await;
 
-    let mut rx = resp.expect("must succeed").into_inner().into_inner();
+    let mut rx = resp.ok().unwrap().into_inner();
 
     // the stream should contain one element, a BlobChunk with the same contents as BLOB_A.
     let item = rx
-        .recv()
+        .next()
         .await
         .expect("must be some")
         .expect("must succeed");
@@ -104,127 +95,8 @@ async fn put_read_stat() {
     assert_eq!(BLOB_A.to_vec(), item.data);
 
     // … and no more elements
-    assert!(rx.recv().await.is_none());
-}
-
-/// Put a bigger blob in the store, and get it back.
-/// Assert the stat request actually returns more than one chunk, and
-/// we can read each chunk individually, as well as the whole blob via the
-/// `read()` method.
-#[tokio::test]
-async fn put_read_stat_large() {
-    let service = gen_grpc_blob_service();
-
-    // split up BLOB_B into BlobChunks containing 1K bytes each.
-    let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B
-        .chunks(1024)
-        .map(|x| BlobChunk { data: x.to_vec() })
-        .collect();
-
-    assert!(blob_b_blobchunks.len() > 1);
+    assert!(rx.next().await.is_none());
 
-    // Send blob B
-    let put_resp = service
-        .put(tonic_mock::streaming_request(blob_b_blobchunks))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest);
-
-    // Stat for the digest of B
-    let resp = service
-        .stat(tonic::Request::new(StatBlobRequest {
-            digest: BLOB_B_DIGEST.to_vec(),
-            include_chunks: true,
-            ..Default::default()
-        }))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    // it should return more than one chunk.
-    assert_ne!(1, resp.chunks.len());
-
-    // The size added up should equal the size of BLOB_B.
-    let mut size_in_stat: u32 = 0;
-    for chunk in &resp.chunks {
-        size_in_stat += chunk.size
-    }
-    assert_eq!(BLOB_B.len() as u32, size_in_stat);
-
-    // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values.
-    // TODO: make the chunker config better accessible, so we don't need to synchronize this.
-    {
-        let chunker_avg_size = 64 * 1024;
-        let chunker_min_size = chunker_avg_size / 4;
-        let chunker_max_size = chunker_avg_size * 4;
-
-        // initialize a chunker with the current buffer
-        let blob_b = BLOB_B.to_vec();
-        let chunker = fastcdc::v2020::FastCDC::new(
-            &blob_b,
-            chunker_min_size,
-            chunker_avg_size,
-            chunker_max_size,
-        );
-
-        let mut num_chunks = 0;
-        for (i, chunk) in chunker.enumerate() {
-            assert_eq!(
-                resp.chunks[i].size, chunk.length as u32,
-                "expected locally-chunked chunk length to match stat response"
-            );
-
-            num_chunks += 1;
-        }
-
-        assert_eq!(
-            resp.chunks.len(),
-            num_chunks,
-            "expected number of chunks to match"
-        );
-    }
-
-    // Reading the whole blob by its digest via the read() interface should succeed.
-    {
-        let resp = service
-            .read(tonic::Request::new(ReadBlobRequest {
-                digest: BLOB_B_DIGEST.to_vec(),
-            }))
-            .await;
-
-        let mut rx = resp.expect("must succeed").into_inner().into_inner();
-
-        let mut buf: Vec<u8> = Vec::new();
-        while let Some(item) = rx.recv().await {
-            let mut blob_chunk = item.expect("must not be err");
-            buf.append(&mut blob_chunk.data);
-        }
-
-        assert_eq!(BLOB_B.to_vec(), buf);
-    }
-
-    // Reading the whole blob by reading individual chunks should also succeed.
-    {
-        let mut buf: Vec<u8> = Vec::new();
-        for chunk in &resp.chunks {
-            // request this individual chunk via read
-            let resp = service
-                .read(tonic::Request::new(ReadBlobRequest {
-                    digest: chunk.digest.clone(),
-                }))
-                .await;
-
-            let mut rx = resp.expect("must succeed").into_inner().into_inner();
-
-            // append all items from the stream to the buffer
-            while let Some(item) = rx.recv().await {
-                let mut blob_chunk = item.expect("must not be err");
-                buf.append(&mut blob_chunk.data);
-            }
-        }
-        // finished looping over all chunks, compare
-        assert_eq!(BLOB_B.to_vec(), buf);
-    }
+    // TODO: we rely here on the blob being small enough not to get broken up into multiple chunks.
+    // Test with a bigger blob too.
 }
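
A rough sketch of the follow-up test the TODO above asks for, reusing the `BLOB_B`/`BLOB_B_DIGEST` fixtures and `tonic_mock::streaming_request` helper the deleted test relied on (assuming they are still available), together with the imports already present in this file:

    #[tokio::test]
    async fn put_read_large() {
        let service = gen_grpc_blob_service();

        // Send BLOB_B in 1K-byte BlobChunks, as the deleted test did.
        let chunks: Vec<BlobChunk> = BLOB_B
            .chunks(1024)
            .map(|x| BlobChunk { data: x.to_vec() })
            .collect();
        let put_resp = service
            .put(tonic_mock::streaming_request(chunks))
            .await
            .expect("must succeed")
            .into_inner();
        assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest);

        // Read it back, concatenating however many chunks the stream yields.
        let mut rx = service
            .read(tonic::Request::new(ReadBlobRequest {
                digest: BLOB_B_DIGEST.to_vec(),
            }))
            .await
            .expect("must succeed")
            .into_inner();

        let mut buf: Vec<u8> = Vec::new();
        while let Some(item) = rx.next().await {
            buf.append(&mut item.expect("must not be err").data);
        }
        assert_eq!(BLOB_B.to_vec(), buf);
    }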
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
index f74c3fda0213..11cab2c264cc 100644
--- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -6,12 +6,10 @@ use crate::proto::GRPCPathInfoServiceWrapper;
 use crate::proto::PathInfo;
 use crate::proto::{GetPathInfoRequest, Node, SymlinkNode};
 use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
-use crate::tests::utils::{
-    gen_blob_service, gen_chunk_service, gen_directory_service, gen_pathinfo_service,
-};
+use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service};
 use tonic::Request;
 
-/// generates a GRPCPathInfoService out of blob, chunk, directory and pathinfo services.
+/// generates a GRPCPathInfoService out of blob, directory and pathinfo services.
 ///
 /// We only interact with it via the PathInfo GRPC interface.
 /// It uses the NonCachingNARCalculationService NARCalculationService to
@@ -19,11 +17,7 @@ use tonic::Request;
 fn gen_grpc_service() -> impl GRPCPathInfoService {
     GRPCPathInfoServiceWrapper::new(
         gen_pathinfo_service(),
-        NonCachingNARCalculationService::new(
-            gen_blob_service(),
-            gen_chunk_service(),
-            gen_directory_service(),
-        ),
+        NonCachingNARCalculationService::new(gen_blob_service(), gen_directory_service()),
     )
 }