about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice
diff options
context:
space:
mode:
authorYureka <tvl@yuka.dev>2024-07-29T12·34+0200
committeryuka <tvl@yuka.dev>2024-08-13T12·17+0000
commit3ca0b53840b352b24f3a315404df11458b0bdbbb (patch)
tree2635e8d67d0c334cc5760dc4205b7515c6283a77 /tvix/castore/src/directoryservice
parent5d3f3158d6102baee48d2772e85f05cfc1fac95e (diff)
refactor(tvix/castore): use Directory struct separate from proto one r/8484
This uses our own data type to deal with Directories in the castore model.

It makes some undesired states unrepresentable, removing the need for conversions and checking in various places:

 - In the protobuf, blake3 digests could have a wrong length, as proto doesn't know fixed-size fields. We now use `B3Digest`, which makes cloning cheaper, and removes the need to do size-checking everywhere.
 - In the protobuf, we had three different lists for `files`, `symlinks` and `directories`. This was mostly a protobuf size optimization, but made interacting with them a bit awkward. This has now been replaced with a list of enums, and convenience iterators to get various nodes, and add new ones.

Change-Id: I7b92691bb06d77ff3f58a5ccea94a22c16f84f04
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12057
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r--tvix/castore/src/directoryservice/bigtable.rs24
-rw-r--r--tvix/castore/src/directoryservice/combinators.rs9
-rw-r--r--tvix/castore/src/directoryservice/directory_graph.rs68
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs40
-rw-r--r--tvix/castore/src/directoryservice/memory.rs34
-rw-r--r--tvix/castore/src/directoryservice/mod.rs489
-rw-r--r--tvix/castore/src/directoryservice/object_store.rs19
-rw-r--r--tvix/castore/src/directoryservice/order_validator.rs17
-rw-r--r--tvix/castore/src/directoryservice/redb.rs43
-rw-r--r--tvix/castore/src/directoryservice/simple_putter.rs5
-rw-r--r--tvix/castore/src/directoryservice/sled.rs46
-rw-r--r--tvix/castore/src/directoryservice/tests/mod.rs67
-rw-r--r--tvix/castore/src/directoryservice/traverse.rs63
-rw-r--r--tvix/castore/src/directoryservice/utils.rs20
14 files changed, 650 insertions, 294 deletions
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs
index d10dddaf9f60..73ab4342d832 100644
--- a/tvix/castore/src/directoryservice/bigtable.rs
+++ b/tvix/castore/src/directoryservice/bigtable.rs
@@ -9,7 +9,9 @@ use std::sync::Arc;
 use tonic::async_trait;
 use tracing::{instrument, trace, warn};
 
-use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter};
+use super::{
+    utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter,
+};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
 
