Diffstat (limited to 'tvix/castore/src')
-rw-r--r--  tvix/castore/src/blobservice/combinator.rs               |  136
-rw-r--r--  tvix/castore/src/blobservice/from_addr.rs                |  121
-rw-r--r--  tvix/castore/src/blobservice/grpc.rs                     |  316
-rw-r--r--  tvix/castore/src/blobservice/memory.rs                   |  137
-rw-r--r--  tvix/castore/src/blobservice/mod.rs                      |   79
-rw-r--r--  tvix/castore/src/blobservice/naive_seeker.rs             |  269
-rw-r--r--  tvix/castore/src/blobservice/simplefs.rs                 |  195
-rw-r--r--  tvix/castore/src/blobservice/sled.rs                     |  150
-rw-r--r--  tvix/castore/src/blobservice/tests.rs                    |  243
-rw-r--r--  tvix/castore/src/digests.rs                              |   73
-rw-r--r--  tvix/castore/src/directoryservice/from_addr.rs           |  120
-rw-r--r--  tvix/castore/src/directoryservice/grpc.rs                |  505
-rw-r--r--  tvix/castore/src/directoryservice/memory.rs              |   86
-rw-r--r--  tvix/castore/src/directoryservice/mod.rs                 |   76
-rw-r--r--  tvix/castore/src/directoryservice/sled.rs                |  112
-rw-r--r--  tvix/castore/src/directoryservice/traverse.rs            |  231
-rw-r--r--  tvix/castore/src/directoryservice/utils.rs               |  134
-rw-r--r--  tvix/castore/src/errors.rs                               |   61
-rw-r--r--  tvix/castore/src/fixtures.rs                             |   88
-rw-r--r--  tvix/castore/src/fs/file_attr.rs                         |   53
-rw-r--r--  tvix/castore/src/fs/fuse.rs                              |  115
-rw-r--r--  tvix/castore/src/fs/inode_tracker.rs                     |  207
-rw-r--r--  tvix/castore/src/fs/inodes.rs                            |   57
-rw-r--r--  tvix/castore/src/fs/mod.rs                               |  628
-rw-r--r--  tvix/castore/src/fs/root_nodes.rs                        |   37
-rw-r--r--  tvix/castore/src/fs/tests.rs                             | 1141
-rw-r--r--  tvix/castore/src/fs/virtiofs.rs                          |  237
-rw-r--r--  tvix/castore/src/import.rs                               |  384
-rw-r--r--  tvix/castore/src/lib.rs                                  |   20
-rw-r--r--  tvix/castore/src/proto/grpc_blobservice_wrapper.rs       |  168
-rw-r--r--  tvix/castore/src/proto/grpc_directoryservice_wrapper.rs  |  178
-rw-r--r--  tvix/castore/src/proto/mod.rs                            |  387
-rw-r--r--  tvix/castore/src/proto/tests/directory.rs                |  373
-rw-r--r--  tvix/castore/src/proto/tests/directory_nodes_iterator.rs |   78
-rw-r--r--  tvix/castore/src/proto/tests/grpc_blobservice.rs         |   94
-rw-r--r--  tvix/castore/src/proto/tests/grpc_directoryservice.rs    |  246
-rw-r--r--  tvix/castore/src/proto/tests/mod.rs                      |    4
-rw-r--r--  tvix/castore/src/tests/import.rs                         |  126
-rw-r--r--  tvix/castore/src/tests/mod.rs                            |    1
-rw-r--r--  tvix/castore/src/tonic.rs                                |  118
-rw-r--r--  tvix/castore/src/utils.rs                                |   89
41 files changed, 7873 insertions(+), 0 deletions(-)
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs
new file mode 100644
index 000000000000..d75cad62a629
--- /dev/null
+++ b/tvix/castore/src/blobservice/combinator.rs
@@ -0,0 +1,136 @@
+use data_encoding::BASE64;
+use futures::{StreamExt, TryStreamExt};
+use tokio_util::io::{ReaderStream, StreamReader};
+use tonic::async_trait;
+use tracing::{instrument, warn};
+
+use crate::B3Digest;
+
+use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
+
+/// Combinator for a BlobService, using a "local" and "remote" blobservice.
+/// Requests are tried in (and returned from) the local store first; only if
+/// things are not present there is the remote BlobService queried.
+/// In case the local blobservice doesn't have the blob, we ask the remote
+/// blobservice for chunks, and try to read each of these chunks from the local
+/// blobservice again, before falling back to the remote one.
+/// The remote BlobService is never written to.
+pub struct CombinedBlobService<BL, BR> {
+    local: BL,
+    remote: BR,
+}
+
+impl<BL, BR> Clone for CombinedBlobService<BL, BR>
+where
+    BL: Clone,
+    BR: Clone,
+{
+    fn clone(&self) -> Self {
+        Self {
+            local: self.local.clone(),
+            remote: self.remote.clone(),
+        }
+    }
+}
+
+#[async_trait]
+impl<BL, BR> BlobService for CombinedBlobService<BL, BR>
+where
+    BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
+    BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
+{
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
+        Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?)
+    }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
+        if self.local.as_ref().has(digest).await? {
+            // local store has the blob, so we can assume it also has all chunks.
+            self.local.as_ref().open_read(digest).await
+        } else {
+            // Local store doesn't have the blob.
+            // Ask the remote one for the list of chunks,
+            // and create a chunked reader that uses self.open_read() for
+            // individual chunks. There's a chance we already have some chunks
+            // locally, meaning we don't need to fetch them all from the remote
+            // BlobService.
+            match self.remote.as_ref().chunks(digest).await? {
+                // blob doesn't exist on the remote side either, nothing we can do.
+                None => Ok(None),
+                Some(remote_chunks) => {
+                    // if there are no more granular chunks, or the remote
+                    // blobservice doesn't support chunks, read the blob from
+                    // the remote blobservice directly.
+                    if remote_chunks.is_empty() {
+                        return self.remote.as_ref().open_read(digest).await;
+                    }
+                    // otherwise, construct a chunked reader, which will
+                    // always try the local backend first.
+
+                    // map Vec<ChunkMeta> to Vec<(B3Digest, u64)>
+                    let chunks: Vec<(B3Digest, u64)> = remote_chunks
+                        .into_iter()
+                        .map(|chunk_meta| {
+                            (
+                                B3Digest::try_from(chunk_meta.digest)
+                                    .expect("invalid chunk digest"),
+                                chunk_meta.size,
+                            )
+                        })
+                        .collect();
+
+                    Ok(Some(make_chunked_reader(self.clone(), chunks)))
+                }
+            }
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        // direct writes to the local one.
+        self.local.as_ref().open_write().await
+    }
+}
+
+fn make_chunked_reader<BS>(
+    // This must consume, as we can't retain references to blob_service,
+    // as it'd add a lifetime to BlobReader in general, which will get
+    // problematic in TvixStoreFs, which is using async move closures and cloning.
+    blob_service: BS,
+    // A list of b3 digests for individual chunks, and their sizes.
+    chunks: Vec<(B3Digest, u64)>,
+) -> Box<dyn BlobReader>
+where
+    BS: BlobService + Clone + 'static,
+{
+    // TODO: offset, verified streaming
+
+    // construct readers for each chunk
+    let blob_service = blob_service.clone();
+    let readers_stream = tokio_stream::iter(chunks).map(move |(digest, _)| {
+        let d = digest.to_owned();
+        let blob_service = blob_service.clone();
+        async move {
+            blob_service.open_read(&d).await?.ok_or_else(|| {
+                warn!(
+                    chunk.digest = BASE64.encode(digest.as_slice()),
+                    "chunk not found"
+                );
+                std::io::Error::new(std::io::ErrorKind::NotFound, "chunk not found")
+            })
+        }
+    });
+
+    // convert the stream of readers to a stream of streams of byte chunks
+    let bytes_streams = readers_stream.then(|elem| async { elem.await.map(ReaderStream::new) });
+
+    // flatten into one stream of byte chunks
+    let bytes_stream = bytes_streams.try_flatten();
+
+    // convert into AsyncRead
+    let blob_reader = StreamReader::new(bytes_stream);
+
+    Box::new(NaiveSeeker::new(Box::pin(blob_reader)))
+}
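
The stream plumbing in `make_chunked_reader` is the heart of this file: each chunk digest becomes a reader, each reader becomes a stream of byte chunks, and the flattened stream is turned back into a single `AsyncRead`. A minimal sketch of that same composition, outside the diff, with plain `Cursor`s standing in for blob readers:

    use futures::{StreamExt, TryStreamExt};
    use tokio::io::AsyncReadExt;
    use tokio_util::io::{ReaderStream, StreamReader};

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        // Two "chunks", each already available as an AsyncRead.
        let chunks = vec![
            std::io::Cursor::new(b"hello ".to_vec()),
            std::io::Cursor::new(b"world".to_vec()),
        ];

        // Turn each reader into a stream of byte chunks …
        let readers = tokio_stream::iter(chunks)
            .map(|r| Ok::<_, std::io::Error>(ReaderStream::new(r)));

        // … flatten into one byte stream, and convert back into an AsyncRead.
        let mut reader = StreamReader::new(readers.try_flatten());

        let mut out = String::new();
        reader.read_to_string(&mut out).await?;
        assert_eq!(out, "hello world");
        Ok(())
    }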
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs
new file mode 100644
index 000000000000..db221f05ab3b
--- /dev/null
+++ b/tvix/castore/src/blobservice/from_addr.rs
@@ -0,0 +1,121 @@
+use url::Url;
+
+use crate::{proto::blob_service_client::BlobServiceClient, Error};
+
+use super::{
+    BlobService, GRPCBlobService, MemoryBlobService, SimpleFilesystemBlobService, SledBlobService,
+};
+
+/// Constructs a new instance of a [BlobService] from a URI.
+///
+/// The following schemes are supported by the following services:
+/// - `memory://` ([MemoryBlobService])
+/// - `sled://` ([SledBlobService])
+/// - `grpc+*://` ([GRPCBlobService])
+/// - `simplefs://` ([SimpleFilesystemBlobService])
+///
+/// See their `from_url` methods for more details about their syntax.
+pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> {
+    let url = Url::parse(uri)
+        .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?;
+
+    Ok(if url.scheme() == "memory" {
+        // memory doesn't support host or path in the URL.
+        if url.has_host() || !url.path().is_empty() {
+            return Err(Error::StorageError("invalid url".to_string()));
+        }
+        Box::<MemoryBlobService>::default()
+    } else if url.scheme() == "sled" {
+        // sled doesn't support host, and a path can be provided (otherwise
+        // it'll live in memory only).
+        if url.has_host() {
+            return Err(Error::StorageError("no host allowed".to_string()));
+        }
+
+        if url.path() == "/" {
+            return Err(Error::StorageError(
+                "cowardly refusing to open / with sled".to_string(),
+            ));
+        }
+
+        // TODO: expose other parameters as URL parameters?
+
+        if url.path().is_empty() {
+            return Ok(Box::new(
+                SledBlobService::new_temporary().map_err(|e| Error::StorageError(e.to_string()))?,
+            ));
+        }
+        return Ok(Box::new(
+            SledBlobService::new(url.path()).map_err(|e| Error::StorageError(e.to_string()))?,
+        ));
+    } else if url.scheme().starts_with("grpc+") {
+        // schemes starting with grpc+ go to the GRPCBlobService.
+        //   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+        // - In the case of unix sockets, there must be a path, but may not be a host.
+        // - In the case of non-unix sockets, there must be a host, but no path.
+        // Constructing the channel is handled by crate::tonic::channel_from_url.
+        let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?);
+        Box::new(GRPCBlobService::from_client(client))
+    } else if url.scheme() == "simplefs" {
+        if url.path().is_empty() {
+            return Err(Error::StorageError("Invalid filesystem path".to_string()));
+        }
+
+        Box::new(SimpleFilesystemBlobService::new(url.path().into()).await?)
+    } else {
+        Err(crate::Error::StorageError(format!(
+            "unknown scheme: {}",
+            url.scheme()
+        )))?
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::from_addr;
+    use lazy_static::lazy_static;
+    use tempfile::TempDir;
+    use test_case::test_case;
+
+    lazy_static! {
+        static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
+    }
+
+    /// This uses an unsupported scheme.
+    #[test_case("http://foo.example/test", false; "unsupported scheme")]
+    /// This configures sled in temporary mode.
+    #[test_case("sled://", true; "sled valid temporary")]
+    /// This configures sled with /, which should fail.
+    #[test_case("sled:///", false; "sled invalid root")]
+    /// This configures sled with a host, not path, which should fail.
+    #[test_case("sled://foo.example", false; "sled invalid host")]
+    /// This configures sled with a valid path, which should succeed.
+    #[test_case(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true; "sled valid path")]
+    /// This configures sled with a host and a valid path, which should fail.
+    #[test_case(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false; "sled invalid host with valid path")]
+    /// This correctly sets the scheme, and doesn't set a path.
+    #[test_case("memory://", true; "memory valid")]
+    /// This sets a memory url host to `foo`
+    #[test_case("memory://foo", false; "memory invalid host")]
+    /// This sets a memory url path to "/", which is invalid.
+    #[test_case("memory:///", false; "memory invalid root path")]
+    /// This sets a memory url path to "/foo", which is invalid.
+    #[test_case("memory:///foo", false; "memory invalid root path foo")]
+    /// Correct scheme to connect to a unix socket.
+    #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")]
+    /// Correct scheme for unix socket, but setting a host too, which is invalid.
+    #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")]
+    /// Correct scheme to connect to localhost, with port 12345
+    #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[test_case("grpc+http://localhost", true; "grpc valid http host without port")]
+    /// Correct scheme to connect to localhost over https, without specifying a port.
+    #[test_case("grpc+https://localhost", true; "grpc valid https host without port")]
+    /// Correct scheme to connect to localhost over http, but with an additional path, which is invalid.
+    #[test_case("grpc+http://localhost/some-path", false; "grpc invalid path")]
+    #[tokio::test]
+    async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) {
+        assert_eq!(from_addr(uri_str).await.is_ok(), is_ok)
+    }
+}
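
Beyond the tests, here is a sketch of what consuming `from_addr` looks like from application code (assuming the crate is depended on as `tvix_castore`):

    use tvix_castore::blobservice::from_addr;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // In-memory store, handy for tests; swap the URI for e.g.
        // sled:///some/path or grpc+unix:///some/socket elsewhere.
        let blob_service = from_addr("memory://").await?;

        let mut writer = blob_service.open_write().await;
        tokio::io::copy(&mut std::io::Cursor::new(b"hello".to_vec()), &mut writer).await?;
        let digest = writer.close().await?;

        assert!(blob_service.has(&digest).await?);
        Ok(())
    }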
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
new file mode 100644
index 000000000000..d98a9b517724
--- /dev/null
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -0,0 +1,316 @@
+use super::{naive_seeker::NaiveSeeker, BlobReader, BlobService, BlobWriter};
+use crate::{
+    proto::{self, stat_blob_response::ChunkMeta},
+    B3Digest,
+};
+use futures::sink::SinkExt;
+use std::{io, pin::pin, task::Poll};
+use tokio::io::AsyncWriteExt;
+use tokio::task::JoinHandle;
+use tokio_stream::{wrappers::ReceiverStream, StreamExt};
+use tokio_util::{
+    io::{CopyToBytes, SinkWriter},
+    sync::PollSender,
+};
+use tonic::{async_trait, transport::Channel, Code, Status};
+use tracing::{instrument, warn};
+
+/// Connects to a (remote) tvix-store BlobService over gRPC.
+#[derive(Clone)]
+pub struct GRPCBlobService {
+    /// The internal reference to a gRPC client.
+    /// Cloning it is cheap, and it internally handles concurrent requests.
+    grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
+}
+
+impl GRPCBlobService {
+    /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient].
+    /// panics if called outside the context of a tokio runtime.
+    pub fn from_client(
+        grpc_client: proto::blob_service_client::BlobServiceClient<Channel>,
+    ) -> Self {
+        Self { grpc_client }
+    }
+}
+
+#[async_trait]
+impl BlobService for GRPCBlobService {
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        let mut grpc_client = self.grpc_client.clone();
+        let resp = grpc_client
+            .stat(proto::StatBlobRequest {
+                digest: digest.clone().into(),
+                ..Default::default()
+            })
+            .await;
+
+        match resp {
+            Ok(_blob_meta) => Ok(true),
+            Err(e) if e.code() == Code::NotFound => Ok(false),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+        }
+    }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        // Get a stream of [proto::BlobChunk], or return an error if the blob
+        // doesn't exist.
+        match self
+            .grpc_client
+            .clone()
+            .read(proto::ReadBlobRequest {
+                digest: digest.clone().into(),
+            })
+            .await
+        {
+            Ok(stream) => {
+                // on success, this is a stream of tonic::Result<proto::BlobChunk>,
+                // so access .data and map errors into std::io::Error.
+                let data_stream = stream.into_inner().map(|e| {
+                    e.map(|c| c.data)
+                        .map_err(|s| std::io::Error::new(io::ErrorKind::InvalidData, s))
+                });
+
+                // Use StreamReader::new to convert to an AsyncRead.
+                let data_reader = tokio_util::io::StreamReader::new(data_stream);
+
+                Ok(Some(Box::new(NaiveSeeker::new(data_reader))))
+            }
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+        }
+    }
+
+    /// Returns a BlobWriter that internally wraps each write in a
+    /// [proto::BlobChunk], which is sent to the gRPC server.
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        // set up an mpsc channel passing around Bytes.
+        let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
+
+        // bytes arriving on the RX side are wrapped inside a
+        // [proto::BlobChunk], and a [ReceiverStream] is constructed.
+        let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x });
+
+        // spawn the gRPC put request, which will read from blobchunk_stream.
+        let task = tokio::spawn({
+            let mut grpc_client = self.grpc_client.clone();
+            async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) }
+        });
+
+        // The tx part of the channel is converted to a sink of byte chunks.
+        let sink = PollSender::new(tx)
+            .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e));
+
+        // … which is turned into a [tokio::io::AsyncWrite].
+        let writer = SinkWriter::new(CopyToBytes::new(sink));
+
+        Box::new(GRPCBlobWriter {
+            task_and_writer: Some((task, writer)),
+            digest: None,
+        })
+    }
+
+    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        let resp = self
+            .grpc_client
+            .clone()
+            .stat(proto::StatBlobRequest {
+                digest: digest.clone().into(),
+                send_chunks: true,
+                ..Default::default()
+            })
+            .await;
+
+        match resp {
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e)),
+            Ok(resp) => {
+                let resp = resp.into_inner();
+
+                resp.validate()
+                    .map_err(|e| std::io::Error::new(io::ErrorKind::InvalidData, e))?;
+
+                if resp.chunks.is_empty() {
+                    warn!("chunk list is empty");
+                }
+                Ok(Some(resp.chunks))
+            }
+        }
+    }
+}
+
+pub struct GRPCBlobWriter<W: tokio::io::AsyncWrite> {
+    /// The task containing the put request, and the inner writer, if we're still writing.
+    task_and_writer: Option<(JoinHandle<Result<proto::PutBlobResponse, Status>>, W)>,
+
+    /// The digest that has been returned, if we successfully closed.
+    digest: Option<B3Digest>,
+}
+
+#[async_trait]
+impl<W: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static> BlobWriter for GRPCBlobWriter<W> {
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        if self.task_and_writer.is_none() {
+            // if we're already closed, return the b3 digest, which must exist.
+            // If it doesn't, we already closed and failed once, and didn't handle the error.
+            match &self.digest {
+                Some(digest) => Ok(digest.clone()),
+                None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "already closed")),
+            }
+        } else {
+            let (task, mut writer) = self.task_and_writer.take().unwrap();
+
+            // invoke shutdown, so the inner writer closes its internal tx side of
+            // the channel.
+            writer.shutdown().await?;
+
+            // block on the RPC call to return.
+            // This ensures all chunks are sent out, and have been received by the
+            // backend.
+
+            match task.await? {
+                Ok(resp) => {
+                    // return the digest from the response, and store it in self.digest for subsequent closes.
+                    let digest_len = resp.digest.len();
+                    let digest: B3Digest = resp.digest.try_into().map_err(|_| {
+                        io::Error::new(
+                            io::ErrorKind::Other,
+                            format!("invalid root digest length {} in response", digest_len),
+                        )
+                    })?;
+                    self.digest = Some(digest.clone());
+                    Ok(digest)
+                }
+                Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
+            }
+        }
+    }
+}
+
+impl<W: tokio::io::AsyncWrite + Unpin> tokio::io::AsyncWrite for GRPCBlobWriter<W> {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        match &mut self.task_and_writer {
+            None => Poll::Ready(Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            ))),
+            Some((_, ref mut writer)) => {
+                let pinned_writer = pin!(writer);
+                pinned_writer.poll_write(cx, buf)
+            }
+        }
+    }
+
+    fn poll_flush(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        match &mut self.task_and_writer {
+            None => Poll::Ready(Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            ))),
+            Some((_, ref mut writer)) => {
+                let pinned_writer = pin!(writer);
+                pinned_writer.poll_flush(cx)
+            }
+        }
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // TODO(raitobezarius): this might not be a graceful shutdown of the
+        // channel inside the gRPC connection.
+        Poll::Ready(Ok(()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::time::Duration;
+
+    use tempfile::TempDir;
+    use tokio::net::UnixListener;
+    use tokio_retry::strategy::ExponentialBackoff;
+    use tokio_retry::Retry;
+    use tokio_stream::wrappers::UnixListenerStream;
+
+    use crate::blobservice::MemoryBlobService;
+    use crate::fixtures;
+    use crate::proto::blob_service_client::BlobServiceClient;
+    use crate::proto::GRPCBlobServiceWrapper;
+
+    use super::BlobService;
+    use super::GRPCBlobService;
+
+    /// This ensures connecting via gRPC works as expected.
+    #[tokio::test]
+    async fn test_valid_unix_path_ping_pong() {
+        let tmpdir = TempDir::new().unwrap();
+        let socket_path = tmpdir.path().join("daemon");
+
+        let path_clone = socket_path.clone();
+
+        // Spin up a server
+        tokio::spawn(async {
+            let uds = UnixListener::bind(path_clone).unwrap();
+            let uds_stream = UnixListenerStream::new(uds);
+
+            // spin up a new server
+            let mut server = tonic::transport::Server::builder();
+            let router =
+                server.add_service(crate::proto::blob_service_server::BlobServiceServer::new(
+                    GRPCBlobServiceWrapper::new(
+                        Box::<MemoryBlobService>::default() as Box<dyn BlobService>
+                    ),
+                ));
+            router.serve_with_incoming(uds_stream).await
+        });
+
+        // wait for the socket to be created
+        Retry::spawn(
+            ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)),
+            || async {
+                if socket_path.exists() {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            },
+        )
+        .await
+        .expect("failed to wait for socket");
+
+        // prepare a client
+        let grpc_client = {
+            let url = url::Url::parse(&format!(
+                "grpc+unix://{}?wait-connect=1",
+                socket_path.display()
+            ))
+            .expect("must parse");
+            let client = BlobServiceClient::new(
+                crate::tonic::channel_from_url(&url)
+                    .await
+                    .expect("must succeed"),
+            );
+
+            GRPCBlobService::from_client(client)
+        };
+
+        let has = grpc_client
+            .has(&fixtures::BLOB_A_DIGEST)
+            .await
+            .expect("must not be err");
+
+        assert!(!has);
+    }
+}
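
The `open_write` plumbing above (mpsc channel → `PollSender` → `CopyToBytes` → `SinkWriter`) is independent of gRPC and worth seeing in isolation; a minimal sketch:

    use futures::sink::SinkExt;
    use tokio::io::AsyncWriteExt;
    use tokio_util::{
        io::{CopyToBytes, SinkWriter},
        sync::PollSender,
    };

    #[tokio::main]
    async fn main() -> std::io::Result<()> {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);

        // A Sink of Bytes, with its error mapped into an io::Error …
        let sink = PollSender::new(tx)
            .sink_map_err(|e| std::io::Error::new(std::io::ErrorKind::BrokenPipe, e));
        // … wrapped so it accepts &[u8] and acts as an AsyncWrite.
        let mut writer = SinkWriter::new(CopyToBytes::new(sink));

        writer.write_all(b"ping").await?;
        writer.shutdown().await?; // closes the tx side of the channel

        assert_eq!(rx.recv().await.unwrap(), bytes::Bytes::from_static(b"ping"));
        assert!(rx.recv().await.is_none()); // stream ended
        Ok(())
    }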
diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs
new file mode 100644
index 000000000000..25eec334de60
--- /dev/null
+++ b/tvix/castore/src/blobservice/memory.rs
@@ -0,0 +1,137 @@
+use std::io::{self, Cursor, Write};
+use std::task::Poll;
+use std::{
+    collections::HashMap,
+    sync::{Arc, RwLock},
+};
+use tonic::async_trait;
+use tracing::instrument;
+
+use super::{BlobReader, BlobService, BlobWriter};
+use crate::B3Digest;
+
+#[derive(Clone, Default)]
+pub struct MemoryBlobService {
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
+}
+
+#[async_trait]
+impl BlobService for MemoryBlobService {
+    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        let db = self.db.read().unwrap();
+        Ok(db.contains_key(digest))
+    }
+
+    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        let db = self.db.read().unwrap();
+
+        match db.get(digest).map(|x| Cursor::new(x.clone())) {
+            Some(result) => Ok(Some(Box::new(result))),
+            None => Ok(None),
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        Box::new(MemoryBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct MemoryBlobWriter {
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
+
+    /// Contains the buffer Vec and hasher, or None if already closed
+    writers: Option<(Vec<u8>, blake3::Hasher)>,
+
+    /// The digest that has been returned, if we successfully closed.
+    digest: Option<B3Digest>,
+}
+
+impl MemoryBlobWriter {
+    fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self {
+        Self {
+            db,
+            writers: Some((Vec::new(), blake3::Hasher::new())),
+            digest: None,
+        }
+    }
+}
+impl tokio::io::AsyncWrite for MemoryBlobWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+        b: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        Poll::Ready(match &mut self.writers {
+            None => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            )),
+            Some((ref mut buf, ref mut hasher)) => {
+                let bytes_written = buf.write(b)?;
+                hasher.write(&b[..bytes_written])
+            }
+        })
+    }
+
+    fn poll_flush(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        Poll::Ready(match self.writers {
+            None => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            )),
+            Some(_) => Ok(()),
+        })
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // shutdown is "instantaneous", we only write to memory.
+        Poll::Ready(Ok(()))
+    }
+}
+
+#[async_trait]
+impl BlobWriter for MemoryBlobWriter {
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        if self.writers.is_none() {
+            match &self.digest {
+                Some(digest) => Ok(digest.clone()),
+                None => Err(io::Error::new(io::ErrorKind::BrokenPipe, "already closed")),
+            }
+        } else {
+            let (buf, hasher) = self.writers.take().unwrap();
+
+            // We know the hasher is doing blake3 hashing, so this conversion won't fail.
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
+
+            // Only insert if the blob doesn't already exist.
+            let db = self.db.read().map_err(|e| {
+                io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e))
+            })?;
+            if !db.contains_key(&digest) {
+                // drop the read lock, so we can open for writing.
+                drop(db);
+
+                // open the database for writing.
+                let mut db = self.db.write().map_err(|e| {
+                    io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e))
+                })?;
+
+                // and put buf in there. This will move buf out.
+                db.insert(digest.clone(), buf);
+            }
+
+            self.digest = Some(digest.clone());
+
+            Ok(digest)
+        }
+    }
+}
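
The `close()` above uses a check-then-insert over the `RwLock`: a cheap shared read first, taking the exclusive write lock only when the blob is actually missing. The same shape in isolation (a sketch; there is a window between dropping the read lock and taking the write lock, which is benign when inserting identical content):

    use std::collections::HashMap;
    use std::sync::{Arc, RwLock};

    fn insert_if_absent(db: &Arc<RwLock<HashMap<String, Vec<u8>>>>, key: String, value: Vec<u8>) {
        // Shared read lock: the common case, the entry is already present.
        if !db.read().unwrap().contains_key(&key) {
            // "Upgrade" by re-acquiring: entry() keeps this correct even if
            // another writer raced us in between.
            db.write().unwrap().entry(key).or_insert(value);
        }
    }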
diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs
new file mode 100644
index 000000000000..fa6b87926ba1
--- /dev/null
+++ b/tvix/castore/src/blobservice/mod.rs
@@ -0,0 +1,79 @@
+use std::io;
+use tonic::async_trait;
+
+use crate::proto::stat_blob_response::ChunkMeta;
+use crate::B3Digest;
+
+mod combinator;
+mod from_addr;
+mod grpc;
+mod memory;
+mod naive_seeker;
+mod simplefs;
+mod sled;
+
+#[cfg(test)]
+mod tests;
+
+pub use self::combinator::CombinedBlobService;
+pub use self::from_addr::from_addr;
+pub use self::grpc::GRPCBlobService;
+pub use self::memory::MemoryBlobService;
+pub use self::simplefs::SimpleFilesystemBlobService;
+pub use self::sled::SledBlobService;
+
+/// The base trait all BlobService services need to implement.
+/// It provides functions to check whether a given blob exists,
+/// a way to read (and seek) a blob, and a method to create a blobwriter handle,
+/// which implements a writer interface and also provides a close function
+/// to finalize a blob and get its digest.
+#[async_trait]
+pub trait BlobService: Send + Sync {
+    /// Check if the service has the blob, by its content hash.
+    /// On implementations returning chunks, this must also work for chunks.
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool>;
+
+    /// Request a blob from the store, by its content hash.
+    /// On implementations returning chunks, this must also work for chunks.
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>>;
+
+    /// Insert a new blob into the store. Returns a [BlobWriter], which
+    /// implements [tokio::io::AsyncWrite] and a [BlobWriter::close] to finalize
+    /// the blob and get its digest.
+    async fn open_write(&self) -> Box<dyn BlobWriter>;
+
+    /// Return a list of chunks for a given blob.
+    /// There's a distinction between returning Ok(None) and Ok(Some(vec![])).
+    /// The former is returned in case the blob is not present at all,
+    /// while the latter is returned in case there are no more granular chunks
+    /// (or the backend does not support chunking).
+    /// A default implementation is provided, which checks for existence and
+    /// then signals that no more granular chunks are available.
+    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
+        if !self.has(digest).await? {
+            return Ok(None);
+        }
+        // default implementation, signalling the backend does not have more
+        // granular chunks available.
+        Ok(Some(vec![]))
+    }
+}
+
+/// A [tokio::io::AsyncWrite] that the user needs to close() afterwards to
+/// persist the blob. On success, it returns the digest of the written blob.
+#[async_trait]
+pub trait BlobWriter: tokio::io::AsyncWrite + Send + Sync + Unpin + 'static {
+    /// Signal there's no more data to be written, and return the digest of the
+    /// contents written.
+    ///
+    /// Closing an already-closed BlobWriter is a no-op.
+    async fn close(&mut self) -> io::Result<B3Digest>;
+}
+
+/// BlobReader is a [tokio::io::AsyncRead] that also allows seeking.
+pub trait BlobReader: tokio::io::AsyncRead + tokio::io::AsyncSeek + Send + Unpin + 'static {}
+
+/// A [`io::Cursor<Vec<u8>>`] or a [`tokio::fs::File`] can be used as a BlobReader.
+impl BlobReader for io::Cursor<Vec<u8>> {}
+impl BlobReader for tokio::fs::File {}
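
The `chunks()` contract above is three-way, and callers need to distinguish all cases; a hypothetical caller-side helper, illustrative only:

    use std::io;
    use tvix_castore::{blobservice::BlobService, B3Digest};

    // Maps the three chunks() outcomes to a human-readable description.
    async fn describe(blob_service: &dyn BlobService, digest: &B3Digest) -> io::Result<String> {
        Ok(match blob_service.chunks(digest).await? {
            None => "blob not present at all".to_string(),
            Some(chunks) if chunks.is_empty() => {
                "blob present, no more granular chunks available".to_string()
            }
            Some(chunks) => format!("blob present as {} chunk(s)", chunks.len()),
        })
    }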
diff --git a/tvix/castore/src/blobservice/naive_seeker.rs b/tvix/castore/src/blobservice/naive_seeker.rs
new file mode 100644
index 000000000000..e65a82c7f45a
--- /dev/null
+++ b/tvix/castore/src/blobservice/naive_seeker.rs
@@ -0,0 +1,269 @@
+use super::BlobReader;
+use pin_project_lite::pin_project;
+use std::io;
+use std::task::Poll;
+use tokio::io::AsyncRead;
+use tracing::{debug, instrument};
+
+pin_project! {
+    /// This implements [tokio::io::AsyncSeek] for a [tokio::io::AsyncRead] by
+    /// simply skipping over some bytes, keeping track of the position.
+    /// It fails whenever you try to seek backwards.
+    ///
+    /// ## Pinning concerns:
+    ///
+    /// [NaiveSeeker] is itself pinned by callers, and we do not need to concern
+    /// ourselves regarding that.
+    ///
+    /// Though, its fields as per
+    /// <https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field>
+    /// can be pinned or unpinned.
+    ///
+    /// So we need to go over each field and choose our policy carefully.
+    ///
+    /// The obvious cases are the bookkeeping integers we keep in the structure,
+    /// those are private and not shared to anyone, we never build a
+    /// `Pin<&mut X>` out of them at any point, therefore, we can safely never
+    /// mark them as pinned. Of course, it is expected that no developer here
+    /// attempt to `pin!(self.pos)` to pin them because it makes no sense. If
+    /// they have to become pinned, they should be marked `#[pin]` and we need
+    /// to discuss it.
+    ///
+    /// So the bookkeeping integers are in the right state with respect to their
+    /// pinning status. The projection should offer direct access.
+    ///
+    /// On the `r` field, i.e. a `BufReader<R>`, given that
+    /// <https://docs.rs/tokio/latest/tokio/io/struct.BufReader.html#impl-Unpin-for-BufReader%3CR%3E>
+    /// is available, even a `Pin<&mut BufReader<R>>` can be safely moved.
+    ///
+    /// The only care we should have regards the internal reader itself, i.e.
+    /// the `R` instance, see that Tokio decided to `#[pin]` it too:
+    /// <https://docs.rs/tokio/latest/src/tokio/io/util/buf_reader.rs.html#29>
+    ///
+    /// In general, there's no `Unpin` instance for `R: tokio::io::AsyncRead`
+    /// (see <https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html>).
+    ///
+    /// Therefore, we could keep it unpinned and pin it at every call site
+    /// whenever we need to call `poll_*`. This can be confusing to the
+    /// non-expert developer, and we have a fair amount of situations where
+    /// the [BufReader] instance is naked, i.e. in its `&mut BufReader<R>`
+    /// form; this is annoying because it could expose the naked `R`
+    /// internal instance somehow and would produce a risk of making it move
+    /// unexpectedly.
+    ///
+    /// We choose the path of least resistance: as we have no reason to
+    /// access the raw `BufReader<R>` instance, we just `#[pin]` it too, enjoy
+    /// its safe `poll_*` APIs, and push the unpinning concerns to the
+    /// internal implementations themselves, which have studied the question
+    /// longer than us.
+    pub struct NaiveSeeker<R: tokio::io::AsyncRead> {
+        #[pin]
+        r: tokio::io::BufReader<R>,
+        pos: u64,
+        bytes_to_skip: u64,
+    }
+}
+
+impl<R: tokio::io::AsyncRead> NaiveSeeker<R> {
+    pub fn new(r: R) -> Self {
+        NaiveSeeker {
+            r: tokio::io::BufReader::new(r),
+            pos: 0,
+            bytes_to_skip: 0,
+        }
+    }
+}
+
+impl<R: tokio::io::AsyncRead> tokio::io::AsyncRead for NaiveSeeker<R> {
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        // The amount of data read can be determined by the increase
+        // in the length of the slice returned by `ReadBuf::filled`.
+        let filled_before = buf.filled().len();
+        let this = self.project();
+        let pos: &mut u64 = this.pos;
+
+        match this.r.poll_read(cx, buf) {
+            Poll::Ready(a) => {
+                let bytes_read = buf.filled().len() - filled_before;
+                *pos += bytes_read as u64;
+
+                Poll::Ready(a)
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+impl<R: tokio::io::AsyncRead> tokio::io::AsyncBufRead for NaiveSeeker<R> {
+    fn poll_fill_buf(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<io::Result<&[u8]>> {
+        self.project().r.poll_fill_buf(cx)
+    }
+
+    fn consume(self: std::pin::Pin<&mut Self>, amt: usize) {
+        let this = self.project();
+        this.r.consume(amt);
+        let pos: &mut u64 = this.pos;
+        *pos += amt as u64;
+    }
+}
+
+impl<R: tokio::io::AsyncRead> tokio::io::AsyncSeek for NaiveSeeker<R> {
+    #[instrument(skip(self))]
+    fn start_seek(
+        self: std::pin::Pin<&mut Self>,
+        position: std::io::SeekFrom,
+    ) -> std::io::Result<()> {
+        let absolute_offset: u64 = match position {
+            io::SeekFrom::Start(start_offset) => {
+                if start_offset < self.pos {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        format!("can't seek backwards ({} -> {})", self.pos, start_offset),
+                    ));
+                } else {
+                    start_offset
+                }
+            }
+            // we don't know the total size, can't support this.
+            io::SeekFrom::End(_end_offset) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::Unsupported,
+                    "can't seek from end",
+                ));
+            }
+            io::SeekFrom::Current(relative_offset) => {
+                if relative_offset < 0 {
+                    return Err(io::Error::new(
+                        io::ErrorKind::Unsupported,
+                        "can't seek backwards relative to current position",
+                    ));
+                } else {
+                    self.pos + relative_offset as u64
+                }
+            }
+        };
+
+        debug!(absolute_offset=?absolute_offset, "seek");
+
+        // we already know absolute_offset is not smaller than self.pos
+        debug_assert!(
+            absolute_offset >= self.pos,
+            "absolute_offset {} must not be smaller than self.pos {}",
+            absolute_offset,
+            self.pos
+        );
+
+        // calculate bytes to skip
+        *self.project().bytes_to_skip = absolute_offset - self.pos;
+
+        Ok(())
+    }
+
+    #[instrument(skip(self))]
+    fn poll_complete(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<std::io::Result<u64>> {
+        if self.bytes_to_skip == 0 {
+            // return the new position (from the start of the stream)
+            return Poll::Ready(Ok(self.pos));
+        }
+
+        // discard some bytes, until pos is where we want it to be.
+        // We create a buffer that we'll discard later on.
+        let mut buf = [0; 1024];
+
+        // Loop until we've reached the desired seek position. This is done by issuing repeated
+        // `poll_read` calls. If the data is not available yet, we will yield back to the executor
+        // and wait to be polled again.
+        loop {
+            // calculate the length we want to skip at most, which is either a max
+            // buffer size, or the number of remaining bytes to read, whatever is
+            // smaller.
+            let bytes_to_skip = std::cmp::min(self.bytes_to_skip as usize, buf.len());
+
+            let mut read_buf = tokio::io::ReadBuf::new(&mut buf[..bytes_to_skip]);
+
+            match self.as_mut().poll_read(cx, &mut read_buf) {
+                Poll::Ready(_a) => {
+                    let bytes_read = read_buf.filled().len() as u64;
+
+                    if bytes_read == 0 {
+                        return Poll::Ready(Err(io::Error::new(
+                            io::ErrorKind::UnexpectedEof,
+                            format!(
+                                "tried to skip {} bytes, but only was able to skip {} until reaching EOF",
+                                bytes_to_skip, bytes_read
+                            ),
+                        )));
+                    }
+
+                    // calculate bytes to skip
+                    let bytes_to_skip = self.bytes_to_skip - bytes_read;
+
+                    *self.as_mut().project().bytes_to_skip = bytes_to_skip;
+
+                    if bytes_to_skip == 0 {
+                        return Poll::Ready(Ok(self.pos));
+                    }
+                }
+                Poll::Pending => return Poll::Pending,
+            };
+        }
+    }
+}
+
+impl<R: tokio::io::AsyncRead + Send + Unpin + 'static> BlobReader for NaiveSeeker<R> {}
+
+#[cfg(test)]
+mod tests {
+    use super::NaiveSeeker;
+    use std::io::{Cursor, SeekFrom};
+    use tokio::io::{AsyncReadExt, AsyncSeekExt};
+
+    /// This seek requires multiple `poll_read` calls, as we use a 1024-byte
+    /// internal buffer when doing the seek.
+    /// This ensures we don't hang indefinitely.
+    #[tokio::test]
+    async fn seek() {
+        let buf = vec![0u8; 4096];
+        let reader = Cursor::new(&buf);
+        let mut seeker = NaiveSeeker::new(reader);
+        seeker.seek(SeekFrom::Start(4000)).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn seek_read() {
+        let mut buf = vec![0u8; 2048];
+        buf.extend_from_slice(&[1u8; 2048]);
+        buf.extend_from_slice(&[2u8; 2048]);
+
+        let reader = Cursor::new(&buf);
+        let mut seeker = NaiveSeeker::new(reader);
+
+        let mut read_buf = vec![0u8; 1024];
+        seeker.read_exact(&mut read_buf).await.expect("must read");
+        assert_eq!(read_buf.as_slice(), &[0u8; 1024]);
+
+        seeker
+            .seek(SeekFrom::Current(1024))
+            .await
+            .expect("must seek");
+        seeker.read_exact(&mut read_buf).await.expect("must read");
+        assert_eq!(read_buf.as_slice(), &[1u8; 1024]);
+
+        seeker
+            .seek(SeekFrom::Start(2 * 2048))
+            .await
+            .expect("must seek");
+        seeker.read_exact(&mut read_buf).await.expect("must read");
+        assert_eq!(read_buf.as_slice(), &[2u8; 1024]);
+    }
+}
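
Beyond the tests above, the forward-only contract means a backward seek fails with `ErrorKind::Unsupported`; a sketch of what that looks like, written as if added to the module's tests:

    use std::io::{Cursor, SeekFrom};
    use tokio::io::AsyncSeekExt;

    #[tokio::test]
    async fn seek_backwards_fails() {
        let mut seeker = NaiveSeeker::new(Cursor::new(vec![0u8; 10]));

        // Forward seek works, implemented by reading and discarding bytes.
        seeker.seek(SeekFrom::Start(5)).await.expect("forward seek");

        // Backward seek is rejected in start_seek.
        let err = seeker.seek(SeekFrom::Start(0)).await.expect_err("must fail");
        assert_eq!(err.kind(), std::io::ErrorKind::Unsupported);
    }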
diff --git a/tvix/castore/src/blobservice/simplefs.rs b/tvix/castore/src/blobservice/simplefs.rs
new file mode 100644
index 000000000000..b21db2808c23
--- /dev/null
+++ b/tvix/castore/src/blobservice/simplefs.rs
@@ -0,0 +1,195 @@
+use std::{
+    io,
+    path::{Path, PathBuf},
+    pin::pin,
+    task::Poll,
+};
+
+use bytes::Buf;
+use data_encoding::HEXLOWER;
+use pin_project_lite::pin_project;
+use tokio::io::AsyncWriteExt;
+use tonic::async_trait;
+use tracing::instrument;
+
+use crate::B3Digest;
+
+use super::{BlobReader, BlobService, BlobWriter};
+
+/// Connects to a tvix-store BlobService on an existing path backed by a POSIX-compliant
+/// filesystem.
+///
+/// It takes an existing path, builds a `tmp` directory and a `blobs` directory inside of it. All
+/// blobs received are staged in that `tmp` directory, then moved **atomically** into
+/// `blobs/<prefix>/<digest>`, where `<prefix>` is the hex encoding of the first two digest bytes,
+/// e.g. a digest starting with `abcd…` ends up at `blobs/abcd/abcd…`.
+///
+/// **Disclaimer** : This very simple implementation is subject to change and does not give any
+/// final guarantees on the on-disk format.
+#[derive(Clone)]
+pub struct SimpleFilesystemBlobService {
+    /// Where the blobs are located on a filesystem already mounted.
+    path: PathBuf,
+}
+
+impl SimpleFilesystemBlobService {
+    pub async fn new(path: PathBuf) -> std::io::Result<Self> {
+        tokio::fs::create_dir_all(&path).await?;
+        tokio::fs::create_dir_all(path.join("tmp")).await?;
+        tokio::fs::create_dir_all(path.join("blobs")).await?;
+
+        Ok(Self { path })
+    }
+}
+
+fn derive_path(root: &Path, digest: &B3Digest) -> PathBuf {
+    let prefix = HEXLOWER.encode(&digest.as_slice()[..2]);
+    let pathname = HEXLOWER.encode(digest.as_slice());
+
+    root.join("blobs").join(prefix).join(pathname)
+}
+
+#[async_trait]
+impl BlobService for SimpleFilesystemBlobService {
+    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        Ok(tokio::fs::try_exists(derive_path(&self.path, digest)).await?)
+    }
+
+    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        let dst_path = derive_path(&self.path, digest);
+        let reader = match tokio::fs::File::open(dst_path).await {
+            Ok(file) => {
+                let reader: Box<dyn BlobReader> = Box::new(file);
+                Ok(Some(reader))
+            }
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
+            Err(e) => Err(e),
+        };
+
+        reader
+    }
+
+    #[instrument(skip_all)]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        let file = match async_tempfile::TempFile::new_in(self.path.join("tmp")).await {
+            Ok(file) => Ok(file),
+            Err(e) => match e {
+                async_tempfile::Error::Io(io_error) => Err(io_error),
+                async_tempfile::Error::InvalidFile => Err(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    "invalid or missing file specified",
+                )),
+                async_tempfile::Error::InvalidDirectory => Err(std::io::Error::new(
+                    std::io::ErrorKind::NotFound,
+                    "invalid or missing directory specified",
+                )),
+            },
+        };
+
+        Box::new(SimpleFilesystemBlobWriter {
+            root: self.path.clone(),
+            file,
+            digester: blake3::Hasher::new(),
+        })
+    }
+}
+
+pin_project! {
+    struct SimpleFilesystemBlobWriter {
+        root: PathBuf,
+        file: std::io::Result<async_tempfile::TempFile>,
+        digester: blake3::Hasher
+    }
+}
+
+impl tokio::io::AsyncWrite for SimpleFilesystemBlobWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<Result<usize, std::io::Error>> {
+        if let Err(e) = self.file.as_mut() {
+            return Poll::Ready(Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            )));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        match pin!(writer).poll_write(cx, buf) {
+            Poll::Ready(Ok(n)) => {
+                let this = self.project();
+                // hash only the bytes that were actually written
+                this.digester.update(&buf[..n]);
+                Poll::Ready(Ok(n))
+            }
+            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
+            Poll::Pending => Poll::Pending,
+        }
+    }
+
+    fn poll_flush(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), std::io::Error>> {
+        if let Err(e) = self.file.as_mut() {
+            return Poll::Ready(Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            )));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        pin!(writer).poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), std::io::Error>> {
+        if let Err(e) = self.file.as_mut() {
+            return Poll::Ready(Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            )));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        pin!(writer).poll_shutdown(cx)
+    }
+}
+
+#[async_trait]
+impl BlobWriter for SimpleFilesystemBlobWriter {
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        if let Err(e) = self.file.as_mut() {
+            return Err(std::mem::replace(
+                e,
+                std::io::Error::new(
+                    std::io::ErrorKind::NotConnected,
+                    "this file is already closed",
+                ),
+            ));
+        }
+
+        let writer = self.file.as_mut().unwrap();
+        // flush buffered data first, then make sure it hits the disk.
+        writer.flush().await?;
+        writer.sync_all().await?;
+
+        let digest: B3Digest = self.digester.finalize().as_bytes().into();
+        let dst_path = derive_path(&self.root, &digest);
+        tokio::fs::create_dir_all(dst_path.parent().unwrap()).await?;
+        tokio::fs::rename(writer.file_path(), dst_path).await?;
+
+        Ok(digest)
+    }
+}
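
The sharding done by `derive_path` is easy to verify in isolation; a standalone mirror of it (illustrative only, not the crate's API):

    use data_encoding::HEXLOWER;
    use std::path::{Path, PathBuf};

    // First two digest bytes (four hex characters) pick the shard directory;
    // the full hex digest is the file name.
    fn shard(root: &Path, digest: &[u8]) -> PathBuf {
        let prefix = HEXLOWER.encode(&digest[..2]);
        root.join("blobs").join(prefix).join(HEXLOWER.encode(digest))
    }

    fn main() {
        let p = shard(Path::new("/store"), &[0xde, 0xad, 0xbe, 0xef]);
        assert_eq!(p, PathBuf::from("/store/blobs/dead/deadbeef"));
    }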
diff --git a/tvix/castore/src/blobservice/sled.rs b/tvix/castore/src/blobservice/sled.rs
new file mode 100644
index 000000000000..3dd4bff7bc8e
--- /dev/null
+++ b/tvix/castore/src/blobservice/sled.rs
@@ -0,0 +1,150 @@
+use super::{BlobReader, BlobService, BlobWriter};
+use crate::{B3Digest, Error};
+use std::{
+    io::{self, Cursor, Write},
+    path::Path,
+    task::Poll,
+};
+use tonic::async_trait;
+use tracing::instrument;
+
+#[derive(Clone)]
+pub struct SledBlobService {
+    db: sled::Db,
+}
+
+impl SledBlobService {
+    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
+        let config = sled::Config::default()
+            .use_compression(false) // is a required parameter
+            .path(p);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+
+    pub fn new_temporary() -> Result<Self, sled::Error> {
+        let config = sled::Config::default().temporary(true);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+}
+
+#[async_trait]
+impl BlobService for SledBlobService {
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
+        match self.db.contains_key(digest.as_slice()) {
+            Ok(has) => Ok(has),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
+        }
+    }
+
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
+        match self.db.get(digest.as_slice()) {
+            Ok(None) => Ok(None),
+            Ok(Some(data)) => Ok(Some(Box::new(Cursor::new(data[..].to_vec())))),
+            Err(e) => Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
+        }
+    }
+
+    #[instrument(skip(self))]
+    async fn open_write(&self) -> Box<dyn BlobWriter> {
+        Box::new(SledBlobWriter::new(self.db.clone()))
+    }
+}
+
+pub struct SledBlobWriter {
+    db: sled::Db,
+
+    /// Contains the buffer Vec and hasher, or None if already closed
+    writers: Option<(Vec<u8>, blake3::Hasher)>,
+
+    /// The digest that has been returned, if we successfully closed.
+    digest: Option<B3Digest>,
+}
+
+impl SledBlobWriter {
+    pub fn new(db: sled::Db) -> Self {
+        Self {
+            db,
+            writers: Some((Vec::new(), blake3::Hasher::new())),
+            digest: None,
+        }
+    }
+}
+
+impl tokio::io::AsyncWrite for SledBlobWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+        b: &[u8],
+    ) -> std::task::Poll<Result<usize, io::Error>> {
+        Poll::Ready(match &mut self.writers {
+            None => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            )),
+            Some((ref mut buf, ref mut hasher)) => {
+                let bytes_written = buf.write(b)?;
+                hasher.write(&b[..bytes_written])
+            }
+        })
+    }
+
+    fn poll_flush(
+        mut self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        Poll::Ready(match &mut self.writers {
+            None => Err(io::Error::new(
+                io::ErrorKind::NotConnected,
+                "already closed",
+            )),
+            Some(_) => Ok(()),
+        })
+    }
+
+    fn poll_shutdown(
+        self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Result<(), io::Error>> {
+        // shutdown is "instantaneous", we only write to a Vec<u8> as buffer.
+        Poll::Ready(Ok(()))
+    }
+}
+
+#[async_trait]
+impl BlobWriter for SledBlobWriter {
+    async fn close(&mut self) -> io::Result<B3Digest> {
+        if self.writers.is_none() {
+            match &self.digest {
+                Some(digest) => Ok(digest.clone()),
+                None => Err(io::Error::new(
+                    io::ErrorKind::NotConnected,
+                    "already closed",
+                )),
+            }
+        } else {
+            let (buf, hasher) = self.writers.take().unwrap();
+
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
+
+            // Only insert if the blob doesn't already exist.
+            if !self.db.contains_key(digest.as_slice()).map_err(|e| {
+                Error::StorageError(format!("Unable to check if we have blob {}: {}", digest, e))
+            })? {
+                // put buf in there. This will move buf out.
+                self.db
+                    .insert(digest.as_slice(), buf)
+                    .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?;
+            }
+
+            self.digest = Some(digest.clone());
+
+            Ok(digest)
+        }
+    }
+}
diff --git a/tvix/castore/src/blobservice/tests.rs b/tvix/castore/src/blobservice/tests.rs
new file mode 100644
index 000000000000..7480ca808225
--- /dev/null
+++ b/tvix/castore/src/blobservice/tests.rs
@@ -0,0 +1,243 @@
+use std::io;
+use std::pin::pin;
+
+use test_case::test_case;
+use tokio::io::AsyncReadExt;
+use tokio::io::AsyncSeekExt;
+
+use super::B3Digest;
+use super::BlobService;
+use super::MemoryBlobService;
+use super::SledBlobService;
+use crate::fixtures;
+
+// TODO: avoid having to define all different services we test against for all functions.
+// maybe something like rstest can be used?
+
+fn gen_memory_blob_service() -> impl BlobService {
+    MemoryBlobService::default()
+}
+fn gen_sled_blob_service() -> impl BlobService {
+    SledBlobService::new_temporary().unwrap()
+}
+
+// TODO: add GRPC blob service here.
+
+/// Using [BlobService::has] on a non-existing blob should return false
+#[test_case(gen_memory_blob_service(); "memory")]
+#[test_case(gen_sled_blob_service(); "sled")]
+fn has_nonexistent_false(blob_service: impl BlobService) {
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        assert!(!blob_service
+            .has(&fixtures::BLOB_A_DIGEST)
+            .await
+            .expect("must not fail"));
+    })
+}
+
+/// Trying to read a non-existing blob should return None instead of a reader.
+#[test_case(gen_memory_blob_service(); "memory")]
+#[test_case(gen_sled_blob_service(); "sled")]
+fn not_found_read(blob_service: impl BlobService) {
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        assert!(blob_service
+            .open_read(&fixtures::BLOB_A_DIGEST)
+            .await
+            .expect("must not fail")
+            .is_none())
+    })
+}
+
+/// Put a blob in the store, check has, get it back.
+/// We test both with small and big blobs.
+#[test_case(gen_memory_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "memory-small")]
+#[test_case(gen_sled_blob_service(), &fixtures::BLOB_A, &fixtures::BLOB_A_DIGEST; "sled-small")]
+#[test_case(gen_memory_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "memory-big")]
+#[test_case(gen_sled_blob_service(), &fixtures::BLOB_B, &fixtures::BLOB_B_DIGEST; "sled-big")]
+fn put_has_get(blob_service: impl BlobService, blob_contents: &[u8], blob_digest: &B3Digest) {
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        let mut w = blob_service.open_write().await;
+
+        let l = tokio::io::copy(&mut io::Cursor::new(blob_contents), &mut w)
+            .await
+            .expect("copy must succeed");
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "written bytes must match blob length"
+        );
+
+        let digest = w.close().await.expect("close must succeed");
+
+        assert_eq!(*blob_digest, digest, "returned digest must be correct");
+
+        assert!(
+            blob_service.has(blob_digest).await.expect("must not fail"),
+            "blob service should now have the blob"
+        );
+
+        let mut r = blob_service
+            .open_read(blob_digest)
+            .await
+            .expect("open_read must succeed")
+            .expect("must be some");
+
+        let mut buf: Vec<u8> = Vec::new();
+        let mut pinned_reader = pin!(r);
+        let l = tokio::io::copy(&mut pinned_reader, &mut buf)
+            .await
+            .expect("copy must succeed");
+
+        assert_eq!(
+            blob_contents.len(),
+            l as usize,
+            "read bytes must match blob length"
+        );
+
+        assert_eq!(blob_contents, buf, "read blob contents must match");
+    })
+}
+
+/// Put a blob in the store, and seek inside it a bit.
+#[test_case(gen_memory_blob_service(); "memory")]
+#[test_case(gen_sled_blob_service(); "sled")]
+fn put_seek(blob_service: impl BlobService) {
+    tokio::runtime::Runtime::new().unwrap().block_on(async {
+        let mut w = blob_service.open_write().await;
+
+        tokio::io::copy(&mut io::Cursor::new(&fixtures::BLOB_B.to_vec()), &mut w)
+            .await
+            .expect("copy must succeed");
+        w.close().await.expect("close must succeed");
+
+        // open a blob for reading
+        let mut r = blob_service
+            .open_read(&fixtures::BLOB_B_DIGEST)
+            .await
+            .expect("open_read must succeed")
+            .expect("must be some");
+
+        let mut pos: u64 = 0;
+
+        // read the first 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
+
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected first 10 bytes to match"
+            );
+
+            pos += buf.len() as u64;
+        }
+        // seek by 0 bytes, using SeekFrom::Start.
+        let p = r
+            .seek(io::SeekFrom::Start(pos))
+            .await
+            .expect("must not fail");
+        assert_eq!(pos, p);
+
+        // read the next 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
+
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected data to match"
+            );
+
+            pos += buf.len() as u64;
+        }
+
+        // seek by 5 bytes, using SeekFrom::Start.
+        let p = r
+            .seek(io::SeekFrom::Start(pos + 5))
+            .await
+            .expect("must not fail");
+        pos += 5;
+        assert_eq!(pos, p);
+
+        // read the next 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
+
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected data to match"
+            );
+
+            pos += buf.len() as u64;
+        }
+
+        // seek forward by 12345 bytes, using SeekFrom::Current.
+        let p = r
+            .seek(io::SeekFrom::Current(12345))
+            .await
+            .expect("must not fail");
+        pos += 12345;
+        assert_eq!(pos, p);
+
+        // read the next 10 bytes, they must match the data in the fixture.
+        {
+            let mut buf = [0; 10];
+            r.read_exact(&mut buf).await.expect("must succeed");
+
+            assert_eq!(
+                &fixtures::BLOB_B[pos as usize..pos as usize + buf.len()],
+                buf,
+                "expected data to match"
+            );
+
+            #[allow(unused_assignments)]
+            {
+                pos += buf.len() as u64;
+            }
+        }
+
+        // seeking to the end is okay…
+        let p = r
+            .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64))
+            .await
+            .expect("must not fail");
+        pos = fixtures::BLOB_B.len() as u64;
+        assert_eq!(pos, p);
+
+        {
+            // but it returns no more data.
+            let mut buf: Vec<u8> = Vec::new();
+            r.read_to_end(&mut buf).await.expect("must not fail");
+            assert!(buf.is_empty(), "expected no more data to be read");
+        }
+
+        // seeking past the end…
+        // should either succeed (but subsequent reads must then return no
+        // more data), matching the behaviour of a Cursor<Vec<u8>>…
+        if let Ok(_pos) = r
+            .seek(io::SeekFrom::Start(fixtures::BLOB_B.len() as u64 + 1))
+            .await
+        {
+            let mut buf: Vec<u8> = Vec::new();
+            r.read_to_end(&mut buf).await.expect("must not fail");
+            assert!(buf.is_empty(), "expected no more data to be read");
+        }
+        // …or fail outright.
+
+        // TODO: this is only broken for the gRPC version
+        // We expect seeking backwards or relative to the end to fail.
+        // r.seek(io::SeekFrom::Current(-1))
+        //     .expect_err("SeekFrom::Current(-1) expected to fail");
+
+        // r.seek(io::SeekFrom::Start(pos - 1))
+        //     .expect_err("SeekFrom::Start(pos-1) expected to fail");
+
+        // r.seek(io::SeekFrom::End(0))
+        //     .expect_err("SeekFrom::End(_) expected to fail");
+    })
+}
diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs
new file mode 100644
index 000000000000..137ed2669a8f
--- /dev/null
+++ b/tvix/castore/src/digests.rs
@@ -0,0 +1,73 @@
+use bytes::Bytes;
+use data_encoding::BASE64;
+use thiserror::Error;
+
+#[derive(PartialEq, Eq, Hash, Debug)]
+pub struct B3Digest(Bytes);
+
+// TODO: allow converting these errors to crate::Error
+#[derive(Error, Debug)]
+pub enum Error {
+    #[error("invalid digest length: {0}")]
+    InvalidDigestLen(usize),
+}
+
+pub const B3_LEN: usize = 32;
+
+impl B3Digest {
+    pub fn as_slice(&self) -> &[u8] {
+        &self.0[..]
+    }
+}
+
+impl From<B3Digest> for bytes::Bytes {
+    fn from(val: B3Digest) -> Self {
+        val.0
+    }
+}
+
+impl TryFrom<Vec<u8>> for B3Digest {
+    type Error = Error;
+
+    /// Constructs a [B3Digest] from a [Vec<u8>].
+    /// Returns an error if the digest has the wrong length.
+    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
+        if value.len() != B3_LEN {
+            Err(Error::InvalidDigestLen(value.len()))
+        } else {
+            Ok(Self(value.into()))
+        }
+    }
+}
+
+impl TryFrom<bytes::Bytes> for B3Digest {
+    type Error = Error;
+
+    /// Constructs a [B3Digest] from a [bytes::Bytes].
+    /// Returns an error if the digest has the wrong length.
+    fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> {
+        if value.len() != B3_LEN {
+            Err(Error::InvalidDigestLen(value.len()))
+        } else {
+            Ok(Self(value))
+        }
+    }
+}
+
+impl From<&[u8; B3_LEN]> for B3Digest {
+    fn from(value: &[u8; B3_LEN]) -> Self {
+        Self(value.to_vec().into())
+    }
+}
+
+impl Clone for B3Digest {
+    fn clone(&self) -> Self {
+        Self(self.0.to_owned())
+    }
+}
+
+impl std::fmt::Display for B3Digest {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "b3:{}", BASE64.encode(&self.0))
+    }
+}
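+
+// A small usage sketch (illustrative): a B3Digest can be built from the raw
+// 32-byte blake3 output, and displays as "b3:" followed by the base64 digest.
+//
+//     let digest: B3Digest = blake3::hash(b"hello").as_bytes().into();
+//     assert_eq!(B3_LEN, digest.as_slice().len());
+//     println!("{}", digest); // prints "b3:<base64 of the 32 bytes>"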
diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs
new file mode 100644
index 000000000000..0a5a0464c14a
--- /dev/null
+++ b/tvix/castore/src/directoryservice/from_addr.rs
@@ -0,0 +1,120 @@
+use url::Url;
+
+use crate::{proto::directory_service_client::DirectoryServiceClient, Error};
+
+use super::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService, SledDirectoryService};
+
+/// Constructs a new instance of a [DirectoryService] from a URI.
+///
+/// The following URIs are supported:
+/// - `memory:`
+///   Uses an in-memory implementation.
+/// - `sled:`
+///   Uses an in-memory sled implementation.
+/// - `sled:///absolute/path/to/somewhere`
+///   Uses sled, with a path on disk for persistence. Can only be opened by
+///   one process at a time.
+/// - `grpc+unix:///absolute/path/to/somewhere`
+///   Connects to a local tvix-store gRPC service via Unix socket.
+/// - `grpc+http://host:port`, `grpc+https://host:port`
+///    Connects to a (remote) tvix-store gRPC service.
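+///
+/// A usage sketch (illustrative; assumes the crate is consumed as
+/// `tvix_castore` and that `some_digest` exists):
+///
+/// ```ignore
+/// let directory_service = tvix_castore::directoryservice::from_addr("memory://").await?;
+/// let maybe_directory = directory_service.get(&some_digest).await?;
+/// ```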
+pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Error> {
+    let url = Url::parse(uri)
+        .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?;
+
+    Ok(if url.scheme() == "memory" {
+        // memory doesn't support host or path in the URL.
+        if url.has_host() || !url.path().is_empty() {
+            return Err(Error::StorageError("invalid url".to_string()));
+        }
+        Box::<MemoryDirectoryService>::default()
+    } else if url.scheme() == "sled" {
+        // sled doesn't support host, and a path can be provided (otherwise
+        // it'll live in memory only).
+        if url.has_host() {
+            return Err(Error::StorageError("no host allowed".to_string()));
+        }
+
+        if url.path() == "/" {
+            return Err(Error::StorageError(
+                "cowardly refusing to open / with sled".to_string(),
+            ));
+        }
+
+        // TODO: expose compression and other parameters as URL parameters?
+
+        if url.path().is_empty() {
+            return Ok(Box::new(
+                SledDirectoryService::new_temporary()
+                    .map_err(|e| Error::StorageError(e.to_string()))?,
+            ));
+        }
+        return Ok(Box::new(
+            SledDirectoryService::new(url.path())
+                .map_err(|e| Error::StorageError(e.to_string()))?,
+        ));
+    } else if url.scheme().starts_with("grpc+") {
+        // schemes starting with grpc+ go to the GRPCDirectoryService.
+        //   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+        // - In the case of unix sockets, there must be a path, but may not be a host.
+        // - In the case of non-unix sockets, there must be a host, but no path.
+        // Constructing the channel is handled by crate::tonic::channel_from_url.
+        let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?);
+        Box::new(GRPCDirectoryService::from_client(client))
+    } else {
+        Err(crate::Error::StorageError(format!(
+            "unknown scheme: {}",
+            url.scheme()
+        )))?
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::from_addr;
+    use lazy_static::lazy_static;
+    use tempfile::TempDir;
+    use test_case::test_case;
+
+    lazy_static! {
+        static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
+        static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
+    }
+
+    /// This uses an unsupported scheme.
+    #[test_case("http://foo.example/test", false; "unsupported scheme")]
+    /// This configures sled in temporary mode.
+    #[test_case("sled://", true; "sled valid temporary")]
+    /// This configures sled with /, which should fail.
+    #[test_case("sled:///", false; "sled invalid root")]
+    /// This configures sled with a host, not path, which should fail.
+    #[test_case("sled://foo.example", false; "sled invalid host")]
+    /// This configures sled with a valid path, which should succeed.
+    #[test_case(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true; "sled valid path")]
+    /// This configures sled with a host and a valid path, which should fail.
+    #[test_case(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false; "sled invalid host with valid path")]
+    /// This correctly sets the scheme, and doesn't set a path.
+    #[test_case("memory://", true; "memory valid")]
+    /// This sets a memory url host to `foo`
+    #[test_case("memory://foo", false; "memory invalid host")]
+    /// This sets a memory url path to "/", which is invalid.
+    #[test_case("memory:///", false; "memory invalid root path")]
+    /// This sets a memory url path to "/foo", which is invalid.
+    #[test_case("memory:///foo", false; "memory invalid root path foo")]
+    /// Correct scheme to connect to a unix socket.
+    #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")]
+    /// Correct scheme for unix socket, but setting a host too, which is invalid.
+    #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")]
+    /// Correct scheme to connect to localhost, with port 12345
+    #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[test_case("grpc+http://localhost", true; "grpc valid http host without port")]
+    /// Correct scheme to connect to localhost over https, without specifying a port.
+    #[test_case("grpc+https://localhost", true; "grpc valid https host without port")]
+    /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
+    #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")]
+    #[tokio::test]
+    async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) {
+        assert_eq!(from_addr(uri_str).await.is_ok(), is_ok)
+    }
+}
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
new file mode 100644
index 000000000000..ad06cb17b668
--- /dev/null
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -0,0 +1,505 @@
+use std::collections::HashSet;
+
+use super::{DirectoryPutter, DirectoryService};
+use crate::proto::{self, get_directory_request::ByWhat};
+use crate::{B3Digest, Error};
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use tokio::spawn;
+use tokio::sync::mpsc::UnboundedSender;
+use tokio::task::JoinHandle;
+use tokio_stream::wrappers::UnboundedReceiverStream;
+use tonic::async_trait;
+use tonic::Code;
+use tonic::{transport::Channel, Status};
+use tracing::{instrument, warn};
+
+/// Connects to a (remote) tvix-store DirectoryService over gRPC.
+#[derive(Clone)]
+pub struct GRPCDirectoryService {
+    /// The internal reference to a gRPC client.
+    /// Cloning it is cheap, and it internally handles concurrent requests.
+    grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
+}
+
+impl GRPCDirectoryService {
+    /// Constructs a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient].
+    /// Panics if called outside the context of a tokio runtime.
+    pub fn from_client(
+        grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
+    ) -> Self {
+        Self { grpc_client }
+    }
+}
+
+#[async_trait]
+impl DirectoryService for GRPCDirectoryService {
+    async fn get(
+        &self,
+        digest: &B3Digest,
+    ) -> Result<Option<crate::proto::Directory>, crate::Error> {
+        // Get a new handle to the gRPC client, and copy the digest.
+        let mut grpc_client = self.grpc_client.clone();
+        let digest_cpy = digest.clone();
+        let message = async move {
+            let mut s = grpc_client
+                .get(proto::GetDirectoryRequest {
+                    recursive: false,
+                    by_what: Some(ByWhat::Digest(digest_cpy.into())),
+                })
+                .await?
+                .into_inner();
+
+            // Retrieve the first message only, then close the stream (we set recursive to false)
+            s.message().await
+        };
+
+        let digest = digest.clone();
+        match message.await {
+            Ok(Some(directory)) => {
+                // Validate the retrieved Directory indeed has the
+                // digest we expect it to have, to detect corruptions.
+                let actual_digest = directory.digest();
+                if actual_digest != digest {
+                    Err(crate::Error::StorageError(format!(
+                        "requested directory with digest {}, but got {}",
+                        digest, actual_digest
+                    )))
+                } else if let Err(e) = directory.validate() {
+                    // Validate the Directory itself is valid.
+                    warn!("directory failed validation: {}", e.to_string());
+                    Err(crate::Error::StorageError(format!(
+                        "directory {} failed validation: {}",
+                        digest, e,
+                    )))
+                } else {
+                    Ok(Some(directory))
+                }
+            }
+            Ok(None) => Ok(None),
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(crate::Error::StorageError(e.to_string())),
+        }
+    }
+
+    async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
+        let resp = self
+            .grpc_client
+            .clone()
+            .put(tokio_stream::once(directory))
+            .await;
+
+        match resp {
+            Ok(put_directory_resp) => Ok(put_directory_resp
+                .into_inner()
+                .root_digest
+                .try_into()
+                .map_err(|_| {
+                    Error::StorageError("invalid root digest length in response".to_string())
+                })?),
+            Err(e) => Err(crate::Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(
+        &self,
+        root_directory_digest: &B3Digest,
+    ) -> BoxStream<Result<proto::Directory, Error>> {
+        let mut grpc_client = self.grpc_client.clone();
+        let root_directory_digest = root_directory_digest.clone();
+
+        let stream = try_stream! {
+            let mut stream = grpc_client
+                .get(proto::GetDirectoryRequest {
+                    recursive: true,
+                    by_what: Some(ByWhat::Digest(root_directory_digest.clone().into())),
+                })
+                .await
+                .map_err(|e| crate::Error::StorageError(e.to_string()))?
+                .into_inner();
+
+            // The Directory digests we received so far
+            let mut received_directory_digests: HashSet<B3Digest> = HashSet::new();
+            // The Directory digests we're still expecting to get sent.
+            let mut expected_directory_digests: HashSet<B3Digest> = HashSet::from([root_directory_digest]);
+
+            loop {
+                match stream.message().await {
+                    Ok(Some(directory)) => {
+                        // validate the directory itself.
+                        if let Err(e) = directory.validate() {
+                            Err(crate::Error::StorageError(format!(
+                                "directory {} failed validation: {}",
+                                directory.digest(),
+                                e,
+                            )))?;
+                        }
+                        // validate we actually expected that directory, and move it from expected to received.
+                        let directory_digest = directory.digest();
+                        let was_expected = expected_directory_digests.remove(&directory_digest);
+                        if !was_expected {
+                            // FUTUREWORK: dumb clients might send the same stuff twice.
+                            // as a fallback, we might want to tolerate receiving
+                            // it if it's in received_directory_digests (as that
+                            // means it once was in expected_directory_digests)
+                            Err(crate::Error::StorageError(format!(
+                                "received unexpected directory {}",
+                                directory_digest
+                            )))?;
+                        }
+                        received_directory_digests.insert(directory_digest);
+
+                        // register all children in expected_directory_digests.
+                        for child_directory in &directory.directories {
+                            // We ran validate() above, so we know these digests must be correct.
+                            let child_directory_digest =
+                                child_directory.digest.clone().try_into().unwrap();
+
+                            expected_directory_digests
+                                .insert(child_directory_digest);
+                        }
+
+                        yield directory;
+                    },
+                    Ok(None) => {
+                        // If we were still expecting something, that's an error.
+                        if !expected_directory_digests.is_empty() {
+                            Err(crate::Error::StorageError(format!(
+                                "still expected {} directories, but got premature end of stream",
+                                expected_directory_digests.len(),
+                            )))?
+                        } else {
+                            return
+                        }
+                    },
+                    Err(e) => {
+                        Err(crate::Error::StorageError(e.to_string()))?;
+                    },
+                }
+            }
+        };
+
+        Box::pin(stream)
+    }
+
+    #[instrument(skip_all)]
+    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
+    where
+        Self: Clone,
+    {
+        let mut grpc_client = self.grpc_client.clone();
+
+        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+
+        let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move {
+            let s = grpc_client
+                .put(UnboundedReceiverStream::new(rx))
+                .await?
+                .into_inner();
+
+            Ok(s)
+        });
+
+        Box::new(GRPCPutter::new(tx, task))
+    }
+}
+
+/// Allows uploading multiple Directory messages in the same gRPC stream.
+pub struct GRPCPutter {
+    /// Data about the current request - a handle to the task, and the tx part
+    /// of the channel.
+    /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request.
+    /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed.
+    #[allow(clippy::type_complexity)] // lol
+    rq: Option<(
+        JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
+        UnboundedSender<proto::Directory>,
+    )>,
+}
+
+impl GRPCPutter {
+    pub fn new(
+        directory_sender: UnboundedSender<proto::Directory>,
+        task: JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
+    ) -> Self {
+        Self {
+            rq: Some((task, directory_sender)),
+        }
+    }
+
+    // allows checking if the tx part of the channel is closed.
+    // only used in the test case.
+    #[cfg(test)]
+    fn is_closed(&self) -> bool {
+        match self.rq {
+            None => true,
+            Some((_, ref directory_sender)) => directory_sender.is_closed(),
+        }
+    }
+}
+
+#[async_trait]
+impl DirectoryPutter for GRPCPutter {
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
+        match self.rq {
+            // If we're not already closed, send the directory to directory_sender.
+            Some((_, ref directory_sender)) => {
+                if directory_sender.send(directory).is_err() {
+                    // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
+                    // That error code is much more helpful, because it
+                    // contains the error message from the server.
+                    self.close().await?;
+                }
+                Ok(())
+            }
+            // If self.close() was already called, we can't put again.
+            None => Err(Error::StorageError(
+                "DirectoryPutter already closed".to_string(),
+            )),
+        }
+    }
+
+    /// Closes the stream for sending, and returns the root digest from the server's response.
+    async fn close(&mut self) -> Result<B3Digest, crate::Error> {
+        // get self.rq, and replace it with None.
+        // This ensures we can only close it once.
+        match std::mem::take(&mut self.rq) {
+            None => Err(Error::StorageError("already closed".to_string())),
+            Some((task, directory_sender)) => {
+                // close directory_sender, so blocking on task will finish.
+                drop(directory_sender);
+
+                let root_digest = task
+                    .await?
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+                    .root_digest;
+
+                root_digest.try_into().map_err(|_| {
+                    Error::StorageError("invalid root digest length in response".to_string())
+                })
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use core::time;
+    use futures::StreamExt;
+    use std::{any::Any, time::Duration};
+    use tempfile::TempDir;
+    use tokio::net::UnixListener;
+    use tokio_retry::{strategy::ExponentialBackoff, Retry};
+    use tokio_stream::wrappers::UnixListenerStream;
+
+    use crate::{
+        directoryservice::{
+            grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService,
+            MemoryDirectoryService,
+        },
+        fixtures::{self, DIRECTORY_A, DIRECTORY_B},
+        proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper},
+        utils::gen_directorysvc_grpc_client,
+    };
+
+    #[tokio::test]
+    async fn test() {
+        // create the GrpcDirectoryService
+        let directory_service =
+            super::GRPCDirectoryService::from_client(gen_directorysvc_grpc_client().await);
+
+        // try to get DIRECTORY_A should return Ok(None)
+        assert_eq!(
+            None,
+            directory_service
+                .get(&DIRECTORY_A.digest())
+                .await
+                .expect("must not fail")
+        );
+
+        // Now upload it
+        assert_eq!(
+            DIRECTORY_A.digest(),
+            directory_service
+                .put(DIRECTORY_A.clone())
+                .await
+                .expect("must succeed")
+        );
+
+        // And retrieve it, compare for equality.
+        assert_eq!(
+            DIRECTORY_A.clone(),
+            directory_service
+                .get(&DIRECTORY_A.digest())
+                .await
+                .expect("must succeed")
+                .expect("must be some")
+        );
+
+        // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
+        directory_service
+            .put(DIRECTORY_B.clone())
+            .await
+            .expect_err("must fail");
+
+        // Putting DIRECTORY_B in a put_multiple will succeed, but the close
+        // will always fail.
+        {
+            let mut handle = directory_service.put_multiple_start();
+            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+            handle.close().await.expect_err("must fail");
+        }
+
+        // Uploading A and then B should succeed, and closing should return the digest of B.
+        let mut handle = directory_service.put_multiple_start();
+        handle.put(DIRECTORY_A.clone()).await.expect("must succeed");
+        handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+        let digest = handle.close().await.expect("must succeed");
+        assert_eq!(DIRECTORY_B.digest(), digest);
+
+        // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
+        let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
+        assert_eq!(
+            DIRECTORY_B.clone(),
+            directories_it
+                .next()
+                .await
+                .expect("must be some")
+                .expect("must succeed")
+        );
+        assert_eq!(
+            DIRECTORY_A.clone(),
+            directories_it
+                .next()
+                .await
+                .expect("must be some")
+                .expect("must succeed")
+        );
+
+        // Uploading B and then A should fail, because B refers to A, which
+        // hasn't been uploaded yet.
+        // However, the client can burst, so we might not have received the
+        // error back from the server.
+        {
+            let mut handle = directory_service.put_multiple_start();
+            // sending out B will always be fine
+            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+
+            // whether we will be able to put A as well depends on whether we
+            // already received the error about B.
+            if handle.put(DIRECTORY_A.clone()).await.is_ok() {
+                // If we didn't, and this was Ok(_), …
+                // a subsequent close MUST fail (because it waits for the
+                // server)
+                handle.close().await.expect_err("must fail");
+            }
+        }
+
+        // Now we do the same test as before, send B, then A, but wait
+        // long enough that we have already received the error from the server
+        // (causing the internal stream to be closed).
+        // Uploading anything else subsequently should then fail.
+        {
+            let mut handle = directory_service.put_multiple_start();
+            handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
+
+            // get a GRPCPutter, so we can peek at [is_closed].
+            let handle_any = &mut handle as &mut dyn Any;
+
+            // `unchecked_downcast_mut` is unstable for now,
+            // https://github.com/rust-lang/rust/issues/90850
+            // We do the same thing here.
+            // The reason for why we cannot use the checked downcast lies
+            // in the fact that:
+            // - GRPCPutter has type ID A
+            // - Box<GRPCPutter> has type ID B
+            // - "Box<dyn GRPCPutter>" (invalid type) has type ID C
+            // B seems different from C in this context.
+            // We cannot unpack and perform upcast coercion of the traits as it's an unstable
+            // feature.
+            // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose
+            // of not leaking `is_closed` into the original trait.
+            let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) };
+            let mut is_closed = false;
+            for _try in 1..1000 {
+                if handle.is_closed() {
+                    is_closed = true;
+                    break;
+                }
+                tokio::time::sleep(time::Duration::from_millis(10)).await;
+            }
+
+            assert!(
+                is_closed,
+                "expected channel to eventually close, but never happened"
+            );
+
+            handle
+                .put(DIRECTORY_A.clone())
+                .await
+                .expect_err("must fail");
+        }
+    }
+
+    /// This ensures connecting via gRPC works as expected.
+    #[tokio::test]
+    async fn test_valid_unix_path_ping_pong() {
+        let tmpdir = TempDir::new().unwrap();
+        let socket_path = tmpdir.path().join("daemon");
+
+        let path_clone = socket_path.clone();
+
+        // Spin up a server
+        tokio::spawn(async {
+            let uds = UnixListener::bind(path_clone).unwrap();
+            let uds_stream = UnixListenerStream::new(uds);
+
+            // spin up a new server
+            let mut server = tonic::transport::Server::builder();
+            let router = server.add_service(
+                crate::proto::directory_service_server::DirectoryServiceServer::new(
+                    GRPCDirectoryServiceWrapper::new(
+                        Box::<MemoryDirectoryService>::default() as Box<dyn DirectoryService>
+                    ),
+                ),
+            );
+            router.serve_with_incoming(uds_stream).await
+        });
+
+        // wait for the socket to be created
+        Retry::spawn(
+            ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)),
+            || async {
+                if socket_path.exists() {
+                    Ok(())
+                } else {
+                    Err(())
+                }
+            },
+        )
+        .await
+        .expect("failed to wait for socket");
+
+        // prepare a client
+        let grpc_client = {
+            let url = url::Url::parse(&format!(
+                "grpc+unix://{}?wait-connect=1",
+                socket_path.display()
+            ))
+            .expect("must parse");
+            let client = DirectoryServiceClient::new(
+                crate::tonic::channel_from_url(&url)
+                    .await
+                    .expect("must succeed"),
+            );
+            GRPCDirectoryService::from_client(client)
+        };
+
+        assert!(grpc_client
+            .get(&fixtures::DIRECTORY_A.digest())
+            .await
+            .expect("must not fail")
+            .is_none())
+    }
+}
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
new file mode 100644
index 000000000000..528ffe2f2c03
--- /dev/null
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -0,0 +1,86 @@
+use crate::{proto, B3Digest, Error};
+use futures::stream::BoxStream;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+use tonic::async_trait;
+use tracing::{instrument, warn};
+
+use super::utils::{traverse_directory, SimplePutter};
+use super::{DirectoryPutter, DirectoryService};
+
+#[derive(Clone, Default)]
+pub struct MemoryDirectoryService {
+    db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>,
+}
+
+#[async_trait]
+impl DirectoryService for MemoryDirectoryService {
+    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+        let db = self.db.read()?;
+
+        match db.get(digest) {
+            // The directory was not found, return None.
+            None => Ok(None),
+
+            // The directory was found; validate it before returning.
+            Some(directory) => {
+                // Validate the retrieved Directory indeed has the
+                // digest we expect it to have, to detect corruptions.
+                let actual_digest = directory.digest();
+                if actual_digest != *digest {
+                    return Err(Error::StorageError(format!(
+                        "requested directory with digest {}, but got {}",
+                        digest, actual_digest
+                    )));
+                }
+
+                // Validate the Directory itself is valid.
+                if let Err(e) = directory.validate() {
+                    warn!("directory failed validation: {}", e.to_string());
+                    return Err(Error::StorageError(format!(
+                        "directory {} failed validation: {}",
+                        actual_digest, e,
+                    )));
+                }
+
+                Ok(Some(directory.clone()))
+            }
+        }
+    }
+
+    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+        let digest = directory.digest();
+
+        // validate the directory itself.
+        if let Err(e) = directory.validate() {
+            return Err(Error::InvalidRequest(format!(
+                "directory {} failed validation: {}",
+                digest, e,
+            )));
+        }
+
+        // store it
+        let mut db = self.db.write()?;
+        db.insert(digest.clone(), directory);
+
+        Ok(digest)
+    }
+
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(
+        &self,
+        root_directory_digest: &B3Digest,
+    ) -> BoxStream<Result<proto::Directory, Error>> {
+        traverse_directory(self.clone(), root_directory_digest)
+    }
+
+    #[instrument(skip_all)]
+    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
+    where
+        Self: Clone,
+    {
+        Box::new(SimplePutter::new(self.clone()))
+    }
+}
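+
+// A round-trip sketch (illustrative; `directory` is assumed to be a valid
+// proto::Directory):
+//
+//     let svc = MemoryDirectoryService::default();
+//     let digest = svc.put(directory.clone()).await?;
+//     assert_eq!(Some(directory), svc.get(&digest).await?);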
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
new file mode 100644
index 000000000000..db3d5767eadd
--- /dev/null
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -0,0 +1,76 @@
+use crate::{proto, B3Digest, Error};
+use futures::stream::BoxStream;
+use tonic::async_trait;
+
+mod from_addr;
+mod grpc;
+mod memory;
+mod sled;
+mod traverse;
+mod utils;
+
+pub use self::from_addr::from_addr;
+pub use self::grpc::GRPCDirectoryService;
+pub use self::memory::MemoryDirectoryService;
+pub use self::sled::SledDirectoryService;
+pub use self::traverse::descend_to;
+
+/// The base trait all Directory services need to implement.
+/// This is a simple get and put of [crate::proto::Directory], returning their
+/// digest.
+#[async_trait]
+pub trait DirectoryService: Send + Sync {
+    /// Looks up a single Directory message by its digest.
+    /// The returned Directory message *must* be valid.
+    /// In case the directory is not found, Ok(None) is returned.
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
+    /// Uploads a single Directory message, and returns the calculated
+    /// digest, or an error. An error *must* also be returned if the message is
+    /// not valid.
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
+
+    /// Looks up a closure of [proto::Directory].
+    /// Ideally this would be an `impl Stream<Item = Result<proto::Directory, Error>>`,
+    /// and we'd be able to add a default implementation for it here, but
+    /// we can't have that yet.
+    ///
+    /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily,
+    /// and the box allows different underlying stream implementations to be returned since
+    /// Rust doesn't support this as a generic in traits yet. This is the same thing that
+    /// [async_trait] generates, but for streams instead of futures.
+    ///
+    /// The individual Directory messages *must* be valid.
+    fn get_recursive(
+        &self,
+        root_directory_digest: &B3Digest,
+    ) -> BoxStream<Result<proto::Directory, Error>>;
+
+    /// Allows persisting a closure of [proto::Directory], which is a graph of
+    /// connected Directory messages.
+    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
+}
+
+/// Provides a handle to put a closure of connected [proto::Directory] elements.
+///
+/// The consumer can periodically call [DirectoryPutter::put], starting from the
+/// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
+/// retrieve the root digest (or an error).
+///
+/// DirectoryPutters might be created without a single [DirectoryPutter::put],
+/// and then dropped without calling [DirectoryPutter::close],
+/// for example when ingesting a path that ends up not pointing to a directory,
+/// but a single file or symlink.
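+///
+/// A usage sketch (illustrative; `directory_service`, `leaf_directory` and
+/// `root_directory` are assumed to exist), uploading leaves before the root:
+///
+/// ```ignore
+/// let mut putter = directory_service.put_multiple_start();
+/// putter.put(leaf_directory).await?;
+/// putter.put(root_directory).await?;
+/// let root_digest = putter.close().await?;
+/// ```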
+#[async_trait]
+pub trait DirectoryPutter: Send {
+    /// Put an individual [proto::Directory] into the store.
+    /// Error semantics and behaviour are up to the specific implementation of
+    /// this trait.
+    /// Due to bursting, the returned error might refer to an object previously
+    /// sent via `put`.
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
+
+    /// Close the stream, and wait for any errors.
+    /// If there's been any invalid Directory message uploaded, an error *must*
+    /// be returned.
+    async fn close(&mut self) -> Result<B3Digest, Error>;
+}
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
new file mode 100644
index 000000000000..9acd3854184b
--- /dev/null
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -0,0 +1,112 @@
+use crate::directoryservice::DirectoryPutter;
+use crate::proto::Directory;
+use crate::{proto, B3Digest, Error};
+use futures::stream::BoxStream;
+use prost::Message;
+use std::path::Path;
+use tonic::async_trait;
+use tracing::{instrument, warn};
+
+use super::utils::{traverse_directory, SimplePutter};
+use super::DirectoryService;
+
+#[derive(Clone)]
+pub struct SledDirectoryService {
+    db: sled::Db,
+}
+
+impl SledDirectoryService {
+    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
+        let config = sled::Config::default()
+            .use_compression(false) // explicitly disable compression
+            .path(p);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+
+    pub fn new_temporary() -> Result<Self, sled::Error> {
+        let config = sled::Config::default().temporary(true);
+        let db = config.open()?;
+
+        Ok(Self { db })
+    }
+}
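+
+// Construction sketch (illustrative; the on-disk path below is hypothetical):
+//
+//     let persistent = SledDirectoryService::new("/var/lib/tvix/directories.sled")?;
+//     let ephemeral = SledDirectoryService::new_temporary()?;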
+
+#[async_trait]
+impl DirectoryService for SledDirectoryService {
+    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+        match self.db.get(digest.as_slice()) {
+            // The directory was not found, return None.
+            Ok(None) => Ok(None),
+
+            // The directory was found, try to parse the data as Directory message
+            Ok(Some(data)) => match Directory::decode(&*data) {
+                Ok(directory) => {
+                    // Validate the retrieved Directory indeed has the
+                    // digest we expect it to have, to detect corruptions.
+                    let actual_digest = directory.digest();
+                    if actual_digest != *digest {
+                        return Err(Error::StorageError(format!(
+                            "requested directory with digest {}, but got {}",
+                            digest, actual_digest
+                        )));
+                    }
+
+                    // Validate the Directory itself is valid.
+                    if let Err(e) = directory.validate() {
+                        warn!("directory failed validation: {}", e.to_string());
+                        return Err(Error::StorageError(format!(
+                            "directory {} failed validation: {}",
+                            actual_digest, e,
+                        )));
+                    }
+
+                    Ok(Some(directory))
+                }
+                Err(e) => {
+                    warn!("unable to parse directory {}: {}", digest, e);
+                    Err(Error::StorageError(e.to_string()))
+                }
+            },
+            // some storage error?
+            Err(e) => Err(Error::StorageError(e.to_string())),
+        }
+    }
+
+    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
+    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+        let digest = directory.digest();
+
+        // validate the directory itself.
+        if let Err(e) = directory.validate() {
+            return Err(Error::InvalidRequest(format!(
+                "directory {} failed validation: {}",
+                digest, e,
+            )));
+        }
+        // store it
+        let result = self.db.insert(digest.as_slice(), directory.encode_to_vec());
+        if let Err(e) = result {
+            return Err(Error::StorageError(e.to_string()));
+        }
+        Ok(digest)
+    }
+
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    fn get_recursive(
+        &self,
+        root_directory_digest: &B3Digest,
+    ) -> BoxStream<Result<proto::Directory, Error>> {
+        traverse_directory(self.clone(), root_directory_digest)
+    }
+
+    #[instrument(skip_all)]
+    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
+    where
+        Self: Clone,
+    {
+        Box::new(SimplePutter::new(self.clone()))
+    }
+}
diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs
new file mode 100644
index 000000000000..528cef757432
--- /dev/null
+++ b/tvix/castore/src/directoryservice/traverse.rs
@@ -0,0 +1,231 @@
+use super::DirectoryService;
+use crate::{proto::NamedNode, B3Digest, Error};
+use std::os::unix::ffi::OsStrExt;
+use tracing::{instrument, warn};
+
+/// This descends from a (root) node to the given (sub)path, returning the Node
+/// at that path, or None, if there's nothing at that path.
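+///
+/// A usage sketch (illustrative; `directory_service`, `root_node` and the
+/// subpath are assumed to exist):
+///
+/// ```ignore
+/// use std::path::Path;
+///
+/// let node = descend_to(&directory_service, root_node, Path::new("bin/sh")).await?;
+/// ```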
+#[instrument(skip(directory_service))]
+pub async fn descend_to<DS>(
+    directory_service: DS,
+    root_node: crate::proto::node::Node,
+    path: &std::path::Path,
+) -> Result<Option<crate::proto::node::Node>, Error>
+where
+    DS: AsRef<dyn DirectoryService>,
+{
+    // strip a possible `/` prefix from the path.
+    let path = {
+        if path.starts_with("/") {
+            path.strip_prefix("/").unwrap()
+        } else {
+            path
+        }
+    };
+
+    let mut cur_node = root_node;
+    let mut it = path.components();
+
+    loop {
+        match it.next() {
+            None => {
+                // the (remaining) path is empty, return the node we're currently at.
+                return Ok(Some(cur_node));
+            }
+            Some(first_component) => {
+                match cur_node {
+                    crate::proto::node::Node::File(_) | crate::proto::node::Node::Symlink(_) => {
+                        // There's still some path left, but the current node is no directory.
+                        // This means the path doesn't exist, as we can't reach it.
+                        return Ok(None);
+                    }
+                    crate::proto::node::Node::Directory(directory_node) => {
+                        let digest: B3Digest = directory_node.digest.try_into().map_err(|_e| {
+                            Error::StorageError("invalid digest length".to_string())
+                        })?;
+
+                        // fetch the linked node from the directory_service
+                        match directory_service.as_ref().get(&digest).await? {
+                            // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
+                            None => {
+                                warn!("directory {} does not exist", digest);
+
+                                return Err(Error::StorageError(format!(
+                                    "directory {} does not exist",
+                                    digest
+                                )));
+                            }
+                            Some(directory) => {
+                                // look for first_component in the [Directory].
+                                // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
+                                // could stop as soon as e.name is larger than the search string.
+                                let child_node = directory.nodes().find(|n| {
+                                    n.get_name() == first_component.as_os_str().as_bytes()
+                                });
+
+                                match child_node {
+                                    // child node not found means there's no such element inside the directory.
+                                    None => {
+                                        return Ok(None);
+                                    }
+                                    // child node found, return to the top of the loop to find the next
+                                    // node in the path.
+                                    Some(child_node) => {
+                                        cur_node = child_node;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::path::PathBuf;
+
+    use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP};
+    use crate::utils::gen_directory_service;
+
+    use super::descend_to;
+
+    #[tokio::test]
+    async fn test_descend_to() {
+        let directory_service = gen_directory_service();
+
+        let mut handle = directory_service.put_multiple_start();
+        handle
+            .put(DIRECTORY_WITH_KEEP.clone())
+            .await
+            .expect("must succeed");
+        handle
+            .put(DIRECTORY_COMPLICATED.clone())
+            .await
+            .expect("must succeed");
+
+        // construct the node for DIRECTORY_COMPLICATED
+        let node_directory_complicated =
+            crate::proto::node::Node::Directory(crate::proto::DirectoryNode {
+                name: "doesntmatter".into(),
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            });
+
+        // construct the node for DIRECTORY_WITH_KEEP
+        let node_directory_with_keep = crate::proto::node::Node::Directory(
+            DIRECTORY_COMPLICATED.directories.first().unwrap().clone(),
+        );
+
+        // construct the node for the .keep file
+        let node_file_keep =
+            crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone());
+
+        // traversal to an empty subpath should return the root node.
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from(""),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_directory_complicated.clone()), resp);
+        }
+
+        // traversal to `keep` should return the node for DIRECTORY_WITH_KEEP
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("keep"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_directory_with_keep), resp);
+        }
+
+        // traversal to `keep/.keep` should return the node for the .keep file
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("keep/.keep"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_file_keep.clone()), resp);
+        }
+
+        // traversal to `/keep/.keep` (with a leading slash) should also return the node for the .keep file
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("/keep/.keep"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_file_keep), resp);
+        }
+
+        // traversal to `void` should return None (doesn't exist)
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("void"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(None, resp);
+        }
+
+        // traversal to `//v/oid` should return None (doesn't exist)
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("//v/oid"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(None, resp);
+        }
+
+        // traversal to `keep/.keep/foo` should return None (the path can't be
+        // reached, as keep/.keep already is a file)
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("keep/.keep/foo"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(None, resp);
+        }
+
+        // traversal to `/` should return the root node (the leading slash is stripped).
+        {
+            let resp = descend_to(
+                &directory_service,
+                node_directory_complicated.clone(),
+                &PathBuf::from("/"),
+            )
+            .await
+            .expect("must succeed");
+
+            assert_eq!(Some(node_directory_complicated), resp);
+        }
+    }
+}
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
new file mode 100644
index 000000000000..341705a8db9f
--- /dev/null
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -0,0 +1,134 @@
+use super::DirectoryPutter;
+use super::DirectoryService;
+use crate::proto;
+use crate::B3Digest;
+use crate::Error;
+use async_stream::stream;
+use futures::stream::BoxStream;
+use std::collections::{HashSet, VecDeque};
+use tonic::async_trait;
+use tracing::warn;
+
+/// Traverses a [proto::Directory] from the root to the children.
+///
+/// This is mostly BFS, but directories are only returned once.
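+///
+/// A consumption sketch (illustrative; `directory_service` and `root_digest`
+/// are assumed to exist): the root directory is yielded first, its children
+/// afterwards, each directory at most once.
+///
+/// ```ignore
+/// use futures::StreamExt;
+///
+/// let mut stream = traverse_directory(directory_service, &root_digest);
+/// while let Some(directory) = stream.next().await {
+///     let directory = directory?;
+///     // … process the directory …
+/// }
+/// ```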
+pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
+    directory_service: DS,
+    root_directory_digest: &B3Digest,
+) -> BoxStream<'a, Result<proto::Directory, Error>> {
+    // The list of all directories that still need to be traversed. The next
+    // element is picked from the front, new elements are enqueued at the
+    // back.
+    let mut worklist_directory_digests: VecDeque<B3Digest> =
+        VecDeque::from([root_directory_digest.clone()]);
+    // The list of directory digests already sent to the consumer.
+    // We omit sending the same directories multiple times.
+    let mut sent_directory_digests: HashSet<B3Digest> = HashSet::new();
+
+    let stream = stream! {
+        while let Some(current_directory_digest) = worklist_directory_digests.pop_front() {
+            match directory_service.get(&current_directory_digest).await {
+                // if it's not there, we have an inconsistent store!
+                Ok(None) => {
+                    warn!("directory {} does not exist", current_directory_digest);
+                    yield Err(Error::StorageError(format!(
+                        "directory {} does not exist",
+                        current_directory_digest
+                    )));
+                }
+                Err(e) => {
+                    warn!("failed to look up directory");
+                    yield Err(Error::StorageError(format!(
+                        "unable to look up directory {}: {}",
+                        current_directory_digest, e
+                    )));
+                }
+
+                // if we got it
+                Ok(Some(current_directory)) => {
+                    // validate, we don't want to send invalid directories.
+                    if let Err(e) = current_directory.validate() {
+                        warn!("directory failed validation: {}", e.to_string());
+                        yield Err(Error::StorageError(format!(
+                            "invalid directory: {}",
+                            current_directory_digest
+                        )));
+                        // Don't descend into or emit an invalid directory.
+                        continue;
+                    }
+
+                    // We're about to send this directory, so let's avoid sending it again if a
+                    // descendant has it.
+                    sent_directory_digests.insert(current_directory_digest);
+
+                    // enqueue all child directory digests to the work queue, as
+                    // long as they're not part of the worklist or already sent.
+                    // This panics if the digest is invalid; it's supposed to have been validated before.
+                    for child_directory_node in &current_directory.directories {
+                        // TODO: propagate error
+                        let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
+
+                        if worklist_directory_digests.contains(&child_digest)
+                            || sent_directory_digests.contains(&child_digest)
+                        {
+                            continue;
+                        }
+                        worklist_directory_digests.push_back(child_digest);
+                    }
+
+                    yield Ok(current_directory);
+                }
+            };
+        }
+    };
+
+    Box::pin(stream)
+}
+
+/// This is a simple implementation of a Directory uploader.
+/// TODO: verify connectivity? Factor out these checks into generic helpers?
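+///
+/// Rough usage sketch (with a hypothetical `directory_service` and directories
+/// in scope; leaves are put before the directories referencing them, the root
+/// last):
+/// ```ignore
+/// let mut putter = SimplePutter::new(directory_service);
+/// putter.put(child_directory).await?;
+/// putter.put(root_directory).await?;
+/// let root_digest = putter.close().await?;
+/// ```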
+pub struct SimplePutter<DS: DirectoryService> {
+    directory_service: DS,
+    last_directory_digest: Option<B3Digest>,
+    closed: bool,
+}
+
+impl<DS: DirectoryService> SimplePutter<DS> {
+    pub fn new(directory_service: DS) -> Self {
+        Self {
+            directory_service,
+            closed: false,
+            last_directory_digest: None,
+        }
+    }
+}
+
+#[async_trait]
+impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        if self.closed {
+            return Err(Error::StorageError("already closed".to_string()));
+        }
+
+        let digest = self.directory_service.put(directory).await?;
+
+        // track the last directory digest
+        self.last_directory_digest = Some(digest);
+
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<B3Digest, Error> {
+        if self.closed {
+            return Err(Error::StorageError("already closed".to_string()));
+        }
+
+        match &self.last_directory_digest {
+            Some(last_digest) => {
+                self.closed = true;
+                Ok(last_digest.clone())
+            }
+            None => Err(Error::InvalidRequest(
+                "no directories sent, can't show root digest".to_string(),
+            )),
+        }
+    }
+}
diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs
new file mode 100644
index 000000000000..1b3ae4d1c862
--- /dev/null
+++ b/tvix/castore/src/errors.rs
@@ -0,0 +1,61 @@
+use std::sync::PoisonError;
+use thiserror::Error;
+use tokio::task::JoinError;
+use tonic::Status;
+
+/// Errors related to communication with the store.
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("invalid request: {0}")]
+    InvalidRequest(String),
+
+    #[error("internal storage error: {0}")]
+    StorageError(String),
+}
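+
+// Conversion sketch (illustrative only): the `From` impls below map `Error`
+// onto gRPC statuses and `std::io::Error`, e.g.
+//   let status: tonic::Status = Error::InvalidRequest("bad".to_string()).into();
+//   assert_eq!(status.code(), tonic::Code::InvalidArgument);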
+
+impl<T> From<PoisonError<T>> for Error {
+    fn from(value: PoisonError<T>) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
+impl From<JoinError> for Error {
+    fn from(value: JoinError) -> Self {
+        Error::StorageError(value.to_string())
+    }
+}
+
+impl From<Error> for Status {
+    fn from(value: Error) -> Self {
+        match value {
+            Error::InvalidRequest(msg) => Status::invalid_argument(msg),
+            Error::StorageError(msg) => Status::data_loss(format!("storage error: {}", msg)),
+        }
+    }
+}
+
+impl From<crate::tonic::Error> for Error {
+    fn from(value: crate::tonic::Error) -> Self {
+        Self::StorageError(value.to_string())
+    }
+}
+
+impl From<std::io::Error> for Error {
+    fn from(value: std::io::Error) -> Self {
+        if value.kind() == std::io::ErrorKind::InvalidInput {
+            Error::InvalidRequest(value.to_string())
+        } else {
+            Error::StorageError(value.to_string())
+        }
+    }
+}
+
+// TODO: this should probably go somewhere else?
+impl From<Error> for std::io::Error {
+    fn from(value: Error) -> Self {
+        match value {
+            Error::InvalidRequest(msg) => Self::new(std::io::ErrorKind::InvalidInput, msg),
+            Error::StorageError(msg) => Self::new(std::io::ErrorKind::Other, msg),
+        }
+    }
+}
diff --git a/tvix/castore/src/fixtures.rs b/tvix/castore/src/fixtures.rs
new file mode 100644
index 000000000000..a206d9b7ddc6
--- /dev/null
+++ b/tvix/castore/src/fixtures.rs
@@ -0,0 +1,88 @@
+use crate::{
+    proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode},
+    B3Digest,
+};
+use lazy_static::lazy_static;
+
+pub const HELLOWORLD_BLOB_CONTENTS: &[u8] = b"Hello World!";
+pub const EMPTY_BLOB_CONTENTS: &[u8] = b"";
+
+lazy_static! {
+    pub static ref DUMMY_DIGEST: B3Digest = {
+        let u = [0u8; 32];
+        (&u).into()
+    };
+    pub static ref DUMMY_DIGEST_2: B3Digest = {
+        let mut u = [0u8; 32];
+        u[0] = 0x10;
+        (&u).into()
+    };
+    pub static ref DUMMY_DATA_1: bytes::Bytes = vec![0x01, 0x02, 0x03].into();
+    pub static ref DUMMY_DATA_2: bytes::Bytes = vec![0x04, 0x05].into();
+
+    pub static ref HELLOWORLD_BLOB_DIGEST: B3Digest =
+        blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into();
+    pub static ref EMPTY_BLOB_DIGEST: B3Digest =
+        blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into();
+
+    // 2 bytes
+    pub static ref BLOB_A: bytes::Bytes = vec![0x00, 0x01].into();
+    pub static ref BLOB_A_DIGEST: B3Digest = blake3::hash(&BLOB_A).as_bytes().into();
+
+    // 1MB
+    pub static ref BLOB_B: bytes::Bytes = (0..255).collect::<Vec<u8>>().repeat(4 * 1024).into();
+    pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into();
+
+    // Directories
+    pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory {
+        directories: vec![],
+        files: vec![FileNode {
+            name: b".keep".to_vec().into(),
+            digest: EMPTY_BLOB_DIGEST.clone().into(),
+            size: 0,
+            executable: false,
+        }],
+        symlinks: vec![],
+    };
+    pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory {
+        directories: vec![DirectoryNode {
+            name: b"keep".to_vec().into(),
+            digest: DIRECTORY_WITH_KEEP.digest().into(),
+            size: DIRECTORY_WITH_KEEP.size(),
+        }],
+        files: vec![FileNode {
+            name: b".keep".to_vec().into(),
+            digest: EMPTY_BLOB_DIGEST.clone().into(),
+            size: 0,
+            executable: false,
+        }],
+        symlinks: vec![SymlinkNode {
+            name: b"aa".to_vec().into(),
+            target: b"/nix/store/somewhereelse".to_vec().into(),
+        }],
+    };
+    pub static ref DIRECTORY_A: Directory = Directory::default();
+    pub static ref DIRECTORY_B: Directory = Directory {
+        directories: vec![DirectoryNode {
+            name: b"a".to_vec().into(),
+            digest: DIRECTORY_A.digest().into(),
+            size: DIRECTORY_A.size(),
+        }],
+        ..Default::default()
+    };
+    pub static ref DIRECTORY_C: Directory = Directory {
+        directories: vec![
+            DirectoryNode {
+                name: b"a".to_vec().into(),
+                digest: DIRECTORY_A.digest().into(),
+                size: DIRECTORY_A.size(),
+            },
+            DirectoryNode {
+                name: b"a'".to_vec().into(),
+                digest: DIRECTORY_A.digest().into(),
+                size: DIRECTORY_A.size(),
+            }
+        ],
+        ..Default::default()
+    };
+}
diff --git a/tvix/castore/src/fs/file_attr.rs b/tvix/castore/src/fs/file_attr.rs
new file mode 100644
index 000000000000..ad41f036a253
--- /dev/null
+++ b/tvix/castore/src/fs/file_attr.rs
@@ -0,0 +1,53 @@
+#![allow(clippy::unnecessary_cast)] // libc::S_IFDIR is u32 on Linux and u16 on MacOS
+use super::inodes::{DirectoryInodeData, InodeData};
+use fuse_backend_rs::abi::fuse_abi::Attr;
+
+/// The [Attr] describing the root
+pub const ROOT_FILE_ATTR: Attr = Attr {
+    ino: fuse_backend_rs::api::filesystem::ROOT_ID,
+    size: 0,
+    blksize: 1024,
+    blocks: 0,
+    mode: libc::S_IFDIR as u32 | 0o555,
+    atime: 0,
+    mtime: 0,
+    ctime: 0,
+    atimensec: 0,
+    mtimensec: 0,
+    ctimensec: 0,
+    nlink: 0,
+    uid: 0,
+    gid: 0,
+    rdev: 0,
+    flags: 0,
+    #[cfg(target_os = "macos")]
+    crtime: 0,
+    #[cfg(target_os = "macos")]
+    crtimensec: 0,
+    #[cfg(target_os = "macos")]
+    padding: 0,
+};
+
+/// For a given [InodeData] and inode, construct an [Attr].
+pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> Attr {
+    Attr {
+        ino: inode,
+        // FUTUREWORK: play with these numbers, as they affect read sizes for client applications.
+        blocks: 1024,
+        size: match inode_data {
+            InodeData::Regular(_, size, _) => *size as u64,
+            InodeData::Symlink(target) => target.len() as u64,
+            InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size as u64,
+            InodeData::Directory(DirectoryInodeData::Populated(_, ref children)) => {
+                children.len() as u64
+            }
+        },
+        mode: match inode_data {
+            InodeData::Regular(_, _, false) => libc::S_IFREG as u32 | 0o444, // non-executable files
+            InodeData::Regular(_, _, true) => libc::S_IFREG as u32 | 0o555,  // executable files
+            InodeData::Symlink(_) => libc::S_IFLNK as u32 | 0o444,
+            InodeData::Directory(_) => libc::S_IFDIR as u32 | 0o555,
+        },
+        ..Default::default()
+    }
+}
diff --git a/tvix/castore/src/fs/fuse.rs b/tvix/castore/src/fs/fuse.rs
new file mode 100644
index 000000000000..1dce43915905
--- /dev/null
+++ b/tvix/castore/src/fs/fuse.rs
@@ -0,0 +1,115 @@
+use std::{io, path::Path, sync::Arc, thread};
+
+use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
+use tracing::{error, instrument};
+
+struct FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
+    channel: fuse_backend_rs::transport::FuseChannel,
+}
+
+#[cfg(target_os = "macos")]
+const BADFD: libc::c_int = libc::EBADF;
+#[cfg(target_os = "linux")]
+const BADFD: libc::c_int = libc::EBADFD;
+
+impl<FS> FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    fn start(&mut self) -> io::Result<()> {
+        while let Some((reader, writer)) = self
+            .channel
+            .get_request()
+            .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
+        {
+            if let Err(e) = self
+                .server
+                .handle_message(reader, writer.into(), None, None)
+            {
+                match e {
+                    // This indicates the session has been shut down.
+                    fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => {
+                        break;
+                    }
+                    error => {
+                        error!(?error, "failed to handle fuse request");
+                        continue;
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+}
+
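+/// A running FUSE daemon: a mounted [FuseSession] plus the worker threads
+/// serving requests on it. Unmounting happens via [FuseDaemon::unmount], or
+/// implicitly on drop.
+///
+/// Sketch (assuming `fs` implements [FileSystem] and the mountpoint exists;
+/// names are illustrative only):
+/// ```ignore
+/// let mut daemon = FuseDaemon::new(Arc::new(fs), "/mnt/tvix", 4)?;
+/// // ... the filesystem is served until:
+/// daemon.unmount()?;
+/// ```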
+pub struct FuseDaemon {
+    session: FuseSession,
+    threads: Vec<thread::JoinHandle<()>>,
+}
+
+impl FuseDaemon {
+    #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)]
+    pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
+    where
+        FS: FileSystem + Sync + Send + 'static,
+        P: AsRef<Path> + std::fmt::Debug,
+    {
+        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
+
+        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        #[cfg(target_os = "linux")]
+        session.set_allow_other(false);
+        session
+            .mount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+        let mut join_handles = Vec::with_capacity(threads);
+        for _ in 0..threads {
+            let mut server = FuseServer {
+                server: server.clone(),
+                channel: session
+                    .new_channel()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
+            };
+            let join_handle = thread::Builder::new()
+                .name("fuse_server".to_string())
+                .spawn(move || {
+                    let _ = server.start();
+                })?;
+            join_handles.push(join_handle);
+        }
+
+        Ok(FuseDaemon {
+            session,
+            threads: join_handles,
+        })
+    }
+
+    #[instrument(skip_all, err)]
+    pub fn unmount(&mut self) -> Result<(), io::Error> {
+        self.session
+            .umount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        for thread in self.threads.drain(..) {
+            thread.join().map_err(|_| {
+                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
+            })?;
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for FuseDaemon {
+    fn drop(&mut self) {
+        if let Err(error) = self.unmount() {
+            error!(?error, "failed to unmount fuse filesystem")
+        }
+    }
+}
diff --git a/tvix/castore/src/fs/inode_tracker.rs b/tvix/castore/src/fs/inode_tracker.rs
new file mode 100644
index 000000000000..4a8283b6b144
--- /dev/null
+++ b/tvix/castore/src/fs/inode_tracker.rs
@@ -0,0 +1,207 @@
+use std::{collections::HashMap, sync::Arc};
+
+use super::inodes::{DirectoryInodeData, InodeData};
+use crate::B3Digest;
+
+/// InodeTracker keeps track of inodes, stores the data behind these inodes, and
+/// deals with inode allocation.
+pub struct InodeTracker {
+    data: HashMap<u64, Arc<InodeData>>,
+
+    // lookup table for blobs by their B3Digest
+    blob_digest_to_inode: HashMap<B3Digest, u64>,
+
+    // lookup table for symlinks by their target
+    symlink_target_to_inode: HashMap<bytes::Bytes, u64>,
+
+    // lookup table for directories by their B3Digest.
+    // Note the corresponding directory may not be present in data yet.
+    directory_digest_to_inode: HashMap<B3Digest, u64>,
+
+    // the next inode to allocate
+    next_inode: u64,
+}
+
+impl Default for InodeTracker {
+    fn default() -> Self {
+        Self {
+            data: Default::default(),
+
+            blob_digest_to_inode: Default::default(),
+            symlink_target_to_inode: Default::default(),
+            directory_digest_to_inode: Default::default(),
+
+            next_inode: 2,
+        }
+    }
+}
+
+impl InodeTracker {
+    // Retrieves data for a given inode, if it exists.
+    pub fn get(&self, ino: u64) -> Option<Arc<InodeData>> {
+        self.data.get(&ino).cloned()
+    }
+
+    // Replaces data for a given inode.
+    // Panics if the inode doesn't already exist.
+    pub fn replace(&mut self, ino: u64, data: Arc<InodeData>) {
+        if self.data.insert(ino, data).is_none() {
+            panic!("replace called on unknown inode");
+        }
+    }
+
+    // Stores data and returns the inode for it.
+    // In case an inode has already been allocated for the same data, that inode
+    // is returned, otherwise a new one is allocated.
+    // In case data is a [InodeData::Directory], inodes for all items are looked
+    // up.
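+    //
+    // Example (sketch, with a hypothetical `tracker`): two puts of identical
+    // data yield the same inode:
+    //   let ino = tracker.put(InodeData::Symlink("t".into()));
+    //   assert_eq!(ino, tracker.put(InodeData::Symlink("t".into())));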
+    pub fn put(&mut self, data: InodeData) -> u64 {
+        match data {
+            InodeData::Regular(ref digest, _, _) => {
+                match self.blob_digest_to_inode.get(digest) {
+                    Some(found_ino) => {
+                        // We already have it, return the inode.
+                        *found_ino
+                    }
+                    None => self.insert_and_increment(data),
+                }
+            }
+            InodeData::Symlink(ref target) => {
+                match self.symlink_target_to_inode.get(target) {
+                    Some(found_ino) => {
+                        // We already have it, return the inode.
+                        *found_ino
+                    }
+                    None => self.insert_and_increment(data),
+                }
+            }
+            InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => {
+                // check the lookup table if the B3Digest is known.
+                match self.directory_digest_to_inode.get(digest) {
+                    Some(found_ino) => {
+                        // We already have it, return the inode.
+                        *found_ino
+                    }
+                    None => {
+                        // insert and return the inode
+                        self.insert_and_increment(data)
+                    }
+                }
+            }
+            // Inserting [DirectoryInodeData::Populated] doesn't normally happen,
+            // only via [replace].
+            InodeData::Directory(DirectoryInodeData::Populated(..)) => {
+                unreachable!("should never be called with DirectoryInodeData::Populated")
+            }
+        }
+    }
+
+    // Inserts the data and returns the inode it was stored at, while
+    // incrementing next_inode.
+    fn insert_and_increment(&mut self, data: InodeData) -> u64 {
+        let ino = self.next_inode;
+        // insert into lookup tables
+        match data {
+            InodeData::Regular(ref digest, _, _) => {
+                self.blob_digest_to_inode.insert(digest.clone(), ino);
+            }
+            InodeData::Symlink(ref target) => {
+                self.symlink_target_to_inode.insert(target.clone(), ino);
+            }
+            InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => {
+                self.directory_digest_to_inode.insert(digest.clone(), ino);
+            }
+            // This is currently not used outside test fixtures.
+            // Usually a [DirectoryInodeData::Sparse] is inserted and later
+            // "upgraded" with more data.
+            // However, as a future optimization, a lookup for a PathInfo could trigger a
+            // [DirectoryService::get_recursive()] request that "forks into
+            // background" and prepopulates all Directories in a closure.
+            InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) => {
+                self.directory_digest_to_inode.insert(digest.clone(), ino);
+            }
+        }
+        // Insert data
+        self.data.insert(ino, Arc::new(data));
+
+        // increment the inode counter and return the inode that was just allocated.
+        self.next_inode += 1;
+        ino
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::fixtures;
+
+    use super::InodeData;
+    use super::InodeTracker;
+
+    /// Getting something non-existent should return None.
+    #[test]
+    fn get_nonexistent() {
+        let inode_tracker = InodeTracker::default();
+        assert!(inode_tracker.get(1).is_none());
+    }
+
+    /// Put of a regular file should allocate an inode, which should be the same when inserting again.
+    #[test]
+    fn put_regular() {
+        let mut inode_tracker = InodeTracker::default();
+        let f = InodeData::Regular(
+            fixtures::BLOB_A_DIGEST.clone(),
+            fixtures::BLOB_A.len() as u64,
+            false,
+        );
+
+        // put it in
+        let ino = inode_tracker.put(f.clone());
+
+        // a get should return the right data
+        let data = inode_tracker.get(ino).expect("must be some");
+        match *data {
+            InodeData::Regular(ref digest, _, _) => {
+                assert_eq!(&fixtures::BLOB_A_DIGEST.clone(), digest);
+            }
+            InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"),
+        }
+
+        // another put should return the same ino
+        assert_eq!(ino, inode_tracker.put(f));
+
+        // inserting another file should return a different ino
+        assert_ne!(
+            ino,
+            inode_tracker.put(InodeData::Regular(
+                fixtures::BLOB_B_DIGEST.clone(),
+                fixtures::BLOB_B.len() as u64,
+                false,
+            ))
+        );
+    }
+
+    /// Put of a symlink should allocate an inode, which should be the same when inserting again.
+    #[test]
+    fn put_symlink() {
+        let mut inode_tracker = InodeTracker::default();
+        let f = InodeData::Symlink("target".into());
+
+        // put it in
+        let ino = inode_tracker.put(f.clone());
+
+        // a get should return the right data
+        let data = inode_tracker.get(ino).expect("must be some");
+        match *data {
+            InodeData::Symlink(ref target) => {
+                assert_eq!(b"target".to_vec(), *target);
+            }
+            InodeData::Regular(..) | InodeData::Directory(..) => panic!("wrong type"),
+        }
+
+        // another put should return the same ino
+        assert_eq!(ino, inode_tracker.put(f));
+
+        // inserting another symlink should return a different ino
+        assert_ne!(ino, inode_tracker.put(InodeData::Symlink("target2".into())));
+    }
+}
diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs
new file mode 100644
index 000000000000..9131b703bae0
--- /dev/null
+++ b/tvix/castore/src/fs/inodes.rs
@@ -0,0 +1,57 @@
+//! This module contains all the data structures used to track information
+//! about inodes, which represent tvix-castore nodes in a filesystem.
+use crate::proto as castorepb;
+use crate::B3Digest;
+
+#[derive(Clone, Debug)]
+pub enum InodeData {
+    Regular(B3Digest, u64, bool),  // digest, size, executable
+    Symlink(bytes::Bytes),         // target
+    Directory(DirectoryInodeData), // either [DirectoryInodeData::Sparse] or [DirectoryInodeData::Populated]
+}
+
+/// This encodes the two different states of [InodeData::Directory].
+/// Either the data still is sparse (we only saw a [castorepb::DirectoryNode],
+/// but didn't fetch the [castorepb::Directory] struct yet), or we processed a
+/// lookup and did fetch the data.
+#[derive(Clone, Debug)]
+pub enum DirectoryInodeData {
+    Sparse(B3Digest, u64),                                  // digest, size
+    Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)]
+}
+
+impl From<&castorepb::node::Node> for InodeData {
+    fn from(value: &castorepb::node::Node) -> Self {
+        match value {
+            castorepb::node::Node::Directory(directory_node) => directory_node.into(),
+            castorepb::node::Node::File(file_node) => file_node.into(),
+            castorepb::node::Node::Symlink(symlink_node) => symlink_node.into(),
+        }
+    }
+}
+
+impl From<&castorepb::SymlinkNode> for InodeData {
+    fn from(value: &castorepb::SymlinkNode) -> Self {
+        InodeData::Symlink(value.target.clone())
+    }
+}
+
+impl From<&castorepb::FileNode> for InodeData {
+    fn from(value: &castorepb::FileNode) -> Self {
+        InodeData::Regular(
+            value.digest.clone().try_into().unwrap(),
+            value.size,
+            value.executable,
+        )
+    }
+}
+
+/// Converts a DirectoryNode to a sparsely populated InodeData::Directory.
+impl From<&castorepb::DirectoryNode> for InodeData {
+    fn from(value: &castorepb::DirectoryNode) -> Self {
+        InodeData::Directory(DirectoryInodeData::Sparse(
+            value.digest.clone().try_into().unwrap(),
+            value.size,
+        ))
+    }
+}
diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs
new file mode 100644
index 000000000000..28528f38af1d
--- /dev/null
+++ b/tvix/castore/src/fs/mod.rs
@@ -0,0 +1,628 @@
+mod file_attr;
+mod inode_tracker;
+mod inodes;
+mod root_nodes;
+
+#[cfg(feature = "fuse")]
+pub mod fuse;
+
+#[cfg(feature = "virtiofs")]
+pub mod virtiofs;
+
+#[cfg(test)]
+mod tests;
+
+use crate::proto as castorepb;
+use crate::{
+    blobservice::{BlobReader, BlobService},
+    directoryservice::DirectoryService,
+    proto::{node::Node, NamedNode},
+    B3Digest,
+};
+use fuse_backend_rs::abi::fuse_abi::stat64;
+use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
+use futures::StreamExt;
+use parking_lot::RwLock;
+use std::{
+    collections::HashMap,
+    io,
+    sync::atomic::AtomicU64,
+    sync::{atomic::Ordering, Arc},
+    time::Duration,
+};
+use tokio::{
+    io::{AsyncReadExt, AsyncSeekExt},
+    sync::mpsc,
+};
+use tracing::{debug, info_span, instrument, warn};
+
+pub use self::root_nodes::RootNodes;
+use self::{
+    file_attr::{gen_file_attr, ROOT_FILE_ATTR},
+    inode_tracker::InodeTracker,
+    inodes::{DirectoryInodeData, InodeData},
+};
+
+/// This implements a read-only FUSE filesystem for a tvix-store
+/// with the passed [BlobService], [DirectoryService] and [RootNodes].
+///
+/// Linux uses inodes in filesystems. When implementing FUSE, most calls are
+/// *for* a given inode.
+///
+/// This means, we need to have a stable mapping of inode numbers to the
+/// corresponding store nodes.
+///
+/// We internally delegate all inode allocation and state keeping to the
+/// inode tracker.
+/// We store a mapping from currently "explored" names in the root to their
+/// inode.
+///
+/// There are some places where inodes are allocated / data inserted into
+/// the inode tracker, if not allocated before already:
+///  - Processing a `lookup` request, either in the mount root, or somewhere
+///    deeper.
+///  - Processing a `readdir` request
+///
+/// Things pointing to the same contents get the same inodes, irrespective of
+/// their own location.
+/// This means:
+///  - Symlinks with the same target will get the same inode.
+///  - Regular/executable files with the same contents will get the same inode.
+///  - Directories with the same contents will get the same inode.
+///
+/// Due to the above being valid across the whole store, and considering the
+/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
+/// allocation", aka reserve Directory.size inodes for each directory node we
+/// explore.
+/// Tests for this live in the tvix-store crate.
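+///
+/// Construction sketch (assuming `blob_service`, `directory_service` and a
+/// `BTreeMap<bytes::Bytes, Node>` of root nodes are in scope; the names are
+/// illustrative only):
+/// ```ignore
+/// let fs = TvixStoreFs::new(blob_service, directory_service, Arc::new(root_nodes), false);
+/// ```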
+pub struct TvixStoreFs<BS, DS, RN> {
+    blob_service: BS,
+    directory_service: DS,
+    root_nodes_provider: RN,
+
+    /// Whether to (try) listing elements in the root.
+    list_root: bool,
+
+    /// This maps a given basename in the root to the inode we allocated for the node.
+    root_nodes: RwLock<HashMap<Vec<u8>, u64>>,
+
+    /// This keeps track of inodes and data alongside them.
+    inode_tracker: RwLock<InodeTracker>,
+
+    /// This holds all open file handles
+    #[allow(clippy::type_complexity)]
+    file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>,
+
+    next_file_handle: AtomicU64,
+
+    tokio_handle: tokio::runtime::Handle,
+}
+
+impl<BS, DS, RN> TvixStoreFs<BS, DS, RN>
+where
+    BS: AsRef<dyn BlobService> + Clone + Send,
+    DS: AsRef<dyn DirectoryService> + Clone + Send + 'static,
+    RN: RootNodes + Clone + 'static,
+{
+    pub fn new(
+        blob_service: BS,
+        directory_service: DS,
+        root_nodes_provider: RN,
+        list_root: bool,
+    ) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+            root_nodes_provider,
+
+            list_root,
+
+            root_nodes: RwLock::new(HashMap::default()),
+            inode_tracker: RwLock::new(Default::default()),
+
+            file_handles: RwLock::new(Default::default()),
+            next_file_handle: AtomicU64::new(1),
+            tokio_handle: tokio::runtime::Handle::current(),
+        }
+    }
+
+    /// Retrieves the inode for a given root node basename, if present.
+    /// This obtains a read lock on self.root_nodes.
+    fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> {
+        self.root_nodes.read().get(name).cloned()
+    }
+
+    /// For a given inode, look up the directory behind it (from
+    /// self.inode_tracker), and return its children.
+    /// The inode_tracker MUST know about this inode already, and it MUST point
+    /// to a [InodeData::Directory].
+    /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
+    /// in self.directory_service is performed, and self.inode_tracker is updated with the
+    /// [DirectoryInodeData::Populated].
+    #[instrument(skip(self), err)]
+    fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> {
+        let data = self.inode_tracker.read().get(ino).unwrap();
+        match *data {
+            // if it's populated already, return children.
+            InodeData::Directory(DirectoryInodeData::Populated(
+                ref parent_digest,
+                ref children,
+            )) => Ok((parent_digest.clone(), children.clone())),
+            // if it's sparse, fetch data using directory_service, populate child nodes
+            // and update it in [self.inode_tracker].
+            InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => {
+                let directory = self
+                    .tokio_handle
+                    .block_on(self.tokio_handle.spawn({
+                        let directory_service = self.directory_service.clone();
+                        let parent_digest = parent_digest.to_owned();
+                        async move { directory_service.as_ref().get(&parent_digest).await }
+                    }))
+                    .unwrap()?
+                    .ok_or_else(|| {
+                        warn!(directory.digest=%parent_digest, "directory not found");
+                        // If the Directory can't be found, this is a hole, bail out.
+                        io::Error::from_raw_os_error(libc::EIO)
+                    })?;
+
+                // Turn the retrieved directory into a InodeData::Directory(DirectoryInodeData::Populated(..)),
+                // allocating inodes for the children on the way.
+                let children = {
+                    let mut inode_tracker = self.inode_tracker.write();
+
+                    let children: Vec<(u64, castorepb::node::Node)> = directory
+                        .nodes()
+                        .map(|child_node| {
+                            let child_ino = inode_tracker.put((&child_node).into());
+                            (child_ino, child_node)
+                        })
+                        .collect();
+
+                    // replace.
+                    inode_tracker.replace(
+                        ino,
+                        Arc::new(InodeData::Directory(DirectoryInodeData::Populated(
+                            parent_digest.clone(),
+                            children.clone(),
+                        ))),
+                    );
+
+                    children
+                };
+
+                Ok((parent_digest.clone(), children))
+            }
+            // if the parent inode was not a directory, this doesn't make sense
+            InodeData::Regular(..) | InodeData::Symlink(_) => {
+                Err(io::Error::from_raw_os_error(libc::ENOTDIR))
+            }
+        }
+    }
+
+    /// This will turn a lookup request for a name in the root into an inode and
+    /// [InodeData].
+    /// It will peek in [self.root_nodes], and then either look it up from
+    /// [self.inode_tracker],
+    /// or otherwise fetch from [self.root_nodes_provider], and then insert into
+    /// [self.inode_tracker].
+    /// In case the name can't be found, a libc::ENOENT is returned.
+    fn name_in_root_to_ino_and_data(
+        &self,
+        name: &std::ffi::CStr,
+    ) -> io::Result<(u64, Arc<InodeData>)> {
+        // Look up the inode for that root node.
+        // If there's one, [self.inode_tracker] MUST also contain the data,
+        // which we can then return.
+        if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) {
+            return Ok((
+                inode,
+                self.inode_tracker
+                    .read()
+                    .get(inode)
+                    .expect("must exist")
+                    .to_owned(),
+            ));
+        }
+
+        // We don't have it yet, look it up in [self.root_nodes].
+        match self.tokio_handle.block_on({
+            let root_nodes_provider = self.root_nodes_provider.clone();
+            async move { root_nodes_provider.get_by_basename(name.to_bytes()).await }
+        }) {
+            // if there was an error looking up the root node, propagate up an IO error.
+            Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
+            // the root node doesn't exist, so the file doesn't exist.
+            Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
+            // The root node does exist
+            Ok(Some(ref root_node)) => {
+                // The name must match what's passed in the lookup, otherwise this is also a ENOENT.
+                if root_node.get_name() != name.to_bytes() {
+                    debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch");
+                    return Err(io::Error::from_raw_os_error(libc::ENOENT));
+                }
+
+                // Let's check if someone else beat us to updating the inode tracker and
+                // root_nodes map. This avoids locking inode_tracker for writing.
+                if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) {
+                    return Ok((
+                        *ino,
+                        self.inode_tracker.read().get(*ino).expect("must exist"),
+                    ));
+                }
+
+                // Only in case it doesn't, lock [self.root_nodes] and
+                // [self.inode_tracker] for writing.
+                let mut root_nodes = self.root_nodes.write();
+                let mut inode_tracker = self.inode_tracker.write();
+
+                // insert the (sparse) inode data and register in
+                // self.root_nodes.
+                let inode_data: InodeData = root_node.into();
+                let ino = inode_tracker.put(inode_data.clone());
+                root_nodes.insert(name.to_bytes().into(), ino);
+
+                Ok((ino, Arc::new(inode_data)))
+            }
+        }
+    }
+}
+
+impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN>
+where
+    BS: AsRef<dyn BlobService> + Clone + Send + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
+    RN: RootNodes + Clone + 'static,
+{
+    type Handle = u64;
+    type Inode = u64;
+
+    fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> {
+        Ok(FsOptions::empty())
+    }
+
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn getattr(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _handle: Option<Self::Handle>,
+    ) -> io::Result<(stat64, Duration)> {
+        if inode == ROOT_ID {
+            return Ok((ROOT_FILE_ATTR.into(), Duration::MAX));
+        }
+
+        match self.inode_tracker.read().get(inode) {
+            None => Err(io::Error::from_raw_os_error(libc::ENOENT)),
+            Some(node) => {
+                debug!(node = ?node, "found node");
+                Ok((gen_file_attr(&node, inode).into(), Duration::MAX))
+            }
+        }
+    }
+
+    #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))]
+    fn lookup(
+        &self,
+        _ctx: &Context,
+        parent: Self::Inode,
+        name: &std::ffi::CStr,
+    ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
+        debug!("lookup");
+
+        // This goes from a parent inode to a node.
+        // - If the parent is [ROOT_ID], we need to check
+        //   [self.root_nodes] (fetching from a [RootNode] provider if needed)
+        // - Otherwise, lookup the parent in [self.inode_tracker] (which must be
+        //   a [InodeData::Directory]), and find the child with that name.
+        if parent == ROOT_ID {
+            let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?;
+
+            debug!(inode_data=?&inode_data, ino=ino, "Some");
+            return Ok(fuse_backend_rs::api::filesystem::Entry {
+                inode: ino,
+                attr: gen_file_attr(&inode_data, ino).into(),
+                attr_timeout: Duration::MAX,
+                entry_timeout: Duration::MAX,
+                ..Default::default()
+            });
+        }
+        // This is the "lookup for "a" inside inode 42.
+        // We already know that inode 42 must be a directory.
+        let (parent_digest, children) = self.get_directory_children(parent)?;
+
+        let span = info_span!("lookup", directory.digest = %parent_digest);
+        let _enter = span.enter();
+
+        // Search for that name in the list of children and return its FileAttrs.
+        if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
+            // lookup the child [InodeData] in [self.inode_tracker].
+            // We know the inodes for children have already been allocated.
+            let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap();
+
+            // Reply with the file attributes for the child.
+            // For child directories, we still have all data we need to reply.
+            Ok(fuse_backend_rs::api::filesystem::Entry {
+                inode: *child_ino,
+                attr: gen_file_attr(&child_inode_data, *child_ino).into(),
+                attr_timeout: Duration::MAX,
+                entry_timeout: Duration::MAX,
+                ..Default::default()
+            })
+        } else {
+            // Child not found, return ENOENT.
+            Err(io::Error::from_raw_os_error(libc::ENOENT))
+        }
+    }
+
+    // TODO: readdirplus?
+
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))]
+    fn readdir(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _handle: Self::Handle,
+        _size: u32,
+        offset: u64,
+        add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
+    ) -> io::Result<()> {
+        debug!("readdir");
+
+        if inode == ROOT_ID {
+            if !self.list_root {
+                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
+            } else {
+                let root_nodes_provider = self.root_nodes_provider.clone();
+                let (tx, mut rx) = mpsc::channel(16);
+
+                // This task will run in the background immediately and will exit
+                // after the stream ends or if we no longer want any more entries.
+                self.tokio_handle.spawn(async move {
+                    let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate();
+                    while let Some(node) = stream.next().await {
+                        if tx.send(node).await.is_err() {
+                            // If we get a send error, it means the sync code
+                            // doesn't want any more entries.
+                            break;
+                        }
+                    }
+                });
+
+                while let Some((i, ref root_node)) = rx.blocking_recv() {
+                    let root_node = match root_node {
+                        Err(e) => {
+                            warn!("failed to retrieve pathinfo: {}", e);
+                            return Err(io::Error::from_raw_os_error(libc::EPERM));
+                        }
+                        Ok(root_node) => root_node,
+                    };
+
+                    let name = root_node.get_name();
+                    // obtain the inode, or allocate a new one.
+                    let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| {
+                        // insert the (sparse) inode data and register in
+                        // self.root_nodes.
+                        let ino = self.inode_tracker.write().put(root_node.into());
+                        self.root_nodes.write().insert(name.into(), ino);
+                        ino
+                    });
+
+                    let ty = match root_node {
+                        Node::Directory(_) => libc::S_IFDIR,
+                        Node::File(_) => libc::S_IFREG,
+                        Node::Symlink(_) => libc::S_IFLNK,
+                    };
+
+                    let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
+                        ino,
+                        offset: offset + i as u64 + 1,
+                        type_: ty,
+                        name,
+                    })?;
+                    // If the buffer is full, add_entry will return `Ok(0)`.
+                    if written == 0 {
+                        break;
+                    }
+                }
+
+                return Ok(());
+            }
+        }
+
+        // lookup the children, or return an error if it's not a directory.
+        let (parent_digest, children) = self.get_directory_children(inode)?;
+
+        let span = info_span!("lookup", directory.digest = %parent_digest);
+        let _enter = span.enter();
+
+        for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() {
+            // the second parameter will become the "offset" parameter on the next call.
+            let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
+                ino: *ino,
+                offset: offset + i as u64 + 1,
+                type_: match child_node {
+                    #[allow(clippy::unnecessary_cast)]
+                    // libc::S_IFDIR is u32 on Linux and u16 on MacOS
+                    Node::Directory(_) => libc::S_IFDIR as u32,
+                    #[allow(clippy::unnecessary_cast)]
+                    // libc::S_IFDIR is u32 on Linux and u16 on MacOS
+                    Node::File(_) => libc::S_IFREG as u32,
+                    #[allow(clippy::unnecessary_cast)]
+                    // libc::S_IFDIR is u32 on Linux and u16 on MacOS
+                    Node::Symlink(_) => libc::S_IFLNK as u32,
+                },
+                name: child_node.get_name(),
+            })?;
+            // If the buffer is full, add_entry will return `Ok(0)`.
+            if written == 0 {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn open(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _flags: u32,
+        _fuse_flags: u32,
+    ) -> io::Result<(
+        Option<Self::Handle>,
+        fuse_backend_rs::api::filesystem::OpenOptions,
+    )> {
+        if inode == ROOT_ID {
+            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
+        }
+
+        // lookup the inode
+        match *self.inode_tracker.read().get(inode).unwrap() {
+            // open is invalid on non-files.
+            InodeData::Directory(..) | InodeData::Symlink(_) => {
+                warn!("is directory");
+                Err(io::Error::from_raw_os_error(libc::EISDIR))
+            }
+            InodeData::Regular(ref blob_digest, _blob_size, _) => {
+                let span = info_span!("read", blob.digest = %blob_digest);
+                let _enter = span.enter();
+
+                let task = self.tokio_handle.spawn({
+                    let blob_service = self.blob_service.clone();
+                    let blob_digest = blob_digest.clone();
+                    async move { blob_service.as_ref().open_read(&blob_digest).await }
+                });
+
+                let blob_reader = self.tokio_handle.block_on(task).unwrap();
+
+                match blob_reader {
+                    Ok(None) => {
+                        warn!("blob not found");
+                        Err(io::Error::from_raw_os_error(libc::EIO))
+                    }
+                    Err(e) => {
+                        warn!(e=?e, "error opening blob");
+                        Err(io::Error::from_raw_os_error(libc::EIO))
+                    }
+                    Ok(Some(blob_reader)) => {
+                        // get a new file handle
+                        // TODO: this will overflow after 2**64 operations,
+                        // which is fine for now.
+                        // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
+                        // for the discussion on alternatives.
+                        let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst);
+
+                        debug!("add file handle {}", fh);
+                        self.file_handles
+                            .write()
+                            .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader)));
+
+                        Ok((
+                            Some(fh),
+                            fuse_backend_rs::api::filesystem::OpenOptions::empty(),
+                        ))
+                    }
+                }
+            }
+        }
+    }
+
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))]
+    fn release(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _flags: u32,
+        handle: Self::Handle,
+        _flush: bool,
+        _flock_release: bool,
+        _lock_owner: Option<u64>,
+    ) -> io::Result<()> {
+        // remove and get ownership on the blob reader
+        match self.file_handles.write().remove(&handle) {
+            // drop it, which will close it.
+            Some(blob_reader) => drop(blob_reader),
+            None => {
+                // These might already be dropped if a read error occurred.
+                debug!("file_handle {} not found", handle);
+            }
+        }
+
+        Ok(())
+    }
+
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))]
+    fn read(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        handle: Self::Handle,
+        w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter,
+        size: u32,
+        offset: u64,
+        _lock_owner: Option<u64>,
+        _flags: u32,
+    ) -> io::Result<usize> {
+        debug!("read");
+
+        // We need to take out the blob reader from self.file_handles, so we can
+        // interact with it in the separate task.
+        // On success, we pass it back out of the task, so we can put it back in self.file_handles.
+        let blob_reader = match self.file_handles.read().get(&handle) {
+            Some(blob_reader) => blob_reader.clone(),
+            None => {
+                warn!("file handle {} unknown", handle);
+                return Err(io::Error::from_raw_os_error(libc::EIO));
+            }
+        };
+
+        let task = self.tokio_handle.spawn(async move {
+            let mut blob_reader = blob_reader.lock().await;
+
+            // seek to the offset specified, which is relative to the start of the file.
+            let resp = blob_reader.seek(io::SeekFrom::Start(offset)).await;
+
+            match resp {
+                Ok(pos) => {
+                    debug_assert_eq!(offset, pos);
+                }
+                Err(e) => {
+                    warn!("failed to seek to offset {}: {}", offset, e);
+                    return Err(io::Error::from_raw_os_error(libc::EIO));
+                }
+            }
+
+            // As written in the fuse docs, read should send exactly the number
+            // of bytes requested except on EOF or error.
+
+            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
+
+            // copy from the blob reader into buf, filling it up to `size` bytes.
+            tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?;
+
+            Ok(buf)
+        });
+
+        let buf = self.tokio_handle.block_on(task).unwrap()?;
+
+        w.write(&buf)
+    }
+
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> {
+        if inode == ROOT_ID {
+            return Err(io::Error::from_raw_os_error(libc::ENOSYS));
+        }
+
+        // lookup the inode
+        match *self.inode_tracker.read().get(inode).unwrap() {
+            InodeData::Directory(..) | InodeData::Regular(..) => {
+                Err(io::Error::from_raw_os_error(libc::EINVAL))
+            }
+            InodeData::Symlink(ref target) => Ok(target.to_vec()),
+        }
+    }
+}
diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs
new file mode 100644
index 000000000000..6609e049a1fc
--- /dev/null
+++ b/tvix/castore/src/fs/root_nodes.rs
@@ -0,0 +1,37 @@
+use std::collections::BTreeMap;
+
+use crate::{proto::node::Node, Error};
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use tonic::async_trait;
+
+/// Provides an interface for looking up root nodes in tvix-castore by a given
+/// lookup key (usually the basename), and optionally allows listing.
+#[async_trait]
+pub trait RootNodes: Send + Sync {
+    /// Looks up a root CA node based on the basename of the node in the root
+    /// directory of the filesystem.
+    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>;
+
+    /// Lists all root CA nodes in the filesystem. An error can be returned
+    /// in case listing is not allowed.
+    fn list(&self) -> BoxStream<Result<Node, Error>>;
+}
+
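+// Usage sketch (illustrative only): thanks to the blanket impl below, a plain
+// `BTreeMap<Bytes, Node>` behind an `Arc` can serve as a static RootNodes
+// provider:
+//   let provider = Arc::new(BTreeMap::<Bytes, Node>::new());
+//   // `provider` now implements RootNodes via the impl below.
+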
+#[async_trait]
+/// Implements RootNodes for something deref'ing to a BTreeMap of Nodes, where
+/// the key is the node name.
+impl<T> RootNodes for T
+where
+    T: AsRef<BTreeMap<Bytes, Node>> + Send + Sync,
+{
+    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> {
+        Ok(self.as_ref().get(name).cloned())
+    }
+
+    fn list(&self) -> BoxStream<Result<Node, Error>> {
+        Box::pin(tokio_stream::iter(
+            self.as_ref().iter().map(|(_, v)| Ok(v.clone())),
+        ))
+    }
+}
diff --git a/tvix/castore/src/fs/tests.rs b/tvix/castore/src/fs/tests.rs
new file mode 100644
index 000000000000..2f27c3c1c8e4
--- /dev/null
+++ b/tvix/castore/src/fs/tests.rs
@@ -0,0 +1,1141 @@
+use std::{
+    collections::BTreeMap,
+    io::{self, Cursor},
+    os::unix::fs::MetadataExt,
+    path::Path,
+    sync::Arc,
+};
+
+use bytes::Bytes;
+use tempfile::TempDir;
+use tokio_stream::{wrappers::ReadDirStream, StreamExt};
+
+use super::{fuse::FuseDaemon, TvixStoreFs};
+use crate::proto::node::Node;
+use crate::proto::{self as castorepb};
+use crate::{
+    blobservice::{BlobService, MemoryBlobService},
+    directoryservice::{DirectoryService, MemoryDirectoryService},
+    fixtures,
+};
+
+const BLOB_A_NAME: &str = "00000000000000000000000000000000-test";
+const BLOB_B_NAME: &str = "55555555555555555555555555555555-test";
+const HELLOWORLD_BLOB_NAME: &str = "66666666666666666666666666666666-test";
+const SYMLINK_NAME: &str = "11111111111111111111111111111111-test";
+const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test";
+const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test";
+const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test";
+
+fn gen_svcs() -> (Arc<dyn BlobService>, Arc<dyn DirectoryService>) {
+    (
+        Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>,
+        Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>,
+    )
+}
+
+fn do_mount<P: AsRef<Path>, BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+    root_nodes: BTreeMap<bytes::Bytes, Node>,
+    mountpoint: P,
+    list_root: bool,
+) -> io::Result<FuseDaemon>
+where
+    BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
+    DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
+{
+    let fs = TvixStoreFs::new(
+        blob_service,
+        directory_service,
+        Arc::new(root_nodes),
+        list_root,
+    );
+    FuseDaemon::new(Arc::new(fs), mountpoint.as_ref(), 4)
+}
+
+async fn populate_blob_a(
+    blob_service: &Arc<dyn BlobService>,
+    root_nodes: &mut BTreeMap<Bytes, Node>,
+) {
+    let mut bw = blob_service.open_write().await;
+    tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw)
+        .await
+        .expect("must succeed uploading");
+    bw.close().await.expect("must succeed closing");
+
+    root_nodes.insert(
+        BLOB_A_NAME.into(),
+        Node::File(castorepb::FileNode {
+            name: BLOB_A_NAME.into(),
+            digest: fixtures::BLOB_A_DIGEST.clone().into(),
+            size: fixtures::BLOB_A.len() as u64,
+            executable: false,
+        }),
+    );
+}
+
+async fn populate_blob_b(
+    blob_service: &Arc<dyn BlobService>,
+    root_nodes: &mut BTreeMap<Bytes, Node>,
+) {
+    let mut bw = blob_service.open_write().await;
+    tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw)
+        .await
+        .expect("must succeed uploading");
+    bw.close().await.expect("must succeed closing");
+
+    root_nodes.insert(
+        BLOB_B_NAME.into(),
+        Node::File(castorepb::FileNode {
+            name: BLOB_B_NAME.into(),
+            digest: fixtures::BLOB_B_DIGEST.clone().into(),
+            size: fixtures::BLOB_B.len() as u64,
+            executable: false,
+        }),
+    );
+}
+
+/// adds a blob containing helloworld and marks it as executable
+async fn populate_blob_helloworld(
+    blob_service: &Arc<dyn BlobService>,
+    root_nodes: &mut BTreeMap<Bytes, Node>,
+) {
+    let mut bw = blob_service.open_write().await;
+    tokio::io::copy(
+        &mut Cursor::new(fixtures::HELLOWORLD_BLOB_CONTENTS.to_vec()),
+        &mut bw,
+    )
+    .await
+    .expect("must succeed uploading");
+    bw.close().await.expect("must succeed closing");
+
+    root_nodes.insert(
+        HELLOWORLD_BLOB_NAME.into(),
+        Node::File(castorepb::FileNode {
+            name: HELLOWORLD_BLOB_NAME.into(),
+            digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64,
+            executable: true,
+        }),
+    );
+}
+
+async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) {
+    root_nodes.insert(
+        SYMLINK_NAME.into(),
+        Node::Symlink(castorepb::SymlinkNode {
+            name: SYMLINK_NAME.into(),
+            target: BLOB_A_NAME.into(),
+        }),
+    );
+}
+
+/// This writes a symlink pointing to /nix/store/somewhereelse,
+/// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED.
+async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) {
+    root_nodes.insert(
+        SYMLINK_NAME2.into(),
+        Node::Symlink(castorepb::SymlinkNode {
+            name: SYMLINK_NAME2.into(),
+            target: "/nix/store/somewhereelse".into(),
+        }),
+    );
+}
+
+async fn populate_directory_with_keep(
+    blob_service: &Arc<dyn BlobService>,
+    directory_service: &Arc<dyn DirectoryService>,
+    root_nodes: &mut BTreeMap<Bytes, Node>,
+) {
+    // upload empty blob
+    let mut bw = blob_service.open_write().await;
+    assert_eq!(
+        fixtures::EMPTY_BLOB_DIGEST.as_slice(),
+        bw.close().await.expect("must succeed closing").as_slice(),
+    );
+
+    // upload directory
+    directory_service
+        .put(fixtures::DIRECTORY_WITH_KEEP.clone())
+        .await
+        .expect("must succeed uploading");
+
+    root_nodes.insert(
+        DIRECTORY_WITH_KEEP_NAME.into(),
+        castorepb::node::Node::Directory(castorepb::DirectoryNode {
+            name: DIRECTORY_WITH_KEEP_NAME.into(),
+            digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(),
+            size: fixtures::DIRECTORY_WITH_KEEP.size(),
+        }),
+    );
+}
+
+/// Create a root node for DIRECTORY_WITH_KEEP, but don't upload the Directory
+/// itself.
+async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) {
+    root_nodes.insert(
+        DIRECTORY_WITH_KEEP_NAME.into(),
+        castorepb::node::Node::Directory(castorepb::DirectoryNode {
+            name: DIRECTORY_WITH_KEEP_NAME.into(),
+            digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(),
+            size: fixtures::DIRECTORY_WITH_KEEP.size(),
+        }),
+    );
+}
+
+/// Insert a root FileNode for BLOB_A, but don't upload the blob it points to.
+async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) {
+    root_nodes.insert(
+        BLOB_A_NAME.into(),
+        Node::File(castorepb::FileNode {
+            name: BLOB_A_NAME.into(),
+            digest: fixtures::BLOB_A_DIGEST.clone().into(),
+            size: fixtures::BLOB_A.len() as u64,
+            executable: false,
+        }),
+    );
+}
+
+async fn populate_directory_complicated(
+    blob_service: &Arc<dyn BlobService>,
+    directory_service: &Arc<dyn DirectoryService>,
+    root_nodes: &mut BTreeMap<Bytes, Node>,
+) {
+    // upload empty blob
+    let mut bw = blob_service.open_write().await;
+    assert_eq!(
+        fixtures::EMPTY_BLOB_DIGEST.as_slice(),
+        bw.close().await.expect("must succeed closing").as_slice(),
+    );
+
+    // upload inner directory
+    directory_service
+        .put(fixtures::DIRECTORY_WITH_KEEP.clone())
+        .await
+        .expect("must succeed uploading");
+
+    // upload parent directory
+    directory_service
+        .put(fixtures::DIRECTORY_COMPLICATED.clone())
+        .await
+        .expect("must succeed uploading");
+
+    root_nodes.insert(
+        DIRECTORY_COMPLICATED_NAME.into(),
+        Node::Directory(castorepb::DirectoryNode {
+            name: DIRECTORY_COMPLICATED_NAME.into(),
+            digest: fixtures::DIRECTORY_COMPLICATED.digest().into(),
+            size: fixtures::DIRECTORY_COMPLICATED.size(),
+        }),
+    );
+}
+
+/// Ensure mounting itself doesn't fail
+#[tokio::test]
+async fn mount() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        BTreeMap::default(),
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure listing the root isn't allowed
+#[tokio::test]
+async fn root() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        BTreeMap::default(),
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    {
+        // read_dir succeeds, but getting the first element will fail.
+        let mut it = ReadDirStream::new(tokio::fs::read_dir(tmpdir.path()).await.expect("must succeed"));
+
+        let err = it
+            .next()
+            .await
+            .expect("must be some")
+            .expect_err("must be err");
+        assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind());
+    }
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure listing the root is allowed if configured explicitly
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn root_with_listing() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_blob_a(&blob_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        true, /* allow listing */
+    )
+    .expect("must succeed");
+
+    {
+        // read_dir succeeds, and we can inspect the first element.
+        let mut it = ReadDirStream::new(tokio::fs::read_dir(tmpdir.path()).await.expect("must succeed"));
+
+        let e = it
+            .next()
+            .await
+            .expect("must be some")
+            .expect("must succeed");
+
+        let metadata = e.metadata().await.expect("must succeed");
+        assert!(metadata.is_file());
+        assert!(metadata.permissions().readonly());
+        assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len());
+    }
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure we can stat a file at the root
+#[tokio::test]
+async fn stat_file_at_root() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_blob_a(&blob_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(BLOB_A_NAME);
+
+    // peek at the file metadata
+    let metadata = tokio::fs::metadata(p).await.expect("must succeed");
+
+    assert!(metadata.is_file());
+    assert!(metadata.permissions().readonly());
+    assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len());
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure we can read a file at the root
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn read_file_at_root() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_blob_a(&blob_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(BLOB_A_NAME);
+
+    // read the file contents
+    let data = tokio::fs::read(p).await.expect("must succeed");
+
+    // ensure size and contents match
+    assert_eq!(fixtures::BLOB_A.len(), data.len());
+    assert_eq!(fixtures::BLOB_A.to_vec(), data);
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure we can read a large file at the root
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn read_large_file_at_root() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_blob_b(&blob_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(BLOB_B_NAME);
+    {
+        // peek at the file metadata
+        let metadata = tokio::fs::metadata(&p).await.expect("must succeed");
+
+        assert!(metadata.is_file());
+        assert!(metadata.permissions().readonly());
+        assert_eq!(fixtures::BLOB_B.len() as u64, metadata.len());
+    }
+
+    // read the file contents
+    let data = tokio::fs::read(p).await.expect("must succeed");
+
+    // ensure size and contents match
+    assert_eq!(fixtures::BLOB_B.len(), data.len());
+    assert_eq!(fixtures::BLOB_B.to_vec(), data);
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Read the target of a symlink
+#[tokio::test]
+async fn symlink_readlink() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_symlink(&mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(SYMLINK_NAME);
+
+    let target = tokio::fs::read_link(&p).await.expect("must succeed");
+    assert_eq!(BLOB_A_NAME, target.to_str().unwrap());
+
+    // peek at the file metadata, which follows symlinks.
+    // this must fail, as we didn't populate the target.
+    let e = tokio::fs::metadata(&p).await.expect_err("must fail");
+    assert_eq!(std::io::ErrorKind::NotFound, e.kind());
+
+    // peeking at the file metadata without following symlinks will succeed.
+    let metadata = tokio::fs::symlink_metadata(&p).await.expect("must succeed");
+    assert!(metadata.is_symlink());
+
+    // reading from the symlink (which follows) will fail, because the target doesn't exist.
+    let e = tokio::fs::read(p).await.expect_err("must fail");
+    assert_eq!(std::io::ErrorKind::NotFound, e.kind());
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Read and stat a regular file through a symlink pointing to it.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn read_stat_through_symlink() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_blob_a(&blob_service, &mut root_nodes).await;
+    populate_symlink(&mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p_symlink = tmpdir.path().join(SYMLINK_NAME);
+    let p_blob = tmpdir.path().join(BLOB_A_NAME);
+
+    // peek at the file metadata, which follows symlinks.
+    // this must now return the same metadata as when statting at the target directly.
+    let metadata_symlink = tokio::fs::metadata(&p_symlink).await.expect("must succeed");
+    let metadata_blob = tokio::fs::metadata(&p_blob).await.expect("must succeed");
+    assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type());
+    assert_eq!(metadata_blob.len(), metadata_symlink.len());
+
+    // reading from the symlink (which follows) will return the same data as if
+    // we were reading from the file directly.
+    assert_eq!(
+        tokio::fs::read(p_blob).await.expect("must succeed"),
+        tokio::fs::read(p_symlink).await.expect("must succeed"),
+    );
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Read a directory in the root, and validate some attributes.
+#[tokio::test]
+async fn read_stat_directory() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
+
+    // peek at the metadata of the directory
+    let metadata = tokio::fs::metadata(p).await.expect("must succeed");
+    assert!(metadata.is_dir());
+    assert!(metadata.permissions().readonly());
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+/// Read a blob inside a directory. This ensures we successfully populate directory data.
+async fn read_blob_inside_dir() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep");
+
+    // peek at metadata.
+    let metadata = tokio::fs::metadata(&p).await.expect("must succeed");
+    assert!(metadata.is_file());
+    assert!(metadata.permissions().readonly());
+
+    // read from it
+    let data = tokio::fs::read(&p).await.expect("must succeed");
+    assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data);
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+/// Read a blob inside a directory inside a directory. This ensures we properly
+/// populate directories as we traverse down the structure.
+async fn read_blob_deep_inside_dir() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir
+        .path()
+        .join(DIRECTORY_COMPLICATED_NAME)
+        .join("keep")
+        .join(".keep");
+
+    // peek at metadata.
+    let metadata = tokio::fs::metadata(&p).await.expect("must succeed");
+    assert!(metadata.is_file());
+    assert!(metadata.permissions().readonly());
+
+    // read from it
+    let data = tokio::fs::read(&p).await.expect("must succeed");
+    assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data);
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure readdir works.
+#[tokio::test]
+async fn readdir() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME);
+
+    {
+        // read_dir should succeed. Collect all elements
+        let elements: Vec<_> =
+            ReadDirStream::new(tokio::fs::read_dir(p).await.expect("must succeed"))
+                .map(|e| e.expect("must not be err"))
+                .collect()
+                .await;
+
+        assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and ..
+
+        // We explicitly look at specific positions here, because we always emit
+        // them ordered.
+
+        // ".keep", 0 byte file.
+        let e = &elements[0];
+        assert_eq!(".keep", e.file_name());
+        assert!(e.file_type().await.expect("must succeed").is_file());
+        assert_eq!(0, e.metadata().await.expect("must succeed").len());
+
+        // "aa", symlink.
+        let e = &elements[1];
+        assert_eq!("aa", e.file_name());
+        assert!(e.file_type().await.expect("must succeed").is_symlink());
+
+        // "keep", directory
+        let e = &elements[2];
+        assert_eq!("keep", e.file_name());
+        assert!(e.file_type().await.expect("must succeed").is_dir());
+    }
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+#[tokio::test]
+/// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory.
+async fn readdir_deep() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep");
+
+    {
+        // read_dir should succeed. Collect all elements
+        let elements: Vec<_> =
+            ReadDirStream::new(tokio::fs::read_dir(p).await.expect("must succeed"))
+                .map(|e| e.expect("must not be err"))
+                .collect()
+                .await;
+
+        assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and ..
+
+        // ".keep", 0 byte file.
+        let e = &elements[0];
+        assert_eq!(".keep", e.file_name());
+        assert!(e.file_type().await.expect("must succeed").is_file());
+        assert_eq!(0, e.metadata().await.expect("must succeed").len());
+    }
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Check attributes match how they show up in /nix/store normally.
+#[tokio::test]
+async fn check_attributes() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_blob_a(&blob_service, &mut root_nodes).await;
+    populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
+    populate_symlink(&mut root_nodes).await;
+    populate_blob_helloworld(&blob_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p_file = tmpdir.path().join(BLOB_A_NAME);
+    let p_directory = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
+    let p_symlink = tmpdir.path().join(SYMLINK_NAME);
+    let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME);
+
+    // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident.
+    let metadata_file = tokio::fs::symlink_metadata(&p_file)
+        .await
+        .expect("must succeed");
+    let metadata_executable_file = tokio::fs::symlink_metadata(&p_executable_file)
+        .await
+        .expect("must succeed");
+    let metadata_directory = tokio::fs::symlink_metadata(&p_directory)
+        .await
+        .expect("must succeed");
+    let metadata_symlink = tokio::fs::symlink_metadata(&p_symlink)
+        .await
+        .expect("must succeed");
+
+    // modes should match. We & with 0o777 to remove any higher bits.
+    assert_eq!(0o444, metadata_file.mode() & 0o777);
+    assert_eq!(0o555, metadata_executable_file.mode() & 0o777);
+    assert_eq!(0o555, metadata_directory.mode() & 0o777);
+    assert_eq!(0o444, metadata_symlink.mode() & 0o777);
+
+    // files should have the correct filesize
+    assert_eq!(fixtures::BLOB_A.len() as u64, metadata_file.len());
+    // directories should have their "size" as filesize
+    assert_eq!(
+        fixtures::DIRECTORY_WITH_KEEP.size(),
+        metadata_directory.size()
+    );
+
+    for metadata in &[&metadata_file, &metadata_directory, &metadata_symlink] {
+        // uid and gid should be 0.
+        assert_eq!(0, metadata.uid());
+        assert_eq!(0, metadata.gid());
+
+        // all times should be set to the unix epoch.
+        assert_eq!(0, metadata.atime());
+        assert_eq!(0, metadata.mtime());
+        assert_eq!(0, metadata.ctime());
+        // crtime isn't checked, as it seems to be macOS-only.
+    }
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+#[tokio::test]
+/// Ensure we allocate the same inodes for the same directory contents.
+/// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP.
+async fn compare_inodes_directories() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
+    populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
+    let p_sibling_dir = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep");
+
+    // peek at metadata.
+    assert_eq!(
+        tokio::fs::metadata(p_dir_with_keep)
+            .await
+            .expect("must succeed")
+            .ino(),
+        tokio::fs::metadata(p_sibling_dir)
+            .await
+            .expect("must succeed")
+            .ino()
+    );
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure we allocate the same inodes for the same directory contents.
+/// $DIRECTORY_COMPLICATED_NAME/keep/.keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep
+#[tokio::test]
+async fn compare_inodes_files() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep");
+    let p_keep2 = tmpdir
+        .path()
+        .join(DIRECTORY_COMPLICATED_NAME)
+        .join("keep")
+        .join(".keep");
+
+    // peek at metadata.
+    assert_eq!(
+        tokio::fs::metadata(p_keep1)
+            .await
+            .expect("must succeed")
+            .ino(),
+        tokio::fs::metadata(p_keep2)
+            .await
+            .expect("must succeed")
+            .ino()
+    );
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Ensure we allocate the same inode for symlinks pointing to the same targets.
+/// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2.
+#[tokio::test]
+async fn compare_inodes_symlinks() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
+    populate_symlink2(&mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa");
+    let p2 = tmpdir.path().join(SYMLINK_NAME2);
+
+    // peek at metadata.
+    assert_eq!(
+        tokio::fs::symlink_metadata(p1)
+            .await
+            .expect("must succeed")
+            .ino(),
+        tokio::fs::symlink_metadata(p2)
+            .await
+            .expect("must succeed")
+            .ino()
+    );
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Check we match paths exactly.
+#[tokio::test]
+async fn read_wrong_paths_in_root() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_blob_a(&blob_service, &mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    // wrong name
+    assert!(
+        tokio::fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes"))
+            .await
+            .is_err()
+    );
+
+    // invalid hash
+    assert!(
+        tokio::fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test"))
+            .await
+            .is_err()
+    );
+
+    // right name, must exist
+    assert!(
+        tokio::fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test"))
+            .await
+            .is_ok()
+    );
+
+    // a wrong name with the right hash prefix must still not exist
+    assert!(
+        tokio::fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes"))
+            .await
+            .is_err()
+    );
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+/// Make sure writes are not allowed
+#[tokio::test]
+async fn disallow_writes() {
+    // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let root_nodes = BTreeMap::default();
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(BLOB_A_NAME);
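+    // The filesystem is mounted read-only, so creating a file must fail with EROFS.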
+    let e = tokio::fs::File::create(p).await.expect_err("must fail");
+
+    assert_eq!(Some(libc::EROFS), e.raw_os_error());
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+#[tokio::test]
+/// Ensure we get an IO error if the directory service does not have the Directory object.
+async fn missing_directory() {
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_directorynode_without_directory(&mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME);
+
+    {
+        // `stat` on the path should succeed, because it doesn't trigger the directory request.
+        tokio::fs::metadata(&p).await.expect("must succeed");
+
+        // However, calling either `readdir` or `stat` on a child should fail with an IO error.
+        // It fails when trying to pull the first entry, because we don't implement opendir separately.
+        ReadDirStream::new(tokio::fs::read_dir(&p).await.unwrap())
+            .next()
+            .await
+            .expect("must be some")
+            .expect_err("must be err");
+
+        // Rust currently sets e.kind() to Uncategorized, which isn't very
+        // helpful, so we don't inspect the error more closely than that.
+        tokio::fs::metadata(p.join(".keep"))
+            .await
+            .expect_err("must fail");
+    }
+
+    fuse_daemon.unmount().expect("unmount");
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+/// Ensure we get an IO error if the blob service does not have the blob
+async fn missing_blob() {
+    if !std::path::Path::new("/dev/fuse").exists() {
+        eprintln!("skipping test");
+        return;
+    }
+    let tmpdir = TempDir::new().unwrap();
+
+    let (blob_service, directory_service) = gen_svcs();
+    let mut root_nodes = BTreeMap::default();
+
+    populate_filenode_without_blob(&mut root_nodes).await;
+
+    let mut fuse_daemon = do_mount(
+        blob_service,
+        directory_service,
+        root_nodes,
+        tmpdir.path(),
+        false,
+    )
+    .expect("must succeed");
+
+    let p = tmpdir.path().join(BLOB_A_NAME);
+
+    {
+        // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service.
+        tokio::fs::metadata(&p).await.expect("must succeed");
+
+        // However, calling read on the blob should fail.
+        // Rust currently sets e.kind() to Uncategorized, which isn't very
+        // helpful, so we don't inspect the error more closely than that.
+        tokio::fs::read(p).await.expect_err("must fail");
+    }
+
+    fuse_daemon.unmount().expect("unmount");
+}
diff --git a/tvix/castore/src/fs/virtiofs.rs b/tvix/castore/src/fs/virtiofs.rs
new file mode 100644
index 000000000000..846270d28568
--- /dev/null
+++ b/tvix/castore/src/fs/virtiofs.rs
@@ -0,0 +1,237 @@
+use std::{
+    convert, error, fmt, io,
+    ops::Deref,
+    path::Path,
+    sync::{Arc, MutexGuard, RwLock},
+};
+
+use fuse_backend_rs::{
+    api::{filesystem::FileSystem, server::Server},
+    transport::{FsCacheReqHandler, Reader, VirtioFsWriter},
+};
+use tracing::error;
+use vhost::vhost_user::{
+    Listener, SlaveFsCacheReq, VhostUserProtocolFeatures, VhostUserVirtioFeatures,
+};
+use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringMutex, VringState, VringT};
+use virtio_bindings::bindings::virtio_ring::{
+    VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC,
+};
+use virtio_queue::QueueT;
+use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap};
+use vmm_sys_util::epoll::EventSet;
+
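+// Bit position of the VIRTIO_F_VERSION_1 feature flag; features() below
+// shifts 1 by these bit positions to build the feature bitmap.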
+const VIRTIO_F_VERSION_1: u32 = 32;
+const NUM_QUEUES: usize = 2;
+const QUEUE_SIZE: usize = 1024;
+
+#[derive(Debug)]
+enum Error {
+    /// Failed to handle non-input event.
+    HandleEventNotEpollIn,
+    /// Failed to handle unknown event.
+    HandleEventUnknownEvent,
+    /// Invalid descriptor chain.
+    InvalidDescriptorChain,
+    /// Failed to handle filesystem requests.
+    HandleRequests(fuse_backend_rs::Error),
+    /// Failed to construct new vhost user daemon.
+    NewDaemon,
+    /// Failed to start the vhost user daemon.
+    StartDaemon,
+    /// Failed to wait for the vhost user daemon.
+    WaitDaemon,
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "vhost_user_fs_error: {self:?}")
+    }
+}
+
+impl error::Error for Error {}
+
+impl convert::From<Error> for io::Error {
+    fn from(e: Error) -> Self {
+        io::Error::new(io::ErrorKind::Other, e)
+    }
+}
+
+struct VhostUserFsBackend<FS>
+where
+    FS: FileSystem + Send + Sync,
+{
+    server: Arc<Server<Arc<FS>>>,
+    event_idx: bool,
+    guest_mem: GuestMemoryAtomic<GuestMemoryMmap>,
+    cache_req: Option<SlaveFsCacheReq>,
+}
+
+impl<FS> VhostUserFsBackend<FS>
+where
+    FS: FileSystem + Send + Sync,
+{
+    fn process_queue(&mut self, vring: &mut MutexGuard<VringState>) -> std::io::Result<bool> {
+        let mut used_descs = false;
+
+        while let Some(desc_chain) = vring
+            .get_queue_mut()
+            .pop_descriptor_chain(self.guest_mem.memory())
+        {
+            let memory = desc_chain.memory();
+            let reader = Reader::from_descriptor_chain(memory, desc_chain.clone())
+                .map_err(|_| Error::InvalidDescriptorChain)?;
+            let writer = VirtioFsWriter::new(memory, desc_chain.clone())
+                .map_err(|_| Error::InvalidDescriptorChain)?;
+
+            self.server
+                .handle_message(
+                    reader,
+                    writer.into(),
+                    self.cache_req
+                        .as_mut()
+                        .map(|req| req as &mut dyn FsCacheReqHandler),
+                    None,
+                )
+                .map_err(Error::HandleRequests)?;
+
+            // TODO: Is len 0 correct?
+            if let Err(error) = vring
+                .get_queue_mut()
+                .add_used(memory, desc_chain.head_index(), 0)
+            {
+                error!(?error, "failed to add desc back to ring");
+            }
+
+            // TODO: What happens if we error out before here?
+            used_descs = true;
+        }
+
+        let needs_notification = if self.event_idx {
+            match vring
+                .get_queue_mut()
+                .needs_notification(self.guest_mem.memory().deref())
+            {
+                Ok(needs_notification) => needs_notification,
+                Err(error) => {
+                    error!(?error, "failed to check if queue needs notification");
+                    true
+                }
+            }
+        } else {
+            true
+        };
+
+        if needs_notification {
+            if let Err(error) = vring.signal_used_queue() {
+                error!(?error, "failed to signal used queue");
+            }
+        }
+
+        Ok(used_descs)
+    }
+}
+
+impl<FS> VhostUserBackendMut<VringMutex> for VhostUserFsBackend<FS>
+where
+    FS: FileSystem + Send + Sync,
+{
+    fn num_queues(&self) -> usize {
+        NUM_QUEUES
+    }
+
+    fn max_queue_size(&self) -> usize {
+        QUEUE_SIZE
+    }
+
+    fn features(&self) -> u64 {
+        1 << VIRTIO_F_VERSION_1
+            | 1 << VIRTIO_RING_F_INDIRECT_DESC
+            | 1 << VIRTIO_RING_F_EVENT_IDX
+            | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits()
+    }
+
+    fn protocol_features(&self) -> VhostUserProtocolFeatures {
+        VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::SLAVE_REQ
+    }
+
+    fn set_event_idx(&mut self, enabled: bool) {
+        self.event_idx = enabled;
+    }
+
+    fn update_memory(&mut self, _mem: GuestMemoryAtomic<GuestMemoryMmap>) -> std::io::Result<()> {
+        // This is what most of the vhost-user implementations do...
+        Ok(())
+    }
+
+    fn set_slave_req_fd(&mut self, cache_req: SlaveFsCacheReq) {
+        self.cache_req = Some(cache_req);
+    }
+
+    fn handle_event(
+        &mut self,
+        device_event: u16,
+        evset: vmm_sys_util::epoll::EventSet,
+        vrings: &[VringMutex],
+        _thread_id: usize,
+    ) -> std::io::Result<bool> {
+        if evset != EventSet::IN {
+            return Err(Error::HandleEventNotEpollIn.into());
+        }
+
+        let mut queue = match device_event {
+            // High priority queue
+            0 => vrings[0].get_mut(),
+            // Regular priority queue
+            1 => vrings[1].get_mut(),
+            _ => {
+                return Err(Error::HandleEventUnknownEvent.into());
+            }
+        };
+
+        if self.event_idx {
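+            // With EVENT_IDX negotiated, keep re-enabling notifications and
+            // draining the queue until no descriptors remain, so requests
+            // submitted between the last pop and re-enabling aren't missed.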
+            loop {
+                queue
+                    .get_queue_mut()
+                    .enable_notification(self.guest_mem.memory().deref())
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+                if !self.process_queue(&mut queue)? {
+                    break;
+                }
+            }
+        } else {
+            self.process_queue(&mut queue)?;
+        }
+
+        Ok(false)
+    }
+}
+
+pub fn start_virtiofs_daemon<FS, P>(fs: FS, socket: P) -> io::Result<()>
+where
+    FS: FileSystem + Send + Sync + 'static,
+    P: AsRef<Path>,
+{
+    let guest_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());
+
+    let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
+
+    let backend = Arc::new(RwLock::new(VhostUserFsBackend {
+        server,
+        guest_mem: guest_mem.clone(),
+        event_idx: false,
+        cache_req: None,
+    }));
+
+    let listener = Listener::new(socket, true).unwrap();
+
+    let mut fs_daemon =
+        VhostUserDaemon::new(String::from("vhost-user-fs-tvix-store"), backend, guest_mem)
+            .map_err(|_| Error::NewDaemon)?;
+
+    fs_daemon.start(listener).map_err(|_| Error::StartDaemon)?;
+
+    fs_daemon.wait().map_err(|_| Error::WaitDaemon)?;
+
+    Ok(())
+}
diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs
new file mode 100644
index 000000000000..f7677e63142d
--- /dev/null
+++ b/tvix/castore/src/import.rs
@@ -0,0 +1,384 @@
+use crate::blobservice::BlobService;
+use crate::directoryservice::DirectoryPutter;
+use crate::directoryservice::DirectoryService;
+use crate::proto::node::Node;
+use crate::proto::Directory;
+use crate::proto::DirectoryNode;
+use crate::proto::FileNode;
+use crate::proto::SymlinkNode;
+use crate::Error as CastoreError;
+use async_stream::stream;
+use futures::pin_mut;
+use futures::Stream;
+use std::fs::FileType;
+
+#[cfg(target_family = "unix")]
+use std::os::unix::ffi::OsStrExt;
+
+use std::{
+    collections::HashMap,
+    fmt::Debug,
+    os::unix::prelude::PermissionsExt,
+    path::{Path, PathBuf},
+};
+use tokio_stream::StreamExt;
+use tracing::instrument;
+use walkdir::DirEntry;
+use walkdir::WalkDir;
+
+#[cfg(debug_assertions)]
+use std::collections::HashSet;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("failed to upload directory at {0}: {1}")]
+    UploadDirectoryError(PathBuf, CastoreError),
+
+    #[error("invalid encoding encountered for entry {0:?}")]
+    InvalidEncoding(PathBuf),
+
+    #[error("unable to stat {0}: {1}")]
+    UnableToStat(PathBuf, std::io::Error),
+
+    #[error("unable to open {0}: {1}")]
+    UnableToOpen(PathBuf, std::io::Error),
+
+    #[error("unable to read {0}: {1}")]
+    UnableToRead(PathBuf, std::io::Error),
+
+    #[error("unsupported type for file {0}: {1:?}")]
+    UnsupportedFileType(PathBuf, FileType),
+}
+
+impl From<CastoreError> for Error {
+    fn from(value: CastoreError) -> Self {
+        match value {
+            CastoreError::InvalidRequest(e) => panic!("tvix bug: invalid request: {}", e),
+            CastoreError::StorageError(e) => panic!("tvix bug: storage error: {}", e),
+        }
+    }
+}
+
+impl From<Error> for std::io::Error {
+    fn from(value: Error) -> Self {
+        std::io::Error::new(std::io::ErrorKind::Other, value)
+    }
+}
+
+/// This processes a given [walkdir::DirEntry] and returns a
+/// proto::node::Node, depending on the type of the entry.
+///
+/// If the entry is a file, its contents are uploaded.
+/// If the entry is a directory, the Directory is uploaded as well.
+///
+/// It must only be called once all children of the entry have been processed,
+/// and relies on the caller to pass in the so-far-assembled Directory object
+/// for this path, built from the previously returned (child) nodes.
+///
+/// It assumes the caller adds returned nodes to the directories it assembles.
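+///
+/// A sketch of the calling pattern, with hypothetical names (see
+/// [`ingest_entries`] below for the real caller):
+///
+/// ```ignore
+/// // All children of `entry` were processed first; their nodes were collected
+/// // into the so-far-assembled Directory for this path.
+/// let assembled = directories.remove(entry.path()).unwrap_or_default();
+/// let node = process_entry(blob_service, &mut putter, &entry, Some(assembled)).await?;
+/// // ... `node` is then recorded in the parent's Directory.
+/// ```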
+#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
+async fn process_entry<'a, BS>(
+    blob_service: BS,
+    directory_putter: &'a mut Box<dyn DirectoryPutter>,
+    entry: &'a walkdir::DirEntry,
+    maybe_directory: Option<Directory>,
+) -> Result<Node, Error>
+where
+    BS: AsRef<dyn BlobService> + Clone,
+{
+    let file_type = entry.file_type();
+
+    if file_type.is_dir() {
+        let directory = maybe_directory
+            .expect("tvix bug: must be called with some directory in the case of directory");
+        let directory_digest = directory.digest();
+        let directory_size = directory.size();
+
+        // upload this directory
+        directory_putter
+            .put(directory)
+            .await
+            .map_err(|e| Error::UploadDirectoryError(entry.path().to_path_buf(), e))?;
+
+        return Ok(Node::Directory(DirectoryNode {
+            name: entry.file_name().as_bytes().to_owned().into(),
+            digest: directory_digest.into(),
+            size: directory_size,
+        }));
+    }
+
+    if file_type.is_symlink() {
+        let target: bytes::Bytes = std::fs::read_link(entry.path())
+            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?
+            .as_os_str()
+            .as_bytes()
+            .to_owned()
+            .into();
+
+        return Ok(Node::Symlink(SymlinkNode {
+            name: entry.file_name().as_bytes().to_owned().into(),
+            target,
+        }));
+    }
+
+    if file_type.is_file() {
+        let metadata = entry
+            .metadata()
+            .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?;
+
+        let mut file = tokio::fs::File::open(entry.path())
+            .await
+            .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?;
+
+        let mut writer = blob_service.as_ref().open_write().await;
+
+        if let Err(e) = tokio::io::copy(&mut file, &mut writer).await {
+            return Err(Error::UnableToRead(entry.path().to_path_buf(), e));
+        };
+
+        let digest = writer
+            .close()
+            .await
+            .map_err(|e| Error::UnableToRead(entry.path().to_path_buf(), e))?;
+
+        return Ok(Node::File(FileNode {
+            name: entry.file_name().as_bytes().to_vec().into(),
+            digest: digest.into(),
+            size: metadata.len(),
+            // If it's executable by the user, it'll become executable.
+            // This matches nix's dump() function behaviour.
+            executable: metadata.permissions().mode() & 0o100 != 0,
+        }));
+    }
+
+    // Nix says things like: error: file '/home/raito/dev/code.tvl.fyi/tvix/glue/src/tests/import_fixtures/a_devnode' has an unsupported type
+    Err(Error::UnsupportedFileType(
+        entry.path().to_path_buf(),
+        file_type,
+    ))
+}
+
+/// Walks the filesystem at a given path and returns a level-keyed list of directory entries.
+///
+/// This is how [`ingest_path`] assembles the set of entries to pass on to [`ingest_entries`].
+/// This low-level function can be used if additional filtering or processing is required on the
+/// entries.
+///
+/// Level here is meant in the graph-theory sense, e.g. level-2 nodes
+/// are nodes at depth 2.
+///
+/// This function will walk the filesystem using `walkdir` and will consume
+/// `O(#number of entries)` space.
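+///
+/// For example (hypothetical tree): walking a root `a` that contains a file
+/// `a/f` and a subdirectory `a/d` holding `a/d/g` yields:
+///
+/// ```text
+/// [
+///     [a],        // depth 0: the root itself
+///     [a/d, a/f], // depth 1, sorted by file name
+///     [a/d/g],    // depth 2
+/// ]
+/// ```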
+#[instrument(fields(path), err)]
+pub fn walk_path_for_ingestion<P>(path: P) -> Result<Vec<Vec<DirEntry>>, Error>
+where
+    P: AsRef<Path> + std::fmt::Debug,
+{
+    let mut entries_per_depths: Vec<Vec<DirEntry>> = vec![Vec::new()];
+    for entry in WalkDir::new(path.as_ref())
+        .follow_links(false)
+        .follow_root_links(false)
+        .contents_first(false)
+        .sort_by_file_name()
+        .into_iter()
+    {
+        // Entry could be a NotFound, if the root path specified does not exist.
+        let entry = entry.map_err(|e| {
+            Error::UnableToOpen(
+                PathBuf::from(path.as_ref()),
+                e.into_io_error().expect("walkdir err must be some"),
+            )
+        })?;
+
+        if entry.depth() >= entries_per_depths.len() {
+            debug_assert!(
+                entry.depth() == entries_per_depths.len(),
+                "Received unexpected entry with depth {} during descent, previously at {}",
+                entry.depth(),
+                entries_per_depths.len()
+            );
+
+            entries_per_depths.push(vec![entry]);
+        } else {
+            entries_per_depths[entry.depth()].push(entry);
+        }
+    }
+
+    Ok(entries_per_depths)
+}
+
+/// Converts a level-keyed vector of filesystem entries into a stream of
+/// [DirEntry] in a way that honors the Merkle invariant, i.e. from bottom to top.
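+///
+/// Continuing the hypothetical example from [`walk_path_for_ingestion`], the
+/// stream yields `a/d/g` first, then `a/d` and `a/f`, and finally the root `a`.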
+pub fn leveled_entries_to_stream(
+    entries_per_depths: Vec<Vec<DirEntry>>,
+) -> impl Stream<Item = DirEntry> {
+    stream! {
+        for level in entries_per_depths.into_iter().rev() {
+            for entry in level.into_iter() {
+                yield entry;
+            }
+        }
+    }
+}
+
+/// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and
+/// [DirectoryService]. It returns the root node or an error.
+///
+/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
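+///
+/// A minimal usage sketch; the in-memory services and the path are
+/// assumptions for illustration:
+///
+/// ```ignore
+/// let blob_service: Arc<dyn BlobService> = Arc::new(MemoryBlobService::default());
+/// let directory_service: Arc<dyn DirectoryService> = Arc::new(MemoryDirectoryService::default());
+///
+/// let root_node = ingest_path(&blob_service, &directory_service, "/some/path").await?;
+/// ```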
+#[instrument(skip(blob_service, directory_service), fields(path), err)]
+pub async fn ingest_path<'a, BS, DS, P>(
+    blob_service: BS,
+    directory_service: DS,
+    path: P,
+) -> Result<Node, Error>
+where
+    P: AsRef<Path> + std::fmt::Debug,
+    BS: AsRef<dyn BlobService> + Clone,
+    DS: AsRef<dyn DirectoryService>,
+{
+    let entries = walk_path_for_ingestion(path)?;
+    let entries_stream = leveled_entries_to_stream(entries);
+    pin_mut!(entries_stream);
+
+    ingest_entries(blob_service, directory_service, entries_stream).await
+}
+
+/// The Merkle invariant checker is an internal structure to perform bookkeeping of all directory
+/// entries we are ingesting and verifying we are ingesting them in the right order.
+///
+/// That is, whenever we process an entry `L`, we want to verify that we did
+/// not earlier process an entry `P` such that `P` is an **ancestor** of `L`.
+///
+/// If such a thing happened, it means that we have processed something like:
+///
+///```text
+///        A
+///       / \
+///      B   C
+///     / \   \
+///    G  F    P <--------- processed before this one
+///           / \                                  |
+///          D  E                                  |
+///              \                                 |
+///               L  <-----------------------------+
+/// ```
+///
+/// This is exactly what must never happen.
+///
+/// Note: this checker is local, it can only see what happens on our side, not on the remote side,
+/// i.e. the different remote services.
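+///
+/// A minimal usage sketch, mirroring how [`ingest_entries`] drives it:
+///
+/// ```ignore
+/// let mut checker = MerkleInvariantChecker::default();
+/// assert!(checker.find_ancestor(&entry).is_none()); // nothing seen yet
+/// checker.see(&entry);
+/// ```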
+#[derive(Default)]
+#[cfg(debug_assertions)]
+struct MerkleInvariantChecker {
+    seen: HashSet<PathBuf>,
+}
+
+#[cfg(debug_assertions)]
+impl MerkleInvariantChecker {
+    /// See a directory entry and remember it.
+    fn see(&mut self, node: &DirEntry) {
+        self.seen.insert(node.path().to_owned());
+    }
+
+    /// Returns a potential ancestor already seen for that directory entry.
+    fn find_ancestor(&self, node: &DirEntry) -> Option<PathBuf> {
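+        // Note: Path::ancestors() yields the path itself first, so an entry
+        // seen twice is reported as its own ancestor.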
+        for anc in node.path().ancestors() {
+            if self.seen.contains(anc) {
+                return Some(anc.to_owned());
+            }
+        }
+
+        None
+    }
+}
+
+/// Ingests elements from the given stream of [`DirEntry`] into the passed [`BlobService`] and
+/// [`DirectoryService`].
+/// It does not follow symlinks at the root, they will be ingested as actual symlinks.
+#[instrument(skip_all, ret, err)]
+pub async fn ingest_entries<'a, BS, DS, S>(
+    blob_service: BS,
+    directory_service: DS,
+    mut entries_async_iterator: S,
+) -> Result<Node, Error>
+where
+    BS: AsRef<dyn BlobService> + Clone,
+    DS: AsRef<dyn DirectoryService>,
+    S: Stream<Item = DirEntry> + std::marker::Unpin,
+{
+    let mut directories: HashMap<PathBuf, Directory> = HashMap::default();
+
+    let mut directory_putter = directory_service.as_ref().put_multiple_start();
+
+    #[cfg(debug_assertions)]
+    let mut invariant_checker: MerkleInvariantChecker = Default::default();
+
+    // We need to process a directory's children before processing
+    // the directory itself in order to have all the data needed
+    // to compute the hash.
+    while let Some(entry) = entries_async_iterator.next().await {
+        #[cfg(debug_assertions)]
+        {
+            // If we find an ancestor before we see this entry, this means that the caller
+            // broke the contract, refer to the documentation of the invariant checker to
+            // understand the reasoning here.
+            if let Some(ancestor) = invariant_checker.find_ancestor(&entry) {
+                panic!("Tvix bug: merkle invariant checker discovered that {} was processed before {}!",
+                    ancestor.display(),
+                    entry.path().display()
+                );
+            }
+
+            invariant_checker.see(&entry);
+        }
+
+        // FUTUREWORK: handle directory putting here, rather than piping it through process_entry.
+        let node = process_entry(
+            blob_service.clone(),
+            &mut directory_putter,
+            &entry,
+            // process_entry wants an Option<Directory> in case the entry points to a directory.
+            // make sure to provide it.
+            // If the directory has contents, we already have it in
+            // `directories` because we iterate over depth in reverse order (deepest to
+            // shallowest).
+            if entry.file_type().is_dir() {
+                Some(
+                    directories
+                        .remove(entry.path())
+                        // If it's not in there, the directory contained no children.
+                        .unwrap_or_default(),
+                )
+            } else {
+                None
+            },
+        )
+        .await?;
+
+        if entry.depth() == 0 {
+            // Make sure all the directories are flushed.
+            // FUTUREWORK: `debug_assert!` the resulting Ok(b3_digest) to be equal
+            // to `directories.get(entry.path())`.
+            if entry.file_type().is_dir() {
+                directory_putter.close().await?;
+            }
+            return Ok(node);
+        } else {
+            // calculate the parent path, and make sure we register the node there.
+            // NOTE: entry.depth() > 0
+            let parent_path = entry.path().parent().unwrap().to_path_buf();
+
+            // record node in parent directory, creating a new [proto:Directory] if not there yet.
+            let parent_directory = directories.entry(parent_path).or_default();
+            match node {
+                Node::Directory(e) => parent_directory.directories.push(e),
+                Node::File(e) => parent_directory.files.push(e),
+                Node::Symlink(e) => parent_directory.symlinks.push(e),
+            }
+        }
+    }
+    // Unreachable: the walk errors out if the root doesn't exist, and the depth-0 (root) entry always returns above.
+    unreachable!("Tvix bug: no root node emitted during ingestion")
+}
diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs
new file mode 100644
index 000000000000..8da0edef786b
--- /dev/null
+++ b/tvix/castore/src/lib.rs
@@ -0,0 +1,20 @@
+mod digests;
+mod errors;
+
+pub mod blobservice;
+pub mod directoryservice;
+pub mod fixtures;
+
+#[cfg(feature = "fs")]
+pub mod fs;
+
+pub mod import;
+pub mod proto;
+pub mod tonic;
+pub mod utils;
+
+pub use digests::{B3Digest, B3_LEN};
+pub use errors::Error;
+
+#[cfg(test)]
+mod tests;
diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
new file mode 100644
index 000000000000..9f3f944da26f
--- /dev/null
+++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
@@ -0,0 +1,168 @@
+use crate::blobservice::BlobService;
+use core::pin::pin;
+use futures::{stream::BoxStream, TryFutureExt};
+use std::{
+    collections::VecDeque,
+    ops::{Deref, DerefMut},
+};
+use tokio_stream::StreamExt;
+use tokio_util::io::ReaderStream;
+use tonic::{async_trait, Request, Response, Status, Streaming};
+use tracing::{instrument, warn};
+
+pub struct GRPCBlobServiceWrapper<T> {
+    blob_service: T,
+}
+
+impl<T> GRPCBlobServiceWrapper<T> {
+    pub fn new(blob_service: T) -> Self {
+        Self { blob_service }
+    }
+}
+
+// This is necessary because bytes::BytesMut comes up with a default capacity
+// of 64 bytes, which cannot easily be changed if all you have is a
+// bytes::BufMut trait implementation.
+// Therefore, we override the Default implementation here.
+// TODO(raitobezarius?): upstream me properly
+struct BytesMutWithDefaultCapacity<const N: usize> {
+    inner: bytes::BytesMut,
+}
+
+impl<const N: usize> Deref for BytesMutWithDefaultCapacity<N> {
+    type Target = bytes::BytesMut;
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+impl<const N: usize> DerefMut for BytesMutWithDefaultCapacity<N> {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.inner
+    }
+}
+
+impl<const N: usize> Default for BytesMutWithDefaultCapacity<N> {
+    fn default() -> Self {
+        BytesMutWithDefaultCapacity {
+            inner: bytes::BytesMut::with_capacity(N),
+        }
+    }
+}
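+
+// A minimal illustration of the intent (hypothetical usage):
+//
+//     let buf = BytesMutWithDefaultCapacity::<{ 100 * 1024 }>::default();
+//     assert_eq!(100 * 1024, buf.capacity());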
+
+impl<const N: usize> bytes::Buf for BytesMutWithDefaultCapacity<N> {
+    fn remaining(&self) -> usize {
+        self.inner.remaining()
+    }
+
+    fn chunk(&self) -> &[u8] {
+        self.inner.chunk()
+    }
+
+    fn advance(&mut self, cnt: usize) {
+        self.inner.advance(cnt);
+    }
+}
+
+unsafe impl<const N: usize> bytes::BufMut for BytesMutWithDefaultCapacity<N> {
+    fn remaining_mut(&self) -> usize {
+        self.inner.remaining_mut()
+    }
+
+    unsafe fn advance_mut(&mut self, cnt: usize) {
+        self.inner.advance_mut(cnt);
+    }
+
+    fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
+        self.inner.chunk_mut()
+    }
+}
+
+#[async_trait]
+impl<T> super::blob_service_server::BlobService for GRPCBlobServiceWrapper<T>
+where
+    T: Deref<Target = dyn BlobService> + Send + Sync + 'static,
+{
+    // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
+    type ReadStream = BoxStream<'static, Result<super::BlobChunk, Status>>;
+
+    #[instrument(skip(self))]
+    async fn stat(
+        &self,
+        request: Request<super::StatBlobRequest>,
+    ) -> Result<Response<super::StatBlobResponse>, Status> {
+        let rq = request.into_inner();
+        let req_digest = rq
+            .digest
+            .try_into()
+            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+        match self.blob_service.chunks(&req_digest).await {
+            Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
+            Ok(Some(chunk_metas)) => Ok(Response::new(super::StatBlobResponse {
+                chunks: chunk_metas,
+                ..Default::default()
+            })),
+            Err(e) => Err(e.into()),
+        }
+    }
+
+    #[instrument(skip(self))]
+    async fn read(
+        &self,
+        request: Request<super::ReadBlobRequest>,
+    ) -> Result<Response<Self::ReadStream>, Status> {
+        let rq = request.into_inner();
+
+        let req_digest = rq
+            .digest
+            .try_into()
+            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+        match self.blob_service.open_read(&req_digest).await {
+            Ok(Some(r)) => {
+                let chunks_stream =
+                    ReaderStream::new(r).map(|chunk| Ok(super::BlobChunk { data: chunk? }));
+                Ok(Response::new(Box::pin(chunks_stream)))
+            }
+            Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
+            Err(e) => Err(e.into()),
+        }
+    }
+
+    #[instrument(skip(self))]
+    async fn put(
+        &self,
+        request: Request<Streaming<super::BlobChunk>>,
+    ) -> Result<Response<super::PutBlobResponse>, Status> {
+        let req_inner = request.into_inner();
+
+        let data_stream = req_inner.map(|x| {
+            x.map(|x| VecDeque::from(x.data.to_vec()))
+                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
+        });
+
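+        // VecDeque<u8> implements bytes::Buf, which is what StreamReader
+        // requires of the items yielded by the wrapped stream.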
+        let mut data_reader = tokio_util::io::StreamReader::new(data_stream);
+
+        let mut blob_writer = pin!(self.blob_service.open_write().await);
+
+        tokio::io::copy(&mut data_reader, &mut blob_writer)
+            .await
+            .map_err(|e| {
+                warn!("error copying: {}", e);
+                Status::internal("error copying")
+            })?;
+
+        let digest = blob_writer
+            .close()
+            .map_err(|e| {
+                warn!("error closing stream: {}", e);
+                Status::internal("error closing stream")
+            })
+            .await?;
+
+        Ok(Response::new(super::PutBlobResponse {
+            digest: digest.into(),
+        }))
+    }
+}
diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
new file mode 100644
index 000000000000..b83048045861
--- /dev/null
+++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
@@ -0,0 +1,178 @@
+use crate::proto;
+use crate::{directoryservice::DirectoryService, B3Digest};
+use futures::StreamExt;
+use std::collections::HashMap;
+use std::ops::Deref;
+use tokio::sync::mpsc::channel;
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::{async_trait, Request, Response, Status, Streaming};
+use tracing::{debug, instrument, warn};
+
+pub struct GRPCDirectoryServiceWrapper<T> {
+    directory_service: T,
+}
+
+impl<T> GRPCDirectoryServiceWrapper<T> {
+    pub fn new(directory_service: T) -> Self {
+        Self { directory_service }
+    }
+}
+
+#[async_trait]
+impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T>
+where
+    T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static,
+{
+    type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>;
+
+    #[instrument(skip(self))]
+    async fn get(
+        &self,
+        request: Request<proto::GetDirectoryRequest>,
+    ) -> Result<Response<Self::GetStream>, Status> {
+        let (tx, rx) = channel(5);
+
+        let req_inner = request.into_inner();
+
+        // Look at the digest in the request, and either send that single Directory, or traverse recursively.
+        match &req_inner.by_what {
+            None => return Err(Status::invalid_argument("by_what needs to be specified")),
+            Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => {
+                let digest: B3Digest = digest
+                    .clone()
+                    .try_into()
+                    .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+                if !req_inner.recursive {
+                    let e: Result<proto::Directory, Status> =
+                        match self.directory_service.get(&digest).await {
+                            Ok(Some(directory)) => Ok(directory),
+                            Ok(None) => {
+                                Err(Status::not_found(format!("directory {} not found", digest)))
+                            }
+                            Err(e) => Err(e.into()),
+                        };
+
+                    if tx.send(e).await.is_err() {
+                        debug!("receiver dropped");
+                    }
+                } else {
+                    // If recursive was requested, traverse via get_recursive.
+                    let mut directories_it = self.directory_service.get_recursive(&digest);
+
+                    while let Some(e) = directories_it.next().await {
+                        // map err in res from Error to Status
+                        let res = e.map_err(|e| Status::internal(e.to_string()));
+                        if tx.send(res).await.is_err() {
+                            debug!("receiver dropped");
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+
+        let receiver_stream = ReceiverStream::new(rx);
+        Ok(Response::new(receiver_stream))
+    }
+
+    #[instrument(skip(self, request))]
+    async fn put(
+        &self,
+        request: Request<Streaming<proto::Directory>>,
+    ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
+        let mut req_inner = request.into_inner();
+        // TODO: let this use a DirectoryPutter to put to the store it's connected to,
+        // and move the validation logic into [SimplePutter].
+
+        // This keeps track of the seen directory keys and their sizes.
+        // This is used to validate the size field of a reference to a previously sent directory.
+        // We don't need to keep the contents around, as they're stored in the DB.
+        // https://github.com/rust-lang/rust-clippy/issues/5812
+        #[allow(clippy::mutable_key_type)]
+        let mut seen_directories_sizes: HashMap<B3Digest, u64> = HashMap::new();
+        let mut last_directory_dgst: Option<B3Digest> = None;
+
+        // Consume directories, and insert them into the store.
+        // Reject directory messages that refer to Directories not sent in the same stream.
+        while let Some(directory) = req_inner.message().await? {
+            // validate the directory itself.
+            if let Err(e) = directory.validate() {
+                return Err(Status::invalid_argument(format!(
+                    "directory {} failed validation: {}",
+                    directory.digest(),
+                    e,
+                )));
+            }
+
+            // for each child directory this directory refers to, we need
+            // to ensure it has been seen already in this stream, and that the size
+            // matches what we recorded.
+            for child_directory in &directory.directories {
+                let child_directory_digest: B3Digest = child_directory
+                    .digest
+                    .clone()
+                    .try_into()
+                    .map_err(|_e| Status::internal("invalid child directory digest len"))?;
+
+                match seen_directories_sizes.get(&child_directory_digest) {
+                    None => {
+                        return Err(Status::invalid_argument(format!(
+                            "child directory '{:?}' ({}) in directory '{}' not seen yet",
+                            child_directory.name,
+                            &child_directory_digest,
+                            &directory.digest(),
+                        )));
+                    }
+                    Some(seen_child_directory_size) => {
+                        if seen_child_directory_size != &child_directory.size {
+                            return Err(Status::invalid_argument(format!(
+                                    "child directory '{:?}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}",
+                                    child_directory.name,
+                                    &child_directory_digest,
+                                    &directory.digest(),
+                                    seen_child_directory_size,
+                                    child_directory.size,
+                            )));
+                        }
+                    }
+                }
+            }
+
+            // NOTE: We can't know if a directory we're receiving actually is
+            // part of the closure, because we receive directories from the leaf nodes up to
+            // the root.
+            // The only thing we could do would be a final check once the
+            // last Directory was received, verifying that all Directories
+            // received so far are reachable from that (root) node.
+
+            let dgst = directory.digest();
+            seen_directories_sizes.insert(dgst.clone(), directory.size());
+            last_directory_dgst = Some(dgst.clone());
+
+            // check if the directory already exists in the database. We can skip
+            // inserting if it's already there, as that'd be a no-op.
+            match self.directory_service.get(&dgst).await {
+                Err(e) => {
+                    warn!("error checking if directory already exists: {}", e);
+                    return Err(e.into());
+                }
+                // skip if already exists
+                Ok(Some(_)) => {}
+                // insert if it doesn't already exist
+                Ok(None) => {
+                    self.directory_service.put(directory).await?;
+                }
+            }
+        }
+
+        // We're done receiving. Peek at last_directory_dgst and either return
+        // the digest, or an error if we received an empty stream.
+        match last_directory_dgst {
+            None => Err(Status::invalid_argument("no directories received")),
+            Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse {
+                root_digest: last_directory_dgst.into(),
+            })),
+        }
+    }
+}
diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs
new file mode 100644
index 000000000000..59f5c1fdf3f6
--- /dev/null
+++ b/tvix/castore/src/proto/mod.rs
@@ -0,0 +1,387 @@
+#![allow(clippy::derive_partial_eq_without_eq, non_snake_case)]
+// https://github.com/hyperium/tonic/issues/1056
+use bstr::ByteSlice;
+use std::{collections::HashSet, iter::Peekable, str};
+
+use prost::Message;
+
+mod grpc_blobservice_wrapper;
+mod grpc_directoryservice_wrapper;
+
+pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
+pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
+
+use crate::{B3Digest, B3_LEN};
+
+tonic::include_proto!("tvix.castore.v1");
+
+#[cfg(feature = "tonic-reflection")]
+/// Compiled file descriptors for implementing [gRPC
+/// reflection](https://github.com/grpc/grpc/blob/master/doc/server-reflection.md) with e.g.
+/// [`tonic_reflection`](https://docs.rs/tonic-reflection).
+pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix.castore.v1");
+
+#[cfg(test)]
+mod tests;
+
+/// Errors that can occur during the validation of [Directory] messages.
+#[derive(Debug, PartialEq, Eq, thiserror::Error)]
+pub enum ValidateDirectoryError {
+    /// Elements are not in sorted order
+    #[error("{:?} is not sorted", .0.as_bstr())]
+    WrongSorting(Vec<u8>),
+    /// Multiple elements with the same name encountered
+    #[error("{:?} is a duplicate name", .0.as_bstr())]
+    DuplicateName(Vec<u8>),
+    /// Invalid node
+    #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())]
+    InvalidNode(Vec<u8>, ValidateNodeError),
+    #[error("Total size exceeds u32::MAX")]
+    SizeOverflow,
+}
+
+/// Errors that occur during Node validation
+#[derive(Debug, PartialEq, Eq, thiserror::Error)]
+pub enum ValidateNodeError {
+    #[error("No node set")]
+    NoNodeSet,
+    /// Invalid digest length encountered
+    #[error("Invalid Digest length: {0}")]
+    InvalidDigestLen(usize),
+    /// Invalid name encountered
+    #[error("Invalid name: {}", .0.as_bstr())]
+    InvalidName(Vec<u8>),
+    /// Invalid symlink target
+    #[error("Invalid symlink target: {}", .0.as_bstr())]
+    InvalidSymlinkTarget(Vec<u8>),
+}
+
+/// Errors that occur during StatBlobResponse validation
+#[derive(Debug, PartialEq, Eq, thiserror::Error)]
+pub enum ValidateStatBlobResponseError {
+    /// Invalid digest length encountered
+    #[error("Invalid digest length {0} for chunk #{1}")]
+    InvalidDigestLen(usize, usize),
+}
+
+/// Checks a Node name for validity as an intermediate node.
+/// We disallow slashes, null bytes, '.', '..' and the empty string.
+fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
+    if name.is_empty()
+        || name == b".."
+        || name == b"."
+        || name.contains(&0x00)
+        || name.contains(&b'/')
+    {
+        Err(ValidateNodeError::InvalidName(name.to_owned()))
+    } else {
+        Ok(())
+    }
+}
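+// Illustrative examples (a sketch, not part of the test suite):
+//   assert!(validate_node_name(b"foo").is_ok());
+//   assert!(validate_node_name(b"").is_err()); // empty
+//   assert!(validate_node_name(b"..").is_err()); // parent reference
+//   assert!(validate_node_name(b"a/b").is_err()); // contains a slash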
+
+/// NamedNode is implemented for [FileNode], [DirectoryNode], [SymlinkNode]
+/// and [node::Node], so we can ask all of them for their name easily.
+pub trait NamedNode {
+    fn get_name(&self) -> &[u8];
+}
+
+impl NamedNode for &FileNode {
+    fn get_name(&self) -> &[u8] {
+        &self.name
+    }
+}
+
+impl NamedNode for &DirectoryNode {
+    fn get_name(&self) -> &[u8] {
+        &self.name
+    }
+}
+
+impl NamedNode for &SymlinkNode {
+    fn get_name(&self) -> &[u8] {
+        &self.name
+    }
+}
+
+impl NamedNode for node::Node {
+    fn get_name(&self) -> &[u8] {
+        match self {
+            node::Node::File(node_file) => &node_file.name,
+            node::Node::Directory(node_directory) => &node_directory.name,
+            node::Node::Symlink(node_symlink) => &node_symlink.name,
+        }
+    }
+}
+
+impl Node {
+    /// Ensures the node has a valid enum kind (is Some), and passes its
+    /// per-enum validation.
+    pub fn validate(&self) -> Result<(), ValidateNodeError> {
+        if let Some(node) = self.node.as_ref() {
+            node.validate()
+        } else {
+            Err(ValidateNodeError::NoNodeSet)
+        }
+    }
+}
+
+impl node::Node {
+    /// Returns the node with a new name.
+    pub fn rename(self, name: bytes::Bytes) -> Self {
+        match self {
+            node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }),
+            node::Node::File(n) => node::Node::File(FileNode { name, ..n }),
+            node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }),
+        }
+    }
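+    // An illustrative example (a sketch, not part of the test suite):
+    // renaming keeps all other fields intact.
+    //   let n = node::Node::Symlink(SymlinkNode { name: "a".into(), target: "t".into() });
+    //   assert_eq!(n.rename("b".into()).get_name(), &b"b"[..]);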
+
+    /// Ensures the node has a valid name, and checks the type-specific fields too.
+    pub fn validate(&self) -> Result<(), ValidateNodeError> {
+        match self {
+            // for a directory root node, ensure the digest has the appropriate size.
+            node::Node::Directory(directory_node) => {
+                if directory_node.digest.len() != B3_LEN {
+                    Err(ValidateNodeError::InvalidDigestLen(
+                        directory_node.digest.len(),
+                    ))?;
+                }
+                validate_node_name(&directory_node.name)
+            }
+            // for a file root node, ensure the digest has the appropriate size.
+            node::Node::File(file_node) => {
+                if file_node.digest.len() != B3_LEN {
+                    Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?;
+                }
+                validate_node_name(&file_node.name)
+            }
+            // ensure the symlink target is not empty and doesn't contain null bytes.
+            node::Node::Symlink(symlink_node) => {
+                if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') {
+                    Err(ValidateNodeError::InvalidSymlinkTarget(
+                        symlink_node.target.to_vec(),
+                    ))?;
+                }
+                validate_node_name(&symlink_node.name)
+            }
+        }
+    }
+}
+
+impl Eq for node::Node {}
+
+impl PartialOrd for node::Node {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for node::Node {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+/// Accepts a name, and a mutable reference to the previous name.
+/// If the passed name is not smaller than the previous one, the reference is
+/// updated. Otherwise, a [ValidateDirectoryError::WrongSorting] error is returned.
+fn update_if_lt_prev<'n>(
+    prev_name: &mut &'n [u8],
+    name: &'n [u8],
+) -> Result<(), ValidateDirectoryError> {
+    if *name < **prev_name {
+        return Err(ValidateDirectoryError::WrongSorting(name.to_vec()));
+    }
+    *prev_name = name;
+    Ok(())
+}
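+// Illustrative (a sketch): equal names pass here; duplicates are caught by
+// insert_once below.
+//   let mut prev: &[u8] = b"";
+//   assert!(update_if_lt_prev(&mut prev, b"a").is_ok()); // "" <= "a"
+//   assert!(update_if_lt_prev(&mut prev, b"a").is_ok()); // equal, passes
+//   assert!(update_if_lt_prev(&mut prev, b"0").is_err()); // "0" < "a"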
+
+/// Inserts the given name into a HashSet if it's not already in there.
+/// If it is, an error is returned.
+fn insert_once<'n>(
+    seen_names: &mut HashSet<&'n [u8]>,
+    name: &'n [u8],
+) -> Result<(), ValidateDirectoryError> {
+    if seen_names.get(name).is_some() {
+        return Err(ValidateDirectoryError::DuplicateName(name.to_vec()));
+    }
+    seen_names.insert(name);
+    Ok(())
+}
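+// Illustrative (a sketch):
+//   let mut seen: HashSet<&[u8]> = HashSet::new();
+//   assert!(insert_once(&mut seen, b"a").is_ok());
+//   assert!(insert_once(&mut seen, b"a").is_err()); // DuplicateName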
+
+fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
+    iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i))
+}
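+// Illustrative (a sketch): checked_sum([1u64, 2, 3]) == Some(6), while
+// checked_sum([u64::MAX, 1]) == None due to overflow.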
+
+impl Directory {
+    /// The size of a directory is the sum of the number of all regular file,
+    /// symlink and directory elements, plus the size fields of all directory
+    /// elements.
+    pub fn size(&self) -> u64 {
+        if cfg!(debug_assertions) {
+            self.size_checked()
+                .expect("Directory::size exceeds u64::MAX")
+        } else {
+            self.size_checked().unwrap_or(u64::MAX)
+        }
+    }
+
+    fn size_checked(&self) -> Option<u64> {
+        checked_sum([
+            self.files.len().try_into().ok()?,
+            self.symlinks.len().try_into().ok()?,
+            self.directories.len().try_into().ok()?,
+            checked_sum(self.directories.iter().map(|e| e.size))?,
+        ])
+    }
+
+    /// Calculates the digest of a Directory, which is the blake3 hash of a
+    /// Directory protobuf message, serialized in protobuf canonical form.
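+    ///
+    /// For the empty Directory message, this is the BLAKE3 hash of empty
+    /// input, af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262
+    /// (see the `digest` test in proto/tests/directory.rs).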
+    pub fn digest(&self) -> B3Digest {
+        let mut hasher = blake3::Hasher::new();
+
+        hasher
+            .update(&self.encode_to_vec())
+            .finalize()
+            .as_bytes()
+            .into()
+    }
+
+    /// validate checks the directory for invalid data, such as:
+    /// - violations of name restrictions
+    /// - invalid digest lengths
+    /// - not properly sorted lists
+    /// - duplicate names in the three lists
+    pub fn validate(&self) -> Result<(), ValidateDirectoryError> {
+        let mut seen_names: HashSet<&[u8]> = HashSet::new();
+
+        let mut last_directory_name: &[u8] = b"";
+        let mut last_file_name: &[u8] = b"";
+        let mut last_symlink_name: &[u8] = b"";
+
+        // check directories
+        for directory_node in &self.directories {
+            node::Node::Directory(directory_node.clone())
+                .validate()
+                .map_err(|e| {
+                    ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e)
+                })?;
+
+            update_if_lt_prev(&mut last_directory_name, &directory_node.name)?;
+            insert_once(&mut seen_names, &directory_node.name)?;
+        }
+
+        // check files
+        for file_node in &self.files {
+            node::Node::File(file_node.clone())
+                .validate()
+                .map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?;
+
+            update_if_lt_prev(&mut last_file_name, &file_node.name)?;
+            insert_once(&mut seen_names, &file_node.name)?;
+        }
+
+        // check symlinks
+        for symlink_node in &self.symlinks {
+            node::Node::Symlink(symlink_node.clone())
+                .validate()
+                .map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?;
+
+            update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?;
+            insert_once(&mut seen_names, &symlink_node.name)?;
+        }
+
+        self.size_checked()
+            .ok_or(ValidateDirectoryError::SizeOverflow)?;
+
+        Ok(())
+    }
+
+    /// Allows iterating over all three node types ([DirectoryNode], [FileNode],
+    /// [SymlinkNode]) in an ordered fashion, as long as the individual lists
+    /// are sorted (which can be checked with [Directory::validate]).
+    pub fn nodes(&self) -> DirectoryNodesIterator {
+        DirectoryNodesIterator {
+            i_directories: self.directories.iter().peekable(),
+            i_files: self.files.iter().peekable(),
+            i_symlinks: self.symlinks.iter().peekable(),
+        }
+    }
+}
+
+impl StatBlobResponse {
+    /// Validates a StatBlobResponse. All chunks must have valid blake3 digests.
+    /// It is allowed to send an empty list, if no more granular chunking is
+    /// available.
+    pub fn validate(&self) -> Result<(), ValidateStatBlobResponseError> {
+        for (i, chunk) in self.chunks.iter().enumerate() {
+            if chunk.digest.len() != B3_LEN {
+                return Err(ValidateStatBlobResponseError::InvalidDigestLen(
+                    chunk.digest.len(),
+                    i,
+                ));
+            }
+        }
+        Ok(())
+    }
+}
+
+/// Struct to hold the state of an iterator over all nodes of a Directory.
+///
+/// Internally, this keeps peekable Iterators over all three lists of a
+/// Directory message.
+pub struct DirectoryNodesIterator<'a> {
+    // directory: &Directory,
+    i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>,
+    i_files: Peekable<std::slice::Iter<'a, FileNode>>,
+    i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>,
+}
+
+/// looks at two elements implementing NamedNode, and returns true if "left
+/// is smaller / comes first".
+///
+/// Some(_) is preferred over None.
+fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool {
+    match left {
+        // if left is None, right always wins
+        None => false,
+        Some(left_inner) => {
+            // left is Some.
+            match right {
+                // left is Some, right is None - left wins.
+                None => true,
+                Some(right_inner) => {
+                    // both are Some - compare the names.
+                    left_inner.get_name() < right_inner.get_name()
+                }
+            }
+        }
+    }
+}
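+// Illustrative: given peeked nodes named "a" (left) and "b" (right) this
+// returns true; a None left always loses, and a Some left beats a None right.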
+
+impl Iterator for DirectoryNodesIterator<'_> {
+    type Item = node::Node;
+
+    // next returns the next node in the Directory.
+    // we peek at all three internal iterators, and pick the one with the
+    // smallest name, to ensure lexicographical ordering.
+    // The individual lists are already known to be sorted.
+    fn next(&mut self) -> Option<Self::Item> {
+        if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) {
+            // i_directories is still in the game, compare with symlinks
+            if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) {
+                self.i_directories
+                    .next()
+                    .cloned()
+                    .map(node::Node::Directory)
+            } else {
+                self.i_symlinks.next().cloned().map(node::Node::Symlink)
+            }
+        } else {
+            // i_files is still in the game, compare with symlinks
+            if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) {
+                self.i_files.next().cloned().map(node::Node::File)
+            } else {
+                self.i_symlinks.next().cloned().map(node::Node::Symlink)
+            }
+        }
+    }
+}
diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs
new file mode 100644
index 000000000000..5fda394775b5
--- /dev/null
+++ b/tvix/castore/src/proto/tests/directory.rs
@@ -0,0 +1,373 @@
+use crate::proto::{
+    Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError, ValidateNodeError,
+};
+
+use hex_literal::hex;
+
+const DUMMY_DIGEST: [u8; 32] = [0; 32];
+
+#[test]
+fn size() {
+    {
+        let d = Directory::default();
+        assert_eq!(d.size(), 0);
+    }
+    {
+        let d = Directory {
+            directories: vec![DirectoryNode {
+                name: "foo".into(),
+                digest: DUMMY_DIGEST.to_vec().into(),
+                size: 0,
+            }],
+            ..Default::default()
+        };
+        assert_eq!(d.size(), 1);
+    }
+    {
+        let d = Directory {
+            directories: vec![DirectoryNode {
+                name: "foo".into(),
+                digest: DUMMY_DIGEST.to_vec().into(),
+                size: 4,
+            }],
+            ..Default::default()
+        };
+        assert_eq!(d.size(), 5);
+    }
+    {
+        let d = Directory {
+            files: vec![FileNode {
+                name: "foo".into(),
+                digest: DUMMY_DIGEST.to_vec().into(),
+                size: 42,
+                executable: false,
+            }],
+            ..Default::default()
+        };
+        assert_eq!(d.size(), 1);
+    }
+    {
+        let d = Directory {
+            symlinks: vec![SymlinkNode {
+                name: "foo".into(),
+                target: "bar".into(),
+            }],
+            ..Default::default()
+        };
+        assert_eq!(d.size(), 1);
+    }
+}
+
+#[test]
+#[cfg_attr(not(debug_assertions), ignore)]
+#[should_panic = "Directory::size exceeds u64::MAX"]
+fn size_unchecked_panic() {
+    let d = Directory {
+        directories: vec![DirectoryNode {
+            name: "foo".into(),
+            digest: DUMMY_DIGEST.to_vec().into(),
+            size: u64::MAX,
+        }],
+        ..Default::default()
+    };
+
+    d.size();
+}
+
+#[test]
+#[cfg_attr(debug_assertions, ignore)]
+fn size_unchecked_saturate() {
+    let d = Directory {
+        directories: vec![DirectoryNode {
+            name: "foo".into(),
+            digest: DUMMY_DIGEST.to_vec().into(),
+            size: u64::MAX,
+        }],
+        ..Default::default()
+    };
+
+    assert_eq!(d.size(), u64::MAX);
+}
+
+#[test]
+fn size_checked() {
+    // We don't test the overflow cases that rely purely on immediate
+    // child count, since that would take an absurd amount of memory.
+    {
+        let d = Directory {
+            directories: vec![DirectoryNode {
+                name: "foo".into(),
+                digest: DUMMY_DIGEST.to_vec().into(),
+                size: u64::MAX - 1,
+            }],
+            ..Default::default()
+        };
+        assert_eq!(d.size_checked(), Some(u64::MAX));
+    }
+    {
+        let d = Directory {
+            directories: vec![DirectoryNode {
+                name: "foo".into(),
+                digest: DUMMY_DIGEST.to_vec().into(),
+                size: u64::MAX,
+            }],
+            ..Default::default()
+        };
+        assert_eq!(d.size_checked(), None);
+    }
+    {
+        let d = Directory {
+            directories: vec![
+                DirectoryNode {
+                    name: "foo".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: u64::MAX / 2,
+                },
+                DirectoryNode {
+                    name: "foo".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: u64::MAX / 2,
+                },
+            ],
+            ..Default::default()
+        };
+        assert_eq!(d.size_checked(), None);
+    }
+}
+
+#[test]
+fn digest() {
+    let d = Directory::default();
+
+    assert_eq!(
+        d.digest(),
+        (&hex!("af1349b9f5f9a1a6a0404dea36dcc9499bcb25c9adc112b7cc9a93cae41f3262")).into()
+    )
+}
+
+#[test]
+fn validate_empty() {
+    let d = Directory::default();
+    assert_eq!(d.validate(), Ok(()));
+}
+
+#[test]
+fn validate_invalid_names() {
+    {
+        let d = Directory {
+            directories: vec![DirectoryNode {
+                name: "".into(),
+                digest: DUMMY_DIGEST.to_vec().into(),
+                size: 42,
+            }],
+            ..Default::default()
+        };
+        match d.validate().expect_err("must fail") {
+            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
+                assert_eq!(n, b"")
+            }
+            _ => panic!("unexpected error"),
+        };
+    }
+
+    {
+        let d = Directory {
+            directories: vec![DirectoryNode {
+                name: ".".into(),
+                digest: DUMMY_DIGEST.to_vec().into(),
+                size: 42,
+            }],
+            ..Default::default()
+        };
+        match d.validate().expect_err("must fail") {
+            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
+                assert_eq!(n, b".")
+            }
+            _ => panic!("unexpected error"),
+        };
+    }
+
+    {
+        let d = Directory {
+            files: vec![FileNode {
+                name: "..".into(),
+                digest: DUMMY_DIGEST.to_vec().into(),
+                size: 42,
+                executable: false,
+            }],
+            ..Default::default()
+        };
+        match d.validate().expect_err("must fail") {
+            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
+                assert_eq!(n, b"..")
+            }
+            _ => panic!("unexpected error"),
+        };
+    }
+
+    {
+        let d = Directory {
+            symlinks: vec![SymlinkNode {
+                name: "\x00".into(),
+                target: "foo".into(),
+            }],
+            ..Default::default()
+        };
+        match d.validate().expect_err("must fail") {
+            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
+                assert_eq!(n, b"\x00")
+            }
+            _ => panic!("unexpected error"),
+        };
+    }
+
+    {
+        let d = Directory {
+            symlinks: vec![SymlinkNode {
+                name: "foo/bar".into(),
+                target: "foo".into(),
+            }],
+            ..Default::default()
+        };
+        match d.validate().expect_err("must fail") {
+            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
+                assert_eq!(n, b"foo/bar")
+            }
+            _ => panic!("unexpected error"),
+        };
+    }
+}
+
+#[test]
+fn validate_invalid_digest() {
+    let d = Directory {
+        directories: vec![DirectoryNode {
+            name: "foo".into(),
+            digest: vec![0x00, 0x42].into(), // invalid length
+            size: 42,
+        }],
+        ..Default::default()
+    };
+    match d.validate().expect_err("must fail") {
+        ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => {
+            assert_eq!(n, 2)
+        }
+        _ => panic!("unexpected error"),
+    }
+}
+
+#[test]
+fn validate_sorting() {
+    // "b" comes before "a", bad.
+    {
+        let d = Directory {
+            directories: vec![
+                DirectoryNode {
+                    name: "b".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+                DirectoryNode {
+                    name: "a".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+            ],
+            ..Default::default()
+        };
+        match d.validate().expect_err("must fail") {
+            ValidateDirectoryError::WrongSorting(s) => {
+                assert_eq!(s, b"a");
+            }
+            _ => panic!("unexpected error"),
+        }
+    }
+
+    // "a" exists twice, bad.
+    {
+        let d = Directory {
+            directories: vec![
+                DirectoryNode {
+                    name: "a".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+                DirectoryNode {
+                    name: "a".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+            ],
+            ..Default::default()
+        };
+        match d.validate().expect_err("must fail") {
+            ValidateDirectoryError::DuplicateName(s) => {
+                assert_eq!(s, b"a");
+            }
+            _ => panic!("unexpected error"),
+        }
+    }
+
+    // "a" comes before "b", all good.
+    {
+        let d = Directory {
+            directories: vec![
+                DirectoryNode {
+                    name: "a".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+                DirectoryNode {
+                    name: "b".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+            ],
+            ..Default::default()
+        };
+
+        d.validate().expect("validate shouldn't error");
+    }
+
+    // [b, c] and [a] are both properly sorted.
+    {
+        let d = Directory {
+            directories: vec![
+                DirectoryNode {
+                    name: "b".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+                DirectoryNode {
+                    name: "c".into(),
+                    digest: DUMMY_DIGEST.to_vec().into(),
+                    size: 42,
+                },
+            ],
+            symlinks: vec![SymlinkNode {
+                name: "a".into(),
+                target: "foo".into(),
+            }],
+            ..Default::default()
+        };
+
+        d.validate().expect("validate shouldn't error");
+    }
+}
+
+#[test]
+fn validate_overflow() {
+    let d = Directory {
+        directories: vec![DirectoryNode {
+            name: "foo".into(),
+            digest: DUMMY_DIGEST.to_vec().into(),
+            size: u64::MAX,
+        }],
+        ..Default::default()
+    };
+
+    match d.validate().expect_err("must fail") {
+        ValidateDirectoryError::SizeOverflow => {}
+        _ => panic!("unexpected error"),
+    }
+}
diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs
new file mode 100644
index 000000000000..68f147a33210
--- /dev/null
+++ b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs
@@ -0,0 +1,78 @@
+use crate::proto::Directory;
+use crate::proto::DirectoryNode;
+use crate::proto::FileNode;
+use crate::proto::NamedNode;
+use crate::proto::SymlinkNode;
+
+#[test]
+fn iterator() {
+    let d = Directory {
+        directories: vec![
+            DirectoryNode {
+                name: "c".into(),
+                ..DirectoryNode::default()
+            },
+            DirectoryNode {
+                name: "d".into(),
+                ..DirectoryNode::default()
+            },
+            DirectoryNode {
+                name: "h".into(),
+                ..DirectoryNode::default()
+            },
+            DirectoryNode {
+                name: "l".into(),
+                ..DirectoryNode::default()
+            },
+        ],
+        files: vec![
+            FileNode {
+                name: "b".into(),
+                ..FileNode::default()
+            },
+            FileNode {
+                name: "e".into(),
+                ..FileNode::default()
+            },
+            FileNode {
+                name: "g".into(),
+                ..FileNode::default()
+            },
+            FileNode {
+                name: "j".into(),
+                ..FileNode::default()
+            },
+        ],
+        symlinks: vec![
+            SymlinkNode {
+                name: "a".into(),
+                ..SymlinkNode::default()
+            },
+            SymlinkNode {
+                name: "f".into(),
+                ..SymlinkNode::default()
+            },
+            SymlinkNode {
+                name: "i".into(),
+                ..SymlinkNode::default()
+            },
+            SymlinkNode {
+                name: "k".into(),
+                ..SymlinkNode::default()
+            },
+        ],
+    };
+
+    // We collect the names as Strings here, to make the comparison
+    // less messy.
+    let mut node_names: Vec<String> = vec![];
+
+    for node in d.nodes() {
+        node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap());
+    }
+
+    assert_eq!(
+        vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"],
+        node_names
+    );
+}
diff --git a/tvix/castore/src/proto/tests/grpc_blobservice.rs b/tvix/castore/src/proto/tests/grpc_blobservice.rs
new file mode 100644
index 000000000000..fb202b7d8a51
--- /dev/null
+++ b/tvix/castore/src/proto/tests/grpc_blobservice.rs
@@ -0,0 +1,94 @@
+use crate::fixtures::{BLOB_A, BLOB_A_DIGEST};
+use crate::proto::{BlobChunk, ReadBlobRequest, StatBlobRequest};
+use crate::utils::gen_blobsvc_grpc_client;
+use tokio_stream::StreamExt;
+
+/// Trying to read a non-existent blob should return a not found error.
+#[tokio::test]
+async fn not_found_read() {
+    let mut grpc_client = gen_blobsvc_grpc_client().await;
+
+    let resp = grpc_client
+        .read(ReadBlobRequest {
+            digest: BLOB_A_DIGEST.clone().into(),
+        })
+        .await;
+
+    // We can't use unwrap_err here, because the Ok value doesn't implement
+    // Debug.
+    if let Err(e) = resp {
+        assert_eq!(e.code(), tonic::Code::NotFound);
+    } else {
+        panic!("resp is not err")
+    }
+}
+
+/// Trying to stat a non-existent blob should return a not found error.
+#[tokio::test]
+async fn not_found_stat() {
+    let mut grpc_client = gen_blobsvc_grpc_client().await;
+
+    let resp = grpc_client
+        .stat(StatBlobRequest {
+            digest: BLOB_A_DIGEST.clone().into(),
+            ..Default::default()
+        })
+        .await
+        .expect_err("must fail");
+
+    // The resp should be a status with Code::NotFound
+    assert_eq!(resp.code(), tonic::Code::NotFound);
+}
+
+/// Put a blob in the store, get it back.
+#[tokio::test]
+async fn put_read_stat() {
+    let mut grpc_client = gen_blobsvc_grpc_client().await;
+
+    // Send blob A.
+    let put_resp = grpc_client
+        .put(tokio_stream::once(BlobChunk {
+            data: BLOB_A.clone(),
+        }))
+        .await
+        .expect("must succeed")
+        .into_inner();
+
+    assert_eq!(BLOB_A_DIGEST.as_slice(), put_resp.digest);
+
+    // Stat for the digest of A.
+    // We currently don't ask for more granular chunking data, as we don't
+    // expose it yet.
+    let _resp = grpc_client
+        .stat(StatBlobRequest {
+            digest: BLOB_A_DIGEST.clone().into(),
+            ..Default::default()
+        })
+        .await
+        .expect("must succeed")
+        .into_inner();
+
+    // Read the blob. It should return the same data.
+    let resp = grpc_client
+        .read(ReadBlobRequest {
+            digest: BLOB_A_DIGEST.clone().into(),
+        })
+        .await;
+
+    let mut rx = resp.ok().unwrap().into_inner();
+
+    // the stream should contain one element, a BlobChunk with the same contents as BLOB_A.
+    let item = rx
+        .next()
+        .await
+        .expect("must be some")
+        .expect("must succeed");
+
+    assert_eq!(BLOB_A.clone(), item.data);
+
+    // … and no more elements
+    assert!(rx.next().await.is_none());
+
+    // TODO: we rely here on the blob being small enough to not get broken up into multiple chunks.
+    // Test with some bigger blob too
+}
diff --git a/tvix/castore/src/proto/tests/grpc_directoryservice.rs b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
new file mode 100644
index 000000000000..1b522472be5b
--- /dev/null
+++ b/tvix/castore/src/proto/tests/grpc_directoryservice.rs
@@ -0,0 +1,246 @@
+use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
+use crate::proto::directory_service_client::DirectoryServiceClient;
+use crate::proto::get_directory_request::ByWhat;
+use crate::proto::GetDirectoryRequest;
+use crate::proto::{Directory, DirectoryNode, SymlinkNode};
+use crate::utils::gen_directorysvc_grpc_client;
+use tokio_stream::StreamExt;
+use tonic::transport::Channel;
+use tonic::Status;
+
+/// Send the specified GetDirectoryRequest.
+/// Returns an error in the case of an error response, or if one of the
+/// items in the stream is an error, and a Vec<Directory> in the case of a
+/// successful request.
+async fn get_directories(
+    grpc_client: &mut DirectoryServiceClient<Channel>,
+    get_directory_request: GetDirectoryRequest,
+) -> Result<Vec<Directory>, Status> {
+    let resp = grpc_client.get(get_directory_request).await;
+
+    // if the response is an error itself, return the error, otherwise unpack
+    let stream = match resp {
+        Ok(resp) => resp,
+        Err(status) => return Err(status),
+    }
+    .into_inner();
+
+    let directory_results: Vec<Result<Directory, Status>> = stream.collect().await;
+
+    // turn Vec<Result<Directory, Status>> into Result<Vec<Directory>, Status>
+    directory_results.into_iter().collect()
+}
+
+/// Trying to get a non-existent Directory should return a not found error.
+#[tokio::test]
+async fn not_found() {
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
+
+    let resp = grpc_client
+        .get(GetDirectoryRequest {
+            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
+            ..Default::default()
+        })
+        .await;
+
+    let stream = resp.expect("must succeed").into_inner();
+
+    let items: Vec<_> = stream.collect().await;
+
+    // The stream should contain one element, an error with Code::NotFound.
+    assert_eq!(1, items.len());
+    let item = items[0].clone();
+
+    assert!(item.is_err(), "must be err");
+    assert_eq!(
+        tonic::Code::NotFound,
+        item.unwrap_err().code(),
+        "must be err"
+    );
+}
+
+/// Put a Directory into the store, get it back.
+#[tokio::test]
+async fn put_get() {
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
+
+    // send directory A.
+    let put_resp = {
+        grpc_client
+            .put(tokio_stream::once(DIRECTORY_A.clone()))
+            .await
+            .expect("must succeed")
+            .into_inner()
+    };
+
+    // the sent root_digest should match the calculated digest
+    assert_eq!(put_resp.root_digest, DIRECTORY_A.digest().as_slice());
+
+    // get it back
+    let items = get_directories(
+        &mut grpc_client,
+        GetDirectoryRequest {
+            by_what: Some(ByWhat::Digest(DIRECTORY_A.digest().into())),
+            ..Default::default()
+        },
+    )
+    .await
+    .expect("must not error");
+
+    assert_eq!(vec![DIRECTORY_A.clone()], items);
+}
+
+/// Put multiple Directories into the store, and get them back
+#[tokio::test]
+async fn put_get_multiple() {
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
+
+    // sending "b" (which refers to "a") without sending "a" first should fail.
+    let put_resp = {
+        grpc_client
+            .put(tokio_stream::once(DIRECTORY_B.clone()))
+            .await
+            .expect_err("must fail")
+    };
+
+    assert_eq!(tonic::Code::InvalidArgument, put_resp.code());
+
+    // sending "a", then "b" should succeed, and the response should contain the digest of b.
+    let put_resp = {
+        grpc_client
+            .put(tokio_stream::iter(vec![
+                DIRECTORY_A.clone(),
+                DIRECTORY_B.clone(),
+            ]))
+            .await
+            .expect("must succeed")
+            .into_inner()
+    };
+
+    assert_eq!(DIRECTORY_B.digest().as_slice(), put_resp.root_digest);
+
+    // now, request b, first in non-recursive mode.
+    let items = get_directories(
+        &mut grpc_client,
+        GetDirectoryRequest {
+            recursive: false,
+            by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())),
+        },
+    )
+    .await
+    .expect("must not error");
+
+    // We expect to only get b.
+    assert_eq!(vec![DIRECTORY_B.clone()], items);
+
+    // now, request b, but in recursive mode.
+    let items = get_directories(
+        &mut grpc_client,
+        GetDirectoryRequest {
+            recursive: true,
+            by_what: Some(ByWhat::Digest(DIRECTORY_B.digest().into())),
+        },
+    )
+    .await
+    .expect("must not error");
+
+    // We expect to get b, and then a, because that's how we traverse down.
+    assert_eq!(vec![DIRECTORY_B.clone(), DIRECTORY_A.clone()], items);
+}
+
+/// Put multiple Directories into the store, and omit duplicates.
+#[tokio::test]
+async fn put_get_dedup() {
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
+
+    // Send "A", then "C", which refers to "A" two times
+    // Pretend we're a dumb client sending A twice.
+    let put_resp = {
+        grpc_client
+            .put(tokio_stream::iter(vec![
+                DIRECTORY_A.clone(),
+                DIRECTORY_A.clone(),
+                DIRECTORY_C.clone(),
+            ]))
+            .await
+            .expect("must succeed")
+    };
+
+    assert_eq!(
+        DIRECTORY_C.digest().as_slice(),
+        put_resp.into_inner().root_digest
+    );
+
+    // Ask for "C" recursively. We expect to only get "A" once, as there's no point sending it twice.
+    let items = get_directories(
+        &mut grpc_client,
+        GetDirectoryRequest {
+            recursive: true,
+            by_what: Some(ByWhat::Digest(DIRECTORY_C.digest().into())),
+        },
+    )
+    .await
+    .expect("must not error");
+
+    // We expect to get C, and then A (once, as the second A has been deduplicated).
+    assert_eq!(vec![DIRECTORY_C.clone(), DIRECTORY_A.clone()], items);
+}
+
+/// Trying to upload a Directory failing validation should fail.
+#[tokio::test]
+async fn put_reject_failed_validation() {
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
+
+    // construct a broken Directory message that fails validation
+    let broken_directory = Directory {
+        symlinks: vec![SymlinkNode {
+            name: "".into(),
+            target: "doesntmatter".into(),
+        }],
+        ..Default::default()
+    };
+    assert!(broken_directory.validate().is_err());
+
+    // send it over, it must fail
+    let put_resp = {
+        grpc_client
+            .put(tokio_stream::once(broken_directory))
+            .await
+            .expect_err("must fail")
+    };
+
+    assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
+}
+
+/// Trying to upload a Directory with wrong size should fail.
+#[tokio::test]
+async fn put_reject_wrong_size() {
+    let mut grpc_client = gen_directorysvc_grpc_client().await;
+
+    // Construct a directory referring to DIRECTORY_A, but with wrong size.
+    let broken_parent_directory = Directory {
+        directories: vec![DirectoryNode {
+            name: "foo".into(),
+            digest: DIRECTORY_A.digest().into(),
+            size: 42,
+        }],
+        ..Default::default()
+    };
+    // Make sure we got the size wrong.
+    assert_ne!(
+        broken_parent_directory.directories[0].size,
+        DIRECTORY_A.size()
+    );
+
+    // now upload both (first A, then the broken parent). This must fail.
+    let put_resp = {
+        grpc_client
+            .put(tokio_stream::iter(vec![
+                DIRECTORY_A.clone(),
+                broken_parent_directory,
+            ]))
+            .await
+            .expect_err("must fail")
+    };
+    assert_eq!(put_resp.code(), tonic::Code::InvalidArgument);
+}
diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs
new file mode 100644
index 000000000000..8b62fadeb5a6
--- /dev/null
+++ b/tvix/castore/src/proto/tests/mod.rs
@@ -0,0 +1,4 @@
+mod directory;
+mod directory_nodes_iterator;
+mod grpc_blobservice;
+mod grpc_directoryservice;
diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs
new file mode 100644
index 000000000000..99e993f36da3
--- /dev/null
+++ b/tvix/castore/src/tests/import.rs
@@ -0,0 +1,126 @@
+use crate::blobservice::BlobService;
+use crate::fixtures::*;
+use crate::import::ingest_path;
+use crate::proto;
+use crate::utils::{gen_blob_service, gen_directory_service};
+use std::sync::Arc;
+use tempfile::TempDir;
+
+#[cfg(target_family = "unix")]
+use std::os::unix::ffi::OsStrExt;
+
+#[cfg(target_family = "unix")]
+#[tokio::test]
+async fn symlink() {
+    let blob_service = gen_blob_service();
+    let directory_service = gen_directory_service();
+
+    let tmpdir = TempDir::new().unwrap();
+
+    std::fs::create_dir_all(&tmpdir).unwrap();
+    std::os::unix::fs::symlink(
+        "/nix/store/somewhereelse",
+        tmpdir.path().join("doesntmatter"),
+    )
+    .unwrap();
+
+    let root_node = ingest_path(
+        Arc::from(blob_service),
+        directory_service,
+        tmpdir.path().join("doesntmatter"),
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(
+        proto::node::Node::Symlink(proto::SymlinkNode {
+            name: "doesntmatter".into(),
+            target: "/nix/store/somewhereelse".into(),
+        }),
+        root_node,
+    )
+}
+
+#[tokio::test]
+async fn single_file() {
+    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+    let directory_service = gen_directory_service();
+
+    let tmpdir = TempDir::new().unwrap();
+
+    std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap();
+
+    let root_node = ingest_path(
+        blob_service.clone(),
+        directory_service,
+        tmpdir.path().join("root"),
+    )
+    .await
+    .expect("must succeed");
+
+    assert_eq!(
+        proto::node::Node::File(proto::FileNode {
+            name: "root".into(),
+            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+            executable: false,
+        }),
+        root_node,
+    );
+
+    // ensure the blob has been uploaded
+    assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
+}
+
+#[cfg(target_family = "unix")]
+#[tokio::test]
+async fn complicated() {
+    let blob_service: Arc<dyn BlobService> = gen_blob_service().into();
+    let directory_service = gen_directory_service();
+
+    let tmpdir = TempDir::new().unwrap();
+
+    // File `.keep`
+    std::fs::write(tmpdir.path().join(".keep"), vec![]).unwrap();
+    // Symlink `aa`
+    std::os::unix::fs::symlink("/nix/store/somewhereelse", tmpdir.path().join("aa")).unwrap();
+    // Directory `keep`
+    std::fs::create_dir(tmpdir.path().join("keep")).unwrap();
+    // File `keep/.keep`
+    std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap();
+
+    let root_node = ingest_path(blob_service.clone(), &directory_service, tmpdir.path())
+        .await
+        .expect("must succeed");
+
+    // ensure root_node matched expectations
+    assert_eq!(
+        proto::node::Node::Directory(proto::DirectoryNode {
+            name: tmpdir
+                .path()
+                .file_name()
+                .unwrap()
+                .as_bytes()
+                .to_owned()
+                .into(),
+            digest: DIRECTORY_COMPLICATED.digest().into(),
+            size: DIRECTORY_COMPLICATED.size(),
+        }),
+        root_node,
+    );
+
+    // ensure DIRECTORY_WITH_KEEP and DIRECTORY_COMPLICATED have been uploaded
+    assert!(directory_service
+        .get(&DIRECTORY_WITH_KEEP.digest())
+        .await
+        .unwrap()
+        .is_some());
+    assert!(directory_service
+        .get(&DIRECTORY_COMPLICATED.digest())
+        .await
+        .unwrap()
+        .is_some());
+
+    // ensure EMPTY_BLOB_CONTENTS has been uploaded
+    assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
+}
diff --git a/tvix/castore/src/tests/mod.rs b/tvix/castore/src/tests/mod.rs
new file mode 100644
index 000000000000..d016f3e0aa55
--- /dev/null
+++ b/tvix/castore/src/tests/mod.rs
@@ -0,0 +1 @@
+mod import;
diff --git a/tvix/castore/src/tonic.rs b/tvix/castore/src/tonic.rs
new file mode 100644
index 000000000000..8ea417548ce9
--- /dev/null
+++ b/tvix/castore/src/tonic.rs
@@ -0,0 +1,118 @@
+use tokio::net::UnixStream;
+use tonic::transport::{Channel, Endpoint};
+
+fn url_wants_wait_connect(url: &url::Url) -> bool {
+    url.query_pairs()
+        .any(|(k, v)| k == "wait-connect" && v == "1")
+}
+
+/// Turn a [url::Url] to a [Channel] if it can be parsed successfully.
+/// It supports the following schemes (and URLs):
+///  - `grpc+http://[::1]:8000`, connecting over unencrypted HTTP/2 (h2c)
+///  - `grpc+https://[::1]:8000`, connecting over encrypted HTTP/2
+///  - `grpc+unix:/path/to/socket`, connecting to a unix domain socket
+///
+/// All URLs support adding `wait-connect=1` as a URL parameter, in which case
+/// the connection is established eagerly (we wait for it to succeed before
+/// returning). By default, connections are established lazily.
+pub async fn channel_from_url(url: &url::Url) -> Result<Channel, self::Error> {
+    match url.scheme() {
+        "grpc+unix" => {
+            if url.host_str().is_some() {
+                return Err(Error::HostSetForUnixSocket());
+            }
+
+            let connector = tower::service_fn({
+                let url = url.clone();
+                move |_: tonic::transport::Uri| UnixStream::connect(url.path().to_string().clone())
+            });
+
+            // the URL doesn't matter
+            let endpoint = Endpoint::from_static("http://[::]:50051");
+            if url_wants_wait_connect(url) {
+                Ok(endpoint.connect_with_connector(connector).await?)
+            } else {
+                Ok(endpoint.connect_with_connector_lazy(connector))
+            }
+        }
+        _ => {
+            // ensure path is empty, not supported with gRPC.
+            if !url.path().is_empty() {
+                return Err(Error::PathMayNotBeSet());
+            }
+
+            // Stringify the URL and remove the grpc+ prefix.
+            // We can't use `url.set_scheme(rest)`, as it disallows
+            // changing the scheme to http(s) if it wasn't http(s) before.
+            let unprefixed_url_str = match url.to_string().strip_prefix("grpc+") {
+                None => return Err(Error::MissingGRPCPrefix()),
+                Some(url_str) => url_str.to_owned(),
+            };
+
+            // Use the regular tonic transport::Endpoint logic, but unprefixed_url_str,
+            // as tonic doesn't know about grpc+http[s].
+            let endpoint = Endpoint::try_from(unprefixed_url_str)?;
+            if url_wants_wait_connect(url) {
+                Ok(endpoint.connect().await?)
+            } else {
+                Ok(endpoint.connect_lazy())
+            }
+        }
+    }
+}
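+
+// A minimal usage sketch (illustrative; the socket path is hypothetical):
+//
+//   let url = url::Url::parse("grpc+unix:///run/tvix/castore.sock")?;
+//   let channel = channel_from_url(&url).await?;
+//   let client = crate::proto::blob_service_client::BlobServiceClient::new(channel);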
+
+/// Errors occurring when trying to connect to a backend
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("grpc+ prefix is missing from URL")]
+    MissingGRPCPrefix(),
+
+    #[error("host may not be set for unix domain sockets")]
+    HostSetForUnixSocket(),
+
+    #[error("path may not be set")]
+    PathMayNotBeSet(),
+
+    #[error("transport error: {0}")]
+    TransportError(tonic::transport::Error),
+}
+
+impl From<tonic::transport::Error> for Error {
+    fn from(value: tonic::transport::Error) -> Self {
+        Self::TransportError(value)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::channel_from_url;
+    use test_case::test_case;
+    use url::Url;
+
+    /// Correct scheme to connect to a unix socket.
+    #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")]
+    /// Connecting with wait-connect set to 0 succeeds, as that's the default.
+    #[test_case("grpc+unix:///path/to/somewhere?wait-connect=0", true; "grpc valid unix wait-connect=0")]
+    /// Connecting with wait-connect set to 1 fails, as the path doesn't exist.
+    #[test_case("grpc+unix:///path/to/somewhere?wait-connect=1", false; "grpc valid unix wait-connect=1")]
+    /// Correct scheme for unix socket, but setting a host too, which is invalid.
+    #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")]
+    /// Correct scheme to connect to localhost, with port 12345
+    #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")]
+    /// Correct scheme to connect to localhost over http, without specifying a port.
+    #[test_case("grpc+http://localhost", true; "grpc valid http host without port")]
+    /// Correct scheme to connect to localhost over https, without specifying a port.
+    #[test_case("grpc+https://localhost", true; "grpc valid https host without port")]
+    /// Correct scheme to connect to localhost over http, but with an additional path, which is invalid.
+    #[test_case("grpc+http://localhost/some-path", false; "grpc invalid http host with path")]
+    /// Connecting with wait-connect set to 0 succeeds, as that's the default.
+    #[test_case("grpc+http://localhost?wait-connect=0", true; "grpc valid host wait-connect=0")]
+    /// Connecting with wait-connect set to 1 fails, as the host doesn't exist.
+    #[test_case("grpc+http://nonexist.invalid?wait-connect=1", false; "grpc valid host wait-connect=1")]
+    #[tokio::test]
+    async fn test_channel_from_url(uri_str: &str, is_ok: bool) {
+        let url = Url::parse(uri_str).expect("must parse");
+        assert_eq!(channel_from_url(&url).await.is_ok(), is_ok)
+    }
+}
diff --git a/tvix/castore/src/utils.rs b/tvix/castore/src/utils.rs
new file mode 100644
index 000000000000..ca68f9f26c5a
--- /dev/null
+++ b/tvix/castore/src/utils.rs
@@ -0,0 +1,89 @@
+//! A module containing constructors to provide instances of a BlobService and
+//! DirectoryService. Only used for testing purposes, but across crates.
+//! Should be removed once we have a better concept of a "Service registry".
+use tonic::transport::{Channel, Endpoint, Server, Uri};
+
+use crate::{
+    blobservice::{BlobService, MemoryBlobService},
+    directoryservice::{DirectoryService, MemoryDirectoryService},
+    proto::{
+        blob_service_client::BlobServiceClient, blob_service_server::BlobServiceServer,
+        directory_service_client::DirectoryServiceClient,
+        directory_service_server::DirectoryServiceServer, GRPCBlobServiceWrapper,
+        GRPCDirectoryServiceWrapper,
+    },
+};
+
+pub fn gen_blob_service() -> Box<dyn BlobService> {
+    Box::<MemoryBlobService>::default()
+}
+
+pub fn gen_directory_service() -> Box<dyn DirectoryService> {
+    Box::<MemoryDirectoryService>::default()
+}
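+
+// A usage sketch (illustrative): tests grab fresh, empty in-memory services
+// and exercise code against them, e.g.
+//
+//     let blob_service = gen_blob_service();
+//     let directory_service = gen_directory_service();
+//     // … feed in data via the BlobService/DirectoryService traits …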
+
+/// This will spawn a gRPC server hosting a DirectoryService, connect a
+/// gRPC DirectoryService client to it, and return the client.
+#[allow(dead_code)]
+pub(crate) async fn gen_directorysvc_grpc_client() -> DirectoryServiceClient<Channel> {
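+    // tokio::io::duplex creates a pair of connected in-memory byte streams
+    // (with a 64-byte internal buffer); we serve gRPC on the left end and
+    // dial the right end, so no network or unix socket is involved.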
+    let (left, right) = tokio::io::duplex(64);
+
+    // spin up a server serving exactly one connection, the left side of the duplex.
+    tokio::spawn(async {
+        // spin up a new DirectoryService
+        let mut server = Server::builder();
+        let router = server.add_service(DirectoryServiceServer::new(
+            GRPCDirectoryServiceWrapper::new(gen_directory_service()),
+        ));
+
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
+    });
+
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    DirectoryServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    )
+}
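+
+// A hypothetical usage sketch for the helper above (the exact RPCs and
+// message types follow the castore protos and may differ):
+//
+//     let mut client = gen_directorysvc_grpc_client().await;
+//     // from here on, RPCs behave as with any remote DirectoryService,
+//     // e.g. a Get request for a digest streams back Directory messages.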
+
+/// This will spawn a gRPC server hosting a BlobService, connect a
+/// gRPC BlobService client to it, and return the client.
+#[allow(dead_code)]
+pub(crate) async fn gen_blobsvc_grpc_client() -> BlobServiceClient<Channel> {
+    let (left, right) = tokio::io::duplex(64);
+
+    // spin up a server serving exactly one connection, the left side of the duplex.
+    tokio::spawn(async {
+        // spin up a new BlobService
+        let mut server = Server::builder();
+        let router = server.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
+            gen_blob_service(),
+        )));
+
+        router
+            .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left)))
+            .await
+    });
+
+    // Create a client, connecting to the right side. The URI is unused.
+    let mut maybe_right = Some(right);
+    BlobServiceClient::new(
+        Endpoint::try_from("http://[::]:50051")
+            .unwrap()
+            .connect_with_connector(tower::service_fn(move |_: Uri| {
+                let right = maybe_right.take().unwrap();
+                async move { Ok::<_, std::io::Error>(right) }
+            }))
+            .await
+            .unwrap(),
+    )
+}
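+
+// A hypothetical usage sketch for the helper above (field and method names
+// follow the castore protos and may differ):
+//
+//     let mut client = gen_blobsvc_grpc_client().await;
+//     let resp = client
+//         .stat(crate::proto::StatBlobRequest {
+//             digest: some_digest.into(),
+//             ..Default::default()
+//         })
+//         .await;
+//     // a blob that was never uploaded is expected to report "not found".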