about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src')
-rw-r--r--tvix/store/src/bin/tvix-store.rs14
-rw-r--r--tvix/store/src/blobreader.rs376
-rw-r--r--tvix/store/src/blobservice/memory.rs87
-rw-r--r--tvix/store/src/blobservice/mod.rs37
-rw-r--r--tvix/store/src/blobservice/sled.rs103
-rw-r--r--tvix/store/src/chunkservice/memory.rs53
-rw-r--r--tvix/store/src/chunkservice/mod.rs28
-rw-r--r--tvix/store/src/chunkservice/sled.rs70
-rw-r--r--tvix/store/src/chunkservice/util.rs85
-rw-r--r--tvix/store/src/import.rs54
-rw-r--r--tvix/store/src/lib.rs3
-rw-r--r--tvix/store/src/nar/non_caching_calculation_service.rs21
-rw-r--r--tvix/store/src/nar/renderer.rs70
-rw-r--r--tvix/store/src/proto/grpc_blobservice_wrapper.rs242
-rw-r--r--tvix/store/src/proto/mod.rs2
-rw-r--r--tvix/store/src/proto/sync_read_into_async_read.rs158
-rw-r--r--tvix/store/src/proto/tests/grpc_blobservice.rs174
-rw-r--r--tvix/store/src/proto/tests/grpc_pathinfoservice.rs12
-rw-r--r--tvix/store/src/tests/import.rs31
-rw-r--r--tvix/store/src/tests/nar_renderer.rs179
-rw-r--r--tvix/store/src/tests/utils.rs5
21 files changed, 553 insertions, 1251 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 72814fa530e7..29f8a92cb1be 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -8,7 +8,6 @@ use nix_compat::nixhash::NixHashWithMode;
 use std::path::PathBuf;
 use tracing_subscriber::prelude::*;
 use tvix_store::blobservice::SledBlobService;
-use tvix_store::chunkservice::SledChunkService;
 use tvix_store::directoryservice::SledDirectoryService;
 use tvix_store::import::import_path;
 use tvix_store::nar::NARCalculationService;
@@ -87,7 +86,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     // initialize stores
     let mut blob_service = SledBlobService::new("blobs.sled".into())?;
-    let mut chunk_service = SledChunkService::new("chunks.sled".into())?;
     let mut directory_service = SledDirectoryService::new("directories.sled".into())?;
     let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?;
 
@@ -102,15 +100,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
 
             let nar_calculation_service = NonCachingNARCalculationService::new(
                 blob_service.clone(),
-                chunk_service.clone(),
                 directory_service.clone(),
             );
 
             #[allow(unused_mut)]
             let mut router = server
