Diffstat (limited to 'tvix/castore/src/blobservice')
-rw-r--r--  tvix/castore/src/blobservice/chunked_reader.rs  496
-rw-r--r--  tvix/castore/src/blobservice/combinator.rs      128
-rw-r--r--  tvix/castore/src/blobservice/from_addr.rs        88
-rw-r--r--  tvix/castore/src/blobservice/grpc.rs            388
-rw-r--r--  tvix/castore/src/blobservice/memory.rs          155
-rw-r--r--  tvix/castore/src/blobservice/mod.rs             112
-rw-r--r--  tvix/castore/src/blobservice/object_store.rs    617
-rw-r--r--  tvix/castore/src/blobservice/tests/mod.rs       253
-rw-r--r--  tvix/castore/src/blobservice/tests/utils.rs      42
9 files changed, 2279 insertions, 0 deletions
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs
new file mode 100644
index 000000000000..6e8355874bca
--- /dev/null
+++ b/tvix/castore/src/blobservice/chunked_reader.rs
@@ -0,0 +1,496 @@
+use futures::{ready, TryStreamExt};
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, AsyncSeekExt};
+use tokio_stream::StreamExt;
+use tokio_util::io::{ReaderStream, StreamReader};
+use tracing::{instrument, trace, warn};
+
+use crate::B3Digest;
+use std::{cmp::Ordering, pin::Pin};
+
+use super::{BlobReader, BlobService};
+
+pin_project! {
+    /// ChunkedReader provides a chunk-aware [BlobReader], allowing reading
+    /// and seeking into a blob.
+    /// It internally holds a [ChunkedBlob], which stores the chunk information
+    /// and can emit a reader positioned at a specific offset whenever we need
+    /// to seek.
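+    ///
+    /// A minimal usage sketch (hypothetical chunk digests and sizes; assumes
+    /// all chunks are present in `blob_service`):
+    ///
+    /// ```ignore
+    /// use tokio::io::{AsyncReadExt, AsyncSeekExt};
+    ///
+    /// let mut r = ChunkedReader::from_chunks(
+    ///     [(chunk_1_digest, 2), (chunk_2_digest, 4)].into_iter(),
+    ///     blob_service,
+    /// );
+    /// r.seek(std::io::SeekFrom::Start(3)).await?;
+    /// let mut buf = Vec::new();
+    /// r.read_to_end(&mut buf).await?; // yields the remaining 3 bytes
+    /// ```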
+    pub struct ChunkedReader<BS> {
+        chunked_blob: ChunkedBlob<BS>,
+
+        #[pin]
+        r: Box<dyn AsyncRead + Unpin + Send>,
+
+        pos: u64,
+    }
+}
+
+impl<BS> ChunkedReader<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + 'static + Send,
+{
+    /// Constructs a new [ChunkedReader] from a list of chunks (their blake3
+    /// digests and chunk sizes) and a blob service to fetch them from.
+    pub fn from_chunks(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
+        let chunked_blob = ChunkedBlob::from_iter(chunks_it, blob_service);
+        let r = chunked_blob.reader_skipped_offset(0);
+
+        Self {
+            chunked_blob,
+            r,
+            pos: 0,
+        }
+    }
+}
+
+/// ChunkedReader implements BlobReader.
+impl<BS> BlobReader for ChunkedReader<BS> where BS: Send + Clone + 'static + AsRef<dyn BlobService> {}
+
+impl<BS> tokio::io::AsyncRead for ChunkedReader<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + 'static,
+{
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        // The amount of data read can be determined by the increase
+        // in the length of the slice returned by `ReadBuf::filled`.
+        let filled_before = buf.filled().len();
+
+        let this = self.project();
+
+        ready!(this.r.poll_read(cx, buf))?;
+        let bytes_read = buf.filled().len() - filled_before;
+        *this.pos += bytes_read as u64;
+
+        Ok(()).into()
+    }
+}
+
+impl<BS> tokio::io::AsyncSeek for ChunkedReader<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + Send + 'static,
+{
+    #[instrument(skip(self), err(Debug))]
+    fn start_seek(self: Pin<&mut Self>, position: std::io::SeekFrom) -> std::io::Result<()> {
+        let total_len = self.chunked_blob.blob_length();
+        let mut this = self.project();
+
+        let absolute_offset: u64 = match position {
+            std::io::SeekFrom::Start(from_start) => from_start,
+            std::io::SeekFrom::End(from_end) => {
+                // note from_end is i64, not u64, so this is usually negative.
+                total_len.checked_add_signed(from_end).ok_or_else(|| {
+                    std::io::Error::new(
+                        std::io::ErrorKind::InvalidInput,
+                        "over/underflow while seeking",
+                    )
+                })?
+            }
+            std::io::SeekFrom::Current(from_current) => {
+                // note from_current is i64, not u64, so this can be positive or negative.
+                (*this.pos)
+                    .checked_add_signed(from_current)
+                    .ok_or_else(|| {
+                        std::io::Error::new(
+                            std::io::ErrorKind::InvalidInput,
+                            "over/underflow while seeking",
+                        )
+                    })?
+            }
+        };
+
+        // check if the position actually did change.
+        if absolute_offset != *this.pos {
+            // ensure the new position still is inside the file.
+            if absolute_offset > total_len {
+                Err(std::io::Error::new(
+                    std::io::ErrorKind::InvalidInput,
+                    "seeked beyond EOF",
+                ))?
+            }
+
+            // Update the position and the internal reader.
+            *this.pos = absolute_offset;
+
+            // FUTUREWORK: if we can seek forward, avoid re-assembling.
+            // At least if it's still in the same chunk?
+            *this.r = this.chunked_blob.reader_skipped_offset(absolute_offset);
+        }
+
+        Ok(())
+    }
+
+    fn poll_complete(
+        self: Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<std::io::Result<u64>> {
+        std::task::Poll::Ready(Ok(self.pos))
+    }
+}
+
+/// Holds a list of blake3 digests for individual chunks (as well as their
+/// sizes and start offsets within the blob).
+/// It can construct a Reader positioned at a certain offset, which is useful
+/// to implement a BlobReader (that implements AsyncSeek).
+struct ChunkedBlob<BS> {
+    blob_service: BS,
+    chunks: Vec<(u64, u64, B3Digest)>,
+}
+
+impl<BS> ChunkedBlob<BS>
+where
+    BS: AsRef<dyn BlobService> + Clone + 'static + Send,
+{
+    /// Constructs [Self] from a list of blake3 digests of chunks and their
+    /// sizes, and a reference to a blob service.
+    /// Initializing it with an empty list is disallowed.
+    fn from_iter(chunks_it: impl Iterator<Item = (B3Digest, u64)>, blob_service: BS) -> Self {
+        let mut chunks = Vec::new();
+        let mut offset: u64 = 0;
+
+        for (chunk_digest, chunk_size) in chunks_it {
+            chunks.push((offset, chunk_size, chunk_digest));
+            offset += chunk_size;
+        }
+
+        assert!(
+            !chunks.is_empty(),
+            "Chunks must be provided, don't use this for blobs without chunks"
+        );
+
+        Self {
+            blob_service,
+            chunks,
+        }
+    }
+
+    /// Returns the length of the blob.
+    fn blob_length(&self) -> u64 {
+        self.chunks
+            .last()
+            .map(|(chunk_offset, chunk_size, _)| chunk_offset + chunk_size)
+            .unwrap_or(0)
+    }
+
+    /// For a given position pos, return the index of the chunk containing the data.
+    /// In case pos is outside the blob, None is returned.
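+    /// For example, with two chunks of sizes 2 and 4 (covering offsets 0..2
+    /// and 2..6), pos 1 yields Some(0), pos 2 yields Some(1), and pos 6
+    /// yields None.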
+    #[instrument(level = "trace", skip(self), ret)]
+    fn get_chunk_idx_for_position(&self, pos: u64) -> Option<usize> {
+        // FUTUREWORK: benchmark when to use linear search, binary_search and BTreeSet
+        self.chunks
+            .binary_search_by(|(chunk_start_pos, chunk_size, _)| {
+                if chunk_start_pos + chunk_size <= pos {
+                    Ordering::Less
+                } else if *chunk_start_pos > pos {
+                    Ordering::Greater
+                } else {
+                    Ordering::Equal
+                }
+            })
+            .ok()
+    }
+
+    /// Returns an AsyncRead over the blob data, starting at the given offset.
+    /// It internally assembles a stream reading from each chunk (skipping over
+    /// chunks that lie entirely before the offset).
+    /// Within the first relevant chunk, the leading bytes before the offset
+    /// are skipped too.
+    /// The returned boxed reader does not implement AsyncSeek on its own, but
+    /// ChunkedReader does.
+    #[instrument(level = "trace", skip(self))]
+    fn reader_skipped_offset(&self, offset: u64) -> Box<dyn tokio::io::AsyncRead + Send + Unpin> {
+        if offset == self.blob_length() {
+            return Box::new(std::io::Cursor::new(vec![]));
+        }
+        // construct a stream of all chunks starting with the given offset
+        let start_chunk_idx = self
+            .get_chunk_idx_for_position(offset)
+            .expect("outside of blob");
+        // It's ok to panic here, we can only reach this by seeking, and seeking should already reject out-of-file seeking.
+
+        let skip_first_chunk_bytes = (offset - self.chunks[start_chunk_idx].0) as usize;
+
+        let blob_service = self.blob_service.clone();
+        let chunks: Vec<_> = self.chunks[start_chunk_idx..].to_vec();
+        let readers_stream = tokio_stream::iter(chunks.into_iter().enumerate()).map(
+            move |(nth_chunk, (_chunk_start_offset, chunk_size, chunk_digest))| {
+                let chunk_digest = chunk_digest.to_owned();
+                let blob_service = blob_service.clone();
+                async move {
+                    trace!(chunk_size=%chunk_size, chunk_digest=%chunk_digest, "open_read on chunk in stream");
+                    let mut blob_reader = blob_service
+                        .as_ref()
+                        .open_read(&chunk_digest.to_owned())
+                        .await?
+                        .ok_or_else(|| {
+                            warn!(chunk.digest = %chunk_digest, "chunk not found");
+                            std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found")
+                        })?;
+
+                    // iff this is the first chunk in the stream, skip by skip_first_chunk_bytes
+                    if nth_chunk == 0 && skip_first_chunk_bytes > 0 {
+                        blob_reader
+                            .seek(std::io::SeekFrom::Start(skip_first_chunk_bytes as u64))
+                            .await?;
+                    }
+                    Ok::<_, std::io::Error>(blob_reader)
+                }
+            },
+        );
+
+        // convert the stream of readers to a stream of streams of byte chunks
+        let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) });
+
+        // flatten into one stream of byte chunks
+        let bytes_stream = bytes_streams.try_flatten();
+
+        // convert into AsyncRead
+        Box::new(StreamReader::new(Box::pin(bytes_stream)))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::{io::SeekFrom, sync::Arc};
+
+    use crate::{
+        blobservice::{chunked_reader::ChunkedReader, BlobService, MemoryBlobService},
+        B3Digest,
+    };
+    use hex_literal::hex;
+    use lazy_static::lazy_static;
+    use tokio::io::{AsyncReadExt, AsyncSeekExt};
+
+    const CHUNK_1: [u8; 2] = hex!("0001");
+    const CHUNK_2: [u8; 4] = hex!("02030405");
+    const CHUNK_3: [u8; 1] = hex!("06");
+    const CHUNK_4: [u8; 2] = hex!("0708");
+    const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f");
+
+    lazy_static! {
+        // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]`
+        pub static ref CHUNK_1_DIGEST: B3Digest = blake3::hash(&CHUNK_1).as_bytes().into();
+        pub static ref CHUNK_2_DIGEST: B3Digest = blake3::hash(&CHUNK_2).as_bytes().into();
+        pub static ref CHUNK_3_DIGEST: B3Digest = blake3::hash(&CHUNK_3).as_bytes().into();
+        pub static ref CHUNK_4_DIGEST: B3Digest = blake3::hash(&CHUNK_4).as_bytes().into();
+        pub static ref CHUNK_5_DIGEST: B3Digest = blake3::hash(&CHUNK_5).as_bytes().into();
+        pub static ref BLOB_1_LIST: [(B3Digest, u64); 5] = [
+            (CHUNK_1_DIGEST.clone(), 2),
+            (CHUNK_2_DIGEST.clone(), 4),
+            (CHUNK_3_DIGEST.clone(), 1),
+            (CHUNK_4_DIGEST.clone(), 2),
+            (CHUNK_5_DIGEST.clone(), 7),
+        ];
+    }
+
+    use super::ChunkedBlob;
+
+    /// ensure the start offsets are properly calculated.
+    #[test]
+    fn from_iter() {
+        let cb = ChunkedBlob::from_iter(
+            BLOB_1_LIST.clone().into_iter(),
+            Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>,
+        );
+
+        assert_eq!(
+            cb.chunks,
+            Vec::from_iter([
+                (0, 2, CHUNK_1_DIGEST.clone()),
+                (2, 4, CHUNK_2_DIGEST.clone()),
+                (6, 1, CHUNK_3_DIGEST.clone()),
+                (7, 2, CHUNK_4_DIGEST.clone()),
+                (9, 7, CHUNK_5_DIGEST.clone()),
+            ])
+        );
+    }
+
+    /// ensure ChunkedBlob can't be used with an empty list of chunks
+    #[test]
+    #[should_panic]
+    fn from_iter_empty() {
+        ChunkedBlob::from_iter(
+            [].into_iter(),
+            Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>,
+        );
+    }
+
+    /// ensure the right chunk is selected
+    #[test]
+    fn chunk_idx_for_position() {
+        let cb = ChunkedBlob::from_iter(
+            BLOB_1_LIST.clone().into_iter(),
+            Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>,
+        );
+
+        assert_eq!(Some(0), cb.get_chunk_idx_for_position(0), "start of blob");
+
+        assert_eq!(
+            Some(0),
+            cb.get_chunk_idx_for_position(1),
+            "middle of first chunk"
+        );
+        assert_eq!(
+            Some(1),
+            cb.get_chunk_idx_for_position(2),
+            "beginning of second chunk"
+        );
+
+        assert_eq!(
+            Some(4),
+            cb.get_chunk_idx_for_position(15),
+            "right before the end of the blob"
+        );
+        assert_eq!(
+            None,
+            cb.get_chunk_idx_for_position(16),
+            "right outside the blob"
+        );
+        assert_eq!(
+            None,
+            cb.get_chunk_idx_for_position(100),
+            "way outside the blob"
+        );
+    }
+
+    /// returns a blobservice with all chunks in BLOB_1 present.
+    async fn gen_blobservice_blob1() -> Arc<dyn BlobService> {
+        let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>;
+
+        // seed blob service with all chunks
+        for blob_contents in [
+            CHUNK_1.to_vec(),
+            CHUNK_2.to_vec(),
+            CHUNK_3.to_vec(),
+            CHUNK_4.to_vec(),
+            CHUNK_5.to_vec(),
+        ] {
+            let mut bw = blob_service.open_write().await;
+            tokio::io::copy(&mut std::io::Cursor::new(blob_contents), &mut bw)
+                .await
+                .expect("writing blob");
+            bw.close().await.expect("close blobwriter");
+        }
+
+        blob_service
+    }
+
+    #[tokio::test]
+    async fn test_read() {
+        let blob_service = gen_blobservice_blob1().await;
+        let mut chunked_reader =
+            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);
+
+        // read all data
+        let mut buf = Vec::new();
+        tokio::io::copy(&mut chunked_reader, &mut buf)
+            .await
+            .expect("copy");
+
+        assert_eq!(
+            hex!("000102030405060708090a0b0c0d0e0f").to_vec(),
+            buf,
+            "read data must match"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_seek() {
+        let blob_service = gen_blobservice_blob1().await;
+        let mut chunked_reader =
+            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);
+
+        // seek to the end
+        // expect to read 0 bytes
+        {
+            chunked_reader
+                .seek(SeekFrom::End(0))
+                .await
+                .expect("seek to end");
+
+            let mut buf = Vec::new();
+            chunked_reader
+                .read_to_end(&mut buf)
+                .await
+                .expect("read to end");
+
+            assert_eq!(hex!("").to_vec(), buf);
+        }
+
+        // seek one byte before the end
+        {
+            chunked_reader.seek(SeekFrom::End(-1)).await.expect("seek");
+
+            let mut buf = Vec::new();
+            chunked_reader
+                .read_to_end(&mut buf)
+                .await
+                .expect("read to end");
+
+            assert_eq!(hex!("0f").to_vec(), buf);
+        }
+
+        // seek back three bytes, but using relative positioning
+        // read two bytes
+        {
+            chunked_reader
+                .seek(SeekFrom::Current(-3))
+                .await
+                .expect("seek");
+
+            let mut buf = [0b0; 2];
+            chunked_reader
+                .read_exact(&mut buf)
+                .await
+                .expect("read exact");
+
+            assert_eq!(hex!("0d0e"), buf);
+        }
+    }
+
+    // seeds a blob service with only the first two chunks, reads a bit in the
+    // front (which succeeds), but then tries to seek past and read more (which
+    // should fail).
+    #[tokio::test]
+    async fn test_read_missing_chunks() {
+        let blob_service = Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>;
+
+        for blob_contents in [CHUNK_1.to_vec(), CHUNK_2.to_vec()] {
+            let mut bw = blob_service.open_write().await;
+            tokio::io::copy(&mut std::io::Cursor::new(blob_contents), &mut bw)
+                .await
+                .expect("writing blob");
+
+            bw.close().await.expect("close blobwriter");
+        }
+
+        let mut chunked_reader =
+            ChunkedReader::from_chunks(BLOB_1_LIST.clone().into_iter(), blob_service);
+
+        // read a bit from the front (5 bytes out of 6 available)
+        let mut buf = [0b0; 5];
+        chunked_reader
+            .read_exact(&mut buf)
+            .await
+            .expect("read exact");
+
+        assert_eq!(hex!("0001020304"), buf);
+
+        // seek 2 bytes forward, into an area where we don't have chunks
+        chunked_reader
+            .seek(SeekFrom::Current(2))
+            .await
+            .expect("seek");
+
+        let mut buf = Vec::new();
+        chunked_reader
+            .read_to_end(&mut buf)
+            .await
+            .expect_err("must fail");
+
+        // FUTUREWORK: check semantics on errorkinds. Should this be InvalidData
+        // or NotFound?
+    }
+}
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs
new file mode 100644
index 000000000000..6a964c8a8440
--- /dev/null
+++ b/tvix/castore/src/blobservice/combinator.rs
@@ -0,0 +1,128 @@
+use std::sync::Arc;
+
+use tonic::async_trait;
+use tracing::instrument;
+
+use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::{B3Digest, Error};
+
+use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
+
+/// Combinator for a BlobService, using a "local" and "remote" blobservice.
+/// Requests are tried in (and returned from) the local store first; only if
+/// things are not present there is the remote BlobService queried.
+/// In case the local blobservice doesn't have the blob, we ask the remote
+/// blobservice for chunks, and try to read each of these chunks from the local
+/// blobservice again, before falling back to the remote one.
+/// The remote BlobService is never written to.
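+///
+/// A sketch of the read path (hypothetical digest; instances are normally
+/// assembled via [CombinedBlobServiceConfig] and the composition machinery,
+/// as the fields are private):
+///
+/// ```ignore
+/// // svc: CombinedBlobService<BL, BR>
+/// if let Some(mut reader) = svc.open_read(&digest).await? {
+///     // chunks already present locally are read from `local`,
+///     // missing ones are fetched from `remote`.
+/// }
+/// ```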
+pub struct CombinedBlobService<BL, BR> {
+    local: BL,
+    remote: BR,
+}
+
+impl<BL, BR> Clone for CombinedBlobService<BL, BR>
+where
+    BL: Clone,
+    BR: Clone,
+{
+    fn clone(&self) -> Self {
+        Self {
+            local: self.local.clone(),
+            remote: self.remote.clone(),
+        }
+    }
+}
+
+#[async_trait]
+impl<BL, BR> BlobService for CombinedBlobService<BL, BR>
+where
+    BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
+    BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
+{
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
+        Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?)
+    }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
+        if self.local.as_ref().has(digest).await? {
+            // local store has the blob, so we can assume it also has all chunks.
+            self.local.as_ref().open_read(digest).await
+        } else {
+            // Local store doesn't have the blob.
+            // Ask the remote one for the list of chunks,
+            // and create a chunked reader that uses self.open_read() for
+            // individual chunks. There's a chance we already have some chunks
+            // locally, meaning we don't need to fetch them all from the remote
+            // BlobService.
+            match self.remote.as_ref().chunks(digest).await? {
+                // blob doesn't exist on the remote side either, nothing we can do.
+                None => Ok(None),
+                Some(remote_chunks) => {
+                    // if there's no more granular chunks, or the remote
+                    // blobservice doesn't support chunks, read the blob from
+                    // the remote blobservice directly.
+                    if remote_chunks.is_empty() {
+                        return self.remote.as_ref().open_read(digest).await;
+                    }
+                    // Otherwise, construct a chunked reader, which will always
+                    // try the local backend first.
+
+                    let chunked_reader = ChunkedReader::from_chunks(
+                        remote_chunks.into_iter().map(|chunk| {
+                            (
+                                chunk.digest.try_into().expect("invalid b3 digest"),
+                                chunk.size,
+                            )
+                        }),
+                        Arc::new(self.clone()) as Arc<dyn BlobService>,
+                    );
+                    Ok(Some(Box::new(chunked_reader)))
+                }
+            }
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        // direct writes to the local one.
+        self.local.as_ref().open_write().await
+    }
+}
+
+#[derive(serde::Deserialize, Debug, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct CombinedBlobServiceConfig {
+    local: String,
+    remote: String,
+}
+
+impl TryFrom<url::Url> for CombinedBlobServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
+        Err(Error::StorageError(
+            "Instantiating a CombinedBlobService from a url is not supported".into(),
+        )
+        .into())
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for CombinedBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        context: &CompositionContext,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
+        let (local, remote) = futures::join!(
+            context.resolve(self.local.clone()),
+            context.resolve(self.remote.clone())
+        );
+        Ok(Arc::new(CombinedBlobService {
+            local: local?,
+            remote: remote?,
+        }))
+    }
+}
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs
new file mode 100644
index 000000000000..c5cabaa9d945
--- /dev/null
+++ b/tvix/castore/src/blobservice/from_addr.rs
@@ -0,0 +1,88 @@
+use std::sync::Arc;
+
+use url::Url;
+
+use crate::composition::{
+    with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG,
+};
+
+use super::BlobService;
+
+/// Constructs a new instance of a [BlobService] from a URI.
+///
+/// The following schemes are supported by the following services:
+/// - `memory://` ([MemoryBlobService])
+/// - `grpc+*://` ([GRPCBlobService])
+/// - `objectstore+*://` ([ObjectStoreBlobService])
+///
+/// See the respective service types for more details about their URL syntax.
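+///
+/// A short usage sketch (assumes an async context where errors can be
+/// propagated with `?`):
+///
+/// ```ignore
+/// let blob_service = from_addr("memory://").await?;
+/// assert!(!blob_service.has(&some_digest).await?); // some_digest: hypothetical B3Digest
+/// ```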
+pub async fn from_addr(
+    uri: &str,
+) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
+    let url = Url::parse(uri)
+        .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?;
+
+    let blob_service_config = with_registry(&REG, || {
+        <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>::try_from(url)
+    })?
+    .0;
+    let blob_service = blob_service_config
+        .build("anonymous", &CompositionContext::blank())
+        .await?;
+
+    Ok(blob_service)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::from_addr;
+    use rstest::rstest;
+
+    #[rstest]
+    /// This uses an unsupported scheme.
+    #[case::unsupported_scheme("http://foo.example/test", false)]
+    /// This correctly sets the scheme, and doesn't set a path.
+    #[case::memory_valid("memory://", true)]
+    /// This sets a memory url host to `foo`
+    #[case::memory_invalid_host("memory://foo", false)]
+    /// This sets a memory url path to "/", which is invalid.
+    #[case::memory_invalid_root_path("memory:///", false)]
+    /// This sets a memory url path to "/foo", which is invalid.
+    #[case::memory_invalid_root_path_foo("memory:///foo", false)]
+    /// Correct scheme to connect to a unix socket.
+    #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)]
+    /// Correct scheme for unix socket, but setting a host too, which is invalid.
+    #[case::grpc_invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)]
+    /// Correct scheme to connect to localhost, with port 12345
+    #[case::grpc_valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[case::grpc_valid_http_host_without_port("grpc+http://localhost", true)]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)]
+    /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
+    #[case::grpc_invalid_has_path("grpc+http://localhost/some-path", false)]
+    /// An example for object store (InMemory)
+    #[case::objectstore_valid_memory("objectstore+memory:///", true)]
+    /// An example for object store (LocalFileSystem)
+    #[case::objectstore_valid_file("objectstore+file:///foo/bar", true)]
+    /// An example for object store (HTTP / WebDAV)
+    #[case::objectstore_valid_http_url("objectstore+https://localhost:8080/some-path", true)]
+    /// An example for object store (S3)
+    #[cfg_attr(
+        feature = "cloud",
+        case::objectstore_valid_s3_url("objectstore+s3://bucket/path", true)
+    )]
+    /// An example for object store (GCS)
+    #[cfg_attr(
+        feature = "cloud",
+        case::objectstore_valid_gcs_url("objectstore+gs://bucket/path", true)
+    )]
+    #[tokio::test]
+    async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
+        if exp_succeed {
+            from_addr(uri_str).await.expect("should succeed");
+        } else {
+            assert!(from_addr(uri_str).await.is_err(), "should fail");
+        }
+    }
+}
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
new file mode 100644
index 000000000000..0db3dfea4ad8
--- /dev/null
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -0,0 +1,388 @@
+use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
+use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::{
+    proto::{self, stat_blob_response::ChunkMeta},
+    B3Digest,
+};
+use futures::sink::SinkExt;
+use std::{
+    io::{self, Cursor},
+    pin::pin,
+    sync::Arc,
+    task::Poll,
+};
+use tokio::io::AsyncWriteExt;
+use tokio::task::JoinHandle;
+use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use tokio_util::{
+    io::{CopyToBytes, SinkWriter},
+    sync::PollSender,
+};
+use tonic::{async_trait, Code, Status};
+use tracing::{instrument, Instrument as _};
+
+/// Connects to a (remote) tvix-store BlobService over gRPC.
+#[derive(Clone)]
+pub struct GRPCBlobService<T> {
+    /// The internal reference to a gRPC client.
+    /// Cloning it is cheap, and it internally handles concurrent requests.
+    grpc_client: proto::blob_service_client::BlobServiceClient<T>,
+}
+
+impl<T> GRPCBlobService<T> {
+    /// Constructs a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient].
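+    ///
+    /// A connection sketch, mirroring the test at the bottom of this file
+    /// (assumes a reachable endpoint):
+    ///
+    /// ```ignore
+    /// let url = url::Url::parse("grpc+http://[::1]:12345")?;
+    /// let client = proto::blob_service_client::BlobServiceClient::new(
+    ///     crate::tonic::channel_from_url(&url).await?,
+    /// );
+    /// let svc = GRPCBlobService::from_client(client);
+    /// ```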
+    pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self {
+        Self { grpc_client }
+    }
+}
+
+#[async_trait]
+impl<T> BlobService for GRPCBlobService<T>
+where
+    T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static,
+    T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static,
+    <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
+    T::Future: Send,
+{
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        match self
+            .grpc_client
+            .clone()
+            .stat(proto::StatBlobRequest {
+                digest: digest.clone().into(),
+                ..Default::default()
+            })
+            .await
+        {
+            Ok(_blob_meta) => Ok(true),
+            Err(e) if e.code() == Code::NotFound => Ok(false),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+        }
+    }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        // First try to get a list of chunks. In case there's only one chunk returned,
+        // buffer its data into a Vec, otherwise use a ChunkedReader.
+        // We previously used NaiveSeeker here, but userland likes to seek backwards too often,
+        // and without store composition this will get very noisy.
+        // FUTUREWORK: use CombinedBlobService and store composition.
+        match self.chunks(digest).await {
+            Ok(None) => Ok(None),
+            Ok(Some(chunks)) => {
+                if chunks.is_empty() || chunks.len() == 1 {
+                    // No more granular chunking info, treat this as an individual chunk.
+                    // Get a stream of [proto::BlobChunk], or return an error if the blob
+                    // doesn't exist.
+                    return match self
+                        .grpc_client
+                        .clone()
+                        .read(proto::ReadBlobRequest {
+                            digest: digest.clone().into(),
+                        })
+                        .await
+                    {
+                        Ok(stream) => {
+                            let data_stream = stream.into_inner().map(|e| {
+                                e.map(|c| c.data)
+                                    .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s))
+                            });
+
+                            // Use StreamReader::new to convert to an AsyncRead.
+                            let mut data_reader = tokio_util::io::StreamReader::new(data_stream);
+
+                            let mut buf = Vec::new();
+                            // TODO: only do this up to a certain limit.
+                            tokio::io::copy(&mut data_reader, &mut buf).await?;
+
+                            Ok(Some(Box::new(Cursor::new(buf))))
+                        }
+                        Err(e) if e.code() == Code::NotFound => Ok(None),
+                        Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+                    };
+                }
+
+                // The chunked case. Let ChunkedReader do individual reads.
+                // TODO: we should store the chunking data in some local cache,
+                // so `ChunkedReader` doesn't call `self.chunks` *again* for every chunk.
+                // Think about how store composition will fix this.
+                let chunked_reader = ChunkedReader::from_chunks(
+                    chunks.into_iter().map(|chunk| {
+                        (
+                            chunk.digest.try_into().expect("invalid b3 digest"),
+                            chunk.size,
+                        )
+                    }),
+                    Arc::new(self.clone()) as Arc<dyn BlobService>,
+                );
+                Ok(Some(Box::new(chunked_reader)))
+            }
+            Err(e) => Err(e)?,
+        }
+    }
+
+    /// Returns a BlobWriter that'll internally wrap each write in a
+    /// [proto::BlobChunk], which is sent to the gRPC server.
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        // set up an mpsc channel passing around Bytes.
+        let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
+
+        // bytes arriving on the RX side are wrapped inside a
+        // [proto::BlobChunk], and a [ReceiverStream] is constructed.
+        let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x });
+
+        // spawn the gRPC put request, which will read from blobchunk_stream.
+        let task = tokio::spawn({
+            let mut grpc_client = self.grpc_client.clone();
+            async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) }
+                // instrument the task with the current span, this is not done by default
+                .in_current_span()
+        });
+
+        // The tx part of the channel is converted to a sink of byte chunks.
+        let sink = PollSender::new(tx)
+            .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e));
+
+        // … which is turned into an [tokio::io::AsyncWrite].
+        let writer = SinkWriter::new(CopyToBytes::new(sink));
+
+        Box::new(GRPCBlobWriter {
+            task_and_writer: Some((task, writer)),
+            digest: None,
+        })
+    }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        let resp = self
+            .grpc_client
+            .clone()
+            .stat(proto::StatBlobRequest {
+                digest: digest.clone().into(),
+                send_chunks: true,
+                ..Default::default()
+            })
+            .await;
+
+        match resp {
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+            Ok(resp) => {
+                let resp = resp.into_inner();
+
+                resp.validate()
+                    .map_err(|e| std::io::Error::new(io::ErrorKind::InvalidData, e))?;
+
+                Ok(Some(resp.chunks))
+            }
+        }
+    }
+}
+
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct GRPCBlobServiceConfig {
+    url: String,
+}
+
+impl TryFrom<url::Url> for GRPCBlobServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // The scheme is normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+        // - In the case of unix sockets, there must be a path, but may not be a host.
+        // - In the case of non-unix sockets, there must be a host, but no path.
+        // Constructing the channel is handled by tvix_castore::tonic::channel_from_url.
+        Ok(GRPCBlobServiceConfig {
+            url: url.to_string(),
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for GRPCBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let client = proto::blob_service_client::BlobServiceClient::new(
+            crate::tonic::channel_from_url(&self.url.parse()?).await?,
+        );
+        Ok(Arc::new(GRPCBlobService::from_client(client)))
+    }
+}
+
+pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> {
+    /// The task containing the put request, and the inner writer, if we're still writing.
+    task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>,
+
+    /// The digest that has been returned, if we successfully closed.
+    digest: Option<B3Digest>,
+}
+
+#[async_trait]
+impl<W: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static> BlobWriter for GRPCBlobWriter<W> {
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        if self.task_and_writer.is_none() {
+            // if we're already closed, return the b3 digest, which must exist.
+            // If it doesn't, we already closed and failed once, and didn't handle the error.
+            match &self.digest {
+                Some(digest) => Ok(digest.clone()),
+                None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "already closed")),
+            }
+        } else {
+            let (task, mut writer) = self.task_and_writer.take().unwrap();
+
+            // invoke shutdown, so the inner writer closes its internal tx side of
+            // the channel.
+            writer.shutdown().await?;
+
+            // block on the RPC call to return.
+            // This ensures all chunks are sent out, and have been received by the
+            // backend.
+
+            match task.await? {
+                Ok(resp) => {
+                    // return the digest from the response, and store it in self.digest for subsequent closes.
+                    let digest_len = resp.digest.len();
+                    let digest: B3Digest = resp.digest.try_into().map_err(|_| {
+                        io::Error::new(
+                            io::ErrorKind::Other,
+                            format!("invalid root digest length {} in response", digest_len),
+                        )
+                    })?;
+                    self.digest = Some(digest.clone());
+                    Ok(digest)
+                }
+                Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
+            }
+        }
+    }
+}
+
+impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<W> {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        match &mut self.task_and_writer {
+            None => Poll::Ready(Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            ))),
+            Some((_, ref mut writer)) => {
+                let pinned_writer = pin!(writer);
+                pinned_writer.poll_write(cx, buf)
+            }
+        }
+    }
+
+    fn poll_flush(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        match &mut self.task_and_writer {
+            None => Poll::Ready(Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            ))),
+            Some((_, ref mut writer)) => {
+                let pinned_writer = pin!(writer);
+                pinned_writer.poll_flush(cx)
+            }
+        }
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // TODO(raitobezarius): this might not be a graceful shutdown of the
+        // channel inside the gRPC connection.
+        Poll::Ready(Ok(()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use tempfile::TempDir;
+    use tokio::net::UnixListener;
+    use tokio_retry::strategy::ExponentialBackoff;
+    use tokio_retry::Retry;
+    use tokio_stream::wrappers::UnixListenerStream;
+
+    use crate::blobservice::MemoryBlobService;
+    use crate::fixtures;
+    use crate::proto::blob_service_client::BlobServiceClient;
+    use crate::proto::GRPCBlobServiceWrapper;
+
+    use super::BlobService;
+    use super::GRPCBlobService;
+
+    /// This ensures connecting via gRPC works as expected.
+    #[tokio::test]
+    async fn test_valid_unix_path_ping_pong() {
+        let tmpdir = TempDir::new().unwrap();
+        let socket_path = tmpdir.path().join("daemon");
+
+        let path_clone = socket_path.clone();
+
+        // Spin up a server
+        tokio::spawn(async {
+            let uds = UnixListener::bind(path_clone).unwrap();
+            let uds_stream = UnixListenerStream::new(uds);
+
+            // spin up a new server
+            let mut server = tonic::transport::Server::builder();
+            let router =
+                server.add_service(crate::proto::blob_service_server::BlobServiceServer::new(
+                    GRPCBlobServiceWrapper::new(
+                        Box::<MemoryBlobService>::default() as Box<dyn BlobService>
+                    ),
+                ));
+            router.serve_with_incoming(uds_stream).await
+        });
+
+        // wait for the socket to be created
+        Retry::spawn(
+            ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)),
+            || async {
+                if socket_path.exists() {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            },
+        )
+        .await
+        .expect("failed to wait for socket");
+
+        // prepare a client
+        let grpc_client = {
+            let url = url::Url::parse(&format!(
+                "grpc+unix://{}?wait-connect=1",
+                socket_path.display()
+            ))
+            .expect("must parse");
+            let client = BlobServiceClient::new(
+                crate::tonic::channel_from_url(&url)
+                    .await
+                    .expect("must succeed"),
+            );
+            GRPCBlobService::from_client(client)
+        };
+
+        let has = grpc_client
+            .has(&fixtures::BLOB_A_DIGEST)
+            .await
+            .expect("must not be err");
+
+        assert!(!has);
+    }
+}
diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs
new file mode 100644
index 000000000000..3d733f950470
--- /dev/null
+++ b/tvix/castore/src/blobservice/memory.rs
@@ -0,0 +1,155 @@
+use parking_lot::RwLock;
+use std::io::{self, Cursor, Write};
+use std::task::Poll;
+use std::{collections::HashMap, sync::Arc};
+use tonic::async_trait;
+use tracing::instrument;
+
+use super::{BlobReader, BlobService, BlobWriter};
+use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::{B3Digest, Error};
+
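+/// An in-memory [BlobService], storing blobs in a HashMap behind a RwLock.
+/// A round-trip sketch (assumes an async context):
+///
+/// ```ignore
+/// let svc = MemoryBlobService::default();
+/// let mut bw = svc.open_write().await;
+/// tokio::io::copy(&mut std::io::Cursor::new(b"hello".to_vec()), &mut bw).await?;
+/// let digest = bw.close().await?;
+/// assert!(svc.has(&digest).await?);
+/// ```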
+#[derive(Clone, Default)]
+pub struct MemoryBlobService {
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
+}
+
+#[async_trait]
+impl BlobService for MemoryBlobService {
+    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        let db = self.db.read();
+        Ok(db.contains_key(digest))
+    }
+
+    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        let db = self.db.read();
+
+        match db.get(digest).map(|x| Cursor::new(x.clone())) {
+            Some(result) => Ok(Some(Box::new(result))),
+            None => Ok(None),
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        Box::new(MemoryBlobWriter::new(self.db.clone()))
+    }
+}
+
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct MemoryBlobServiceConfig {}
+
+impl TryFrom<url::Url> for MemoryBlobServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // memory doesn't support host or path in the URL.
+        if url.has_host() || !url.path().is_empty() {
+            return Err(Error::StorageError("invalid url".to_string()).into());
+        }
+        Ok(MemoryBlobServiceConfig {})
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for MemoryBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        Ok(Arc::new(MemoryBlobService::default()))
+    }
+}
+
+pub struct MemoryBlobWriter {
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
+
+    /// Contains the buffer Vec and hasher, or None if already closed
+    writers: Option<(Vec<u8>, blake3::Hasher)>,
+
+    /// The digest that has been returned, if we successfully closed.
+    digest: Option<B3Digest>,
+}
+
+impl MemoryBlobWriter {
+    fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self {
+        Self {
+            db,
+            writers: Some((Vec::new(), blake3::Hasher::new())),
+            digest: None,
+        }
+    }
+}
+impl tokio::io::AsyncWrite for MemoryBlobWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+        b: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        Poll::Ready(match &mut self.writers {
+            None => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            )),
+            Some((ref mut buf, ref mut hasher)) => {
+                let bytes_written = buf.write(b)?;
+                hasher.write(&b[..bytes_written])
+            }
+        })
+    }
+
+    fn poll_flush(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        Poll::Ready(match self.writers {
+            None => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            )),
+            Some(_) => Ok(()),
+        })
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // shutdown is "instantaneous", we only write to memory.
+        Poll::Ready(Ok(()))
+    }
+}
+
+#[async_trait]
+impl BlobWriter for MemoryBlobWriter {
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        if self.writers.is_none() {
+            match &self.digest {
+                Some(digest) => Ok(digest.clone()),
+                None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "already closed")),
+            }
+        } else {
+            let (buf, hasher) = self.writers.take().unwrap();
+
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
+
+            // Only insert if the blob doesn't already exist.
+            let mut db = self.db.upgradable_read();
+            if !db.contains_key(&digest) {
+                // open the database for writing.
+                db.with_upgraded(|db| {
+                    // and put buf in there. This will move buf out.
+                    db.insert(digest.clone(), buf);
+                });
+            }
+
+            self.digest = Some(digest.clone());
+
+            Ok(digest)
+        }
+    }
+}
diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs
new file mode 100644
index 000000000000..85292722fa7e
--- /dev/null
+++ b/tvix/castore/src/blobservice/mod.rs
@@ -0,0 +1,112 @@
+use std::io;
+
+use tonic::async_trait;
+
+use crate::composition::{Registry, ServiceBuilder};
+use crate::proto::stat_blob_response::ChunkMeta;
+use crate::B3Digest;
+
+mod chunked_reader;
+mod combinator;
+mod from_addr;
+mod grpc;
+mod memory;
+mod object_store;
+
+#[cfg(test)]
+pub mod tests;
+
+pub use self::chunked_reader::ChunkedReader;
+pub use self::combinator::{CombinedBlobService, CombinedBlobServiceConfig};
+pub use self::from_addr::from_addr;
+pub use self::grpc::{GRPCBlobService, GRPCBlobServiceConfig};
+pub use self::memory::{MemoryBlobService, MemoryBlobServiceConfig};
+pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfig};
+
+/// The base trait all BlobService services need to implement.
+/// It provides functions to check whether a given blob exists,
+/// a way to read (and seek) a blob, and a method to create a blobwriter handle,
+/// which will implement a writer interface, and also provides a close function,
+/// to finalize a blob and get its digest.
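+///
+/// A typical write-then-read round trip, as a sketch (assumes an async
+/// context and some `blob_service` implementing this trait):
+///
+/// ```ignore
+/// use tokio::io::AsyncReadExt;
+///
+/// let mut writer = blob_service.open_write().await;
+/// tokio::io::copy(&mut std::io::Cursor::new(data), &mut writer).await?;
+/// let digest = writer.close().await?;
+///
+/// if let Some(mut reader) = blob_service.open_read(&digest).await? {
+///     let mut buf = Vec::new();
+///     reader.read_to_end(&mut buf).await?;
+/// }
+/// ```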
+#[async_trait]
+pub trait BlobService: Send + Sync {
+    /// Check if the service has the blob, by its content hash.
+    /// On implementations returning chunks, this must also work for chunks.
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool>;
+
+    /// Request a blob from the store, by its content hash.
+    /// On implementations returning chunks, this must also work for chunks.
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>>;
+
+    /// Insert a new blob into the store. Returns a [BlobWriter], which
+    /// implements [tokio::io::AsyncWrite] and a [BlobWriter::close] to finalize
+    /// the blob and get its digest.
+    async fn open_write(&self) -> Box<dyn BlobWriter>;
+
+    /// Return a list of chunks for a given blob.
+    /// There's a distinction between returning Ok(None) and Ok(Some(vec![])).
+    /// The former return value is sent in case the blob is not present at all,
+    /// while the second one is sent in case there's no more granular chunks (or
+    /// the backend does not support chunking).
+    /// A default implementation is provided, which checks for existence and
+    /// then signals that no more granular chunks are available.
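+    /// For callers: `Ok(None)` means the blob is unknown, while
+    /// `Ok(Some(vec![]))` means the blob exists but must be fetched in one
+    /// piece via [Self::open_read].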
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        if !self.has(digest).await? {
+            return Ok(None);
+        }
+        // default implementation, signalling the backend does not have more
+        // granular chunks available.
+        Ok(Some(vec![]))
+    }
+}
+
+#[async_trait]
+impl<A> BlobService for A
+where
+    A: AsRef<dyn BlobService> + Send + Sync,
+{
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        self.as_ref().has(digest).await
+    }
+
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        self.as_ref().open_read(digest).await
+    }
+
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        self.as_ref().open_write().await
+    }
+
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        self.as_ref().chunks(digest).await
+    }
+}
+
+/// A [tokio::io::AsyncWrite] that the user needs to close() afterwards to persist the blob.
+/// On success, it returns the digest of the written blob.
+#[async_trait]
+pub trait BlobWriter: tokio::io::AsyncWrite + Send + Unpin {
+    /// Signal there's no more data to be written, and return the digest of the
+    /// contents written.
+    ///
+    /// Closing an already-closed BlobWriter is a no-op.
+    async fn close(&mut self) -> io::Result<B3Digest>;
+}
+
+/// BlobReader is a [tokio::io::AsyncRead] that also allows seeking.
+pub trait BlobReader: tokio::io::AsyncRead + tokio::io::AsyncSeek + Send + Unpin + 'static {}
+
+/// [`io::Cursor`]s over various byte containers, as well as [tokio::fs::File],
+/// can be used as a BlobReader.
+impl BlobReader for io::Cursor<&'static [u8]> {}
+impl BlobReader for io::Cursor<&'static [u8; 0]> {}
+impl BlobReader for io::Cursor<Vec<u8>> {}
+impl BlobReader for io::Cursor<bytes::Bytes> {}
+impl BlobReader for tokio::fs::File {}
+
+/// Registers the builtin BlobService implementations with the registry
+pub(crate) fn register_blob_services(reg: &mut Registry) {
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::ObjectStoreBlobServiceConfig>("objectstore");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::MemoryBlobServiceConfig>("memory");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::CombinedBlobServiceConfig>("combined");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn BlobService>>, super::blobservice::GRPCBlobServiceConfig>("grpc");
+}
diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs
new file mode 100644
index 000000000000..5bb05cf26123
--- /dev/null
+++ b/tvix/castore/src/blobservice/object_store.rs
@@ -0,0 +1,617 @@
+use std::{
+    collections::HashMap,
+    io::{self, Cursor},
+    pin::pin,
+    sync::Arc,
+    task::Poll,
+};
+
+use data_encoding::HEXLOWER;
+use fastcdc::v2020::AsyncStreamCDC;
+use futures::Future;
+use object_store::{path::Path, ObjectStore};
+use pin_project_lite::pin_project;
+use prost::Message;
+use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
+use tokio_stream::StreamExt;
+use tonic::async_trait;
+use tracing::{debug, instrument, trace, Level};
+use url::Url;
+
+use crate::{
+    composition::{CompositionContext, ServiceBuilder},
+    proto::{stat_blob_response::ChunkMeta, StatBlobResponse},
+    B3Digest, B3HashingReader, Error,
+};
+
+use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
+
+/// Uses any object storage supported by the [object_store] crate to provide a
+/// tvix-castore [BlobService].
+///
+/// # Data format
+/// Data is organized in "blobs" and "chunks".
+/// Blobs don't hold the actual data, but instead contain a list of more
+/// granular chunks that assemble to the contents requested.
+/// This allows clients to seek, and to skip downloading chunks they already
+/// have locally, for example because they're referenced from other blobs.
+/// Check `rpc_blobstore` and more general BlobStore docs on that.
+///
+/// ## Blobs
+/// Stored at `${base_path}/blobs/b3/$digest_key`. They contain the serialized
+/// StatBlobResponse for the blob with that digest.
+///
+/// ## Chunks
+/// Chunks are stored at `${base_path}/chunks/b3/$digest_key`. They contain
+/// the literal contents of the chunk, but are zstd-compressed.
+///
+/// ## Digest key sharding
+/// The digest key is the blake3 digest encoded in lower hex, sharded after
+/// the second character.
+/// The blob for "Hello World" is stored at
+/// `${base_path}/blobs/b3/41/41f8394111eb713a22165c46c90ab8f0fd9399c92028fd6d288944b23ff5bf76`.
+///
+/// This reduces the number of files in the same directory, which would be a
+/// problem at least when using [object_store::local::LocalFileSystem].
+///
+/// # Future changes
+/// There's no guarantees about this being a final format yet.
+/// Once object_store gets support for additional metadata / content-types,
+/// we can eliminate some requests (small blobs only consisting of a single
+/// chunk can be stored as-is, without the blob index file).
+/// It also allows signalling any compression of chunks in the content-type.
+/// Migration *should* be possible by simply adding the right content-types to
+/// all keys stored so far, but no promises ;-)
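+///
+/// A construction sketch via [from_addr](super::from_addr); URL forms as
+/// exercised in the from_addr tests, paths illustrative:
+///
+/// ```ignore
+/// // local filesystem backend
+/// let svc = blobservice::from_addr("objectstore+file:///some/base/path").await?;
+/// // in-memory backend, useful for tests
+/// let svc = blobservice::from_addr("objectstore+memory:///").await?;
+/// ```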
+#[derive(Clone)]
+pub struct ObjectStoreBlobService {
+    object_store: Arc<dyn ObjectStore>,
+    base_path: Path,
+
+    /// Average chunk size for FastCDC, in bytes.
+    /// The minimum chunk size is half this number, and the maximum is double it.
+    avg_chunk_size: u32,
+}
+
+#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,blob.digest=%digest),ret(Display))]
+fn derive_blob_path(base_path: &Path, digest: &B3Digest) -> Path {
+    base_path
+        .child("blobs")
+        .child("b3")
+        .child(HEXLOWER.encode(&digest.as_slice()[..2]))
+        .child(HEXLOWER.encode(digest.as_slice()))
+}
+
+#[instrument(level=Level::TRACE, skip_all,fields(base_path=%base_path,chunk.digest=%digest),ret(Display))]
+fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path {
+    base_path
+        .child("chunks")
+        .child("b3")
+        .child(HEXLOWER.encode(&digest.as_slice()[..2]))
+        .child(HEXLOWER.encode(digest.as_slice()))
+}
+
+#[async_trait]
+impl BlobService for ObjectStoreBlobService {
+    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        // TODO: clarify if this should work for chunks or not, and explicitly
+        // document in the proto docs.
+        let p = derive_blob_path(&self.base_path, digest);
+
+        match self.object_store.head(&p).await {
+            Ok(_) => Ok(true),
+            Err(object_store::Error::NotFound { .. }) => {
+                let p = derive_chunk_path(&self.base_path, digest);
+                match self.object_store.head(&p).await {
+                    Ok(_) => Ok(true),
+                    Err(object_store::Error::NotFound { .. }) => Ok(false),
+                    Err(e) => Err(e)?,
+                }
+            }
+            Err(e) => Err(e)?,
+        }
+    }
+
+    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        // handle reading the empty blob.
+        if digest.as_slice() == blake3::hash(b"").as_bytes() {
+            return Ok(Some(Box::new(Cursor::new(b"")) as Box<dyn BlobReader>));
+        }
+        match self
+            .object_store
+            .get(&derive_chunk_path(&self.base_path, digest))
+            .await
+        {
+            Ok(res) => {
+                // handle reading blobs that are small enough to fit inside a single chunk:
+                // fetch the entire chunk into memory, decompress, ensure the b3 digest matches,
+                // and return an io::Cursor over that data.
+                // FUTUREWORK: use zstd::bulk to prevent decompression bombs
+
+                let chunk_raw_bytes = res.bytes().await?;
+                let chunk_contents = zstd::stream::decode_all(Cursor::new(chunk_raw_bytes))?;
+
+                if *digest != blake3::hash(&chunk_contents).as_bytes().into() {
+                    Err(io::Error::other("chunk contents invalid"))?;
+                }
+
+                Ok(Some(Box::new(Cursor::new(chunk_contents))))
+            }
+            Err(object_store::Error::NotFound { .. }) => {
+                // NOTE: For public-facing things, we would want to stop here.
+                // Clients should fetch granularly, so they can make use of
+                // chunks they have locally.
+                // However, if this is used directly, without any caches, do the
+                // assembly here.
+                // This is subject to change, once we have store composition.
+                // TODO: make this configurable, and/or clarify behaviour for
+                // the gRPC server surface (explicitly document behaviour in the
+                // proto docs)
+                if let Some(chunks) = self.chunks(digest).await? {
+                    let chunked_reader = ChunkedReader::from_chunks(
+                        chunks.into_iter().map(|chunk| {
+                            (
+                                chunk.digest.try_into().expect("invalid b3 digest"),
+                                chunk.size,
+                            )
+                        }),
+                        Arc::new(self.clone()) as Arc<dyn BlobService>,
+                    );
+
+                    Ok(Some(Box::new(chunked_reader)))
+                } else {
+                    // This is neither a chunk nor a blob, return None.
+                    Ok(None)
+                }
+            }
+            Err(e) => Err(e.into()),
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking
+        // needs an AsyncRead, so we create a pipe here.
+        // In its `AsyncWrite` implementation, `ObjectStoreBlobWriter` delegates
+        // writes to w. It periodically polls the future that's reading from the
+        // other side.
+        let (w, r) = tokio::io::duplex(self.avg_chunk_size as usize * 10);
+
+        Box::new(ObjectStoreBlobWriter {
+            writer: Some(w),
+            fut: Some(Box::pin(chunk_and_upload(
+                r,
+                self.object_store.clone(),
+                self.base_path.clone(),
+                self.avg_chunk_size / 2,
+                self.avg_chunk_size,
+                self.avg_chunk_size * 2,
+            ))),
+            fut_output: None,
+        })
+    }
+
+    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        match self
+            .object_store
+            .get(&derive_blob_path(&self.base_path, digest))
+            .await
+        {
+            Ok(get_result) => {
+                // fetch the data at the blob path
+                let blob_data = get_result.bytes().await?;
+                // parse into StatBlobResponse
+                let stat_blob_response: StatBlobResponse = StatBlobResponse::decode(blob_data)?;
+
+                debug!(
+                    chunk.count = stat_blob_response.chunks.len(),
+                    blob.size = stat_blob_response
+                        .chunks
+                        .iter()
+                        .map(|x| x.size)
+                        .sum::<u64>(),
+                    "found more granular chunks"
+                );
+
+                Ok(Some(stat_blob_response.chunks))
+            }
+            Err(object_store::Error::NotFound { .. }) => {
+                // If there's only a chunk, we must return the empty vec here, rather than None.
+                match self
+                    .object_store
+                    .head(&derive_chunk_path(&self.base_path, digest))
+                    .await
+                {
+                    Ok(_) => {
+                        // present, but no more chunks available
+                        debug!("found a single chunk");
+                        Ok(Some(vec![]))
+                    }
+                    Err(object_store::Error::NotFound { .. }) => {
+                        // Neither blob nor single chunk found
+                        debug!("not found");
+                        Ok(None)
+                    }
+                    // error checking for chunk
+                    Err(e) => Err(e.into()),
+                }
+            }
+            // error checking for blob
+            Err(err) => Err(err.into()),
+        }
+    }
+}
+
+fn default_avg_chunk_size() -> u32 {
+    256 * 1024
+}
+
+#[derive(serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct ObjectStoreBlobServiceConfig {
+    object_store_url: String,
+    #[serde(default = "default_avg_chunk_size")]
+    avg_chunk_size: u32,
+    object_store_options: HashMap<String, String>,
+}
+
+impl TryFrom<url::Url> for ObjectStoreBlobServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    /// Constructs a new [ObjectStoreBlobServiceConfig] from a [Url] supported
+    /// by [object_store].
+    /// Any path suffix becomes the base path of the object store.
+    /// Additional options, the same as in [object_store::parse_url_opts],
+    /// can be passed.
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // We need to convert the URL to a string, strip the prefix there, and then
+        // parse it back as a URL, as Url::set_scheme() rejects some of the transitions we want to do.
+        let trimmed_url = {
+            let s = url.to_string();
+            let mut url = Url::parse(
+                s.strip_prefix("objectstore+")
+                    .ok_or(Error::StorageError("Missing objectstore uri".into()))?,
+            )?;
+            // trim the query pairs, they might contain credentials or local settings we don't want to send as-is.
+            url.set_query(None);
+            url
+        };
+        Ok(ObjectStoreBlobServiceConfig {
+            object_store_url: trimmed_url.into(),
+            object_store_options: url
+                .query_pairs()
+                .into_iter()
+                .map(|(k, v)| (k.to_string(), v.to_string()))
+                .collect(),
+            avg_chunk_size: default_avg_chunk_size(),
+        })
+    }
+}
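+
+// A minimal sketch of the conversion above ("objectstore+memory:///" is the
+// same scheme the tests in tests/mod.rs use): the "objectstore+" prefix is
+// stripped, query pairs move into object_store_options, and the stored URL
+// loses its query string.
+//
+// let url = url::Url::parse("objectstore+memory:///?foo=bar").unwrap();
+// let config = ObjectStoreBlobServiceConfig::try_from(url).unwrap();
+// assert_eq!(config.object_store_url, "memory:///");
+// assert_eq!(config.object_store_options["foo"], "bar");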
+
+#[async_trait]
+impl ServiceBuilder for ObjectStoreBlobServiceConfig {
+    type Output = dyn BlobService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let (object_store, path) = object_store::parse_url_opts(
+            &self.object_store_url.parse()?,
+            &self.object_store_options,
+        )?;
+        Ok(Arc::new(ObjectStoreBlobService {
+            object_store: Arc::new(object_store),
+            base_path: path,
+            avg_chunk_size: self.avg_chunk_size,
+        }))
+    }
+}
+
+/// Reads blob contents from an AsyncRead, chunks and uploads them.
+/// On success, returns a [StatBlobResponse] pointing to the individual chunks.
+#[instrument(skip_all, fields(base_path=%base_path, min_chunk_size, avg_chunk_size, max_chunk_size), err)]
+async fn chunk_and_upload<R: AsyncRead + Unpin>(
+    r: R,
+    object_store: Arc<dyn ObjectStore>,
+    base_path: Path,
+    min_chunk_size: u32,
+    avg_chunk_size: u32,
+    max_chunk_size: u32,
+) -> io::Result<B3Digest> {
+    // wrap reader with something calculating the blake3 hash of all data read.
+    let mut b3_r = B3HashingReader::from(r);
+    // set up a fastcdc chunker
+    let mut chunker =
+        AsyncStreamCDC::new(&mut b3_r, min_chunk_size, avg_chunk_size, max_chunk_size);
+
+    /// This really should just belong in the closure at
+    /// `chunker.as_stream().then(|_| { … })`, but if we try that, rustc spits
+    /// higher-ranked lifetime errors at us.
+    async fn fastcdc_chunk_uploader(
+        resp: Result<fastcdc::v2020::ChunkData, fastcdc::v2020::Error>,
+        base_path: Path,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> std::io::Result<ChunkMeta> {
+        let chunk_data = resp?;
+        let chunk_digest: B3Digest = blake3::hash(&chunk_data.data).as_bytes().into();
+        let chunk_path = derive_chunk_path(&base_path, &chunk_digest);
+
+        upload_chunk(object_store, chunk_digest, chunk_path, chunk_data.data).await
+    }
+
+    // Use the fastcdc chunker to produce a stream of chunks, and upload those
+    // that don't exist in the backend yet.
+    let chunks = chunker
+        .as_stream()
+        .then(|resp| fastcdc_chunk_uploader(resp, base_path.clone(), object_store.clone()))
+        .collect::<io::Result<Vec<ChunkMeta>>>()
+        .await?;
+
+    let chunks = if chunks.len() < 2 {
+        // The chunker returned only one chunk, which is the entire blob.
+        // According to the protocol, we must return an empty list of chunks
+        // when the blob is not split up further.
+        vec![]
+    } else {
+        chunks
+    };
+
+    let stat_blob_response = StatBlobResponse {
+        chunks,
+        bao: "".into(), // still todo
+    };
+
+    // check for Blob, if it doesn't exist, persist.
+    let blob_digest: B3Digest = b3_r.digest().into();
+    let blob_path = derive_blob_path(&base_path, &blob_digest);
+
+    match object_store.head(&blob_path).await {
+        // blob already exists, nothing to do
+        Ok(_) => {
+            trace!(
+                blob.digest = %blob_digest,
+                blob.path = %blob_path,
+                "blob already exists on backend"
+            );
+        }
+        // blob does not exist yet, upload it first
+        Err(object_store::Error::NotFound { .. }) => {
+            debug!(
+                blob.digest = %blob_digest,
+                blob.path = %blob_path,
+                "uploading blob"
+            );
+            object_store
+                .put(&blob_path, stat_blob_response.encode_to_vec().into())
+                .await?;
+        }
+        Err(err) => {
+            // other error
+            Err(err)?
+        }
+    }
+
+    Ok(blob_digest)
+}
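+
+// An invocation sketch (mirroring the test at the bottom of this file;
+// "memory:///" parses to an in-memory object store):
+//
+// let (store, base_path) = object_store::parse_url(&"memory:///".parse()?)?;
+// let digest = chunk_and_upload(
+//     Cursor::new(data),
+//     Arc::from(store),
+//     base_path,
+//     128 * 1024, // min chunk size
+//     256 * 1024, // avg chunk size
+//     512 * 1024, // max chunk size
+// )
+// .await?;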
+
+/// Uploads the chunk if it doesn't exist yet.
+#[instrument(skip_all, fields(chunk.digest = %chunk_digest, chunk.size = chunk_data.len(), chunk.path = %chunk_path), err)]
+async fn upload_chunk(
+    object_store: Arc<dyn ObjectStore>,
+    chunk_digest: B3Digest,
+    chunk_path: Path,
+    chunk_data: Vec<u8>,
+) -> std::io::Result<ChunkMeta> {
+    let chunk_size = chunk_data.len();
+    match object_store.head(&chunk_path).await {
+        // chunk already exists, nothing to do
+        Ok(_) => {
+            debug!("chunk already exists");
+        }
+
+        // chunk does not yet exist, compress and upload.
+        Err(object_store::Error::NotFound { .. }) => {
+            let chunk_data_compressed =
+                zstd::encode_all(Cursor::new(chunk_data), zstd::DEFAULT_COMPRESSION_LEVEL)?;
+
+            debug!(chunk.compressed_size=%chunk_data_compressed.len(), "uploading chunk");
+
+            object_store
+                .as_ref()
+                .put(&chunk_path, chunk_data_compressed.into())
+                .await?;
+        }
+        // other error
+        Err(err) => Err(err)?,
+    }
+
+    Ok(ChunkMeta {
+        digest: chunk_digest.into(),
+        size: chunk_size as u64,
+    })
+}
+
+pin_project! {
+    /// Takes care of blob uploads.
+    /// All writes are relayed to self.writer, and we continuously poll the
+    /// future (which will internally read from the other side of the pipe and
+    /// upload chunks).
+    /// Our BlobWriter::close() needs to drop self.writer, so the other side
+    /// will read EOF and can finalize the blob.
+    /// The future should then resolve and return the blob digest.
+    pub struct ObjectStoreBlobWriter<W, Fut>
+    where
+        W: AsyncWrite,
+        Fut: Future,
+    {
+        #[pin]
+        writer: Option<W>,
+
+        #[pin]
+        fut: Option<Fut>,
+
+        fut_output: Option<io::Result<B3Digest>>
+    }
+}
+
+impl<W, Fut> tokio::io::AsyncWrite for ObjectStoreBlobWriter<W, Fut>
+where
+    W: AsyncWrite + Send + Unpin,
+    Fut: Future,
+{
+    fn poll_write(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        let this = self.project();
+        // poll the future.
+        let fut = this.fut.as_pin_mut().expect("not future");
+        let fut_p = fut.poll(cx);
+        // if it's ready, the only way this could have happened is that the
+        // upload failed, because we're only closing `self.writer` after all
+        // writes happened.
+        if fut_p.is_ready() {
+            return Poll::Ready(Err(io::Error::other("upload failed")));
+        }
+
+        // write to the underlying writer
+        this.writer
+            .as_pin_mut()
+            .expect("writer must be some")
+            .poll_write(cx, buf)
+    }
+
+    fn poll_flush(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        let this = self.project();
+        // poll the future.
+        let fut = this.fut.as_pin_mut().expect("not future");
+        let fut_p = fut.poll(cx);
+        // if it's ready, the only way this could have happened is that the
+        // upload failed, because we're only closing `self.writer` after all
+        // writes happened.
+        if fut_p.is_ready() {
+            return Poll::Ready(Err(io::Error::other("upload failed")));
+        }
+
+        // Call poll_flush on the writer
+        this.writer
+            .as_pin_mut()
+            .expect("writer must be some")
+            .poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // There's nothing to do on shutdown. We might have written some chunks
+        // that are nowhere else referenced, but cleaning them up here would be racy.
+        std::task::Poll::Ready(Ok(()))
+    }
+}
+
+#[async_trait]
+impl<W, Fut> BlobWriter for ObjectStoreBlobWriter<W, Fut>
+where
+    W: AsyncWrite + Send + Unpin,
+    Fut: Future<Output = io::Result<B3Digest>> + Send + Unpin,
+{
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        match self.writer.take() {
+            Some(mut writer) => {
+                // shut down the writer, so the other side will read EOF.
+                writer.shutdown().await?;
+
+                // take out the future.
+                let fut = self.fut.take().expect("fut must be some");
+                // await it.
+                let resp = pin!(fut).await;
+
+                match resp.as_ref() {
+                    // In the case of an Ok value, we store it in self.fut_output,
+                    // so future calls to close can return that.
+                    Ok(b3_digest) => {
+                        self.fut_output = Some(Ok(b3_digest.clone()));
+                    }
+                    Err(e) => {
+                        // for the error type, we need to cheat a bit, as
+                        // they're not clone-able.
+                        // Simply store a sloppy clone, with the same ErrorKind and message there.
+                        self.fut_output = Some(Err(std::io::Error::new(e.kind(), e.to_string())))
+                    }
+                }
+                resp
+            }
+            None => {
+                // called a second time, return self.fut_output.
+                match self.fut_output.as_ref().unwrap() {
+                    Ok(ref b3_digest) => Ok(b3_digest.clone()),
+                    Err(e) => Err(std::io::Error::new(e.kind(), e.to_string())),
+                }
+            }
+        }
+    }
+}
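+
+// The resulting write path, as exercised by the tests in tests/mod.rs
+// (a sketch):
+//
+// let mut w = blobsvc.open_write().await;
+// tokio::io::copy(&mut Cursor::new(data), &mut w).await?;
+// let digest = w.close().await?;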
+
+#[cfg(test)]
+mod test {
+    use super::{chunk_and_upload, default_avg_chunk_size};
+    use crate::{
+        blobservice::{BlobService, ObjectStoreBlobService},
+        fixtures::{BLOB_A, BLOB_A_DIGEST, BLOB_B, BLOB_B_DIGEST},
+    };
+    use std::{io::Cursor, sync::Arc};
+    use url::Url;
+
+    /// Tests chunk_and_upload directly, bypassing the BlobWriter at open_write().
+    #[rstest::rstest]
+    #[case::a(&BLOB_A, &BLOB_A_DIGEST)]
+    #[case::b(&BLOB_B, &BLOB_B_DIGEST)]
+    #[tokio::test]
+    async fn test_chunk_and_upload(
+        #[case] blob: &bytes::Bytes,
+        #[case] blob_digest: &crate::B3Digest,
+    ) {
+        let (object_store, base_path) =
+            object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
+        let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
+        let blobsvc = Arc::new(ObjectStoreBlobService {
+            object_store: object_store.clone(),
+            avg_chunk_size: default_avg_chunk_size(),
+            base_path,
+        });
+
+        let inserted_blob_digest = chunk_and_upload(
+            &mut Cursor::new(blob.to_vec()),
+            object_store,
+            object_store::path::Path::from("/"),
+            1024 / 2,
+            1024,
+            1024 * 2,
+        )
+        .await
+        .expect("chunk_and_upload succeeds");
+
+        assert_eq!(blob_digest.clone(), inserted_blob_digest);
+
+        // Now we should have the blob
+        assert!(blobsvc.has(blob_digest).await.unwrap());
+
+        // Check if it was chunked correctly
+        let chunks = blobsvc.chunks(blob_digest).await.unwrap().unwrap();
+        if blob.len() < 1024 / 2 {
+            // The blob is smaller than the min chunk size, so it should have been inserted as a whole.
+            assert!(chunks.is_empty());
+        } else if blob.len() > 1024 * 2 {
+            // The blob is larger than the max chunk size, so make sure it was
+            // split up into at least two chunks.
+            assert!(chunks.len() >= 2);
+        }
+    }
+}
diff --git a/tvix/castore/src/blobservice/tests/mod.rs b/tvix/castore/src/blobservice/tests/mod.rs
new file mode 100644
index 000000000000..0280faebb171
--- /dev/null
+++ b/tvix/castore/src/blobservice/tests/mod.rs
@@ -0,0 +1,253 @@
+//! This contains test scenarios that a given [BlobService] needs to pass.
+//! We use [rstest] and [rstest_reuse] to provide all services we want to test
+//! against, and then apply this template to all test functions.
+
+use rstest::*;
+use rstest_reuse::{self, *};
+use std::io;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncSeekExt;
+
+use super::BlobService;
+use crate::blobservice;
+use crate::fixtures::BLOB_A;
+use crate::fixtures::BLOB_A_DIGEST;
+use crate::fixtures::BLOB_B;
+use crate::fixtures::BLOB_B_DIGEST;
+
+mod utils;
+use self::utils::make_grpc_blob_service_client;
+
+/// This produces a template, which will be applied to all individual test functions.
+/// See https://github.com/la10736/rstest/issues/130#issuecomment-968864832
+#[template]
+#[rstest]
+#[case::grpc(make_grpc_blob_service_client().await)]
+#[case::memory(blobservice::from_addr("memory://").await.unwrap())]
+#[case::objectstore_memory(blobservice::from_addr("objectstore+memory://").await.unwrap())]
+pub fn blob_services(#[case] blob_service: impl BlobService) {}
+
+/// Using [BlobService::has] on a non-existing blob should return false.
+#[apply(blob_services)]
+#[tokio::test]
+async fn has_nonexistent_false(blob_service: impl BlobService) {
+    assert!(!blob_service
+        .has(&BLOB_A_DIGEST)
+        .await
+        .expect("must not fail"));
+}
+
+/// Using [BlobService::chunks] on a non-existing blob should return Ok(None).
+#[apply(blob_services)]
+#[tokio::test]
+async fn chunks_nonexistent_false(blob_service: impl BlobService) {
+    assert!(blob_service
+        .chunks(&BLOB_A_DIGEST)
+        .await
+        .expect("must be ok")
+        .is_none());
+}
+
+// TODO: do tests with `chunks`
+
+/// Trying to read a non-existing blob should return None instead of a reader.
+#[apply(blob_services)]
+#[tokio::test]
+async fn not_found_read(blob_service: impl BlobService) {
+    assert!(blob_service
+        .open_read(&BLOB_A_DIGEST)
+        .await
+        .expect("must not fail")
+        .is_none())
+}
+
+/// Put a blob in the store, check has, get it back.
+#[apply(blob_services)]
+// #[case::small(&fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST)]
+// #[case::big(&fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST)]
+#[tokio::test]
+async fn put_has_get(blob_service: impl BlobService) {
+    // TODO: figure out how to instantiate this with BLOB_A and BLOB_B, as two separate cases
+    for (blob_contents, blob_digest) in &[
+        (&*BLOB_A, BLOB_A_DIGEST.clone()),
+        (&*BLOB_B, BLOB_B_DIGEST.clone()),
+    ] {
+        let mut w = blob_service.open_write().await;
+
+        let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w)
+            .await
+            .expect("copy must succeed");
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "written bytes must match blob length"
+        );
+
+        let digest = w.close().await.expect("close must succeed");
+
+        assert_eq!(*blob_digest, digest, "returned digest must be correct");
+
+        assert!(
+            blob_service.has(blob_digest).await.expect("must not fail"),
+            "blob service should now have the blob"
+        );
+
+        let mut r = blob_service
+            .open_read(blob_digest)
+            .await
+            .expect("open_read must succeed")
+            .expect("must be some");
+
+        let mut buf: Vec<u8> = Vec::new();
+        let mut pinned_reader = std::pin::pin!(r);
+        let l = tokio::io::copy(&mut pinned_reader, &mut buf)
+            .await
+            .expect("copy must succeed");
+
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "read bytes must match blob length"
+        );
+
+        assert_eq!(&blob_contents[..], &buf, "read blob contents must match");
+    }
+}
+
+/// Put a blob in the store, and seek inside it a bit.
+#[apply(blob_services)]
+#[tokio::test]
+async fn put_seek(blob_service: impl BlobService) {
+    let mut w = blob_service.open_write().await;
+
+    tokio::io::copy(&mut io::Cursor::new(&BLOB_B.to_vec()), &mut w)
+        .await
+        .expect("copy must succeed");
+    w.close().await.expect("close must succeed");
+
+    // open a blob for reading
+    let mut r = blob_service
+        .open_read(&BLOB_B_DIGEST)
+        .await
+        .expect("open_read must succeed")
+        .expect("must be some");
+
+    let mut pos: u64 = 0;
+
+    // read the first 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).await.expect("must succeed");
+
+        assert_eq!(
+            &BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected first 10 bytes to match"
+        );
+
+        pos += buf.len() as u64;
+    }
+    // seek by 0 bytes, using SeekFrom::Start.
+    let p = r
+        .seek(io::SeekFrom::Start(pos))
+        .await
+        .expect("must not fail");
+    assert_eq!(pos, p);
+
+    // read the next 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).await.expect("must succeed");
+
+        assert_eq!(
+            &BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected data to match"
+        );
+
+        pos += buf.len() as u64;
+    }
+
+    // seek by 5 bytes, using SeekFrom::Start.
+    let p = r
+        .seek(io::SeekFrom::Start(pos + 5))
+        .await
+        .expect("must not fail");
+    pos += 5;
+    assert_eq!(pos, p);
+
+    // read the next 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).await.expect("must succeed");
+
+        assert_eq!(
+            &BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected data to match"
+        );
+
+        pos += buf.len() as u64;
+    }
+
+    // seek by 12345 bytes, using SeekFrom::Current.
+    let p = r
+        .seek(io::SeekFrom::Current(12345))
+        .await
+        .expect("must not fail");
+    pos += 12345;
+    assert_eq!(pos, p);
+
+    // read the next 10 bytes, they must match the data in the fixture.
+    {
+        let mut buf = [0; 10];
+        r.read_exact(&mut buf).await.expect("must succeed");
+
+        assert_eq!(
+            &BLOB_B[pos as usize..pos as usize + buf.len()],
+            buf,
+            "expected data to match"
+        );
+
+        #[allow(unused_assignments)]
+        {
+            pos += buf.len() as u64;
+        }
+    }
+
+    // seeking to the end is okay…
+    let p = r
+        .seek(io::SeekFrom::Start(BLOB_B.len() as u64))
+        .await
+        .expect("must not fail");
+    pos = BLOB_B.len() as u64;
+    assert_eq!(pos, p);
+
+    {
+        // but it returns no more data.
+        let mut buf: Vec<u8> = Vec::new();
+        r.read_to_end(&mut buf).await.expect("must not fail");
+        assert!(buf.is_empty(), "expected no more data to be read");
+    }
+
+    // seeking past the end…
+    // should either be ok, but then return 0 bytes.
+    // This matches the behaviour of a Cursor<Vec<u8>>.
+    if let Ok(_pos) = r.seek(io::SeekFrom::Start(BLOB_B.len() as u64 + 1)).await {
+        let mut buf: Vec<u8> = Vec::new();
+        r.read_to_end(&mut buf).await.expect("must not fail");
+        assert!(buf.is_empty(), "expected no more data to be read");
+    }
+    // or not be okay.
+
+    // TODO: this is only broken for the gRPC version
+    // We expect seeking backwards or relative to the end to fail.
+    // r.seek(io::SeekFrom::Current(-1))
+    //     .expect_err("SeekFrom::Current(-1) expected to fail");
+
+    // r.seek(io::SeekFrom::Start(pos - 1))
+    //     .expect_err("SeekFrom::Start(pos-1) expected to fail");
+
+    // r.seek(io::SeekFrom::End(0))
+    //     .expect_err("SeekFrom::End(_) expected to fail");
+}
diff --git a/tvix/castore/src/blobservice/tests/utils.rs b/tvix/castore/src/blobservice/tests/utils.rs
new file mode 100644
index 000000000000..7df4f00d3a09
--- /dev/null
+++ b/tvix/castore/src/blobservice/tests/utils.rs
@@ -0,0 +1,42 @@
+use crate::blobservice::{BlobService, MemoryBlobService};
+use crate::proto::blob_service_client::BlobServiceClient;
+use crate::proto::GRPCBlobServiceWrapper;
+use crate::{blobservice::GRPCBlobService, proto::blob_service_server::BlobServiceServer};
+use hyper_util::rt::TokioIo;
+use tonic::transport::{Endpoint, Server, Uri};
+
+/// Constructs and returns a gRPC BlobService.
+/// The server part is a [MemoryBlobService], exposed via the
+/// [GRPCBlobServiceWrapper], and connected through a DuplexStream.
+pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> {
+    let (left, right) = tokio::io::duplex(64);
+
+    // spin up a server, which will only connect once, to the left side.
+    tokio::spawn(async {
+        let blob_service = Box::<MemoryBlobService>::default() as Box<dyn BlobService>;
+
+        // spin up a new BlobService gRPC server
+        let mut server = Server::builder();
+        let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
+            blob_service,
+        )));
+
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
+    });
+
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+
+    Box::new(GRPCBlobService::from_client(BlobServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(TokioIo::new(right)) }
+            }))
+            .await
+            .unwrap(),
+    )))
+}
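+
+// Usage sketch: the returned client speaks real gRPC over the in-memory
+// duplex stream, so the test template in tests/mod.rs can exercise the full
+// proto surface without binding a network socket:
+//
+// let svc = make_grpc_blob_service_client().await;
+// assert!(!svc.has(&crate::fixtures::BLOB_A_DIGEST).await.unwrap());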