about summary refs log tree commit diff
path: root/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/proto/grpc_directoryservice_wrapper.rs')
-rw-r--r--tvix/castore/src/proto/grpc_directoryservice_wrapper.rs32
1 files changed, 21 insertions, 11 deletions
diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
index 5c1428690cb4..62fdb34a25a0 100644
--- a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
+++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
@@ -1,6 +1,5 @@
-use crate::directoryservice::ClosureValidator;
-use crate::proto;
-use crate::{directoryservice::DirectoryService, B3Digest};
+use crate::directoryservice::{DirectoryGraph, DirectoryService, LeavesToRootValidator};
+use crate::{proto, B3Digest, DirectoryError};
 use futures::stream::BoxStream;
 use futures::TryStreamExt;
 use std::ops::Deref;
@@ -57,13 +56,16 @@ where
                                 Status::not_found(format!("directory {} not found", digest))
                             })?;
 
-                        Box::pin(once(Ok(directory)))
+                        Box::pin(once(Ok(directory.into())))
                     } else {
                         // If recursive was requested, traverse via get_recursive.
                         Box::pin(
-                            self.directory_service.get_recursive(&digest).map_err(|e| {
-                                tonic::Status::new(tonic::Code::Internal, e.to_string())
-                            }),
+                            self.directory_service
+                                .get_recursive(&digest)
+                                .map_ok(proto::Directory::from)
+                                .map_err(|e| {
+                                    tonic::Status::new(tonic::Code::Internal, e.to_string())
+                                }),
                         )
                     }
                 }))
@@ -78,14 +80,22 @@ where
     ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
         let mut req_inner = request.into_inner();
 
-        // We put all Directory messages we receive into ClosureValidator first.
-        let mut validator = ClosureValidator::default();
+        // We put all Directory messages we receive into DirectoryGraph.
+        let mut validator = DirectoryGraph::<LeavesToRootValidator>::default();
         while let Some(directory) = req_inner.message().await? {
-            validator.add(directory)?;
+            validator
+                .add(directory.try_into().map_err(|e: DirectoryError| {
+                    tonic::Status::new(tonic::Code::Internal, e.to_string())
+                })?)
+                .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?;
         }
 
         // drain, which validates connectivity too.
-        let directories = validator.finalize()?;
+        let directories = validator
+            .validate()
+            .map_err(|e| tonic::Status::new(tonic::Code::Internal, e.to_string()))?
+            .drain_leaves_to_root()
+            .collect::<Vec<_>>();
 
         let mut directory_putter = self.directory_service.put_multiple_start();
         for directory in directories {