-                .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
+                .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from(
                     blob_service,
-                    chunk_service,
                 )))
                 .add_service(DirectoryServiceServer::new(
                     GRPCDirectoryServiceWrapper::from(directory_service),
@@ -135,17 +131,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         Commands::Import { paths } => {
             let nar_calculation_service = NonCachingNARCalculationService::new(
                 blob_service.clone(),
-                chunk_service.clone(),
                 directory_service.clone(),
             );
 
             for path in paths {
-                let root_node = import_path(
-                    &mut blob_service,
-                    &mut chunk_service,
-                    &mut directory_service,
-                    &path,
-                )?;
+                let root_node = import_path(&mut blob_service, &mut directory_service, &path)?;
 
                 let nar_hash = NixHashWithMode::Recursive(NixHash::new(
                     HashAlgo::Sha256,
diff --git a/tvix/store/src/blobreader.rs b/tvix/store/src/blobreader.rs
deleted file mode 100644
index ff0efdf15237..000000000000
--- a/tvix/store/src/blobreader.rs
+++ /dev/null
@@ -1,376 +0,0 @@
-use std::io::{self, Cursor, Read, Write};
-
-use data_encoding::BASE64;
-
-use crate::{chunkservice::ChunkService, proto};
-
-/// BlobReader implements reading of a blob, by querying individual chunks.
-///
-/// It doesn't talk to BlobService, but assumes something has already fetched
-/// blob_meta already.
-pub struct BlobReader<'a, CS: ChunkService> {
-    // used to look up chunks
-    chunk_service: &'a CS,
-
-    // internal iterator over chunk hashes and their sizes
-    chunks_iter: std::vec::IntoIter<proto::blob_meta::ChunkMeta>,
-
-    // If a chunk was partially read (if buf.len() < chunk.size),
-    // a cursor to its contents are stored here.
-    current_chunk: Option<Cursor<Vec<u8>>>,
-}
-
-impl<'a, CS: ChunkService> BlobReader<'a, CS> {
-    pub fn open(chunk_service: &'a CS, blob_meta: proto::BlobMeta) -> Self {
-        Self {
-            chunk_service,
-            chunks_iter: blob_meta.chunks.into_iter(),
-            current_chunk: None,
-        }
-    }
-
-    /// reads (up to n bytes) from the current chunk into buf (if there is
-    /// a chunk).
-    ///
-    /// If it arrives at the end of the chunk, sets it back to None.
-    /// Returns a [std::io::Result<usize>] of the bytes read from the chunk.
-    fn read_from_current_chunk<W: std::io::Write>(
-        &mut self,
-        m: usize,
-        buf: &mut W,
-    ) -> std::io::Result<usize> {
-        // If there's still something in partial_chunk, read from there
-        // (up to m: usize bytes) and return the number of bytes read.
-        if let Some(current_chunk) = &mut self.current_chunk {
-            let result = io::copy(&mut current_chunk.take(m as u64), buf);
-
-            match result {
-                Ok(n) => {
-                    // if we were not able to read all off m bytes,
-                    // this means we arrived at the end of the chunk.
-                    if n < m as u64 {
-                        self.current_chunk = None
-                    }
-
-                    // n can never be > m, so downcasting this to usize is ok.
-                    Ok(n as usize)
-                }
-                Err(e) => Err(e),
-            }
-        } else {
-            Ok(0)
-        }
-    }
-}
-
-impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> {
-    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
-        let read_max = buf.len();
-        let mut bytes_read = 0_usize;
-        let mut buf_w = std::io::BufWriter::new(buf);
-
-        // read up to buf.len() bytes into buf, by reading from the current
-        // chunk and subsequent ones.
-        loop {
-            // try to fill buf with bytes from the current chunk
-            // (if there's still one)
-            let n = self.read_from_current_chunk(read_max - bytes_read, &mut buf_w)?;
-            bytes_read += n;
-
-            // We want to make sure we don't accidentially read past more than
-            // we're allowed to.
-            assert!(bytes_read <= read_max);
-
-            // buf is entirerly filled, we're done.
-            if bytes_read == read_max {
-                buf_w.flush()?;
-                break Ok(bytes_read);
-            }
-
-            // Otherwise, bytes_read is < read_max, so we could still write
-            // more to buf.
-            // Check if we have more chunks to read from.
-            match self.chunks_iter.next() {
-                // No more chunks, we're done.
-                None => {
-                    buf_w.flush()?;
-                    return Ok(bytes_read);
-                }
-                // There's another chunk to visit, fetch its contents
-                Some(chunk_meta) => {
-                    let chunk_meta_digest: [u8; 32] =
-                        chunk_meta.digest.clone().try_into().map_err(|_e| {
-                            std::io::Error::new(
-                                io::ErrorKind::InvalidData,
-                                format!(
-                                    "chunk in chunkmeta has wrong digest size, expected 32, got {}",
-                                    chunk_meta.digest.len(),
-                                ),
-                            )
-                        })?;
-                    match self.chunk_service.get(&chunk_meta_digest) {
-                        // Fetch successful, put it into `self.current_chunk` and restart the loop.
-                        Ok(Some(chunk_data)) => {
-                            // make sure the size matches what chunk_meta says as well.
-                            if chunk_data.len() as u32 != chunk_meta.size {
-                                break Err(std::io::Error::new(
-                                io::ErrorKind::InvalidData,
-                                format!(
-                                    "chunk_service returned chunk with wrong size for {}, expected {}, got {}",
-                                    BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len()
-                                )
-                            ));
-                            }
-                            self.current_chunk = Some(Cursor::new(chunk_data));
-                        }
-                        // Chunk requested does not exist
-                        Ok(None) => {
-                            break Err(std::io::Error::new(
-                                io::ErrorKind::NotFound,
-                                format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)),
-                            ))
-                        }
-                        // Error occured while fetching the next chunk, propagate the error from the chunk service
-                        Err(e) => {
-                            break Err(std::io::Error::new(io::ErrorKind::InvalidData, e));
-                        }
-                    }
-                }
-            }
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::BlobReader;
-    use crate::chunkservice::ChunkService;
-    use crate::proto;
-    use crate::tests::fixtures::DUMMY_DATA_1;
-    use crate::tests::fixtures::DUMMY_DATA_2;
-    use crate::tests::fixtures::DUMMY_DIGEST;
-    use crate::tests::utils::gen_chunk_service;
-    use std::io::Cursor;
-    use std::io::Read;
-    use std::io::Write;
-
-    #[test]
-    /// reading from a blobmeta with zero chunks should produce zero bytes.
-    fn empty_blobmeta() -> anyhow::Result<()> {
-        let chunk_service = gen_chunk_service();
-
-        let blobmeta = proto::BlobMeta {
-            chunks: vec![],
-            inline_bao: vec![],
-        };
-
-        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
-        let mut buf = Cursor::new(Vec::new());
-
-        let res = std::io::copy(&mut blob_reader, &mut buf);
-
-        assert_eq!(0, res.unwrap());
-
-        Ok(())
-    }
-
-    #[test]
-    /// trying to read something where the chunk doesn't exist should fail
-    fn missing_chunk_fail() -> anyhow::Result<()> {
-        let chunk_service = gen_chunk_service();
-
-        let blobmeta = proto::BlobMeta {
-            chunks: vec![proto::blob_meta::ChunkMeta {
-                digest: DUMMY_DIGEST.to_vec(),
-                size: 42,
-            }],
-            inline_bao: vec![],
-        };
-
-        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
-        let mut buf = Cursor::new(Vec::new());
-
-        let res = std::io::copy(&mut blob_reader, &mut buf);
-
-        assert!(res.is_err());
-
-        Ok(())
-    }
-
-    #[test]
-    /// read something containing the single (empty) chunk
-    fn empty_chunk() -> anyhow::Result<()> {
-        let chunk_service = gen_chunk_service();
-
-        // insert a single chunk
-        let dgst = chunk_service.put(vec![]).expect("must succeed");
-
-        // assemble a blobmeta
-        let blobmeta = proto::BlobMeta {
-            chunks: vec![proto::blob_meta::ChunkMeta {
-                digest: dgst.to_vec(),
-                size: 0,
-            }],
-            inline_bao: vec![],
-        };
-
-        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
-
-        let mut buf: Vec<u8> = Vec::new();
-
-        let res =
-            std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");
-
-        assert_eq!(res, 0, "number of bytes read must match");
-        assert!(buf.is_empty(), "buf must be empty");
-
-        Ok(())
-    }
-
-    /// read something which contains a single chunk
-    #[test]
-    fn single_chunk() -> anyhow::Result<()> {
-        let chunk_service = gen_chunk_service();
-
-        // insert a single chunk
-        let dgst = chunk_service
-            .put(DUMMY_DATA_1.clone())
-            .expect("must succeed");
-
-        // assemble a blobmeta
-        let blobmeta = proto::BlobMeta {
-            chunks: vec![proto::blob_meta::ChunkMeta {
-                digest: dgst.to_vec(),
-                size: 3,
-            }],
-            inline_bao: vec![],
-        };
-
-        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
-
-        let mut buf: Vec<u8> = Vec::new();
-
-        let res =
-            std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");
-
-        assert_eq!(res, 3, "number of bytes read must match");
-        assert_eq!(DUMMY_DATA_1[..], buf[..], "data read must match");
-
-        Ok(())
-    }
-
-    /// read something referring to a chunk, but with wrong size
-    #[test]
-    fn wrong_size_fail() -> anyhow::Result<()> {
-        let chunk_service = gen_chunk_service();
-
-        // insert chunks
-        let dgst_1 = chunk_service
-            .put(DUMMY_DATA_1.clone())
-            .expect("must succeed");
-
-        // assemble a blobmeta
-        let blobmeta = proto::BlobMeta {
-            chunks: vec![proto::blob_meta::ChunkMeta {
-                digest: dgst_1.to_vec(),
-                size: 42,
-            }],
-            inline_bao: vec![],
-        };
-
-        let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
-
-        let mut buf: Vec<u8> = Vec::new();
-
-        let res = std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf));
-
-        assert!(res.is_err(), "reading must fail");
-
-        Ok(())
-    }
-
-    /// read something referring to multiple chunks
-    #[test]
-    fn multiple_chunks() -> anyhow::Result<()> {
-        let chunk_service = gen_chunk_service();
-
-        // insert chunks
-        let dgst_1 = chunk_service
-            .put(DUMMY_DATA_1.clone())
-            .expect("must succeed");
-        let dgst_2 = chunk_service
-            .put(DUMMY_DATA_2.clone())
-            .expect("must succeed");
-
-        // assemble a blobmeta
-        let blobmeta = proto::BlobMeta {
-            chunks: vec![
-                proto::blob_meta::ChunkMeta {
-                    digest: dgst_1.to_vec(),
-                    size: 3,
-                },
-                proto::blob_meta::ChunkMeta {
-                    digest: dgst_2.to_vec(),
-                    size: 2,
-                },
-                proto::blob_meta::ChunkMeta {
-                    digest: dgst_1.to_vec(),
-                    size: 3,
-                },
-            ],
-            inline_bao: vec![],
-        };
-
-        // assemble ecpected data
-        let mut expected_data: Vec<u8> = Vec::new();
-        expected_data.extend_from_slice(&DUMMY_DATA_1[..]);
-        expected_data.extend_from_slice(&DUMMY_DATA_2[..]);
-        expected_data.extend_from_slice(&DUMMY_DATA_1[..]);
-
-        // read via io::copy
-        {
-            let mut blob_reader = BlobReader::open(&chunk_service, blobmeta.clone());
-
-            let mut buf: Vec<u8> = Vec::new();
-
-            let res =
-                std::io::copy(&mut blob_reader, &mut Cursor::new(&mut buf)).expect("must succeed");
-
-            assert_eq!(8, res, "number of bytes read must match");
-
-            assert_eq!(expected_data[..], buf[..], "data read must match");
-        }
-
-        // now read the same thing again, but not via io::copy, but individually
-        {
-            let mut blob_reader = BlobReader::open(&chunk_service, blobmeta);
-
-            let mut buf: Vec<u8> = Vec::new();
-            let mut cursor = Cursor::new(&mut buf);
-
-            let mut bytes_read = 0;
-
-            loop {
-                let mut smallbuf = [0xff; 1];
-                match blob_reader.read(&mut smallbuf) {
-                    Ok(n) => {
-                        if n == 0 {
-                            break;
-                        }
-                        let w_b = cursor.write(&smallbuf).unwrap();
-                        assert_eq!(n, w_b);
-                        bytes_read += w_b;
-                    }
-                    Err(_) => {
-                        panic!("error occured during read");
-                    }
-                }
-            }
-
-            assert_eq!(8, bytes_read, "number of bytes read must match");
-            assert_eq!(expected_data[..], buf[..], "data read must match");
-        }
-
-        Ok(())
-    }
-}
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index 57c86551e4de..9a796ca2c0c8 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -1,51 +1,80 @@
 use data_encoding::BASE64;
+use std::io::Cursor;
 use std::{
     collections::HashMap,
     sync::{Arc, RwLock},
 };
-use tracing::instrument;
+use tracing::{instrument, warn};
 
-use crate::{proto, Error};
+use super::{BlobService, BlobWriter};
+use crate::Error;
 
-use super::BlobService;
+// type B3Digest = [u8; 32];
+// struct B3Digest ([u8; 32]);
 
 #[derive(Clone, Default)]
 pub struct MemoryBlobService {
-    db: Arc<RwLock<HashMap<Vec<u8>, proto::BlobMeta>>>,
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
 }
 
 impl BlobService for MemoryBlobService {
-    #[instrument(skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
-        if req.include_bao {
-            todo!("not implemented yet")
-        }
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = MemoryBlobWriter;
 
+    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
         let db = self.db.read().unwrap();
-        // if include_chunks is also false, the user only wants to know if the
-        // blob is present at all.
-        if !req.include_chunks {
-            Ok(if db.contains_key(&req.digest) {
-                Some(proto::BlobMeta::default())
-            } else {
-                None
-            })
-        } else {
-            match db.get(&req.digest) {
-                None => Ok(None),
-                Some(blob_meta) => Ok(Some(blob_meta.clone())),
-            }
+        Ok(db.contains_key(digest))
+    }
+
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+        let db = self.db.read().unwrap();
+
+        Ok(db.get(digest).map(|x| Cursor::new(x.clone())))
+    }
+
+    #[instrument(skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(MemoryBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct MemoryBlobWriter {
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
+
+    buf: Vec<u8>,
+}
+
+impl MemoryBlobWriter {
+    fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self {
+        Self {
+            buf: Vec::new(),
+            db,
         }
     }
+}
+impl std::io::Write for MemoryBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.buf.write(buf)
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        self.buf.flush()
+    }
+}
 
-    #[instrument(skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
-        let mut db = self.db.write().unwrap();
+impl BlobWriter for MemoryBlobWriter {
+    fn close(self) -> Result<[u8; 32], Error> {
+        // in this memory implementation, we don't actually bother hashing
+        // incrementally while writing, but do it at the end.
+        let mut hasher = blake3::Hasher::new();
+        hasher.update(&self.buf);
+        let digest: [u8; 32] = hasher.finalize().into();
 
-        db.insert(blob_digest.to_vec(), blob_meta);
+        // open the database for writing.
+        let mut db = self.db.write()?;
+        db.insert(digest, self.buf);
 
-        Ok(())
-        // TODO: make sure all callers make sure the chunks exist.
-        // TODO: where should we calculate the bao?
+        Ok(digest)
     }
 }
diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs
index 53e941795e7e..e97ccdf335a0 100644
--- a/tvix/store/src/blobservice/mod.rs
+++ b/tvix/store/src/blobservice/mod.rs
@@ -1,4 +1,6 @@
-use crate::{proto, Error};
+use std::io;
+
+use crate::Error;
 
 mod memory;
 mod sled;
@@ -7,14 +9,31 @@ pub use self::memory::MemoryBlobService;
 pub use self::sled::SledBlobService;
 
 /// The base trait all BlobService services need to implement.
-/// It provides information about how a blob is chunked,
-/// and allows creating new blobs by creating a BlobMeta (referring to chunks
-/// in a [crate::chunkservice::ChunkService]).
+/// It provides functions to check whether a given blob exists,
+/// a way to get a [io::Read] to a blob, and a method to initiate writing a new
+/// Blob, which returns a [BlobWriter], that can be used
 pub trait BlobService {
-    /// Retrieve chunking information for a given blob
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error>;
+    type BlobReader: io::Read + Send + std::marker::Unpin;
+    type BlobWriter: BlobWriter + Send;
+
+    /// Check if the service has the blob, by its content hash.
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
+
+    /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>.
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error>;
+
+    /// Insert a new blob into the store. Returns a [BlobWriter], which
+    /// implements [io::Write] and a [BlobWriter::close].
+    /// TODO: is there any reason we want this to be a Result<>, and not just T?
+    fn open_write(&self) -> Result<Self::BlobWriter, Error>;
+}
 
