about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r--tvix/castore/src/directoryservice/memory.rs4
-rw-r--r--tvix/castore/src/directoryservice/mod.rs3
-rw-r--r--tvix/castore/src/directoryservice/simple_putter.rs75
-rw-r--r--tvix/castore/src/directoryservice/sled.rs4
-rw-r--r--tvix/castore/src/directoryservice/utils.rs55
5 files changed, 82 insertions, 59 deletions
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
index 528ffe2f2c03..2cbbbd1b1657 100644
--- a/tvix/castore/src/directoryservice/memory.rs
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -5,8 +5,8 @@ use std::sync::{Arc, RwLock};
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
-use super::utils::{traverse_directory, SimplePutter};
-use super::{DirectoryPutter, DirectoryService};
+use super::utils::traverse_directory;
+use super::{DirectoryPutter, DirectoryService, SimplePutter};
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index b6d1c2fcab13..f9d8e08b31e0 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -6,6 +6,7 @@ mod closure_validator;
 mod from_addr;
 mod grpc;
 mod memory;
+mod simple_putter;
 mod sled;
 mod traverse;
 mod utils;
@@ -14,8 +15,10 @@ pub use self::closure_validator::ClosureValidator;
 pub use self::from_addr::from_addr;
 pub use self::grpc::GRPCDirectoryService;
 pub use self::memory::MemoryDirectoryService;
+pub use self::simple_putter::SimplePutter;
 pub use self::sled::SledDirectoryService;
 pub use self::traverse::descend_to;
+pub use self::utils::traverse_directory;
 
 /// The base trait all Directory services need to implement.
 /// This is a simple get and put of [crate::proto::Directory], returning their
diff --git a/tvix/castore/src/directoryservice/simple_putter.rs b/tvix/castore/src/directoryservice/simple_putter.rs
new file mode 100644
index 000000000000..25617ebcac82
--- /dev/null
+++ b/tvix/castore/src/directoryservice/simple_putter.rs
@@ -0,0 +1,75 @@
+use super::ClosureValidator;
+use super::DirectoryPutter;
+use super::DirectoryService;
+use crate::proto;
+use crate::B3Digest;
+use crate::Error;
+use tonic::async_trait;
+use tracing::instrument;
+use tracing::warn;
+
+/// This is an implementation of DirectoryPutter that simply
+/// inserts individual Directory messages one by one, on close, after
+/// they successfully validated.
+pub struct SimplePutter<DS: DirectoryService> {
+    directory_service: DS,
+
+    directory_validator: Option<ClosureValidator>,
+}
+
+impl<DS: DirectoryService> SimplePutter<DS> {
+    pub fn new(directory_service: DS) -> Self {
+        Self {
+            directory_service,
+            directory_validator: Some(Default::default()),
+        }
+    }
+}
+
+#[async_trait]
+impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
+    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        match self.directory_validator {
+            None => return Err(Error::StorageError("already closed".to_string())),
+            Some(ref mut validator) => {
+                validator.add(directory)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    #[instrument(level = "trace", skip_all, ret, err)]
+    async fn close(&mut self) -> Result<B3Digest, Error> {
+        match self.directory_validator.take() {
+            None => Err(Error::InvalidRequest("already closed".to_string())),
+            Some(validator) => {
+                // retrieve the validated directories.
+                let directories = validator.finalize()?;
+
+                // Get the root digest, which is at the end (cf. insertion order)
+                let root_digest = directories
+                    .last()
+                    .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))?
+                    .digest();
+
+                // call an individual put for each directory and await the insertion.
+                for directory in directories {
+                    let exp_digest = directory.digest();
+                    let actual_digest = self.directory_service.put(directory).await?;
+
+                    // ensure the digest the backend told us matches our expectations.
+                    if exp_digest != actual_digest {
+                        warn!(directory.digest_expected=%exp_digest, directory.digest_actual=%actual_digest, "unexpected digest");
+                        return Err(Error::StorageError(
+                            "got unexpected digest from backend during put".into(),
+                        ));
+                    }
+                }
+
+                Ok(root_digest)
+            }
+        }
+    }
+}
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
index 9acd3854184b..f41c5f718872 100644
--- a/tvix/castore/src/directoryservice/sled.rs
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -7,8 +7,8 @@ use std::path::Path;
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
-use super::utils::{traverse_directory, SimplePutter};
-use super::DirectoryService;
+use super::utils::traverse_directory;
+use super::{DirectoryService, SimplePutter};
 
 #[derive(Clone)]
 pub struct SledDirectoryService {
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
index 6fa1a9e5fda0..01c521076c9c 100644
--- a/tvix/castore/src/directoryservice/utils.rs
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -1,4 +1,3 @@
-use super::DirectoryPutter;
 use super::DirectoryService;
 use crate::proto;
 use crate::B3Digest;
@@ -6,8 +5,6 @@ use crate::Error;
 use async_stream::stream;
 use futures::stream::BoxStream;
 use std::collections::{HashSet, VecDeque};
-use tonic::async_trait;
-use tracing::instrument;
 use tracing::warn;
 
 /// Traverses a [proto::Directory] from the root to the children.
@@ -83,55 +80,3 @@ pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
 
     Box::pin(stream)
 }
-
-/// This is a simple implementation of a Directory uploader.
-/// TODO: verify connectivity? Factor out these checks into generic helpers?
-pub struct SimplePutter<DS: DirectoryService> {
-    directory_service: DS,
-    last_directory_digest: Option<B3Digest>,
-    closed: bool,
-}
-
-impl<DS: DirectoryService> SimplePutter<DS> {
-    pub fn new(directory_service: DS) -> Self {
-        Self {
-            directory_service,
-            closed: false,
-            last_directory_digest: None,
-        }
-    }
-}
-
-#[async_trait]
-impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
-    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
-    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
-        if self.closed {
-            return Err(Error::StorageError("already closed".to_string()));
-        }
-
-        let digest = self.directory_service.put(directory).await?;
-
-        // track the last directory digest
-        self.last_directory_digest = Some(digest);
-
-        Ok(())
-    }
-
-    #[instrument(level = "trace", skip_all, ret, err)]
-    async fn close(&mut self) -> Result<B3Digest, Error> {
-        if self.closed {
-            return Err(Error::StorageError("already closed".to_string()));
-        }
-
-        match &self.last_directory_digest {
-            Some(last_digest) => {
-                self.closed = true;
-                Ok(last_digest.clone())
-            }
-            None => Err(Error::InvalidRequest(
-                "no directories sent, can't show root digest".to_string(),
-            )),
-        }
-    }
-}