Diffstat (limited to 'tvix/castore')
 tvix/castore/Cargo.toml | 154
 tvix/castore/build.rs | 4
 tvix/castore/default.nix | 2
 tvix/castore/protos/default.nix | 18
 tvix/castore/src/blobservice/chunked_reader.rs | 31
 tvix/castore/src/blobservice/combinator.rs | 11
 tvix/castore/src/blobservice/from_addr.rs | 2
 tvix/castore/src/blobservice/grpc.rs | 28
 tvix/castore/src/blobservice/memory.rs | 14
 tvix/castore/src/blobservice/mod.rs | 24
 tvix/castore/src/blobservice/object_store.rs | 13
 tvix/castore/src/blobservice/tests/utils.rs | 23
 tvix/castore/src/composition.rs | 222
 tvix/castore/src/digests.rs | 2
 tvix/castore/src/directoryservice/bigtable.rs | 49
 tvix/castore/src/directoryservice/combinators.rs | 42
 tvix/castore/src/directoryservice/directory_graph.rs | 119
 tvix/castore/src/directoryservice/from_addr.rs | 37
 tvix/castore/src/directoryservice/grpc.rs | 63
 tvix/castore/src/directoryservice/memory.rs | 50
 tvix/castore/src/directoryservice/mod.rs | 54
 tvix/castore/src/directoryservice/object_store.rs | 52
 tvix/castore/src/directoryservice/order_validator.rs | 37
 tvix/castore/src/directoryservice/redb.rs | 68
 tvix/castore/src/directoryservice/simple_putter.rs | 5
 tvix/castore/src/directoryservice/sled.rs | 269
 tvix/castore/src/directoryservice/tests/mod.rs | 63
 tvix/castore/src/directoryservice/tests/utils.rs | 1
 tvix/castore/src/directoryservice/traverse.rs | 56
 tvix/castore/src/directoryservice/utils.rs | 34
 tvix/castore/src/errors.rs | 52
 tvix/castore/src/fixtures.rs | 189
 tvix/castore/src/fs/fuse/tests.rs | 115
 tvix/castore/src/fs/inodes.rs | 33
 tvix/castore/src/fs/mod.rs | 111
 tvix/castore/src/fs/root_nodes.rs | 22
 tvix/castore/src/import/archive.rs | 67
 tvix/castore/src/import/blobs.rs | 13
 tvix/castore/src/import/fs.rs | 72
 tvix/castore/src/import/mod.rs | 90
 tvix/castore/src/lib.rs | 8
 tvix/castore/src/nodes/directory.rs | 287
 tvix/castore/src/nodes/mod.rs | 48
 tvix/castore/src/nodes/symlink_target.rs | 223
 tvix/castore/src/path/component.rs | 268
 tvix/castore/src/path/mod.rs (renamed from tvix/castore/src/path.rs) | 48
 tvix/castore/src/proto/grpc_directoryservice_wrapper.rs | 21
 tvix/castore/src/proto/mod.rs | 586
 tvix/castore/src/proto/tests/directory.rs | 200
 tvix/castore/src/proto/tests/directory_nodes_iterator.rs | 78
 tvix/castore/src/proto/tests/mod.rs | 47
 tvix/castore/src/refscan.rs | 354
 tvix/castore/src/tests/import.rs | 49
 53 files changed, 2625 insertions(+), 1903 deletions(-)
diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml
index ded2292db750..aa44e2e8ee4b 100644
--- a/tvix/castore/Cargo.toml
+++ b/tvix/castore/Cargo.toml
@@ -4,101 +4,70 @@ version = "0.1.0"
 edition = "2021"
 
 [dependencies]
-async-compression = { version = "0.4.9", features = ["tokio", "zstd"]}
-async-stream = "0.3.5"
-async-tempfile = "0.4.0"
-blake3 = { version = "1.3.1", features = ["rayon", "std", "traits-preview"] }
-bstr = "1.6.0"
-bytes = "1.4.0"
-data-encoding = "2.6.0"
-digest = "0.10.7"
-fastcdc = { version = "3.1.0", features = ["tokio"] }
-futures = "0.3.30"
-lazy_static = "1.4.0"
-object_store = { version = "0.10.1", features = ["http"] }
-parking_lot = "0.12.1"
-pin-project-lite = "0.2.13"
-prost = "0.13.1"
-sled = { version = "0.34.7" }
-thiserror = "1.0.38"
-tokio-stream = { version = "0.1.14", features = ["fs", "net"] }
-tokio-util = { version = "0.7.9", features = ["io", "io-util", "codec"] }
-tokio-tar = "0.3.1"
-tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
-tonic = "0.12.0"
-tower = "0.4.13"
-tracing = "0.1.37"
-tracing-indicatif = "0.3.6"
+async-compression = { workspace = true, features = ["tokio", "zstd"] }
+async-stream = { workspace = true }
+async-tempfile = { workspace = true }
+blake3 = { workspace = true, features = ["rayon", "std", "traits-preview"] }
+bstr = { workspace = true }
+bytes = { workspace = true }
+data-encoding = { workspace = true }
+digest = { workspace = true }
+fastcdc = { workspace = true, features = ["tokio"] }
+futures = { workspace = true }
+object_store = { workspace = true, features = ["http"] }
+parking_lot = { workspace = true }
+pin-project-lite = { workspace = true }
+prost = { workspace = true }
+thiserror = { workspace = true }
+tokio-stream = { workspace = true, features = ["fs", "net"] }
+tokio-util = { workspace = true, features = ["io", "io-util", "codec"] }
+tokio-tar = { workspace = true }
+tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
+tonic = { workspace = true }
+tower = { workspace = true }
+tracing = { workspace = true }
+tracing-indicatif = { workspace = true }
 tvix-tracing = { path = "../tracing", features = ["tonic"] }
-url = "2.4.0"
-walkdir = "2.4.0"
-zstd = "0.13.0"
-serde = { version = "1.0.197", features = [ "derive" ] }
-serde_with = "3.7.0"
-serde_qs = "0.12.0"
-petgraph = "0.6.4"
-erased-serde = "0.4.5"
-serde_tagged = "0.3.0"
-hyper-util = "0.1.6"
-redb = "2.1.1"
-
-[dependencies.bigtable_rs]
-optional = true
-version = "0.2.10"
-
-[dependencies.fuse-backend-rs]
-optional = true
-version = "0.11.0"
-
-[dependencies.libc]
-optional = true
-version = "0.2.144"
-
-[dependencies.threadpool]
-version = "1.8.1"
-optional = true
-
-[dependencies.tonic-reflection]
-optional = true
-version = "0.12.0"
-
-[dependencies.vhost]
-optional = true
-version = "0.6"
-
-[dependencies.vhost-user-backend]
-optional = true
-version = "0.8"
-
-[dependencies.virtio-queue]
-optional = true
-version = "0.7"
-
-[dependencies.vm-memory]
-optional = true
-version = "0.10"
-
-[dependencies.vmm-sys-util]
-optional = true
-version = "0.11"
-
-[dependencies.virtio-bindings]
-optional = true
-version = "0.2.1"
+url = { workspace = true }
+walkdir = { workspace = true }
+zstd = { workspace = true }
+serde = { workspace = true, features = ["derive"] }
+serde_with = { workspace = true }
+serde_qs = { workspace = true }
+petgraph = { workspace = true }
+pin-project = { workspace = true }
+erased-serde = { workspace = true }
+serde_tagged = { workspace = true }
+hyper-util = { workspace = true }
+redb = { workspace = true, features = ["logging"] }
+bigtable_rs = { workspace = true, optional = true }
+fuse-backend-rs = { workspace = true, optional = true }
+libc = { workspace = true, optional = true }
+threadpool = { workspace = true, optional = true }
+tonic-reflection = { workspace = true, optional = true }
+vhost = { workspace = true, optional = true }
+vhost-user-backend = { workspace = true, optional = true }
+virtio-queue = { workspace = true, optional = true }
+vm-memory = { workspace = true, optional = true }
+vmm-sys-util = { workspace = true, optional = true }
+virtio-bindings = { workspace = true, optional = true }
+wu-manber = { workspace = true }
+auto_impl = "1.2.0"
 
 [build-dependencies]
-prost-build = "0.13.1"
-tonic-build = "0.12.0"
+prost-build = { workspace = true }
+tonic-build = { workspace = true }
 
 [dev-dependencies]
-async-process = "2.1.0"
-rstest = "0.19.0"
-tempfile = "3.3.0"
-tokio-retry = "0.3.0"
-hex-literal = "0.4.1"
-rstest_reuse = "0.6.0"
-xattr = "1.3.1"
-serde_json = "*"
+async-process = { workspace = true }
+rstest = { workspace = true }
+tempfile = { workspace = true }
+tokio-retry = { workspace = true }
+hex-literal = { workspace = true }
+rstest_reuse = { workspace = true }
+xattr = { workspace = true }
+serde_json = { workspace = true }
+tokio-test = { workspace = true }
 
 [features]
 default = ["cloud"]
@@ -122,6 +91,11 @@ virtiofs = [
 ]
 fuse = ["fs"]
 tonic-reflection = ["dep:tonic-reflection"]
+# It's already possible for other crates to build a
+# fully fledged store composition system based on castore composition.
+# However, this feature enables anonymous URL syntax, which might
+# inherently expose arbitrary composition possibilities to the user.
+xp-store-composition = []
 # Whether to run the integration tests.
 # Requires the following packages in $PATH:
 # cbtemulator, google-cloud-bigtable-tool
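
The dependency hunks above replace per-crate version pins with `workspace = true` references. A minimal sketch of the matching `[workspace.dependencies]` table in the workspace-root Cargo.toml (not shown in this diff; the versions are the ones deleted above):

    [workspace.dependencies]
    async-compression = "0.4.9"
    blake3 = "1.3.1"
    prost = "0.13.1"
    tokio = "1.32.0"
    tonic = "0.12.0"
    # ...one entry per shared dependency. Member crates then opt in with:
    # async-compression = { workspace = true, features = ["tokio", "zstd"] }

Feature selections stay in the member crate, so different crates can enable different feature sets of the same shared version.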
diff --git a/tvix/castore/build.rs b/tvix/castore/build.rs
index 98e2ab348528..2250d4ebf0a2 100644
--- a/tvix/castore/build.rs
+++ b/tvix/castore/build.rs
@@ -18,7 +18,7 @@ fn main() -> Result<()> {
         .emit_rerun_if_changed(false)
         .bytes(["."])
         .type_attribute(".", "#[derive(Eq, Hash)]")
-        .compile(
+        .compile_protos(
             &[
                 "tvix/castore/protos/castore.proto",
                 "tvix/castore/protos/rpc_blobstore.proto",
@@ -26,7 +26,7 @@ fn main() -> Result<()> {
             ],
             // If we are running `cargo build` manually, using `../..` works fine,
             // but in case we run inside a nix build, we need to instead point PROTO_ROOT
-            // to a sparseTree containing that structure.
+            // to a custom tree containing that structure.
             &[match std::env::var_os("PROTO_ROOT") {
                 Some(proto_root) => proto_root.to_str().unwrap().to_owned(),
                 None => "../..".to_string(),
diff --git a/tvix/castore/default.nix b/tvix/castore/default.nix
index 9c210884f6e3..5314da9e0333 100644
--- a/tvix/castore/default.nix
+++ b/tvix/castore/default.nix
@@ -9,7 +9,7 @@
   meta.ci.targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru);
   passthru = (depot.tvix.utils.mkFeaturePowerset {
     inherit (old) crateName;
-    features = ([ "cloud" "fuse" "tonic-reflection" ]
+    features = ([ "cloud" "fuse" "tonic-reflection" "xp-store-composition" ]
       # virtiofs feature currently fails to build on Darwin
       ++ lib.optional pkgs.stdenv.isLinux "virtiofs");
     override.testPreRun = ''
diff --git a/tvix/castore/protos/default.nix b/tvix/castore/protos/default.nix
index feef55690fb9..08bb8fcfeef1 100644
--- a/tvix/castore/protos/default.nix
+++ b/tvix/castore/protos/default.nix
@@ -1,16 +1,10 @@
-{ depot, pkgs, ... }:
+{ depot, pkgs, lib, ... }:
 let
-  protos = depot.nix.sparseTree {
-    name = "castore-protos";
-    root = depot.path.origSrc;
-    paths = [
-      ./castore.proto
-      ./rpc_blobstore.proto
-      ./rpc_directory.proto
-      ../../../buf.yaml
-      ../../../buf.gen.yaml
-    ];
-  };
+  protos = lib.sourceByRegex depot.path.origSrc [
+    "buf.yaml"
+    "buf.gen.yaml"
+    "^tvix(/castore(/protos(/.*\.proto)?)?)?$"
+  ];
 in
 depot.nix.readTree.drvTargets {
   inherit protos;
diff --git a/tvix/castore/src/blobservice/chunked_reader.rs b/tvix/castore/src/blobservice/chunked_reader.rs
index 6e8355874bca..0e809b300485 100644
--- a/tvix/castore/src/blobservice/chunked_reader.rs
+++ b/tvix/castore/src/blobservice/chunked_reader.rs
@@ -253,14 +253,16 @@ where
 
 #[cfg(test)]
 mod test {
-    use std::{io::SeekFrom, sync::Arc};
+    use std::{
+        io::SeekFrom,
+        sync::{Arc, LazyLock},
+    };
 
     use crate::{
         blobservice::{chunked_reader::ChunkedReader, BlobService, MemoryBlobService},
         B3Digest,
     };
     use hex_literal::hex;
-    use lazy_static::lazy_static;
     use tokio::io::{AsyncReadExt, AsyncSeekExt};
 
     const CHUNK_1: [u8; 2] = hex!("0001");
@@ -269,21 +271,26 @@ mod test {
     const CHUNK_4: [u8; 2] = hex!("0708");
     const CHUNK_5: [u8; 7] = hex!("090a0b0c0d0e0f");
 
-    lazy_static! {
-        // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]`
-        pub static ref CHUNK_1_DIGEST: B3Digest = blake3::hash(&CHUNK_1).as_bytes().into();
-        pub static ref CHUNK_2_DIGEST: B3Digest = blake3::hash(&CHUNK_2).as_bytes().into();
-        pub static ref CHUNK_3_DIGEST: B3Digest = blake3::hash(&CHUNK_3).as_bytes().into();
-        pub static ref CHUNK_4_DIGEST: B3Digest = blake3::hash(&CHUNK_4).as_bytes().into();
-        pub static ref CHUNK_5_DIGEST: B3Digest = blake3::hash(&CHUNK_5).as_bytes().into();
-        pub static ref BLOB_1_LIST: [(B3Digest, u64); 5] = [
+    // `[ 0 1 ] [ 2 3 4 5 ] [ 6 ] [ 7 8 ] [ 9 10 11 12 13 14 15 ]`
+    pub static CHUNK_1_DIGEST: LazyLock<B3Digest> =
+        LazyLock::new(|| blake3::hash(&CHUNK_1).as_bytes().into());
+    pub static CHUNK_2_DIGEST: LazyLock<B3Digest> =
+        LazyLock::new(|| blake3::hash(&CHUNK_2).as_bytes().into());
+    pub static CHUNK_3_DIGEST: LazyLock<B3Digest> =
+        LazyLock::new(|| blake3::hash(&CHUNK_3).as_bytes().into());
+    pub static CHUNK_4_DIGEST: LazyLock<B3Digest> =
+        LazyLock::new(|| blake3::hash(&CHUNK_4).as_bytes().into());
+    pub static CHUNK_5_DIGEST: LazyLock<B3Digest> =
+        LazyLock::new(|| blake3::hash(&CHUNK_5).as_bytes().into());
+    pub static BLOB_1_LIST: LazyLock<[(B3Digest, u64); 5]> = LazyLock::new(|| {
+        [
             (CHUNK_1_DIGEST.clone(), 2),
             (CHUNK_2_DIGEST.clone(), 4),
             (CHUNK_3_DIGEST.clone(), 1),
             (CHUNK_4_DIGEST.clone(), 2),
             (CHUNK_5_DIGEST.clone(), 7),
-        ];
-    }
+        ]
+    });
 
     use super::ChunkedBlob;
 
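
The test-fixture hunk above is one instance of the commit-wide migration from the `lazy_static!` macro to `std::sync::LazyLock` (stable since Rust 1.80). A minimal sketch of the pattern in isolation:

    use std::sync::LazyLock;

    // Before: lazy_static! { static ref DIGEST: [u8; 32] = ...; }
    // After: a plain static whose closure runs on first access.
    static DIGEST: LazyLock<[u8; 32]> =
        LazyLock::new(|| *blake3::hash(b"chunk").as_bytes());

    fn main() {
        // The first dereference initializes; later ones are cheap reads.
        assert_eq!(DIGEST.len(), 32);
    }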
diff --git a/tvix/castore/src/blobservice/combinator.rs b/tvix/castore/src/blobservice/combinator.rs
index 6a964c8a8440..e5f61d6d0c2a 100644
--- a/tvix/castore/src/blobservice/combinator.rs
+++ b/tvix/castore/src/blobservice/combinator.rs
@@ -16,6 +16,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
 /// blobservice again, before falling back to the remote one.
 /// The remote BlobService is never written to.
 pub struct CombinedBlobService<BL, BR> {
+    instance_name: String,
     local: BL,
     remote: BR,
 }
@@ -27,6 +28,7 @@ where
 {
     fn clone(&self) -> Self {
         Self {
+            instance_name: self.instance_name.clone(),
             local: self.local.clone(),
             remote: self.remote.clone(),
         }
@@ -39,12 +41,12 @@ where
     BL: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
     BR: AsRef<dyn BlobService> + Clone + Send + Sync + 'static,
 {
-    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> std::io::Result<bool> {
         Ok(self.local.as_ref().has(digest).await? || self.remote.as_ref().has(digest).await?)
     }
 
-    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
     async fn open_read(&self, digest: &B3Digest) -> std::io::Result<Option<Box<dyn BlobReader>>> {
         if self.local.as_ref().has(digest).await? {
             // local store has the blob, so we can assume it also has all chunks.
@@ -84,7 +86,7 @@ where
         }
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
         // direct writes to the local one.
         self.local.as_ref().open_write().await
@@ -113,7 +115,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig {
     type Output = dyn BlobService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         context: &CompositionContext,
     ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
         let (local, remote) = futures::join!(
@@ -121,6 +123,7 @@ impl ServiceBuilder for CombinedBlobServiceConfig {
             context.resolve(self.remote.clone())
         );
         Ok(Arc::new(CombinedBlobService {
+            instance_name: instance_name.to_string(),
             local: local?,
             remote: remote?,
         }))
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs
index c5cabaa9d945..803e7fa6a575 100644
--- a/tvix/castore/src/blobservice/from_addr.rs
+++ b/tvix/castore/src/blobservice/from_addr.rs
@@ -27,7 +27,7 @@ pub async fn from_addr(
     })?
     .0;
     let blob_service = blob_service_config
-        .build("anonymous", &CompositionContext::blank())
+        .build("anonymous", &CompositionContext::blank(&REG))
         .await?;
 
     Ok(blob_service)
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 0db3dfea4ad8..3453657d07ab 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -24,6 +24,7 @@ use tracing::{instrument, Instrument as _};
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
 pub struct GRPCBlobService<T> {
+    instance_name: String,
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
     grpc_client: proto::blob_service_client::BlobServiceClient<T>,
@@ -31,8 +32,14 @@ pub struct GRPCBlobService<T> {
 
 impl<T> GRPCBlobService<T> {
     /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient].
-    pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self {
-        Self { grpc_client }
+    pub fn from_client(
+        instance_name: String,
+        grpc_client: proto::blob_service_client::BlobServiceClient<T>,
+    ) -> Self {
+        Self {
+            instance_name,
+            grpc_client,
+        }
     }
 }
 
@@ -44,7 +51,7 @@ where
     <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
     T::Future: Send,
 {
-    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
         match self
             .grpc_client
@@ -61,7 +68,7 @@ where
         }
     }
 
-    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
         // First try to get a list of chunks. In case there's only one chunk returned,
         // buffer its data into a Vec, otherwise use a ChunkedReader.
@@ -124,7 +131,7 @@ where
 
     /// Returns a BlobWriter that'll internally wrap each write in a
     /// [proto::BlobChunk], which is sent to the gRPC server.
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
         // set up an mpsc channel passing around Bytes.
         let (tx, rx) = tokio::sync::mpsc::channel::<bytes::Bytes>(10);
@@ -154,7 +161,7 @@ where
         })
     }
 
-    #[instrument(skip(self, digest), fields(blob.digest=%digest), err)]
+    #[instrument(skip(self, digest), fields(blob.digest=%digest, instance_name=%self.instance_name), err)]
     async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
         let resp = self
             .grpc_client
@@ -205,13 +212,16 @@ impl ServiceBuilder for GRPCBlobServiceConfig {
     type Output = dyn BlobService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let client = proto::blob_service_client::BlobServiceClient::new(
             crate::tonic::channel_from_url(&self.url.parse()?).await?,
         );
-        Ok(Arc::new(GRPCBlobService::from_client(client)))
+        Ok(Arc::new(GRPCBlobService::from_client(
+            instance_name.to_string(),
+            client,
+        )))
     }
 }
 
@@ -375,7 +385,7 @@ mod tests {
                     .await
                     .expect("must succeed"),
             );
-            GRPCBlobService::from_client(client)
+            GRPCBlobService::from_client("default".into(), client)
         };
 
         let has = grpc_client
diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs
index 3d733f950470..348b8bb56d5b 100644
--- a/tvix/castore/src/blobservice/memory.rs
+++ b/tvix/castore/src/blobservice/memory.rs
@@ -11,18 +11,19 @@ use crate::{B3Digest, Error};
 
 #[derive(Clone, Default)]
 pub struct MemoryBlobService {
+    instance_name: String,
     db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
 }
 
 #[async_trait]
 impl BlobService for MemoryBlobService {
-    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, ret, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
         let db = self.db.read();
         Ok(db.contains_key(digest))
     }
 
-    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
         let db = self.db.read();
 
@@ -32,7 +33,7 @@ impl BlobService for MemoryBlobService {
         }
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
         Box::new(MemoryBlobWriter::new(self.db.clone()))
     }
@@ -58,10 +59,13 @@ impl ServiceBuilder for MemoryBlobServiceConfig {
     type Output = dyn BlobService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
-        Ok(Arc::new(MemoryBlobService::default()))
+        Ok(Arc::new(MemoryBlobService {
+            instance_name: instance_name.to_string(),
+            db: Default::default(),
+        }))
     }
 }
 
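
This memory-store hunk shows the pattern repeated across every backend in this commit: the builder's `instance_name` is stored on the struct and recorded as a field on each tracing span. A minimal sketch, with a toy service type standing in for the real ones:

    use tracing::instrument;

    struct MemoryService {
        instance_name: String,
    }

    impl MemoryService {
        // skip_all keeps arguments out of the span; fields(...) adds
        // instance_name back, so every event names the store instance.
        #[instrument(skip_all, fields(instance_name = %self.instance_name))]
        fn has(&self, digest: &[u8]) -> bool {
            tracing::trace!(digest_len = digest.len(), "lookup");
            false
        }
    }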
diff --git a/tvix/castore/src/blobservice/mod.rs b/tvix/castore/src/blobservice/mod.rs
index 85292722fa7e..efba927b586b 100644
--- a/tvix/castore/src/blobservice/mod.rs
+++ b/tvix/castore/src/blobservice/mod.rs
@@ -1,5 +1,6 @@
 use std::io;
 
+use auto_impl::auto_impl;
 use tonic::async_trait;
 
 use crate::composition::{Registry, ServiceBuilder};
@@ -29,6 +30,7 @@ pub use self::object_store::{ObjectStoreBlobService, ObjectStoreBlobServiceConfi
 /// which will implement a writer interface, and also provides a close function,
 /// to finalize a blob and get its digest.
 #[async_trait]
+#[auto_impl(&, &mut, Arc, Box)]
 pub trait BlobService: Send + Sync {
     /// Check if the service has the blob, by its content hash.
     /// On implementations returning chunks, this must also work for chunks.
@@ -60,28 +62,6 @@ pub trait BlobService: Send + Sync {
     }
 }
 
-#[async_trait]
-impl<A> BlobService for A
-where
-    A: AsRef<dyn BlobService> + Send + Sync,
-{
-    async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
-        self.as_ref().has(digest).await
-    }
-
-    async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
-        self.as_ref().open_read(digest).await
-    }
-
-    async fn open_write(&self) -> Box<dyn BlobWriter> {
-        self.as_ref().open_write().await
-    }
-
-    async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
-        self.as_ref().chunks(digest).await
-    }
-}
-
 /// A [tokio::io::AsyncWrite] that the user needs to close() afterwards to persist the blob.
 /// On success, it returns the digest of the written blob.
 #[async_trait]
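
The deleted blanket `impl<A> BlobService for A where A: AsRef<dyn BlobService>` is subsumed by the `#[auto_impl(&, &mut, Arc, Box)]` attribute on the trait itself, which generates the forwarding impls. A minimal sketch of what `auto_impl` does, on a toy trait:

    use auto_impl::auto_impl;
    use std::sync::Arc;

    // Generates forwarding impls of Greeter for &T, Arc<T> and Box<T>.
    #[auto_impl(&, Arc, Box)]
    trait Greeter {
        fn greet(&self) -> String;
    }

    struct English;
    impl Greeter for English {
        fn greet(&self) -> String {
            "hello".into()
        }
    }

    fn greet_twice(g: impl Greeter) -> String {
        format!("{} {}", g.greet(), g.greet())
    }

    fn main() {
        // Arc<English> itself implements Greeter thanks to auto_impl.
        assert_eq!(greet_twice(Arc::new(English)), "hello hello");
    }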
diff --git a/tvix/castore/src/blobservice/object_store.rs b/tvix/castore/src/blobservice/object_store.rs
index 5bb05cf26123..10874af64011 100644
--- a/tvix/castore/src/blobservice/object_store.rs
+++ b/tvix/castore/src/blobservice/object_store.rs
@@ -64,6 +64,7 @@ use super::{BlobReader, BlobService, BlobWriter, ChunkedReader};
 /// all keys stored so far, but no promises ;-)
 #[derive(Clone)]
 pub struct ObjectStoreBlobService {
+    instance_name: String,
     object_store: Arc<dyn ObjectStore>,
     base_path: Path,
 
@@ -92,7 +93,7 @@ fn derive_chunk_path(base_path: &Path, digest: &B3Digest) -> Path {
 
 #[async_trait]
 impl BlobService for ObjectStoreBlobService {
-    #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, ret(level = Level::TRACE), err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
         // TODO: clarify if this should work for chunks or not, and explicitly
         // document in the proto docs.
@@ -112,7 +113,7 @@ impl BlobService for ObjectStoreBlobService {
         }
     }
 
-    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
         // handle reading the empty blob.
         if digest.as_slice() == blake3::hash(b"").as_bytes() {
@@ -169,7 +170,7 @@ impl BlobService for ObjectStoreBlobService {
         }
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     async fn open_write(&self) -> Box<dyn BlobWriter> {
         // ObjectStoreBlobWriter implements AsyncWrite, but all the chunking
         // needs an AsyncRead, so we create a pipe here.
@@ -192,7 +193,7 @@ impl BlobService for ObjectStoreBlobService {
         })
     }
 
-    #[instrument(skip_all, err, fields(blob.digest=%digest))]
+    #[instrument(skip_all, err, fields(blob.digest=%digest, instance_name=%self.instance_name))]
     async fn chunks(&self, digest: &B3Digest) -> io::Result<Option<Vec<ChunkMeta>>> {
         match self
             .object_store
@@ -294,7 +295,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig {
     type Output = dyn BlobService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let (object_store, path) = object_store::parse_url_opts(
@@ -302,6 +303,7 @@ impl ServiceBuilder for ObjectStoreBlobServiceConfig {
             &self.object_store_options,
         )?;
         Ok(Arc::new(ObjectStoreBlobService {
+            instance_name: instance_name.to_string(),
             object_store: Arc::new(object_store),
             base_path: path,
             avg_chunk_size: self.avg_chunk_size,
@@ -582,6 +584,7 @@ mod test {
             object_store::parse_url(&Url::parse("memory:///").unwrap()).unwrap();
         let object_store: Arc<dyn object_store::ObjectStore> = Arc::from(object_store);
         let blobsvc = Arc::new(ObjectStoreBlobService {
+            instance_name: "test".into(),
             object_store: object_store.clone(),
             avg_chunk_size: default_avg_chunk_size(),
             base_path,
diff --git a/tvix/castore/src/blobservice/tests/utils.rs b/tvix/castore/src/blobservice/tests/utils.rs
index 7df4f00d3a09..7032d633dad0 100644
--- a/tvix/castore/src/blobservice/tests/utils.rs
+++ b/tvix/castore/src/blobservice/tests/utils.rs
@@ -29,14 +29,17 @@ pub async fn make_grpc_blob_service_client() -> Box<dyn BlobService> {
     // Create a client, connecting to the right side. The URI is unused.
     let mut maybe_right = Some(right);
 
-    Box::new(GRPCBlobService::from_client(BlobServiceClient::new(
-        Endpoint::try_from("http://[::]:50051")
-            .unwrap()
-            .connect_with_connector(tower::service_fn(move |_: Uri| {
-                let right = maybe_right.take().unwrap();
-                async move { Ok::<_, std::io::Error>(TokioIo::new(right)) }
-            }))
-            .await
-            .unwrap(),
-    )))
+    Box::new(GRPCBlobService::from_client(
+        "default".into(),
+        BlobServiceClient::new(
+            Endpoint::try_from("http://[::]:50051")
+                .unwrap()
+                .connect_with_connector(tower::service_fn(move |_: Uri| {
+                    let right = maybe_right.take().unwrap();
+                    async move { Ok::<_, std::io::Error>(TokioIo::new(right)) }
+                }))
+                .await
+                .unwrap(),
+        ),
+    ))
 }
diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs
index c76daafc523d..b251187e1c34 100644
--- a/tvix/castore/src/composition.rs
+++ b/tvix/castore/src/composition.rs
@@ -70,7 +70,7 @@
 //! });
 //!
 //! let blob_services_configs = with_registry(&REG, || serde_json::from_value(blob_services_configs_json))?;
-//! let mut blob_service_composition = Composition::default();
+//! let mut blob_service_composition = Composition::new(&REG);
 //! blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
 //! let blob_service: Arc<dyn BlobService> = blob_service_composition.build("default").await?;
 //! # Ok(())
@@ -88,11 +88,17 @@
 //! ```
 //!
 //! Continue with Example 2, with my_registry instead of REG
+//!
+//! EXPERIMENTAL: If the xp-store-composition feature is enabled,
+//! entrypoints can also be URL strings, which are instantiated as
+//! anonymous stores. Each instantiation of the same URL yields a new,
+//! distinct anonymous store, so creating two `memory://` stores this
+//! way will not share the same view.
+//! This behavior might change in the future.
 
 use erased_serde::deserialize;
 use futures::future::BoxFuture;
 use futures::FutureExt;
-use lazy_static::lazy_static;
 use serde::de::DeserializeOwned;
 use serde_tagged::de::{BoxFnSeed, SeedFactory};
 use serde_tagged::util::TagString;
@@ -101,7 +107,7 @@ use std::cell::Cell;
 use std::collections::BTreeMap;
 use std::collections::HashMap;
 use std::marker::PhantomData;
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 use tonic::async_trait;
 
 /// Resolves tag names to the corresponding Config type.
@@ -254,14 +260,13 @@ pub fn with_registry<R>(reg: &'static Registry, f: impl FnOnce() -> R) -> R {
     result
 }
 
-lazy_static! {
-    /// The provided registry of tvix_castore, with all builtin BlobStore/DirectoryStore implementations
-    pub static ref REG: Registry = {
-        let mut reg = Default::default();
-        add_default_services(&mut reg);
-        reg
-    };
-}
+/// The provided registry of tvix_castore, with all builtin BlobStore/DirectoryStore implementations
+pub static REG: LazyLock<&'static Registry> = LazyLock::new(|| {
+    let mut reg = Default::default();
+    add_default_services(&mut reg);
+    // explicitly leak to get an &'static, so that we gain `&Registry: Send` from `Registry: Sync`
+    Box::leak(Box::new(reg))
+});
 
 // ---------- End of generic registry code --------- //
 
@@ -278,12 +283,15 @@ pub struct CompositionContext<'a> {
     // The TypeId of the trait object is included to distinguish e.g. the
     // BlobService "default" and the DirectoryService "default".
     stack: Vec<(TypeId, String)>,
+    registry: &'static Registry,
     composition: Option<&'a Composition>,
 }
 
 impl<'a> CompositionContext<'a> {
-    pub fn blank() -> Self {
+    /// Get a composition context for one-off store creation.
+    pub fn blank(registry: &'static Registry) -> Self {
         Self {
+            registry,
             stack: Default::default(),
             composition: None,
         }
@@ -303,10 +311,104 @@ impl<'a> CompositionContext<'a> {
             )
             .into());
         }
-        match self.composition {
-            Some(comp) => Ok(comp.build_internal(self.stack.clone(), entrypoint).await?),
-            None => Err(CompositionError::NotFound(entrypoint).into()),
+
+        Ok(self.build_internal(entrypoint).await?)
+    }
+
+    #[cfg(feature = "xp-store-composition")]
+    async fn build_anonymous<T: ?Sized + Send + Sync + 'static>(
+        &self,
+        entrypoint: String,
+    ) -> Result<Arc<T>, Box<dyn std::error::Error + Send + Sync>> {
+        let url = url::Url::parse(&entrypoint)?;
+        let config: DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = T>>> =
+            with_registry(self.registry, || url.try_into())?;
+        config.0.build("anonymous", self).await
+    }
+
+    fn build_internal<T: ?Sized + Send + Sync + 'static>(
+        &self,
+        entrypoint: String,
+    ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
+        #[cfg(feature = "xp-store-composition")]
+        if entrypoint.contains("://") {
+            // There is a chance this is a URL; if so, we build an anonymous store.
+            return Box::pin(async move {
+                self.build_anonymous(entrypoint.clone())
+                    .await
+                    .map_err(|e| CompositionError::Failed(entrypoint, Arc::from(e)))
+            });
         }
+
+        let mut stores = match self.composition {
+            Some(comp) => comp.stores.lock().unwrap(),
+            None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
+        };
+        let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) {
+            Some(v) => v,
+            None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
+        };
+        // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what
+        // the new value should be. the Mutex stays locked the entire time, so nobody will ever see
+        // this temporary value.
+        let prev_val = std::mem::replace(
+            entry,
+            Box::new(InstantiationState::<T>::Done(Err(
+                CompositionError::Poisoned(entrypoint.clone()),
+            ))),
+        );
+        let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() {
+            InstantiationState::Done(service) => (
+                InstantiationState::Done(service.clone()),
+                futures::future::ready(service).boxed(),
+            ),
+            // the construction of the store has not started yet.
+            InstantiationState::Config(config) => {
+                let (tx, rx) = tokio::sync::watch::channel(None);
+                (
+                    InstantiationState::InProgress(rx),
+                    (async move {
+                        let mut new_context = CompositionContext {
+                            composition: self.composition,
+                            registry: self.registry,
+                            stack: self.stack.clone(),
+                        };
+                        new_context
+                            .stack
+                            .push((TypeId::of::<T>(), entrypoint.clone()));
+                        let res =
+                            config.build(&entrypoint, &new_context).await.map_err(|e| {
+                                match e.downcast() {
+                                    Ok(e) => *e,
+                                    Err(e) => CompositionError::Failed(entrypoint, e.into()),
+                                }
+                            });
+                        tx.send(Some(res.clone())).unwrap();
+                        res
+                    })
+                    .boxed(),
+                )
+            }
+            // there is already a task driving forward the construction of this store, wait for it
+            // to notify us via the provided channel
+            InstantiationState::InProgress(mut recv) => {
+                (InstantiationState::InProgress(recv.clone()), {
+                    (async move {
+                        loop {
+                            if let Some(v) =
+                                recv.borrow_and_update().as_ref().map(|res| res.clone())
+                            {
+                                break v;
+                            }
+                            recv.changed().await.unwrap();
+                        }
+                    })
+                    .boxed()
+                })
+            }
+        };
+        *entry = Box::new(new_val);
+        ret
     }
 }
 
@@ -336,8 +438,8 @@ enum InstantiationState<T: ?Sized> {
     Done(Result<Arc<T>, CompositionError>),
 }
 
-#[derive(Default)]
 pub struct Composition {
+    registry: &'static Registry,
     stores: std::sync::Mutex<HashMap<(TypeId, String), Box<dyn Any + Send + Sync>>>,
 }
 
@@ -381,6 +483,14 @@ impl<T: ?Sized + Send + Sync + 'static>
 }
 
 impl Composition {
+    /// The given registry will be used for creation of anonymous stores during composition
+    pub fn new(registry: &'static Registry) -> Self {
+        Self {
+            registry,
+            stores: Default::default(),
+        }
+    }
+
     pub fn extend_with_configs<T: ?Sized + Send + Sync + 'static>(
         &mut self,
         // Keep the concrete `HashMap` type here since it allows for type
@@ -390,87 +500,17 @@ impl Composition {
         self.extend(configs);
     }
 
+    /// Looks up the entrypoint name in the composition and returns an instantiated service.
     pub async fn build<T: ?Sized + Send + Sync + 'static>(
         &self,
         entrypoint: &str,
     ) -> Result<Arc<T>, CompositionError> {
-        self.build_internal(vec![], entrypoint.to_string()).await
-    }
-
-    fn build_internal<T: ?Sized + Send + Sync + 'static>(
-        &self,
-        stack: Vec<(TypeId, String)>,
-        entrypoint: String,
-    ) -> BoxFuture<'_, Result<Arc<T>, CompositionError>> {
-        let mut stores = self.stores.lock().unwrap();
-        let entry = match stores.get_mut(&(TypeId::of::<T>(), entrypoint.clone())) {
-            Some(v) => v,
-            None => return Box::pin(futures::future::err(CompositionError::NotFound(entrypoint))),
-        };
-        // for lifetime reasons, we put a placeholder value in the hashmap while we figure out what
-        // the new value should be. the Mutex stays locked the entire time, so nobody will ever see
-        // this temporary value.
-        let prev_val = std::mem::replace(
-            entry,
-            Box::new(InstantiationState::<T>::Done(Err(
-                CompositionError::Poisoned(entrypoint.clone()),
-            ))),
-        );
-        let (new_val, ret) = match *prev_val.downcast::<InstantiationState<T>>().unwrap() {
-            InstantiationState::Done(service) => (
-                InstantiationState::Done(service.clone()),
-                futures::future::ready(service).boxed(),
-            ),
-            // the construction of the store has not started yet.
-            InstantiationState::Config(config) => {
-                let (tx, rx) = tokio::sync::watch::channel(None);
-                (
-                    InstantiationState::InProgress(rx),
-                    (async move {
-                        let mut new_context = CompositionContext {
-                            stack: stack.clone(),
-                            composition: Some(self),
-                        };
-                        new_context
-                            .stack
-                            .push((TypeId::of::<T>(), entrypoint.clone()));
-                        let res =
-                            config.build(&entrypoint, &new_context).await.map_err(|e| {
-                                match e.downcast() {
-                                    Ok(e) => *e,
-                                    Err(e) => CompositionError::Failed(entrypoint, e.into()),
-                                }
-                            });
-                        tx.send(Some(res.clone())).unwrap();
-                        res
-                    })
-                    .boxed(),
-                )
-            }
-            // there is already a task driving forward the construction of this store, wait for it
-            // to notify us via the provided channel
-            InstantiationState::InProgress(mut recv) => {
-                (InstantiationState::InProgress(recv.clone()), {
-                    (async move {
-                        loop {
-                            if let Some(v) =
-                                recv.borrow_and_update().as_ref().map(|res| res.clone())
-                            {
-                                break v;
-                            }
-                            recv.changed().await.unwrap();
-                        }
-                    })
-                    .boxed()
-                })
-            }
-        };
-        *entry = Box::new(new_val);
-        ret
+        self.context().build_internal(entrypoint.to_string()).await
     }
 
     pub fn context(&self) -> CompositionContext {
         CompositionContext {
+            registry: self.registry,
             stack: vec![],
             composition: Some(self),
         }
@@ -496,7 +536,7 @@ mod test {
 
         let blob_services_configs =
             with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap();
-        let mut blob_service_composition = Composition::default();
+        let mut blob_service_composition = Composition::new(&REG);
         blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
         let (blob_service1, blob_service2) = tokio::join!(
             blob_service_composition.build::<dyn BlobService>("default"),
@@ -526,7 +566,7 @@ mod test {
 
         let blob_services_configs =
             with_registry(&REG, || serde_json::from_value(blob_services_configs_json)).unwrap();
-        let mut blob_service_composition = Composition::default();
+        let mut blob_service_composition = Composition::new(&REG);
         blob_service_composition.extend_with_configs::<dyn BlobService>(blob_services_configs);
         match blob_service_composition
             .build::<dyn BlobService>("default")
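
Condensed from the module docs and tests touched above, a minimal sketch of the resulting end-to-end flow, assuming the tagged JSON config shape used in the module-level examples:

    use std::sync::Arc;
    use tvix_castore::blobservice::BlobService;
    use tvix_castore::composition::{with_registry, Composition, REG};

    async fn build_default(
    ) -> Result<Arc<dyn BlobService>, Box<dyn std::error::Error + Send + Sync>> {
        let configs_json = serde_json::json!({
            "default": { "type": "memory" }
        });
        // Deserializing tagged configs requires the registry in scope.
        let configs = with_registry(&REG, || serde_json::from_value(configs_json))?;
        // The registry is now carried explicitly, so anonymous URL
        // entrypoints (xp-store-composition) can be resolved later.
        let mut composition = Composition::new(&REG);
        composition.extend_with_configs::<dyn BlobService>(configs);
        Ok(composition.build("default").await?)
    }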
diff --git a/tvix/castore/src/digests.rs b/tvix/castore/src/digests.rs
index ef9a7326b3fb..4d919ff0d873 100644
--- a/tvix/castore/src/digests.rs
+++ b/tvix/castore/src/digests.rs
@@ -6,7 +6,7 @@ use thiserror::Error;
 pub struct B3Digest(Bytes);
 
 // TODO: allow converting these errors to crate::Error
-#[derive(Error, Debug)]
+#[derive(Error, Debug, PartialEq)]
 pub enum Error {
     #[error("invalid digest length: {0}")]
     InvalidDigestLen(usize),
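
Deriving `PartialEq` on the digest error type lets tests assert on concrete error values instead of matching strings. A tiny sketch of the effect:

    // Sketch; mirrors the shape of the error above, not the real type.
    #[derive(Debug, PartialEq, thiserror::Error)]
    enum Error {
        #[error("invalid digest length: {0}")]
        InvalidDigestLen(usize),
    }

    fn main() {
        // Possible only because Error now implements PartialEq.
        assert_eq!(Error::InvalidDigestLen(3), Error::InvalidDigestLen(3));
    }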
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs
index d10dddaf9f60..7473481c94b5 100644
--- a/tvix/castore/src/directoryservice/bigtable.rs
+++ b/tvix/castore/src/directoryservice/bigtable.rs
@@ -9,7 +9,9 @@ use std::sync::Arc;
 use tonic::async_trait;
 use tracing::{instrument, trace, warn};
 
-use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter};
+use super::{
+    utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter,
+};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
 
@@ -35,6 +37,7 @@ const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
 /// directly at the root, so rely on store composition.
 #[derive(Clone)]
 pub struct BigtableDirectoryService {
+    instance_name: String,
     client: bigtable::BigTable,
     params: BigtableParameters,
 
@@ -47,7 +50,10 @@ pub struct BigtableDirectoryService {
 
 impl BigtableDirectoryService {
     #[cfg(not(test))]
-    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+    pub async fn connect(
+        instance_name: String,
+        params: BigtableParameters,
+    ) -> Result<Self, bigtable::Error> {
         let connection = bigtable::BigTableConnection::new(
             &params.project_id,
             &params.instance_name,
@@ -58,13 +64,17 @@ impl BigtableDirectoryService {
         .await?;
 
         Ok(Self {
+            instance_name,
             client: connection.client(),
             params,
         })
     }
 
     #[cfg(test)]
-    pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
+    pub async fn connect(
+        instance_name: String,
+        params: BigtableParameters,
+    ) -> Result<Self, bigtable::Error> {
         use std::time::Duration;
 
         use async_process::{Command, Stdio};
@@ -133,6 +143,7 @@ impl BigtableDirectoryService {
         )?;
 
         Ok(Self {
+            instance_name,
             client: connection.client(),
             params,
             emulator: (tmpdir, emulator_process).into(),
@@ -148,8 +159,8 @@ fn derive_directory_key(digest: &B3Digest) -> String {
 
 #[async_trait]
 impl DirectoryService for BigtableDirectoryService {
-    #[instrument(skip(self, digest), err, fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let mut client = self.client.clone();
         let directory_key = derive_directory_key(digest);
 
@@ -241,28 +252,20 @@ impl DirectoryService for BigtableDirectoryService {
 
         // Try to parse the value into a Directory message.
         let directory = proto::Directory::decode(Bytes::from(row_cell.value))
-            .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?;
-
-        // validate the Directory.
-        directory
-            .validate()
+            .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?
+            .try_into()
             .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?;
 
         Ok(Some(directory))
     }
 
-    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         let directory_digest = directory.digest();
         let mut client = self.client.clone();
         let directory_key = derive_directory_key(&directory_digest);
 
-        // Ensure the directory we're trying to upload passes validation
-        directory
-            .validate()
-            .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?;
-
-        let data = directory.encode_to_vec();
+        let data = proto::Directory::from(directory).encode_to_vec();
         if data.len() as u64 > CELL_SIZE_LIMIT {
             return Err(Error::StorageError(
                 "Directory exceeds cell limit on Bigtable".into(),
@@ -306,15 +309,15 @@ impl DirectoryService for BigtableDirectoryService {
         Ok(directory_digest)
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
     where
         Self: Clone,
@@ -352,11 +355,11 @@ impl ServiceBuilder for BigtableParameters {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
         Ok(Arc::new(
-            BigtableDirectoryService::connect(self.clone()).await?,
+            BigtableDirectoryService::connect(instance_name.to_string(), self.clone()).await?,
         ))
     }
 }
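
The get/put hunks above drop the separate `.validate()` calls in favour of converting between the wire type `proto::Directory` and the validating domain type `Directory` at the storage boundary. A sketch of that decode-then-convert shape, with hypothetical stand-in types rather than the prost-generated ones:

    // Hypothetical wire/domain pair; the real pair is proto::Directory / Directory.
    struct WireDirectory { name: String }
    struct ValidDirectory { name: String }

    impl TryFrom<WireDirectory> for ValidDirectory {
        type Error = String;
        fn try_from(w: WireDirectory) -> Result<Self, Self::Error> {
            // Validation happens exactly once, at the type boundary.
            if w.name.is_empty() {
                return Err("empty name".into());
            }
            Ok(ValidDirectory { name: w.name })
        }
    }

    fn load(wire: WireDirectory) -> Result<ValidDirectory, String> {
        // Same shape as `proto::Directory::decode(..)?.try_into()?`:
        // a ValidDirectory can only exist if conversion succeeded.
        wire.try_into()
    }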
diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs
index 0fdc82c16cb0..450c642715ac 100644
--- a/tvix/castore/src/directoryservice/combinators.rs
+++ b/tvix/castore/src/directoryservice/combinators.rs
@@ -7,10 +7,9 @@ use futures::TryStreamExt;
 use tonic::async_trait;
 use tracing::{instrument, trace};
 
-use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
+use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::directoryservice::DirectoryPutter;
-use crate::proto;
 use crate::B3Digest;
 use crate::Error;
 
@@ -23,13 +22,18 @@ use crate::Error;
 /// Inserts and listings are not implemented for now.
 #[derive(Clone)]
 pub struct Cache<DS1, DS2> {
+    instance_name: String,
     near: DS1,
     far: DS2,
 }
 
 impl<DS1, DS2> Cache<DS1, DS2> {
-    pub fn new(near: DS1, far: DS2) -> Self {
-        Self { near, far }
+    pub fn new(instance_name: String, near: DS1, far: DS2) -> Self {
+        Self {
+            instance_name,
+            near,
+            far,
+        }
     }
 }
 
@@ -39,8 +43,8 @@ where
     DS1: DirectoryService + Clone + 'static,
     DS2: DirectoryService + Clone + 'static,
 {
-    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         match self.near.get(digest).await? {
             Some(directory) => {
                 trace!("serving from cache");
@@ -81,16 +85,16 @@ where
         }
     }
 
-    #[instrument(skip_all)]
-    async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> {
+    #[instrument(skip_all, fields(instance_name = %self.instance_name))]
+    async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> {
         Err(Error::StorageError("unimplemented".to_string()))
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         let near = self.near.clone();
         let far = self.far.clone();
         let digest = root_directory_digest.clone();
@@ -153,11 +157,12 @@ pub struct CacheConfig {
 
 impl TryFrom<url::Url> for CacheConfig {
     type Error = Box<dyn std::error::Error + Send + Sync>;
-    fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
-        Err(Error::StorageError(
-            "Instantiating a CombinedDirectoryService from a url is not supported".into(),
-        )
-        .into())
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        // cache doesn't support host or path in the URL.
+        if url.has_host() || !url.path().is_empty() {
+            return Err(Error::StorageError("invalid url".to_string()).into());
+        }
+        Ok(serde_qs::from_str(url.query().unwrap_or_default())?)
     }
 }
 
@@ -166,14 +171,15 @@ impl ServiceBuilder for CacheConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let (near, far) = futures::join!(
-            context.resolve(self.near.clone()),
-            context.resolve(self.far.clone())
+            context.resolve::<Self::Output>(self.near.clone()),
+            context.resolve::<Self::Output>(self.far.clone())
         );
         Ok(Arc::new(Cache {
+            instance_name: instance_name.to_string(),
             near: near?,
             far: far?,
         }))
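
The new `TryFrom<url::Url>` impl makes the cache combinator constructible from a URL by rejecting host/path components and handing the query string to `serde_qs`. A rough sketch of the parsing step, assuming `CacheConfig`'s `near`/`far` fields are plain entrypoint names:

    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct CacheConfig {
        near: String,
        far: String,
    }

    fn main() -> Result<(), serde_qs::Error> {
        // e.g. a hypothetical entrypoint `cache://?near=memory&far=grpc`:
        // no host, no path, configuration entirely in the query string.
        let config: CacheConfig = serde_qs::from_str("near=memory&far=grpc")?;
        assert_eq!(config.near, "memory");
        assert_eq!(config.far, "grpc");
        Ok(())
    }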
diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs
index e6b9b163370c..54f3cb3e9353 100644
--- a/tvix/castore/src/directoryservice/directory_graph.rs
+++ b/tvix/castore/src/directoryservice/directory_graph.rs
@@ -1,7 +1,5 @@
 use std::collections::HashMap;
 
-use bstr::ByteSlice;
-
 use petgraph::{
     graph::{DiGraph, NodeIndex},
     visit::{Bfs, DfsPostOrder, EdgeRef, IntoNodeIdentifiers, Walker},
@@ -10,10 +8,7 @@ use petgraph::{
 use tracing::instrument;
 
 use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
-use crate::{
-    proto::{self, Directory, DirectoryNode},
-    B3Digest,
-};
+use crate::{path::PathComponent, B3Digest, Directory, Node};
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
@@ -21,6 +16,11 @@ pub enum Error {
     ValidationError(String),
 }
 
+struct EdgeWeight {
+    name: PathComponent,
+    size: u64,
+}
+
 /// This can be used to validate and/or re-order a Directory closure (DAG of
 /// connected Directories), and their insertion order.
 ///
@@ -58,7 +58,7 @@ pub struct DirectoryGraph<O> {
     //
     // The option in the edge weight tracks the pending validation state of the respective edge, for example if
     // the child has not been added yet.
-    graph: DiGraph<Option<Directory>, Option<DirectoryNode>>,
+    graph: DiGraph<Option<Directory>, Option<EdgeWeight>>,
 
     // A lookup table from directory digest to node index.
     digest_to_node_ix: HashMap<B3Digest, NodeIndex>,
@@ -67,18 +67,18 @@ pub struct DirectoryGraph<O> {
 }
 
 pub struct ValidatedDirectoryGraph {
-    graph: DiGraph<Option<Directory>, Option<DirectoryNode>>,
+    graph: DiGraph<Option<Directory>, Option<EdgeWeight>>,
 
     root: Option<NodeIndex>,
 }
 
-fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> {
+fn check_edge(edge: &EdgeWeight, child: &Directory) -> Result<(), Error> {
     // Ensure the size specified in the child node matches our records.
-    if dir.size != child.size() {
+    if edge.size != child.size() {
         return Err(Error::ValidationError(format!(
             "'{}' has wrong size, specified {}, recorded {}",
-            dir.name.as_bstr(),
-            dir.size,
+            edge.name,
+            edge.size,
             child.size(),
         )));
     }
@@ -88,7 +88,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> {
 impl DirectoryGraph<LeavesToRootValidator> {
     /// Insert a new Directory into the closure
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
-    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
         if !self.order_validator.add_directory(&directory) {
             return Err(Error::ValidationError(
                 "unknown directory was referenced".into(),
@@ -108,7 +108,7 @@ impl DirectoryGraph<RootToLeavesValidator> {
 
     /// Insert a new Directory into the closure
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
-    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
         let digest = directory.digest();
         if !self.order_validator.digest_allowed(&digest) {
             return Err(Error::ValidationError("unexpected digest".into()));
@@ -129,12 +129,7 @@ impl<O: OrderValidator> DirectoryGraph<O> {
     }
 
     /// Adds a directory which has already been confirmed to be in-order to the graph
-    pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> {
-        // Do some basic validation
-        directory
-            .validate()
-            .map_err(|e| Error::ValidationError(e.to_string()))?;
-
+    pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> {
         let digest = directory.digest();
 
         // Teach the graph about the existence of a node with this digest
@@ -149,23 +144,32 @@ impl<O: OrderValidator> DirectoryGraph<O> {
         }
 
         // set up edges to all child directories
-        for subdir in &directory.directories {
-            let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap();
-
-            let child_ix = *self
-                .digest_to_node_ix
-                .entry(subdir_digest)
-                .or_insert_with(|| self.graph.add_node(None));
-
-            let pending_edge_check = match &self.graph[child_ix] {
-                Some(child) => {
-                    // child is already available, validate the edge now
-                    check_edge(subdir, child)?;
-                    None
-                }
-                None => Some(subdir.clone()), // pending validation
-            };
-            self.graph.add_edge(ix, child_ix, pending_edge_check);
+        for (name, node) in directory.nodes() {
+            if let Node::Directory { digest, size } = node {
+                let child_ix = *self
+                    .digest_to_node_ix
+                    .entry(digest.clone())
+                    .or_insert_with(|| self.graph.add_node(None));
+
+                let pending_edge_check = match &self.graph[child_ix] {
+                    Some(child) => {
+                        // child is already available, validate the edge now
+                        check_edge(
+                            &EdgeWeight {
+                                name: name.clone(),
+                                size: *size,
+                            },
+                            child,
+                        )?;
+                        None
+                    }
+                    None => Some(EdgeWeight {
+                        name: name.clone(),
+                        size: *size,
+                    }), // pending validation
+                };
+                self.graph.add_edge(ix, child_ix, pending_edge_check);
+            }
         }
 
         // validate the edges from parents to this node
@@ -183,6 +187,7 @@ impl<O: OrderValidator> DirectoryGraph<O> {
                 .expect("edge not found")
                 .take()
                 .expect("edge is already validated");
+
             check_edge(&edge_weight, &directory)?;
         }
 
@@ -269,33 +274,23 @@ impl ValidatedDirectoryGraph {
 
 #[cfg(test)]
 mod tests {
-    use crate::{
-        fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C},
-        proto::{self, Directory},
-    };
-    use lazy_static::lazy_static;
+    use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
+    use crate::{Directory, Node};
     use rstest::rstest;
+    use std::sync::LazyLock;
 
-    lazy_static! {
-        pub static ref BROKEN_DIRECTORY : Directory = Directory {
-            symlinks: vec![proto::SymlinkNode {
-                name: "".into(), // invalid name!
-                target: "doesntmatter".into(),
-            }],
-            ..Default::default()
-        };
+    use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};
 
-        pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory {
-            directories: vec![proto::DirectoryNode {
-                name: "foo".into(),
-                digest: DIRECTORY_A.digest().into(),
+    pub static BROKEN_PARENT_DIRECTORY: LazyLock<Directory> = LazyLock::new(|| {
+        Directory::try_from_iter([(
+            "foo".try_into().unwrap(),
+            Node::Directory {
+                digest: DIRECTORY_A.digest(),
                 size: DIRECTORY_A.size() + 42, // wrong!
-            }],
-            ..Default::default()
-        };
-    }
-
-    use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};
+            },
+        )])
+        .unwrap()
+    });
 
     #[rstest]
     /// Uploading an empty directory should succeed.
@@ -312,8 +307,6 @@ mod tests {
     #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
     /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
     #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
-    /// Uploading a directory failing validation should fail immediately.
-    #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)]
     /// Uploading a directory which refers to another Directory with a wrong size should fail.
     #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
     fn test_uploads(
@@ -366,8 +359,6 @@ mod tests {
     #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)]
     /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root).
     #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)]
-    /// Downloading a directory failing validation should fail immediately.
-    #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)]
     /// Downloading a directory which refers to another Directory with a wrong size should fail.
     #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)]
     fn test_downloads(
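
To make the new edge contract concrete, here is a minimal, self-contained sketch of the size check now carried by `EdgeWeight`. The stand-in types are illustrative (the crate's real `PathComponent` and `Directory` are richer); the field shapes and error message mirror the hunk above.

```rust
// Self-contained sketch of the per-edge size check; stand-in types only,
// the logic mirrors check_edge in the diff above.
struct EdgeWeight {
    name: String, // stands in for PathComponent
    size: u64,
}

struct ChildDirectory {
    size: u64,
}

fn check_edge(edge: &EdgeWeight, child: &ChildDirectory) -> Result<(), String> {
    // The size recorded on the parent's edge must match what the child
    // actually reports, otherwise the closure is inconsistent.
    if edge.size != child.size {
        return Err(format!(
            "'{}' has wrong size, specified {}, recorded {}",
            edge.name, edge.size, child.size
        ));
    }
    Ok(())
}
```
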
diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs
index 3feb8f3509fe..2f7fc6d7de5a 100644
--- a/tvix/castore/src/directoryservice/from_addr.rs
+++ b/tvix/castore/src/directoryservice/from_addr.rs
@@ -13,10 +13,5 @@ use super::DirectoryService;
 /// The following URIs are supported:
 /// - `memory:`
 ///   Uses an in-memory implementation.
-/// - `sled:`
-///   Uses a in-memory sled implementation.
-/// - `sled:///absolute/path/to/somewhere`
-///   Uses sled, using a path on the disk for persistency. Can be only opened
-///   from one process at the same time.
 /// - `redb:`
 ///   Uses an in-memory redb implementation.
@@ -41,7 +37,7 @@ pub async fn from_addr(
     })?
     .0;
     let directory_service = directory_service_config
-        .build("anonymous", &CompositionContext::blank())
+        .build("anonymous", &CompositionContext::blank(&REG))
         .await?;
 
     Ok(directory_service)
@@ -49,31 +45,18 @@ pub async fn from_addr(
 
 #[cfg(test)]
 mod tests {
+    use std::sync::LazyLock;
+
     use super::from_addr;
-    use lazy_static::lazy_static;
     use rstest::rstest;
     use tempfile::TempDir;
 
-    lazy_static! {
-        static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
-        static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap();
-        static ref TMPDIR_REDB_1: TempDir = TempDir::new().unwrap();
-        static ref TMPDIR_REDB_2: TempDir = TempDir::new().unwrap();
-    }
+    static TMPDIR_REDB_1: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap());
+    static TMPDIR_REDB_2: LazyLock<TempDir> = LazyLock::new(|| TempDir::new().unwrap());
 
     #[rstest]
     /// This uses an unsupported scheme.
     #[case::unsupported_scheme("http://foo.example/test", false)]
-    /// This configures sled in temporary mode.
-    #[case::sled_valid_temporary("sled://", true)]
-    /// This configures sled with /, which should fail.
-    #[case::sled_invalid_root("sled:///", false)]
-    /// This configures sled with a host, not path, which should fail.
-    #[case::sled_invalid_host("sled://foo.example", false)]
-    /// This configures sled with a valid path path, which should succeed.
-    #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)]
-    /// This configures sled with a host, and a valid path path, which should fail.
-    #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)]
     /// This correctly sets the scheme, and doesn't set a path.
     #[case::memory_valid("memory://", true)]
     /// This sets a memory url host to `foo`
@@ -104,6 +87,16 @@ mod tests {
     #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)]
     /// Correct scheme to connect to localhost over http, but with additional path, which is invalid.
     #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
+    /// A valid example for store composition using anonymous URLs
+    #[cfg_attr(
+        feature = "xp-store-composition",
+        case::anonymous_url_composition("cache://?near=memory://&far=memory://", true)
+    )]
+    /// Store composition with anonymous URLs should fail if the feature is disabled
+    #[cfg_attr(
+        not(feature = "xp-store-composition"),
+        case::anonymous_url_composition("cache://?near=memory://&far=memory://", false)
+    )]
     /// A valid example for Bigtable
     #[cfg_attr(
         all(feature = "cloud", feature = "integration"),
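
As a usage sketch (hedged: this assumes the crate is consumed as `tvix_castore`, and that `from_addr` hands back a trait object for `DirectoryService` as the `build` call above suggests), resolving one of the URI schemes from the test table looks like:

```rust
// Hypothetical caller of from_addr, assuming a tokio runtime.
use tvix_castore::directoryservice::from_addr;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // "memory://" resolves to the in-memory implementation; with the
    // xp-store-composition feature, composed URIs like
    // "cache://?near=memory://&far=memory://" become valid too.
    let dir_svc = from_addr("memory://").await?;
    let _ = dir_svc;
    Ok(())
}
```
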
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index 4dc3931ed410..3fd177a34f28 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -1,9 +1,9 @@
 use std::collections::HashSet;
 
-use super::{DirectoryPutter, DirectoryService};
+use super::{Directory, DirectoryPutter, DirectoryService};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::proto::{self, get_directory_request::ByWhat};
-use crate::{B3Digest, Error};
+use crate::{B3Digest, DirectoryError, Error};
 use async_stream::try_stream;
 use futures::stream::BoxStream;
 use std::sync::Arc;
@@ -17,6 +17,7 @@ use tracing::{instrument, warn, Instrument as _};
 /// Connects to a (remote) tvix-store DirectoryService over gRPC.
 #[derive(Clone)]
 pub struct GRPCDirectoryService<T> {
+    instance_name: String,
     /// The internal reference to a gRPC client.
     /// Cloning it is cheap, and it internally handles concurrent requests.
     grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
@@ -26,9 +27,13 @@ impl<T> GRPCDirectoryService<T> {
     /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient].
     /// panics if called outside the context of a tokio runtime.
     pub fn from_client(
+        instance_name: String,
         grpc_client: proto::directory_service_client::DirectoryServiceClient<T>,
     ) -> Self {
-        Self { grpc_client }
+        Self {
+            instance_name,
+            grpc_client,
+        }
     }
 }
 
@@ -40,11 +45,8 @@ where
     <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
     T::Future: Send,
 {
-    #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))]
-    async fn get(
-        &self,
-        digest: &B3Digest,
-    ) -> Result<Option<crate::proto::Directory>, crate::Error> {
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
         let digest_cpy = digest.clone();
@@ -72,15 +74,10 @@ where
                         "requested directory with digest {}, but got {}",
                         digest, actual_digest
                     )))
-                } else if let Err(e) = directory.validate() {
-                    // Validate the Directory itself is valid.
-                    warn!("directory failed validation: {}", e.to_string());
-                    Err(crate::Error::StorageError(format!(
-                        "directory {} failed validation: {}",
-                        digest, e,
-                    )))
                 } else {
-                    Ok(Some(directory))
+                    Ok(Some(directory.try_into().map_err(|_| {
+                        Error::StorageError("invalid root digest length in response".to_string())
+                    })?))
                 }
             }
             Ok(None) => Ok(None),
@@ -89,12 +86,12 @@ where
         }
     }
 
-    #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
+    async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> {
         let resp = self
             .grpc_client
             .clone()
-            .put(tokio_stream::once(directory))
+            .put(tokio_stream::once(proto::Directory::from(directory)))
             .await;
 
         match resp {
@@ -109,11 +106,11 @@ where
         }
     }
 
-    #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         let mut grpc_client = self.grpc_client.clone();
         let root_directory_digest = root_directory_digest.clone();
 
@@ -135,14 +132,6 @@ where
             loop {
                 match stream.message().await {
                     Ok(Some(directory)) => {
-                        // validate the directory itself.
-                        if let Err(e) = directory.validate() {
-                            Err(crate::Error::StorageError(format!(
-                                "directory {} failed validation: {}",
-                                directory.digest(),
-                                e,
-                            )))?;
-                        }
                         // validate we actually expected that directory, and move it from expected to received.
                         let directory_digest = directory.digest();
                         let was_expected = expected_directory_digests.remove(&directory_digest);
@@ -168,6 +157,9 @@ where
                                 .insert(child_directory_digest);
                         }
 
+                        let directory = directory.try_into()
+                            .map_err(|e: DirectoryError| Error::StorageError(e.to_string()))?;
+
                         yield directory;
                     },
                     Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => {
@@ -253,13 +245,16 @@ impl ServiceBuilder for GRPCDirectoryServiceConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let client = proto::directory_service_client::DirectoryServiceClient::new(
             crate::tonic::channel_from_url(&self.url.parse()?).await?,
         );
-        Ok(Arc::new(GRPCDirectoryService::from_client(client)))
+        Ok(Arc::new(GRPCDirectoryService::from_client(
+            instance_name.to_string(),
+            client,
+        )))
     }
 }
 
@@ -279,11 +274,11 @@ pub struct GRPCPutter {
 #[async_trait]
 impl DirectoryPutter for GRPCPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> {
         match self.rq {
             // If we're not already closed, send the directory to directory_sender.
             Some((_, ref directory_sender)) => {
-                if directory_sender.send(directory).is_err() {
+                if directory_sender.send(directory.into()).is_err() {
                     // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
                     // That error code is much more helpful, because it
                     // contains the error message from the server.
@@ -387,7 +382,7 @@ mod tests {
                     .await
                     .expect("must succeed"),
             );
-            GRPCDirectoryService::from_client(client)
+            GRPCDirectoryService::from_client("test-instance".into(), client)
         };
 
         assert!(grpc_client
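
A sketch of how a caller now constructs the service with the added `instance_name` argument (module paths assumed; this mirrors the test helper in the hunk above). The name is pure observability metadata: it only feeds the `instance_name` field on the traced spans shown in the `#[instrument]` attributes.

```rust
// Assumed module paths; mirrors the test helper in this hunk.
use tonic::transport::Channel;
use tvix_castore::directoryservice::GRPCDirectoryService;
use tvix_castore::proto::directory_service_client::DirectoryServiceClient;

fn make_service(channel: Channel) -> GRPCDirectoryService<Channel> {
    let client = DirectoryServiceClient::new(channel);
    // The instance name shows up on every traced get/put/get_recursive span.
    GRPCDirectoryService::from_client("default".into(), client)
}
```
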
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
index ada4606a5a57..a43d7b8d8d31 100644
--- a/tvix/castore/src/directoryservice/memory.rs
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -1,4 +1,4 @@
-use crate::{proto, B3Digest, Error};
+use crate::{B3Digest, Error};
 use futures::stream::BoxStream;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -7,18 +7,20 @@ use tonic::async_trait;
 use tracing::{instrument, warn};
 
 use super::utils::traverse_directory;
-use super::{DirectoryPutter, DirectoryService, SimplePutter};
+use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
 use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::proto;
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
+    instance_name: String,
     db: Arc<RwLock<HashMap<B3Digest, proto::Directory>>>,
 }
 
 #[async_trait]
 impl DirectoryService for MemoryDirectoryService {
-    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    #[instrument(skip(self, digest), err, fields(directory.digest = %digest, instance_name=%self.instance_name))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let db = self.db.read().await;
 
         match db.get(digest) {
@@ -37,48 +39,33 @@ impl DirectoryService for MemoryDirectoryService {
                     )));
                 }
 
-                // Validate the Directory itself is valid.
-                if let Err(e) = directory.validate() {
-                    warn!("directory failed validation: {}", e.to_string());
-                    return Err(Error::StorageError(format!(
-                        "directory {} failed validation: {}",
-                        actual_digest, e,
-                    )));
-                }
-
-                Ok(Some(directory.clone()))
+                Ok(Some(directory.clone().try_into().map_err(|e| {
+                    crate::Error::StorageError(format!("corrupted directory: {}", e))
+                })?))
             }
         }
     }
 
-    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest(), instance_name=%self.instance_name))]
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
-        // validate the directory itself.
-        if let Err(e) = directory.validate() {
-            return Err(Error::InvalidRequest(format!(
-                "directory {} failed validation: {}",
-                digest, e,
-            )));
-        }
-
         // store it
         let mut db = self.db.write().await;
-        db.insert(digest.clone(), directory);
+        db.insert(digest.clone(), directory.into());
 
         Ok(digest)
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name=%self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
-    #[instrument(skip_all)]
+    #[instrument(skip_all, fields(instance_name=%self.instance_name))]
     fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
     where
         Self: Clone,
@@ -107,9 +94,12 @@ impl ServiceBuilder for MemoryDirectoryServiceConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
-        Ok(Arc::new(MemoryDirectoryService::default()))
+        Ok(Arc::new(MemoryDirectoryService {
+            instance_name: instance_name.to_string(),
+            db: Default::default(),
+        }))
     }
 }
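
The memory backend keeps storing wire-format `proto::Directory` values internally and only converts at the trait boundary. A hedged round-trip sketch (assuming `Directory::new()` constructs an empty directory, and `Directory: Clone + PartialEq` as the tests elsewhere in this diff imply):

```rust
// Hedged round-trip sketch against the in-memory backend.
use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryService};
use tvix_castore::Directory;

async fn roundtrip() -> Result<(), tvix_castore::Error> {
    let svc = MemoryDirectoryService::default();
    let dir = Directory::new(); // assumed empty-directory constructor
    // put() hands back the BLAKE3 digest the entry was stored under...
    let digest = svc.put(dir.clone()).await?;
    // ...and get() converts the stored proto::Directory back on the way out.
    assert_eq!(Some(dir), svc.get(&digest).await?);
    Ok(())
}
```
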
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index 17a78b179349..b3cb0f4fd67b 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -1,6 +1,7 @@
 use crate::composition::{Registry, ServiceBuilder};
-use crate::{proto, B3Digest, Error};
+use crate::{B3Digest, Directory, Error};
 
+use auto_impl::auto_impl;
 use futures::stream::BoxStream;
 use tonic::async_trait;
 mod combinators;
@@ -12,14 +13,13 @@ mod object_store;
 mod order_validator;
 mod redb;
 mod simple_putter;
-mod sled;
 #[cfg(test)]
 pub mod tests;
 mod traverse;
 mod utils;
 
 pub use self::combinators::{Cache, CacheConfig};
-pub use self::directory_graph::DirectoryGraph;
+pub use self::directory_graph::{DirectoryGraph, ValidatedDirectoryGraph};
 pub use self::from_addr::from_addr;
 pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig};
 pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig};
@@ -27,7 +27,6 @@ pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectorySe
 pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
 pub use self::redb::{RedbDirectoryService, RedbDirectoryServiceConfig};
 pub use self::simple_putter::SimplePutter;
-pub use self::sled::{SledDirectoryService, SledDirectoryServiceConfig};
 pub use self::traverse::descend_to;
 pub use self::utils::traverse_directory;
 
@@ -38,9 +37,10 @@ mod bigtable;
 pub use self::bigtable::{BigtableDirectoryService, BigtableParameters};
 
 /// The base trait all Directory services need to implement.
-/// This is a simple get and put of [crate::proto::Directory], returning their
+/// This is a simple get and put of [Directory], returning their
 /// digest.
 #[async_trait]
+#[auto_impl(&, &mut, Arc, Box)]
 pub trait DirectoryService: Send + Sync {
     /// Looks up a single Directory message by its digest.
     /// The returned Directory message *must* be valid.
@@ -50,14 +50,14 @@ pub trait DirectoryService: Send + Sync {
     /// Directory digests that are at the "root", aka the last element that's
     /// sent to a DirectoryPutter. This makes sense for implementations bundling
     /// closures of directories together in batches.
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>;
     /// Uploads a single Directory message, and returns the calculated
     /// digest, or an error. An error *must* also be returned if the message is
     /// not valid.
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error>;
 
-    /// Looks up a closure of [proto::Directory].
-    /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`,
+    /// Looks up a closure of [Directory].
+    /// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`,
     /// and we'd be able to add a default implementation for it here, but
     /// we can't have that yet.
     ///
@@ -75,39 +75,14 @@ pub trait DirectoryService: Send + Sync {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>>;
+    ) -> BoxStream<'static, Result<Directory, Error>>;
 
-    /// Allows persisting a closure of [proto::Directory], which is a graph of
+    /// Allows persisting a closure of [Directory], which is a graph of
     /// connected Directory messages.
     fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
 }
 
-#[async_trait]
-impl<A> DirectoryService for A
-where
-    A: AsRef<dyn DirectoryService> + Send + Sync,
-{
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
-        self.as_ref().get(digest).await
-    }
-
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
-        self.as_ref().put(directory).await
-    }
-
-    fn get_recursive(
-        &self,
-        root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
-        self.as_ref().get_recursive(root_directory_digest)
-    }
-
-    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> {
-        self.as_ref().put_multiple_start()
-    }
-}
-
-/// Provides a handle to put a closure of connected [proto::Directory] elements.
+/// Provides a handle to put a closure of connected [Directory] elements.
 ///
 /// The consumer can periodically call [DirectoryPutter::put], starting from the
 /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
@@ -119,12 +94,12 @@ where
 /// but a single file or symlink.
 #[async_trait]
 pub trait DirectoryPutter: Send {
-    /// Put a individual [proto::Directory] into the store.
+    /// Put an individual [Directory] into the store.
     /// Error semantics and behaviour are up to the specific implementation of
     /// this trait.
     /// Due to bursting, the returned error might refer to an object previously
     /// sent via `put`.
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
+    async fn put(&mut self, directory: Directory) -> Result<(), Error>;
 
     /// Close the stream, and wait for any errors.
     /// If there's been any invalid Directory message uploaded, an error *must*
@@ -138,7 +113,6 @@ pub(crate) fn register_directory_services(reg: &mut Registry) {
     reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::MemoryDirectoryServiceConfig>("memory");
     reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::CacheConfig>("cache");
     reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc");
-    reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::SledDirectoryServiceConfig>("sled");
     reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::RedbDirectoryServiceConfig>("redb");
     #[cfg(feature = "cloud")]
     {
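
The removed blanket impl is essentially what `#[auto_impl(&, &mut, Arc, Box)]` now generates. Roughly, for the `Arc` case only (a hand-written approximation of the macro expansion, using the types already in scope in this module; the real macro also emits the `&`, `&mut`, and `Box` forwarding impls):

```rust
// Approximation of the auto_impl expansion for Arc (types as in this module).
use std::sync::Arc;

#[async_trait]
impl<T: DirectoryService + ?Sized> DirectoryService for Arc<T> {
    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
        (**self).get(digest).await
    }
    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
        (**self).put(directory).await
    }
    fn get_recursive(
        &self,
        root_directory_digest: &B3Digest,
    ) -> BoxStream<'static, Result<Directory, Error>> {
        (**self).get_recursive(root_directory_digest)
    }
    fn put_multiple_start(&self) -> Box<dyn DirectoryPutter> {
        (**self).put_multiple_start()
    }
}
```
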
diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs
index a9a2cc8ef5c0..77578ec92f02 100644
--- a/tvix/castore/src/directoryservice/object_store.rs
+++ b/tvix/castore/src/directoryservice/object_store.rs
@@ -17,10 +17,11 @@ use tracing::{instrument, trace, warn, Level};
 use url::Url;
 
 use super::{
-    DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator,
+    Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
+    RootToLeavesValidator,
 };
 use crate::composition::{CompositionContext, ServiceBuilder};
-use crate::{proto, B3Digest, Error};
+use crate::{proto, B3Digest, Error, Node};
 
 /// Stores directory closures in an object store.
 /// Notably, this makes use of the option to disallow accessing child directories except when
@@ -31,6 +32,7 @@ use crate::{proto, B3Digest, Error};
 /// be returned to the client in get_recursive.
 #[derive(Clone)]
 pub struct ObjectStoreDirectoryService {
+    instance_name: String,
     object_store: Arc<dyn ObjectStore>,
     base_path: Path,
 }
@@ -62,6 +64,7 @@ impl ObjectStoreDirectoryService {
         let (object_store, path) = object_store::parse_url_opts(url, options)?;
 
         Ok(Self {
+            instance_name: "default".into(),
             object_store: Arc::new(object_store),
             base_path: path,
         })
@@ -71,20 +74,32 @@ impl ObjectStoreDirectoryService {
     pub fn parse_url(url: &Url) -> Result<Self, object_store::Error> {
         Self::parse_url_opts(url, Vec::<(String, String)>::new())
     }
+
+    pub fn new(instance_name: String, object_store: Arc<dyn ObjectStore>, base_path: Path) -> Self {
+        Self {
+            instance_name,
+            object_store,
+            base_path,
+        }
+    }
 }
 
 #[async_trait]
 impl DirectoryService for ObjectStoreDirectoryService {
     /// These are the same steps as for get_recursive anyway, so we just call get_recursive and
     /// return the first element of the stream and drop the request.
-    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %digest, instance_name = %self.instance_name))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         self.get_recursive(digest).take(1).next().await.transpose()
     }
 
-    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
-        if !directory.directories.is_empty() {
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
+        // Ensure the directory doesn't contain any child directories
+        if directory
+            .nodes()
+            .any(|(_, e)| matches!(e, Node::Directory { .. }))
+        {
             return Err(Error::InvalidRequest(
                     "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(),
             ));
@@ -95,11 +110,11 @@ impl DirectoryService for ObjectStoreDirectoryService {
         handle.close().await
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(level = "trace", skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         // Check that we are not passing on bogus data from the object store to the client, and that the
         // trust chain from the root digest to the leaves is intact
         let mut order_validator =
@@ -145,6 +160,10 @@ impl DirectoryService for ObjectStoreDirectoryService {
                             warn!("unable to parse directory {}: {}", digest, e);
                             Error::StorageError(e.to_string())
                         })?;
+                        let directory = Directory::try_from(directory).map_err(|e| {
+                            warn!("unable to convert directory {}: {}", digest, e);
+                            Error::StorageError(e.to_string())
+                        })?;
 
                         // Allow the children to appear next
                         order_validator.add_directory_unchecked(&directory);
@@ -210,17 +229,18 @@ impl ServiceBuilder for ObjectStoreDirectoryServiceConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         let (object_store, path) = object_store::parse_url_opts(
             &self.object_store_url.parse()?,
             &self.object_store_options,
         )?;
-        Ok(Arc::new(ObjectStoreDirectoryService {
-            object_store: Arc::new(object_store),
-            base_path: path,
-        }))
+        Ok(Arc::new(ObjectStoreDirectoryService::new(
+            instance_name.to_string(),
+            Arc::new(object_store),
+            path,
+        )))
     }
 }
 
@@ -244,7 +264,7 @@ impl ObjectStoreDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for ObjectStoreDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -302,7 +322,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter {
 
                 for directory in directories {
                     directories_sink
-                        .send(directory.encode_to_vec().into())
+                        .send(proto::Directory::from(directory).encode_to_vec().into())
                         .await?;
                 }
 
diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs
index 6045f5d24198..973af92e1294 100644
--- a/tvix/castore/src/directoryservice/order_validator.rs
+++ b/tvix/castore/src/directoryservice/order_validator.rs
@@ -1,7 +1,8 @@
 use std::collections::HashSet;
 use tracing::warn;
 
-use crate::{proto::Directory, B3Digest};
+use super::Directory;
+use crate::{B3Digest, Node};
 
 pub trait OrderValidator {
     /// Update the order validator's state with the directory
@@ -47,10 +48,11 @@ impl RootToLeavesValidator {
             self.expected_digests.insert(directory.digest());
         }
 
-        for subdir in &directory.directories {
-            // Allow the children to appear next
-            let subdir_digest = subdir.digest.clone().try_into().unwrap();
-            self.expected_digests.insert(subdir_digest);
+        // Allow the children to appear next
+        for (_, node) in directory.nodes() {
+            if let Node::Directory { digest, .. } = node {
+                self.expected_digests.insert(digest.clone());
+            }
         }
     }
 }
@@ -79,15 +81,20 @@ impl OrderValidator for LeavesToRootValidator {
     fn add_directory(&mut self, directory: &Directory) -> bool {
         let digest = directory.digest();
 
-        for subdir in &directory.directories {
-            let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory()
-            if !self.allowed_references.contains(&subdir_digest) {
-                warn!(
-                    directory.digest = %digest,
-                    subdirectory.digest = %subdir_digest,
-                    "unexpected directory reference"
-                );
-                return false;
+        for (_, node) in directory.nodes() {
+            if let Node::Directory {
+                digest: subdir_node_digest,
+                ..
+            } = node
+            {
+                if !self.allowed_references.contains(subdir_node_digest) {
+                    warn!(
+                        directory.digest = %digest,
+                        subdirectory.digest = %subdir_node_digest,
+                        "unexpected directory reference"
+                    );
+                    return false;
+                }
             }
         }
 
@@ -101,8 +108,8 @@ impl OrderValidator for LeavesToRootValidator {
 mod tests {
     use super::{LeavesToRootValidator, RootToLeavesValidator};
     use crate::directoryservice::order_validator::OrderValidator;
+    use crate::directoryservice::Directory;
     use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
-    use crate::proto::Directory;
     use rstest::rstest;
 
     #[rstest]
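
For orientation, a hedged sketch of the download-side validator in use (this assumes the `new_with_root_digest` constructor the object-store backend uses; names are illustrative):

```rust
// Hedged sketch; assumes RootToLeavesValidator::new_with_root_digest.
use tvix_castore::directoryservice::{OrderValidator, RootToLeavesValidator};
use tvix_castore::{B3Digest, Directory};

fn stream_is_ordered(root: B3Digest, stream: &[Directory]) -> bool {
    let mut validator = RootToLeavesValidator::new_with_root_digest(root);
    // Each directory must have been announced by the root or by an
    // already-seen parent; add_directory then admits its children next.
    stream.iter().all(|d| validator.add_directory(d))
}
```
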
diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs
index 51dc87f92574..ba1fb682d5da 100644
--- a/tvix/castore/src/directoryservice/redb.rs
+++ b/tvix/castore/src/directoryservice/redb.rs
@@ -5,20 +5,21 @@ use std::{path::PathBuf, sync::Arc};
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
+use super::{
+    traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService,
+    LeavesToRootValidator,
+};
 use crate::{
     composition::{CompositionContext, ServiceBuilder},
     digests, proto, B3Digest, Error,
 };
 
-use super::{
-    traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
-};
-
 const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> =
     TableDefinition::new("directory");
 
 #[derive(Clone)]
 pub struct RedbDirectoryService {
+    instance_name: String,
     // We wrap the db in an Arc to be able to move it into spawn_blocking,
     // as discussed in https://github.com/cberner/redb/issues/789
     db: Arc<Database>,
@@ -27,7 +28,7 @@ pub struct RedbDirectoryService {
 impl RedbDirectoryService {
     /// Constructs a new instance using the specified filesystem path for
     /// storage.
-    pub async fn new(path: PathBuf) -> Result<Self, Error> {
+    pub async fn new(instance_name: String, path: PathBuf) -> Result<Self, Error> {
         if path == PathBuf::from("/") {
             return Err(Error::StorageError(
                 "cowardly refusing to open / with redb".to_string(),
@@ -41,7 +42,10 @@ impl RedbDirectoryService {
         })
         .await??;
 
-        Ok(Self { db: Arc::new(db) })
+        Ok(Self {
+            instance_name,
+            db: Arc::new(db),
+        })
     }
 
     /// Constructs a new instance using the in-memory backend.
@@ -51,7 +55,10 @@ impl RedbDirectoryService {
 
         create_schema(&db)?;
 
-        Ok(Self { db: Arc::new(db) })
+        Ok(Self {
+            instance_name: "default".into(),
+            db: Arc::new(db),
+        })
     }
 }
 
@@ -68,8 +75,8 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
 
 #[async_trait]
 impl DirectoryService for RedbDirectoryService {
-    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    #[instrument(skip(self, digest), fields(directory.digest = %digest, instance_name = %self.instance_name))]
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let db = self.db.clone();
 
         // Retrieves the protobuf-encoded Directory for the corresponding digest.
@@ -107,13 +114,10 @@ impl DirectoryService for RedbDirectoryService {
         let directory = match proto::Directory::decode(&*directory_data.value()) {
             Ok(dir) => {
                 // The returned Directory must be valid.
-                if let Err(e) = dir.validate() {
+                dir.try_into().map_err(|e| {
                     warn!(err=%e, "Directory failed validation");
-                    return Err(Error::StorageError(
-                        "Directory failed validation".to_string(),
-                    ));
-                }
-                dir
+                    Error::StorageError("Directory failed validation".to_string())
+                })?
             }
             Err(e) => {
                 warn!(err=%e, "failed to parse Directory");
@@ -124,27 +128,22 @@ impl DirectoryService for RedbDirectoryService {
         Ok(Some(directory))
     }
 
-    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest(), instance_name = %self.instance_name))]
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         tokio::task::spawn_blocking({
             let db = self.db.clone();
             move || {
                 let digest = directory.digest();
 
-                // Validate the directory.
-                if let Err(e) = directory.validate() {
-                    warn!(err=%e, "Directory failed validation");
-                    return Err(Error::StorageError(
-                        "Directory failed validation".to_string(),
-                    ));
-                }
-
                 // Store the directory in the table.
                 let txn = db.begin_write()?;
                 {
                     let mut table = txn.open_table(DIRECTORY_TABLE)?;
                     let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into();
-                    table.insert(digest_as_array, directory.encode_to_vec())?;
+                    table.insert(
+                        digest_as_array,
+                        proto::Directory::from(directory).encode_to_vec(),
+                    )?;
                 }
                 txn.commit()?;
 
@@ -154,11 +153,11 @@ impl DirectoryService for RedbDirectoryService {
         .await?
     }
 
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
+    #[instrument(skip_all, fields(directory.digest = %root_directory_digest, instance_name = %self.instance_name))]
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
         // redb transaction to avoid constantly closing and opening new transactions for the
         // database.
@@ -185,7 +184,7 @@ pub struct RedbDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for RedbDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -228,7 +227,10 @@ impl DirectoryPutter for RedbDirectoryPutter {
                             for directory in directories {
                                 let digest_as_array: [u8; digests::B3_LEN] =
                                     directory.digest().into();
-                                table.insert(digest_as_array, directory.encode_to_vec())?;
+                                table.insert(
+                                    digest_as_array,
+                                    proto::Directory::from(directory).encode_to_vec(),
+                                )?;
                             }
                         }
 
@@ -280,7 +282,7 @@ impl ServiceBuilder for RedbDirectoryServiceConfig {
     type Output = dyn DirectoryService;
     async fn build<'a>(
         &'a self,
-        _instance_name: &str,
+        instance_name: &str,
         _context: &CompositionContext,
     ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
         match self {
@@ -302,7 +304,9 @@ impl ServiceBuilder for RedbDirectoryServiceConfig {
             RedbDirectoryServiceConfig {
                 is_temporary: false,
                 path: Some(path),
-            } => Ok(Arc::new(RedbDirectoryService::new(path.into()).await?)),
+            } => Ok(Arc::new(
+                RedbDirectoryService::new(instance_name.to_string(), path.into()).await?,
+            )),
         }
     }
 }
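
The storage layout here is a single redb table mapping the 32-byte BLAKE3 digest to the protobuf-encoded Directory. A simplified sketch of the write path (error plumbing and validation omitted; mirrors the code in the hunks above):

```rust
// Sketch of the on-disk layout: digest -> encoded proto::Directory.
use redb::{Database, TableDefinition};

const DIRECTORY_TABLE: TableDefinition<[u8; 32], Vec<u8>> =
    TableDefinition::new("directory");

fn store_directory(db: &Database, digest: [u8; 32], encoded: Vec<u8>) -> Result<(), redb::Error> {
    let txn = db.begin_write()?;
    {
        let mut table = txn.open_table(DIRECTORY_TABLE)?;
        table.insert(digest, encoded)?;
    }
    // Committing the transaction makes the insert durable.
    txn.commit()?;
    Ok(())
}
```
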
diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs
index dc54e3d11d18..b4daaee61b22 100644
--- a/tvix/castore/src/directoryservice/simple_putter.rs
+++ b/tvix/castore/src/directoryservice/simple_putter.rs
@@ -1,7 +1,6 @@
 use super::DirectoryPutter;
 use super::DirectoryService;
-use super::{DirectoryGraph, LeavesToRootValidator};
-use crate::proto;
+use super::{Directory, DirectoryGraph, LeavesToRootValidator};
 use crate::B3Digest;
 use crate::Error;
 use tonic::async_trait;
@@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> {
 #[async_trait]
 impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
deleted file mode 100644
index 5766dec1a5c2..000000000000
--- a/tvix/castore/src/directoryservice/sled.rs
+++ /dev/null
@@ -1,269 +0,0 @@
-use crate::proto::Directory;
-use crate::{proto, B3Digest, Error};
-use futures::stream::BoxStream;
-use prost::Message;
-use std::ops::Deref;
-use std::path::Path;
-use std::sync::Arc;
-use tonic::async_trait;
-use tracing::{instrument, warn};
-
-use super::utils::traverse_directory;
-use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
-use crate::composition::{CompositionContext, ServiceBuilder};
-
-#[derive(Clone)]
-pub struct SledDirectoryService {
-    db: sled::Db,
-}
-
-impl SledDirectoryService {
-    pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> {
-        if p.as_ref() == Path::new("/") {
-            return Err(sled::Error::Unsupported(
-                "cowardly refusing to open / with sled".to_string(),
-            ));
-        }
-
-        let config = sled::Config::default()
-            .use_compression(false) // is a required parameter
-            .path(p);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-
-    pub fn new_temporary() -> Result<Self, sled::Error> {
-        let config = sled::Config::default().temporary(true);
-        let db = config.open()?;
-
-        Ok(Self { db })
-    }
-}
-
-#[async_trait]
-impl DirectoryService for SledDirectoryService {
-    #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
-        let resp = tokio::task::spawn_blocking({
-            let db = self.db.clone();
-            let digest = digest.clone();
-            move || db.get(digest.as_slice())
-        })
-        .await?
-        .map_err(|e| {
-            warn!("failed to retrieve directory: {}", e);
-            Error::StorageError(format!("failed to retrieve directory: {}", e))
-        })?;
-
-        match resp {
-            // The directory was not found, return
-            None => Ok(None),
-
-            // The directory was found, try to parse the data as Directory message
-            Some(data) => match Directory::decode(&*data) {
-                Ok(directory) => {
-                    // Validate the retrieved Directory indeed has the
-                    // digest we expect it to have, to detect corruptions.
-                    let actual_digest = directory.digest();
-                    if actual_digest != *digest {
-                        return Err(Error::StorageError(format!(
-                            "requested directory with digest {}, but got {}",
-                            digest, actual_digest
-                        )));
-                    }
-
-                    // Validate the Directory itself is valid.
-                    if let Err(e) = directory.validate() {
-                        warn!("directory failed validation: {}", e.to_string());
-                        return Err(Error::StorageError(format!(
-                            "directory {} failed validation: {}",
-                            actual_digest, e,
-                        )));
-                    }
-
-                    Ok(Some(directory))
-                }
-                Err(e) => {
-                    warn!("unable to parse directory {}: {}", digest, e);
-                    Err(Error::StorageError(e.to_string()))
-                }
-            },
-        }
-    }
-
-    #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
-        tokio::task::spawn_blocking({
-            let db = self.db.clone();
-            move || {
-                let digest = directory.digest();
-
-                // validate the directory itself.
-                if let Err(e) = directory.validate() {
-                    return Err(Error::InvalidRequest(format!(
-                        "directory {} failed validation: {}",
-                        digest, e,
-                    )));
-                }
-                // store it
-                db.insert(digest.as_slice(), directory.encode_to_vec())
-                    .map_err(|e| Error::StorageError(e.to_string()))?;
-
-                Ok(digest)
-            }
-        })
-        .await?
-    }
-
-    #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
-    fn get_recursive(
-        &self,
-        root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
-        traverse_directory(self.clone(), root_directory_digest)
-    }
-
-    #[instrument(skip_all)]
-    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
-    where
-        Self: Clone,
-    {
-        Box::new(SledDirectoryPutter {
-            tree: self.db.deref().clone(),
-            directory_validator: Some(Default::default()),
-        })
-    }
-}
-
-#[derive(serde::Deserialize)]
-#[serde(deny_unknown_fields)]
-pub struct SledDirectoryServiceConfig {
-    is_temporary: bool,
-    #[serde(default)]
-    /// required when is_temporary = false
-    path: Option<String>,
-}
-
-impl TryFrom<url::Url> for SledDirectoryServiceConfig {
-    type Error = Box<dyn std::error::Error + Send + Sync>;
-    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
-        // sled doesn't support host, and a path can be provided (otherwise
-        // it'll live in memory only).
-        if url.has_host() {
-            return Err(Error::StorageError("no host allowed".to_string()).into());
-        }
-
-        // TODO: expose compression and other parameters as URL parameters?
-
-        Ok(if url.path().is_empty() {
-            SledDirectoryServiceConfig {
-                is_temporary: true,
-                path: None,
-            }
-        } else {
-            SledDirectoryServiceConfig {
-                is_temporary: false,
-                path: Some(url.path().to_string()),
-            }
-        })
-    }
-}
-
-#[async_trait]
-impl ServiceBuilder for SledDirectoryServiceConfig {
-    type Output = dyn DirectoryService;
-    async fn build<'a>(
-        &'a self,
-        _instance_name: &str,
-        _context: &CompositionContext,
-    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
-        match self {
-            SledDirectoryServiceConfig {
-                is_temporary: true,
-                path: None,
-            } => Ok(Arc::new(SledDirectoryService::new_temporary()?)),
-            SledDirectoryServiceConfig {
-                is_temporary: true,
-                path: Some(_),
-            } => Err(Error::StorageError(
-                "Temporary SledDirectoryService can not have path".into(),
-            )
-            .into()),
-            SledDirectoryServiceConfig {
-                is_temporary: false,
-                path: None,
-            } => Err(Error::StorageError("SledDirectoryService is missing path".into()).into()),
-            SledDirectoryServiceConfig {
-                is_temporary: false,
-                path: Some(path),
-            } => Ok(Arc::new(SledDirectoryService::new(path)?)),
-        }
-    }
-}
-
-/// Buffers Directory messages to be uploaded and inserts them in a batch
-/// transaction on close.
-pub struct SledDirectoryPutter {
-    tree: sled::Tree,
-
-    /// The directories (inside the directory validator) that we insert later,
-    /// or None, if they were already inserted.
-    directory_validator: Option<DirectoryGraph<LeavesToRootValidator>>,
-}
-
-#[async_trait]
-impl DirectoryPutter for SledDirectoryPutter {
-    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
-        match self.directory_validator {
-            None => return Err(Error::StorageError("already closed".to_string())),
-            Some(ref mut validator) => {
-                validator
-                    .add(directory)
-                    .map_err(|e| Error::StorageError(e.to_string()))?;
-            }
-        }
-
-        Ok(())
-    }
-
-    #[instrument(level = "trace", skip_all, ret, err)]
-    async fn close(&mut self) -> Result<B3Digest, Error> {
-        match self.directory_validator.take() {
-            None => Err(Error::InvalidRequest("already closed".to_string())),
-            Some(validator) => {
-                // Insert all directories as a batch.
-                tokio::task::spawn_blocking({
-                    let tree = self.tree.clone();
-                    move || {
-                        // retrieve the validated directories.
-                        let directories = validator
-                            .validate()
-                            .map_err(|e| Error::StorageError(e.to_string()))?
-                            .drain_leaves_to_root()
-                            .collect::<Vec<_>>();
-
-                        // Get the root digest, which is at the end (cf. insertion order)
-                        let root_digest = directories
-                            .last()
-                            .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))?
-                            .digest();
-
-                        let mut batch = sled::Batch::default();
-                        for directory in directories {
-                            batch.insert(directory.digest().as_slice(), directory.encode_to_vec());
-                        }
-
-                        tree.apply_batch(batch).map_err(|e| {
-                            Error::StorageError(format!("unable to apply batch: {}", e))
-                        })?;
-
-                        Ok(root_digest)
-                    }
-                })
-                .await?
-            }
-        }
-    }
-}
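
With the sled backend gone, the closest drop-in for the old `sled://` (temporary) and `sled:///path` addresses is the redb backend. A hedged migration sketch (the on-disk path below is an invented example):

```rust
// Hedged migration sketch: where `sled://` used to give a throwaway store,
// `redb://` now does; on-disk stores move to `redb:///absolute/path`.
use tvix_castore::directoryservice::from_addr;

async fn open_stores() -> Result<(), Box<dyn std::error::Error>> {
    let temporary = from_addr("redb://").await?; // in-memory, like `sled://` before
    let persistent = from_addr("redb:///var/lib/tvix/directories.redb").await?; // example path
    let _ = (temporary, persistent);
    Ok(())
}
```
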
diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs
index b698f70ea469..d394a5679c32 100644
--- a/tvix/castore/src/directoryservice/tests/mod.rs
+++ b/tvix/castore/src/directoryservice/tests/mod.rs
@@ -8,10 +8,8 @@ use rstest_reuse::{self, *};
 
 use super::DirectoryService;
 use crate::directoryservice;
-use crate::{
-    fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D},
-    proto::{self, Directory},
-};
+use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D};
+use crate::{Directory, Node};
 
 mod utils;
 use self::utils::make_grpc_directory_service_client;
@@ -25,7 +23,6 @@ use self::utils::make_grpc_directory_service_client;
 #[rstest]
 #[case::grpc(make_grpc_directory_service_client().await)]
 #[case::memory(directoryservice::from_addr("memory://").await.unwrap())]
-#[case::sled(directoryservice::from_addr("sled://").await.unwrap())]
 #[case::redb(directoryservice::from_addr("redb://").await.unwrap())]
 #[case::objectstore(directoryservice::from_addr("objectstore+memory://").await.unwrap())]
 #[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))]
@@ -41,10 +38,10 @@ async fn test_non_exist(directory_service: impl DirectoryService) {
 
     // recursive get
     assert_eq!(
-        Vec::<Result<proto::Directory, crate::Error>>::new(),
+        Vec::<Result<Directory, crate::Error>>::new(),
         directory_service
             .get_recursive(&DIRECTORY_A.digest())
-            .collect::<Vec<Result<proto::Directory, crate::Error>>>()
+            .collect::<Vec<Result<Directory, crate::Error>>>()
             .await
     );
 }
@@ -212,58 +209,20 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService
     }
 }
 
-/// Try uploading a Directory failing its internal validation, ensure it gets
-/// rejected.
-#[apply(directory_services)]
-#[tokio::test]
-async fn upload_reject_failing_validation(directory_service: impl DirectoryService) {
-    let broken_directory = Directory {
-        symlinks: vec![proto::SymlinkNode {
-            name: "".into(), // wrong!
-            target: "doesntmatter".into(),
-        }],
-        ..Default::default()
-    };
-    assert!(broken_directory.validate().is_err());
-
-    // Try to upload via single upload.
-    assert!(
-        directory_service
-            .put(broken_directory.clone())
-            .await
-            .is_err(),
-        "single upload must fail"
-    );
-
-    // Try to upload via put_multiple. We're a bit more permissive here, the
-    // intermediate .put() might succeed, due to client-side bursting (in the
-    // case of gRPC), but then the close MUST fail.
-    let mut handle = directory_service.put_multiple_start();
-    if handle.put(broken_directory).await.is_ok() {
-        assert!(
-            handle.close().await.is_err(),
-            "when succeeding put, close must fail"
-        )
-    }
-}
-
 /// Try uploading a Directory that refers to a previously-uploaded directory.
 /// Both pass their isolated validation, but the size field in the parent is wrong.
 /// This should be rejected.
 #[apply(directory_services)]
 #[tokio::test]
 async fn upload_reject_wrong_size(directory_service: impl DirectoryService) {
-    let wrong_parent_directory = Directory {
-        directories: vec![proto::DirectoryNode {
-            name: "foo".into(),
-            digest: DIRECTORY_A.digest().into(),
+    let wrong_parent_directory = Directory::try_from_iter([(
+        "foo".try_into().unwrap(),
+        Node::Directory {
+            digest: DIRECTORY_A.digest(),
             size: DIRECTORY_A.size() + 42, // wrong!
-        }],
-        ..Default::default()
-    };
-
-    // Make sure isolated validation itself is ok
-    assert!(wrong_parent_directory.validate().is_ok());
+        },
+    )])
+    .unwrap();
 
     // Now upload both. Ensure it either fails during the second put, or during
     // the close.
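For contrast, a sketch of the well-formed counterpart (same fixture names as above): with the correct size field, construction and upload both succeed, so only the cross-directory size check separates the two cases.

    let ok_parent = Directory::try_from_iter([(
        "foo".try_into().unwrap(),
        Node::Directory {
            digest: DIRECTORY_A.digest(),
            size: DIRECTORY_A.size(), // correct size: upload should be accepted
        },
    )])
    .unwrap();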
diff --git a/tvix/castore/src/directoryservice/tests/utils.rs b/tvix/castore/src/directoryservice/tests/utils.rs
index 3d245ea412d5..1c5d68de0108 100644
--- a/tvix/castore/src/directoryservice/tests/utils.rs
+++ b/tvix/castore/src/directoryservice/tests/utils.rs
@@ -33,6 +33,7 @@ pub async fn make_grpc_directory_service_client() -> Box<dyn DirectoryService> {
     // Create a client, connecting to the right side. The URI is unused.
     let mut maybe_right = Some(right);
     Box::new(GRPCDirectoryService::from_client(
+        "default".into(),
         DirectoryServiceClient::new(
             Endpoint::try_from("http://[::]:50051")
                 .unwrap()
diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs
index 17a51ae2bbff..0bd67e9bcf1f 100644
--- a/tvix/castore/src/directoryservice/traverse.rs
+++ b/tvix/castore/src/directoryservice/traverse.rs
@@ -1,8 +1,4 @@
-use super::DirectoryService;
-use crate::{
-    proto::{node::Node, NamedNode},
-    B3Digest, Error, Path,
-};
+use crate::{directoryservice::DirectoryService, Error, Node, Path};
 use tracing::{instrument, warn};
 
 /// This descends from a (root) node to the given (sub)path, returning the Node
@@ -17,19 +13,14 @@ where
     DS: AsRef<dyn DirectoryService>,
 {
     let mut parent_node = root_node;
-    for component in path.as_ref().components() {
+    for component in path.as_ref().components_bytes() {
         match parent_node {
-            Node::File(_) | Node::Symlink(_) => {
+            Node::File { .. } | Node::Symlink { .. } => {
                // There's still some path left, but the parent node is not a directory.
                 // This means the path doesn't exist, as we can't reach it.
                 return Ok(None);
             }
-            Node::Directory(directory_node) => {
-                let digest: B3Digest = directory_node
-                    .digest
-                    .try_into()
-                    .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
-
+            Node::Directory { digest, .. } => {
                 // fetch the linked node from the directory_service.
                 let directory =
                     directory_service
@@ -44,15 +35,16 @@ where
                         })?;
 
                 // look for the component in the [Directory].
-                // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
-                // could stop as soon as e.name is larger than the search string.
-                if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) {
+                if let Some((_child_name, child_node)) = directory
+                    .into_nodes()
+                    .find(|(name, _node)| name.as_ref() == component)
+                {
                    // child node found, update parent_node to it and continue.
-                    parent_node = child_node;
+                    parent_node = child_node.clone();
                 } else {
                     // child node not found means there's no such element inside the directory.
                     return Ok(None);
-                }
+                };
             }
         }
     }
@@ -65,8 +57,8 @@ where
 mod tests {
     use crate::{
         directoryservice,
-        fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
-        PathBuf,
+        fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST},
+        Node, PathBuf,
     };
 
     use super::descend_to;
@@ -88,21 +80,23 @@ mod tests {
         handle.close().await.expect("must upload");
 
         // construct the node for DIRECTORY_COMPLICATED
-        let node_directory_complicated =
-            crate::proto::node::Node::Directory(crate::proto::DirectoryNode {
-                name: "doesntmatter".into(),
-                digest: DIRECTORY_COMPLICATED.digest().into(),
-                size: DIRECTORY_COMPLICATED.size(),
-            });
+        let node_directory_complicated = Node::Directory {
+            digest: DIRECTORY_COMPLICATED.digest(),
+            size: DIRECTORY_COMPLICATED.size(),
+        };
 
        // construct the node for DIRECTORY_WITH_KEEP
-        let node_directory_with_keep = crate::proto::node::Node::Directory(
-            DIRECTORY_COMPLICATED.directories.first().unwrap().clone(),
-        );
+        let node_directory_with_keep = Node::Directory {
+            digest: DIRECTORY_WITH_KEEP.digest(),
+            size: DIRECTORY_WITH_KEEP.size(),
+        };
 
         // construct the node for the .keep file
-        let node_file_keep =
-            crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone());
+        let node_file_keep = Node::File {
+            digest: EMPTY_BLOB_DIGEST.clone(),
+            size: 0,
+            executable: false,
+        };
 
         // traversal to an empty subpath should return the root node.
         {
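A usage sketch for the updated descend_to, assuming the fixtures above were uploaded, that the crate's PathBuf parses from a str (as the import tests do), and a directory_service handle implementing AsRef<dyn DirectoryService>:

    let path: PathBuf = "keep/.keep".parse().unwrap();
    let resolved = descend_to(&directory_service, node_directory_complicated.clone(), path)
        .await
        .expect("descend must not fail");
    assert_eq!(Some(node_file_keep.clone()), resolved);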
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
index 726734f55eec..d073c2c3c8ec 100644
--- a/tvix/castore/src/directoryservice/utils.rs
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -1,21 +1,22 @@
+use super::Directory;
 use super::DirectoryService;
-use crate::proto;
 use crate::B3Digest;
 use crate::Error;
+use crate::Node;
 use async_stream::try_stream;
 use futures::stream::BoxStream;
 use std::collections::{HashSet, VecDeque};
 use tracing::instrument;
 use tracing::warn;
 
-/// Traverses a [proto::Directory] from the root to the children.
+/// Traverses a [Directory] from the root to the children.
 ///
 /// This is mostly BFS, but directories are only returned once.
 #[instrument(skip(directory_service))]
 pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
     directory_service: DS,
     root_directory_digest: &B3Digest,
-) -> BoxStream<'a, Result<proto::Directory, Error>> {
+) -> BoxStream<'a, Result<Directory, Error>> {
     // The list of all directories that still need to be traversed. The next
     // element is picked from the front, new elements are enqueued at the
     // back.
@@ -50,16 +51,6 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
                 Some(dir) => dir,
             };
 
-
-            // validate, we don't want to send invalid directories.
-            current_directory.validate().map_err(|e| {
-               warn!("directory failed validation: {}", e.to_string());
-               Error::StorageError(format!(
-                   "invalid directory: {}",
-                   current_directory_digest
-               ))
-            })?;
-
            // We're about to send this directory, so mark it as sent, avoiding
            // emitting it again if it reappears as a child of a later directory.
             sent_directory_digests.insert(current_directory_digest);
@@ -67,16 +58,15 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
             // enqueue all child directory digests to the work queue, as
             // long as they're not part of the worklist or already sent.
            // The child digests are already typed B3Digests here, so no further validation is needed.
-            for child_directory_node in &current_directory.directories {
-                // TODO: propagate error
-                let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
-
-                if worklist_directory_digests.contains(&child_digest)
-                    || sent_directory_digests.contains(&child_digest)
-                {
-                    continue;
+            for (_, child_directory_node) in current_directory.nodes() {
+                if let Node::Directory{digest: child_digest, ..} = child_directory_node {
+                    if worklist_directory_digests.contains(child_digest)
+                        || sent_directory_digests.contains(child_digest)
+                    {
+                        continue;
+                    }
+                    worklist_directory_digests.push_back(child_digest.clone());
                 }
-                worklist_directory_digests.push_back(child_digest);
             }
 
             yield current_directory;
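A consumption sketch (directory_service and root_directory_digest are assumed bindings): the stream yields the root Directory first and every reachable Directory exactly once.

    use futures::TryStreamExt;

    let directories: Vec<Directory> =
        traverse_directory(directory_service, &root_directory_digest)
            .try_collect()
            .await?;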
diff --git a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs
index 5bbcd7b04ef1..1c4605200842 100644
--- a/tvix/castore/src/errors.rs
+++ b/tvix/castore/src/errors.rs
@@ -1,7 +1,13 @@
+use bstr::ByteSlice;
 use thiserror::Error;
 use tokio::task::JoinError;
 use tonic::Status;
 
+use crate::{
+    path::{PathComponent, PathComponentError},
+    SymlinkTargetError,
+};
+
 /// Errors related to communication with the store.
 #[derive(Debug, Error, PartialEq)]
 pub enum Error {
@@ -12,6 +18,52 @@ pub enum Error {
     StorageError(String),
 }
 
+/// Errors that occur during construction of [crate::Node]
+#[derive(Debug, thiserror::Error, PartialEq)]
+pub enum ValidateNodeError {
+    /// Invalid digest length encountered
+    #[error("invalid digest length: {0}")]
+    InvalidDigestLen(usize),
+    /// Invalid symlink target
+    #[error("Invalid symlink target: {0}")]
+    InvalidSymlinkTarget(SymlinkTargetError),
+}
+
+impl From<crate::digests::Error> for ValidateNodeError {
+    fn from(e: crate::digests::Error) -> Self {
+        match e {
+            crate::digests::Error::InvalidDigestLen(n) => ValidateNodeError::InvalidDigestLen(n),
+        }
+    }
+}
+
+/// Errors that can occur when constructing [crate::Directory] values,
+/// or parsing [crate::proto::Directory]
+#[derive(Debug, thiserror::Error, PartialEq)]
+pub enum DirectoryError {
+    /// Multiple elements with the same name encountered
+    #[error("{:?} is a duplicate name", .0)]
+    DuplicateName(PathComponent),
+    /// Node failed validation
+    #[error("invalid node with name {}: {:?}", .0.as_bstr(), .1.to_string())]
+    InvalidNode(bytes::Bytes, ValidateNodeError),
+    #[error("Total size exceeds u64::MAX")]
+    SizeOverflow,
+    /// Invalid name encountered
+    #[error("Invalid name: {0}")]
+    InvalidName(PathComponentError),
+    /// This can occur if a protobuf node with a name is passed where we expect
+    /// it to be anonymous.
+    #[error("Name is set when it shouldn't")]
+    NameInAnonymousNode,
+    /// Elements are not in sorted order. Can only happen on protos
+    #[error("{:?} is not sorted", .0.as_bstr())]
+    WrongSorting(bytes::Bytes),
+    /// This can only happen if there's an unknown node type (on protos)
+    #[error("No node set")]
+    NoNodeSet,
+}
+
 impl From<JoinError> for Error {
     fn from(value: JoinError) -> Self {
         Error::StorageError(value.to_string())
diff --git a/tvix/castore/src/fixtures.rs b/tvix/castore/src/fixtures.rs
index 3ebda64a818a..db0ee59daf60 100644
--- a/tvix/castore/src/fixtures.rs
+++ b/tvix/castore/src/fixtures.rs
@@ -1,103 +1,120 @@
-use crate::{
-    proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode},
-    B3Digest,
-};
-use lazy_static::lazy_static;
+use bytes::Bytes;
+use std::sync::LazyLock;
+
+use crate::{B3Digest, Directory, Node};
 
 pub const HELLOWORLD_BLOB_CONTENTS: &[u8] = b"Hello World!";
 pub const EMPTY_BLOB_CONTENTS: &[u8] = b"";
 
-lazy_static! {
-    pub static ref DUMMY_DIGEST: B3Digest = {
-        let u = [0u8; 32];
-        (&u).into()
-    };
-    pub static ref DUMMY_DIGEST_2: B3Digest = {
-        let mut u = [0u8; 32];
-        u[0] = 0x10;
-        (&u).into()
-    };
-    pub static ref DUMMY_DATA_1: bytes::Bytes = vec![0x01, 0x02, 0x03].into();
-    pub static ref DUMMY_DATA_2: bytes::Bytes = vec![0x04, 0x05].into();
+pub static DUMMY_DIGEST: LazyLock<B3Digest> = LazyLock::new(|| (&[0u8; 32]).into());
+pub static DUMMY_DIGEST_2: LazyLock<B3Digest> = LazyLock::new(|| {
+    let mut u = [0u8; 32];
+    u[0] = 0x10;
+    (&u).into()
+});
+pub static DUMMY_DATA_1: LazyLock<Bytes> = LazyLock::new(|| vec![0x01, 0x02, 0x03].into());
+pub static DUMMY_DATA_2: LazyLock<Bytes> = LazyLock::new(|| vec![0x04, 0x05].into());
 
-    pub static ref HELLOWORLD_BLOB_DIGEST: B3Digest =
-        blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into();
-    pub static ref EMPTY_BLOB_DIGEST: B3Digest =
-        blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into();
+pub static HELLOWORLD_BLOB_DIGEST: LazyLock<B3Digest> =
+    LazyLock::new(|| blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into());
+pub static EMPTY_BLOB_DIGEST: LazyLock<B3Digest> =
+    LazyLock::new(|| blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into());
 
-    // 2 bytes
-    pub static ref BLOB_A: bytes::Bytes = vec![0x00, 0x01].into();
-    pub static ref BLOB_A_DIGEST: B3Digest = blake3::hash(&BLOB_A).as_bytes().into();
+// 2 bytes
+pub static BLOB_A: LazyLock<Bytes> = LazyLock::new(|| vec![0x00, 0x01].into());
+pub static BLOB_A_DIGEST: LazyLock<B3Digest> =
+    LazyLock::new(|| blake3::hash(&BLOB_A).as_bytes().into());
 
-    // 1MB
-    pub static ref BLOB_B: bytes::Bytes = (0..255).collect::<Vec<u8>>().repeat(4 * 1024).into();
-    pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into();
+// 1MB
+pub static BLOB_B: LazyLock<Bytes> =
+    LazyLock::new(|| (0..255).collect::<Vec<u8>>().repeat(4 * 1024).into());
+pub static BLOB_B_DIGEST: LazyLock<B3Digest> =
+    LazyLock::new(|| blake3::hash(&BLOB_B).as_bytes().into());
 
-    // Directories
-    pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory {
-        directories: vec![],
-        files: vec![FileNode {
-            name: b".keep".to_vec().into(),
-            digest: EMPTY_BLOB_DIGEST.clone().into(),
-            size: 0,
-            executable: false,
-        }],
-        symlinks: vec![],
-    };
-    pub static ref DIRECTORY_COMPLICATED: proto::Directory = proto::Directory {
-        directories: vec![DirectoryNode {
-            name: b"keep".to_vec().into(),
-            digest: DIRECTORY_WITH_KEEP.digest().into(),
-            size: DIRECTORY_WITH_KEEP.size(),
-        }],
-        files: vec![FileNode {
-            name: b".keep".to_vec().into(),
-            digest: EMPTY_BLOB_DIGEST.clone().into(),
+// Directories
+pub static DIRECTORY_WITH_KEEP: LazyLock<Directory> = LazyLock::new(|| {
+    Directory::try_from_iter([(
+        ".keep".try_into().unwrap(),
+        Node::File {
+            digest: EMPTY_BLOB_DIGEST.clone(),
             size: 0,
             executable: false,
-        }],
-        symlinks: vec![SymlinkNode {
-            name: b"aa".to_vec().into(),
-            target: b"/nix/store/somewhereelse".to_vec().into(),
-        }],
-    };
-    pub static ref DIRECTORY_A: Directory = Directory::default();
-    pub static ref DIRECTORY_B: Directory = Directory {
-        directories: vec![DirectoryNode {
-            name: b"a".to_vec().into(),
-            digest: DIRECTORY_A.digest().into(),
+        },
+    )])
+    .unwrap()
+});
+pub static DIRECTORY_COMPLICATED: LazyLock<Directory> = LazyLock::new(|| {
+    Directory::try_from_iter([
+        (
+            "keep".try_into().unwrap(),
+            Node::Directory {
+                digest: DIRECTORY_WITH_KEEP.digest(),
+                size: DIRECTORY_WITH_KEEP.size(),
+            },
+        ),
+        (
+            ".keep".try_into().unwrap(),
+            Node::File {
+                digest: EMPTY_BLOB_DIGEST.clone(),
+                size: 0,
+                executable: false,
+            },
+        ),
+        (
+            "aa".try_into().unwrap(),
+            Node::Symlink {
+                target: "/nix/store/somewhereelse".try_into().unwrap(),
+            },
+        ),
+    ])
+    .unwrap()
+});
+pub static DIRECTORY_A: LazyLock<Directory> = LazyLock::new(Directory::new);
+pub static DIRECTORY_B: LazyLock<Directory> = LazyLock::new(|| {
+    Directory::try_from_iter([(
+        "a".try_into().unwrap(),
+        Node::Directory {
+            digest: DIRECTORY_A.digest(),
             size: DIRECTORY_A.size(),
-        }],
-        ..Default::default()
-    };
-    pub static ref DIRECTORY_C: Directory = Directory {
-        directories: vec![
-            DirectoryNode {
-                name: b"a".to_vec().into(),
-                digest: DIRECTORY_A.digest().into(),
+        },
+    )])
+    .unwrap()
+});
+pub static DIRECTORY_C: LazyLock<Directory> = LazyLock::new(|| {
+    Directory::try_from_iter([
+        (
+            "a".try_into().unwrap(),
+            Node::Directory {
+                digest: DIRECTORY_A.digest(),
                 size: DIRECTORY_A.size(),
             },
-            DirectoryNode {
-                name: b"a'".to_vec().into(),
-                digest: DIRECTORY_A.digest().into(),
+        ),
+        (
+            "a'".try_into().unwrap(),
+            Node::Directory {
+                digest: DIRECTORY_A.digest(),
                 size: DIRECTORY_A.size(),
-            }
-        ],
-        ..Default::default()
-    };
-    pub static ref DIRECTORY_D: proto::Directory = proto::Directory {
-        directories: vec![
-            DirectoryNode {
-                name: b"a".to_vec().into(),
-                digest: DIRECTORY_A.digest().into(),
+            },
+        ),
+    ])
+    .unwrap()
+});
+pub static DIRECTORY_D: LazyLock<Directory> = LazyLock::new(|| {
+    Directory::try_from_iter([
+        (
+            "a".try_into().unwrap(),
+            Node::Directory {
+                digest: DIRECTORY_A.digest(),
                 size: DIRECTORY_A.size(),
             },
-            DirectoryNode {
-                name: b"b'".to_vec().into(),
-                digest: DIRECTORY_B.digest().into(),
+        ),
+        (
+            "b".try_into().unwrap(),
+            Node::Directory {
+                digest: DIRECTORY_B.digest(),
                 size: DIRECTORY_B.size(),
-            }
-        ],
-        ..Default::default()
-    };
-}
+            },
+        ),
+    ])
+    .unwrap()
+});
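The LazyLock statics initialize on first dereference, so fixtures can be used directly; for instance (a sketch), DIRECTORY_A is the empty directory:

    assert_eq!(DIRECTORY_A.digest(), Directory::new().digest());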
diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs
index bcebcf4a7292..0d68af090daf 100644
--- a/tvix/castore/src/fs/fuse/tests.rs
+++ b/tvix/castore/src/fs/fuse/tests.rs
@@ -1,5 +1,4 @@
 use bstr::ByteSlice;
-use bytes::Bytes;
 use std::{
     collections::BTreeMap,
     ffi::{OsStr, OsString},
@@ -12,13 +11,14 @@ use tempfile::TempDir;
 use tokio_stream::{wrappers::ReadDirStream, StreamExt};
 
 use super::FuseDaemon;
-use crate::fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST};
-use crate::proto as castorepb;
-use crate::proto::node::Node;
 use crate::{
     blobservice::{BlobService, MemoryBlobService},
     directoryservice::{DirectoryService, MemoryDirectoryService},
-    fixtures,
+    fixtures, Node,
+};
+use crate::{
+    fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST},
+    PathComponent,
 };
 
 const BLOB_A_NAME: &str = "00000000000000000000000000000000-test";
@@ -39,14 +39,14 @@ fn gen_svcs() -> (Arc<dyn BlobService>, Arc<dyn DirectoryService>) {
 fn do_mount<P: AsRef<Path>, BS, DS>(
     blob_service: BS,
     directory_service: DS,
-    root_nodes: BTreeMap<bytes::Bytes, Node>,
+    root_nodes: BTreeMap<PathComponent, Node>,
     mountpoint: P,
     list_root: bool,
     show_xattr: bool,
 ) -> io::Result<FuseDaemon>
 where
-    BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
-    DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
+    BS: BlobService + Send + Sync + Clone + 'static,
+    DS: DirectoryService + Send + Sync + Clone + 'static,
 {
     let fs = TvixStoreFs::new(
         blob_service,
@@ -60,7 +60,7 @@ where
 
 async fn populate_blob_a(
     blob_service: &Arc<dyn BlobService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     let mut bw = blob_service.open_write().await;
     tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw)
@@ -69,19 +69,18 @@ async fn populate_blob_a(
     bw.close().await.expect("must succeed closing");
 
     root_nodes.insert(
-        BLOB_A_NAME.into(),
-        Node::File(castorepb::FileNode {
-            name: BLOB_A_NAME.into(),
-            digest: fixtures::BLOB_A_DIGEST.clone().into(),
+        BLOB_A_NAME.try_into().unwrap(),
+        Node::File {
+            digest: fixtures::BLOB_A_DIGEST.clone(),
             size: fixtures::BLOB_A.len() as u64,
             executable: false,
-        }),
+        },
     );
 }
 
 async fn populate_blob_b(
     blob_service: &Arc<dyn BlobService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     let mut bw = blob_service.open_write().await;
     tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw)
@@ -90,20 +89,19 @@ async fn populate_blob_b(
     bw.close().await.expect("must succeed closing");
 
     root_nodes.insert(
-        BLOB_B_NAME.into(),
-        Node::File(castorepb::FileNode {
-            name: BLOB_B_NAME.into(),
-            digest: fixtures::BLOB_B_DIGEST.clone().into(),
+        BLOB_B_NAME.try_into().unwrap(),
+        Node::File {
+            digest: fixtures::BLOB_B_DIGEST.clone(),
             size: fixtures::BLOB_B.len() as u64,
             executable: false,
-        }),
+        },
     );
 }
 
 /// adds a blob containing helloworld and marks it as executable
 async fn populate_blob_helloworld(
     blob_service: &Arc<dyn BlobService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     let mut bw = blob_service.open_write().await;
     tokio::io::copy(
@@ -115,42 +113,39 @@ async fn populate_blob_helloworld(
     bw.close().await.expect("must succeed closing");
 
     root_nodes.insert(
-        HELLOWORLD_BLOB_NAME.into(),
-        Node::File(castorepb::FileNode {
-            name: HELLOWORLD_BLOB_NAME.into(),
-            digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(),
+        HELLOWORLD_BLOB_NAME.try_into().unwrap(),
+        Node::File {
+            digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone(),
             size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64,
             executable: true,
-        }),
+        },
     );
 }
 
-async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) {
+async fn populate_symlink(root_nodes: &mut BTreeMap<PathComponent, Node>) {
     root_nodes.insert(
-        SYMLINK_NAME.into(),
-        Node::Symlink(castorepb::SymlinkNode {
-            name: SYMLINK_NAME.into(),
-            target: BLOB_A_NAME.into(),
-        }),
+        SYMLINK_NAME.try_into().unwrap(),
+        Node::Symlink {
+            target: BLOB_A_NAME.try_into().unwrap(),
+        },
     );
 }
 
 /// This writes a symlink pointing to /nix/store/somewhereelse,
 /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED.
-async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) {
+async fn populate_symlink2(root_nodes: &mut BTreeMap<PathComponent, Node>) {
     root_nodes.insert(
-        SYMLINK_NAME2.into(),
-        Node::Symlink(castorepb::SymlinkNode {
-            name: SYMLINK_NAME2.into(),
-            target: "/nix/store/somewhereelse".into(),
-        }),
+        SYMLINK_NAME2.try_into().unwrap(),
+        Node::Symlink {
+            target: "/nix/store/somewhereelse".try_into().unwrap(),
+        },
     );
 }
 
 async fn populate_directory_with_keep(
     blob_service: &Arc<dyn BlobService>,
     directory_service: &Arc<dyn DirectoryService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     // upload empty blob
     let mut bw = blob_service.open_write().await;
@@ -166,45 +161,42 @@ async fn populate_directory_with_keep(
         .expect("must succeed uploading");
 
     root_nodes.insert(
-        DIRECTORY_WITH_KEEP_NAME.into(),
-        castorepb::node::Node::Directory(castorepb::DirectoryNode {
-            name: DIRECTORY_WITH_KEEP_NAME.into(),
-            digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(),
+        DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(),
+        Node::Directory {
+            digest: fixtures::DIRECTORY_WITH_KEEP.digest(),
             size: fixtures::DIRECTORY_WITH_KEEP.size(),
-        }),
+        },
     );
 }
 
 /// Create a root node for DIRECTORY_WITH_KEEP, but don't upload the Directory
 /// itself.
-async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) {
+async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<PathComponent, Node>) {
     root_nodes.insert(
-        DIRECTORY_WITH_KEEP_NAME.into(),
-        castorepb::node::Node::Directory(castorepb::DirectoryNode {
-            name: DIRECTORY_WITH_KEEP_NAME.into(),
-            digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(),
+        DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(),
+        Node::Directory {
+            digest: fixtures::DIRECTORY_WITH_KEEP.digest(),
             size: fixtures::DIRECTORY_WITH_KEEP.size(),
-        }),
+        },
     );
 }
 
 /// Insert a file node for BLOB_A, but don't upload the blob it points to.
-async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) {
+async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<PathComponent, Node>) {
     root_nodes.insert(
-        BLOB_A_NAME.into(),
-        Node::File(castorepb::FileNode {
-            name: BLOB_A_NAME.into(),
-            digest: fixtures::BLOB_A_DIGEST.clone().into(),
+        BLOB_A_NAME.try_into().unwrap(),
+        Node::File {
+            digest: fixtures::BLOB_A_DIGEST.clone(),
             size: fixtures::BLOB_A.len() as u64,
             executable: false,
-        }),
+        },
     );
 }
 
 async fn populate_directory_complicated(
     blob_service: &Arc<dyn BlobService>,
     directory_service: &Arc<dyn DirectoryService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     // upload empty blob
     let mut bw = blob_service.open_write().await;
@@ -226,12 +218,11 @@ async fn populate_directory_complicated(
         .expect("must succeed uploading");
 
     root_nodes.insert(
-        DIRECTORY_COMPLICATED_NAME.into(),
-        Node::Directory(castorepb::DirectoryNode {
-            name: DIRECTORY_COMPLICATED_NAME.into(),
-            digest: fixtures::DIRECTORY_COMPLICATED.digest().into(),
+        DIRECTORY_COMPLICATED_NAME.try_into().unwrap(),
+        Node::Directory {
+            digest: fixtures::DIRECTORY_COMPLICATED.digest(),
             size: fixtures::DIRECTORY_COMPLICATED.size(),
-        }),
+        },
     );
 }
 
diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs
index bdd459543470..2696fdede378 100644
--- a/tvix/castore/src/fs/inodes.rs
+++ b/tvix/castore/src/fs/inodes.rs
@@ -2,10 +2,7 @@
 //! about inodes, which present tvix-castore nodes in a filesystem.
 use std::time::Duration;
 
-use bytes::Bytes;
-
-use crate::proto as castorepb;
-use crate::B3Digest;
+use crate::{path::PathComponent, B3Digest, Node};
 
 #[derive(Clone, Debug)]
 pub enum InodeData {
@@ -20,27 +17,23 @@ pub enum InodeData {
 /// lookup and did fetch the data.
 #[derive(Clone, Debug)]
 pub enum DirectoryInodeData {
-    Sparse(B3Digest, u64),                                  // digest, size
-    Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)]
+    Sparse(B3Digest, u64),                                // digest, size
+    Populated(B3Digest, Vec<(u64, PathComponent, Node)>), // [(child_inode, name, node)]
 }
 
 impl InodeData {
     /// Constructs a new InodeData by consuming a [Node].
-    /// It splits off the orginal name, so it can be used later.
-    pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) {
+    pub fn from_node(node: &Node) -> Self {
         match node {
-            castorepb::node::Node::Directory(n) => (
-                Self::Directory(DirectoryInodeData::Sparse(
-                    n.digest.try_into().unwrap(),
-                    n.size,
-                )),
-                n.name,
-            ),
-            castorepb::node::Node::File(n) => (
-                Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable),
-                n.name,
-            ),
-            castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name),
+            Node::Directory { digest, size } => {
+                Self::Directory(DirectoryInodeData::Sparse(digest.clone(), *size))
+            }
+            Node::File {
+                digest,
+                size,
+                executable,
+            } => Self::Regular(digest.clone(), *size, *executable),
+            Node::Symlink { target } => Self::Symlink(target.clone().into()),
         }
     }
 
diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs
index b565ed60ac42..4f50868b8f44 100644
--- a/tvix/castore/src/fs/mod.rs
+++ b/tvix/castore/src/fs/mod.rs
@@ -15,15 +15,13 @@ use self::{
     inode_tracker::InodeTracker,
     inodes::{DirectoryInodeData, InodeData},
 };
-use crate::proto as castorepb;
 use crate::{
     blobservice::{BlobReader, BlobService},
     directoryservice::DirectoryService,
-    proto::{node::Node, NamedNode},
-    B3Digest,
+    path::PathComponent,
+    B3Digest, Node,
 };
 use bstr::ByteVec;
-use bytes::Bytes;
 use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions};
 use fuse_backend_rs::api::filesystem::{
     Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
@@ -89,7 +87,7 @@ pub struct TvixStoreFs<BS, DS, RN> {
     show_xattr: bool,
 
     /// This maps a given basename in the root to the inode we allocated for the node.
-    root_nodes: RwLock<HashMap<Bytes, u64>>,
+    root_nodes: RwLock<HashMap<PathComponent, u64>>,
 
     /// This keeps track of inodes and data alongside them.
     inode_tracker: RwLock<InodeTracker>,
@@ -105,7 +103,7 @@ pub struct TvixStoreFs<BS, DS, RN> {
             u64,
             (
                 Span,
-                Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>,
+                Arc<Mutex<mpsc::Receiver<(usize, Result<(PathComponent, Node), crate::Error>)>>>,
             ),
         >,
     >,
@@ -123,8 +121,8 @@ pub struct TvixStoreFs<BS, DS, RN> {
 
 impl<BS, DS, RN> TvixStoreFs<BS, DS, RN>
 where
-    BS: AsRef<dyn BlobService> + Clone + Send,
-    DS: AsRef<dyn DirectoryService> + Clone + Send + 'static,
+    BS: BlobService + Clone + Send,
+    DS: DirectoryService + Clone + Send + 'static,
     RN: RootNodes + Clone + 'static,
 {
     pub fn new(
@@ -156,7 +154,7 @@ where
 
     /// Retrieves the inode for a given root node basename, if present.
     /// This obtains a read lock on self.root_nodes.
-    fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> {
+    fn get_inode_for_root_name(&self, name: &PathComponent) -> Option<u64> {
         self.root_nodes.read().get(name).cloned()
     }
 
@@ -167,8 +165,12 @@ where
     /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
     /// in self.directory_service is performed, and self.inode_tracker is updated with the
     /// [DirectoryInodeData::Populated].
+    #[allow(clippy::type_complexity)]
     #[instrument(skip(self), err)]
-    fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> {
+    fn get_directory_children(
+        &self,
+        ino: u64,
+    ) -> io::Result<(B3Digest, Vec<(u64, PathComponent, Node)>)> {
         let data = self.inode_tracker.read().get(ino).unwrap();
         match *data {
             // if it's populated already, return children.
@@ -184,7 +186,7 @@ where
                     .block_on({
                         let directory_service = self.directory_service.clone();
                         let parent_digest = parent_digest.to_owned();
-                        async move { directory_service.as_ref().get(&parent_digest).await }
+                        async move { directory_service.get(&parent_digest).await }
                     })?
                     .ok_or_else(|| {
                         warn!(directory.digest=%parent_digest, "directory not found");
@@ -198,13 +200,13 @@ where
                 let children = {
                     let mut inode_tracker = self.inode_tracker.write();
 
-                    let children: Vec<(u64, castorepb::node::Node)> = directory
-                        .nodes()
-                        .map(|child_node| {
-                            let (inode_data, _) = InodeData::from_node(child_node.clone());
+                    let children: Vec<(u64, PathComponent, Node)> = directory
+                        .into_nodes()
+                        .map(|(child_name, child_node)| {
+                            let inode_data = InodeData::from_node(&child_node);
 
                             let child_ino = inode_tracker.put(inode_data);
-                            (child_ino, child_node)
+                            (child_ino, child_name, child_node)
                         })
                         .collect();
 
@@ -238,12 +240,12 @@ where
     /// If the name can't be found, a libc::ENOENT is returned.
     fn name_in_root_to_ino_and_data(
         &self,
-        name: &std::ffi::CStr,
+        name: &PathComponent,
     ) -> io::Result<(u64, Arc<InodeData>)> {
         // Look up the inode for that root node.
         // If there's one, [self.inode_tracker] MUST also contain the data,
         // which we can then return.
-        if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) {
+        if let Some(inode) = self.get_inode_for_root_name(name) {
             return Ok((
                 inode,
                 self.inode_tracker
@@ -257,7 +259,8 @@ where
         // We don't have it yet, look it up in [self.root_nodes].
         match self.tokio_handle.block_on({
             let root_nodes_provider = self.root_nodes_provider.clone();
-            async move { root_nodes_provider.get_by_basename(name.to_bytes()).await }
+            let name = name.clone();
+            async move { root_nodes_provider.get_by_basename(&name).await }
         }) {
             // if there was an error looking up the root node, propagate up an IO error.
             Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
@@ -265,15 +268,9 @@ where
             Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
             // The root node does exist
             Ok(Some(root_node)) => {
-                // The name must match what's passed in the lookup, otherwise this is also a ENOENT.
-                if root_node.get_name() != name.to_bytes() {
-                    debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch");
-                    return Err(io::Error::from_raw_os_error(libc::ENOENT));
-                }
-
                 // Let's check if someone else beat us to updating the inode tracker and
                 // root_nodes map. This avoids locking inode_tracker for writing.
-                if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) {
+                if let Some(ino) = self.root_nodes.read().get(name) {
                     return Ok((
                         *ino,
                         self.inode_tracker.read().get(*ino).expect("must exist"),
@@ -287,9 +284,9 @@ where
 
                 // insert the (sparse) inode data and register in
                 // self.root_nodes.
-                let (inode_data, name) = InodeData::from_node(root_node);
+                let inode_data = InodeData::from_node(&root_node);
                 let ino = inode_tracker.put(inode_data.clone());
-                root_nodes.insert(name, ino);
+                root_nodes.insert(name.to_owned(), ino);
 
                 Ok((ino, Arc::new(inode_data)))
             }
@@ -303,10 +300,22 @@ const ROOT_NODES_BUFFER_SIZE: usize = 16;
 const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest";
 const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest";
 
+#[cfg(all(feature = "virtiofs", target_os = "linux"))]
+impl<BS, DS, RN> fuse_backend_rs::api::filesystem::Layer for TvixStoreFs<BS, DS, RN>
+where
+    BS: BlobService + Clone + Send + 'static,
+    DS: DirectoryService + Send + Clone + 'static,
+    RN: RootNodes + Clone + 'static,
+{
+    fn root_inode(&self) -> Self::Inode {
+        ROOT_ID
+    }
+}
+
 impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN>
 where
-    BS: AsRef<dyn BlobService> + Clone + Send + 'static,
-    DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
+    BS: BlobService + Clone + Send + 'static,
+    DS: DirectoryService + Send + Clone + 'static,
     RN: RootNodes + Clone + 'static,
 {
     type Handle = u64;
@@ -345,13 +354,17 @@ where
     ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
         debug!("lookup");
 
+        // convert the CStr to a PathComponent
+        // If it can't be converted, we definitely don't have anything here.
+        let name: PathComponent = name.try_into().map_err(|_| std::io::ErrorKind::NotFound)?;
+
         // This goes from a parent inode to a node.
         // - If the parent is [ROOT_ID], we need to check
         //   [self.root_nodes] (fetching from a [RootNode] provider if needed)
         // - Otherwise, lookup the parent in [self.inode_tracker] (which must be
         //   a [InodeData::Directory]), and find the child with that name.
         if parent == ROOT_ID {
-            let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?;
+            let (ino, inode_data) = self.name_in_root_to_ino_and_data(&name)?;
 
             debug!(inode_data=?&inode_data, ino=ino, "Some");
             return Ok(inode_data.as_fuse_entry(ino));
@@ -364,7 +377,7 @@ where
         // Search for that name in the list of children and return the FileAttrs.
 
         // in the children, find the one with the desired name.
-        if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
+        if let Some((child_ino, _, _)) = children.iter().find(|(_, n, _)| n == &name) {
             // lookup the child [InodeData] in [self.inode_tracker].
             // We know the inodes for children have already been allocated.
             let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap();
@@ -400,8 +413,8 @@ where
             self.tokio_handle.spawn(
                 async move {
                     let mut stream = root_nodes_provider.list().enumerate();
-                    while let Some(node) = stream.next().await {
-                        if tx.send(node).await.is_err() {
+                    while let Some(e) = stream.next().await {
+                        if tx.send(e).await.is_err() {
                             // If we get a send error, it means the sync code
                             // doesn't want any more entries.
                             break;
@@ -463,12 +476,12 @@ where
                 .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
 
             while let Some((i, n)) = rx.blocking_recv() {
-                let root_node = n.map_err(|e| {
+                let (name, node) = n.map_err(|e| {
                     warn!("failed to retrieve root node: {}", e);
                     io::Error::from_raw_os_error(libc::EIO)
                 })?;
 
-                let (inode_data, name) = InodeData::from_node(root_node);
+                let inode_data = InodeData::from_node(&node);
 
                 // obtain the inode, or allocate a new one.
                 let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@@ -483,7 +496,7 @@ where
                     ino,
                     offset: offset + (i as u64) + 1,
                     type_: inode_data.as_fuse_type(),
-                    name: &name,
+                    name: name.as_ref(),
                 })?;
                 // If the buffer is full, add_entry will return `Ok(0)`.
                 if written == 0 {
@@ -497,15 +510,17 @@ where
         let (parent_digest, children) = self.get_directory_children(inode)?;
         Span::current().record("directory.digest", parent_digest.to_string());
 
-        for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
-            let (inode_data, name) = InodeData::from_node(child_node);
+        for (i, (ino, child_name, child_node)) in
+            children.into_iter().skip(offset as usize).enumerate()
+        {
+            let inode_data = InodeData::from_node(&child_node);
 
             // the second parameter will become the "offset" parameter on the next call.
             let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
                 ino,
                 offset: offset + (i as u64) + 1,
                 type_: inode_data.as_fuse_type(),
-                name: &name,
+                name: child_name.as_ref(),
             })?;
             // If the buffer is full, add_entry will return `Ok(0)`.
             if written == 0 {
@@ -550,12 +565,12 @@ where
                 .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
 
             while let Some((i, n)) = rx.blocking_recv() {
-                let root_node = n.map_err(|e| {
+                let (name, node) = n.map_err(|e| {
                     warn!("failed to retrieve root node: {}", e);
                     io::Error::from_raw_os_error(libc::EPERM)
                 })?;
 
-                let (inode_data, name) = InodeData::from_node(root_node);
+                let inode_data = InodeData::from_node(&node);
 
                 // obtain the inode, or allocate a new one.
                 let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@@ -571,7 +586,7 @@ where
                         ino,
                         offset: offset + (i as u64) + 1,
                         type_: inode_data.as_fuse_type(),
-                        name: &name,
+                        name: name.as_ref(),
                     },
                     inode_data.as_fuse_entry(ino),
                 )?;
@@ -587,8 +602,8 @@ where
         let (parent_digest, children) = self.get_directory_children(inode)?;
         Span::current().record("directory.digest", parent_digest.to_string());
 
-        for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
-            let (inode_data, name) = InodeData::from_node(child_node);
+        for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
+            let inode_data = InodeData::from_node(&child_node);
 
             // the second parameter will become the "offset" parameter on the next call.
             let written = add_entry(
@@ -596,7 +611,7 @@ where
                     ino,
                     offset: offset + (i as u64) + 1,
                     type_: inode_data.as_fuse_type(),
-                    name: &name,
+                    name: name.as_ref(),
                 },
                 inode_data.as_fuse_entry(ino),
             )?;
@@ -641,6 +656,7 @@ where
     ) -> io::Result<(
         Option<Self::Handle>,
         fuse_backend_rs::api::filesystem::OpenOptions,
+        Option<u32>,
     )> {
         if inode == ROOT_ID {
             return Err(io::Error::from_raw_os_error(libc::ENOSYS));
@@ -659,7 +675,7 @@ where
                 match self.tokio_handle.block_on({
                     let blob_service = self.blob_service.clone();
                     let blob_digest = blob_digest.clone();
-                    async move { blob_service.as_ref().open_read(&blob_digest).await }
+                    async move { blob_service.open_read(&blob_digest).await }
                 }) {
                     Ok(None) => {
                         warn!("blob not found");
@@ -684,6 +700,7 @@ where
                         Ok((
                             Some(fh),
                             fuse_backend_rs::api::filesystem::OpenOptions::empty(),
+                            None,
                         ))
                     }
                 }
diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs
index 6609e049a1fc..5ed1a4d8d6c0 100644
--- a/tvix/castore/src/fs/root_nodes.rs
+++ b/tvix/castore/src/fs/root_nodes.rs
@@ -1,7 +1,6 @@
 use std::collections::BTreeMap;
 
-use crate::{proto::node::Node, Error};
-use bytes::Bytes;
+use crate::{path::PathComponent, Error, Node};
 use futures::stream::BoxStream;
 use tonic::async_trait;
 
@@ -11,11 +10,12 @@ use tonic::async_trait;
 pub trait RootNodes: Send + Sync {
     /// Looks up a root CA node based on the basename of the node in the root
     /// directory of the filesystem.
-    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>;
+    async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error>;
 
-    /// Lists all root CA nodes in the filesystem. An error can be returned
-    /// in case listing is not allowed
-    fn list(&self) -> BoxStream<Result<Node, Error>>;
+    /// Lists all root CA nodes in the filesystem, as a tuple of (base)name
+    /// and Node.
+    /// An error can be returned in case listing is not allowed.
+    fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>>;
 }
 
 #[async_trait]
@@ -23,15 +23,17 @@ pub trait RootNodes: Send + Sync {
 /// the key is the node name.
 impl<T> RootNodes for T
 where
-    T: AsRef<BTreeMap<Bytes, Node>> + Send + Sync,
+    T: AsRef<BTreeMap<PathComponent, Node>> + Send + Sync,
 {
-    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> {
+    async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> {
         Ok(self.as_ref().get(name).cloned())
     }
 
-    fn list(&self) -> BoxStream<Result<Node, Error>> {
+    fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> {
         Box::pin(tokio_stream::iter(
-            self.as_ref().iter().map(|(_, v)| Ok(v.clone())),
+            self.as_ref()
+                .iter()
+                .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))),
         ))
     }
 }
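Sketch: with the blanket impl above, any AsRef<BTreeMap<PathComponent, Node>> (for example an Arc around the map, with the RootNodes trait in scope) serves as a provider yielding (name, node) pairs.

    let provider = std::sync::Arc::new(root_nodes); // BTreeMap<PathComponent, Node>
    let name: PathComponent = "some-name".try_into().unwrap();
    let maybe_node = provider.get_by_basename(&name).await?;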
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index cd5b1290e031..4cbff687b864 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -13,7 +13,7 @@ use tracing::{instrument, warn, Level};
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
 use crate::import::{ingest_entries, IngestionEntry, IngestionError};
-use crate::proto::node::Node;
+use crate::Node;
 
 use super::blobs::{self, ConcurrentBlobUploader};
 
@@ -292,44 +292,43 @@ impl IngestionEntryGraph {
 
 #[cfg(test)]
 mod test {
-    use crate::import::IngestionEntry;
-    use crate::B3Digest;
+    use std::sync::LazyLock;
 
     use super::{Error, IngestionEntryGraph};
+    use crate::import::IngestionEntry;
+    use crate::B3Digest;
 
-    use lazy_static::lazy_static;
     use rstest::rstest;
 
-    lazy_static! {
-        pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into();
-        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir {
-            path: "a".parse().unwrap()
-        };
-        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir {
-            path: "b".parse().unwrap()
-        };
-        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir {
-            path: "a/b".parse().unwrap()
-        };
-        pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular {
-            path: "a".parse().unwrap(),
-            size: 0,
-            executable: false,
-            digest: EMPTY_DIGEST.clone(),
-        };
-        pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b".parse().unwrap(),
-            size: 0,
-            executable: false,
-            digest: EMPTY_DIGEST.clone(),
-        };
-        pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b/c".parse().unwrap(),
-            size: 0,
-            executable: false,
-            digest: EMPTY_DIGEST.clone(),
-        };
-    }
+    pub static EMPTY_DIGEST: LazyLock<B3Digest> =
+        LazyLock::new(|| blake3::hash(&[]).as_bytes().into());
+    pub static DIR_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir {
+        path: "a".parse().unwrap(),
+    });
+    pub static DIR_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir {
+        path: "b".parse().unwrap(),
+    });
+    pub static DIR_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Dir {
+        path: "a/b".parse().unwrap(),
+    });
+    pub static FILE_A: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular {
+        path: "a".parse().unwrap(),
+        size: 0,
+        executable: false,
+        digest: EMPTY_DIGEST.clone(),
+    });
+    pub static FILE_A_B: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular {
+        path: "a/b".parse().unwrap(),
+        size: 0,
+        executable: false,
+        digest: EMPTY_DIGEST.clone(),
+    });
+    pub static FILE_A_B_C: LazyLock<IngestionEntry> = LazyLock::new(|| IngestionEntry::Regular {
+        path: "a/b/c".parse().unwrap(),
+        size: 0,
+        executable: false,
+        digest: EMPTY_DIGEST.clone(),
+    });
 
     #[rstest]
     #[case::implicit_directories(&[&*FILE_A_B_C], &[&*FILE_A_B_C, &*DIR_A_B, &*DIR_A])]
diff --git a/tvix/castore/src/import/blobs.rs b/tvix/castore/src/import/blobs.rs
index 8135d871d6c0..f71ee1e63768 100644
--- a/tvix/castore/src/import/blobs.rs
+++ b/tvix/castore/src/import/blobs.rs
@@ -28,6 +28,9 @@ pub enum Error {
     #[error("unable to read blob contents for {0}: {1}")]
     BlobRead(PathBuf, std::io::Error),
 
+    #[error("unable to check whether blob at {0} already exists: {1}")]
+    BlobCheck(PathBuf, std::io::Error),
+
     // FUTUREWORK: proper error for blob finalize
     #[error("unable to finalize blob {0}: {1}")]
     BlobFinalize(PathBuf, std::io::Error),
@@ -118,6 +121,16 @@ where
                 let path = path.to_owned();
                 let r = Cursor::new(buffer);
                 async move {
+                    // We know the blob digest already, check it exists before sending it.
+                    if blob_service
+                        .has(&expected_digest)
+                        .await
+                        .map_err(|e| Error::BlobCheck(path.clone(), e))?
+                    {
+                        drop(permit);
+                        return Ok(());
+                    }
+
                     let digest = upload_blob(&blob_service, &path, expected_size, r).await?;
 
                     assert_eq!(digest, expected_digest, "Tvix bug: blob digest mismatch");
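The same dedup idea from the caller's side, as a hypothetical helper (assuming has() and close() carry the io::Result signatures used above): probe first, transfer only on a miss.

    async fn upload_if_missing<BS: BlobService>(
        blob_service: &BS,
        digest: &B3Digest,
        bytes: &[u8],
    ) -> std::io::Result<()> {
        // Cheap existence probe; identical content is never re-sent.
        if blob_service.has(digest).await? {
            return Ok(());
        }
        let mut writer = blob_service.open_write().await;
        tokio::io::copy(&mut std::io::Cursor::new(bytes), &mut writer).await?;
        writer.close().await?;
        Ok(())
    }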
diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs
index dc7821b8101e..78eac6f6128d 100644
--- a/tvix/castore/src/import/fs.rs
+++ b/tvix/castore/src/import/fs.rs
@@ -8,7 +8,9 @@ use std::os::unix::fs::MetadataExt;
 use std::os::unix::fs::PermissionsExt;
 use tokio::io::BufReader;
 use tokio_util::io::InspectReader;
+use tracing::info_span;
 use tracing::instrument;
+use tracing::Instrument;
 use tracing::Span;
 use tracing_indicatif::span_ext::IndicatifSpanExt;
 use walkdir::DirEntry;
@@ -16,8 +18,8 @@ use walkdir::WalkDir;
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
-use crate::proto::node::Node;
-use crate::B3Digest;
+use crate::refscan::{ReferenceReader, ReferenceScanner};
+use crate::{B3Digest, Node};
 
 use super::ingest_entries;
 use super::IngestionEntry;
@@ -30,20 +32,24 @@ use super::IngestionError;
 ///
 /// This function will walk the filesystem using `walkdir` and will consume
 /// `O(#number of entries)` space.
-#[instrument(skip(blob_service, directory_service), fields(path, indicatif.pb_show=1), err)]
-pub async fn ingest_path<BS, DS, P>(
+#[instrument(
+    skip(blob_service, directory_service, reference_scanner),
+    fields(path),
+    err
+)]
+pub async fn ingest_path<BS, DS, P, P2>(
     blob_service: BS,
     directory_service: DS,
     path: P,
+    reference_scanner: Option<&ReferenceScanner<P2>>,
 ) -> Result<Node, IngestionError<Error>>
 where
     P: AsRef<std::path::Path> + std::fmt::Debug,
     BS: BlobService + Clone,
     DS: DirectoryService,
+    P2: AsRef<[u8]> + Send + Sync,
 {
     let span = Span::current();
-    span.pb_set_message(&format!("Ingesting {:?}", path));
-    span.pb_start();
 
     let iter = WalkDir::new(path.as_ref())
         .follow_links(false)
@@ -51,7 +57,8 @@ where
         .contents_first(true)
         .into_iter();
 
-    let entries = dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref());
+    let entries =
+        dir_entries_to_ingestion_stream(blob_service, iter, path.as_ref(), reference_scanner);
     ingest_entries(
         directory_service,
         entries.inspect({
@@ -72,14 +79,16 @@ where
 /// The produced stream is buffered, so uploads can happen concurrently.
 ///
 /// The root is the [Path] in the filesystem that is being ingested into the castore.
-pub fn dir_entries_to_ingestion_stream<'a, BS, I>(
+pub fn dir_entries_to_ingestion_stream<'a, BS, I, P>(
     blob_service: BS,
     iter: I,
     root: &'a std::path::Path,
+    reference_scanner: Option<&'a ReferenceScanner<P>>,
 ) -> BoxStream<'a, Result<IngestionEntry, Error>>
 where
     BS: BlobService + Clone + 'a,
     I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a,
+    P: AsRef<[u8]> + Send + Sync,
 {
     let prefix = root.parent().unwrap_or_else(|| std::path::Path::new(""));
 
@@ -90,7 +99,13 @@ where
                 async move {
                     match x {
                         Ok(dir_entry) => {
-                            dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await
+                            dir_entry_to_ingestion_entry(
+                                blob_service,
+                                &dir_entry,
+                                prefix,
+                                reference_scanner,
+                            )
+                            .await
                         }
                         Err(e) => Err(Error::Stat(
                             prefix.to_path_buf(),
@@ -108,13 +123,15 @@ where
 ///
 /// The prefix path is stripped from the path of each entry. This is usually the parent path
 /// of the path being ingested so that the last element of the stream only has one component.
-pub async fn dir_entry_to_ingestion_entry<BS>(
+pub async fn dir_entry_to_ingestion_entry<BS, P>(
     blob_service: BS,
     entry: &DirEntry,
     prefix: &std::path::Path,
+    reference_scanner: Option<&ReferenceScanner<P>>,
 ) -> Result<IngestionEntry, Error>
 where
     BS: BlobService,
+    P: AsRef<[u8]>,
 {
     let file_type = entry.file_type();
 
@@ -135,13 +152,25 @@ where
             .into_os_string()
             .into_vec();
 
+        if let Some(reference_scanner) = &reference_scanner {
+            reference_scanner.scan(&target);
+        }
+
         Ok(IngestionEntry::Symlink { path, target })
     } else if file_type.is_file() {
         let metadata = entry
             .metadata()
             .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?;
 
-        let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?;
+        let digest = upload_blob(blob_service, entry.path().to_path_buf(), reference_scanner)
+            .instrument({
+                let span = info_span!("upload_blob", "indicatif.pb_show" = tracing::field::Empty);
+                span.pb_set_message(&format!("Uploading blob for {:?}", fs_path));
+                span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
+
+                span
+            })
+            .await?;
 
         Ok(IngestionEntry::Regular {
             path,
@@ -157,17 +186,17 @@ where
 }
 
 /// Uploads the file at the provided [Path] to the [BlobService].
-#[instrument(skip(blob_service), fields(path, indicatif.pb_show=1), err)]
-async fn upload_blob<BS>(
+#[instrument(skip(blob_service, reference_scanner), fields(path), err)]
+async fn upload_blob<BS, P>(
     blob_service: BS,
     path: impl AsRef<std::path::Path>,
+    reference_scanner: Option<&ReferenceScanner<P>>,
 ) -> Result<B3Digest, Error>
 where
     BS: BlobService,
+    P: AsRef<[u8]>,
 {
     let span = Span::current();
-    span.pb_set_style(&tvix_tracing::PB_TRANSFER_STYLE);
-    span.pb_set_message(&format!("Uploading blob for {:?}", path.as_ref()));
     span.pb_start();
 
     let file = tokio::fs::File::open(path.as_ref())
@@ -185,9 +214,16 @@ where
     });
 
     let mut writer = blob_service.open_write().await;
-    tokio::io::copy(&mut BufReader::new(reader), &mut writer)
-        .await
-        .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
+    if let Some(reference_scanner) = reference_scanner {
+        let mut reader = ReferenceReader::new(reference_scanner, BufReader::new(reader));
+        tokio::io::copy(&mut reader, &mut writer)
+            .await
+            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
+    } else {
+        tokio::io::copy(&mut BufReader::new(reader), &mut writer)
+            .await
+            .map_err(|e| Error::BlobRead(path.as_ref().to_path_buf(), e))?;
+    }
 
     let digest = writer
         .close()
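
Taken together, these fs.rs changes scan symlink targets eagerly and file
contents as a side effect of the upload copy, via the ReferenceReader wrapper.
A minimal usage sketch, assuming ReferenceScanner::new takes the candidate byte
patterns to search for (see src/refscan.rs for the actual constructor):

use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::import::fs::ingest_path;
use tvix_castore::refscan::ReferenceScanner;
use tvix_castore::Node;

async fn ingest_with_refscan<BS, DS>(blob_service: BS, directory_service: DS) -> Option<Node>
where
    BS: BlobService + Clone,
    DS: DirectoryService,
{
    // Byte patterns to look for in file contents and symlink targets.
    let scanner = ReferenceScanner::new(vec![b"some-reference".as_slice()]);

    // Passing None instead skips the ReferenceReader wrapping entirely.
    ingest_path(blob_service, directory_service, "/some/path", Some(&scanner))
        .await
        .ok()
}
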
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs
index a9ac0be6b064..6e10a64939a4 100644
--- a/tvix/castore/src/import/mod.rs
+++ b/tvix/castore/src/import/mod.rs
@@ -4,15 +4,9 @@
 //! Specific implementations, such as ingesting from the filesystem, live in
 //! child modules.
 
-use crate::directoryservice::DirectoryPutter;
-use crate::directoryservice::DirectoryService;
+use crate::directoryservice::{DirectoryPutter, DirectoryService};
 use crate::path::{Path, PathBuf};
-use crate::proto::node::Node;
-use crate::proto::Directory;
-use crate::proto::DirectoryNode;
-use crate::proto::FileNode;
-use crate::proto::SymlinkNode;
-use crate::B3Digest;
+use crate::{B3Digest, Directory, Node, SymlinkTargetError};
 use futures::{Stream, StreamExt};
 use tracing::Level;
 
@@ -65,14 +59,6 @@ where
             // we break the loop manually.
             .expect("Tvix bug: unexpected end of stream")?;
 
-        let name = entry
-            .path()
-            .file_name()
-            // If this is the root node, it will have an empty name.
-            .unwrap_or_default()
-            .to_owned()
-            .into();
-
         let node = match &mut entry {
             IngestionEntry::Dir { .. } => {
                 // If the entry is a directory, we traversed all its children (and
@@ -98,27 +84,31 @@ where
                         IngestionError::UploadDirectoryError(entry.path().to_owned(), e)
                     })?;
 
-                Node::Directory(DirectoryNode {
-                    name,
-                    digest: directory_digest.into(),
+                Node::Directory {
+                    digest: directory_digest,
                     size: directory_size,
-                })
+                }
             }
-            IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode {
-                name,
-                target: target.to_owned().into(),
-            }),
+            IngestionEntry::Symlink { ref target, .. } => Node::Symlink {
+                target: bytes::Bytes::copy_from_slice(target).try_into().map_err(
+                    |e: SymlinkTargetError| {
+                        IngestionError::UploadDirectoryError(
+                            entry.path().to_owned(),
+                            crate::Error::StorageError(format!("invalid symlink target: {}", e)),
+                        )
+                    },
+                )?,
+            },
             IngestionEntry::Regular {
                 size,
                 executable,
                 digest,
                 ..
-            } => Node::File(FileNode {
-                name,
-                digest: digest.to_owned().into(),
+            } => Node::File {
+                digest: digest.clone(),
                 size: *size,
                 executable: *executable,
-            }),
+            },
         };
 
         let parent = entry
@@ -129,8 +119,24 @@ where
         if parent == crate::Path::ROOT {
             break node;
         } else {
+            let name = entry
+                .path()
+                .file_name()
+                // If this is the root node, it will have an empty name.
+                .unwrap_or_else(|| "".try_into().unwrap())
+                .to_owned();
+
             // record node in parent directory, creating a new [Directory] if not there yet.
-            directories.entry(parent.to_owned()).or_default().add(node);
+            directories
+                .entry(parent.to_owned())
+                .or_default()
+                .add(name, node)
+                .map_err(|e| {
+                    IngestionError::UploadDirectoryError(
+                        entry.path().to_owned(),
+                        crate::Error::StorageError(e.to_string()),
+                    )
+                })?;
         }
     };
 
@@ -155,15 +161,8 @@ where
 
         #[cfg(debug_assertions)]
         {
-            if let Node::Directory(directory_node) = &root_node {
-                debug_assert_eq!(
-                    root_directory_digest,
-                    directory_node
-                        .digest
-                        .to_vec()
-                        .try_into()
-                        .expect("invalid digest len")
-                )
+            if let Node::Directory { digest, .. } = &root_node {
+                debug_assert_eq!(&root_directory_digest, digest);
             } else {
                 unreachable!("Tvix bug: directory putter initialized but no root directory node");
             }
@@ -209,9 +208,8 @@ mod test {
     use rstest::rstest;
 
     use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST};
-    use crate::proto::node::Node;
-    use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode};
     use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST};
+    use crate::{Directory, Node};
 
     use super::ingest_entries;
     use super::IngestionEntry;
@@ -223,18 +221,18 @@ mod test {
         executable: true,
         digest: DUMMY_DIGEST.clone(),
     }],
-        Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true }
-    ))]
+        Node::File{digest: DUMMY_DIGEST.clone(), size: 42, executable: true}
+    )]
     #[case::single_symlink(vec![IngestionEntry::Symlink {
         path: "foo".parse().unwrap(),
         target: b"blub".into(),
     }],
-        Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()})
+        Node::Symlink{target: "blub".try_into().unwrap()}
     )]
     #[case::single_dir(vec![IngestionEntry::Dir {
         path: "foo".parse().unwrap(),
     }],
-        Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()})
+        Node::Directory{digest: Directory::default().digest(), size: Directory::default().size()}
     )]
     #[case::dir_with_keep(vec![
         IngestionEntry::Regular {
@@ -247,7 +245,7 @@ mod test {
             path: "foo".parse().unwrap(),
         },
     ],
-        Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() })
+        Node::Directory{ digest: DIRECTORY_WITH_KEEP.digest(), size: DIRECTORY_WITH_KEEP.size()}
     )]
     /// This is intentionally a bit unsorted, though it still satisfies all
     /// requirements we have on the order of elements in the stream.
@@ -275,7 +273,7 @@ mod test {
             path: "blub".parse().unwrap(),
         },
     ],
-        Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size:DIRECTORY_COMPLICATED.size() })
+    Node::Directory{ digest: DIRECTORY_COMPLICATED.digest(), size: DIRECTORY_COMPLICATED.size() }
     )]
     #[tokio::test]
     async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) {
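
The loop above relies on the stream being ordered contents-first: every child
entry must appear before the directory containing it, so the parent's
Directory already holds all child nodes when it gets uploaded. A sketch of
driving ingest_entries directly, in the same shape as the test cases (the
stream's error type annotation is an assumption here):

use futures::stream;

let directory_service = MemoryDirectoryService::default();
let entries = vec![
    // Children first ...
    IngestionEntry::Symlink {
        path: "dir/link".parse().unwrap(),
        target: b"somewhere".into(),
    },
    // ... then the directory containing them.
    IngestionEntry::Dir {
        path: "dir".parse().unwrap(),
    },
];
let root_node = ingest_entries(
    directory_service,
    stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)),
)
.await
.expect("ingestion must succeed");
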
diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs
index 4fca9801c97b..93d06fd4582d 100644
--- a/tvix/castore/src/lib.rs
+++ b/tvix/castore/src/lib.rs
@@ -6,19 +6,23 @@ pub mod blobservice;
 pub mod composition;
 pub mod directoryservice;
 pub mod fixtures;
+pub mod refscan;
 
 #[cfg(feature = "fs")]
 pub mod fs;
 
+mod nodes;
+pub use nodes::*;
+
 mod path;
-pub use path::{Path, PathBuf};
+pub use path::{Path, PathBuf, PathComponent, PathComponentError};
 
 pub mod import;
 pub mod proto;
 pub mod tonic;
 
 pub use digests::{B3Digest, B3_LEN};
-pub use errors::Error;
+pub use errors::{DirectoryError, Error, ValidateNodeError};
 pub use hashing_reader::{B3HashingReader, HashingReader};
 
 #[cfg(test)]
diff --git a/tvix/castore/src/nodes/directory.rs b/tvix/castore/src/nodes/directory.rs
new file mode 100644
index 000000000000..f80e055dde80
--- /dev/null
+++ b/tvix/castore/src/nodes/directory.rs
@@ -0,0 +1,287 @@
+use std::collections::btree_map::{self, BTreeMap};
+
+use crate::{errors::DirectoryError, path::PathComponent, proto, B3Digest, Node};
+
+/// A Directory contains nodes, which can be Directory, File or Symlink nodes.
+/// It attaches names to these nodes, which are their basenames in that directory.
+/// These names:
+///  - MUST not contain slashes or null bytes
+///  - MUST not be '.' or '..'
+///  - MUST be unique within the directory
+#[derive(Default, Debug, Clone, PartialEq, Eq)]
+pub struct Directory {
+    nodes: BTreeMap<PathComponent, Node>,
+}
+
+impl Directory {
+    /// Constructs a new, empty Directory.
+    pub fn new() -> Self {
+        Directory {
+            nodes: BTreeMap::new(),
+        }
+    }
+
+    /// Construct a [Directory] from tuples of name and [Node].
+    ///
+    /// Inserting multiple elements with the same name, or exceeding the
+    /// maximum size, will yield an error.
+    pub fn try_from_iter<T: IntoIterator<Item = (PathComponent, Node)>>(
+        iter: T,
+    ) -> Result<Directory, DirectoryError> {
+        let mut nodes = BTreeMap::new();
+
+        iter.into_iter().try_fold(0u64, |size, (name, node)| {
+            check_insert_node(size, &mut nodes, name, node)
+        })?;
+
+        Ok(Self { nodes })
+    }
+
+    /// The size of a directory is the number of all regular, symlink and
+    /// directory elements, plus the sum of the directory elements' size fields.
+    pub fn size(&self) -> u64 {
+        // It's impossible to create a Directory where the size overflows, because we
+        // check before every add() that the size won't overflow.
+        (self.nodes.len() as u64)
+            + self
+                .nodes()
+                .map(|(_name, n)| match n {
+                    Node::Directory { size, .. } => *size,
+                    Node::File { .. } | Node::Symlink { .. } => 0,
+                })
+                .sum::<u64>()
+    }
+
+    /// Calculates the digest of a Directory, which is the blake3 hash of a
+    /// Directory protobuf message, serialized in protobuf canonical form.
+    pub fn digest(&self) -> B3Digest {
+        proto::Directory::from(self.clone()).digest()
+    }
+
+    /// Allows iterating over all nodes (directories, files and symlinks)
+    /// For each, it returns a tuple of its name and node.
+    /// The elements are sorted by their names.
+    pub fn nodes(&self) -> impl Iterator<Item = (&PathComponent, &Node)> + Send + Sync + '_ {
+        self.nodes.iter()
+    }
+
+    /// Dissolves a Directory into its individual names and nodes.
+    /// The elements are sorted by their names.
+    pub fn into_nodes(self) -> impl Iterator<Item = (PathComponent, Node)> + Send + Sync {
+        self.nodes.into_iter()
+    }
+
+    /// Adds the specified [Node] to the [Directory] with a given name.
+    ///
+    /// Inserting a node whose name already exists in the directory, or
+    /// exceeding the maximum size, will yield an error.
+    ///
+    /// In case you want to construct a [Directory] from multiple elements, use
+    /// [Directory::try_from_iter] instead.
+    pub fn add(&mut self, name: PathComponent, node: Node) -> Result<(), DirectoryError> {
+        check_insert_node(self.size(), &mut self.nodes, name, node)?;
+        Ok(())
+    }
+}
+
+fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
+    iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i))
+}
+
+/// Helper function dealing with inserting nodes into the nodes [BTreeMap],
+/// after ensuring the new size doesn't overflow and the key doesn't exist already.
+///
+/// Returns the new total size, or an error.
+fn check_insert_node(
+    current_size: u64,
+    nodes: &mut BTreeMap<PathComponent, Node>,
+    name: PathComponent,
+    node: Node,
+) -> Result<u64, DirectoryError> {
+    // Check that even after adding this new directory entry, the size
+    // calculation will not overflow.
+    let new_size = checked_sum([
+        current_size,
+        1,
+        match node {
+            Node::Directory { size, .. } => size,
+            _ => 0,
+        },
+    ])
+    .ok_or(DirectoryError::SizeOverflow)?;
+
+    match nodes.entry(name) {
+        btree_map::Entry::Vacant(e) => {
+            e.insert(node);
+        }
+        btree_map::Entry::Occupied(occupied) => {
+            return Err(DirectoryError::DuplicateName(occupied.key().to_owned()))
+        }
+    }
+
+    Ok(new_size)
+}
+
+#[cfg(test)]
+mod test {
+    use super::{Directory, Node};
+    use crate::fixtures::DUMMY_DIGEST;
+    use crate::{DirectoryError, PathComponent};
+
+    #[test]
+    fn from_iter_single() {
+        Directory::try_from_iter([(
+            PathComponent::try_from("b").unwrap(),
+            Node::Directory {
+                digest: DUMMY_DIGEST.clone(),
+                size: 1,
+            },
+        )])
+        .unwrap();
+    }
+
+    #[test]
+    fn from_iter_multiple() {
+        let d = Directory::try_from_iter([
+            (
+                "b".try_into().unwrap(),
+                Node::Directory {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                },
+            ),
+            (
+                "a".try_into().unwrap(),
+                Node::Directory {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                },
+            ),
+            (
+                "z".try_into().unwrap(),
+                Node::Directory {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                },
+            ),
+            (
+                "f".try_into().unwrap(),
+                Node::File {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                    executable: true,
+                },
+            ),
+            (
+                "c".try_into().unwrap(),
+                Node::File {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                    executable: true,
+                },
+            ),
+            (
+                "g".try_into().unwrap(),
+                Node::File {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: 1,
+                    executable: true,
+                },
+            ),
+            (
+                "t".try_into().unwrap(),
+                Node::Symlink {
+                    target: "a".try_into().unwrap(),
+                },
+            ),
+            (
+                "o".try_into().unwrap(),
+                Node::Symlink {
+                    target: "a".try_into().unwrap(),
+                },
+            ),
+            (
+                "e".try_into().unwrap(),
+                Node::Symlink {
+                    target: "a".try_into().unwrap(),
+                },
+            ),
+        ])
+        .unwrap();
+
+        // Convert to proto struct and back to ensure we are not generating any invalid structures
+        crate::Directory::try_from(crate::proto::Directory::from(d))
+            .expect("directory should be valid");
+    }
+
+    #[test]
+    fn add_nodes_to_directory() {
+        let mut d = Directory::new();
+
+        d.add(
+            "b".try_into().unwrap(),
+            Node::Directory {
+                digest: DUMMY_DIGEST.clone(),
+                size: 1,
+            },
+        )
+        .unwrap();
+        d.add(
+            "a".try_into().unwrap(),
+            Node::Directory {
+                digest: DUMMY_DIGEST.clone(),
+                size: 1,
+            },
+        )
+        .unwrap();
+
+        // Convert to proto struct and back to ensure we are not generating any invalid structures
+        crate::Directory::try_from(crate::proto::Directory::from(d))
+            .expect("directory should be valid");
+    }
+
+    #[test]
+    fn validate_overflow() {
+        let mut d = Directory::new();
+
+        assert_eq!(
+            d.add(
+                "foo".try_into().unwrap(),
+                Node::Directory {
+                    digest: DUMMY_DIGEST.clone(),
+                    size: u64::MAX
+                }
+            ),
+            Err(DirectoryError::SizeOverflow)
+        );
+    }
+
+    #[test]
+    fn add_duplicate_node_to_directory() {
+        let mut d = Directory::new();
+
+        d.add(
+            "a".try_into().unwrap(),
+            Node::Directory {
+                digest: DUMMY_DIGEST.clone(),
+                size: 1,
+            },
+        )
+        .unwrap();
+        assert_eq!(
+            format!(
+                "{}",
+                d.add(
+                    "a".try_into().unwrap(),
+                    Node::File {
+                        digest: DUMMY_DIGEST.clone(),
+                        size: 1,
+                        executable: true
+                    }
+                )
+                .expect_err("adding duplicate dir entry must fail")
+            ),
+            "\"a\" is a duplicate name"
+        );
+    }
+}
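
As a usage sketch of the new type (reusing the fixtures from the tests above):
validation happens at insert time, so a constructed Directory is valid by
construction.

use crate::fixtures::DUMMY_DIGEST;
use crate::{Directory, Node};

let mut d = Directory::new();
d.add(
    "hello.txt".try_into().unwrap(),
    Node::File {
        digest: DUMMY_DIGEST.clone(),
        size: 11,
        executable: false,
    },
)
.unwrap();

// One entry and no child directories, so size() is 1.
assert_eq!(1, d.size());
// digest() is the blake3 hash of the canonical protobuf encoding.
let _digest = d.digest();
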
diff --git a/tvix/castore/src/nodes/mod.rs b/tvix/castore/src/nodes/mod.rs
new file mode 100644
index 000000000000..ac7aa1e666df
--- /dev/null
+++ b/tvix/castore/src/nodes/mod.rs
@@ -0,0 +1,48 @@
+//! This holds types describing nodes in the tvix-castore model.
+mod directory;
+mod symlink_target;
+
+use crate::B3Digest;
+pub use directory::Directory;
+pub use symlink_target::{SymlinkTarget, SymlinkTargetError};
+
+/// A Node is either a [Node::Directory], [Node::File] or [Node::Symlink].
+/// Nodes themselves don't have names; what gives them a name is either being
+/// inside a [Directory], or being a root node with its own name attached to it.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Node {
+    /// A Directory node is a pointer to a [Directory], by its [Directory::digest].
+    /// It also records a `size`.
+    /// Such a node is either an element in the [Directory] it itself is contained in,
+    /// or a standalone root node.
+    Directory {
+        /// The blake3 hash of a Directory message, serialized in protobuf canonical form.
+        digest: B3Digest,
+        /// Number of child elements in the Directory referred to by `digest`.
+        /// Calculated by summing up the numbers of nodes, and for each directory,
+        /// its size field. Can be used for inode allocation.
+        /// This field is precisely as verifiable as any other Merkle tree edge.
+        /// Resolve `digest`, and you can compute it incrementally. Resolve the entire
+        /// tree, and you can fully compute it from scratch.
+        /// A credulous implementation won't reject an excessive size, but this is
+        /// harmless: you'll have some ordinals without nodes. Undersizing is obvious
+        /// and easy to reject: you won't have an ordinal for some nodes.
+        size: u64,
+    },
+    /// A File node represents a regular or executable file in a Directory or at the root.
+    File {
+        /// The blake3 digest of the file contents
+        digest: B3Digest,
+
+        /// The file content size
+        size: u64,
+
+        /// Whether the file is executable
+        executable: bool,
+    },
+    /// A Symlink node represents a symbolic link in a Directory or at the root.
+    Symlink {
+        /// The target of the symlink.
+        target: SymlinkTarget,
+    },
+}
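
Since nodes no longer carry a name, consumers match on the enum directly; the
name, where needed, comes from the surrounding [Directory] entry or from the
root-node context. A minimal sketch:

use crate::Node;

fn describe(node: &Node) -> String {
    match node {
        Node::Directory { digest, size } => format!("directory {digest} (size {size})"),
        Node::File {
            digest,
            size,
            executable,
        } => format!("file {digest} ({size} bytes, executable: {executable})"),
        Node::Symlink { target } => format!("symlink -> {target}"),
    }
}
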
diff --git a/tvix/castore/src/nodes/symlink_target.rs b/tvix/castore/src/nodes/symlink_target.rs
new file mode 100644
index 000000000000..e9a1a0bd05c2
--- /dev/null
+++ b/tvix/castore/src/nodes/symlink_target.rs
@@ -0,0 +1,223 @@
+use bstr::ByteSlice;
+use std::fmt::{self, Debug, Display};
+
+/// A wrapper type for symlink targets.
+/// Internally uses a [bytes::Bytes], but disallows empty targets, targets
+/// containing null bytes, and targets longer than [MAX_TARGET_LEN] bytes.
+#[repr(transparent)]
+#[derive(Clone, PartialEq, Eq)]
+pub struct SymlinkTarget {
+    inner: bytes::Bytes,
+}
+
+/// The maximum length a symlink target can have.
+/// Linux allows 4095 bytes here.
+pub const MAX_TARGET_LEN: usize = 4095;
+
+impl AsRef<[u8]> for SymlinkTarget {
+    fn as_ref(&self) -> &[u8] {
+        self.inner.as_ref()
+    }
+}
+
+impl From<SymlinkTarget> for bytes::Bytes {
+    fn from(value: SymlinkTarget) -> Self {
+        value.inner
+    }
+}
+
+fn validate_symlink_target<B: AsRef<[u8]>>(symlink_target: B) -> Result<B, SymlinkTargetError> {
+    let v = symlink_target.as_ref();
+
+    if v.is_empty() {
+        return Err(SymlinkTargetError::Empty);
+    }
+    if v.len() > MAX_TARGET_LEN {
+        return Err(SymlinkTargetError::TooLong);
+    }
+    if v.contains(&0x00) {
+        return Err(SymlinkTargetError::Null);
+    }
+
+    Ok(symlink_target)
+}
+
+impl TryFrom<bytes::Bytes> for SymlinkTarget {
+    type Error = SymlinkTargetError;
+
+    fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> {
+        if let Err(e) = validate_symlink_target(&value) {
+            return Err(SymlinkTargetError::Convert(value, Box::new(e)));
+        }
+
+        Ok(Self { inner: value })
+    }
+}
+
+impl TryFrom<&'static [u8]> for SymlinkTarget {
+    type Error = SymlinkTargetError;
+
+    fn try_from(value: &'static [u8]) -> Result<Self, Self::Error> {
+        if let Err(e) = validate_symlink_target(&value) {
+            return Err(SymlinkTargetError::Convert(value.into(), Box::new(e)));
+        }
+
+        Ok(Self {
+            inner: bytes::Bytes::from_static(value),
+        })
+    }
+}
+
+impl TryFrom<&str> for SymlinkTarget {
+    type Error = SymlinkTargetError;
+
+    fn try_from(value: &str) -> Result<Self, Self::Error> {
+        if let Err(e) = validate_symlink_target(value) {
+            return Err(SymlinkTargetError::Convert(
+                value.to_owned().into(),
+                Box::new(e),
+            ));
+        }
+
+        Ok(Self {
+            inner: bytes::Bytes::copy_from_slice(value.as_bytes()),
+        })
+    }
+}
+
+impl Debug for SymlinkTarget {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        Debug::fmt(self.inner.as_bstr(), f)
+    }
+}
+
+impl Display for SymlinkTarget {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        Display::fmt(self.inner.as_bstr(), f)
+    }
+}
+
+/// Errors created when constructing / converting to [SymlinkTarget].
+#[derive(Debug, PartialEq, Eq, thiserror::Error)]
+#[cfg_attr(test, derive(Clone))]
+pub enum SymlinkTargetError {
+    #[error("cannot be empty")]
+    Empty,
+    #[error("cannot contain null bytes")]
+    Null,
+    #[error("cannot be over {} bytes long", MAX_TARGET_LEN)]
+    TooLong,
+    #[error("unable to convert '{:?}'", .0.as_bstr())]
+    Convert(bytes::Bytes, Box<Self>),
+}
+
+#[cfg(test)]
+mod tests {
+    use bytes::Bytes;
+    use rstest::rstest;
+
+    use super::validate_symlink_target;
+    use super::{SymlinkTarget, SymlinkTargetError};
+
+    #[rstest]
+    #[case::empty(b"", SymlinkTargetError::Empty)]
+    #[case::null(b"foo\0", SymlinkTargetError::Null)]
+    fn errors(#[case] v: &'static [u8], #[case] err: SymlinkTargetError) {
+        {
+            assert_eq!(
+                Err(err.clone()),
+                validate_symlink_target(v),
+                "validate_symlink_target must fail as expected"
+            );
+        }
+
+        let exp_err_v = Bytes::from_static(v);
+
+        // Bytes
+        {
+            let v = Bytes::from_static(v);
+            assert_eq!(
+                Err(SymlinkTargetError::Convert(
+                    exp_err_v.clone(),
+                    Box::new(err.clone())
+                )),
+                SymlinkTarget::try_from(v),
+                "conversion must fail as expected"
+            );
+        }
+        // &[u8]
+        {
+            assert_eq!(
+                Err(SymlinkTargetError::Convert(
+                    exp_err_v.clone(),
+                    Box::new(err.clone())
+                )),
+                SymlinkTarget::try_from(v),
+                "conversion must fail as expected"
+            );
+        }
+        // &str, if this is valid UTF-8
+        {
+            if let Ok(v) = std::str::from_utf8(v) {
+                assert_eq!(
+                    Err(SymlinkTargetError::Convert(
+                        exp_err_v.clone(),
+                        Box::new(err.clone())
+                    )),
+                    SymlinkTarget::try_from(v),
+                    "conversion must fail as expected"
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn error_toolong() {
+        assert_eq!(
+            Err(SymlinkTargetError::TooLong),
+            validate_symlink_target("X".repeat(5000).into_bytes().as_slice())
+        )
+    }
+
+    #[rstest]
+    #[case::boring(b"aa")]
+    #[case::dot(b".")]
+    #[case::dotsandslashes(b"./..")]
+    #[case::dotdot(b"..")]
+    #[case::slashes(b"a/b")]
+    #[case::slashes_and_absolute(b"/a/b")]
+    #[case::invalid_utf8(b"\xc5\xc4\xd6")]
+    fn success(#[case] v: &'static [u8]) {
+        let exp = SymlinkTarget { inner: v.into() };
+
+        // Bytes
+        {
+            let v: Bytes = v.into();
+            assert_eq!(
+                Ok(exp.clone()),
+                SymlinkTarget::try_from(v),
+                "conversion must succeed"
+            )
+        }
+
+        // &[u8]
+        {
+            assert_eq!(
+                Ok(exp.clone()),
+                SymlinkTarget::try_from(v),
+                "conversion must succeed"
+            )
+        }
+
+        // &str, if this is valid UTF-8
+        {
+            if let Ok(v) = std::str::from_utf8(v) {
+                assert_eq!(
+                    Ok(exp.clone()),
+                    SymlinkTarget::try_from(v),
+                    "conversion must succeed"
+                )
+            }
+        }
+    }
+}
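
A short sketch tying this into the Node enum: a target is validated once, at
the conversion boundary, and then slots into a symlink node as-is.

use crate::{Node, SymlinkTarget};

let target: SymlinkTarget = "../bin/sh".try_into().expect("valid target");
let node = Node::Symlink { target };

// Empty targets are rejected up front, not at serialization time.
assert!(SymlinkTarget::try_from("").is_err());
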
diff --git a/tvix/castore/src/path/component.rs b/tvix/castore/src/path/component.rs
new file mode 100644
index 000000000000..78aca03c50fe
--- /dev/null
+++ b/tvix/castore/src/path/component.rs
@@ -0,0 +1,268 @@
+use bstr::ByteSlice;
+use std::fmt::{self, Debug, Display};
+
+/// A wrapper type for validated path components in the castore model.
+/// Internally uses a [bytes::Bytes], but disallows
+/// slashes and null bytes, as well as
+/// '.', '..' and the empty string.
+/// It also rejects components that are too long (> 255 bytes).
+#[repr(transparent)]
+#[derive(Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub struct PathComponent {
+    pub(super) inner: bytes::Bytes,
+}
+
+/// The maximum length an individual path component can have.
+/// Linux allows 255 bytes of actual name, so we pick that.
+pub const MAX_NAME_LEN: usize = 255;
+
+impl AsRef<[u8]> for PathComponent {
+    fn as_ref(&self) -> &[u8] {
+        self.inner.as_ref()
+    }
+}
+
+impl From<PathComponent> for bytes::Bytes {
+    fn from(value: PathComponent) -> Self {
+        value.inner
+    }
+}
+
+pub(super) fn validate_name<B: AsRef<[u8]>>(name: B) -> Result<(), PathComponentError> {
+    match name.as_ref() {
+        b"" => Err(PathComponentError::Empty),
+        b".." => Err(PathComponentError::Parent),
+        b"." => Err(PathComponentError::CurDir),
+        v if v.len() > MAX_NAME_LEN => Err(PathComponentError::TooLong),
+        v if v.contains(&0x00) => Err(PathComponentError::Null),
+        v if v.contains(&b'/') => Err(PathComponentError::Slashes),
+        _ => Ok(()),
+    }
+}
+
+impl TryFrom<bytes::Bytes> for PathComponent {
+    type Error = PathComponentError;
+
+    fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> {
+        if let Err(e) = validate_name(&value) {
+            return Err(PathComponentError::Convert(value, Box::new(e)));
+        }
+
+        Ok(Self { inner: value })
+    }
+}
+
+impl TryFrom<&'static [u8]> for PathComponent {
+    type Error = PathComponentError;
+
+    fn try_from(value: &'static [u8]) -> Result<Self, Self::Error> {
+        if let Err(e) = validate_name(value) {
+            return Err(PathComponentError::Convert(value.into(), Box::new(e)));
+        }
+
+        Ok(Self {
+            inner: bytes::Bytes::from_static(value),
+        })
+    }
+}
+
+impl TryFrom<&str> for PathComponent {
+    type Error = PathComponentError;
+
+    fn try_from(value: &str) -> Result<Self, Self::Error> {
+        if let Err(e) = validate_name(value) {
+            return Err(PathComponentError::Convert(
+                value.to_owned().into(),
+                Box::new(e),
+            ));
+        }
+
+        Ok(Self {
+            inner: bytes::Bytes::copy_from_slice(value.as_bytes()),
+        })
+    }
+}
+
+impl TryFrom<&std::ffi::CStr> for PathComponent {
+    type Error = PathComponentError;
+
+    fn try_from(value: &std::ffi::CStr) -> Result<Self, Self::Error> {
+        let value = value.to_bytes();
+        if let Err(e) = validate_name(value) {
+            return Err(PathComponentError::Convert(
+                value.to_owned().into(),
+                Box::new(e),
+            ));
+        }
+
+        Ok(Self {
+            inner: bytes::Bytes::copy_from_slice(value),
+        })
+    }
+}
+
+impl Debug for PathComponent {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        Debug::fmt(self.inner.as_bstr(), f)
+    }
+}
+
+impl Display for PathComponent {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        Display::fmt(self.inner.as_bstr(), f)
+    }
+}
+
+/// Errors created when parsing / validating [PathComponent].
+#[derive(Debug, PartialEq, thiserror::Error)]
+#[cfg_attr(test, derive(Clone))]
+pub enum PathComponentError {
+    #[error("cannot be empty")]
+    Empty,
+    #[error("cannot contain null bytes")]
+    Null,
+    #[error("cannot be '.'")]
+    CurDir,
+    #[error("cannot be '..'")]
+    Parent,
+    #[error("cannot contain slashes")]
+    Slashes,
+    #[error("cannot be over {} bytes long", MAX_NAME_LEN)]
+    TooLong,
+    #[error("unable to convert '{:?}'", .0.as_bstr())]
+    Convert(bytes::Bytes, #[source] Box<Self>),
+}
+
+#[cfg(test)]
+mod tests {
+    use std::ffi::CString;
+
+    use bytes::Bytes;
+    use rstest::rstest;
+
+    use super::{validate_name, PathComponent, PathComponentError};
+
+    #[rstest]
+    #[case::empty(b"", PathComponentError::Empty)]
+    #[case::null(b"foo\0", PathComponentError::Null)]
+    #[case::curdir(b".", PathComponentError::CurDir)]
+    #[case::parent(b"..", PathComponentError::Parent)]
+    #[case::slashes1(b"a/b", PathComponentError::Slashes)]
+    #[case::slashes2(b"/", PathComponentError::Slashes)]
+    fn errors(#[case] v: &'static [u8], #[case] err: PathComponentError) {
+        {
+            assert_eq!(
+                Err(err.clone()),
+                validate_name(v),
+                "validate_name must fail as expected"
+            );
+        }
+
+        let exp_err_v = Bytes::from_static(v);
+
+        // Bytes
+        {
+            let v = Bytes::from_static(v);
+            assert_eq!(
+                Err(PathComponentError::Convert(
+                    exp_err_v.clone(),
+                    Box::new(err.clone())
+                )),
+                PathComponent::try_from(v),
+                "conversion must fail as expected"
+            );
+        }
+        // &[u8]
+        {
+            assert_eq!(
+                Err(PathComponentError::Convert(
+                    exp_err_v.clone(),
+                    Box::new(err.clone())
+                )),
+                PathComponent::try_from(v),
+                "conversion must fail as expected"
+            );
+        }
+        // &str, if it is valid UTF-8
+        {
+            if let Ok(v) = std::str::from_utf8(v) {
+                assert_eq!(
+                    Err(PathComponentError::Convert(
+                        exp_err_v.clone(),
+                        Box::new(err.clone())
+                    )),
+                    PathComponent::try_from(v),
+                    "conversion must fail as expected"
+                );
+            }
+        }
+        // &CStr, if it can be constructed (fails if the payload contains null bytes)
+        {
+            if let Ok(v) = CString::new(v) {
+                let v = v.as_ref();
+                assert_eq!(
+                    Err(PathComponentError::Convert(
+                        exp_err_v.clone(),
+                        Box::new(err.clone())
+                    )),
+                    PathComponent::try_from(v),
+                    "conversion must fail as expected"
+                );
+            }
+        }
+    }
+
+    #[test]
+    fn error_toolong() {
+        assert_eq!(
+            Err(PathComponentError::TooLong),
+            validate_name("X".repeat(500).into_bytes().as_slice())
+        )
+    }
+
+    #[test]
+    fn success() {
+        let exp = PathComponent { inner: "aa".into() };
+
+        // Bytes
+        {
+            let v: Bytes = "aa".into();
+            assert_eq!(
+                Ok(exp.clone()),
+                PathComponent::try_from(v),
+                "conversion must succeed"
+            );
+        }
+
+        // &[u8]
+        {
+            let v: &[u8] = b"aa";
+            assert_eq!(
+                Ok(exp.clone()),
+                PathComponent::try_from(v),
+                "conversion must succeed"
+            );
+        }
+
+        // &str
+        {
+            let v: &str = "aa";
+            assert_eq!(
+                Ok(exp.clone()),
+                PathComponent::try_from(v),
+                "conversion must succeed"
+            );
+        }
+
+        // &CStr
+        {
+            let v = CString::new("aa").expect("CString must construct");
+            let v = v.as_c_str();
+            assert_eq!(
+                Ok(exp.clone()),
+                PathComponent::try_from(v),
+                "conversion must succeed"
+            );
+        }
+    }
+}
diff --git a/tvix/castore/src/path.rs b/tvix/castore/src/path/mod.rs
index fcc2bd01fbd6..15f31a570da9 100644
--- a/tvix/castore/src/path.rs
+++ b/tvix/castore/src/path/mod.rs
@@ -1,5 +1,5 @@
 //! Contains data structures to deal with Paths in the tvix-castore model.
-
+use bstr::ByteSlice;
 use std::{
     borrow::Borrow,
     fmt::{self, Debug, Display},
@@ -8,9 +8,8 @@ use std::{
     str::FromStr,
 };
 
-use bstr::ByteSlice;
-
-use crate::proto::validate_node_name;
+mod component;
+pub use component::{PathComponent, PathComponentError};
 
 /// Represents a Path in the castore model.
 /// These are always relative, and platform-independent, which distinguishes
@@ -38,7 +37,9 @@ impl Path {
         if !bytes.is_empty() {
             // Ensure all components are valid castore node names.
             for component in bytes.split_str(b"/") {
-                validate_node_name(component).ok()?;
+                if component::validate_name(component).is_err() {
+                    return None;
+                }
             }
         }
 
@@ -81,10 +82,26 @@ impl Path {
         Ok(v)
     }
 
+    /// Provides an iterator over the components of the path,
+    /// which are individual [PathComponent]s.
+    /// In case the path is empty, an empty iterator is returned.
+    pub fn components(&self) -> impl Iterator<Item = PathComponent> + '_ {
+        let mut iter = self.inner.split_str(&b"/");
+
+        // We don't want to return an empty element, consume it if it's the only one.
+        if self.inner.is_empty() {
+            let _ = iter.next();
+        }
+
+        iter.map(|b| PathComponent {
+            inner: bytes::Bytes::copy_from_slice(b),
+        })
+    }
+
     /// Produces an iterator over the components of the path, which are
     /// individual byte slices.
     /// In case the path is empty, an empty iterator is returned.
-    pub fn components(&self) -> impl Iterator<Item = &[u8]> {
+    pub fn components_bytes(&self) -> impl Iterator<Item = &[u8]> {
         let mut iter = self.inner.split_str(&b"/");
 
         // We don't want to return an empty element, consume it if it's the only one.
@@ -95,11 +112,16 @@ impl Path {
         iter
     }
 
-    /// Returns the final component of the Path, if there is one.
-    pub fn file_name(&self) -> Option<&[u8]> {
+    /// Returns the final component of the Path, if there is one.
+    pub fn file_name(&self) -> Option<PathComponent> {
         self.components().last()
     }
 
+    /// Returns the final component of the Path, if there is one, in bytes.
+    pub fn file_name_bytes(&self) -> Option<&[u8]> {
+        self.components_bytes().last()
+    }
+
     pub fn as_bytes(&self) -> &[u8] {
         &self.inner
     }
@@ -211,7 +233,9 @@ impl PathBuf {
 
     /// Adjoins `name` to self.
     pub fn try_push(&mut self, name: &[u8]) -> Result<(), std::io::Error> {
-        validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?;
+        if component::validate_name(name).is_err() {
+            return Err(std::io::ErrorKind::InvalidData.into());
+        }
 
         if !self.inner.is_empty() {
             self.inner.push(b'/');
@@ -329,7 +353,7 @@ mod test {
         assert_eq!(s.as_bytes(), p.as_bytes(), "inner bytes mismatch");
         assert_eq!(
             num_components,
-            p.components().count(),
+            p.components_bytes().count(),
             "number of components mismatch"
         );
     }
@@ -396,10 +420,10 @@ mod test {
     #[case("a", vec!["a"])]
     #[case("a/b", vec!["a", "b"])]
     #[case("a/b/c", vec!["a","b", "c"])]
-    pub fn components(#[case] p: PathBuf, #[case] exp_components: Vec<&str>) {
+    pub fn components_bytes(#[case] p: PathBuf, #[case] exp_components: Vec<&str>) {
         assert_eq!(
             exp_components,
-            p.components()
+            p.components_bytes()
                 .map(|x| x.to_str().unwrap())
                 .collect::<Vec<_>>()
         );
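
The rename leaves Path with two iteration flavours: components() yields owned,
validated [PathComponent]s (copying each name), while components_bytes() keeps
the previous zero-copy behaviour. A sketch:

use crate::{PathBuf, PathComponent};

let p: PathBuf = "a/b/c".parse().expect("valid path");

let owned: Vec<PathComponent> = p.components().collect();
let borrowed: Vec<&[u8]> = p.components_bytes().collect();

assert_eq!(owned.len(), borrowed.len());
// file_name() is now the owned counterpart of file_name_bytes().
assert_eq!(p.file_name(), owned.last().cloned());
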
diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
index ce1d2bcd244a..62fdb34a25a0 100644
--- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
+++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
@@ -1,7 +1,5 @@
-use crate::directoryservice::DirectoryGraph;
-use crate::directoryservice::LeavesToRootValidator;
-use crate::proto;
-use crate::{directoryservice::DirectoryService, B3Digest};
+use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator};
+use crate::{proto, B3Digest, DirectoryError};
 use futures::stream::BoxStream;
 use futures::TryStreamExt;
 use std::ops::Deref;
@@ -58,13 +56,16 @@ where
                                 Status::not_found(format!("directory {} not found", digest))
                             })?;
 
-                        Box::pin(once(Ok(directory)))
+                        Box::pin(once(Ok(directory.into())))
                     } else {
                         // If recursive was requested, traverse via get_recursive.
                         Box::pin(
-                            self.directory_service.get_recursive(&digest).map_err(|e| {
-                                tonic::Status::new(tonic::Code::Internal, e.to_string())
-                            }),
+                            self.directory_service
+                                .get_recursive(&digest)
+                                .map_ok(proto::Directory::from)
+                                .map_err(|e| {
+                                    tonic::Status::new(tonic::Code::Internal, e.to_string())
+                                }),
                         )
                     }
                 }))
@@ -83,7 +84,9 @@ where
         let mut validator = DirectoryGraph::<LeavesToRootValidator>::default();
         while let Some(directory) = req_inner.message().await? {
             validator
-                .add(directory)
+                .add(directory.try_into().map_err(|e: DirectoryError| {
+                    tonic::Status::new(tonic::Code::Internal, e.to_string())
+                })?)
                 .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
         }
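
With this, the wrapper converts at the wire boundary in both directions:
outgoing directories via From<crate::Directory>, incoming ones via the
validating TryFrom. A round-trip sketch, using an empty directory as a
stand-in for any valid value:

use crate::proto;

let directory = crate::Directory::new();

// Infallible in this direction: a crate::Directory is valid by construction.
let wire: proto::Directory = directory.clone().into();

// Decoding validates sorting, duplicate names, digest lengths etc.,
// failing with a DirectoryError otherwise.
let roundtripped: crate::Directory = wire.try_into().expect("round-trip must succeed");
assert_eq!(directory, roundtripped);
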
 
diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs
index a0cec896f753..89c68a4ad97b 100644
--- a/tvix/castore/src/proto/mod.rs
+++ b/tvix/castore/src/proto/mod.rs
@@ -1,18 +1,14 @@
-#![allow(non_snake_case)]
-// https://github.com/hyperium/tonic/issues/1056
-use bstr::ByteSlice;
-use std::{collections::HashSet, iter::Peekable, str};
-
 use prost::Message;
 
+use std::cmp::Ordering;
+
 mod grpc_blobservice_wrapper;
 mod grpc_directoryservice_wrapper;
 
+use crate::{path::PathComponent, B3Digest, DirectoryError};
 pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
 pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
 
-use crate::{B3Digest, B3_LEN};
-
 tonic::include_proto!("tvix.castore.v1");
 
 #[cfg(feature = "tonic-reflection")]
@@ -24,38 +20,6 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("tvix
 #[cfg(test)]
 mod tests;
 
-/// Errors that can occur during the validation of [Directory] messages.
-#[derive(Debug, PartialEq, Eq, thiserror::Error)]
-pub enum ValidateDirectoryError {
-    /// Elements are not in sorted order
-    #[error("{:?} is not sorted", .0.as_bstr())]
-    WrongSorting(Vec<u8>),
-    /// Multiple elements with the same name encountered
-    #[error("{:?} is a duplicate name", .0.as_bstr())]
-    DuplicateName(Vec<u8>),
-    /// Invalid node
-    #[error("invalid node with name {:?}: {:?}", .0.as_bstr(), .1.to_string())]
-    InvalidNode(Vec<u8>, ValidateNodeError),
-    #[error("Total size exceeds u32::MAX")]
-    SizeOverflow,
-}
-
-/// Errors that occur during Node validation
-#[derive(Debug, PartialEq, Eq, thiserror::Error)]
-pub enum ValidateNodeError {
-    #[error("No node set")]
-    NoNodeSet,
-    /// Invalid digest length encountered
-    #[error("Invalid Digest length: {0}")]
-    InvalidDigestLen(usize),
-    /// Invalid name encountered
-    #[error("Invalid name: {}", .0.as_bstr())]
-    InvalidName(Vec<u8>),
-    /// Invalid symlink target
-    #[error("Invalid symlink target: {}", .0.as_bstr())]
-    InvalidSymlinkTarget(Vec<u8>),
-}
-
 /// Errors that occur during StatBlobResponse validation
 #[derive(Debug, PartialEq, Eq, thiserror::Error)]
 pub enum ValidateStatBlobResponseError {
@@ -64,186 +28,6 @@ pub enum ValidateStatBlobResponseError {
     InvalidDigestLen(usize, usize),
 }
 
-/// Checks a Node name for validity as an intermediate node.
-/// We disallow slashes, null bytes, '.', '..' and the empty string.
-pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
-    if name.is_empty()
-        || name == b".."
-        || name == b"."
-        || name.contains(&0x00)
-        || name.contains(&b'/')
-    {
-        Err(ValidateNodeError::InvalidName(name.to_owned()))
-    } else {
-        Ok(())
-    }
-}
-
-/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode]
-/// and [node::Node], so we can ask all of them for the name easily.
-pub trait NamedNode {
-    fn get_name(&self) -> &[u8];
-}
-
-impl NamedNode for &FileNode {
-    fn get_name(&self) -> &[u8] {
-        &self.name
-    }
-}
-
-impl NamedNode for &DirectoryNode {
-    fn get_name(&self) -> &[u8] {
-        &self.name
-    }
-}
-
-impl NamedNode for &SymlinkNode {
-    fn get_name(&self) -> &[u8] {
-        &self.name
-    }
-}
-
-impl NamedNode for node::Node {
-    fn get_name(&self) -> &[u8] {
-        match self {
-            node::Node::File(node_file) => &node_file.name,
-            node::Node::Directory(node_directory) => &node_directory.name,
-            node::Node::Symlink(node_symlink) => &node_symlink.name,
-        }
-    }
-}
-
-impl Node {
-    /// Ensures the node has a valid enum kind (is Some), and passes its
-    // per-enum validation.
-    // The inner root node is returned for easier consumption.
-    pub fn validate(&self) -> Result<&node::Node, ValidateNodeError> {
-        if let Some(node) = self.node.as_ref() {
-            node.validate()?;
-            Ok(node)
-        } else {
-            Err(ValidateNodeError::NoNodeSet)
-        }
-    }
-}
-
-impl node::Node {
-    /// Returns the node with a new name.
-    pub fn rename(self, name: bytes::Bytes) -> Self {
-        match self {
-            node::Node::Directory(n) => node::Node::Directory(DirectoryNode { name, ..n }),
-            node::Node::File(n) => node::Node::File(FileNode { name, ..n }),
-            node::Node::Symlink(n) => node::Node::Symlink(SymlinkNode { name, ..n }),
-        }
-    }
-
-    /// Ensures the node has a valid name, and checks the type-specific fields too.
-    pub fn validate(&self) -> Result<(), ValidateNodeError> {
-        match self {
-            // for a directory root node, ensure the digest has the appropriate size.
-            node::Node::Directory(directory_node) => {
-                if directory_node.digest.len() != B3_LEN {
-                    Err(ValidateNodeError::InvalidDigestLen(
-                        directory_node.digest.len(),
-                    ))?;
-                }
-                validate_node_name(&directory_node.name)
-            }
-            // for a file root node, ensure the digest has the appropriate size.
-            node::Node::File(file_node) => {
-                if file_node.digest.len() != B3_LEN {
-                    Err(ValidateNodeError::InvalidDigestLen(file_node.digest.len()))?;
-                }
-                validate_node_name(&file_node.name)
-            }
-            // ensure the symlink target is not empty and doesn't contain null bytes.
-            node::Node::Symlink(symlink_node) => {
-                if symlink_node.target.is_empty() || symlink_node.target.contains(&b'\0') {
-                    Err(ValidateNodeError::InvalidSymlinkTarget(
-                        symlink_node.target.to_vec(),
-                    ))?;
-                }
-                validate_node_name(&symlink_node.name)
-            }
-        }
-    }
-}
-
-impl PartialOrd for node::Node {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for node::Node {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        self.get_name().cmp(other.get_name())
-    }
-}
-
-impl PartialOrd for FileNode {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for FileNode {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        self.get_name().cmp(other.get_name())
-    }
-}
-
-impl PartialOrd for SymlinkNode {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for SymlinkNode {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        self.get_name().cmp(other.get_name())
-    }
-}
-
-impl PartialOrd for DirectoryNode {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl Ord for DirectoryNode {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        self.get_name().cmp(other.get_name())
-    }
-}
-
-/// Accepts a name, and a mutable reference to the previous name.
-/// If the passed name is larger than the previous one, the reference is updated.
-/// If it's not, an error is returned.
-fn update_if_lt_prev<'n>(
-    prev_name: &mut &'n [u8],
-    name: &'n [u8],
-) -> Result<(), ValidateDirectoryError> {
-    if *name < **prev_name {
-        return Err(ValidateDirectoryError::WrongSorting(name.to_vec()));
-    }
-    *prev_name = name;
-    Ok(())
-}
-
-/// Inserts the given name into a HashSet if it's not already in there.
-/// If it is, an error is returned.
-fn insert_once<'n>(
-    seen_names: &mut HashSet<&'n [u8]>,
-    name: &'n [u8],
-) -> Result<(), ValidateDirectoryError> {
-    if seen_names.get(name).is_some() {
-        return Err(ValidateDirectoryError::DuplicateName(name.to_vec()));
-    }
-    seen_names.insert(name);
-    Ok(())
-}
-
 fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
     iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i))
 }
@@ -280,117 +64,233 @@ impl Directory {
             .as_bytes()
             .into()
     }
+}
 
-    /// validate checks the directory for invalid data, such as:
-    /// - violations of name restrictions
-    /// - invalid digest lengths
-    /// - not properly sorted lists
-    /// - duplicate names in the three lists
-    pub fn validate(&self) -> Result<(), ValidateDirectoryError> {
-        let mut seen_names: HashSet<&[u8]> = HashSet::new();
-
-        let mut last_directory_name: &[u8] = b"";
-        let mut last_file_name: &[u8] = b"";
-        let mut last_symlink_name: &[u8] = b"";
-
-        // check directories
-        for directory_node in &self.directories {
-            node::Node::Directory(directory_node.clone())
-                .validate()
-                .map_err(|e| {
-                    ValidateDirectoryError::InvalidNode(directory_node.name.to_vec(), e)
-                })?;
-
-            update_if_lt_prev(&mut last_directory_name, &directory_node.name)?;
-            insert_once(&mut seen_names, &directory_node.name)?;
-        }
+impl TryFrom<Directory> for crate::Directory {
+    type Error = DirectoryError;
+
+    fn try_from(value: Directory) -> Result<Self, Self::Error> {
+        // Check directories, files and symlinks are sorted
+        // We'll notice duplicates across all three fields when constructing the Directory.
+        // FUTUREWORK: use is_sorted() once stable, and/or implement the producer for
+        // [crate::Directory::try_from_iter] iterating over all three and doing all checks inline.
+        value
+            .directories
+            .iter()
+            .try_fold(&b""[..], |prev_name, e| {
+                match e.name.as_ref().cmp(prev_name) {
+                    Ordering::Less => Err(DirectoryError::WrongSorting(e.name.to_owned())),
+                    Ordering::Equal => Err(DirectoryError::DuplicateName(
+                        e.name
+                            .to_owned()
+                            .try_into()
+                            .map_err(DirectoryError::InvalidName)?,
+                    )),
+                    Ordering::Greater => Ok(e.name.as_ref()),
+                }
+            })?;
+        value.files.iter().try_fold(&b""[..], |prev_name, e| {
+            match e.name.as_ref().cmp(prev_name) {
+                Ordering::Less => Err(DirectoryError::WrongSorting(e.name.to_owned())),
+                Ordering::Equal => Err(DirectoryError::DuplicateName(
+                    e.name
+                        .to_owned()
+                        .try_into()
+                        .map_err(DirectoryError::InvalidName)?,
+                )),
+                Ordering::Greater => Ok(e.name.as_ref()),
+            }
+        })?;
+        value.symlinks.iter().try_fold(&b""[..], |prev_name, e| {
+            match e.name.as_ref().cmp(prev_name) {
+                Ordering::Less => Err(DirectoryError::WrongSorting(e.name.to_owned())),
+                Ordering::Equal => Err(DirectoryError::DuplicateName(
+                    e.name
+                        .to_owned()
+                        .try_into()
+                        .map_err(DirectoryError::InvalidName)?,
+                )),
+                Ordering::Greater => Ok(e.name.as_ref()),
+            }
+        })?;
 
-        // check files
-        for file_node in &self.files {
-            node::Node::File(file_node.clone())
-                .validate()
-                .map_err(|e| ValidateDirectoryError::InvalidNode(file_node.name.to_vec(), e))?;
+        // FUTUREWORK: use is_sorted() once stable, and/or implement the producer for
+        // [crate::Directory::try_from_iter] iterating over all three and doing all checks inline.
+        let mut elems: Vec<(PathComponent, crate::Node)> =
+            Vec::with_capacity(value.directories.len() + value.files.len() + value.symlinks.len());
 
-            update_if_lt_prev(&mut last_file_name, &file_node.name)?;
-            insert_once(&mut seen_names, &file_node.name)?;
+        for e in value.directories {
+            elems.push(
+                Node {
+                    node: Some(node::Node::Directory(e)),
+                }
+                .try_into_name_and_node()?,
+            );
         }
 
-        // check symlinks
-        for symlink_node in &self.symlinks {
-            node::Node::Symlink(symlink_node.clone())
-                .validate()
-                .map_err(|e| ValidateDirectoryError::InvalidNode(symlink_node.name.to_vec(), e))?;
+        for e in value.files {
+            elems.push(
+                Node {
+                    node: Some(node::Node::File(e)),
+                }
+                .try_into_name_and_node()?,
+            )
+        }
 
-            update_if_lt_prev(&mut last_symlink_name, &symlink_node.name)?;
-            insert_once(&mut seen_names, &symlink_node.name)?;
+        for e in value.symlinks {
+            elems.push(
+                Node {
+                    node: Some(node::Node::Symlink(e)),
+                }
+                .try_into_name_and_node()?,
+            )
         }
 
-        self.size_checked()
-            .ok_or(ValidateDirectoryError::SizeOverflow)?;
+        crate::Directory::try_from_iter(elems)
+    }
+}
 
-        Ok(())
+impl From<crate::Directory> for Directory {
+    fn from(value: crate::Directory) -> Self {
+        let mut directories = vec![];
+        let mut files = vec![];
+        let mut symlinks = vec![];
+
+        for (name, node) in value.into_nodes() {
+            match node {
+                crate::Node::File {
+                    digest,
+                    size,
+                    executable,
+                } => files.push(FileNode {
+                    name: name.into(),
+                    digest: digest.into(),
+                    size,
+                    executable,
+                }),
+                crate::Node::Directory { digest, size } => directories.push(DirectoryNode {
+                    name: name.into(),
+                    digest: digest.into(),
+                    size,
+                }),
+                crate::Node::Symlink { target } => {
+                    symlinks.push(SymlinkNode {
+                        name: name.into(),
+                        target: target.into(),
+                    });
+                }
+            }
+        }
+
+        Directory {
+            directories,
+            files,
+            symlinks,
+        }
     }
+}
 
-    /// Allows iterating over all three nodes ([DirectoryNode], [FileNode],
-    /// [SymlinkNode]) in an ordered fashion, as long as the individual lists
-    /// are sorted (which can be checked by the [Directory::validate]).
-    pub fn nodes(&self) -> DirectoryNodesIterator {
-        return DirectoryNodesIterator {
-            i_directories: self.directories.iter().peekable(),
-            i_files: self.files.iter().peekable(),
-            i_symlinks: self.symlinks.iter().peekable(),
-        };
+impl Node {
+    /// Converts a proto [Node] to a [crate::Node], and splits off the name as a [PathComponent].
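+    ///
+    /// A minimal sketch of the happy path (name and target values are illustrative):
+    /// ```ignore
+    /// let (name, node) = Node {
+    ///     node: Some(node::Node::Symlink(SymlinkNode {
+    ///         name: "foo".into(),
+    ///         target: "/nix/store/somewhere".into(),
+    ///     })),
+    /// }
+    /// .try_into_name_and_node()?;
+    /// assert_eq!(b"foo", name.as_ref());
+    /// ```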
+    pub fn try_into_name_and_node(self) -> Result<(PathComponent, crate::Node), DirectoryError> {
+        let (name_bytes, node) = self.try_into_unchecked_name_and_checked_node()?;
+        Ok((
+            name_bytes.try_into().map_err(DirectoryError::InvalidName)?,
+            node,
+        ))
     }
 
-    /// Adds the specified [node::Node] to the [Directory], preserving sorted entries.
-    /// This assumes the [Directory] to be sorted prior to adding the node.
-    ///
-    /// Inserting an element that already exists with the same name in the directory is not
-    /// supported.
-    pub fn add(&mut self, node: node::Node) {
-        debug_assert!(
-            !self.files.iter().any(|x| x.get_name() == node.get_name()),
-            "name already exists in files"
-        );
-        debug_assert!(
-            !self
-                .directories
-                .iter()
-                .any(|x| x.get_name() == node.get_name()),
-            "name already exists in directories"
-        );
-        debug_assert!(
-            !self
-                .symlinks
-                .iter()
-                .any(|x| x.get_name() == node.get_name()),
-            "name already exists in symlinks"
-        );
-
-        match node {
-            node::Node::File(node) => {
-                let pos = self
-                    .files
-                    .binary_search(&node)
-                    .expect_err("Tvix bug: dir entry with name already exists");
-                self.files.insert(pos, node);
+    /// Converts a proto [Node] to a [crate::Node], and splits off the name as a
+    /// [bytes::Bytes] without validating it.
+    fn try_into_unchecked_name_and_checked_node(
+        self,
+    ) -> Result<(bytes::Bytes, crate::Node), DirectoryError> {
+        match self.node.ok_or_else(|| DirectoryError::NoNodeSet)? {
+            node::Node::Directory(n) => {
+                let digest = B3Digest::try_from(n.digest)
+                    .map_err(|e| DirectoryError::InvalidNode(n.name.clone(), e.into()))?;
+
+                let node = crate::Node::Directory {
+                    digest,
+                    size: n.size,
+                };
+
+                Ok((n.name, node))
             }
-            node::Node::Directory(node) => {
-                let pos = self
-                    .directories
-                    .binary_search(&node)
-                    .expect_err("Tvix bug: dir entry with name already exists");
-                self.directories.insert(pos, node);
+            node::Node::File(n) => {
+                let digest = B3Digest::try_from(n.digest)
+                    .map_err(|e| DirectoryError::InvalidNode(n.name.clone(), e.into()))?;
+
+                let node = crate::Node::File {
+                    digest,
+                    size: n.size,
+                    executable: n.executable,
+                };
+
+                Ok((n.name, node))
             }
-            node::Node::Symlink(node) => {
-                let pos = self
-                    .symlinks
-                    .binary_search(&node)
-                    .expect_err("Tvix bug: dir entry with name already exists");
-                self.symlinks.insert(pos, node);
+
+            node::Node::Symlink(n) => {
+                let node = crate::Node::Symlink {
+                    target: n.target.try_into().map_err(|e| {
+                        DirectoryError::InvalidNode(
+                            n.name.clone(),
+                            crate::ValidateNodeError::InvalidSymlinkTarget(e),
+                        )
+                    })?,
+                };
+
+                Ok((n.name, node))
             }
         }
     }
+
+    /// Converts a proto [Node] to a [crate::Node], discarding the name after
+    /// checking it.
+    ///
+    /// The name must be empty.
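+    ///
+    /// Sketch (target value is illustrative):
+    /// ```ignore
+    /// let node = Node {
+    ///     node: Some(node::Node::Symlink(SymlinkNode {
+    ///         name: "".into(),
+    ///         target: "/nix/store/somewhere".into(),
+    ///     })),
+    /// };
+    /// assert!(node.try_into_anonymous_node().is_ok());
+    /// ```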
+    pub fn try_into_anonymous_node(self) -> Result<crate::Node, DirectoryError> {
+        let (name, node) = Self::try_into_unchecked_name_and_checked_node(self)?;
+
+        if !name.is_empty() {
+            return Err(DirectoryError::NameInAnonymousNode);
+        }
+
+        Ok(node)
+    }
+
+    /// Constructs a [Node] from a name and [crate::Node].
+    /// The name is a [bytes::Bytes], not a [PathComponent], as we need to use
+    /// an empty name in some places.
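+    ///
+    /// Together with [Node::try_into_name_and_node] this round-trips; a sketch
+    /// (`node` is an arbitrary [crate::Node]):
+    /// ```ignore
+    /// let proto_node = Node::from_name_and_node("foo".into(), node.clone());
+    /// let (name, node2) = proto_node.try_into_name_and_node()?;
+    /// assert_eq!(b"foo", name.as_ref());
+    /// assert_eq!(node, node2);
+    /// ```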
+    pub fn from_name_and_node(name: bytes::Bytes, n: crate::Node) -> Self {
+        match n {
+            crate::Node::Directory { digest, size } => Self {
+                node: Some(node::Node::Directory(DirectoryNode {
+                    name,
+                    digest: digest.into(),
+                    size,
+                })),
+            },
+            crate::Node::File {
+                digest,
+                size,
+                executable,
+            } => Self {
+                node: Some(node::Node::File(FileNode {
+                    name,
+                    digest: digest.into(),
+                    size,
+                    executable,
+                })),
+            },
+            crate::Node::Symlink { target } => Self {
+                node: Some(node::Node::Symlink(SymlinkNode {
+                    name,
+                    target: target.into(),
+                })),
+            },
+        }
+    }
 }
 
 impl StatBlobResponse {
@@ -409,65 +309,3 @@ impl StatBlobResponse {
         Ok(())
     }
 }
-
-/// Struct to hold the state of an iterator over all nodes of a Directory.
-///
-/// Internally, this keeps peekable Iterators over all three lists of a
-/// Directory message.
-pub struct DirectoryNodesIterator<'a> {
-    // directory: &Directory,
-    i_directories: Peekable<std::slice::Iter<'a, DirectoryNode>>,
-    i_files: Peekable<std::slice::Iter<'a, FileNode>>,
-    i_symlinks: Peekable<std::slice::Iter<'a, SymlinkNode>>,
-}
-
-/// looks at two elements implementing NamedNode, and returns true if "left
-/// is smaller / comes first".
-///
-/// Some(_) is preferred over None.
-fn left_name_lt_right<A: NamedNode, B: NamedNode>(left: Option<&A>, right: Option<&B>) -> bool {
-    match left {
-        // if left is None, right always wins
-        None => false,
-        Some(left_inner) => {
-            // left is Some.
-            match right {
-                // left is Some, right is None - left wins.
-                None => true,
-                Some(right_inner) => {
-                    // both are Some - compare the name.
-                    return left_inner.get_name() < right_inner.get_name();
-                }
-            }
-        }
-    }
-}
-
-impl Iterator for DirectoryNodesIterator<'_> {
-    type Item = node::Node;
-
-    // next returns the next node in the Directory.
-    // we peek at all three internal iterators, and pick the one with the
-    // smallest name, to ensure lexicographical ordering.
-    // The individual lists are already known to be sorted.
-    fn next(&mut self) -> Option<Self::Item> {
-        if left_name_lt_right(self.i_directories.peek(), self.i_files.peek()) {
-            // i_directories is still in the game, compare with symlinks
-            if left_name_lt_right(self.i_directories.peek(), self.i_symlinks.peek()) {
-                self.i_directories
-                    .next()
-                    .cloned()
-                    .map(node::Node::Directory)
-            } else {
-                self.i_symlinks.next().cloned().map(node::Node::Symlink)
-            }
-        } else {
-            // i_files is still in the game, compare with symlinks
-            if left_name_lt_right(self.i_files.peek(), self.i_symlinks.peek()) {
-                self.i_files.next().cloned().map(node::Node::File)
-            } else {
-                self.i_symlinks.next().cloned().map(node::Node::Symlink)
-            }
-        }
-    }
-}
diff --git a/tvix/castore/src/proto/tests/directory.rs b/tvix/castore/src/proto/tests/directory.rs
index 81b73a048d52..efbc4e9f2af1 100644
--- a/tvix/castore/src/proto/tests/directory.rs
+++ b/tvix/castore/src/proto/tests/directory.rs
@@ -1,7 +1,5 @@
-use crate::proto::{
-    node, Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError,
-    ValidateNodeError,
-};
+use crate::proto::{Directory, DirectoryError, DirectoryNode, FileNode, SymlinkNode};
+use crate::ValidateNodeError;
 
 use hex_literal::hex;
 
@@ -149,7 +147,7 @@ fn digest() {
 #[test]
 fn validate_empty() {
     let d = Directory::default();
-    assert_eq!(d.validate(), Ok(()));
+    assert!(crate::Directory::try_from(d).is_ok());
 }
 
 #[test]
@@ -157,18 +155,15 @@ fn validate_invalid_names() {
     {
         let d = Directory {
             directories: vec![DirectoryNode {
-                name: "".into(),
+                name: b"\0"[..].into(),
                 digest: DUMMY_DIGEST.to_vec().into(),
                 size: 42,
             }],
             ..Default::default()
         };
-        match d.validate().expect_err("must fail") {
-            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
-                assert_eq!(n, b"")
-            }
-            _ => panic!("unexpected error"),
-        };
+
+        let e = crate::Directory::try_from(d).expect_err("must fail");
+        assert!(matches!(e, DirectoryError::InvalidName(_)));
     }
 
     {
@@ -180,12 +175,8 @@ fn validate_invalid_names() {
             }],
             ..Default::default()
         };
-        match d.validate().expect_err("must fail") {
-            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
-                assert_eq!(n, b".")
-            }
-            _ => panic!("unexpected error"),
-        };
+        let e = crate::Directory::try_from(d).expect_err("must fail");
+        assert!(matches!(e, DirectoryError::InvalidName(_)));
     }
 
     {
@@ -198,12 +189,8 @@ fn validate_invalid_names() {
             }],
             ..Default::default()
         };
-        match d.validate().expect_err("must fail") {
-            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
-                assert_eq!(n, b"..")
-            }
-            _ => panic!("unexpected error"),
-        };
+        let e = crate::Directory::try_from(d).expect_err("must fail");
+        assert!(matches!(e, DirectoryError::InvalidName(_)));
     }
 
     {
@@ -214,12 +201,8 @@ fn validate_invalid_names() {
             }],
             ..Default::default()
         };
-        match d.validate().expect_err("must fail") {
-            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
-                assert_eq!(n, b"\x00")
-            }
-            _ => panic!("unexpected error"),
-        };
+        let e = crate::Directory::try_from(d).expect_err("must fail");
+        assert!(matches!(e, DirectoryError::InvalidName(_)));
     }
 
     {
@@ -230,12 +213,20 @@ fn validate_invalid_names() {
             }],
             ..Default::default()
         };
-        match d.validate().expect_err("must fail") {
-            ValidateDirectoryError::InvalidNode(n, ValidateNodeError::InvalidName(_)) => {
-                assert_eq!(n, b"foo/bar")
-            }
-            _ => panic!("unexpected error"),
+        let e = crate::Directory::try_from(d).expect_err("must fail");
+        assert!(matches!(e, DirectoryError::InvalidName(_)));
+    }
+
+    {
+        let d = Directory {
+            symlinks: vec![SymlinkNode {
+                name: bytes::Bytes::copy_from_slice("X".repeat(500).into_bytes().as_slice()),
+                target: "foo".into(),
+            }],
+            ..Default::default()
         };
+        let e = crate::Directory::try_from(d).expect_err("must fail");
+        assert!(matches!(e, DirectoryError::InvalidName(_)));
     }
 }
 
@@ -249,8 +240,8 @@ fn validate_invalid_digest() {
         }],
         ..Default::default()
     };
-    match d.validate().expect_err("must fail") {
-        ValidateDirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => {
+    match crate::Directory::try_from(d).expect_err("must fail") {
+        DirectoryError::InvalidNode(_, ValidateNodeError::InvalidDigestLen(n)) => {
             assert_eq!(n, 2)
         }
         _ => panic!("unexpected error"),
@@ -276,15 +267,15 @@ fn validate_sorting() {
             ],
             ..Default::default()
         };
-        match d.validate().expect_err("must fail") {
-            ValidateDirectoryError::WrongSorting(s) => {
-                assert_eq!(s, b"a");
+        match crate::Directory::try_from(d).expect_err("must fail") {
+            DirectoryError::WrongSorting(s) => {
+                assert_eq!(s.as_ref(), b"a");
             }
             _ => panic!("unexpected error"),
         }
     }
 
-    // "a" exists twice, bad.
+    // "a" exists twice (same types), bad.
     {
         let d = Directory {
             directories: vec![
@@ -301,9 +292,31 @@ fn validate_sorting() {
             ],
             ..Default::default()
         };
-        match d.validate().expect_err("must fail") {
-            ValidateDirectoryError::DuplicateName(s) => {
-                assert_eq!(s, b"a");
+        match crate::Directory::try_from(d).expect_err("must fail") {
+            DirectoryError::DuplicateName(s) => {
+                assert_eq!(s.as_ref(), b"a");
+            }
+            _ => panic!("unexpected error"),
+        }
+    }
+
+    // "a" exists twice (different types), bad.
+    {
+        let d = Directory {
+            directories: vec![DirectoryNode {
+                name: "a".into(),
+                digest: DUMMY_DIGEST.to_vec().into(),
+                size: 42,
+            }],
+            symlinks: vec![SymlinkNode {
+                name: "a".into(),
+                target: "b".into(),
+            }],
+            ..Default::default()
+        };
+        match crate::Directory::try_from(d).expect_err("must fail") {
+            DirectoryError::DuplicateName(s) => {
+                assert_eq!(s.as_ref(), b"a");
             }
             _ => panic!("unexpected error"),
         }
@@ -327,7 +340,7 @@ fn validate_sorting() {
             ..Default::default()
         };
 
-        d.validate().expect("validate shouldn't error");
+        crate::Directory::try_from(d).expect("validate shouldn't error");
     }
 
     // [b, c] and [a] are both properly sorted.
@@ -352,101 +365,6 @@ fn validate_sorting() {
             ..Default::default()
         };
 
-        d.validate().expect("validate shouldn't error");
+        crate::Directory::try_from(d).expect("validate shouldn't error");
     }
 }
-
-#[test]
-fn validate_overflow() {
-    let d = Directory {
-        directories: vec![DirectoryNode {
-            name: "foo".into(),
-            digest: DUMMY_DIGEST.to_vec().into(),
-            size: u64::MAX,
-        }],
-        ..Default::default()
-    };
-
-    match d.validate().expect_err("must fail") {
-        ValidateDirectoryError::SizeOverflow => {}
-        _ => panic!("unexpected error"),
-    }
-}
-
-#[test]
-fn add_nodes_to_directory() {
-    let mut d = Directory {
-        ..Default::default()
-    };
-
-    d.add(node::Node::Directory(DirectoryNode {
-        name: "b".into(),
-        digest: DUMMY_DIGEST.to_vec().into(),
-        size: 1,
-    }));
-    d.add(node::Node::Directory(DirectoryNode {
-        name: "a".into(),
-        digest: DUMMY_DIGEST.to_vec().into(),
-        size: 1,
-    }));
-    d.add(node::Node::Directory(DirectoryNode {
-        name: "z".into(),
-        digest: DUMMY_DIGEST.to_vec().into(),
-        size: 1,
-    }));
-
-    d.add(node::Node::File(FileNode {
-        name: "f".into(),
-        digest: DUMMY_DIGEST.to_vec().into(),
-        size: 1,
-        executable: true,
-    }));
-    d.add(node::Node::File(FileNode {
-        name: "c".into(),
-        digest: DUMMY_DIGEST.to_vec().into(),
-        size: 1,
-        executable: true,
-    }));
-    d.add(node::Node::File(FileNode {
-        name: "g".into(),
-        digest: DUMMY_DIGEST.to_vec().into(),
-        size: 1,
-        executable: true,
-    }));
-
-    d.add(node::Node::Symlink(SymlinkNode {
-        name: "t".into(),
-        target: "a".into(),
-    }));
-    d.add(node::Node::Symlink(SymlinkNode {
-        name: "o".into(),
-        target: "a".into(),
-    }));
-    d.add(node::Node::Symlink(SymlinkNode {
-        name: "e".into(),
-        target: "a".into(),
-    }));
-
-    d.validate().expect("directory should be valid");
-}
-
-#[test]
-#[cfg_attr(not(debug_assertions), ignore)]
-#[should_panic = "name already exists in directories"]
-fn add_duplicate_node_to_directory_panics() {
-    let mut d = Directory {
-        ..Default::default()
-    };
-
-    d.add(node::Node::Directory(DirectoryNode {
-        name: "a".into(),
-        digest: DUMMY_DIGEST.to_vec().into(),
-        size: 1,
-    }));
-    d.add(node::Node::File(FileNode {
-        name: "a".into(),
-        digest: DUMMY_DIGEST.to_vec().into(),
-        size: 1,
-        executable: true,
-    }));
-}
diff --git a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs b/tvix/castore/src/proto/tests/directory_nodes_iterator.rs
deleted file mode 100644
index 68f147a33210..000000000000
--- a/tvix/castore/src/proto/tests/directory_nodes_iterator.rs
+++ /dev/null
@@ -1,78 +0,0 @@
-use crate::proto::Directory;
-use crate::proto::DirectoryNode;
-use crate::proto::FileNode;
-use crate::proto::NamedNode;
-use crate::proto::SymlinkNode;
-
-#[test]
-fn iterator() {
-    let d = Directory {
-        directories: vec![
-            DirectoryNode {
-                name: "c".into(),
-                ..DirectoryNode::default()
-            },
-            DirectoryNode {
-                name: "d".into(),
-                ..DirectoryNode::default()
-            },
-            DirectoryNode {
-                name: "h".into(),
-                ..DirectoryNode::default()
-            },
-            DirectoryNode {
-                name: "l".into(),
-                ..DirectoryNode::default()
-            },
-        ],
-        files: vec![
-            FileNode {
-                name: "b".into(),
-                ..FileNode::default()
-            },
-            FileNode {
-                name: "e".into(),
-                ..FileNode::default()
-            },
-            FileNode {
-                name: "g".into(),
-                ..FileNode::default()
-            },
-            FileNode {
-                name: "j".into(),
-                ..FileNode::default()
-            },
-        ],
-        symlinks: vec![
-            SymlinkNode {
-                name: "a".into(),
-                ..SymlinkNode::default()
-            },
-            SymlinkNode {
-                name: "f".into(),
-                ..SymlinkNode::default()
-            },
-            SymlinkNode {
-                name: "i".into(),
-                ..SymlinkNode::default()
-            },
-            SymlinkNode {
-                name: "k".into(),
-                ..SymlinkNode::default()
-            },
-        ],
-    };
-
-    // We keep this strings here and convert to string to make the comparison
-    // less messy.
-    let mut node_names: Vec<String> = vec![];
-
-    for node in d.nodes() {
-        node_names.push(String::from_utf8(node.get_name().to_vec()).unwrap());
-    }
-
-    assert_eq!(
-        vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"],
-        node_names
-    );
-}
diff --git a/tvix/castore/src/proto/tests/mod.rs b/tvix/castore/src/proto/tests/mod.rs
index 8d903bacb6c5..9f6330914bff 100644
--- a/tvix/castore/src/proto/tests/mod.rs
+++ b/tvix/castore/src/proto/tests/mod.rs
@@ -1,2 +1,47 @@
+use super::{node, Node, SymlinkNode};
+use crate::DirectoryError;
+
 mod directory;
-mod directory_nodes_iterator;
+
+/// Create a node with an empty symlink target, and ensure it fails validation.
+#[test]
+fn convert_symlink_empty_target_invalid() {
+    Node {
+        node: Some(node::Node::Symlink(SymlinkNode {
+            name: "foo".into(),
+            target: "".into(),
+        })),
+    }
+    .try_into_name_and_node()
+    .expect_err("must fail validation");
+}
+
+/// Create a node with a symlink target including null bytes, and ensure it
+/// fails validation.
+#[test]
+fn convert_symlink_target_null_byte_invalid() {
+    Node {
+        node: Some(node::Node::Symlink(SymlinkNode {
+            name: "foo".into(),
+            target: "foo\0".into(),
+        })),
+    }
+    .try_into_name_and_node()
+    .expect_err("must fail validation");
+}
+
+/// Create a node with a (non-empty) name, and ensure converting it into an
+/// anonymous node fails.
+#[test]
+fn convert_anonymous_with_name_fail() {
+    assert_eq!(
+        DirectoryError::NameInAnonymousNode,
+        Node {
+            node: Some(node::Node::Symlink(SymlinkNode {
+                name: "foo".into(),
+                target: "somewhereelse".into(),
+            })),
+        }
+        .try_into_anonymous_node()
+        .expect_err("must fail")
+    )
+}
diff --git a/tvix/castore/src/refscan.rs b/tvix/castore/src/refscan.rs
new file mode 100644
index 000000000000..352593020472
--- /dev/null
+++ b/tvix/castore/src/refscan.rs
@@ -0,0 +1,354 @@
+//! Simple scanner for non-overlapping, known references of Nix store paths in a
+//! given string.
+//!
+//! This is used for determining build references (see
+//! //tvix/eval/docs/build-references.md for more details).
+//!
+//! The scanner itself uses the Wu-Manber string-matching algorithm, via our
+//! fork of the `wu-manber` crate.
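+//!
+//! A high-level usage sketch (the candidate path and input are illustrative):
+//! ```ignore
+//! let scanner = ReferenceScanner::new(vec!["/nix/store/aaaa-foo".to_string()]);
+//! scanner.scan(&build_output);
+//! let seen: BTreeSet<String> = scanner.finalise();
+//! ```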
+use pin_project::pin_project;
+use std::collections::BTreeSet;
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::task::{ready, Poll};
+use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
+use wu_manber::TwoByteWM;
+
+/// A searcher that encapsulates the candidates and the Wu-Manber searcher.
+/// This is separate from the scanner because we need to look for the same
+/// pattern in multiple outputs and don't want to pay the price of constructing
+/// the searcher for each build output.
+pub struct ReferencePatternInner<P> {
+    candidates: Vec<P>,
+    longest_candidate: usize,
+    // FUTUREWORK: Support overlapping patterns to be compatible with cpp Nix
+    searcher: Option<TwoByteWM>,
+}
+
+#[derive(Clone)]
+pub struct ReferencePattern<P> {
+    inner: Arc<ReferencePatternInner<P>>,
+}
+
+impl<P> ReferencePattern<P> {
+    pub fn candidates(&self) -> &[P] {
+        &self.inner.candidates
+    }
+
+    pub fn longest_candidate(&self) -> usize {
+        self.inner.longest_candidate
+    }
+}
+
+impl<P: AsRef<[u8]>> ReferencePattern<P> {
+    /// Construct a new `ReferencePattern` that knows how to scan for the given
+    /// candidates.
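+    ///
+    /// The compiled searcher lives behind an [Arc], so the pattern can be
+    /// cloned cheaply and shared across several [ReferenceScanner]s; a sketch:
+    /// ```ignore
+    /// let pattern = ReferencePattern::new(candidates);
+    /// let scanner_a = ReferenceScanner::new(pattern.clone());
+    /// let scanner_b = ReferenceScanner::new(pattern);
+    /// ```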
+    pub fn new(candidates: Vec<P>) -> Self {
+        let searcher = if candidates.is_empty() {
+            None
+        } else {
+            Some(TwoByteWM::new(&candidates))
+        };
+        let longest_candidate = candidates.iter().fold(0, |v, c| v.max(c.as_ref().len()));
+
+        ReferencePattern {
+            inner: Arc::new(ReferencePatternInner {
+                searcher,
+                candidates,
+                longest_candidate,
+            }),
+        }
+    }
+}
+
+impl<P> From<Vec<P>> for ReferencePattern<P>
+where
+    P: AsRef<[u8]>,
+{
+    fn from(candidates: Vec<P>) -> Self {
+        Self::new(candidates)
+    }
+}
+
+/// Represents a "primed" reference scanner with an automaton that knows the set
+/// of bytes patterns to scan for.
+pub struct ReferenceScanner<P> {
+    pattern: ReferencePattern<P>,
+    matches: Vec<AtomicBool>,
+}
+
+impl<P: AsRef<[u8]>> ReferenceScanner<P> {
+    /// Construct a new `ReferenceScanner` that knows how to scan for the given
+    /// candidate bytes patterns.
+    pub fn new<IP: Into<ReferencePattern<P>>>(pattern: IP) -> Self {
+        let pattern = pattern.into();
+        let mut matches = Vec::new();
+        for _ in 0..pattern.candidates().len() {
+            matches.push(AtomicBool::new(false));
+        }
+        ReferenceScanner { pattern, matches }
+    }
+
+    /// Scan the given buffer for all non-overlapping matches and collect them
+    /// in the scanner.
+    pub fn scan<S: AsRef<[u8]>>(&self, haystack: S) {
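+        // NB: haystacks shorter than the longest candidate are skipped
+        // entirely, so no candidate (not even a shorter one) can match here.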
+        if haystack.as_ref().len() < self.pattern.longest_candidate() {
+            return;
+        }
+
+        if let Some(searcher) = &self.pattern.inner.searcher {
+            for m in searcher.find(haystack) {
+                self.matches[m.pat_idx].store(true, Ordering::Release);
+            }
+        }
+    }
+
+    pub fn pattern(&self) -> &ReferencePattern<P> {
+        &self.pattern
+    }
+
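+    /// Returns, for each candidate (by index), whether it has matched so far.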
+    pub fn matches(&self) -> Vec<bool> {
+        self.matches
+            .iter()
+            .map(|m| m.load(Ordering::Acquire))
+            .collect()
+    }
+
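+    /// Returns an iterator over the candidates that have matched so far.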
+    pub fn candidate_matches(&self) -> impl Iterator<Item = &P> {
+        let candidates = self.pattern.candidates();
+        self.matches.iter().enumerate().filter_map(|(idx, found)| {
+            if found.load(Ordering::Acquire) {
+                Some(&candidates[idx])
+            } else {
+                None
+            }
+        })
+    }
+}
+
+impl<P: Clone + Ord + AsRef<[u8]>> ReferenceScanner<P> {
+    /// Finalise the reference scanner and return the resulting matches.
+    pub fn finalise(self) -> BTreeSet<P> {
+        self.candidate_matches().cloned().collect()
+    }
+}
+
+const DEFAULT_BUF_SIZE: usize = 8 * 1024;
+
+#[pin_project]
+pub struct ReferenceReader<'a, P, R> {
+    scanner: &'a ReferenceScanner<P>,
+    buffer: Vec<u8>,
+    consumed: usize,
+    #[pin]
+    reader: R,
+}
+
+impl<'a, P, R> ReferenceReader<'a, P, R>
+where
+    P: AsRef<[u8]>,
+{
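+    /// Wraps `reader` so that everything read through the returned reader is
+    /// also scanned by `scanner`; a sketch (`input`/`sink` are illustrative):
+    /// ```ignore
+    /// let mut reader = ReferenceReader::new(&scanner, input);
+    /// tokio::io::copy(&mut reader, &mut sink).await?;
+    /// let seen = scanner.finalise();
+    /// ```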
+    pub fn new(scanner: &'a ReferenceScanner<P>, reader: R) -> Self {
+        Self::with_capacity(DEFAULT_BUF_SIZE, scanner, reader)
+    }
+
+    pub fn with_capacity(capacity: usize, scanner: &'a ReferenceScanner<P>, reader: R) -> Self {
+        // If capacity is not at least as long as longest_candidate we can't do a scan
+        let capacity = capacity.max(scanner.pattern().longest_candidate());
+        ReferenceReader {
+            scanner,
+            buffer: Vec::with_capacity(capacity),
+            consumed: 0,
+            reader,
+        }
+    }
+}
+
+impl<'a, P, R> AsyncRead for ReferenceReader<'a, P, R>
+where
+    R: AsyncRead,
+    P: AsRef<[u8]>,
+{
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> Poll<std::io::Result<()>> {
+        let internal_buf = ready!(self.as_mut().poll_fill_buf(cx))?;
+        let amt = buf.remaining().min(internal_buf.len());
+        buf.put_slice(&internal_buf[..amt]);
+        self.consume(amt);
+        Poll::Ready(Ok(()))
+    }
+}
+
+impl<'a, P, R> AsyncBufRead for ReferenceReader<'a, P, R>
+where
+    R: AsyncRead,
+    P: AsRef<[u8]>,
+{
+    fn poll_fill_buf(
+        self: Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<std::io::Result<&[u8]>> {
+        #[allow(clippy::manual_saturating_arithmetic)] // for clarity
+        let overlap = self
+            .scanner
+            .pattern
+            .longest_candidate()
+            .checked_sub(1)
+            // If this overflows (longest_candidate = 0), that means there are no needles,
+            // so there is no need to have any overlap
+            .unwrap_or(0);
+        let mut this = self.project();
+        // Still data in buffer
+        if *this.consumed < this.buffer.len() {
+            return Poll::Ready(Ok(&this.buffer[*this.consumed..]));
+        }
+        // We need to copy the last `overlap` bytes to the front, to handle references that span two reads
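+        // (e.g. with a longest candidate of 32 bytes, the final 31 bytes of
+        // the previous buffer are kept, so a match straddling two reads is
+        // still visible to the next scan)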
+        if *this.consumed > overlap {
+            let start = this.buffer.len() - overlap;
+            this.buffer.copy_within(start.., 0);
+            this.buffer.truncate(overlap);
+            *this.consumed = overlap;
+        }
+        // Read at least until self.buffer.len() > overlap so we can do one scan
+        loop {
+            let filled = {
+                let mut buf = ReadBuf::uninit(this.buffer.spare_capacity_mut());
+                ready!(this.reader.as_mut().poll_read(cx, &mut buf))?;
+                buf.filled().len()
+            };
+            // SAFETY: We just read `filled` amount of data above
+            unsafe {
+                this.buffer.set_len(filled + this.buffer.len());
+            }
+            if filled == 0 || this.buffer.len() > overlap {
+                break;
+            }
+        }
+
+        #[allow(clippy::needless_borrows_for_generic_args)] // misfiring lint (breaks code below)
+        this.scanner.scan(&this.buffer);
+
+        Poll::Ready(Ok(&this.buffer[*this.consumed..]))
+    }
+
+    fn consume(self: Pin<&mut Self>, amt: usize) {
+        debug_assert!(self.consumed + amt <= self.buffer.len());
+        let this = self.project();
+        *this.consumed += amt;
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use rstest::rstest;
+    use tokio::io::AsyncReadExt as _;
+    use tokio_test::io::Builder;
+
+    use super::*;
+
+    // The actual derivation of `nixpkgs.hello`.
+    const HELLO_DRV: &str = r#"Derive([("out","/nix/store/33l4p0pn0mybmqzaxfkpppyh7vx1c74p-hello-2.12.1","","")],[("/nix/store/6z1jfnqqgyqr221zgbpm30v91yfj3r45-bash-5.1-p16.drv",["out"]),("/nix/store/ap9g09fxbicj836zm88d56dn3ff4clxl-stdenv-linux.drv",["out"]),("/nix/store/pf80kikyxr63wrw56k00i1kw6ba76qik-hello-2.12.1.tar.gz.drv",["out"])],["/nix/store/9krlzvny65gdc8s7kpb6lkx8cd02c25b-default-builder.sh"],"x86_64-linux","/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16/bin/bash",["-e","/nix/store/9krlzvny65gdc8s7kpb6lkx8cd02c25b-default-builder.sh"],[("buildInputs",""),("builder","/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16/bin/bash"),("cmakeFlags",""),("configureFlags",""),("depsBuildBuild",""),("depsBuildBuildPropagated",""),("depsBuildTarget",""),("depsBuildTargetPropagated",""),("depsHostHost",""),("depsHostHostPropagated",""),("depsTargetTarget",""),("depsTargetTargetPropagated",""),("doCheck","1"),("doInstallCheck",""),("mesonFlags",""),("name","hello-2.12.1"),("nativeBuildInputs",""),("out","/nix/store/33l4p0pn0mybmqzaxfkpppyh7vx1c74p-hello-2.12.1"),("outputs","out"),("patches",""),("pname","hello"),("propagatedBuildInputs",""),("propagatedNativeBuildInputs",""),("src","/nix/store/pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz"),("stdenv","/nix/store/cp65c8nk29qq5cl1wyy5qyw103cwmax7-stdenv-linux"),("strictDeps",""),("system","x86_64-linux"),("version","2.12.1")])"#;
+
+    #[test]
+    fn test_no_patterns() {
+        let scanner: ReferenceScanner<String> = ReferenceScanner::new(vec![]);
+
+        scanner.scan(HELLO_DRV);
+
+        let result = scanner.finalise();
+
+        assert_eq!(result.len(), 0);
+    }
+
+    #[test]
+    fn test_single_match() {
+        let scanner = ReferenceScanner::new(vec![
+            "/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16".to_string(),
+        ]);
+        scanner.scan(HELLO_DRV);
+
+        let result = scanner.finalise();
+
+        assert_eq!(result.len(), 1);
+        assert!(result.contains("/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16"));
+    }
+
+    #[test]
+    fn test_multiple_matches() {
+        let candidates = vec![
+            // these exist in the drv:
+            "/nix/store/33l4p0pn0mybmqzaxfkpppyh7vx1c74p-hello-2.12.1".to_string(),
+            "/nix/store/pf80kikyxr63wrw56k00i1kw6ba76qik-hello-2.12.1.tar.gz.drv".to_string(),
+            "/nix/store/cp65c8nk29qq5cl1wyy5qyw103cwmax7-stdenv-linux".to_string(),
+            // this doesn't:
+            "/nix/store/fn7zvafq26f0c8b17brs7s95s10ibfzs-emacs-28.2.drv".to_string(),
+        ];
+
+        let scanner = ReferenceScanner::new(candidates.clone());
+        scanner.scan(HELLO_DRV);
+
+        let result = scanner.finalise();
+        assert_eq!(result.len(), 3);
+
+        for c in candidates[..3].iter() {
+            assert!(result.contains(c));
+        }
+    }
+
+    #[rstest]
+    #[case::normal(8096, 8096)]
+    #[case::small_capacity(8096, 1)]
+    #[case::small_read(1, 8096)]
+    #[case::all_small(1, 1)]
+    #[tokio::test]
+    async fn test_reference_reader(#[case] chunk_size: usize, #[case] capacity: usize) {
+        let candidates = vec![
+            // these exist in the drv:
+            "33l4p0pn0mybmqzaxfkpppyh7vx1c74p",
+            "pf80kikyxr63wrw56k00i1kw6ba76qik",
+            "cp65c8nk29qq5cl1wyy5qyw103cwmax7",
+            // this doesn't:
+            "fn7zvafq26f0c8b17brs7s95s10ibfzs",
+        ];
+        let pattern = ReferencePattern::new(candidates.clone());
+        let scanner = ReferenceScanner::new(pattern);
+        let mut mock = Builder::new();
+        for c in HELLO_DRV.as_bytes().chunks(chunk_size) {
+            mock.read(c);
+        }
+        let mock = mock.build();
+        let mut reader = ReferenceReader::with_capacity(capacity, &scanner, mock);
+        let mut s = String::new();
+        reader.read_to_string(&mut s).await.unwrap();
+        assert_eq!(s, HELLO_DRV);
+
+        let result = scanner.finalise();
+        assert_eq!(result.len(), 3);
+
+        for c in candidates[..3].iter() {
+            assert!(result.contains(c));
+        }
+    }
+
+    #[tokio::test]
+    async fn test_reference_reader_no_patterns() {
+        let pattern = ReferencePattern::new(Vec::<&str>::new());
+        let scanner = ReferenceScanner::new(pattern);
+        let mut mock = Builder::new();
+        mock.read(HELLO_DRV.as_bytes());
+        let mock = mock.build();
+        let mut reader = ReferenceReader::new(&scanner, mock);
+        let mut s = String::new();
+        reader.read_to_string(&mut s).await.unwrap();
+        assert_eq!(s, HELLO_DRV);
+
+        let result = scanner.finalise();
+        assert_eq!(result.len(), 0);
+    }
+
+    // FUTUREWORK: Test with large file
+}
diff --git a/tvix/castore/src/tests/import.rs b/tvix/castore/src/tests/import.rs
index 72fb06aea877..51d1b68a4ec9 100644
--- a/tvix/castore/src/tests/import.rs
+++ b/tvix/castore/src/tests/import.rs
@@ -2,14 +2,11 @@ use crate::blobservice::{self, BlobService};
 use crate::directoryservice;
 use crate::fixtures::*;
 use crate::import::fs::ingest_path;
-use crate::proto;
+use crate::Node;
 
 use tempfile::TempDir;
 
 #[cfg(target_family = "unix")]
-use std::os::unix::ffi::OsStrExt;
-
-#[cfg(target_family = "unix")]
 #[tokio::test]
 async fn symlink() {
     let blob_service = blobservice::from_addr("memory://").await.unwrap();
@@ -24,19 +21,19 @@ async fn symlink() {
     )
     .unwrap();
 
-    let root_node = ingest_path(
+    let root_node = ingest_path::<_, _, _, &[u8]>(
         blob_service,
         directory_service,
         tmpdir.path().join("doesntmatter"),
+        None,
     )
     .await
     .expect("must succeed");
 
     assert_eq!(
-        proto::node::Node::Symlink(proto::SymlinkNode {
-            name: "doesntmatter".into(),
-            target: "/nix/store/somewhereelse".into(),
-        }),
+        Node::Symlink {
+            target: "/nix/store/somewhereelse".try_into().unwrap()
+        },
         root_node,
     )
 }
@@ -50,21 +47,21 @@ async fn single_file() {
 
     std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap();
 
-    let root_node = ingest_path(
+    let root_node = ingest_path::<_, _, _, &[u8]>(
         blob_service.clone(),
         directory_service,
         tmpdir.path().join("root"),
+        None,
     )
     .await
     .expect("must succeed");
 
     assert_eq!(
-        proto::node::Node::File(proto::FileNode {
-            name: "root".into(),
-            digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+        Node::File {
+            digest: HELLOWORLD_BLOB_DIGEST.clone(),
             size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
             executable: false,
-        }),
+        },
         root_node,
     );
 
@@ -89,23 +86,21 @@ async fn complicated() {
     // File ``keep/.keep`
     std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap();
 
-    let root_node = ingest_path(blob_service.clone(), &directory_service, tmpdir.path())
-        .await
-        .expect("must succeed");
+    let root_node = ingest_path::<_, _, _, &[u8]>(
+        blob_service.clone(),
+        &directory_service,
+        tmpdir.path(),
+        None,
+    )
+    .await
+    .expect("must succeed");
 
     // ensure root_node matched expectations
     assert_eq!(
-        proto::node::Node::Directory(proto::DirectoryNode {
-            name: tmpdir
-                .path()
-                .file_name()
-                .unwrap()
-                .as_bytes()
-                .to_owned()
-                .into(),
-            digest: DIRECTORY_COMPLICATED.digest().into(),
+        Node::Directory {
+            digest: DIRECTORY_COMPLICATED.digest().clone(),
             size: DIRECTORY_COMPLICATED.size(),
-        }),
+        },
         root_node,
     );