-    /// Insert chunking information for a given blob.
-    /// Implementations SHOULD make sure chunks referred do exist.
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error>;
+/// A [io::Write] that you need to close() afterwards, and get back the digest
+/// of the written blob.
+pub trait BlobWriter: io::Write {
+    /// Signal there's no more data to be written, and return the digest of the
+    /// contents written.
+    ///
+    /// This consumes self, so it's not possible to close twice.
+    fn close(self) -> Result<[u8; 32], Error>;
 }
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
index 1ae34ee5fb9c..c229f799de98 100644
--- a/tvix/store/src/blobservice/sled.rs
+++ b/tvix/store/src/blobservice/sled.rs
@@ -1,13 +1,13 @@
-use std::path::PathBuf;
+use std::{
+    io::{self, Cursor},
+    path::PathBuf,
+};
 
+use super::{BlobService, BlobWriter};
+use crate::Error;
 use data_encoding::BASE64;
-use prost::Message;
 use tracing::instrument;
 
-use crate::{proto, Error};
-
-use super::BlobService;
-
 #[derive(Clone)]
 pub struct SledBlobService {
     db: sled::Db,
@@ -30,44 +30,69 @@ impl SledBlobService {
 }
 
 impl BlobService for SledBlobService {
-    #[instrument(name = "SledBlobService::stat", skip(self, req), fields(blob.digest=BASE64.encode(&req.digest)))]
-    fn stat(&self, req: &proto::StatBlobRequest) -> Result<Option<proto::BlobMeta>, Error> {
-        if req.include_bao {
-            todo!("not implemented yet")
+    type BlobReader = Cursor<Vec<u8>>;
+    type BlobWriter = SledBlobWriter;
+
+    #[instrument(name = "SledBlobService::has", skip(self), fields(blob.digest=BASE64.encode(digest)))]
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
+        match self.db.contains_key(digest) {
+            Ok(has) => Ok(has),
+            Err(e) => Err(Error::StorageError(e.to_string())),
         }
+    }
 
-        // if include_chunks is also false, the user only wants to know if the
-        // blob is present at all.
-        if !req.include_chunks {
-            match self.db.contains_key(&req.digest) {
-                Ok(false) => Ok(None),
-                Ok(true) => Ok(Some(proto::BlobMeta::default())),
-                Err(e) => Err(Error::StorageError(e.to_string())),
-            }
-        } else {
-            match self.db.get(&req.digest) {
-                Ok(None) => Ok(None),
-                Ok(Some(data)) => match proto::BlobMeta::decode(&*data) {
-                    Ok(blob_meta) => Ok(Some(blob_meta)),
-                    Err(e) => Err(Error::StorageError(format!(
-                        "unable to parse blobmeta message for blob {}: {}",
-                        BASE64.encode(&req.digest),
-                        e
-                    ))),
-                },
-                Err(e) => Err(Error::StorageError(e.to_string())),
-            }
+    #[instrument(name = "SledBlobService::open_read", skip(self), fields(blob.digest=BASE64.encode(digest)))]
+    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+        match self.db.get(digest) {
+            Ok(None) => Ok(None),
+            Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))),
+            Err(e) => Err(Error::StorageError(e.to_string())),
         }
     }
 
-    #[instrument(name = "SledBlobService::put", skip(self, blob_meta, blob_digest), fields(blob.digest = BASE64.encode(blob_digest)))]
-    fn put(&self, blob_digest: &[u8], blob_meta: proto::BlobMeta) -> Result<(), Error> {
-        let result = self.db.insert(blob_digest, blob_meta.encode_to_vec());
-        if let Err(e) = result {
-            return Err(Error::StorageError(e.to_string()));
+    #[instrument(name = "SledBlobService::open_write", skip(self))]
+    fn open_write(&self) -> Result<Self::BlobWriter, Error> {
+        Ok(SledBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct SledBlobWriter {
+    db: sled::Db,
+    buf: Vec<u8>,
+    hasher: blake3::Hasher,
+}
+
+impl SledBlobWriter {
+    pub fn new(db: sled::Db) -> Self {
+        Self {
+            buf: Vec::default(),
+            db,
+            hasher: blake3::Hasher::new(),
         }
-        Ok(())
-        // TODO: make sure all callers make sure the chunks exist.
-        // TODO: where should we calculate the bao?
+    }
+}
+
+impl io::Write for SledBlobWriter {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        let bytes_written = self.buf.write(buf)?;
+        self.hasher.write(&buf[..bytes_written])
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.buf.flush()
+    }
+}
+
+impl BlobWriter for SledBlobWriter {
+    fn close(self) -> Result<[u8; 32], Error> {
+        let digest = self.hasher.finalize();
+        self.db.insert(digest.as_bytes(), self.buf).map_err(|e| {
+            Error::StorageError(format!("unable to insert blob: {}", e.to_string()))
+        })?;
+
+        Ok(digest
+            .to_owned()
+            .try_into()
+            .map_err(|_| Error::StorageError("invalid digest length in response".to_string()))?)
     }
 }
diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs
deleted file mode 100644
index 9fe4dc17d5ec..000000000000
--- a/tvix/store/src/chunkservice/memory.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-use data_encoding::BASE64;
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
-use tracing::instrument;
-
-use crate::Error;
-
-use super::ChunkService;
-
-#[derive(Clone, Default)]
-pub struct MemoryChunkService {
-    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
-}
-
-impl ChunkService for MemoryChunkService {
-    #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        let db = self.db.read().unwrap();
-        Ok(db.get(digest).is_some())
-    }
-
-    #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
-        let db = self.db.read().unwrap();
-        match db.get(digest) {
-            None => Ok(None),
-            Some(data) => {
-                // calculate the hash to verify this is really what we expect
-                let actual_digest = blake3::hash(data).as_bytes().to_vec();
-                if actual_digest != digest {
-                    return Err(Error::StorageError(format!(
-                        "invalid hash encountered when reading chunk, expected {}, got {}",
-                        BASE64.encode(digest),
-                        BASE64.encode(&actual_digest),
-                    )));
-                }
-                Ok(Some(data.clone()))
-            }
-        }
-    }
-
-    #[instrument(skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
-        let digest = blake3::hash(&data);
-
-        let mut db = self.db.write().unwrap();
-        db.insert(*digest.as_bytes(), data);
-
-        Ok(*digest.as_bytes())
-    }
-}
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs
deleted file mode 100644
index faf0a88f151a..000000000000
--- a/tvix/store/src/chunkservice/mod.rs
+++ /dev/null
@@ -1,28 +0,0 @@
-mod util;
-
-pub mod memory;
-pub mod sled;
-
-use crate::Error;
-
-pub use self::memory::MemoryChunkService;
-pub use self::sled::SledChunkService;
-pub use self::util::read_all_and_chunk;
-pub use self::util::update_hasher;
-pub use self::util::upload_chunk;
-
-/// The base trait all ChunkService services need to implement.
-/// It allows checking for the existence, download and upload of chunks.
-/// It's usually used after consulting a [crate::blobservice::BlobService] for
-/// chunking information.
-pub trait ChunkService {
-    /// check if the service has a chunk, given by its digest.
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
-
-    /// retrieve a chunk by its digest. Implementations MUST validate the digest
-    /// matches.
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>;
-
-    /// insert a chunk. returns the digest of the chunk, or an error.
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>;
-}
diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs
deleted file mode 100644
index 8e86e1825b28..000000000000
--- a/tvix/store/src/chunkservice/sled.rs
+++ /dev/null
@@ -1,70 +0,0 @@
-use std::path::PathBuf;
-
-use data_encoding::BASE64;
-use tracing::instrument;
-
-use crate::Error;
-
-use super::ChunkService;
-
-#[derive(Clone)]
-pub struct SledChunkService {
-    db: sled::Db,
-}
-
-impl SledChunkService {
-    pub fn new(p: PathBuf) -> Result<Self, sled::Error> {
-        let config = sled::Config::default().use_compression(true).path(p);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-
-    pub fn new_temporary() -> Result<Self, sled::Error> {
-        let config = sled::Config::default().temporary(true);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-}
-
-impl ChunkService for SledChunkService {
-    #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(false),
-            Ok(Some(_)) => Ok(true),
-            Err(e) => Err(Error::StorageError(e.to_string())),
-        }
-    }
-
-    #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
-        match self.db.get(digest) {
-            Ok(None) => Ok(None),
-            Ok(Some(data)) => {
-                // calculate the hash to verify this is really what we expect
-                let actual_digest = blake3::hash(&data).as_bytes().to_vec();
-                if actual_digest != digest {
-                    return Err(Error::StorageError(format!(
-                        "invalid hash encountered when reading chunk, expected {}, got {}",
-                        BASE64.encode(digest),
-                        BASE64.encode(&actual_digest),
-                    )));
-                }
-                Ok(Some(Vec::from(&*data)))
-            }
-            Err(e) => Err(Error::StorageError(e.to_string())),
-        }
-    }
-
-    #[instrument(name = "SledChunkService::put", skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
-        let digest = blake3::hash(&data);
-        let result = self.db.insert(digest.as_bytes(), data);
-        if let Err(e) = result {
-            return Err(Error::StorageError(e.to_string()));
-        }
-        Ok(*digest.as_bytes())
-    }
-}
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
deleted file mode 100644
index 2ad663733b04..000000000000
--- a/tvix/store/src/chunkservice/util.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-use crate::{proto, Error};
-use std::io::Read;
-use tracing::{debug, instrument};
-
-use super::ChunkService;
-
-/// uploads a chunk to a chunk service, and returns its digest (or an error) when done.
-#[instrument(skip_all, err)]
-pub fn upload_chunk<CS: ChunkService>(
-    chunk_service: &CS,
-    chunk_data: Vec<u8>,
-) -> Result<[u8; 32], Error> {
-    let mut hasher = blake3::Hasher::new();
-    update_hasher(&mut hasher, &chunk_data);
-    let digest = hasher.finalize();
-
-    if chunk_service.has(digest.as_bytes())? {
-        debug!("already has chunk, skipping");
-    }
-    let digest_resp = chunk_service.put(chunk_data)?;
-
-    assert_eq!(&digest_resp, digest.as_bytes());
-
-    Ok(*digest.as_bytes())
-}
-
-/// reads through a reader, writes chunks to a [ChunkService] and returns a
-/// [proto::BlobMeta] pointing to all the chunks.
-#[instrument(skip_all, err)]
-pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
-    chunk_service: &CS,
-    r: R,
-) -> Result<(Vec<u8>, proto::BlobMeta), Error> {
-    let mut blob_meta = proto::BlobMeta::default();
-
-    // hash the file contents, upload chunks if not there yet
-    let mut blob_hasher = blake3::Hasher::new();
-
-    // TODO: play with chunking sizes
-    let chunker_avg_size = 64 * 1024;
-    let chunker_min_size = chunker_avg_size / 4;
-    let chunker_max_size = chunker_avg_size * 4;
-
-    let chunker =
-        fastcdc::v2020::StreamCDC::new(r, chunker_min_size, chunker_avg_size, chunker_max_size);
-
-    for chunking_result in chunker {
-        let chunk = chunking_result.unwrap();
-        // TODO: convert to error::UnableToRead
-
-        let chunk_len = chunk.data.len() as u32;
-
-        // update calculate blob hash
-        update_hasher(&mut blob_hasher, &chunk.data);
-
-        let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
-
-        blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
-            digest: chunk_digest.to_vec(),
-            size: chunk_len,
-        });
-    }
-    Ok((blob_hasher.finalize().as_bytes().to_vec(), blob_meta))
-}
-
-/// updates a given hasher with more data. Uses rayon if the data is
-/// sufficiently big.
-///
-/// From the docs:
-///
-/// To get any performance benefit from multithreading, the input buffer needs
-/// to be large. As a rule of thumb on x86_64, update_rayon is slower than
-/// update for inputs under 128 KiB. That threshold varies quite a lot across
-/// different processors, and it’s important to benchmark your specific use
-/// case.
-///
-/// We didn't benchmark yet, so these numbers might need tweaking.
-#[instrument(skip_all)]
-pub fn update_hasher(hasher: &mut blake3::Hasher, data: &[u8]) {
-    if data.len() > 128 * 1024 {
-        hasher.update_rayon(data);
-    } else {
-        hasher.update(data);
-    }
-}
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index e62097ec468d..bf80eb4b71b9 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -1,19 +1,17 @@
-use crate::{chunkservice::read_all_and_chunk, directoryservice::DirectoryPutter, proto};
+use crate::{blobservice::BlobService, directoryservice::DirectoryService};
+use crate::{blobservice::BlobWriter, directoryservice::DirectoryPutter, proto};
 use std::{
     collections::HashMap,
     fmt::Debug,
     fs,
     fs::File,
+    io,
     os::unix::prelude::PermissionsExt,
     path::{Path, PathBuf},
 };
 use tracing::instrument;
 use walkdir::WalkDir;
 
