about summary refs log tree commit diff
path: root/tvix/store/src/proto
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-05-11T12·49+0300
committerflokli <flokli@flokli.de>2023-05-11T14·27+0000
commit616fa4476f93e1782e68dc713e9e8cb77a426c7d (patch)
treef76a43e95c75d848d079706fbccfd442210ebebc /tvix/store/src/proto
parentb22b685f0b2524c088deacbf4e80e7b7c73b5afc (diff)
refactor(tvix/store): remove ChunkService r/6133
Whether chunking is involved or not, is an implementation detail of each
Blobstore. Consumers of a whole blob shouldn't need to worry about that.
It currently is not visible in the gRPC interface either. It
shouldn't bleed into everything.

Let the BlobService trait provide `open_read` and `open_write` methods,
which return handles providing io::Read or io::Write, and leave the
details up to the implementation.

This means, our custom BlobReader module can go away, and all the
chunking bits in there, too.

In the future, we might still want to add more chunking-aware syncing,
but as a syncing strategy some stores can expose, not as a fundamental
protocol component.

This currently needs "SyncReadIntoAsyncRead", taken and vendored in from
https://github.com/tokio-rs/tokio/pull/5669.
It provides a AsyncRead for a sync Read, which is necessary to connect
our (sync) BlobReader interface to a GRPC server implementation.

As an alternative, we could also make the BlobReader itself async, and
let consumers of the trait (EvalIO) deal with the async-ness, but this
is less of a change for now.

In terms of vendoring, I initially tried to move our tokio crate to
these commits, but ended up in version incompatibilities, so let's
vendor it in for now.

Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix/store/src/proto')
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs242
-rw-r--r--tvix/store/src/proto/mod.rs2
-rw-r--r--tvix/store/src/proto/sync_read_into_async_read.rs158
-rw-r--r--tvix/store/src/proto/tests/grpc_blobservice.rs174
-rw-r--r--tvix/store/src/proto/tests/grpc_pathinfoservice.rs12
5 files changed, 270 insertions, 318 deletions
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index e891caa026..d1e98a5b79 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,54 +1,34 @@
-use std::collections::VecDeque;
-
 use crate::{
-    blobservice::BlobService,
-    chunkservice::{read_all_and_chunk, update_hasher, ChunkService},
-    Error,
+    blobservice::{BlobService, BlobWriter},
+    proto::sync_read_into_async_read::SyncReadIntoAsyncRead,
 };
 use data_encoding::BASE64;
-use tokio::{sync::mpsc::channel, task};
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use std::{collections::VecDeque, io, pin::Pin};
+use tokio::task;
+use tokio_stream::StreamExt;
+use tokio_util::io::ReaderStream;
 use tonic::{async_trait, Request, Response, Status, Streaming};
-use tracing::{debug, instrument, warn};
+use tracing::{instrument, warn};
 
-pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
+pub struct GRPCBlobServiceWrapper<BS: BlobService> {
     blob_service: BS,
-    chunk_service: CS,
 }
 
-impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
-    pub fn new(blob_service: BS, chunk_service: CS) -> Self {
+impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> {
+    fn from(value: BS) -> Self {
         Self {
-            blob_service,
-            chunk_service,
-        }
-    }
-
-    // upload the chunk to the chunk service, and return its digest (or an error) when done.
-    #[instrument(skip(chunk_service))]
-    fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
-        let mut hasher = blake3::Hasher::new();
-        update_hasher(&mut hasher, &chunk_data);
-        let digest = hasher.finalize();
-
-        if chunk_service.has(digest.as_bytes())? {
-            debug!("already has chunk, skipping");
+            blob_service: value,
         }
-        let digest_resp = chunk_service.put(chunk_data)?;
-
-        assert_eq!(&digest_resp, digest.as_bytes());
-
-        Ok(digest.as_bytes().to_vec())
     }
 }
 
 #[async_trait]
