about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice/object_store.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/directoryservice/object_store.rs')
-rw-r--r--tvix/castore/src/directoryservice/object_store.rs19
1 files changed, 12 insertions, 7 deletions
diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs
index a9a2cc8ef5c0..6e71c2356def 100644
--- a/tvix/castore/src/directoryservice/object_store.rs
+++ b/tvix/castore/src/directoryservice/object_store.rs
@@ -17,7 +17,8 @@ use tracing::{instrument, trace, warn, Level};
 use url::Url;
 
 use super::{
-    DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator,
+    Directory, DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator,
+    RootToLeavesValidator,
 };
 use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
@@ -78,13 +79,13 @@ impl DirectoryService for ObjectStoreDirectoryService {
     /// This is the same steps as for get_recursive anyways, so we just call get_recursive and
     /// return the first element of the stream and drop the request.
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
-    async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
+    async fn get(&self, digest: &B3Digest) -> Result<Option<Directory>, Error> {
         self.get_recursive(digest).take(1).next().await.transpose()
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
-    async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
-        if !directory.directories.is_empty() {
+    async fn put(&self, directory: Directory) -> Result<B3Digest, Error> {
+        if directory.directories().next().is_some() {
             return Err(Error::InvalidRequest(
                     "only put_multiple_start is supported by the ObjectStoreDirectoryService for directories with children".into(),
             ));
@@ -99,7 +100,7 @@ impl DirectoryService for ObjectStoreDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> BoxStream<'static, Result<proto::Directory, Error>> {
+    ) -> BoxStream<'static, Result<Directory, Error>> {
         // Check that we are not passing on bogus from the object store to the client, and that the
         // trust chain from the root digest to the leaves is intact
         let mut order_validator =
@@ -145,6 +146,10 @@ impl DirectoryService for ObjectStoreDirectoryService {
                             warn!("unable to parse directory {}: {}", digest, e);
                             Error::StorageError(e.to_string())
                         })?;
+                        let directory = Directory::try_from(directory).map_err(|e| {
+                            warn!("unable to convert directory {}: {}", digest, e);
+                            Error::StorageError(e.to_string())
+                        })?;
 
                         // Allow the children to appear next
                         order_validator.add_directory_unchecked(&directory);
@@ -244,7 +249,7 @@ impl ObjectStoreDirectoryPutter {
 #[async_trait]
 impl DirectoryPutter for ObjectStoreDirectoryPutter {
     #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+    async fn put(&mut self, directory: Directory) -> Result<(), Error> {
         match self.directory_validator {
             None => return Err(Error::StorageError("already closed".to_string())),
             Some(ref mut validator) => {
@@ -302,7 +307,7 @@ impl DirectoryPutter for ObjectStoreDirectoryPutter {
 
                 for directory in directories {
                     directories_sink
-                        .send(directory.encode_to_vec().into())
+                        .send(proto::Directory::from(directory).encode_to_vec().into())
                         .await?;
                 }