Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r--  tvix/castore/src/directoryservice/bigtable.rs         |  24
-rw-r--r--  tvix/castore/src/directoryservice/combinators.rs      |   9
-rw-r--r--  tvix/castore/src/directoryservice/directory_graph.rs  |  68
-rw-r--r--  tvix/castore/src/directoryservice/grpc.rs             |  40
-rw-r--r--  tvix/castore/src/directoryservice/memory.rs           |  34
-rw-r--r--  tvix/castore/src/directoryservice/mod.rs              | 489
-rw-r--r--  tvix/castore/src/directoryservice/object_store.rs     |  19
-rw-r--r--  tvix/castore/src/directoryservice/order_validator.rs  |  17
-rw-r--r--  tvix/castore/src/directoryservice/redb.rs             |  43
-rw-r--r--  tvix/castore/src/directoryservice/simple_putter.rs    |   5
-rw-r--r--  tvix/castore/src/directoryservice/sled.rs             |  46
-rw-r--r--  tvix/castore/src/directoryservice/tests/mod.rs        |  67
-rw-r--r--  tvix/castore/src/directoryservice/traverse.rs         |  63
-rw-r--r--  tvix/castore/src/directoryservice/utils.rs            |  20
14 files changed, 650 insertions(+), 294 deletions(-)
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs
index d10dddaf9f60..73ab4342d832 100644
--- a/tvix/castore/src/directoryservice/bigtable.rs
+++ b/tvix/castore/src/directoryservice/bigtable.rs
@@ -9,7 +9,9 @@ use std::sync::Arc;
 use tonic::async_trait;
 use tracing::{instrument, trace, warn};
 
-use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter};
+use super::{
+    utils::traverse_directory, Directory, DirectoryPutter, DirectoryService, SimplePutter,
+};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
 
@@ -149,7 +151,7 @@ fn derive_directory_key(digest: &B3Digest) -> String {
 #[async_trait]
 impl DirectoryService for BigtableDirectoryService {
     #[instrument(skip(self, digest), err, fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let mut client = self.client.clone();
         let directory_key = derive_directory_key(digest);
 
@@ -241,28 +243,20 @@ impl DirectoryService for BigtableDirectoryService {
 
         // Try to parse the value into a Directory message.
         let directory = proto::Directory::decode(Bytes::from(row_cell.value))
-            .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?;
-
-        // validate the Directory.
-        directory
-            .validate()
+            .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?
+            .try_into()
             .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))?;
 
         Ok(Some(directory))
     }
 
     #[instrument(skip(self, directory), err, fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         let directory_digest = directory.digest();
         let mut client = self.client.clone();
         let directory_key = derive_directory_key(&directory_digest);
 
-        // Ensure the directory we're trying to upload passes validation
-        directory
-            .validate()
-            .map_err(|e| Error::InvalidRequest(format!("directory is invalid: {}", e)))?;
-
-        let data = directory.encode_to_vec();
+        let data = proto::Directory::from(directory).encode_to_vec();
         if data.len() as u64 > CELL_SIZE_LIMIT {
             return Err(Error::StorageError(
                 "Directory exceeds cell limit on Bigtable".into(),
@@ -310,7 +304,7 @@ impl DirectoryService for BigtableDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
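The pattern above recurs throughout this diff: backends now decode the protobuf wire type and convert it into the validated domain type via TryFrom, instead of calling .validate() by hand. A minimal sketch of that pattern, assuming the crate exposes Directory, proto and Error at the paths shown:

use prost::Message;
use tvix_castore::directoryservice::Directory;
use tvix_castore::{proto, Error};

// Decode the wire format, then convert into the validated domain type.
// Validation now happens inside the TryFrom impl rather than in the store.
fn decode_directory(raw: bytes::Bytes) -> Result<Directory, Error> {
    let proto_dir = proto::Directory::decode(raw)
        .map_err(|e| Error::StorageError(format!("unable to decode directory proto: {}", e)))?;
    proto_dir
        .try_into()
        .map_err(|e| Error::StorageError(format!("invalid Directory message: {}", e)))
}

// Going back to the wire format is infallible.
fn encode_directory(directory: Directory) -> Vec<u8> {
    proto::Directory::from(directory).encode_to_vec()
}
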
diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs
index 0fdc82c16cb0..4283142231f9 100644
--- a/tvix/castore/src/directoryservice/combinators.rs
+++ b/tvix/castore/src/directoryservice/combinators.rs
@@ -7,10 +7,9 @@ use futures::TryStreamExt;
 use tonic::async_trait;
 use tracing::{instrument, trace};
 
-use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
+use super::{Directory, DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::directoryservice::DirectoryPutter;
-use crate::proto;
 use crate::B3Digest;
 use crate::Error;
 
@@ -40,7 +39,7 @@ where
     DS2: DirectoryService + Clone + 'static,
 {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         match self.near.get(digest).await? {
             Some(directory) => {
                 trace!("serving from cache");
@@ -82,7 +81,7 @@ where
     }
 
     #[instrument(skip_all)]
-    async fn put(&self, _directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, _directory: Directory) -> Result<B3Digest, Error> {
         Err(Error::StorageError("unimplemented".to_string()))
     }
 
@@ -90,7 +89,7 @@ where
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         let near = self.near.clone();
         let far = self.far.clone();
         let digest = root_directory_digest.clone();
diff --git a/tvix/castore/src/directoryservice/directory_graph.rs b/tvix/castore/src/directoryservice/directory_graph.rs
index e6b9b163370c..aa60f68a3197 100644
--- a/tvix/castore/src/directoryservice/directory_graph.rs
+++ b/tvix/castore/src/directoryservice/directory_graph.rs
@@ -10,10 +10,8 @@ use petgraph::{
 use tracing::instrument;
 
 use super::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
-use crate::{
-    proto::{self, Directory, DirectoryNode},
-    B3Digest,
-};
+use super::{Directory, DirectoryNode};
+use crate::B3Digest;
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
@@ -88,7 +86,7 @@ fn check_edge(dir: &DirectoryNode, child: &Directory) -> Result<(), Error> {
 impl DirectoryGraph<LeavesToRootValidator> {
     /// Insert a new Directory into the closure
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
-    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
         if !self.order_validator.add_directory(&directory) {
             return Err(Error::ValidationError(
                 "unknown directory was referenced".into(),
@@ -108,7 +106,7 @@ impl DirectoryGraph<RootToLeavesValidator> {
 
     /// Insert a new Directory into the closure
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest(), directory.size=%directory.size()), err)]
-    pub fn add(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    pub fn add(&mut self, directory: Directory) -> Result<(), Error> {
         let digest = directory.digest();
         if !self.order_validator.digest_allowed(&digest) {
             return Err(Error::ValidationError("unexpected digest".into()));
@@ -129,12 +127,7 @@ impl<O: OrderValidator> DirectoryGraph<O> {
     }
 
     /// Adds a directory which has already been confirmed to be in-order to the graph
-    pub fn add_order_unchecked(&mut self, directory: proto::Directory) -> Result<(), Error> {
-        // Do some basic validation
-        directory
-            .validate()
-            .map_err(|e| Error::ValidationError(e.to_string()))?;
-
+    pub fn add_order_unchecked(&mut self, directory: Directory) -> Result<(), Error> {
         let digest = directory.digest();
 
         // Teach the graph about the existence of a node with this digest
@@ -149,12 +142,10 @@ impl<O: OrderValidator> DirectoryGraph<O> {
         }
 
         // set up edges to all child directories
-        for subdir in &directory.directories {
-            let subdir_digest: B3Digest = subdir.digest.clone().try_into().unwrap();
-
+        for subdir in directory.directories() {
             let child_ix = *self
                 .digest_to_node_ix
-                .entry(subdir_digest)
+                .entry(subdir.digest.clone())
                 .or_insert_with(|| self.graph.add_node(None));
 
             let pending_edge_check = match &self.graph[child_ix] {
@@ -266,37 +257,36 @@ impl ValidatedDirectoryGraph {
             .filter_map(move |i| nodes[i.index()].weight.take())
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use crate::{
-        fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C},
-        proto::{self, Directory},
-    };
-    use lazy_static::lazy_static;
-    use rstest::rstest;
-
-    lazy_static! {
+/*
         pub static ref BROKEN_DIRECTORY : Directory = Directory {
-            symlinks: vec![proto::SymlinkNode {
+            symlinks: vec![SymlinkNode {
                 name: "".into(), // invalid name!
                 target: "doesntmatter".into(),
             }],
             ..Default::default()
         };
+*/
+#[cfg(test)]
+mod tests {
+    use crate::directoryservice::{Directory, DirectoryNode, Node};
+    use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
+    use lazy_static::lazy_static;
+    use rstest::rstest;
 
-        pub static ref BROKEN_PARENT_DIRECTORY: Directory = Directory {
-            directories: vec![proto::DirectoryNode {
-                name: "foo".into(),
-                digest: DIRECTORY_A.digest().into(),
-                size: DIRECTORY_A.size() + 42, // wrong!
-            }],
-            ..Default::default()
+    use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};
+
+    lazy_static! {
+        pub static ref BROKEN_PARENT_DIRECTORY: Directory = {
+            let mut dir = Directory::new();
+            dir.add(Node::Directory(DirectoryNode::new(
+                "foo".into(),
+                DIRECTORY_A.digest(),
+                DIRECTORY_A.size() + 42, // wrong!
+            ).unwrap())).unwrap();
+            dir
         };
     }
 
-    use super::{DirectoryGraph, LeavesToRootValidator, RootToLeavesValidator};
-
     #[rstest]
     /// Uploading an empty directory should succeed.
     #[case::empty_directory(&[&*DIRECTORY_A], false, Some(vec![&*DIRECTORY_A]))]
@@ -312,8 +302,6 @@ mod tests {
     #[case::unconnected_node(&[&*DIRECTORY_A, &*DIRECTORY_C, &*DIRECTORY_B], false, None)]
     /// Uploading B (referring to A) should fail immediately, because A was never uploaded.
     #[case::dangling_pointer(&[&*DIRECTORY_B], true, None)]
-    /// Uploading a directory failing validation should fail immediately.
-    #[case::failing_validation(&[&*BROKEN_DIRECTORY], true, None)]
     /// Uploading a directory which refers to another Directory with a wrong size should fail.
     #[case::wrong_size_in_parent(&[&*DIRECTORY_A, &*BROKEN_PARENT_DIRECTORY], true, None)]
     fn test_uploads(
@@ -366,8 +354,6 @@ mod tests {
     #[case::unconnected_node(&*DIRECTORY_C, &[&*DIRECTORY_C, &*DIRECTORY_B], true, None)]
     /// Downloading B (specified as the root) but receiving A instead should fail immediately, because A has no connection to B (the root).
     #[case::dangling_pointer(&*DIRECTORY_B, &[&*DIRECTORY_A], true, None)]
-    /// Downloading a directory failing validation should fail immediately.
-    #[case::failing_validation(&*BROKEN_DIRECTORY, &[&*BROKEN_DIRECTORY], true, None)]
     /// Downloading a directory which refers to another Directory with a wrong size should fail.
     #[case::wrong_size_in_parent(&*BROKEN_PARENT_DIRECTORY, &[&*BROKEN_PARENT_DIRECTORY, &*DIRECTORY_A], true, None)]
     fn test_downloads(
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index 4dc3931ed410..2079514d2b79 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -1,8 +1,9 @@
 use std::collections::HashSet;
 
-use super::{DirectoryPutter, DirectoryService};
+use super::{Directory, DirectoryPutter, DirectoryService};
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::proto::{self, get_directory_request::ByWhat};
+use crate::ValidateDirectoryError;
 use crate::{B3Digest, Error};
 use async_stream::try_stream;
 use futures::stream::BoxStream;
@@ -41,10 +42,7 @@ where
     T::Future: Send,
 {
     #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))]
-    async fn get(
-        &self,
-        digest: &B3Digest,
-    ) -> Result<Option<crate::proto::Directory>, crate::Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
         let digest_cpy = digest.clone();
@@ -72,15 +70,10 @@ where
                         "requested directory with digest {}, but got {}",
                         digest, actual_digest
                     )))
-                } else if let Err(e) = directory.validate() {
-                    // Validate the Directory itself is valid.
-                    warn!("directory failed validation: {}", e.to_string());
-                    Err(crate::Error::StorageError(format!(
-                        "directory {} failed validation: {}",
-                        digest, e,
-                    )))
                 } else {
-                    Ok(Some(directory))
+                    Ok(Some(directory.try_into().map_err(|_| {
+                        Error::StorageError("invalid root digest length in response".to_string())
+                    })?))
                 }
             }
             Ok(None) => Ok(None),
@@ -90,11 +83,11 @@ where
     }
 
     #[instrument(level = "trace", skip_all, fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: crate::proto::Directory) -> Result<B3Digest, crate::Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, crate::Error> {
         let resp = self
             .grpc_client
             .clone()
-            .put(tokio_stream::once(directory))
+            .put(tokio_stream::once(proto::Directory::from(directory)))
             .await;
 
         match resp {
@@ -113,7 +106,7 @@ where
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         let mut grpc_client = self.grpc_client.clone();
         let root_directory_digest = root_directory_digest.clone();
 
@@ -135,14 +128,6 @@ where
             loop {
                 match stream.message().await {
                     Ok(Some(directory)) => {
-                        // validate the directory itself.
-                        if let Err(e) = directory.validate() {
-                            Err(crate::Error::StorageError(format!(
-                                "directory {} failed validation: {}",
-                                directory.digest(),
-                                e,
-                            )))?;
-                        }
                         // validate we actually expected that directory, and move it from expected to received.
                         let directory_digest = directory.digest();
                         let was_expected = expected_directory_digests.remove(&directory_digest);
@@ -168,6 +153,9 @@ where
                                 .insert(child_directory_digest);
                         }
 
+                        let directory = directory.try_into()
+                            .map_err(|e: ValidateDirectoryError| Error::StorageError(e.to_string()))?;
+
                         yield directory;
                     },
                     Ok(None) if expected_directory_digests.len() == 1 && expected_directory_digests.contains(&root_directory_digest) => {
@@ -279,11 +267,11 @@ pub struct GRPCPutter {
 #[async_trait]
 impl DirectoryPutter for GRPCPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), crate::Error> {
         match self.rq {
             // If we're not already closed, send the directory to directory_sender.
             Some((_, ref directory_sender)) => {
-                if directory_sender.send(directory).is_err() {
+                if directory_sender.send(directory.into()).is_err() {
                     // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
                     // That error code is much more helpful, because it
                     // contains the error message from the server.
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
index ada4606a5a57..b039d9bc7d84 100644
--- a/tvix/castore/src/directoryservice/memory.rs
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -1,4 +1,4 @@
-use crate::{proto, B3Digest, Error};
+use crate::{B3Digest, Error};
 use futures::stream::BoxStream;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -7,8 +7,9 @@ use tonic::async_trait;
 use tracing::{instrument, warn};
 
 use super::utils::traverse_directory;
-use super::{DirectoryPutter, DirectoryService, SimplePutter};
+use super::{Directory, DirectoryPutter, DirectoryService, SimplePutter};
 use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::proto;
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
@@ -18,7 +19,7 @@ pub struct MemoryDirectoryService {
 #[async_trait]
 impl DirectoryService for MemoryDirectoryService {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let db = self.db.read().await;
 
         match db.get(digest) {
@@ -37,35 +38,20 @@ impl DirectoryService for MemoryDirectoryService {
                     )));
                 }
 
-                // Validate the Directory itself is valid.
-                if let Err(e) = directory.validate() {
-                    warn!("directory failed validation: {}", e.to_string());
-                    return Err(Error::StorageError(format!(
-                        "directory {} failed validation: {}",
-                        actual_digest, e,
-                    )));
-                }
-
-                Ok(Some(directory.clone()))
+                Ok(Some(directory.clone().try_into().map_err(|e| {
+                    crate::Error::StorageError(format!("corrupted directory: {}", e))
+                })?))
             }
         }
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         let digest = directory.digest();
 
-        // validate the directory itself.
-        if let Err(e) = directory.validate() {
-            return Err(Error::InvalidRequest(format!(
-                "directory {} failed validation: {}",
-                digest, e,
-            )));
-        }
-
         // store it
         let mut db = self.db.write().await;
-        db.insert(digest.clone(), directory);
+        db.insert(digest.clone(), directory.into());
 
         Ok(digest)
     }
@@ -74,7 +60,7 @@ impl DirectoryService for MemoryDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
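For reference, a round trip through the reworked API as the in-memory backend now exposes it. This is only a sketch: the crate paths and the MemoryDirectoryService re-export are assumed, and the symlink entry is just a placeholder.

use tvix_castore::directoryservice::{
    Directory, DirectoryService, MemoryDirectoryService, Node, SymlinkNode,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let svc = MemoryDirectoryService::default();

    // Build a valid Directory; add() enforces the entry-name rules.
    let mut dir = Directory::new();
    dir.add(Node::Symlink(SymlinkNode::new("self".into(), ".".into())?))?;

    // put() returns the blake3 digest of the canonical protobuf encoding.
    let digest = svc.put(dir.clone()).await?;
    assert_eq!(digest, dir.digest());

    // get() hands back the validated domain type, not the raw proto.
    assert_eq!(svc.get(&digest).await?, Some(dir));
    Ok(())
}
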
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index 17a78b179349..0141a48f75bc 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -1,6 +1,9 @@
 use crate::composition::{Registry, ServiceBuilder};
-use crate::{proto, B3Digest, Error};
+use crate::proto;
+use crate::{B3Digest, Error};
+use crate::{ValidateDirectoryError, ValidateNodeError};
 
+use bytes::Bytes;
 use futures::stream::BoxStream;
 use tonic::async_trait;
 mod combinators;
@@ -38,7 +41,7 @@ mod bigtable;
 pub use self::bigtable::{BigtableDirectoryService, BigtableParameters};
 
 /// The base trait all Directory services need to implement.
-/// This is a simple get and put of [crate::proto::Directory], returning their
+/// This is a simple get and put of [Directory], returning their
 /// digest.
 #[async_trait]
 pub trait DirectoryService: Send + Sync {
@@ -50,14 +53,14 @@ pub trait DirectoryService: Send + Sync {
     /// Directory digests that are at the "root", aka the last element that's
     /// sent to a DirectoryPutter. This makes sense for implementations bundling
     /// closures of directories together in batches.
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>;
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error>;
     /// Uploads a single Directory message, and returns the calculated
     /// digest, or an error. An error *must* also be returned if the message is
     /// not valid.
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error>;
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error>;
 
-    /// Looks up a closure of [proto::Directory].
-    /// Ideally this would be a `impl Stream<Item = Result<proto::Directory, Error>>`,
+    /// Looks up a closure of [Directory].
+    /// Ideally this would be an `impl Stream<Item = Result<Directory, Error>>`,
     /// and we'd be able to add a default implementation for it here, but
     /// we can't have that yet.
     ///
@@ -75,9 +78,9 @@ pub trait DirectoryService: Send + Sync {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>>;
+    ) -> BoxStream<'static, Result<Directory, Error>>;
 
-    /// Allows persisting a closure of [proto::Directory], which is a graph of
+    /// Allows persisting a closure of [Directory], which is a graph of
     /// connected Directory messages.
     fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>;
 }
@@ -87,18 +90,18 @@ impl<A> DirectoryService for A
 where
     A: AsRef<dyn DirectoryService> + Send + Sync,
 {
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         self.as_ref().get(digest).await
     }
 
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         self.as_ref().put(directory).await
     }
 
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         self.as_ref().get_recursive(root_directory_digest)
     }
 
@@ -107,7 +110,7 @@ where
     }
 }
 
-/// Provides a handle to put a closure of connected [proto::Directory] elements.
+/// Provides a handle to put a closure of connected [Directory] elements.
 ///
 /// The consumer can periodically call [DirectoryPutter::put], starting from the
 /// leaves. Once the root is reached, [DirectoryPutter::close] can be called to
@@ -119,12 +122,12 @@ where
 /// but a single file or symlink.
 #[async_trait]
 pub trait DirectoryPutter: Send {
-    /// Put a individual [proto::Directory] into the store.
+    /// Put an individual [Directory] into the store.
     /// Error semantics and behaviour is up to the specific implementation of
     /// this trait.
     /// Due to bursting, the returned error might refer to an object previously
     /// sent via `put`.
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error>;
+    async fn put(&mut self, directory: Directory) -> Result<(), Error>;
 
     /// Close the stream, and wait for any errors.
     /// If there's been any invalid Directory message uploaded, an error *must*
@@ -145,3 +148,461 @@ pub(crate) fn register_directory_services(reg: &mut Registry) {
         reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable");
     }
 }
+
+/// A Directory can contain Directory, File or Symlink nodes.
+/// Each of these nodes has a name attribute, which is the basename in that
+/// directory, as well as node-type-specific attributes.
+/// While a Node by itself may have any name, the names of Directory entries:
+///  - MUST not contain slashes or null bytes
+///  - MUST not be '.' or '..'
+///  - MUST be unique across all entries
+#[derive(Default, Debug, Clone, PartialEq, Eq)]
+pub struct Directory {
+    nodes: Vec<Node>,
+}
+
+/// A DirectoryNode is a pointer to a [Directory], by its [Directory::digest].
+/// It also gives it a `name` and `size`.
+/// Such a node is either an element in the [Directory] it itself is contained in,
+/// or a standalone root node.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct DirectoryNode {
+    /// The (base)name of the directory
+    name: Bytes,
+    /// The blake3 hash of a Directory message, serialized in protobuf canonical form.
+    digest: B3Digest,
+    /// Number of child elements in the Directory referred to by `digest`.
+    /// Calculated by summing up the numbers of nodes, and for each directory,
+    /// its size field. Can be used for inode allocation.
+    /// This field is precisely as verifiable as any other Merkle tree edge.
+    /// Resolve `digest`, and you can compute it incrementally. Resolve the entire
+    /// tree, and you can fully compute it from scratch.
+    /// A credulous implementation won't reject an excessive size, but this is
+    /// harmless: you'll have some ordinals without nodes. Undersizing is obvious
+    /// and easy to reject: you won't have an ordinal for some nodes.
+    size: u64,
+}
+
+impl DirectoryNode {
+    pub fn new(name: Bytes, digest: B3Digest, size: u64) -> Result<Self, ValidateNodeError> {
+        Ok(Self { name, digest, size })
+    }
+
+    pub fn digest(&self) -> &B3Digest {
+        &self.digest
+    }
+    pub fn size(&self) -> u64 {
+        self.size
+    }
+}
+
+/// A FileNode represents a regular or executable file in a Directory or at the root.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct FileNode {
+    /// The (base)name of the file
+    name: Bytes,
+
+    /// The blake3 digest of the file contents
+    digest: B3Digest,
+
+    /// The file content size
+    size: u64,
+
+    /// Whether the file is executable
+    executable: bool,
+}
+
+impl FileNode {
+    pub fn new(
+        name: Bytes,
+        digest: B3Digest,
+        size: u64,
+        executable: bool,
+    ) -> Result<Self, ValidateNodeError> {
+        Ok(Self {
+            name,
+            digest,
+            size,
+            executable,
+        })
+    }
+
+    pub fn digest(&self) -> &B3Digest {
+        &self.digest
+    }
+
+    pub fn size(&self) -> u64 {
+        self.size
+    }
+
+    pub fn executable(&self) -> bool {
+        self.executable
+    }
+}
+
+/// A SymlinkNode represents a symbolic link in a Directory or at the root.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SymlinkNode {
+    /// The (base)name of the symlink
+    name: Bytes,
+    /// The target of the symlink.
+    target: Bytes,
+}
+
+impl SymlinkNode {
+    pub fn new(name: Bytes, target: Bytes) -> Result<Self, ValidateNodeError> {
+        if target.is_empty() || target.contains(&b'\0') {
+            return Err(ValidateNodeError::InvalidSymlinkTarget(target));
+        }
+        Ok(Self { name, target })
+    }
+
+    pub fn target(&self) -> &bytes::Bytes {
+        &self.target
+    }
+}
+
+/// A Node is either a [DirectoryNode], [FileNode] or [SymlinkNode].
+/// While a Node by itself may have any name, only those matching specific requirements
+/// can be added as entries to a [Directory] (see the documentation on [Directory] for details).
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Node {
+    Directory(DirectoryNode),
+    File(FileNode),
+    Symlink(SymlinkNode),
+}
+
+/// NamedNode is implemented for [FileNode], [DirectoryNode], [SymlinkNode]
+/// and [Node], so we can ask all of them for the name easily.
+pub trait NamedNode {
+    fn get_name(&self) -> &bytes::Bytes;
+}
+
+impl NamedNode for &FileNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+impl NamedNode for FileNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+
+impl NamedNode for &DirectoryNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+impl NamedNode for DirectoryNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+
+impl NamedNode for &SymlinkNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+impl NamedNode for SymlinkNode {
+    fn get_name(&self) -> &bytes::Bytes {
+        &self.name
+    }
+}
+
+impl NamedNode for &Node {
+    fn get_name(&self) -> &bytes::Bytes {
+        match self {
+            Node::File(node_file) => &node_file.name,
+            Node::Directory(node_directory) => &node_directory.name,
+            Node::Symlink(node_symlink) => &node_symlink.name,
+        }
+    }
+}
+impl NamedNode for Node {
+    fn get_name(&self) -> &bytes::Bytes {
+        match self {
+            Node::File(node_file) => &node_file.name,
+            Node::Directory(node_directory) => &node_directory.name,
+            Node::Symlink(node_symlink) => &node_symlink.name,
+        }
+    }
+}
+
+impl Node {
+    /// Returns the node with a new name.
+    pub fn rename(self, name: bytes::Bytes) -> Self {
+        match self {
+            Node::Directory(n) => Node::Directory(DirectoryNode { name, ..n }),
+            Node::File(n) => Node::File(FileNode { name, ..n }),
+            Node::Symlink(n) => Node::Symlink(SymlinkNode { name, ..n }),
+        }
+    }
+}
+
+impl PartialOrd for Node {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for Node {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+impl PartialOrd for FileNode {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for FileNode {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+impl PartialOrd for DirectoryNode {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for DirectoryNode {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+impl PartialOrd for SymlinkNode {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for SymlinkNode {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.get_name().cmp(other.get_name())
+    }
+}
+
+fn checked_sum(iter: impl IntoIterator<Item = u64>) -> Option<u64> {
+    iter.into_iter().try_fold(0u64, |acc, i| acc.checked_add(i))
+}
+
+impl Directory {
+    pub fn new() -> Self {
+        Directory { nodes: vec![] }
+    }
+
+    /// The size of a directory is the number of all regular and symlink elements,
+    /// the number of directory elements, and the sum of their size fields.
+    pub fn size(&self) -> u64 {
+        // It's impossible to create a Directory where the size overflows, because we
+        // check before every add() that the size won't overflow.
+        (self.nodes.len() as u64) + self.directories().map(|e| e.size).sum::<u64>()
+    }
+
+    /// Calculates the digest of a Directory, which is the blake3 hash of a
+    /// Directory protobuf message, serialized in protobuf canonical form.
+    pub fn digest(&self) -> B3Digest {
+        proto::Directory::from(self).digest()
+    }
+
+    /// Allows iterating over all nodes (directories, files and symlinks)
+    /// ordered by their name.
+    pub fn nodes(&self) -> impl Iterator<Item = &Node> + Send + Sync + '_ {
+        self.nodes.iter()
+    }
+
+    /// Allows iterating over the FileNode entries of this directory
+    /// ordered by their name
+    pub fn files(&self) -> impl Iterator<Item = &FileNode> + Send + Sync + '_ {
+        self.nodes.iter().filter_map(|node| match node {
+            Node::File(n) => Some(n),
+            _ => None,
+        })
+    }
+
+    /// Allows iterating over the subdirectories of this directory
+    /// ordered by their name
+    pub fn directories(&self) -> impl Iterator<Item = &DirectoryNode> + Send + Sync + '_ {
+        self.nodes.iter().filter_map(|node| match node {
+            Node::Directory(n) => Some(n),
+            _ => None,
+        })
+    }
+
+    /// Allows iterating over the SymlinkNode entries of this directory
+    /// ordered by their name
+    pub fn symlinks(&self) -> impl Iterator<Item = &SymlinkNode> + Send + Sync + '_ {
+        self.nodes.iter().filter_map(|node| match node {
+            Node::Symlink(n) => Some(n),
+            _ => None,
+        })
+    }
+
+    /// Checks a Node name for validity as a directory entry
+    /// We disallow slashes, null bytes, '.', '..' and the empty string.
+    pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> {
+        if name.is_empty()
+            || name == b".."
+            || name == b"."
+            || name.contains(&0x00)
+            || name.contains(&b'/')
+        {
+            Err(ValidateNodeError::InvalidName(name.to_owned().into()))
+        } else {
+            Ok(())
+        }
+    }
+
+    /// Adds the specified [Node] to the [Directory], preserving sorted entries.
+    ///
+    /// Inserting an element that already exists with the same name in the directory will yield an
+    /// error.
+    /// Inserting an element will validate that its name fulfills the stricter requirements for
+    /// directory entries and yield an error if it does not.
+    pub fn add(&mut self, node: Node) -> Result<(), ValidateDirectoryError> {
+        Self::validate_node_name(node.get_name())
+            .map_err(|e| ValidateDirectoryError::InvalidNode(node.get_name().clone().into(), e))?;
+
+        // Check that even after adding this new directory entry, the size calculation will not
+        // overflow
+        // FUTUREWORK: add some sort of batch add interface which only does this check once with
+        // all the to-be-added entries
+        checked_sum([
+            self.size(),
+            1,
+            match node {
+                Node::Directory(ref dir) => dir.size,
+                _ => 0,
+            },
+        ])
+        .ok_or(ValidateDirectoryError::SizeOverflow)?;
+
+        // This assumes the [Directory] is sorted, since we don't allow accessing the nodes list
+        // directly and all previous inserts should have been in-order
+        let pos = match self
+            .nodes
+            .binary_search_by_key(&node.get_name(), |n| n.get_name())
+        {
+            Err(pos) => pos, // There is no node with this name; good!
+            Ok(_) => {
+                return Err(ValidateDirectoryError::DuplicateName(
+                    node.get_name().to_vec(),
+                ))
+            }
+        };
+
+        self.nodes.insert(pos, node);
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::{Directory, DirectoryNode, FileNode, Node, SymlinkNode};
+    use crate::fixtures::DUMMY_DIGEST;
+    use crate::ValidateDirectoryError;
+
+    #[test]
+    fn add_nodes_to_directory() {
+        let mut d = Directory::new();
+
+        d.add(Node::Directory(
+            DirectoryNode::new("b".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::Directory(
+            DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::Directory(
+            DirectoryNode::new("z".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
+        ))
+        .unwrap();
+
+        d.add(Node::File(
+            FileNode::new("f".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::File(
+            FileNode::new("c".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::File(
+            FileNode::new("g".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
+        ))
+        .unwrap();
+
+        d.add(Node::Symlink(
+            SymlinkNode::new("t".into(), "a".into()).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::Symlink(
+            SymlinkNode::new("o".into(), "a".into()).unwrap(),
+        ))
+        .unwrap();
+        d.add(Node::Symlink(
+            SymlinkNode::new("e".into(), "a".into()).unwrap(),
+        ))
+        .unwrap();
+
+        // Convert to proto struct and back to ensure we are not generating any invalid structures
+        crate::directoryservice::Directory::try_from(crate::proto::Directory::from(d))
+            .expect("directory should be valid");
+    }
+
+    #[test]
+    fn validate_overflow() {
+        let mut d = Directory::new();
+
+        assert_eq!(
+            d.add(Node::Directory(
+                DirectoryNode::new("foo".into(), DUMMY_DIGEST.clone(), u64::MAX).unwrap(),
+            )),
+            Err(ValidateDirectoryError::SizeOverflow)
+        );
+    }
+
+    #[test]
+    fn add_duplicate_node_to_directory() {
+        let mut d = Directory::new();
+
+        d.add(Node::Directory(
+            DirectoryNode::new("a".into(), DUMMY_DIGEST.clone(), 1).unwrap(),
+        ))
+        .unwrap();
+        assert_eq!(
+            format!(
+                "{}",
+                d.add(Node::File(
+                    FileNode::new("a".into(), DUMMY_DIGEST.clone(), 1, true).unwrap(),
+                ))
+                .expect_err("adding duplicate dir entry must fail")
+            ),
+            "\"a\" is a duplicate name"
+        );
+    }
+
+    /// Attempt to add a directory entry with a name which should be rejected.
+    #[tokio::test]
+    async fn directory_reject_invalid_name() {
+        let mut dir = Directory::new();
+        assert!(
+            dir.add(Node::Symlink(
+                SymlinkNode::new(
+                    "".into(), // wrong! can not be added to directory
+                    "doesntmatter".into(),
+                )
+                .unwrap()
+            ))
+            .is_err(),
+            "invalid symlink entry be rejected"
+        );
+    }
+}
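A short sketch of the Directory API added above (crate paths assumed): entries stay sorted by name, typed iterators replace the three proto lists, and size() counts all entries plus the size fields of subdirectories. The digest of the empty Directory stands in for real child digests here.

use tvix_castore::directoryservice::{Directory, DirectoryNode, FileNode, NamedNode, Node};

fn build_example() -> Result<Directory, Box<dyn std::error::Error>> {
    // Placeholder digest: the digest of the empty Directory.
    let placeholder = Directory::new().digest();

    let mut dir = Directory::new();
    dir.add(Node::Directory(DirectoryNode::new(
        "src".into(),
        placeholder.clone(),
        0, // the empty directory has no entries
    )?))?;
    dir.add(Node::File(FileNode::new(
        "readme.md".into(),
        placeholder.clone(),
        42,
        false,
    )?))?;

    // Two entries, and the subdirectory contributes its size field (0).
    assert_eq!(dir.size(), 2);
    assert_eq!(dir.directories().count(), 1);
    assert_eq!(dir.files().count(), 1);

    // Entries are yielded in name order, regardless of insertion order.
    let names: Vec<_> = dir.nodes().map(|n| n.get_name().clone()).collect();
    assert_eq!(names, vec!["readme.md", "src"]);

    Ok(dir)
}
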
diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs
index a9a2cc8ef5c0..6e71c2356def 100644
--- a/tvix/castore/src/directoryservice/object_store.rs
+++ b/tvix/castore/src/directoryservice/object_store.rs
@@ -17,7 +17,8 @@ use tracing::{instrument, trace, warn, Level};
 use url::Url;
 
 use super::{
-    DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator,
+    Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
+    RootToLeavesValidator,
 };
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
@@ -78,13 +79,13 @@ impl DirectoryService for ObjectStoreDirectoryService {
     /// This is the same steps as for get_recursive anyways, so we just call get_recursive and
     /// return the first element of the stream and drop the request.
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         self.get_recursive(digest).take(1).next().await.transpose()
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
-        if !directory.directories.is_empty() {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
+        if directory.directories().next().is_some() {
             return Err(Error::InvalidRequest(
                     "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(),
             ));
@@ -99,7 +100,7 @@ impl DirectoryService for ObjectStoreDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         // Check that we are not passing on bogus from the object store to the client, and that the
         // trust chain from the root digest to the leaves is intact
         let mut order_validator =
@@ -145,6 +146,10 @@ impl DirectoryService for ObjectStoreDirectoryService {
                             warn!("unable to parse directory {}: {}", digest, e);
                             Error::StorageError(e.to_string())
                         })?;
+                        let directory = Directory::try_from(directory).map_err(|e| {
+                            warn!("unable to convert directory {}: {}", digest, e);
+                            Error::StorageError(e.to_string())
+                        })?;
 
                         // Allow the children to appear next
                         order_validator.add_directory_unchecked(&directory);
@@ -244,7 +249,7 @@ impl ObjectStoreDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for ObjectStoreDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -302,7 +307,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter {
 
                 for directory in directories {
                     directories_sink
-                        .send(directory.encode_to_vec().into())
+                        .send(proto::Directory::from(directory).encode_to_vec().into())
                         .await?;
                 }
 
diff --git a/tvix/castore/src/directoryservice/order_validator.rs b/tvix/castore/src/directoryservice/order_validator.rs
index 6045f5d24198..17431fc8d3b0 100644
--- a/tvix/castore/src/directoryservice/order_validator.rs
+++ b/tvix/castore/src/directoryservice/order_validator.rs
@@ -1,7 +1,8 @@
 use std::collections::HashSet;
 use tracing::warn;
 
-use crate::{proto::Directory, B3Digest};
+use super::Directory;
+use crate::B3Digest;
 
 pub trait OrderValidator {
     /// Update the order validator's state with the directory
@@ -47,10 +48,9 @@ impl RootToLeavesValidator {
             self.expected_digests.insert(directory.digest());
         }
 
-        for subdir in &directory.directories {
+        for subdir in directory.directories() {
             // Allow the children to appear next
-            let subdir_digest = subdir.digest.clone().try_into().unwrap();
-            self.expected_digests.insert(subdir_digest);
+            self.expected_digests.insert(subdir.digest.clone());
         }
     }
 }
@@ -79,12 +79,11 @@ impl OrderValidator for LeavesToRootValidator {
     fn add_directory(&mut self, directory: &Directory) -> bool {
         let digest = directory.digest();
 
-        for subdir in &directory.directories {
-            let subdir_digest = subdir.digest.clone().try_into().unwrap(); // this has been validated in validate_directory()
-            if !self.allowed_references.contains(&subdir_digest) {
+        for subdir in directory.directories() {
+            if !self.allowed_references.contains(&subdir.digest) {
                 warn!(
                     directory.digest = %digest,
-                    subdirectory.digest = %subdir_digest,
+                    subdirectory.digest = %subdir.digest,
                     "unexpected directory reference"
                 );
                 return false;
@@ -101,8 +100,8 @@ impl OrderValidator for LeavesToRootValidator {
 mod tests {
     use super::{LeavesToRootValidator, RootToLeavesValidator};
     use crate::directoryservice::order_validator::OrderValidator;
+    use crate::directoryservice::Directory;
     use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C};
-    use crate::proto::Directory;
     use rstest::rstest;
 
     #[rstest]
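A sketch of the leaves-to-root check in isolation. It assumes LeavesToRootValidator and the OrderValidator trait are re-exported from the directoryservice module and that the validator can be constructed via Default; only add_directory() is shown in this diff.

use tvix_castore::directoryservice::{Directory, LeavesToRootValidator, OrderValidator};

// Returns true iff every directory only references directories seen earlier,
// i.e. the slice really is ordered from the leaves towards the root.
fn is_leaves_to_root_ordered(closure: &[Directory]) -> bool {
    let mut validator = LeavesToRootValidator::default();
    closure.iter().all(|d| validator.add_directory(d))
}
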
diff --git a/tvix/castore/src/directoryservice/redb.rs b/tvix/castore/src/directoryservice/redb.rs
index 51dc87f92574..d253df503bb3 100644
--- a/tvix/castore/src/directoryservice/redb.rs
+++ b/tvix/castore/src/directoryservice/redb.rs
@@ -5,15 +5,15 @@ use std::{path::PathBuf, sync::Arc};
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
+use super::{
+    traverse_directory, Directory, DirectoryGraph, DirectoryPutter, DirectoryService,
+    LeavesToRootValidator,
+};
 use crate::{
     composition::{CompositionContext, ServiceBuilder},
     digests, proto, B3Digest, Error,
 };
 
-use super::{
-    traverse_directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
-};
-
 const DIRECTORY_TABLE: TableDefinition<[u8; digests::B3_LEN], Vec<u8>> =
     TableDefinition::new("directory");
 
@@ -69,7 +69,7 @@ fn create_schema(db: &redb::Database) -> Result<(), redb::Error> {
 #[async_trait]
 impl DirectoryService for RedbDirectoryService {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let db = self.db.clone();
 
         // Retrieves the protobuf-encoded Directory for the corresponding digest.
@@ -107,13 +107,10 @@ impl DirectoryService for RedbDirectoryService {
         let directory = match proto::Directory::decode(&*directory_data.value()) {
             Ok(dir) => {
                 // The returned Directory must be valid.
-                if let Err(e) = dir.validate() {
+                dir.try_into().map_err(|e| {
                     warn!(err=%e, "Directory failed validation");
-                    return Err(Error::StorageError(
-                        "Directory failed validation".to_string(),
-                    ));
-                }
-                dir
+                    Error::StorageError("Directory failed validation".to_string())
+                })?
             }
             Err(e) => {
                 warn!(err=%e, "failed to parse Directory");
@@ -125,26 +122,21 @@ impl DirectoryService for RedbDirectoryService {
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         tokio::task::spawn_blocking({
             let db = self.db.clone();
             move || {
                 let digest = directory.digest();
 
-                // Validate the directory.
-                if let Err(e) = directory.validate() {
-                    warn!(err=%e, "Directory failed validation");
-                    return Err(Error::StorageError(
-                        "Directory failed validation".to_string(),
-                    ));
-                }
-
                 // Store the directory in the table.
                 let txn = db.begin_write()?;
                 {
                     let mut table = txn.open_table(DIRECTORY_TABLE)?;
                     let digest_as_array: [u8; digests::B3_LEN] = digest.clone().into();
-                    table.insert(digest_as_array, directory.encode_to_vec())?;
+                    table.insert(
+                        digest_as_array,
+                        proto::Directory::from(directory).encode_to_vec(),
+                    )?;
                 }
                 txn.commit()?;
 
@@ -158,7 +150,7 @@ impl DirectoryService for RedbDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         // FUTUREWORK: Ideally we should have all of the directory traversing happen in a single
         // redb transaction to avoid constantly closing and opening new transactions for the
         // database.
@@ -185,7 +177,7 @@ pub struct RedbDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for RedbDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -228,7 +220,10 @@ impl DirectoryPutter for RedbDirectoryPutter {
                             for directory in directories {
                                 let digest_as_array: [u8; digests::B3_LEN] =
                                     directory.digest().into();
-                                table.insert(digest_as_array, directory.encode_to_vec())?;
+                                table.insert(
+                                    digest_as_array,
+                                    proto::Directory::from(directory).encode_to_vec(),
+                                )?;
                             }
                         }
 
diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs
index dc54e3d11d18..b4daaee61b22 100644
--- a/tvix/castore/src/directoryservice/simple_putter.rs
+++ b/tvix/castore/src/directoryservice/simple_putter.rs
@@ -1,7 +1,6 @@
 use super::DirectoryPutter;
 use super::DirectoryService;
-use super::{DirectoryGraph, LeavesToRootValidator};
-use crate::proto;
+use super::{Directory, DirectoryGraph, LeavesToRootValidator};
 use crate::B3Digest;
 use crate::Error;
 use tonic::async_trait;
@@ -29,7 +28,7 @@ impl<DS: DirectoryService> SimplePutter<DS> {
 #[async_trait]
 impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
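For context, uploading a small closure through any DirectoryService via its putter, following the trait contract in mod.rs. This sketch assumes close() returns the root digest, as the putter implementations in this diff do; `leaf` and `root` are hypothetical directories where `root` references `leaf`.

use tvix_castore::directoryservice::{Directory, DirectoryService};
use tvix_castore::{B3Digest, Error};

async fn upload_closure(
    svc: &impl DirectoryService,
    leaf: Directory,
    root: Directory,
) -> Result<B3Digest, Error> {
    let mut handle = svc.put_multiple_start();
    // Leaves are sent before the directories that reference them.
    handle.put(leaf).await?;
    handle.put(root).await?;
    // close() flushes the upload and yields the digest of the root directory.
    handle.close().await
}
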
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
index 5766dec1a5c2..4f3a860d14e4 100644
--- a/tvix/castore/src/directoryservice/sled.rs
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -1,5 +1,3 @@
-use crate::proto::Directory;
-use crate::{proto, B3Digest, Error};
 use futures::stream::BoxStream;
 use prost::Message;
 use std::ops::Deref;
@@ -9,8 +7,9 @@ use tonic::async_trait;
 use tracing::{instrument, warn};
 
 use super::utils::traverse_directory;
-use super::{DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
+use super::{Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator};
 use crate::composition::{CompositionContext, ServiceBuilder};
+use crate::{proto, B3Digest, Error};
 
 #[derive(Clone)]
 pub struct SledDirectoryService {
@@ -44,7 +43,7 @@ impl SledDirectoryService {
 #[async_trait]
 impl DirectoryService for SledDirectoryService {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         let resp = tokio::task::spawn_blocking({
             let db = self.db.clone();
             let digest = digest.clone();
@@ -61,7 +60,7 @@ impl DirectoryService for SledDirectoryService {
             None => Ok(None),
 
             // The directory was found, try to parse the data as Directory message
-            Some(data) => match Directory::decode(&*data) {
+            Some(data) => match proto::Directory::decode(&*data) {
                 Ok(directory) => {
                     // Validate the retrieved Directory indeed has the
                     // digest we expect it to have, to detect corruptions.
@@ -73,14 +72,10 @@ impl DirectoryService for SledDirectoryService {
                         )));
                     }
 
-                    // Validate the Directory itself is valid.
-                    if let Err(e) = directory.validate() {
-                        warn!("directory failed validation: {}", e.to_string());
-                        return Err(Error::StorageError(format!(
-                            "directory {} failed validation: {}",
-                            actual_digest, e,
-                        )));
-                    }
+                    let directory = directory.try_into().map_err(|e| {
+                        warn!("failed to retrieve directory: {}", e);
+                        Error::StorageError(format!("failed to retrieve directory: {}", e))
+                    })?;
 
                     Ok(Some(directory))
                 }
@@ -93,22 +88,18 @@ impl DirectoryService for SledDirectoryService {
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
         tokio::task::spawn_blocking({
             let db = self.db.clone();
             move || {
                 let digest = directory.digest();
 
-                // validate the directory itself.
-                if let Err(e) = directory.validate() {
-                    return Err(Error::InvalidRequest(format!(
-                        "directory {} failed validation: {}",
-                        digest, e,
-                    )));
-                }
                 // store it
-                db.insert(digest.as_slice(), directory.encode_to_vec())
-                    .map_err(|e| Error::StorageError(e.to_string()))?;
+                db.insert(
+                    digest.as_slice(),
+                    proto::Directory::from(directory).encode_to_vec(),
+                )
+                .map_err(|e| Error::StorageError(e.to_string()))?;
 
                 Ok(digest)
             }
@@ -120,7 +111,7 @@ impl DirectoryService for SledDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
@@ -215,7 +206,7 @@ pub struct SledDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for SledDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -252,7 +243,10 @@ impl DirectoryPutter for SledDirectoryPutter {
 
                         let mut batch = sled::Batch::default();
                         for directory in directories {
-                            batch.insert(directory.digest().as_slice(), directory.encode_to_vec());
+                            batch.insert(
+                                directory.digest().as_slice(),
+                                proto::Directory::from(directory).encode_to_vec(),
+                            );
                         }
 
                         tree.apply_batch(batch).map_err(|e| {
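
A minimal sketch of the round trip the sled backend now relies on, written as if inside the castore crate and assuming the checks formerly done by validate() happen inside the TryFrom<proto::Directory> conversion (which is what the error mapping in the get hunk suggests); the helper name roundtrip is illustrative only:

    use prost::Message; // encode_to_vec / decode
    use crate::directoryservice::Directory;
    use crate::{proto, Error};

    // put path: convert the typed Directory back to its wire form before storing;
    // get path: decode the stored bytes, then try_into() the validated type, so a
    // bad message surfaces as a StorageError instead of an explicit validate().
    fn roundtrip(directory: Directory) -> Result<Directory, Error> {
        let bytes = proto::Directory::from(directory).encode_to_vec();

        proto::Directory::decode(&*bytes)
            .map_err(|e| Error::StorageError(format!("unable to decode: {}", e)))?
            .try_into()
            .map_err(|e| Error::StorageError(format!("invalid Directory: {}", e)))
    }
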
diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs
index b698f70ea469..3923e66dc2c2 100644
--- a/tvix/castore/src/directoryservice/tests/mod.rs
+++ b/tvix/castore/src/directoryservice/tests/mod.rs
@@ -8,10 +8,8 @@ use rstest_reuse::{self, *};
 
 use super::DirectoryService;
 use crate::directoryservice;
-use crate::{
-    fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D},
-    proto::{self, Directory},
-};
+use crate::directoryservice::{Directory, DirectoryNode, Node};
+use crate::fixtures::{DIRECTORY_A, DIRECTORY_B, DIRECTORY_C, DIRECTORY_D};
 
 mod utils;
 use self::utils::make_grpc_directory_service_client;
@@ -41,10 +39,10 @@ async fn test_non_exist(directory_service: impl DirectoryService) {
 
     // recursive get
     assert_eq!(
-        Vec::<Result<proto::Directory, crate::Error>>::new(),
+        Vec::<Result<Directory, crate::Error>>::new(),
         directory_service
             .get_recursive(&DIRECTORY_A.digest())
-            .collect::<Vec<Result<proto::Directory, crate::Error>>>()
+            .collect::<Vec<Result<Directory, crate::Error>>>()
             .await
     );
 }
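
For symmetry with the recursive assertion above, a sketch of the plain get() counterpart; the helper name is hypothetical, and it presumes, as the test's name suggests, that a digest that was never uploaded resolves to Ok(None):

    use crate::directoryservice::DirectoryService;
    use crate::fixtures::DIRECTORY_A;

    // Hypothetical helper: a lookup for a digest that was never uploaded should
    // come back as Ok(None), not as an error.
    async fn assert_not_present(directory_service: &impl DirectoryService) {
        assert!(directory_service
            .get(&DIRECTORY_A.digest())
            .await
            .expect("get must not fail")
            .is_none());
    }
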
@@ -212,59 +210,26 @@ async fn upload_reject_dangling_pointer(directory_service: impl DirectoryService
     }
 }
 
-/// Try uploading a Directory failing its internal validation, ensure it gets
-/// rejected.
-#[apply(directory_services)]
-#[tokio::test]
-async fn upload_reject_failing_validation(directory_service: impl DirectoryService) {
-    let broken_directory = Directory {
-        symlinks: vec![proto::SymlinkNode {
-            name: "".into(), // wrong!
-            target: "doesntmatter".into(),
-        }],
-        ..Default::default()
-    };
-    assert!(broken_directory.validate().is_err());
-
-    // Try to upload via single upload.
-    assert!(
-        directory_service
-            .put(broken_directory.clone())
-            .await
-            .is_err(),
-        "single upload must fail"
-    );
-
-    // Try to upload via put_multiple. We're a bit more permissive here, the
-    // intermediate .put() might succeed, due to client-side bursting (in the
-    // case of gRPC), but then the close MUST fail.
-    let mut handle = directory_service.put_multiple_start();
-    if handle.put(broken_directory).await.is_ok() {
-        assert!(
-            handle.close().await.is_err(),
-            "when succeeding put, close must fail"
-        )
-    }
-}
-
 /// Try uploading a Directory that refers to a previously-uploaded directory.
 /// Both pass their isolated validation, but the size field in the parent is wrong.
 /// This should be rejected.
 #[apply(directory_services)]
 #[tokio::test]
 async fn upload_reject_wrong_size(directory_service: impl DirectoryService) {
-    let wrong_parent_directory = Directory {
-        directories: vec![proto::DirectoryNode {
-            name: "foo".into(),
-            digest: DIRECTORY_A.digest().into(),
-            size: DIRECTORY_A.size() + 42, // wrong!
-        }],
-        ..Default::default()
+    let wrong_parent_directory = {
+        let mut dir = Directory::new();
+        dir.add(Node::Directory(
+            DirectoryNode::new(
+                "foo".into(),
+                DIRECTORY_A.digest(),
+                DIRECTORY_A.size() + 42, // wrong!
+            )
+            .unwrap(),
+        ))
+        .unwrap();
+        dir
     };
 
-    // Make sure isolated validation itself is ok
-    assert!(wrong_parent_directory.validate().is_ok());
-
     // Now upload both. Ensure it either fails during the second put, or during
     // the close.
     let mut handle = directory_service.put_multiple_start();
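
As a reference point for the rewritten test above, a sketch of building a well-formed Directory with the same typed API: DirectoryNode::new and Directory::add both return Results, so a node with an empty name can no longer be constructed at all, which is why upload_reject_failing_validation was dropped. The test name below is illustrative:

    use crate::directoryservice::{Directory, DirectoryNode, Node};
    use crate::fixtures::DIRECTORY_A;

    #[test]
    fn build_directory_with_typed_api() {
        // Unlike wrong_parent_directory above, this child records the real size
        // of DIRECTORY_A, so the resulting directory is fully consistent.
        let mut dir = Directory::new();
        dir.add(Node::Directory(
            DirectoryNode::new("foo".into(), DIRECTORY_A.digest(), DIRECTORY_A.size()).unwrap(),
        ))
        .unwrap();

        assert_eq!(1, dir.directories().count());
    }
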
diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs
index 17a51ae2bbff..3e6dc73a4d6b 100644
--- a/tvix/castore/src/directoryservice/traverse.rs
+++ b/tvix/castore/src/directoryservice/traverse.rs
@@ -1,8 +1,5 @@
-use super::DirectoryService;
-use crate::{
-    proto::{node::Node, NamedNode},
-    B3Digest, Error, Path,
-};
+use super::{DirectoryService, NamedNode, Node};
+use crate::{Error, Path};
 use tracing::{instrument, warn};
 
 /// This descends from a (root) node to the given (sub)path, returning the Node
@@ -25,34 +22,31 @@ where
                 return Ok(None);
             }
             Node::Directory(directory_node) => {
-                let digest: B3Digest = directory_node
-                    .digest
-                    .try_into()
-                    .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
-
                 // fetch the linked node from the directory_service.
-                let directory =
-                    directory_service
-                        .as_ref()
-                        .get(&digest)
-                        .await?
-                        .ok_or_else(|| {
-                            // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
-                            warn!("directory {} does not exist", digest);
-
-                            Error::StorageError(format!("directory {} does not exist", digest))
-                        })?;
+                let directory = directory_service
+                    .as_ref()
+                    .get(&directory_node.digest)
+                    .await?
+                    .ok_or_else(|| {
+                        // If we didn't get the directory node that's linked, that's a store inconsistency, bail out!
+                        warn!("directory {} does not exist", directory_node.digest);
+
+                        Error::StorageError(format!(
+                            "directory {} does not exist",
+                            directory_node.digest
+                        ))
+                    })?;
 
                 // look for the component in the [Directory].
                 // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we
                 // could stop as soon as e.name is larger than the search string.
                 if let Some(child_node) = directory.nodes().find(|n| n.get_name() == component) {
                     // child node found, update prev_node to that and continue.
-                    parent_node = child_node;
+                    parent_node = child_node.clone();
                 } else {
                     // child node not found means there's no such element inside the directory.
                     return Ok(None);
-                }
+                };
             }
         }
     }
@@ -65,6 +59,7 @@ where
 mod tests {
     use crate::{
         directoryservice,
+        directoryservice::{DirectoryNode, Node},
         fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP},
         PathBuf,
     };
@@ -88,21 +83,21 @@ mod tests {
         handle.close().await.expect("must upload");
 
         // construct the node for DIRECTORY_COMPLICATED
-        let node_directory_complicated =
-            crate::proto::node::Node::Directory(crate::proto::DirectoryNode {
-                name: "doesntmatter".into(),
-                digest: DIRECTORY_COMPLICATED.digest().into(),
-                size: DIRECTORY_COMPLICATED.size(),
-            });
+        let node_directory_complicated = Node::Directory(
+            DirectoryNode::new(
+                "doesntmatter".into(),
+                DIRECTORY_COMPLICATED.digest(),
+                DIRECTORY_COMPLICATED.size(),
+            )
+            .unwrap(),
+        );
 
-        // construct the node for DIRECTORY_COMPLICATED
+        // construct the node for DIRECTORY_WITH_KEEP
-        let node_directory_with_keep = crate::proto::node::Node::Directory(
-            DIRECTORY_COMPLICATED.directories.first().unwrap().clone(),
-        );
+        let node_directory_with_keep =
+            Node::Directory(DIRECTORY_COMPLICATED.directories().next().unwrap().clone());
 
         // construct the node for the .keep file
-        let node_file_keep =
-            crate::proto::node::Node::File(DIRECTORY_WITH_KEEP.files.first().unwrap().clone());
+        let node_file_keep = Node::File(DIRECTORY_WITH_KEEP.files().next().unwrap().clone());
 
         // traversal to an empty subpath should return the root node.
         {
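
A sketch of the per-component lookup the traversal above performs, using only the accessors visible in this file (nodes(), get_name(), clone()); treating a path component as raw bytes and the helper name lookup_child are assumptions:

    use crate::directoryservice::{Directory, NamedNode, Node};

    // Find the child whose name matches the current path component. nodes()
    // yields entries in sorted order, so a later version could stop scanning
    // early once names sort past the component (the FUTUREWORK note above).
    fn lookup_child(directory: &Directory, component: &[u8]) -> Option<Node> {
        directory
            .nodes()
            .find(|n| n.get_name() == component)
            .map(|n| n.clone())
    }
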
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
index 726734f55eec..352362811a97 100644
--- a/tvix/castore/src/directoryservice/utils.rs
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -1,5 +1,5 @@
+use super::Directory;
 use super::DirectoryService;
-use crate::proto;
 use crate::B3Digest;
 use crate::Error;
 use async_stream::try_stream;
@@ -8,14 +8,14 @@ use std::collections::{HashSet, VecDeque};
 use tracing::instrument;
 use tracing::warn;
 
-/// Traverses a [proto::Directory] from the root to the children.
+/// Traverses a [Directory] from the root to the children.
 ///
 /// This is mostly BFS, but directories are only returned once.
 #[instrument(skip(directory_service))]
 pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
     directory_service: DS,
     root_directory_digest: &B3Digest,
-) -> BoxStream<'a, Result<proto::Directory, Error>> {
+) -> BoxStream<'a, Result<Directory, Error>> {
     // The list of all directories that still need to be traversed. The next
     // element is picked from the front, new elements are enqueued at the
     // back.
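
Since traverse_directory is what backs get_recursive in the backends above, a sketch of draining its stream the same way the tests earlier in this diff collect get_recursive; the helper name and the Vec-based error handling are illustrative:

    use futures::StreamExt;
    use crate::directoryservice::{Directory, DirectoryService};
    use crate::{B3Digest, Error};

    // Collect the BFS stream into a Vec, stopping at the first error. Each
    // Directory is yielded at most once, with the root directory first.
    async fn collect_directory_closure<DS: DirectoryService>(
        directory_service: &DS,
        root: &B3Digest,
    ) -> Result<Vec<Directory>, Error> {
        directory_service
            .get_recursive(root)
            .collect::<Vec<Result<Directory, Error>>>()
            .await
            .into_iter()
            .collect()
    }
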
@@ -50,16 +50,6 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
                 Some(dir) => dir,
             };
 
-
-            // validate, we don't want to send invalid directories.
-            current_directory.validate().map_err(|e| {
-               warn!("directory failed validation: {}", e.to_string());
-               Error::StorageError(format!(
-                   "invalid directory: {}",
-                   current_directory_digest
-               ))
-            })?;
-
             // We're about to send this directory, so let's avoid sending it again if a
             // descendant has it.
             sent_directory_digests.insert(current_directory_digest);
@@ -67,9 +57,7 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
             // enqueue all child directory digests to the work queue, as
             // long as they're not part of the worklist or already sent.
-            // This panics if the digest looks invalid, it's supposed to be checked first.
-            for child_directory_node in &current_directory.directories {
+            for child_directory_node in current_directory.directories() {
-                // TODO: propagate error
-                let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
+                let child_digest: B3Digest = child_directory_node.digest.clone();
 
                 if worklist_directory_digests.contains(&child_digest)
                     || sent_directory_digests.contains(&child_digest)