-impl<
-        BS: BlobService + Send + Sync + Clone + 'static,
-        CS: ChunkService + Send + Sync + Clone + 'static,
-    > super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS>
+impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService
+    for GRPCBlobServiceWrapper<BS>
 {
-    type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>;
+    // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
+    type ReadStream =
+        Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
 
     #[instrument(skip(self))]
     async fn stat(
@@ -56,12 +36,22 @@ impl<
         request: Request<super::StatBlobRequest>,
     ) -> Result<Response<super::BlobMeta>, Status> {
         let rq = request.into_inner();
-        match self.blob_service.stat(&rq) {
-            Ok(None) => Err(Status::not_found(format!(
+        let req_digest: [u8; 32] = rq
+            .digest
+            .clone()
+            .try_into()
+            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+        if rq.include_chunks || rq.include_bao {
+            return Err(Status::internal("not implemented"));
+        }
+
+        match self.blob_service.has(&req_digest) {
+            Ok(true) => Ok(Response::new(super::BlobMeta::default())),
+            Ok(false) => Err(Status::not_found(format!(
                 "blob {} not found",
-                BASE64.encode(&rq.digest)
+                BASE64.encode(&req_digest)
             ))),
-            Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)),
             Err(e) => Err(e.into()),
         }
     }
@@ -71,99 +61,38 @@ impl<
         &self,
         request: Request<super::ReadBlobRequest>,
     ) -> Result<Response<Self::ReadStream>, Status> {
-        let req = request.into_inner();
-        let (tx, rx) = channel(5);
+        let rq = request.into_inner();
 
-        let req_digest: [u8; 32] = req
+        let req_digest: [u8; 32] = rq
             .digest
+            .clone()
             .try_into()
-            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
-
-        // query the blob service for more detailed blob info
-        let stat_resp = self.blob_service.stat(&super::StatBlobRequest {
-            digest: req_digest.to_vec(),
-            include_chunks: true,
-            ..Default::default()
-        })?;
-
-        match stat_resp {
-            None => {
-                // If the stat didn't return any blobmeta, the client might
-                // still have asked for a single chunk to be read.
-                // Check the chunkstore.
-                if let Some(data) = self.chunk_service.get(&req_digest)? {
-                    // We already know the hash matches, and contrary to
-                    // iterating over a blobmeta, we can't know the size,
-                    // so send the contents of that chunk over,
-                    // as the first (and only) element of the stream.
-                    task::spawn(async move {
-                        let res = Ok(super::BlobChunk { data });
-                        // send the result to the client. If the client already left, that's also fine.
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                        }
-                    });
-                } else {
-                    return Err(Status::not_found(format!(
-                        "blob {} not found",
-                        BASE64.encode(&req_digest),
-                    )));
-                }
-            }
-            Some(blobmeta) => {
-                let chunk_client = self.chunk_service.clone();
-
-                // TODO: use BlobReader?
-                // But then we might not be able to send compressed chunks as-is.
-                // Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it
-                // first, so we can .next().await in here.
-
-                task::spawn(async move {
-                    for chunkmeta in blobmeta.chunks {
-                        // request chunk.
-                        // We don't need to validate the digest again, as
-                        // that's required for all implementations of ChunkService.
-                        // TODO: handle error
-                        let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap();
-                        let res = match chunk_client.get(chunkmeta_digest) {
-                            Err(e) => Err(e.into()),
-                            // TODO: make this a separate error type
-                            Ok(None) => Err(Error::StorageError(format!(
-                                "consistency error: chunk {} for blob {} not found",
-                                BASE64.encode(chunkmeta_digest),
-                                BASE64.encode(&req_digest),
-                            ))
-                            .into()),
-                            Ok(Some(data)) => {
-                                // We already know the hash matches, but also
-                                // check the size matches what chunkmeta said.
-                                if data.len() as u32 != chunkmeta.size {
-                                    Err(Error::StorageError(format!(
-                                        "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}",
-                                        BASE64.encode(chunkmeta_digest),
-                                        BASE64.encode(&req_digest),
-                                        chunkmeta.size,
-                                        data.len(),
-                                    )).into())
-                                } else {
-                                    // send out the current chunk
-                                    // TODO: we might want to break this up further if too big?
-                                    Ok(super::BlobChunk { data })
-                                }
-                            }
-                        };
-                        // send the result to the client
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                            break;
-                        }
+            .map_err(|_| Status::invalid_argument("invalid digest length"))?;
+
+        match self.blob_service.open_read(&req_digest) {
+            Ok(Some(reader)) => {
+                let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into();
+
+                fn stream_mapper(
+                    x: Result<bytes::Bytes, io::Error>,
+                ) -> Result<super::BlobChunk, Status> {
+                    match x {
+                        Ok(bytes) => Ok(super::BlobChunk {
+                            data: bytes.to_vec(),
+                        }),
+                        Err(e) => Err(Status::from(e)),
                     }
-                });
+                }
+
+                let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper);
+                Ok(Response::new(Box::pin(chunks_stream)))
             }
+            Ok(None) => Err(Status::not_found(format!(
+                "blob {} not found",
+                BASE64.encode(&rq.digest)
+            ))),
+            Err(e) => Err(e.into()),
         }
-
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
     }
 
     #[instrument(skip(self))]
@@ -180,37 +109,34 @@ impl<
 
         let data_reader = tokio_util::io::StreamReader::new(data_stream);
 
-        // TODO: can we get rid of this clone?
-        let chunk_service = self.chunk_service.clone();
-
-        let (blob_digest, blob_meta) =
-            task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> {
-                // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream.
-                read_all_and_chunk(
-                    &chunk_service,
-                    tokio_util::io::SyncIoBridge::new(data_reader),
-                )
-            })
-            .await
-            .map_err(|e| Status::internal(e.to_string()))??;
-
-        // upload blobmeta if not there yet
-        if self
+        // prepare a writer, which we'll use in the blocking task below.
+        let mut writer = self
             .blob_service
-            .stat(&super::StatBlobRequest {
-                digest: blob_digest.to_vec(),
-                include_chunks: false,
-                include_bao: false,
-            })?
-            .is_none()
-        {
-            // upload blobmeta
-            self.blob_service.put(&blob_digest, blob_meta)?;
-        }
-
-        // return to client.
-        Ok(Response::new(super::PutBlobResponse {
-            digest: blob_digest,
-        }))
+            .open_write()
+            .map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?;
+
+        let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> {
+            // construct a sync reader to the data
+            let mut reader = tokio_util::io::SyncIoBridge::new(data_reader);
+
+            io::copy(&mut reader, &mut writer).map_err(|e| {
+                warn!("error copying: {}", e);
+                Status::internal("error copying")
+            })?;
+
+            let digest = writer
+                .close()
+                .map_err(|e| {
+                    warn!("error closing stream: {}", e);
+                    Status::internal("error closing stream")
+                })?
+                .to_vec();
+
+            Ok(super::PutBlobResponse { digest })
+        })
+        .await
+        .map_err(|_| Status::internal("failed to wait for task"))??;
+
+        Ok(Response::new(result))
     }
 }
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
index 5002d7e77a..62741a3ad5 100644
--- a/tvix/store/src/proto/mod.rs
+++ b/tvix/store/src/proto/mod.rs
@@ -11,6 +11,8 @@ mod grpc_blobservice_wrapper;
 mod grpc_directoryservice_wrapper;
 mod grpc_pathinfoservice_wrapper;
 
