about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/directoryservice/sled.rs62
1 files changed, 59 insertions, 3 deletions
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
index f41c5f718872..e4a4c2bbed78 100644
--- a/tvix/castore/src/directoryservice/sled.rs
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -1,14 +1,14 @@
-use crate::directoryservice::DirectoryPutter;
 use crate::proto::Directory;
 use crate::{proto, B3Digest, Error};
 use futures::stream::BoxStream;
 use prost::Message;
+use std::ops::Deref;
 use std::path::Path;
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
 use super::utils::traverse_directory;
-use super::{DirectoryService, SimplePutter};
+use super::{ClosureValidator, DirectoryPutter, DirectoryService};
 
 #[derive(Clone)]
 pub struct SledDirectoryService {
@@ -107,6 +107,62 @@ impl DirectoryService for SledDirectoryService {
     where
         Self: Clone,
     {
-        Box::new(SimplePutter::new(self.clone()))
+        Box::new(SledDirectoryPutter {
+            tree: self.db.deref().clone(),
+            directory_validator: Some(Default::default()),
+        })
+    }
+}
+
+/// Buffers Directory messages to be uploaded and inserts them in a batch
+/// transaction on close.
+pub struct SledDirectoryPutter {
+    tree: sled::Tree,
+
+    /// The directories (inside the directory validator) that we insert later,
+    /// or None, if they were already inserted.
+    directory_validator: Option<ClosureValidator>,
+}
+
+#[async_trait]
+impl DirectoryPutter for SledDirectoryPutter {
+    #[instrument(level = "trace", skip_all, fields(directory.digest=%directory.digest()), err)]
+    async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
+        match self.directory_validator {
+            None => return Err(Error::StorageError("already closed".to_string())),
+            Some(ref mut validator) => {
+                validator.add(directory)?;
+            }
+        }
+
+        Ok(())
+    }
+
+    #[instrument(level = "trace", skip_all, ret, err)]
+    async fn close(&mut self) -> Result<B3Digest, Error> {
+        match self.directory_validator.take() {
+            None => Err(Error::InvalidRequest("already closed".to_string())),
+            Some(validator) => {
+                // retrieve the validated directories.
+                let directories = validator.finalize()?;
+
+                // Get the root digest, which is at the end (cf. insertion order)
+                let root_digest = directories
+                    .last()
+                    .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))?
+                    .digest();
+
+                let mut batch = sled::Batch::default();
+                for directory in directories {
+                    batch.insert(directory.digest().as_slice(), directory.encode_to_vec());
+                }
+
+                self.tree
+                    .apply_batch(batch)
+                    .map_err(|e| Error::StorageError(format!("unable to apply batch: {}", e)))?;
+
+                Ok(root_digest)
+            }
+        }
     }
 }