about summary refs log tree commit diff
path: root/tvix/store/src/nar/renderer.rs
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-09-19T16·46-0500
committerclbot <clbot@tvl.fyi>2023-09-21T17·58+0000
commit37a348b4fae16b2b1c5ec12deaa085a049833d7f (patch)
tree7a1b1a7160036777b010cd81628960c1ca07486e /tvix/store/src/nar/renderer.rs
parent7e737fde34260daa477794d63b0b3344b4a1d81b (diff)
refactor(tvix/store): Asyncify PathInfoService and DirectoryService r/6623
We've decided to asyncify all of the services to reduce some of the
pains going back and for between sync<->async. The end goal will be for
all the tvix-store internals to be async and then expose a sync
interface for things like tvix eval io.

Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369
Autosubmit: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/nar/renderer.rs')
-rw-r--r--tvix/store/src/nar/renderer.rs61
1 files changed, 38 insertions, 23 deletions
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index 4255148fc5..f1392472a5 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -8,20 +8,20 @@ use count_write::CountWrite;
 use nix_compat::nar;
 use sha2::{Digest, Sha256};
 use std::{io, sync::Arc};
-use tokio::io::BufReader;
+use tokio::{io::BufReader, task::spawn_blocking};
 use tracing::warn;
 
 /// Invoke [write_nar], and return the size and sha256 digest of the produced
 /// NAR output.
-pub fn calculate_size_and_sha256(
+pub async fn calculate_size_and_sha256(
     root_node: &proto::node::Node,
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
 ) -> Result<(u64, [u8; 32]), RenderError> {
     let h = Sha256::new();
-    let mut cw = CountWrite::from(h);
+    let cw = CountWrite::from(h);
 
-    write_nar(&mut cw, root_node, blob_service, directory_service)?;
+    let cw = write_nar(cw, root_node, blob_service, directory_service).await?;
 
     Ok((cw.count(), cw.into_inner().finalize().into()))
 }
@@ -30,26 +30,44 @@ pub fn calculate_size_and_sha256(
 /// and uses the passed blob_service and directory_service to
 /// perform the necessary lookups as it traverses the structure.
 /// The contents in NAR serialization are writen to the passed [std::io::Write].
-pub fn write_nar<W: std::io::Write>(
-    w: &mut W,
+///
+/// The writer is passed back in the return value. This is done because async Rust
+/// lacks scoped blocking tasks, so we need to transfer ownership of the writer
+/// internally.
+///
+/// # Panics
+/// This will panic if called outside the context of a Tokio runtime.
+pub async fn write_nar<W: std::io::Write + Send + 'static>(
+    mut w: W,
     proto_root_node: &proto::node::Node,
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
-) -> Result<(), RenderError> {
-    // Initialize NAR writer
-    let nar_root_node = nar::writer::open(w).map_err(RenderError::NARWriterError)?;
+) -> Result<W, RenderError> {
+    let tokio_handle = tokio::runtime::Handle::current();
+    let proto_root_node = proto_root_node.clone();
+
+    spawn_blocking(move || {
+        // Initialize NAR writer
+        let nar_root_node = nar::writer::open(&mut w).map_err(RenderError::NARWriterError)?;
 
-    walk_node(
-        nar_root_node,
-        proto_root_node,
-        blob_service,
-        directory_service,
-    )
+        walk_node(
+            tokio_handle,
+            nar_root_node,
+            &proto_root_node,
+            blob_service,
+            directory_service,
+        )?;
+
+        Ok(w)
+    })
+    .await
+    .unwrap()
 }
 
 /// Process an intermediate node in the structure.
 /// This consumes the node.
 fn walk_node(
+    tokio_handle: tokio::runtime::Handle,
     nar_node: nar::writer::Node,
     proto_node: &proto::node::Node,
     blob_service: Arc<dyn BlobService>,
@@ -73,9 +91,6 @@ fn walk_node(
                 ))
             })?;
 
-            // HACK: blob_service is async, but this function isn't async yet..
-            let tokio_handle = tokio::runtime::Handle::current();
-
             let blob_reader = match tokio_handle
                 .block_on(async { blob_service.open_read(&digest).await })
                 .map_err(RenderError::StoreError)?
@@ -107,11 +122,10 @@ fn walk_node(
                 })?;
 
             // look it up with the directory service
-            let resp = directory_service
-                .get(&digest)
-                .map_err(RenderError::StoreError)?;
-
-            match resp {
+            match tokio_handle
+                .block_on(async { directory_service.get(&digest).await })
+                .map_err(RenderError::StoreError)?
+            {
                 // if it's None, that's an error!
                 None => {
                     return Err(RenderError::DirectoryNotFound(
@@ -131,6 +145,7 @@ fn walk_node(
                             .entry(proto_node.get_name())
                             .map_err(RenderError::NARWriterError)?;
                         walk_node(
+                            tokio_handle.clone(),
                             child_node,
                             &proto_node,
                             blob_service.clone(),