+mod sync_read_into_async_read;
+
 pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
 pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
 pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper;
diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs
new file mode 100644
index 0000000000..0a0ef01978
--- /dev/null
+++ b/tvix/store/src/proto/sync_read_into_async_read.rs
@@ -0,0 +1,158 @@
+use bytes::Buf;
+use core::task::Poll::Ready;
+use futures::ready;
+use futures::Future;
+use std::io;
+use std::io::Read;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use tokio::io::AsyncRead;
+use tokio::runtime::Handle;
+use tokio::sync::Mutex;
+use tokio::task::JoinHandle;
+
+#[derive(Debug)]
+enum State<Buf: bytes::Buf + bytes::BufMut> {
+    Idle(Option<Buf>),
+    Busy(JoinHandle<(io::Result<usize>, Buf)>),
+}
+
+use State::{Busy, Idle};
+
+/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a
+/// synchronous API.
+#[derive(Debug)]
+pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> {
+    state: Mutex<State<Buf>>,
+    reader: Arc<Mutex<R>>,
+    rt: Handle,
+}
+
+impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> {
+    /// This must be called from within a Tokio runtime context, or else it will panic.
+    #[track_caller]
+    pub fn new(rt: Handle, reader: R) -> Self {
+        Self {
+            rt,
+            state: State::Idle(None).into(),
+            reader: Arc::new(reader.into()),
+        }
+    }
+
+    /// This must be called from within a Tokio runtime context, or else it will panic.
+    pub fn new_with_reader(readable: R) -> Self {
+        Self::new(Handle::current(), readable)
+    }
+}
+
+/// Repeats operations that are interrupted.
+macro_rules! uninterruptibly {
+    ($e:expr) => {{
+        loop {
+            match $e {
+                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+                res => break res,
+            }
+        }
+    }};
+}
+
+impl<
+        R: Read + Send + 'static + std::marker::Unpin,
+        Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static,
+    > AsyncRead for SyncReadIntoAsyncRead<R, Buf>
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        dst: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        let me = self.get_mut();
+        // Do we need this mutex?
+        let state = me.state.get_mut();
+
+        loop {
+            match state {
+                Idle(ref mut buf_cell) => {
+                    let mut buf = buf_cell.take().unwrap_or_default();
+
+                    if buf.has_remaining() {
+                        // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]`
+                        // The `rest` is stuffed into the `buf_cell` for further poll_read.
+                        // The other is completely consumed into the unfilled destination.
+                        // `rest` can be empty.
+                        let mut adjusted_src =
+                            buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining()));
+                        let copied_size = adjusted_src.remaining();
+                        adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size));
+                        dst.set_filled(copied_size);
+                        *buf_cell = Some(buf);
+                        return Ready(Ok(()));
+                    }
+
+                    let reader = me.reader.clone();
+                    *state = Busy(me.rt.spawn_blocking(move || {
+                        let result = uninterruptibly!(reader.blocking_lock().read(
+                            // SAFETY: `reader.read` will *ONLY* write initialized bytes
+                            // and never *READ* uninitialized bytes
+                            // inside this buffer.
+                            //
+                            // Furthermore, casting the slice as `*mut [u8]`
+                            // is safe because it has the same layout.
+                            //
+                            // Finally, the pointer obtained is valid and owned
+                            // by `buf` only as we have a valid mutable reference
+                            // to it, it is valid for write.
+                            //
+                            // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998
+                            unsafe {
+                                &mut *(buf.chunk_mut().as_uninit_slice_mut()
+                                    as *mut [std::mem::MaybeUninit<u8>]
+                                    as *mut [u8])
+                            }
+                        ));
+
+                        if let Ok(n) = result {
+                            // SAFETY: given we initialize `n` bytes, we can move `n` bytes
+                            // forward.
+                            unsafe {
+                                buf.advance_mut(n);
+                            }
+                        }
+
+                        (result, buf)
+                    }));
+                }
+                Busy(ref mut rx) => {
+                    let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?;
+
+                    match result {
+                        Ok(n) => {
+                            if n > 0 {
+                                let remaining = std::cmp::min(n, dst.remaining());
+                                let mut adjusted_src = buf.copy_to_bytes(remaining);
+                                adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining));
+                                dst.advance(remaining);
+                            }
+                            *state = Idle(Some(buf));
+                            return Ready(Ok(()));
+                        }
+                        Err(e) => {
+                            *state = Idle(None);
+                            return Ready(Err(e));
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> {
+    /// This must be called from within a Tokio runtime context, or else it will panic.
+    fn from(value: R) -> Self {
+        Self::new_with_reader(value)
+    }
+}
diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs
index 88eb21e302..80e2e71867 100644
--- a/tvix/store/src/proto/tests/grpc_blobservice.rs
+++ b/tvix/store/src/proto/tests/grpc_blobservice.rs
@@ -1,18 +1,14 @@
 use crate::blobservice::BlobService;
-use crate::chunkservice::ChunkService;
-use crate::proto::blob_meta::ChunkMeta;
 use crate::proto::blob_service_server::BlobService as GRPCBlobService;
 use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest};
