about summary refs log tree commit diff
path: root/tvix/cli/src/tvix_store_io.rs
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-09-19T16·46-0500
committerclbot <clbot@tvl.fyi>2023-09-21T17·58+0000
commit37a348b4fae16b2b1c5ec12deaa085a049833d7f (patch)
tree7a1b1a7160036777b010cd81628960c1ca07486e /tvix/cli/src/tvix_store_io.rs
parent7e737fde34260daa477794d63b0b3344b4a1d81b (diff)
refactor(tvix/store): Asyncify PathInfoService and DirectoryService r/6623
We've decided to asyncify all of the services to reduce some of the
pains going back and for between sync<->async. The end goal will be for
all the tvix-store internals to be async and then expose a sync
interface for things like tvix eval io.

Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369
Autosubmit: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/cli/src/tvix_store_io.rs')
-rw-r--r--tvix/cli/src/tvix_store_io.rs70
1 files changed, 39 insertions, 31 deletions
diff --git a/tvix/cli/src/tvix_store_io.rs b/tvix/cli/src/tvix_store_io.rs
index 1a373a705fe4..1ea718f1a188 100644
--- a/tvix/cli/src/tvix_store_io.rs
+++ b/tvix/cli/src/tvix_store_io.rs
@@ -57,12 +57,15 @@ impl TvixStoreIO {
         store_path: &StorePath,
         sub_path: &Path,
     ) -> Result<Option<Node>, io::Error> {
-        let path_info = {
-            match self.path_info_service.get(store_path.digest)? {
-                // If there's no PathInfo found, early exit
-                None => return Ok(None),
-                Some(path_info) => path_info,
-            }
+        let path_info_service = self.path_info_service.clone();
+        let digest = store_path.digest.clone();
+        let task = self
+            .tokio_handle
+            .spawn(async move { path_info_service.get(digest).await });
+        let path_info = match self.tokio_handle.block_on(task).unwrap()? {
+            // If there's no PathInfo found, early exit
+            None => return Ok(None),
+            Some(path_info) => path_info,
         };
 
         let root_node = {
@@ -84,11 +87,13 @@ impl TvixStoreIO {
             }
         };
 
-        Ok(directoryservice::traverse_to(
-            self.directory_service.clone(),
-            root_node,
-            sub_path,
-        )?)
+        let directory_service = self.directory_service.clone();
+        let sub_path = sub_path.to_owned();
+        let task = self.tokio_handle.spawn(async move {
+            directoryservice::traverse_to(directory_service, root_node, &sub_path).await
+        });
+
+        Ok(self.tokio_handle.block_on(task).unwrap()?)
     }
 }
 
@@ -195,17 +200,23 @@ impl EvalIO for TvixStoreIO {
                 match node {
                     Node::Directory(directory_node) => {
                         // fetch the Directory itself.
-                        let digest = directory_node.digest.clone().try_into().map_err(|_e| {
-                            io::Error::new(
-                                io::ErrorKind::InvalidData,
-                                format!(
-                                    "invalid digest length in directory node: {:?}",
-                                    directory_node
-                                ),
-                            )
-                        })?;
-
-                        if let Some(directory) = self.directory_service.get(&digest)? {
+                        let digest: B3Digest =
+                            directory_node.digest.clone().try_into().map_err(|_e| {
+                                io::Error::new(
+                                    io::ErrorKind::InvalidData,
+                                    format!(
+                                        "invalid digest length in directory node: {:?}",
+                                        directory_node
+                                    ),
+                                )
+                            })?;
+
+                        let directory_service = self.directory_service.clone();
+                        let digest_clone = digest.clone();
+                        let task = self
+                            .tokio_handle
+                            .spawn(async move { directory_service.get(&digest_clone).await });
+                        if let Some(directory) = self.tokio_handle.block_on(task).unwrap()? {
                             let mut children: Vec<(bytes::Bytes, FileType)> = Vec::new();
                             for node in directory.nodes() {
                                 children.push(match node {
@@ -299,14 +310,11 @@ async fn import_path_with_pathinfo(
         .await
         .expect("error during import_path");
 
-    // Render the NAR. This is blocking.
-    let calc_task = tokio::task::spawn_blocking(move || {
-        let (nar_size, nar_sha256) =
-            calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone())
-                .expect("error during nar calculation"); // TODO: handle error
-        (nar_size, nar_sha256, root_node)
-    });
-    let (nar_size, nar_sha256, root_node) = calc_task.await.unwrap();
+    // Render the NAR.
+    let (nar_size, nar_sha256) =
+        calculate_size_and_sha256(&root_node, blob_service.clone(), directory_service.clone())
+            .await
+            .expect("error during nar calculation"); // TODO: handle error
 
     // TODO: make a path_to_name helper function?
     let name = path
@@ -339,7 +347,7 @@ async fn import_path_with_pathinfo(
 
     // put into [PathInfoService], and return the [PathInfo] that we get
     // back from there (it might contain additional signatures).
-    let path_info = path_info_service.put(path_info)?;
+    let path_info = path_info_service.put(path_info).await?;
 
     Ok(path_info)
 }