From 717081ae376b244f4649c6a50ba4b33bd41572f0 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 10 May 2024 13:47:32 +0300 Subject: fix(tvix/castore/directory/sled): use spawn_blocking This does IO, which might take a longer amount of time than what we want to be blocking the normal executor. Use spawn_blocking instead. I didn't add it for the constructors, as we only call these once. Change-Id: I96231fcff8d10abe90cafde25a099a2db6ea9414 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11617 Reviewed-by: Connor Brewster Tested-by: BuildkiteCI Autosubmit: flokli --- tvix/castore/src/directoryservice/sled.rs | 99 +++++++++++++++++++------------ 1 file changed, 60 insertions(+), 39 deletions(-) (limited to 'tvix/castore/src/directoryservice') diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index e4a4c2bbed78..d531513e1ed7 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -37,12 +37,23 @@ impl SledDirectoryService { impl DirectoryService for SledDirectoryService { #[instrument(skip(self, digest), fields(directory.digest = %digest))] async fn get(&self, digest: &B3Digest) -> Result, Error> { - match self.db.get(digest.as_slice()) { + let resp = tokio::task::spawn_blocking({ + let db = self.db.clone(); + let digest = digest.clone(); + move || db.get(digest.as_slice()) + }) + .await? + .map_err(|e| { + warn!("failed to retrieve directory: {}", e); + Error::StorageError(format!("failed to retrieve directory: {}", e)) + })?; + + match resp { // The directory was not found, return - Ok(None) => Ok(None), + None => Ok(None), // The directory was found, try to parse the data as Directory message - Ok(Some(data)) => match Directory::decode(&*data) { + Some(data) => match Directory::decode(&*data) { Ok(directory) => { // Validate the retrieved Directory indeed has the // digest we expect it to have, to detect corruptions. @@ -70,28 +81,31 @@ impl DirectoryService for SledDirectoryService { Err(Error::StorageError(e.to_string())) } }, - // some storage error? - Err(e) => Err(Error::StorageError(e.to_string())), } } #[instrument(skip(self, directory), fields(directory.digest = %directory.digest()))] async fn put(&self, directory: proto::Directory) -> Result { - let digest = directory.digest(); - - // validate the directory itself. - if let Err(e) = directory.validate() { - return Err(Error::InvalidRequest(format!( - "directory {} failed validation: {}", - digest, e, - ))); - } - // store it - let result = self.db.insert(digest.as_slice(), directory.encode_to_vec()); - if let Err(e) = result { - return Err(Error::StorageError(e.to_string())); - } - Ok(digest) + tokio::task::spawn_blocking({ + let db = self.db.clone(); + move || { + let digest = directory.digest(); + + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Error::InvalidRequest(format!( + "directory {} failed validation: {}", + digest, e, + ))); + } + // store it + db.insert(digest.as_slice(), directory.encode_to_vec()) + .map_err(|e| Error::StorageError(e.to_string()))?; + + Ok(digest) + } + }) + .await? } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] @@ -143,25 +157,32 @@ impl DirectoryPutter for SledDirectoryPutter { match self.directory_validator.take() { None => Err(Error::InvalidRequest("already closed".to_string())), Some(validator) => { - // retrieve the validated directories. - let directories = validator.finalize()?; - - // Get the root digest, which is at the end (cf. insertion order) - let root_digest = directories - .last() - .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))? - .digest(); - - let mut batch = sled::Batch::default(); - for directory in directories { - batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); - } - - self.tree - .apply_batch(batch) - .map_err(|e| Error::StorageError(format!("unable to apply batch: {}", e)))?; - - Ok(root_digest) + // Insert all directories as a batch. + tokio::task::spawn_blocking({ + let tree = self.tree.clone(); + move || { + // retrieve the validated directories. + let directories = validator.finalize()?; + + // Get the root digest, which is at the end (cf. insertion order) + let root_digest = directories + .last() + .ok_or_else(|| Error::InvalidRequest("got no directories".to_string()))? + .digest(); + + let mut batch = sled::Batch::default(); + for directory in directories { + batch.insert(directory.digest().as_slice(), directory.encode_to_vec()); + } + + tree.apply_batch(batch).map_err(|e| { + Error::StorageError(format!("unable to apply batch: {}", e)) + })?; + + Ok(root_digest) + } + }) + .await? } } } -- cgit 1.4.1