-use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST};
-use crate::tests::utils::{gen_blob_service, gen_chunk_service};
+use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST};
+use crate::tests::utils::gen_blob_service;
+use tokio_stream::StreamExt;
 
-fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper<
-    impl BlobService + Send + Sync + Clone + 'static,
-    impl ChunkService + Send + Sync + Clone + 'static,
-> {
+fn gen_grpc_blob_service(
+) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> {
     let blob_service = gen_blob_service();
-    let chunk_service = gen_chunk_service();
-    GRPCBlobServiceWrapper::new(blob_service, chunk_service)
+    GRPCBlobServiceWrapper::from(blob_service)
 }
 
 /// Trying to read a non-existent blob should return a not found error.
@@ -26,8 +22,13 @@ async fn not_found_read() {
         }))
         .await;
 
-    let e = resp.expect_err("must_be_err");
-    assert_eq!(e.code(), tonic::Code::NotFound);
+    // We can't use unwrap_err here, because the Ok value doesn't implement
+    // debug.
+    if let Err(e) = resp {
+        assert_eq!(e.code(), tonic::Code::NotFound);
+    } else {
+        panic!("resp is not err")
+    }
 }
 
 /// Trying to stat a non-existent blob should return a not found error.
@@ -47,8 +48,7 @@ async fn not_found_stat() {
     assert_eq!(resp.code(), tonic::Code::NotFound);
 }
 