@@ -149,7 +151,7 @@ fn derive_directory_key(digest: &B3Digest) -> String {
 #[async_trait]
 impl DirectoryService for BigtableDirectoryService {
     #[instrument(skip(self, digest), err, fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let mut client = self.client.clone();
         let directory_key = derive_directory_key(digest);
 
@@ -241,28 +243,20 @@ impl DirectoryService for BigtableDirectoryService {
 
         // Try to parse the value into a Directory message.
         let directory = proto::Directory::decode(Bytes::from(row_cell.value))
-            .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?;
-
-        // validate the Directory.
-        directory
-            .validate()
+            .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?
+            .try_into()
             .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?;
 
         Ok(Some(directory))
     }
 
     #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         let directory_digest = directory.digest();
         let mut client = self.client.clone();
         let directory_key = derive_directory_key(&directory_digest);
 
-        // Ensure the directory we're trying to upload passes validation
-        directory
-            .validate()
-            .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?;
-
-        let data = directory.encode_to_vec();
+        let data = proto::Directory::from(directory).encode_to_vec();
         if data.len() as u64 > CELL_SIZE_LIMIT {
             return Err(Error::StorageError(
                 "Directory exceeds cell limit on Bigtable".into(),
@@ -310,7 +304,7 @@ impl DirectoryService for BigtableDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs
index 0fdc82c16cb0..4283142231f9 100644
--- a/tvix/castore/src/directoryservice/combinators.rs
+++ b/tvix/castore/src/directoryservice/combinators.rs
@@ -7,10 +7,9 @@ use futures::TryStreamExt;
 use tonic::async_trait;
 use tracing::{instrument, trace};
 
-use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
+use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::directoryservice::DirectoryPutter;
-use crate::proto;
 use crate::B3Digest;
 use crate::Error;
 
@@ -40,7 +39,7 @@ where
     DS2: DirectoryService + Clone + 'static,
 {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         match self.near.get(digest).await? {
             Some(directory) => {
                 trace!("serving from cache");
@@ -82,7 +81,7 @@ where
     }
 
     #[instrument(skip_all)]
-    async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> {
         Err(Error::StorageError("unimplemented".to_string()))
     }
 
@@ -90,7 +89,7 @@ where
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         let near = self.near.clone();
         let far = self.far.clone();
         let digest = root_directory_digest.clone();
diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs
index e6b9b163370c..aa60f68a3197 100644
--- a/tvix/castore/src/directoryservice/directory_graph.rs
+++ b/tvix/castore/src/directoryservice/directory_graph.rs
@@ -10,10 +10,8 @@ use petgraph::{
 use tracing::instrument;
 
 use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
-use crate::{
-    proto::{self, Directory, DirectoryNode},
-    B3Digest,
-};
+use super::{Directory, DirectoryNode};
+use crate::B3Digest;
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
@@ -88,7 +86,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> {
 impl DirectoryGraph<LeavesToRootValidator> {
     /// Insert a new Directory into the closure
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
-    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
         if !self.order_validator.add_directory(&directory) {
             return Err(Error::ValidationError(
                 "unknown directory was referenced".into(),
@@ -108,7 +106,7 @@ impl DirectoryGraph<RootToLeavesValidator> {
 
     /// Insert a new Directory into the closure
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
-    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
         let digest = directory.digest();
         if !self.order_validator.digest_allowed(&digest) {
             return Err(Error::ValidationError("unexpected digest".into()));
@@ -129,12 +127,7 @@ impl<O: OrderValidator> DirectoryGraph<O> {
     }
 
     /// Adds a directory which has already been confirmed to be in-order to the graph
-    pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> {
-        // Do some basic validation
-        directory
-            .validate()
-            .map_err(|e| Error::ValidationError(e.to_string()))?;
-
+    pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> {
         let digest = directory.digest();
 
         // Teach the graph about the existence of a node with this digest
@@ -149,12 +142,10 @@ impl<O: OrderValidator> DirectoryGraph<O> {
         }
 
         // set up edges to all child directories
-        for subdir in &directory.directories {
-            let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap();
-
+        for subdir in directory.directories() {
             let child_ix = *self
                 .digest_to_node_ix
-                .entry(subdir_digest)
+                .entry(subdir.digest.clone())
                 .or_insert_with(|| self.graph.add_node(None));
 
             let pending_edge_check = match &self.graph[child_ix] {
@@ -266,37 +257,36 @@ impl ValidatedDirectoryGraph {
             .filter_map(move |i| nodes[i.index()].weight.take())
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use crate::{
-        fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C},
-        proto::{self, Directory},
-    };
-    use lazy_static::lazy_static;
-    use rstest::rstest;
-
-    lazy_static! {
+/*
         pub static ref BROKEN_DIRECTORY : Directory = Directory {
-            symlinks: vec![proto::SymlinkNode {
+            symlinks: vec![SymlinkNode {
                 name: "".into(), // invalid name!
                 target: "doesntmatter".into(),
             }],
             ..Default::default()
         };
+*/
+#[cfg(test)]
+mod tests {
+    use crate::directoryservice::{Directory, DirectoryNode, Node};
+    use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
+    use lazy_static::lazy_static;
+    use rstest::rstest;
 
-        pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory {
-            directories: vec![proto::DirectoryNode {
-                name: "foo".into(),
-                digest: DIRECTORY_A.digest().into(),
-                size: DIRECTORY_A.size() + 42, // wrong!
-            }],
-            ..Default::default()
+    use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};
+
+    lazy_static! {
+        pub static ref BROKEN_PARENT_DIRECTORY: Directory = {
+            let mut dir = Directory::new();
+            dir.add(Node::Directory(DirectoryNode::new(
+                "foo".into(),
+                DIRECTORY_A.digest(),
+                DIRECTORY_A.size() + 42, // wrong!
+            ).unwrap())).unwrap();
+            dir
         };
     }
 
-    use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};
-
     #[rstest]
     /// Uploading an empty directory should succeed.
     #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
@@ -312,8 +302,6 @@ mod tests {
     #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
     /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
     #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
-    /// Uploading a directory failing validation should fail immediately.
-    #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)]
     /// Uploading a directory which refers to another Directory with a wrong size should fail.
     #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
     fn test_uploads(
@@ -366,8 +354,6 @@ mod tests {
     #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)]
     /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root).
     #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)]
-    /// Downloading a directory failing validation should fail immediately.
-    #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)]
     /// Downloading a directory which refers to another Directory with a wrong size should fail.
     #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)]
     fn test_downloads(
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index 4dc3931ed410..2079514d2b79 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -1,8 +1,9 @@
 use std::collections::HashSet;
 
-use super::{DirectoryPutter, DirectoryService};
+use super::{Directory, DirectoryPutter, DirectoryService};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::proto::{self, get_directory_request::ByWhat};
+use crate::ValidateDirectoryError;
 use crate::{B3Digest, Error};
 use async_stream::try_stream;
 use futures::stream::BoxStream;
@@ -41,10 +42,7 @@ where
     T::Future: Send,
 {
     #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))]
-    async fn get(
-        &self,
-        digest: &B3Digest,
-    ) -> Result<Option<crate::proto::Directory>, crate::Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
         let digest_cpy = digest.clone();
@@ -72,15 +70,10 @@ where
                         "requested directory with digest {}, but got {}",
                         digest, actual_digest
                     )))
-                } else if let Err(e) = directory.validate() {
-                    // Validate the Directory itself is valid.
-                    warn!("directory failed validation: {}", e.to_string());
-                    Err(crate::Error::StorageError(format!(
-                        "directory {} failed validation: {}",
-                        digest, e,
-                    )))
                 } else {
-                    Ok(Some(directory))
+                    Ok(Some(directory.try_into().map_err(|_| {
+                        Error::StorageError("invalid root digest length in response".to_string())
+                    })?))
                 }
             }
             Ok(None) => Ok(None),
@@ -90,11 +83,11 @@ where
     }
 
     #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> {
         let resp = self
             .grpc_client
             .clone()
-            .put(tokio_stream::once(directory))
+            .put(tokio_stream::once(proto::Directory::from(directory)))
             .await;
 
         match resp {
@@ -113,7 +106,7 @@ where
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         let mut grpc_client = self.grpc_client.clone();
         let root_directory_digest = root_directory_digest.clone();
 
@@ -135,14 +128,6 @@ where
             loop {
                 match stream.message().await {
                     Ok(Some(directory)) => {
-                        // validate the directory itself.
-                        if let Err(e) = directory.validate() {
-                            Err(crate::Error::StorageError(format!(
-                                "directory {} failed validation: {}",
-                                directory.digest(),
-                                e,
-                            )))?;
-                        }
                         // validate we actually expected that directory, and move it from expected to received.
                         let directory_digest = directory.digest();
                         let was_expected = expected_directory_digests.remove(&directory_digest);
@@ -168,6 +153,9 @@ where
                                 .insert(child_directory_digest);
                         }
 
+                        let directory = directory.try_into()
+                            .map_err(|e: ValidateDirectoryError| Error::StorageError(e.to_string()))?;
+
                         yield directory;
                     },
                     Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => {
@@ -279,11 +267,11 @@ pub struct GRPCPutter {
 #[async_trait]
 impl DirectoryPutter for GRPCPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> {
         match self.rq {
             // If we're not already closed, send the directory to directory_sender.
             Some((_, ref directory_sender)) => {
-                if directory_sender.send(directory).is_err() {
+                if directory_sender.send(directory.into()).is_err() {
                     // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
                     // That error code is much more helpful, because it
                     // contains the error message from the server.
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
index ada4606a5a57..b039d9bc7d84 100644
--- a/tvix/castore/src/directoryservice/memory.rs
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -1,4 +1,4 @@
-use crate::{proto, B3Digest, Error};
+use crate::{B3Digest, Error};
 use futures::stream::BoxStream;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -7,8 +7,9 @@ use tonic::async_trait;
 use tracing::{instrument, warn};
 
 use super::utils::traverse_directory;
-use super::{DirectoryPutter, DirectoryService, SimplePutter};
+use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
 use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::proto;
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
@@ -18,7 +19,7 @@ pub struct MemoryDirectoryService {
 #[async_trait]
 impl DirectoryService for MemoryDirectoryService {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let db = self.db.read().await;
 
         match db.get(digest) {
@@ -37,35 +38,20 @@ impl DirectoryService for MemoryDirectoryService {
                     )));
                 }
 
-                // Validate the Directory itself is valid.
-                if let Err(e) = directory.validate() {
-                    warn!("directory failed validation: {}", e.to_string());
-                    return Err(Error::StorageError(format!(
-                        "directory {} failed validation: {}",
-                        actual_digest, e,
-                    )));
-                }
-
-                Ok(Some(directory.clone()))
+                Ok(Some(directory.clone().try_into().map_err(|e| {
+                    crate::Error::StorageError(format!("corrupted directory: {}", e))
+                })?))
             }
         }
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
-        // validate the directory itself.
-        if let Err(e) = directory.validate() {
-            return Err(Error::InvalidRequest(format!(
-                "directory {} failed validation: {}",
-                digest, e,
-            )));
-        }
-
         // store it
         let mut db = self.db.write().await;
-        db.insert(digest.clone(), directory);
+        db.insert(digest.clone(), directory.into());
 
         Ok(digest)
     }
@@ -74,7 +60,7 @@ impl DirectoryService for MemoryDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index 17a78b179349..0141a48f75bc 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -1,6 +1,9 @@
 use crate::composition::{Registry, ServiceBuilder};
-use crate::{proto, B3Digest, Error};
+use crate::proto;
+use crate::{B3Digest, Error};
+use crate::{ValidateDirectoryError, ValidateNodeError};
 
+use bytes::Bytes;
 use futures::stream::BoxStream;
 use tonic::async_trait;
 mod combinators;
@@ -38,7 +41,7 @@ mod bigtable;
 pub use self::bigtable::{BigtableDirectoryService, BigtableParameters};
 
 /// The base trait all Directory services need to implement.
-/// This is a simple get and put of [crate::proto::Directory], returning their
+/// This is a simple get and put of [Directory], returning their
 /// digest.
 #[async_trait]
 pub trait DirectoryService: Send + Sync {
@@ -50,14 +53,14 @@ pub trait DirectoryService: Send + Sync {
     /// Directory digests that are at the "root", aka the last element that's
     /// sent to a DirectoryPutter. This makes sense for implementations bundling
     /// closures of directories together in batches.
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>;
     /// Uploads a single Directory message, and returns the calculated
     /// digest, or an error. An error *must* also be returned if the message is
     /// not valid.
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error>;
 
-    /// Looks up a closure of [proto::Directory].
-    /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`,
+    /// Looks up a closure of [Directory].
+    /// Ideally this would be a `impl Stream<Item = Result<Directory, Error>>`,
     /// and we'd be able to add a default implementation for it here, but
     /// we can't have that yet.
     ///
@@ -75,9 +78,9 @@ pub trait DirectoryService: Send + Sync {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>>;
+    ) -> BoxStream<'static, Result<Directory, Error>>;
 
-    /// Allows persisting a closure of [proto::Directory], which is a graph of
+    /// Allows persisting a closure of [Directory], which is a graph of
     /// connected Directory messages.
     fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
 }
@@ -87,18 +90,18 @@ impl<A> DirectoryService for A
 where
     A: AsRef<dyn DirectoryService> + Send + Sync,
 {
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         self.as_ref().get(digest).await
     }
 
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         self.as_ref().put(directory).await
     }
 
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         self.as_ref().get_recursive(root_directory_digest)
     }
 
@@ -107,7 +110,7 @@ where
     }
 }
 
-/// Provides a handle to put a closure of connected [proto::Directory] elements.
+/// Provides a handle to put a closure of connected [Directory] elements.
 ///
 /// The consumer can periodically call [DirectoryPutter::put], starting from the
 /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
@@ -119,12 +122,12 @@ where
 /// but a single file or symlink.
 #[async_trait]
 pub trait DirectoryPutter: Send {
-    /// Put a individual [proto::Directory] into the store.
+    /// Put a individual [Directory] into the store.
     /// Error semantics and behaviour is up to the specific implementation of
     /// this trait.
     /// Due to bursting, the returned error might refer to an object previously
     /// sent via `put`.
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
+    async fn put(&mut self, directory: Directory) -> Result<(), Error>;
 
     /// Close the stream, and wait for any errors.
     /// If there's been any invalid Directory message uploaded, and error *must*
@@ -145,3 +148,461 @@ pub(crate) fn register_directory_services(reg: &mut Registry) {
         reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable");
     }
 }
+
+/// A Directory can contain Directory, File or Symlink nodes.
+/// Each of these nodes have a name attribute, which is the basename in that
+/// directory and node type specific attributes.
+/// While a Node by itself may have any name, the names of Directory entries:
+///  - MUST not contain slashes or null bytes
+///  - MUST not be '.' or '..'
+///  - MUST be unique across all three lists
+#[derive(Default, Debug, Clone, PartialEq, Eq)]
+pub struct Directory {
+    nodes: Vec<Node>,
+}
+
+/// A DirectoryNode is a pointer to a [Directory], by its [Directory::digest].
+/// It also gives it a `name` and `size`.
+/// Such a node is either an element in the [Directory] it itself is contained in,
+/// or a standalone root node./
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DirectoryNode {
+    /// The (base)name of the directory
+    name: Bytes,
+    /// The blake3 hash of a Directory message, serialized in protobuf canonical form.
+    digest: B3Digest,
+    /// Number of child elements in the Directory referred to by `digest`.
+    /// Calculated by summing up the numbers of nodes, and for each directory.
+    /// its size field. Can be used for inode allocation.
+    /// This field is precisely as verifiable as any other Merkle tree edge.
+    /// Resolve `digest`, and you can compute it incrementally. Resolve the entire
+    /// tree, and you can fully compute it from scratch.
+    /// A credulous implementation won't reject an excessive size, but this is
+    /// harmless: you'll have some ordinals without nodes. Undersizing is obvious
+    /// and easy to reject: you won't have an ordinal for some nodes.
+    size: u64,
+}
+
+impl DirectoryNode {
+    pub fn new(name: Bytes, digest: B3Digest, size: u64) -> Result<Self, ValidateNodeError> {
+        Ok(Self { name, digest, size })
+    }
+
+    pub fn digest(&self) -> &B3Digest {
+        &self.digest
+    }
+    pub fn size(&self) -> u64 {
+        self.size
+    }
+}
+
+/// A FileNode represents a regular or executable file in a Directory or at the root.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct FileNode {
+    /// The (base)name of the file
+    name: Bytes,
+
+    /// The blake3 digest of the file contents
+    digest: B3Digest,
+
+    /// The file content size
+    size: u64,
+
+    /// Whether the file is executable
+    executable: bool,
+}
+
+impl FileNode {
+    pub fn new(
+        name: Bytes,
+        digest: B3Digest,
+        size: u64,
+        executable: bool,
+    ) -> Result<Self, ValidateNodeError> {
+        Ok(Self {
+            name,
+            digest,
+            size,
+            executable,
+        })
+    }
+
+    pub fn digest(&self) -> &B3Digest {
+        &self.digest
+    }
+
+    pub fn size(&self) -> u64 {
+        self.size
+    }
+
+    pub fn executable(&self) -> bool {
+        self.executable
+    }
+}
+
+/// A SymlinkNode represents a symbolic link in a Directory or at the root.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SymlinkNode {
+    /// The (base)name of the symlink
+    name: Bytes,
+    /// The target of the symlink.
+    target: Bytes,
+}
+
+impl SymlinkNode {
+    pub fn new(name: Bytes, target: Bytes) -> Result<Self, ValidateNodeError> {
+        if target.is_empty() || target.contains(&b'\0') {
+            return Err(ValidateNodeError::InvalidSymlinkTarget(target));
+        }
+        Ok(Self { name, target })
+    }
+
+    pub fn target(&self) -> &bytes::Bytes {
+        &self.target
+    }
+}
+
+/// A Node is either a [DirectoryNode], [FileNode] or [SymlinkNode].
+/// While a Node by itself may have any name, only those matching specific requirements
+/// can can be added as entries to a [Directory] (see the documentation on [Directory] for details).
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Node {
+    Directory(DirectoryNode),
+    File(FileNode),
+    Symlink(SymlinkNode),
+}
+
+/// NamedNode is implemented for [FileNode], [DirectoryNode] and [SymlinkNode]
+/// and [Node], so we can ask all of them for the name easily.
+pub trait NamedNode {
+    fn get_name(&self) -> &bytes::Bytes;
+}
+
+impl NamedNode for &FileNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+impl NamedNode for FileNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+
+impl NamedNode for &DirectoryNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+impl NamedNode for DirectoryNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+
+impl NamedNode for &SymlinkNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+impl NamedNode for SymlinkNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+
+impl NamedNode for &Node {
+    fn get_name(&self) -> &bytes::Bytes {
+        match self {
+            Node::File(node_file) => &node_file.name,
+            Node::Directory(node_directory) => &node_directory.name,
+            Node::Symlink(node_symlink) => &node_symlink.name,
+        }
+    }
+}
+impl NamedNode for Node {
+    fn get_name(&self) -> &bytes::Bytes {
+        match self {
+            Node::File(node_file) => &node_file.name,
+            Node::Directory(node_directory) => &node_directory.name,
+            Node::Symlink(node_symlink) => &node_symlink.name,
+        }
+    }
+}
+
+impl Node {
+    /// Returns the node with a new name.
+    pub fn rename(self, name: bytes::Bytes) -> Self {
+        match self {
+            Node::Directory(n) => Node::Directory(DirectoryNode { name, ..n }),
+            Node::File(n) => Node::File(FileNode { name, ..n }),
+            Node::Symlink(n) => Node::Symlink(SymlinkNode { name, ..n }),
+        }
+    }
+}
+
+impl PartialOrd for Node {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for Node {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+impl PartialOrd for FileNode {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for FileNode {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+impl PartialOrd for DirectoryNode {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for DirectoryNode {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+impl PartialOrd for SymlinkNode {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for SymlinkNode {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
+    iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i))
+}
+
+impl Directory {
+    pub fn new() -> Self {
+        Directory { nodes: vec![] }
+    }
+
+    /// The size of a directory is the number of all regular and symlink elements,
+    /// the number of directory elements, and their size fields.
+    pub fn size(&self) -> u64 {
+        // It's impossible to create a Directory where the size overflows, because we
+        // check before every add() that the size won't overflow.
+        (self.nodes.len() as u64) + self.directories().map(|e| e.size).sum::<u64>()
+    }
+
+    /// Calculates the digest of a Directory, which is the blake3 hash of a
+    /// Directory protobuf message, serialized in protobuf canonical form.
+    pub fn digest(&self) -> B3Digest {
+        proto::Directory::from(self).digest()
+    }
+
+    /// Allows iterating over all nodes (directories, files and symlinks)
+    /// ordered by their name.
+    pub fn nodes(&self) -> impl Iterator<Item = &Node> + Send + Sync + '_ {
+        self.nodes.iter()
+    }
+
+    /// Allows iterating over the FileNode entries of this directory
+    /// ordered by their name
+    pub fn files(&self) -> impl Iterator<Item = &FileNode> + Send + Sync + '_ {
+        self.nodes.iter().filter_map(|node| match node {
+            Node::File(n) => Some(n),
+            _ => None,
+        })
+    }
+
+    /// Allows iterating over the subdirectories of this directory
+    /// ordered by their name
+    pub fn directories(&self) -> impl Iterator<Item = &DirectoryNode> + Send + Sync + '_ {
+        self.nodes.iter().filter_map(|node| match node {
+            Node::Directory(n) => Some(n),
+            _ => None,
+        })
+    }
+
+    /// Allows iterating over the SymlinkNode entries of this directory
+    /// ordered by their name
+    pub fn symlinks(&self) -> impl Iterator<Item = &SymlinkNode> + Send + Sync + '_ {
+        self.nodes.iter().filter_map(|node| match node {
+            Node::Symlink(n) => Some(n),
+            _ => None,
+        })
+    }
+
+    /// Checks a Node name for validity as a directory entry
+    /// We disallow slashes, null bytes, '.', '..' and the empty string.
+    pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
+        if name.is_empty()
+            || name == b".."
+            || name == b"."
+            || name.contains(&0x00)
+            || name.contains(&b'/')
+        {
+            Err(ValidateNodeError::InvalidName(name.to_owned().into()))
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Adds the specified [Node] to the [Directory], preserving sorted entries.
+    ///
+    /// Inserting an element that already exists with the same name in the directory will yield an
+    /// error.
+    /// Inserting an element will validate that its name fulfills the stricter requirements for
+    /// directory entries and yield an error if it is not.
+    pub fn add(&mut self, node: Node) -> Result<(), ValidateDirectoryError> {
+        Self::validate_node_name(node.get_name())
+            .map_err(|e| ValidateDirectoryError::InvalidNode(node.get_name().clone().into(), e))?;
+
+        // Check that the even after adding this new directory entry, the size calculation will not
+        // overflow
+        // FUTUREWORK: add some sort of batch add interface which only does this check once with
+        // all the to-be-added entries
+        checked_sum([
+            self.size(),
+            1,
+            match node {
+                Node::Directory(ref dir) => dir.size,
+                _ => 0,
+            },
+        ])
+        .ok_or(ValidateDirectoryError::SizeOverflow)?;
+
+        // This assumes the [Directory] is sorted, since we don't allow accessing the nodes list
+        // directly and all previous inserts should have been in-order
+        let pos = match self
+            .nodes
+            .binary_search_by_key(&node.get_name(), |n| n.get_name())
+        {
+            Err(pos) => pos, // There is no node with this name; good!
+            Ok(_) => {
+                return Err(ValidateDirectoryError::DuplicateName(
+                    node.get_name().to_vec(),
+                ))
+            }
+        };
+
+        self.nodes.insert(pos, node);
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::{Directory, DirectoryNode, FileNode, Node, SymlinkNode};
+    use crate::fixtures::DUMMY_DIGEST;
+    use crate::ValidateDirectoryError;
+
+    #[test]
+    fn add_nodes_to_directory() {
+        let mut d = Directory::new();
+
+        d.add(Node::Directory(
+            DirectoryNode::new("b".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::Directory(
+            DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::Directory(
+            DirectoryNode::new("z".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
+        ))
+        .unwrap();
+
+        d.add(Node::File(
+            FileNode::new("f".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::File(
+            FileNode::new("c".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::File(
+            FileNode::new("g".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
+        ))
+        .unwrap();
+
+        d.add(Node::Symlink(
+            SymlinkNode::new("t".into(), "a".into()).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::Symlink(
+            SymlinkNode::new("o".into(), "a".into()).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::Symlink(
+            SymlinkNode::new("e".into(), "a".into()).unwrap(),
+        ))
+        .unwrap();
+
+        // Convert to proto struct and back to ensure we are not generating any invalid structures
+        crate::directoryservice::Directory::try_from(crate::proto::Directory::from(d))
+            .expect("directory should be valid");
+    }
+
+    #[test]
+    fn validate_overflow() {
+        let mut d = Directory::new();
+
+        assert_eq!(
+            d.add(Node::Directory(
+                DirectoryNode::new("foo".into(), DUMMY_DIGEST.clone(), u64::MAX).unwrap(),
+            )),
+            Err(ValidateDirectoryError::SizeOverflow)
+        );
+    }
+
+    #[test]
+    fn add_duplicate_node_to_directory() {
+        let mut d = Directory::new();
+
+        d.add(Node::Directory(
+            DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
+        ))
+        .unwrap();
+        assert_eq!(
+            format!(
+                "{}",
+                d.add(Node::File(
+                    FileNode::new("a".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
+                ))
+                .expect_err("adding duplicate dir entry must fail")
+            ),
+            "\"a\" is a duplicate name"
+        );
+    }
+
+    /// Attempt to add a directory entry with a name which should be rejected.
+    #[tokio::test]
+    async fn directory_reject_invalid_name() {
+        let mut dir = Directory::new();
+        assert!(
+            dir.add(Node::Symlink(
+                SymlinkNode::new(
+                    "".into(), // wrong! can not be added to directory
+                    "doesntmatter".into(),
+                )
+                .unwrap()
+            ))
+            .is_err(),
+            "invalid symlink entry be rejected"
+        );
+    }
+}
diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs
index a9a2cc8ef5c0..6e71c2356def 100644
--- a/tvix/castore/src/directoryservice/object_store.rs
+++ b/tvix/castore/src/directoryservice/object_store.rs
@@ -17,7 +17,8 @@ use tracing::{instrument, trace, warn, Level};
 use url::Url;
 
 use super::{
-    DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator,
+    Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
+    RootToLeavesValidator,
 };
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
@@ -78,13 +79,13 @@ impl DirectoryService for ObjectStoreDirectoryService {
     /// This is the same steps as for get_recursive anyways, so we just call get_recursive and
     /// return the first element of the stream and drop the request.
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         self.get_recursive(digest).take(1).next().await.transpose()
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
-        if !directory.directories.is_empty() {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
+        if directory.directories().next().is_some() {
             return Err(Error::InvalidRequest(
                     "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(),
             ));
@@ -99,7 +100,7 @@ impl DirectoryService for ObjectStoreDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         // Check that we are not passing on bogus from the object store to the client, and that the
         // trust chain from the root digest to the leaves is intact
         let mut order_validator =
@@ -145,6 +146,10 @@ impl DirectoryService for ObjectStoreDirectoryService {
                             warn!("unable to parse directory {}: {}", digest, e);
                             Error::StorageError(e.to_string())
                         })?;
+                        let directory = Directory::try_from(directory).map_err(|e| {
+                            warn!("unable to convert directory {}: {}", digest, e);
+                            Error::StorageError(e.to_string())
+                        })?;
 
                         // Allow the children to appear next
                         order_validator.add_directory_unchecked(&directory);
@@ -244,7 +249,7 @@ impl ObjectStoreDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for ObjectStoreDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -302,7 +307,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter {
 
                 for directory in directories {
                     directories_sink
-                        .send(directory.encode_to_vec().into())
+                        .send(proto::Directory::from(directory).encode_to_vec().into())
                         .await?;
                 }
 
diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs
index 6045f5d24198..17431fc8d3b0 100644
--- a/tvix/castore/src/directoryservice/order_validator.rs
+++ b/tvix/castore/src/directoryservice/order_validator.rs
@@ -1,7 +1,8 @@
 use std::collections::HashSet;
 use tracing::warn;
 
-use crate::{proto::Directory, B3Digest};
+use super::Directory;
+use crate::B3Digest;
 
 pub trait OrderValidator {
     /// Update the order validator's state with the directory
@@ -47,10 +48,9 @@ impl RootToLeavesValidator {
             self.expected_digests.insert(directory.digest());
         }
 
-        for subdir in &directory.directories {
+        for subdir in directory.directories() {
             // Allow the children to appear next
-            let subdir_digest = subdir.digest.clone().try_into().unwrap();
-            self.expected_digests.insert(subdir_digest);
+            self.expected_digests.insert(subdir.digest.clone());
         }
     }
 }
@@ -79,12 +79,11 @@ impl OrderValidator for LeavesToRootValidator {
     fn add_directory(&mut self, directory: &Directory) -> bool {
         let digest = directory.digest();
 
-        for subdir in &directory.directories {
-            let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory()
-            if !self.allowed_references.contains(&subdir_digest) {
+        for subdir in directory.directories() {
+            if !self.allowed_references.contains(&subdir.digest) {
                 warn!(
                     directory.digest = %digest,
-                    subdirectory.digest = %subdir_digest,
+                    subdirectory.digest = %subdir.digest,
                     "unexpected directory reference"
                 );
                 return false;
@@ -101,8 +100,8 @@ impl OrderValidator for LeavesToRootValidator {
 mod tests {
     use super::{LeavesToRootValidator, RootToLeavesValidator};
     use crate::directoryservice::order_validator::OrderValidator;
+    use crate::directoryservice::Directory;
     use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
-    use crate::proto::Directory;
     use rstest::rstest;
 
     #[rstest]
diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs
index 51dc87f92574..d253df503bb3 100644
--- a/tvix/castore/src/directoryservice/redb.rs
+++ b/tvix/castore/src/directoryservice/redb.rs
@@ -5,15 +5,15 @@ use std::{path::PathBuf, sync::Arc};
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
+use super::{
+    traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService,
+    LeavesToRootValidator,
+};
 use crate::{
     composition::{CompositionContext, ServiceBuilder},
     digests, proto, B3Digest, Error,
 };
 
-use super::{
-    traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
-};
-
 const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> =
     TableDefinition::new("directory");
 
@@ -69,7 +69,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
 #[async_trait]
 impl DirectoryService for RedbDirectoryService {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let db = self.db.clone();
 
         // Retrieves the protobuf-encoded Directory for the corresponding digest.
@@ -107,13 +107,10 @@ impl DirectoryService for RedbDirectoryService {
         let directory = match proto::Directory::decode(&*directory_data.value()) {
             Ok(dir) => {
                 // The returned Directory must be valid.
-                if let Err(e) = dir.validate() {
+                dir.try_into().map_err(|e| {
                     warn!(err=%e, "Directory failed validation");
-                    return Err(Error::StorageError(
-                        "Directory failed validation".to_string(),
-                    ));
-                }
-                dir
+                    Error::StorageError("Directory failed validation".to_string())
+                })?
             }
             Err(e) => {
                 warn!(err=%e, "failed to parse Directory");
@@ -125,26 +122,21 @@ impl DirectoryService for RedbDirectoryService {
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         tokio::task::spawn_blocking({
             let db = self.db.clone();
             move || {
                 let digest = directory.digest();
 
-                // Validate the directory.
-                if let Err(e) = directory.validate() {
-                    warn!(err=%e, "Directory failed validation");
-                    return Err(Error::StorageError(
-                        "Directory failed validation".to_string(),
-                    ));
-                }
-
                 // Store the directory in the table.
                 let txn = db.begin_write()?;
                 {
                     let mut table = txn.open_table(DIRECTORY_TABLE)?;
                     let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into();
-                    table.insert(digest_as_array, directory.encode_to_vec())?;
+                    table.insert(
+                        digest_as_array,
+                        proto::Directory::from(directory).encode_to_vec(),
+                    )?;
                 }
                 txn.commit()?;
 
@@ -158,7 +150,7 @@ impl DirectoryService for RedbDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
         // redb transaction to avoid constantly closing and opening new transactions for the
         // database.
@@ -185,7 +177,7 @@ pub struct RedbDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for RedbDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -228,7 +220,10 @@ impl DirectoryPutter for RedbDirectoryPutter {
                             for directory in directories {
                                 let digest_as_array: [u8; digests::B3_LEN] =
                                     directory.digest().into();
-                                table.insert(digest_as_array, directory.encode_to_vec())?;
+                                table.insert(
+                                    digest_as_array,
+                                    proto::Directory::from(directory).encode_to_vec(),
+                                )?;
                             }
                         }
 
diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs
index dc54e3d11d18..b4daaee61b22 100644
--- a/tvix/castore/src/directoryservice/simple_putter.rs
+++ b/tvix/castore/src/directoryservice/simple_putter.rs
@@ -1,7 +1,6 @@
 use super::DirectoryPutter;
 use super::DirectoryService;
-use super::{DirectoryGraph, LeavesToRootValidator};
-use crate::proto;
+use super::{Directory, DirectoryGraph, LeavesToRootValidator};
 use crate::B3Digest;
 use crate::Error;
 use tonic::async_trait;
@@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> {
 #[async_trait]
 impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
index 5766dec1a5c2..4f3a860d14e4 100644
--- a/tvix/castore/src/directoryservice/sled.rs
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -1,5 +1,3 @@
-use crate::proto::Directory;
-use crate::{proto, B3Digest, Error};
 use futures::stream::BoxStream;
 use prost::Message;
 use std::ops::Deref;
@@ -9,8 +7,9 @@ use tonic::async_trait;
 use tracing::{instrument, warn};
 
 use super::utils::traverse_directory;
-use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
+use super::{Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
 use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::{proto, B3Digest, Error};
 
 #[derive(Clone)]
 pub struct SledDirectoryService {
@@ -44,7 +43,7 @@ impl SledDirectoryService {
 #[async_trait]
 impl DirectoryService for SledDirectoryService {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let resp = tokio::task::spawn_blocking({
             let db = self.db.clone();
             let digest = digest.clone();
@@ -61,7 +60,7 @@ impl DirectoryService for SledDirectoryService {
             None => Ok(None),
 
             // The directory was found, try to parse the data as Directory message
-            Some(data) => match Directory::decode(&*data) {
+            Some(data) => match proto::Directory::decode(&*data) {
                 Ok(directory) => {
                     // Validate the retrieved Directory indeed has the
                     // digest we expect it to have, to detect corruptions.
@@ -73,14 +72,10 @@ impl DirectoryService for SledDirectoryService {
                         )));
                     }
 
-                    // Validate the Directory itself is valid.
-                    if let Err(e) = directory.validate() {
-                        warn!("directory failed validation: {}", e.to_string());
-                        return Err(Error::StorageError(format!(
-                            "directory {} failed validation: {}",
-                            actual_digest, e,
-                        )));
-                    }
+                    let directory = directory.try_into().map_err(|e| {
+                        warn!("failed to retrieve directory: {}", e);
+                        Error::StorageError(format!("failed to retrieve directory: {}", e))
+                    })?;
 
                     Ok(Some(directory))
                 }
@@ -93,22 +88,18 @@ impl DirectoryService for SledDirectoryService {
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         tokio::task::spawn_blocking({
             let db = self.db.clone();
             move || {
                 let digest = directory.digest();
 
-                // validate the directory itself.
-                if let Err(e) = directory.validate() {
-                    return Err(Error::InvalidRequest(format!(
-                        "directory {} failed validation: {}",
-                        digest, e,
-                    )));
-                }
                 // store it
-                db.insert(digest.as_slice(), directory.encode_to_vec())
-                    .map_err(|e| Error::StorageError(e.to_string()))?;
+                db.insert(
+                    digest.as_slice(),
+                    proto::Directory::from(directory).encode_to_vec(),
+                )
+                .map_err(|e| Error::StorageError(e.to_string()))?;
 
                 Ok(digest)
             }
@@ -120,7 +111,7 @@ impl DirectoryService for SledDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
@@ -215,7 +206,7 @@ pub struct SledDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for SledDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -252,7 +243,10 @@ impl DirectoryPutter for SledDirectoryPutter {
 
                         let mut batch = sled::Batch::default();
                         for directory in directories {
-                            batch.insert(directory.digest().as_slice(), directory.encode_to_vec());
+                            batch.insert(
+                                directory.digest().as_slice(),
+                                proto::Directory::from(directory).encode_to_vec(),
+                            );
                         }
 
                         tree.apply_batch(batch).map_err(|e| {
diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs
index b698f70ea469..3923e66dc2c2 100644
--- a/tvix/castore/src/directoryservice/tests/mod.rs
+++ b/tvix/castore/src/directoryservice/tests/mod.rs
@@ -8,10 +8,8 @@ use rstest_reuse::{self, *};
 
 use super::DirectoryService;
 use crate::directoryservice;
-use crate::{
-    fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D},
-    proto::{self, Directory},
-};
+use crate::directoryservice::{Directory, DirectoryNode, Node};
+use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D};
 
 mod utils;
 use self::utils::make_grpc_directory_service_client;
@@ -41,10 +39,10 @@ async fn test_non_exist(directory_service: impl DirectoryService) {
 
     // recursive get
     assert_eq!(
-        Vec::<Result<proto::Directory, crate::Error>>::new(),
+        Vec::<Result<Directory, crate::Error>>::new(),
         directory_service
             .get_recursive(&DIRECTORY_A.digest())
-            .collect::<Vec<Result<proto::Directory, crate::Error>>>()
+            .collect::<Vec<Result<Directory, crate::Error>>>()
             .await
     );
 }
@@ -212,59 +210,26 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService
     }
 }
 
-/// Try uploading a Directory failing its internal validation, ensure it gets
-/// rejected.
-#[apply(directory_services)]
-#[tokio::test]
-async fn upload_reject_failing_validation(directory_service: impl DirectoryService) {
-    let broken_directory = Directory {
-        symlinks: vec![proto::SymlinkNode {
-            name: "".into(), // wrong!
-            target: "doesntmatter".into(),
-        }],
-        ..Default::default()
-    };
-    assert!(broken_directory.validate().is_err());
-
-    // Try to upload via single upload.
-    assert!(
-        directory_service
-            .put(broken_directory.clone())
-            .await
-            .is_err(),
-        "single upload must fail"
-    );
-
-    // Try to upload via put_multiple. We're a bit more permissive here, the
-    // intermediate .put() might succeed, due to client-side bursting (in the
-    // case of gRPC), but then the close MUST fail.
-    let mut handle = directory_service.put_multiple_start();
-    if handle.put(broken_directory).await.is_ok() {
-        assert!(
-            handle.close().await.is_err(),
-            "when succeeding put, close must fail"
-        )
-    }
-}
-
 /// Try uploading a Directory that refers to a previously-uploaded directory.
 /// Both pass their isolated validation, but the size field in the parent is wrong.
 /// This should be rejected.
 #[apply(directory_services)]
 #[tokio::test]
 async fn upload_reject_wrong_size(directory_service: impl DirectoryService) {
-    let wrong_parent_directory = Directory {
-        directories: vec![proto::DirectoryNode {
-            name: "foo".into(),
-            digest: DIRECTORY_A.digest().into(),
-            size: DIRECTORY_A.size() + 42, // wrong!
-        }],
-        ..Default::default()
+    let wrong_parent_directory = {
+        let mut dir = Directory::new();
+        dir.add(Node::Directory(
+            DirectoryNode::new(
+                "foo".into(),
+                DIRECTORY_A.digest(),
+                DIRECTORY_A.size() + 42, // wrong!
+            )
+            .unwrap(),
+        ))
+        .unwrap();
+        dir
     };
 
-    // Make sure isolated validation itself is ok
-    assert!(wrong_parent_directory.validate().is_ok());
-
     // Now upload both. Ensure it either fails during the second put, or during
     // the close.
     let mut handle = directory_service.put_multiple_start();
diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs
index 17a51ae2bbff..3e6dc73a4d6b 100644
--- a/tvix/castore/src/directoryservice/traverse.rs
+++ b/tvix/castore/src/directoryservice/traverse.rs
@@ -1,8 +1,5 @@
-use super::DirectoryService;
-use crate::{
-    proto::{node::Node, NamedNode},
-    B3Digest, Error, Path,
-};
+use super::{DirectoryService, NamedNode, Node};
+use crate::{Error, Path};
 use tracing::{instrument, warn};
 
 /// This descends from a (root) node to the given (sub)path, returning the Node
@@ -25,34 +22,31 @@ where
                 return Ok(None);
             }
             Node::Directory(directory_node) => {
-                let digest: B3Digest = directory_node
-                    .digest
-                    .try_into()
-                    .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
-
                 // fetch the linked node from the directory_service.
-                let directory =
-                    directory_service
-                        .as_ref()
-                        .get(&digest)
-                        .await?
-                        .ok_or_else(|| {
-                            // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
-                            warn!("directory {} does not exist", digest);
-
-                            Error::StorageError(format!("directory {} does not exist", digest))
-                        })?;
+                let directory = directory_service
+                    .as_ref()
+                    .get(&directory_node.digest)
+                    .await?
+                    .ok_or_else(|| {
+                        // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
+                        warn!("directory {} does not exist", directory_node.digest);
+
+                        Error::StorageError(format!(
+                            "directory {} does not exist",
+                            directory_node.digest
+                        ))
+                    })?;
 
                 // look for the component in the [Directory].
                 // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
                 // could stop as soon as e.name is larger than the search string.
                 if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) {
                     // child node found, update prev_node to that and continue.
-                    parent_node = child_node;
+                    parent_node = child_node.clone();
                 } else {
                     // child node not found means there's no such element inside the directory.
                     return Ok(None);
-                }
+                };
             }
         }
     }
@@ -65,6 +59,7 @@ where
 mod tests {
     use crate::{
         directoryservice,
+        directoryservice::{DirectoryNode, Node},
         fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
         PathBuf,
     };
@@ -88,21 +83,21 @@ mod tests {
         handle.close().await.expect("must upload");
 
         // construct the node for DIRECTORY_COMPLICATED
-        let node_directory_complicated =
-            crate::proto::node::Node::Directory(crate::proto::DirectoryNode {
-                name: "doesntmatter".into(),
-                digest: DIRECTORY_COMPLICATED.digest().into(),
-                size: DIRECTORY_COMPLICATED.size(),
-            });
+        let node_directory_complicated = Node::Directory(
+            DirectoryNode::new(
+                "doesntmatter".into(),
+                DIRECTORY_COMPLICATED.digest(),
+                DIRECTORY_COMPLICATED.size(),
+            )
+            .unwrap(),
+        );
 
         // construct the node for DIRECTORY_COMPLICATED
-        let node_directory_with_keep = crate::proto::node::Node::Directory(
-            DIRECTORY_COMPLICATED.directories.first().unwrap().clone(),
-        );
+        let node_directory_with_keep =
+            Node::Directory(DIRECTORY_COMPLICATED.directories().next().unwrap().clone());
 
         // construct the node for the .keep file
-        let node_file_keep =
-            crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone());
+        let node_file_keep = Node::File(DIRECTORY_WITH_KEEP.files().next().unwrap().clone());
 
         // traversal to an empty subpath should return the root node.
         {
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
index 726734f55eec..352362811a97 100644
--- a/tvix/castore/src/directoryservice/utils.rs
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -1,5 +1,5 @@
+use super::Directory;
 use super::DirectoryService;
-use crate::proto;
 use crate::B3Digest;
 use crate::Error;
 use async_stream::try_stream;
@@ -8,14 +8,14 @@ use std::collections::{HashSet, VecDeque};
 use tracing::instrument;
 use tracing::warn;
 
-/// Traverses a [proto::Directory] from the root to the children.
+/// Traverses a [Directory] from the root to the children.
 ///
 /// This is mostly BFS, but directories are only returned once.
 #[instrument(skip(directory_service))]
 pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
     directory_service: DS,
     root_directory_digest: &B3Digest,
-) -> BoxStream<'a, Result<proto::Directory, Error>> {
+) -> BoxStream<'a, Result<Directory, Error>> {
     // The list of all directories that still need to be traversed. The next
     // element is picked from the front, new elements are enqueued at the
     // back.
@@ -50,16 +50,6 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
                 Some(dir) => dir,
             };
 
-
-            // validate, we don't want to send invalid directories.
-            current_directory.validate().map_err(|e| {
-               warn!("directory failed validation: {}", e.to_string());
-               Error::StorageError(format!(
-                   "invalid directory: {}",
-                   current_directory_digest
-               ))
-            })?;
-
             // We're about to send this directory, so let's avoid sending it again if a
             // descendant has it.
             sent_directory_digests.insert(current_directory_digest);
@@ -67,9 +57,9 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
             // enqueue all child directory digests to the work queue, as
             // long as they're not part of the worklist or already sent.
             // This panics if the digest looks invalid, it's supposed to be checked first.
-            for child_directory_node in &current_directory.directories {
+            for child_directory_node in current_directory.directories() {
                 // TODO: propagate error
-                let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
+                let child_digest: B3Digest = child_directory_node.digest.clone();
 
                 if worklist_directory_digests.contains(&child_digest)
                     || sent_directory_digests.contains(&child_digest)