-use crate::{
-    blobservice::BlobService, chunkservice::ChunkService, directoryservice::DirectoryService,
-};
-
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("failed to upload directory at {0}: {1}")]
@@ -57,9 +55,8 @@ impl From<super::Error> for Error {
 //
 // It assumes the caller adds returned nodes to the directories it assembles.
 #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
-fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: DirectoryPutter>(
+fn process_entry<BS: BlobService, DP: DirectoryPutter>(
     blob_service: &mut BS,
-    chunk_service: &mut CS,
     directory_putter: &mut DP,
     entry: &walkdir::DirEntry,
     maybe_directory: Option<proto::Directory>,
@@ -112,23 +109,16 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire
             .metadata()
             .map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?;
 
-        let file = File::open(entry_path.clone())
+        let mut file = File::open(entry_path.clone())
             .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
 
-        let (blob_digest, blob_meta) = read_all_and_chunk(chunk_service, file)?;
-
-        // upload blobmeta if not there yet
-        if blob_service
-            .stat(&proto::StatBlobRequest {
-                digest: blob_digest.to_vec(),
-                include_chunks: false,
-                include_bao: false,
-            })?
-            .is_none()
-        {
-            // upload blobmeta
-            blob_service.put(&blob_digest, blob_meta)?;
-        }
+        let mut writer = blob_service.open_write()?;
+
+        if let Err(e) = io::copy(&mut file, &mut writer) {
+            return Err(Error::UnableToRead(entry_path, e));
+        };
+
+        let digest = writer.close()?;
 
         return Ok(proto::node::Node::File(proto::FileNode {
             name: entry
@@ -136,7 +126,7 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire
                 .to_str()
                 .map(|s| Ok(s.to_owned()))
                 .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
-            digest: blob_digest,
+            digest: digest.to_vec(),
             size: metadata.len() as u32,
             // If it's executable by the user, it'll become executable.
             // This matches nix's dump() function behaviour.
@@ -152,15 +142,9 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire
 /// to the PathInfoService.
 //
 // returns the root node, or an error.
-#[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))]
-pub fn import_path<
-    BS: BlobService,
-    CS: ChunkService + std::marker::Sync,
-    DS: DirectoryService,
-    P: AsRef<Path> + Debug,
->(
+#[instrument(skip(blob_service, directory_service), fields(path=?p))]
+pub fn import_path<BS: BlobService, DS: DirectoryService, P: AsRef<Path> + Debug>(
     blob_service: &mut BS,
-    chunk_service: &mut CS,
     directory_service: &mut DS,
     p: P,
 ) -> Result<proto::node::Node, Error> {
@@ -212,13 +196,7 @@ pub fn import_path<
             }
         };
 
-        let node = process_entry(
-            blob_service,
-            chunk_service,
-            &mut directory_putter,
-            &entry,
-            maybe_directory,
-        )?;
+        let node = process_entry(blob_service, &mut directory_putter, &entry, maybe_directory)?;
 
         if entry.depth() == 0 {
             return Ok(node);
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index aac650b1ca8b..3ce826f98ba6 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,15 +1,12 @@
-mod blobreader;
 mod errors;
 
 pub mod blobservice;
-pub mod chunkservice;
 pub mod directoryservice;
 pub mod import;
 pub mod nar;
 pub mod pathinfoservice;
 pub mod proto;
 
-pub use blobreader::BlobReader;
 pub use errors::Error;
 
 #[cfg(test)]
diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs
index f77f0b30d61f..94dd51bc6a7f 100644
--- a/tvix/store/src/nar/non_caching_calculation_service.rs
+++ b/tvix/store/src/nar/non_caching_calculation_service.rs
@@ -2,7 +2,6 @@ use count_write::CountWrite;
 use sha2::{Digest, Sha256};
 
 use crate::blobservice::BlobService;
-use crate::chunkservice::ChunkService;
 use crate::directoryservice::DirectoryService;
 use crate::proto;
 
@@ -12,26 +11,20 @@ use super::{NARCalculationService, RenderError};
 /// A NAR calculation service which simply renders the whole NAR whenever
 /// we ask for the calculation.
 #[derive(Clone)]
-pub struct NonCachingNARCalculationService<
-    BS: BlobService,
-    CS: ChunkService + Clone,
-    DS: DirectoryService,
-> {
-    nar_renderer: NARRenderer<BS, CS, DS>,
+pub struct NonCachingNARCalculationService<BS: BlobService, DS: DirectoryService> {
+    nar_renderer: NARRenderer<BS, DS>,
 }
 
-impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService>
-    NonCachingNARCalculationService<BS, CS, DS>
-{
-    pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self {
+impl<BS: BlobService, DS: DirectoryService> NonCachingNARCalculationService<BS, DS> {
+    pub fn new(blob_service: BS, directory_service: DS) -> Self {
         Self {
-            nar_renderer: NARRenderer::new(blob_service, chunk_service, directory_service),
+            nar_renderer: NARRenderer::new(blob_service, directory_service),
         }
     }
 }
 
-impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARCalculationService
-    for NonCachingNARCalculationService<BS, CS, DS>
+impl<BS: BlobService, DS: DirectoryService> NARCalculationService
+    for NonCachingNARCalculationService<BS, DS>
 {
     fn calculate_nar(
         &self,
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index a061dad9bb35..b080a713ec0a 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -1,10 +1,11 @@
+use std::io::{self, BufReader};
+
 use crate::{
     blobservice::BlobService,
-    chunkservice::ChunkService,
     directoryservice::DirectoryService,
     proto::{self, NamedNode},
-    BlobReader,
 };
+use data_encoding::BASE64;
 use nix_compat::nar;
 
 use super::RenderError;
@@ -12,17 +13,15 @@ use super::RenderError;
 /// A NAR renderer, using a blob_service, chunk_service and directory_service
 /// to render a NAR to a writer.
 #[derive(Clone)]
-pub struct NARRenderer<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> {
+pub struct NARRenderer<BS: BlobService, DS: DirectoryService> {
     blob_service: BS,
-    chunk_service: CS,
     directory_service: DS,
 }
 
-impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRenderer<BS, CS, DS> {
-    pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self {
+impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> {
+    pub fn new(blob_service: BS, directory_service: DS) -> Self {
         Self {
             blob_service,
-            chunk_service,
             directory_service,
         }
     }
@@ -65,49 +64,22 @@ impl<BS: BlobService, CS: ChunkService + Clone, DS: DirectoryService> NARRendere
                         ))
                     })?;
 
-                // query blob_service for blob_meta
-                let resp = self
-                    .blob_service
-                    .stat(&proto::StatBlobRequest {
-                        digest: digest.to_vec(),
-                        include_chunks: true,
-                        ..Default::default()
-                    })
-                    .map_err(RenderError::StoreError)?;
+                // TODO: handle error
+                let mut blob_reader = match self.blob_service.open_read(&digest).unwrap() {
+                    Some(blob_reader) => Ok(BufReader::new(blob_reader)),
+                    None => Err(RenderError::NARWriterError(io::Error::new(
+                        io::ErrorKind::NotFound,
+                        format!("blob with digest {} not found", BASE64.encode(&digest)),
+                    ))),
+                }?;
 
-                match resp {
-                    // if it's None, that's an error!
-                    None => {
-                        return Err(RenderError::BlobNotFound(
-                            digest.to_vec(),
-                            proto_file_node.name.to_owned(),
-                        ));
-                    }
-                    Some(blob_meta) => {
-                        // make sure the blob_meta size matches what we expect from proto_file_node
-                        let blob_meta_size = blob_meta.chunks.iter().fold(0, |acc, e| acc + e.size);
-                        if blob_meta_size != proto_file_node.size {
-                            return Err(RenderError::UnexpectedBlobMeta(
-                                digest.to_vec(),
-                                proto_file_node.name.to_owned(),
-                                proto_file_node.size,
-                                blob_meta_size,
-                            ));
-                        }
-
-                        let mut blob_reader = std::io::BufReader::new(BlobReader::open(
-                            &self.chunk_service,
-                            blob_meta,
-                        ));
-                        nar_node
-                            .file(
-                                proto_file_node.executable,
-                                proto_file_node.size.into(),
-                                &mut blob_reader,
-                            )
-                            .map_err(RenderError::NARWriterError)?;
-                    }
-                }
+                nar_node
+                    .file(
+                        proto_file_node.executable,
+                        proto_file_node.size.into(),
+                        &mut blob_reader,
+                    )
+                    .map_err(RenderError::NARWriterError)?;
             }
             proto::node::Node::Directory(proto_directory_node) => {
                 let digest: [u8; 32] =
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index e891caa0268e..d1e98a5b7953 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,54 +1,34 @@
-use std::collections::VecDeque;
-
 use crate::{
-    blobservice::BlobService,
-    chunkservice::{read_all_and_chunk, update_hasher, ChunkService},
-    Error,
+    blobservice::{BlobService, BlobWriter},
+    proto::sync_read_into_async_read::SyncReadIntoAsyncRead,
 };
 use data_encoding::BASE64;
-use tokio::{sync::mpsc::channel, task};
-use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use std::{collections::VecDeque, io, pin::Pin};
+use tokio::task;
+use tokio_stream::StreamExt;
+use tokio_util::io::ReaderStream;
 use tonic::{async_trait, Request, Response, Status, Streaming};
-use tracing::{debug, instrument, warn};
+use tracing::{instrument, warn};
 
-pub struct GRPCBlobServiceWrapper<BS: BlobService, CS: ChunkService> {
+pub struct GRPCBlobServiceWrapper<BS: BlobService> {
     blob_service: BS,
-    chunk_service: CS,
 }
 
-impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
-    pub fn new(blob_service: BS, chunk_service: CS) -> Self {
+impl<BS: BlobService> From<BS> for GRPCBlobServiceWrapper<BS> {
+    fn from(value: BS) -> Self {
         Self {
-            blob_service,
-            chunk_service,
-        }
-    }
-
-    // upload the chunk to the chunk service, and return its digest (or an error) when done.
-    #[instrument(skip(chunk_service))]
-    fn upload_chunk(chunk_service: CS, chunk_data: Vec<u8>) -> Result<Vec<u8>, Error> {
-        let mut hasher = blake3::Hasher::new();
-        update_hasher(&mut hasher, &chunk_data);
-        let digest = hasher.finalize();
-
-        if chunk_service.has(digest.as_bytes())? {
-            debug!("already has chunk, skipping");
+            blob_service: value,
         }
-        let digest_resp = chunk_service.put(chunk_data)?;
-
-        assert_eq!(&digest_resp, digest.as_bytes());
-
-        Ok(digest.as_bytes().to_vec())
     }
 }
 
 #[async_trait]
-impl<
-        BS: BlobService + Send + Sync + Clone + 'static,
-        CS: ChunkService + Send + Sync + Clone + 'static,
-    > super::blob_service_server::BlobService for GRPCBlobServiceWrapper<BS, CS>
+impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server::BlobService
+    for GRPCBlobServiceWrapper<BS>
 {
-    type ReadStream = ReceiverStream<Result<super::BlobChunk, Status>>;
+    // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
+    type ReadStream =
+        Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
 
     #[instrument(skip(self))]
     async fn stat(
@@ -56,12 +36,22 @@ impl<
         request: Request<super::StatBlobRequest>,
     ) -> Result<Response<super::BlobMeta>, Status> {
         let rq = request.into_inner();
-        match self.blob_service.stat(&rq) {
-            Ok(None) => Err(Status::not_found(format!(
+        let req_digest: [u8; 32] = rq
+            .digest
+            .clone()
+            .try_into()
+            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+        if rq.include_chunks || rq.include_bao {
+            return Err(Status::internal("not implemented"));
+        }
+
+        match self.blob_service.has(&req_digest) {
+            Ok(true) => Ok(Response::new(super::BlobMeta::default())),
+            Ok(false) => Err(Status::not_found(format!(
                 "blob {} not found",
-                BASE64.encode(&rq.digest)
+                BASE64.encode(&req_digest)
             ))),
-            Ok(Some(blob_meta)) => Ok(Response::new(blob_meta)),
             Err(e) => Err(e.into()),
         }
     }
@@ -71,99 +61,38 @@ impl<
         &self,
         request: Request<super::ReadBlobRequest>,
     ) -> Result<Response<Self::ReadStream>, Status> {
-        let req = request.into_inner();
-        let (tx, rx) = channel(5);
+        let rq = request.into_inner();
 
-        let req_digest: [u8; 32] = req
+        let req_digest: [u8; 32] = rq
             .digest
+            .clone()
             .try_into()
-            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
-
-        // query the blob service for more detailed blob info
-        let stat_resp = self.blob_service.stat(&super::StatBlobRequest {
-            digest: req_digest.to_vec(),
-            include_chunks: true,
-            ..Default::default()
-        })?;
-
-        match stat_resp {
-            None => {
-                // If the stat didn't return any blobmeta, the client might
-                // still have asked for a single chunk to be read.
-                // Check the chunkstore.
-                if let Some(data) = self.chunk_service.get(&req_digest)? {
-                    // We already know the hash matches, and contrary to
-                    // iterating over a blobmeta, we can't know the size,
-                    // so send the contents of that chunk over,
-                    // as the first (and only) element of the stream.
-                    task::spawn(async move {
-                        let res = Ok(super::BlobChunk { data });
-                        // send the result to the client. If the client already left, that's also fine.
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                        }
-                    });
-                } else {
-                    return Err(Status::not_found(format!(
-                        "blob {} not found",
-                        BASE64.encode(&req_digest),
-                    )));
-                }
-            }
-            Some(blobmeta) => {
-                let chunk_client = self.chunk_service.clone();
-
-                // TODO: use BlobReader?
-                // But then we might not be able to send compressed chunks as-is.
-                // Might require implementing https://docs.rs/futures-core/latest/futures_core/stream/trait.Stream.html for it
-                // first, so we can .next().await in here.
-
-                task::spawn(async move {
-                    for chunkmeta in blobmeta.chunks {
-                        // request chunk.
-                        // We don't need to validate the digest again, as
-                        // that's required for all implementations of ChunkService.
-                        // TODO: handle error
-                        let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap();
-                        let res = match chunk_client.get(chunkmeta_digest) {
-                            Err(e) => Err(e.into()),
-                            // TODO: make this a separate error type
-                            Ok(None) => Err(Error::StorageError(format!(
-                                "consistency error: chunk {} for blob {} not found",
-                                BASE64.encode(chunkmeta_digest),
-                                BASE64.encode(&req_digest),
-                            ))
-                            .into()),
-                            Ok(Some(data)) => {
-                                // We already know the hash matches, but also
-                                // check the size matches what chunkmeta said.
-                                if data.len() as u32 != chunkmeta.size {
-                                    Err(Error::StorageError(format!(
-                                        "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}",
-                                        BASE64.encode(chunkmeta_digest),
-                                        BASE64.encode(&req_digest),
-                                        chunkmeta.size,
-                                        data.len(),
-                                    )).into())
-                                } else {
-                                    // send out the current chunk
-                                    // TODO: we might want to break this up further if too big?
-                                    Ok(super::BlobChunk { data })
-                                }
-                            }
-                        };
-                        // send the result to the client
-                        if (tx.send(res).await).is_err() {
-                            debug!("receiver dropped");
-                            break;
-                        }
+            .map_err(|_| Status::invalid_argument("invalid digest length"))?;
+
+        match self.blob_service.open_read(&req_digest) {
+            Ok(Some(reader)) => {
+                let async_reader: SyncReadIntoAsyncRead<_, bytes::BytesMut> = reader.into();
+
+                fn stream_mapper(
+                    x: Result<bytes::Bytes, io::Error>,
+                ) -> Result<super::BlobChunk, Status> {
+                    match x {
+                        Ok(bytes) => Ok(super::BlobChunk {
+                            data: bytes.to_vec(),
+                        }),
+                        Err(e) => Err(Status::from(e)),
                     }
-                });
+                }
+
+                let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper);
+                Ok(Response::new(Box::pin(chunks_stream)))
             }
+            Ok(None) => Err(Status::not_found(format!(
+                "blob {} not found",
+                BASE64.encode(&rq.digest)
+            ))),
+            Err(e) => Err(e.into()),
         }
-
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
     }
 
     #[instrument(skip(self))]
@@ -180,37 +109,34 @@ impl<
 
         let data_reader = tokio_util::io::StreamReader::new(data_stream);
 
-        // TODO: can we get rid of this clone?
-        let chunk_service = self.chunk_service.clone();
-
-        let (blob_digest, blob_meta) =
-            task::spawn_blocking(move || -> Result<(Vec<u8>, super::BlobMeta), Error> {
-                // feed read_all_and_chunk a (sync) reader to the data retrieved from the stream.
-                read_all_and_chunk(
-                    &chunk_service,
-                    tokio_util::io::SyncIoBridge::new(data_reader),
-                )
-            })
-            .await
-            .map_err(|e| Status::internal(e.to_string()))??;
-
-        // upload blobmeta if not there yet
-        if self
+        // prepare a writer, which we'll use in the blocking task below.
+        let mut writer = self
             .blob_service
-            .stat(&super::StatBlobRequest {
-                digest: blob_digest.to_vec(),
-                include_chunks: false,
-                include_bao: false,
-            })?
-            .is_none()
-        {
-            // upload blobmeta
-            self.blob_service.put(&blob_digest, blob_meta)?;
-        }
-
-        // return to client.
-        Ok(Response::new(super::PutBlobResponse {
-            digest: blob_digest,
-        }))
+            .open_write()
+            .map_err(|e| Status::internal(format!("unable to open for write: {}", e)))?;
+
+        let result = task::spawn_blocking(move || -> Result<super::PutBlobResponse, Status> {
+            // construct a sync reader to the data
+            let mut reader = tokio_util::io::SyncIoBridge::new(data_reader);
+
+            io::copy(&mut reader, &mut writer).map_err(|e| {
+                warn!("error copying: {}", e);
+                Status::internal("error copying")
+            })?;
+
+            let digest = writer
+                .close()
+                .map_err(|e| {
+                    warn!("error closing stream: {}", e);
+                    Status::internal("error closing stream")
+                })?
+                .to_vec();
+
+            Ok(super::PutBlobResponse { digest })
+        })
+        .await
+        .map_err(|_| Status::internal("failed to wait for task"))??;
+
+        Ok(Response::new(result))
     }
 }
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
index 5002d7e77a96..62741a3ad508 100644
--- a/tvix/store/src/proto/mod.rs
+++ b/tvix/store/src/proto/mod.rs
@@ -11,6 +11,8 @@ mod grpc_blobservice_wrapper;
 mod grpc_directoryservice_wrapper;
 mod grpc_pathinfoservice_wrapper;
 
+mod sync_read_into_async_read;
+
 pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
 pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
 pub use grpc_pathinfoservice_wrapper::GRPCPathInfoServiceWrapper;
diff --git a/tvix/store/src/proto/sync_read_into_async_read.rs b/tvix/store/src/proto/sync_read_into_async_read.rs
new file mode 100644
index 000000000000..0a0ef019781c
--- /dev/null
+++ b/tvix/store/src/proto/sync_read_into_async_read.rs
@@ -0,0 +1,158 @@
+use bytes::Buf;
+use core::task::Poll::Ready;
+use futures::ready;
+use futures::Future;
+use std::io;
+use std::io::Read;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+use tokio::io::AsyncRead;
+use tokio::runtime::Handle;
+use tokio::sync::Mutex;
+use tokio::task::JoinHandle;
+
+#[derive(Debug)]
+enum State<Buf: bytes::Buf + bytes::BufMut> {
+    Idle(Option<Buf>),
+    Busy(JoinHandle<(io::Result<usize>, Buf)>),
+}
+
+use State::{Busy, Idle};
+
+/// Use a [`SyncReadIntoAsyncRead`] to asynchronously read from a
+/// synchronous API.
+#[derive(Debug)]
+pub struct SyncReadIntoAsyncRead<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> {
+    state: Mutex<State<Buf>>,
+    reader: Arc<Mutex<R>>,
+    rt: Handle,
+}
+
+impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> SyncReadIntoAsyncRead<R, Buf> {
+    /// This must be called from within a Tokio runtime context, or else it will panic.
+    #[track_caller]
+    pub fn new(rt: Handle, reader: R) -> Self {
+        Self {
+            rt,
+            state: State::Idle(None).into(),
+            reader: Arc::new(reader.into()),
+        }
+    }
+
+    /// This must be called from within a Tokio runtime context, or else it will panic.
+    pub fn new_with_reader(readable: R) -> Self {
+        Self::new(Handle::current(), readable)
+    }
+}
+
+/// Repeats operations that are interrupted.
+macro_rules! uninterruptibly {
+    ($e:expr) => {{
+        loop {
+            match $e {
+                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+                res => break res,
+            }
+        }
+    }};
+}
+
+impl<
+        R: Read + Send + 'static + std::marker::Unpin,
+        Buf: bytes::Buf + bytes::BufMut + Send + Default + std::marker::Unpin + 'static,
+    > AsyncRead for SyncReadIntoAsyncRead<R, Buf>
+{
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        dst: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<io::Result<()>> {
+        let me = self.get_mut();
+        // Do we need this mutex?
+        let state = me.state.get_mut();
+
+        loop {
+            match state {
+                Idle(ref mut buf_cell) => {
+                    let mut buf = buf_cell.take().unwrap_or_default();
+
+                    if buf.has_remaining() {
+                        // Here, we will split the `buf` into `[..dst.remaining()... ; rest ]`
+                        // The `rest` is stuffed into the `buf_cell` for further poll_read.
+                        // The other is completely consumed into the unfilled destination.
+                        // `rest` can be empty.
+                        let mut adjusted_src =
+                            buf.copy_to_bytes(std::cmp::min(buf.remaining(), dst.remaining()));
+                        let copied_size = adjusted_src.remaining();
+                        adjusted_src.copy_to_slice(dst.initialize_unfilled_to(copied_size));
+                        dst.set_filled(copied_size);
+                        *buf_cell = Some(buf);
+                        return Ready(Ok(()));
+                    }
+
+                    let reader = me.reader.clone();
+                    *state = Busy(me.rt.spawn_blocking(move || {
+                        let result = uninterruptibly!(reader.blocking_lock().read(
+                            // SAFETY: `reader.read` will *ONLY* write initialized bytes
+                            // and never *READ* uninitialized bytes
+                            // inside this buffer.
+                            //
+                            // Furthermore, casting the slice as `*mut [u8]`
+                            // is safe because it has the same layout.
+                            //
+                            // Finally, the pointer obtained is valid and owned
+                            // by `buf` only as we have a valid mutable reference
+                            // to it, it is valid for write.
+                            //
+                            // Here, we copy an nightly API: https://doc.rust-lang.org/stable/src/core/mem/maybe_uninit.rs.html#994-998
+                            unsafe {
+                                &mut *(buf.chunk_mut().as_uninit_slice_mut()
+                                    as *mut [std::mem::MaybeUninit<u8>]
+                                    as *mut [u8])
+                            }
+                        ));
+
+                        if let Ok(n) = result {
+                            // SAFETY: given we initialize `n` bytes, we can move `n` bytes
+                            // forward.
+                            unsafe {
+                                buf.advance_mut(n);
+                            }
+                        }
+
+                        (result, buf)
+                    }));
+                }
+                Busy(ref mut rx) => {
+                    let (result, mut buf) = ready!(Pin::new(rx).poll(cx))?;
+
+                    match result {
+                        Ok(n) => {
+                            if n > 0 {
+                                let remaining = std::cmp::min(n, dst.remaining());
+                                let mut adjusted_src = buf.copy_to_bytes(remaining);
+                                adjusted_src.copy_to_slice(dst.initialize_unfilled_to(remaining));
+                                dst.advance(remaining);
+                            }
+                            *state = Idle(Some(buf));
+                            return Ready(Ok(()));
+                        }
+                        Err(e) => {
+                            *state = Idle(None);
+                            return Ready(Err(e));
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl<R: Read + Send, Buf: bytes::Buf + bytes::BufMut> From<R> for SyncReadIntoAsyncRead<R, Buf> {
+    /// This must be called from within a Tokio runtime context, or else it will panic.
+    fn from(value: R) -> Self {
+        Self::new_with_reader(value)
+    }
+}
diff --git a/tvix/store/src/proto/tests/grpc_blobservice.rs b/tvix/store/src/proto/tests/grpc_blobservice.rs
index 88eb21e30241..80e2e71867a3 100644
--- a/tvix/store/src/proto/tests/grpc_blobservice.rs
+++ b/tvix/store/src/proto/tests/grpc_blobservice.rs
@@ -1,18 +1,14 @@
 use crate::blobservice::BlobService;
-use crate::chunkservice::ChunkService;
-use crate::proto::blob_meta::ChunkMeta;
 use crate::proto::blob_service_server::BlobService as GRPCBlobService;
 use crate::proto::{BlobChunk, GRPCBlobServiceWrapper, ReadBlobRequest, StatBlobRequest};
-use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST};
-use crate::tests::utils::{gen_blob_service, gen_chunk_service};
+use crate::tests::fixtures::{BLOB_A, BLOB_A_DIGEST};
+use crate::tests::utils::gen_blob_service;
+use tokio_stream::StreamExt;
 
-fn gen_grpc_blob_service() -> GRPCBlobServiceWrapper<
-    impl BlobService + Send + Sync + Clone + 'static,
-    impl ChunkService + Send + Sync + Clone + 'static,
-> {
+fn gen_grpc_blob_service(
+) -> GRPCBlobServiceWrapper<impl BlobService + Send + Sync + Clone + 'static> {
     let blob_service = gen_blob_service();
-    let chunk_service = gen_chunk_service();
-    GRPCBlobServiceWrapper::new(blob_service, chunk_service)
+    GRPCBlobServiceWrapper::from(blob_service)
 }
 
 /// Trying to read a non-existent blob should return a not found error.
@@ -26,8 +22,13 @@ async fn not_found_read() {
         }))
         .await;
 
-    let e = resp.expect_err("must_be_err");
-    assert_eq!(e.code(), tonic::Code::NotFound);
+    // We can't use unwrap_err here, because the Ok value doesn't implement
+    // debug.
+    if let Err(e) = resp {
+        assert_eq!(e.code(), tonic::Code::NotFound);
+    } else {
+        panic!("resp is not err")
+    }
 }
 
 /// Trying to stat a non-existent blob should return a not found error.
@@ -47,8 +48,7 @@ async fn not_found_stat() {
     assert_eq!(resp.code(), tonic::Code::NotFound);
 }
 
-/// Put a blob in the store, get it back. We send something small enough so it
-/// won't get split into multiple chunks.
+/// Put a blob in the store, get it back.
 #[tokio::test]
 async fn put_read_stat() {
     let service = gen_grpc_blob_service();
@@ -64,39 +64,30 @@ async fn put_read_stat() {
 
     assert_eq!(BLOB_A_DIGEST.to_vec(), put_resp.digest);
 
-    // Stat for the digest of A. It should return one chunk.
+    // Stat for the digest of A.
+    // We currently don't ask for more granular chunking data, as we don't
+    // expose it yet.
     let resp = service
         .stat(tonic::Request::new(StatBlobRequest {
             digest: BLOB_A_DIGEST.to_vec(),
-            include_chunks: true,
             ..Default::default()
         }))
         .await
         .expect("must succeed")
         .into_inner();
 
-    assert_eq!(1, resp.chunks.len());
-    // the `chunks` field should point to the single chunk.
-    assert_eq!(
-        vec![ChunkMeta {
-            digest: BLOB_A_DIGEST.to_vec(),
-            size: BLOB_A.len() as u32,
-        }],
-        resp.chunks,
-    );
-
-    // Read the chunk. It should return the same data.
+    // Read the blob. It should return the same data.
     let resp = service
         .read(tonic::Request::new(ReadBlobRequest {
             digest: BLOB_A_DIGEST.to_vec(),
         }))
         .await;
 
-    let mut rx = resp.expect("must succeed").into_inner().into_inner();
+    let mut rx = resp.ok().unwrap().into_inner();
 
     // the stream should contain one element, a BlobChunk with the same contents as BLOB_A.
     let item = rx
-        .recv()
+        .next()
         .await
         .expect("must be some")
         .expect("must succeed");
@@ -104,127 +95,8 @@ async fn put_read_stat() {
     assert_eq!(BLOB_A.to_vec(), item.data);
 
     // … and no more elements
-    assert!(rx.recv().await.is_none());
-}
-
-/// Put a bigger blob in the store, and get it back.
-/// Assert the stat request actually returns more than one chunk, and
-/// we can read each chunk individually, as well as the whole blob via the
-/// `read()` method.
-#[tokio::test]
-async fn put_read_stat_large() {
-    let service = gen_grpc_blob_service();
-
-    // split up BLOB_B into BlobChunks containing 1K bytes each.
-    let blob_b_blobchunks: Vec<BlobChunk> = BLOB_B
-        .chunks(1024)
-        .map(|x| BlobChunk { data: x.to_vec() })
-        .collect();
-
-    assert!(blob_b_blobchunks.len() > 1);
+    assert!(rx.next().await.is_none());
 
-    // Send blob B
-    let put_resp = service
-        .put(tonic_mock::streaming_request(blob_b_blobchunks))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    assert_eq!(BLOB_B_DIGEST.to_vec(), put_resp.digest);
-
-    // Stat for the digest of B
-    let resp = service
-        .stat(tonic::Request::new(StatBlobRequest {
-            digest: BLOB_B_DIGEST.to_vec(),
-            include_chunks: true,
-            ..Default::default()
-        }))
-        .await
-        .expect("must succeed")
-        .into_inner();
-
-    // it should return more than one chunk.
-    assert_ne!(1, resp.chunks.len());
-
-    // The size added up should equal the size of BLOB_B.
-    let mut size_in_stat: u32 = 0;
-    for chunk in &resp.chunks {
-        size_in_stat += chunk.size
-    }
-    assert_eq!(BLOB_B.len() as u32, size_in_stat);
-
-    // Chunks are chunked up the same way we would do locally, when initializing the chunker with the same values.
-    // TODO: make the chunker config better accessible, so we don't need to synchronize this.
-    {
-        let chunker_avg_size = 64 * 1024;
-        let chunker_min_size = chunker_avg_size / 4;
-        let chunker_max_size = chunker_avg_size * 4;
-
-        // initialize a chunker with the current buffer
-        let blob_b = BLOB_B.to_vec();
-        let chunker = fastcdc::v2020::FastCDC::new(
-            &blob_b,
-            chunker_min_size,
-            chunker_avg_size,
-            chunker_max_size,
-        );
-
-        let mut num_chunks = 0;
-        for (i, chunk) in chunker.enumerate() {
-            assert_eq!(
-                resp.chunks[i].size, chunk.length as u32,
-                "expected locally-chunked chunk length to match stat response"
-            );
-
-            num_chunks += 1;
-        }
-
-        assert_eq!(
-            resp.chunks.len(),
-            num_chunks,
-            "expected number of chunks to match"
-        );
-    }
-
-    // Reading the whole blob by its digest via the read() interface should succeed.
-    {
-        let resp = service
-            .read(tonic::Request::new(ReadBlobRequest {
-                digest: BLOB_B_DIGEST.to_vec(),
-            }))
-            .await;
-
-        let mut rx = resp.expect("must succeed").into_inner().into_inner();
-
-        let mut buf: Vec<u8> = Vec::new();
-        while let Some(item) = rx.recv().await {
-            let mut blob_chunk = item.expect("must not be err");
-            buf.append(&mut blob_chunk.data);
-        }
-
-        assert_eq!(BLOB_B.to_vec(), buf);
-    }
-
-    // Reading the whole blob by reading individual chunks should also succeed.
-    {
-        let mut buf: Vec<u8> = Vec::new();
-        for chunk in &resp.chunks {
-            // request this individual chunk via read
-            let resp = service
-                .read(tonic::Request::new(ReadBlobRequest {
-                    digest: chunk.digest.clone(),
-                }))
-                .await;
-
-            let mut rx = resp.expect("must succeed").into_inner().into_inner();
-
-            // append all items from the stream to the buffer
-            while let Some(item) = rx.recv().await {
-                let mut blob_chunk = item.expect("must not be err");
-                buf.append(&mut blob_chunk.data);
-            }
-        }
-        // finished looping over all chunks, compare
-        assert_eq!(BLOB_B.to_vec(), buf);
-    }
+    // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks.
+    // Test with some bigger blob too
 }
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
index f74c3fda0213..11cab2c264cc 100644
--- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -6,12 +6,10 @@ use crate::proto::GRPCPathInfoServiceWrapper;
 use crate::proto::PathInfo;
 use crate::proto::{GetPathInfoRequest, Node, SymlinkNode};
 use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
-use crate::tests::utils::{
-    gen_blob_service, gen_chunk_service, gen_directory_service, gen_pathinfo_service,
-};
+use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service};
 use tonic::Request;
 
-/// generates a GRPCPathInfoService out of blob, chunk, directory and pathinfo services.
+/// generates a GRPCPathInfoService out of blob, directory and pathinfo services.
 ///
 /// We only interact with it via the PathInfo GRPC interface.
 /// It uses the NonCachingNARCalculationService NARCalculationService to
@@ -19,11 +17,7 @@ use tonic::Request;
 fn gen_grpc_service() -> impl GRPCPathInfoService {
     GRPCPathInfoServiceWrapper::new(
         gen_pathinfo_service(),
-        NonCachingNARCalculationService::new(
-            gen_blob_service(),
-            gen_chunk_service(),
-            gen_directory_service(),
-        ),
+        NonCachingNARCalculationService::new(gen_blob_service(), gen_directory_service()),
     )
 }
 
diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs
index 3a48df9e33c8..ed5154d6fec0 100644
--- a/tvix/store/src/tests/import.rs
+++ b/tvix/store/src/tests/import.rs
@@ -1,4 +1,4 @@
-use super::utils::{gen_blob_service, gen_chunk_service, gen_directory_service};
+use super::utils::{gen_blob_service, gen_directory_service};
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
 use crate::import::import_path;
@@ -21,7 +21,6 @@ fn symlink() {
 
     let root_node = import_path(
         &mut gen_blob_service(),
-        &mut gen_chunk_service(),
         &mut gen_directory_service(),
         tmpdir.path().join("doesntmatter"),
     )
@@ -46,7 +45,6 @@ fn single_file() {
 
     let root_node = import_path(
         &mut blob_service,
-        &mut gen_chunk_service(),
         &mut gen_directory_service(),
         tmpdir.path().join("root"),
     )
@@ -64,13 +62,8 @@ fn single_file() {
 
     // ensure the blob has been uploaded
     assert!(blob_service
-        .stat(&proto::StatBlobRequest {
-            digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
-            include_chunks: false,
-            ..Default::default()
-        })
-        .unwrap()
-        .is_some());
+        .has(&HELLOWORLD_BLOB_DIGEST.to_vec().try_into().unwrap())
+        .unwrap());
 }
 
 #[test]
@@ -89,13 +82,8 @@ fn complicated() {
     let mut blob_service = gen_blob_service();
     let mut directory_service = gen_directory_service();
 
-    let root_node = import_path(
-        &mut blob_service,
-        &mut gen_chunk_service(),
-        &mut directory_service,
-        tmpdir.path(),
-    )
-    .expect("must succeed");
+    let root_node = import_path(&mut blob_service, &mut directory_service, tmpdir.path())
+        .expect("must succeed");
 
     // ensure root_node matched expectations
     assert_eq!(
@@ -124,11 +112,6 @@ fn complicated() {
 
     // ensure EMPTY_BLOB_CONTENTS has been uploaded
     assert!(blob_service
-        .stat(&proto::StatBlobRequest {
-            digest: EMPTY_BLOB_DIGEST.to_vec(),
-            include_chunks: false,
-            include_bao: false
-        })
-        .unwrap()
-        .is_some());
+        .has(&EMPTY_BLOB_DIGEST.to_vec().try_into().unwrap())
+        .unwrap());
 }
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
index 2d612d9c273f..b5dbf595960f 100644
--- a/tvix/store/src/tests/nar_renderer.rs
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -1,21 +1,17 @@
 use crate::blobservice::BlobService;
-use crate::chunkservice::ChunkService;
+use crate::blobservice::BlobWriter;
 use crate::directoryservice::DirectoryService;
 use crate::nar::NARRenderer;
-use crate::proto;
 use crate::proto::DirectoryNode;
 use crate::proto::FileNode;
 use crate::proto::SymlinkNode;
 use crate::tests::fixtures::*;
 use crate::tests::utils::*;
+use std::io;
 
 #[test]
 fn single_symlink() {
-    let renderer = NARRenderer::new(
-        gen_blob_service(),
-        gen_chunk_service(),
-        gen_directory_service(),
-    );
+    let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service());
     // don't put anything in the stores, as we don't actually do any requests.
 
     let mut buf: Vec<u8> = vec![];
@@ -37,11 +33,7 @@ fn single_symlink() {
 /// match what's in the store.
 #[test]
 fn single_file_missing_blob() {
-    let renderer = NARRenderer::new(
-        gen_blob_service(),
-        gen_chunk_service(),
-        gen_directory_service(),
-    );
+    let renderer = NARRenderer::new(gen_blob_service(), gen_directory_service());
     let mut buf: Vec<u8> = vec![];
 
     let e = renderer
@@ -56,10 +48,11 @@ fn single_file_missing_blob() {
         )
         .expect_err("must fail");
 
-    if let crate::nar::RenderError::BlobNotFound(actual_digest, _) = e {
-        assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), actual_digest);
-    } else {
-        panic!("unexpected error")
+    match e {
+        crate::nar::RenderError::NARWriterError(e) => {
+            assert_eq!(io::ErrorKind::NotFound, e.kind());
+        }
+        _ => panic!("unexpected error: {:?}", e),
     }
 }
 
@@ -68,84 +61,79 @@ fn single_file_missing_blob() {
 #[test]
 fn single_file_wrong_blob_size() {
     let blob_service = gen_blob_service();
-    let chunk_service = gen_chunk_service();
 
-    // insert blob and chunk into the stores
-    chunk_service
-        .put(HELLOWORLD_BLOB_CONTENTS.to_vec())
-        .unwrap();
-
-    blob_service
-        .put(
-            &HELLOWORLD_BLOB_DIGEST,
-            proto::BlobMeta {
-                chunks: vec![proto::blob_meta::ChunkMeta {
+    // insert blob into the store
+    let mut writer = blob_service.open_write().unwrap();
+    io::copy(
+        &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
+        &mut writer,
+    )
+    .unwrap();
+    assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap());
+
+    let renderer = NARRenderer::new(blob_service, gen_directory_service());
+
+    // Test with a root FileNode of a too big size
+    {
+        let mut buf: Vec<u8> = vec![];
+        let e = renderer
+            .write_nar(
+                &mut buf,
+                &crate::proto::node::Node::File(FileNode {
+                    name: "doesntmatter".to_string(),
                     digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
-                    size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
-                }],
-                ..Default::default()
-            },
-        )
-        .unwrap();
-
-    let renderer = NARRenderer::new(blob_service, chunk_service, gen_directory_service());
-    let mut buf: Vec<u8> = vec![];
-
-    let e = renderer
-        .write_nar(
-            &mut buf,
-            &crate::proto::node::Node::File(FileNode {
-                name: "doesntmatter".to_string(),
-                digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
-                size: 42, // <- note the wrong size here!
-                executable: false,
-            }),
-        )
-        .expect_err("must fail");
+                    size: 42, // <- note the wrong size here!
+                    executable: false,
+                }),
+            )
+            .expect_err("must fail");
+
+        match e {
+            crate::nar::RenderError::NARWriterError(e) => {
+                assert_eq!(io::ErrorKind::UnexpectedEof, e.kind());
+            }
+            _ => panic!("unexpected error: {:?}", e),
+        }
+    }
 
-    if let crate::nar::RenderError::UnexpectedBlobMeta(digest, _, expected_size, actual_size) = e {
-        assert_eq!(
-            digest,
-            HELLOWORLD_BLOB_DIGEST.to_vec(),
-            "expect digest to match"
-        );
-        assert_eq!(
-            expected_size, 42,
-            "expected expected size to be what's passed in the request"
-        );
-        assert_eq!(
-            actual_size,
-            HELLOWORLD_BLOB_CONTENTS.len() as u32,
-            "expected actual size to be correct"
-        );
-    } else {
-        panic!("unexpected error")
+    // Test with a root FileNode of a too small size
+    {
+        let mut buf: Vec<u8> = vec![];
+        let e = renderer
+            .write_nar(
+                &mut buf,
+                &crate::proto::node::Node::File(FileNode {
+                    name: "doesntmatter".to_string(),
+                    digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
+                    size: 2, // <- note the wrong size here!
+                    executable: false,
+                }),
+            )
+            .expect_err("must fail");
+
+        match e {
+            crate::nar::RenderError::NARWriterError(e) => {
+                assert_eq!(io::ErrorKind::InvalidInput, e.kind());
+            }
+            _ => panic!("unexpected error: {:?}", e),
+        }
     }
 }
 
 #[test]
 fn single_file() {
     let blob_service = gen_blob_service();
-    let chunk_service = gen_chunk_service();
 
-    chunk_service
-        .put(HELLOWORLD_BLOB_CONTENTS.to_vec())
-        .unwrap();
+    // insert blob into the store
+    let mut writer = blob_service.open_write().unwrap();
+    io::copy(
+        &mut io::Cursor::new(HELLOWORLD_BLOB_CONTENTS.to_vec()),
+        &mut writer,
+    )
+    .unwrap();
+    assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap());
 
-    blob_service
-        .put(
-            &HELLOWORLD_BLOB_DIGEST,
-            proto::BlobMeta {
-                chunks: vec![proto::blob_meta::ChunkMeta {
-                    digest: HELLOWORLD_BLOB_DIGEST.to_vec(),
-                    size: HELLOWORLD_BLOB_CONTENTS.len() as u32,
-                }],
-                ..Default::default()
-            },
-        )
-        .unwrap();
-
-    let renderer = NARRenderer::new(blob_service, chunk_service, gen_directory_service());
+    let renderer = NARRenderer::new(blob_service, gen_directory_service());
     let mut buf: Vec<u8> = vec![];
 
     renderer
@@ -166,31 +154,24 @@ fn single_file() {
 #[test]
 fn test_complicated() {
     let blob_service = gen_blob_service();
-    let chunk_service = gen_chunk_service();
     let directory_service = gen_directory_service();
 
     // put all data into the stores.
-    let digest = chunk_service.put(EMPTY_BLOB_CONTENTS.to_vec()).unwrap();
-
-    blob_service
-        .put(
-            &digest,
-            proto::BlobMeta {
-                chunks: vec![proto::blob_meta::ChunkMeta {
-                    digest: digest.to_vec(),
-                    size: EMPTY_BLOB_CONTENTS.len() as u32,
-                }],
-                ..Default::default()
-            },
-        )
-        .unwrap();
+    // insert blob into the store
+    let mut writer = blob_service.open_write().unwrap();
+    io::copy(
+        &mut io::Cursor::new(EMPTY_BLOB_CONTENTS.to_vec()),
+        &mut writer,
+    )
+    .unwrap();
+    assert_eq!(EMPTY_BLOB_DIGEST.to_vec(), writer.close().unwrap());
 
     directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap();
     directory_service
         .put(DIRECTORY_COMPLICATED.clone())
         .unwrap();
 
-    let renderer = NARRenderer::new(blob_service, chunk_service, directory_service);
+    let renderer = NARRenderer::new(blob_service, directory_service);
     let mut buf: Vec<u8> = vec![];
 
     renderer
diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs
index 916e6516216c..2991feed41db 100644
--- a/tvix/store/src/tests/utils.rs
+++ b/tvix/store/src/tests/utils.rs
@@ -1,6 +1,5 @@
 use crate::{
     blobservice::{BlobService, MemoryBlobService},
-    chunkservice::{ChunkService, MemoryChunkService},
     directoryservice::{DirectoryService, MemoryDirectoryService},
     pathinfoservice::{MemoryPathInfoService, PathInfoService},
 };
@@ -9,10 +8,6 @@ pub fn gen_blob_service() -> impl BlobService + Send + Sync + Clone + 'static {
     MemoryBlobService::default()
 }
 
-pub fn gen_chunk_service() -> impl ChunkService + Clone {
-    MemoryChunkService::default()
-}
-
 pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static {
     MemoryDirectoryService::default()
 }