-/// Put a blob in the store, get it back. We send something small enough so it
-/// won't get split into multiple chunks.
+/// Put a blob in the store, get it back.
 #[tokio::test]
 async fn put_read_stat() {
     let service = gen_grpc_blob_service();
@@ -64,39 +64,30 @@ async fn put_read_stat() {
 
     assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest);
 
-    // Stat for the digest of A. It should return one chunk.
+    // Stat for the digest of A.
+    // We currently don't ask for more granular chunking data, as we don't
+    // expose it yet.
     let resp = service
         .stat(tonic::Request::new(StatBlobRequest {
             digest: BLOB_A_DIGEST.to_vec(),
-            include_chunks: true,
             ..Default::default()
         }))
         .await
         .expect("must succeed")
         .into_inner();
 
-    assert_eq!(1, resp.chunks.len());
-    // the `chunks` field should point to the single chunk.
-    assert_eq!(
-        vec![ChunkMeta {
-            digest: BLOB_A_DIGEST.to_vec(),
-            size: BLOB_A.len() as u32,
-        }],
-        resp.chunks,
-    );
-
-    // Read the chunk. It should return the same data.
+    // Read the blob. It should return the same data.
     let resp = service
         .read(tonic::Request::new(ReadBlobRequest {
             digest: BLOB_A_DIGEST.to_vec(),
         }))
         .await;
 
-    let mut rx = resp.expect("must succeed").into_inner().into_inner();
+    let mut rx = resp.ok().unwrap().into_inner();
 
     // the stream should contain one element, a BlobChunk with the same contents as BLOB_A.
     let item = rx
-        .recv()
+        .next()
         .await
         .expect("must be some")
         .expect("must succeed");
@@ -104,127 +95,8 @@ async fn put_read_stat() {
     assert_eq!(BLOB_A.to_vec(), item.data);
 
     // … and no more elements
-    assert!(rx.recv().await.is_none());
-}
-
-/// Put a bigger blob in the store, and get it back.
-/// Assert the stat request actually returns more than one chunk, and
-/// we can read each chunk individually, as well as the whole blob via the
-/// `read()` method.
-#[tokio::test]
-async fn put_read_stat_large() {
-    let service = gen_grpc_blob_service();
-
-    // split up BLOB_B into BlobChunks containing 1K bytes each.
-    let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B
-        .chunks(1024)
-        .map(|x| BlobChunk { data: x.to_vec() })
-        .collect();
-
-    assert!(blob_b_blobchunks.len() > 1);
+    assert!(rx.next().await.is_none());
 
-    // Send blob B
-    let put_resp = service
-        .put(tonic_mock::streaming_request(blob_b_blobchunks))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest);
-
-    // Stat for the digest of B
-    let resp = service
-        .stat(tonic::Request::new(StatBlobRequest {
-            digest: BLOB_B_DIGEST.to_vec(),
-            include_chunks: true,
-            ..Default::default()
-        }))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    // it should return more than one chunk.
-    assert_ne!(1, resp.chunks.len());
-
-    // The size added up should equal the size of BLOB_B.
-    let mut size_in_stat: u32 = 0;
-    for chunk in &resp.chunks {
-        size_in_stat += chunk.size
-    }
-    assert_eq!(BLOB_B.len() as u32, size_in_stat);
-
-    // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values.
-    // TODO: make the chunker config better accessible, so we don't need to synchronize this.
-    {
-        let chunker_avg_size = 64 * 1024;
-        let chunker_min_size = chunker_avg_size / 4;
-        let chunker_max_size = chunker_avg_size * 4;
-
-        // initialize a chunker with the current buffer
-        let blob_b = BLOB_B.to_vec();
-        let chunker = fastcdc::v2020::FastCDC::new(
-            &blob_b,
-            chunker_min_size,
-            chunker_avg_size,
-            chunker_max_size,
-        );
-
-        let mut num_chunks = 0;
-        for (i, chunk) in chunker.enumerate() {
-            assert_eq!(
-                resp.chunks[i].size, chunk.length as u32,
-                "expected locally-chunked chunk length to match stat response"
-            );
-
-            num_chunks += 1;
-        }
-
-        assert_eq!(
-            resp.chunks.len(),
-            num_chunks,
-            "expected number of chunks to match"
-        );
-    }
-
-    // Reading the whole blob by its digest via the read() interface should succeed.
-    {
-        let resp = service
-            .read(tonic::Request::new(ReadBlobRequest {
-                digest: BLOB_B_DIGEST.to_vec(),
-            }))
-            .await;
-
-        let mut rx = resp.expect("must succeed").into_inner().into_inner();
-
-        let mut buf: Vec<u8> = Vec::new();
-        while let Some(item) = rx.recv().await {
-            let mut blob_chunk = item.expect("must not be err");
-            buf.append(&mut blob_chunk.data);
-        }
-
-        assert_eq!(BLOB_B.to_vec(), buf);
-    }
-
-    // Reading the whole blob by reading individual chunks should also succeed.
-    {
-        let mut buf: Vec<u8> = Vec::new();
-        for chunk in &resp.chunks {
-            // request this individual chunk via read
-            let resp = service
-                .read(tonic::Request::new(ReadBlobRequest {
-                    digest: chunk.digest.clone(),
-                }))
-                .await;
-
-            let mut rx = resp.expect("must succeed").into_inner().into_inner();
-
-            // append all items from the stream to the buffer
-            while let Some(item) = rx.recv().await {
-                let mut blob_chunk = item.expect("must not be err");
-                buf.append(&mut blob_chunk.data);
-            }
-        }
-        // finished looping over all chunks, compare
-        assert_eq!(BLOB_B.to_vec(), buf);
-    }
+    // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks.
+    // Test with some bigger blob too
 }
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
index f74c3fda02..11cab2c264 100644
--- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -6,12 +6,10 @@ use crate::proto::GRPCPathInfoServiceWrapper;
 use crate::proto::PathInfo;
 use crate::proto::{GetPathInfoRequest, Node, SymlinkNode};
 use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
-use crate::tests::utils::{
-    gen_blob_service, gen_chunk_service, gen_directory_service, gen_pathinfo_service,
-};
+use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service};
 use tonic::Request;
 
-/// generates a GRPCPathInfoService out of blob, chunk, directory and pathinfo services.
+/// generates a GRPCPathInfoService out of blob, directory and pathinfo services.
 ///
 /// We only interact with it via the PathInfo GRPC interface.
 /// It uses the NonCachingNARCalculationService NARCalculationService to
@@ -19,11 +17,7 @@ use tonic::Request;
 fn gen_grpc_service() -> impl GRPCPathInfoService {
     GRPCPathInfoServiceWrapper::new(
         gen_pathinfo_service(),
-        NonCachingNARCalculationService::new(
-            gen_blob_service(),
-            gen_chunk_service(),
-            gen_directory_service(),
-        ),
+        NonCachingNARCalculationService::new(gen_blob_service(), gen_directory_service()),
     )
 }