about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/castore/src/directoryservice/sled.rs99
1 files changed, 60 insertions, 39 deletions
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
index e4a4c2bbed78..d531513e1ed7 100644
--- a/tvix/castore/src/directoryservice/sled.rs
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -37,12 +37,23 @@ impl SledDirectoryService {
 impl DirectoryService for SledDirectoryService {
     #[instrument(skip(self, digest), fields(directory.digest = %digest))]
     async fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> {
-        match self.db.get(digest.as_slice()) {
+        let resp = tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            let digest = digest.clone();
+            move || db.get(digest.as_slice())
+        })
+        .await?
+        .map_err(|e| {
+            warn!("failed to retrieve directory: {}", e);
+            Error::StorageError(format!("failed to retrieve directory: {}", e))
+        })?;
+
+        match resp {
             // The directory was not found, return
-            Ok(None) => Ok(None),
+            None => Ok(None),
 
             // The directory was found, try to parse the data as Directory message
-            Ok(Some(data)) => match Directory::decode(&*data) {
+            Some(data) => match Directory::decode(&*data) {
                 Ok(directory) => {
                     // Validate the retrieved Directory indeed has the
                     // digest we expect it to have, to detect corruptions.
@@ -70,28 +81,31 @@ impl DirectoryService for SledDirectoryService {
                     Err(Error::StorageError(e.to_string()))
                 }
             },
-            // some storage error?
-            Err(e) => Err(Error::StorageError(e.to_string())),
         }
     }
 
     #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))]
     async fn put(&self, directory: proto::Directory) -> Result<B3Digest, Error> {
-        let digest = directory.digest();
-
-        // validate the directory itself.
-        if let Err(e) = directory.validate() {
-            return Err(Error::InvalidRequest(format!(
-                "directory {} failed validation: {}",
-                digest, e,
-            )));
-        }
-        // store it
-        let result = self.db.insert(digest.as_slice(), directory.encode_to_vec());
-        if let Err(e) = result {
-            return Err(Error::StorageError(e.to_string()));
-        }
-        Ok(digest)
+        tokio::task::spawn_blocking({
+            let db = self.db.clone();
+            move || {
+                let digest = directory.digest();
+
+                // validate the directory itself.
+                if let Err(e) = directory.validate() {
+                    return Err(Error::InvalidRequest(format!(
+                        "directory {} failed validation: {}",
+                        digest, e,
+                    )));
+                }
+                // store it
+                db.insert(digest.as_slice(), directory.encode_to_vec())
+                    .map_err(|e| Error::StorageError(e.to_string()))?;
+
+                Ok(digest)
+            }
+        })
+        .await?
     }
 
     #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
@@ -143,25 +157,32 @@ impl DirectoryPutter for SledDirectoryPutter {
         match self.directory_validator.take() {
             None => Err(Error::InvalidRequest("already closed".to_string())),
             Some(validator) => {
-                // retrieve the validated directories.
-                let directories = validator.finalize()?;
-
-                // Get the root digest, which is at the end (cf. insertion order)
-                let root_digest = directories
-                    .last()
-                    .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))?
-                    .digest();
-
-                let mut batch = sled::Batch::default();
-                for directory in directories {
-                    batch.insert(directory.digest().as_slice(), directory.encode_to_vec());
-                }
-
-                self.tree
-                    .apply_batch(batch)
-                    .map_err(|e| Error::StorageError(format!("unable to apply batch: {}", e)))?;
-
-                Ok(root_digest)
+                // Insert all directories as a batch.
+                tokio::task::spawn_blocking({
+                    let tree = self.tree.clone();
+                    move || {
+                        // retrieve the validated directories.
+                        let directories = validator.finalize()?;
+
+                        // Get the root digest, which is at the end (cf. insertion order)
+                        let root_digest = directories
+                            .last()
+                            .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))?
+                            .digest();
+
+                        let mut batch = sled::Batch::default();
+                        for directory in directories {
+                            batch.insert(directory.digest().as_slice(), directory.encode_to_vec());
+                        }
+
+                        tree.apply_batch(batch).map_err(|e| {
+                            Error::StorageError(format!("unable to apply batch: {}", e))
+                        })?;
+
+                        Ok(root_digest)
+                    }
+                })
+                .await?
             }
         }
     }