about summary refs log tree commit diff
diff options
context:
space:
mode:
authoredef <edef@edef.eu>2023-10-09T21·06+0000
committerclbot <clbot@tvl.fyi>2023-10-10T17·33+0000
commitd23fe6ee20eab7d9809bb0a762ab05bb4fcb8840 (patch)
treea78516aa567cc8afce739b8c1f759011d0b27744
parent8b35d97b4b4b166f89537634661990e3bcc9755a (diff)
feat(tvix/store): use tvix_compat::nar::writer::async r/6769
Change-Id: Iad36872244df6f2225a2884f6b20cacd8f918b31
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9619
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: edef <edef@edef.eu>
-rw-r--r--tvix/Cargo.lock13
-rw-r--r--tvix/Cargo.nix45
-rw-r--r--tvix/store/Cargo.toml5
-rw-r--r--tvix/store/src/nar/renderer.rs134
-rw-r--r--tvix/store/src/tests/nar_renderer.rs31
5 files changed, 156 insertions, 72 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index d59e82b3f4a4..23eba3000513 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -106,6 +106,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
 
 [[package]]
+name = "async-recursion"
+version = "1.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
+dependencies = [
+ "proc-macro2 1.0.67",
+ "quote 1.0.26",
+ "syn 2.0.16",
+]
+
+[[package]]
 name = "async-stream"
 version = "0.3.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2492,6 +2503,7 @@ checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d"
 dependencies = [
  "bytes",
  "futures-core",
+ "futures-io",
  "futures-sink",
  "pin-project-lite",
  "tokio",
@@ -2798,6 +2810,7 @@ name = "tvix-store"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-recursion",
  "async-stream",
  "blake3",
  "bytes",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 5409a5b767c8..a13f130371e8 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -381,6 +381,35 @@ rec {
           "serde" = [ "dep:serde" ];
         };
       };
+      "async-recursion" = rec {
+        crateName = "async-recursion";
+        version = "1.0.5";
+        edition = "2018";
+        sha256 = "1l2vlgyaa9a2dd0y1vbqyppzsvpdr1y4rar4gn1qi68pl5dmmmaz";
+        procMacro = true;
+        authors = [
+          "Robert Usher <266585+dcchut@users.noreply.github.com>"
+        ];
+        dependencies = [
+          {
+            name = "proc-macro2";
+            packageId = "proc-macro2 1.0.67";
+            usesDefaultFeatures = false;
+          }
+          {
+            name = "quote";
+            packageId = "quote 1.0.26";
+            usesDefaultFeatures = false;
+          }
+          {
+            name = "syn";
+            packageId = "syn 2.0.16";
+            usesDefaultFeatures = false;
+            features = [ "full" "parsing" "printing" "proc-macro" "clone-impls" ];
+          }
+        ];
+
+      };
       "async-stream" = rec {
         crateName = "async-stream";
         version = "0.3.5";
@@ -2420,7 +2449,7 @@ rec {
         features = {
           "default" = [ "std" ];
         };
-        resolvedDefaultFeatures = [ "std" ];
+        resolvedDefaultFeatures = [ "default" "std" ];
       };
       "futures-macro" = rec {
         crateName = "futures-macro";
@@ -7291,6 +7320,11 @@ rec {
             packageId = "futures-core";
           }
           {
+            name = "futures-io";
+            packageId = "futures-io";
+            optional = true;
+          }
+          {
             name = "futures-sink";
             packageId = "futures-sink";
           }
@@ -7333,7 +7367,7 @@ rec {
           "time" = [ "tokio/time" "slab" ];
           "tracing" = [ "dep:tracing" ];
         };
-        resolvedDefaultFeatures = [ "codec" "default" "io" "io-util" "tracing" ];
+        resolvedDefaultFeatures = [ "codec" "compat" "default" "futures-io" "io" "io-util" "tracing" ];
       };
       "toml" = rec {
         crateName = "toml";
@@ -8475,6 +8509,10 @@ rec {
             packageId = "anyhow";
           }
           {
+            name = "async-recursion";
+            packageId = "async-recursion";
+          }
+          {
             name = "async-stream";
             packageId = "async-stream";
           }
@@ -8521,6 +8559,7 @@ rec {
           {
             name = "nix-compat";
             packageId = "nix-compat";
+            features = [ "async" ];
           }
           {
             name = "parking_lot";
@@ -8564,7 +8603,7 @@ rec {
           {
             name = "tokio-util";
             packageId = "tokio-util";
-            features = [ "io" "io-util" ];
+            features = [ "io" "io-util" "compat" ];
           }
           {
             name = "tonic";
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 43421ef4a302..a995c60c5b39 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -13,7 +13,7 @@ count-write = "0.1.0"
 data-encoding = "2.3.3"
 futures = "0.3.28"
 lazy_static = "1.4.0"
-nix-compat = { path = "../nix-compat" }
+nix-compat = { path = "../nix-compat", features = ["async"] }
 parking_lot = "0.12.1"
 pin-project-lite = "0.2.13"
 prost = "0.12.1"
@@ -21,7 +21,7 @@ sha2 = "0.10.6"
 sled = { version = "0.34.7", features = ["compression"] }
 thiserror = "1.0.38"
 tokio-stream = { version = "0.1.14", features = ["fs"] }
-tokio-util = { version = "0.7.9", features = ["io", "io-util"] }
+tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
 tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
 tonic = "0.10.2"
 tower = "0.4.13"
@@ -31,6 +31,7 @@ tvix-castore = { path = "../castore" }
 url = "2.4.0"
 walkdir = "2.4.0"
 tokio-listener = { version = "0.2.1" }
+async-recursion = "1.0.5"
 
 [dependencies.fuse-backend-rs]
 optional = true
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index 55dce911ee1a..6e98d2902df6 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -1,9 +1,15 @@
 use super::RenderError;
+use async_recursion::async_recursion;
 use count_write::CountWrite;
-use nix_compat::nar;
+use nix_compat::nar::writer::r#async as nar_writer;
 use sha2::{Digest, Sha256};
-use std::{io, sync::Arc};
-use tokio::{io::BufReader, task::spawn_blocking};
+use std::{
+    pin::Pin,
+    sync::Arc,
+    task::{self, Poll},
+};
+use tokio::io::{self, AsyncWrite, BufReader};
+use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
 use tracing::warn;
 use tvix_castore::{
     blobservice::BlobService,
@@ -19,57 +25,79 @@ pub async fn calculate_size_and_sha256(
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
 ) -> Result<(u64, [u8; 32]), RenderError> {
-    let h = Sha256::new();
-    let cw = CountWrite::from(h);
+    let mut h = Sha256::new();
+    let mut cw = CountWrite::from(&mut h);
+
+    write_nar(
+        // The hasher doesn't speak async. It doesn't
+        // actually do any I/O, so it's fine to wrap.
+        AsyncIoBridge(&mut cw),
+        root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok((cw.count(), h.finalize().into()))
+}
+
+/// The inverse of [tokio_util::io::SyncIoBridge].
+/// Don't use this with anything that actually does blocking I/O.
+struct AsyncIoBridge<T>(T);
+
+impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        Poll::Ready(self.get_mut().0.write(buf))
+    }
 
-    let cw = write_nar(cw, root_node, blob_service, directory_service).await?;
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(self.get_mut().0.flush())
+    }
 
-    Ok((cw.count(), cw.into_inner().finalize().into()))
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        Poll::Ready(Ok(()))
+    }
 }
 
 /// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
 /// and uses the passed blob_service and directory_service to perform the
 /// necessary lookups as it traverses the structure.
-/// The contents in NAR serialization are writen to the passed [std::io::Write].
-///
-/// The writer is passed back in the return value. This is done because async Rust
-/// lacks scoped blocking tasks, so we need to transfer ownership of the writer
-/// internally.
-///
-/// # Panics
-/// This will panic if called outside the context of a Tokio runtime.
-pub async fn write_nar<W: std::io::Write + Send + 'static>(
-    mut w: W,
+/// The contents in NAR serialization are writen to the passed [AsyncWrite].
+pub async fn write_nar<W: AsyncWrite + Unpin + Send>(
+    w: W,
     proto_root_node: &castorepb::node::Node,
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
-) -> Result<W, RenderError> {
-    let tokio_handle = tokio::runtime::Handle::current();
-    let proto_root_node = proto_root_node.clone();
-
-    spawn_blocking(move || {
-        // Initialize NAR writer
-        let nar_root_node = nar::writer::open(&mut w).map_err(RenderError::NARWriterError)?;
-
-        walk_node(
-            tokio_handle,
-            nar_root_node,
-            &proto_root_node,
-            blob_service,
-            directory_service,
-        )?;
-
-        Ok(w)
-    })
-    .await
-    .unwrap()
+) -> Result<(), RenderError> {
+    // Initialize NAR writer
+    let mut w = w.compat_write();
+    let nar_root_node = nar_writer::open(&mut w)
+        .await
+        .map_err(RenderError::NARWriterError)?;
+
+    walk_node(
+        nar_root_node,
+        &proto_root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok(())
 }
 
 /// Process an intermediate node in the structure.
 /// This consumes the node.
-fn walk_node(
-    tokio_handle: tokio::runtime::Handle,
-    nar_node: nar::writer::Node,
+#[async_recursion]
+async fn walk_node(
+    nar_node: nar_writer::Node<'async_recursion, '_>,
     proto_node: &castorepb::node::Node,
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
@@ -78,6 +106,7 @@ fn walk_node(
         castorepb::node::Node::Symlink(proto_symlink_node) => {
             nar_node
                 .symlink(&proto_symlink_node.target)
+                .await
                 .map_err(RenderError::NARWriterError)?;
         }
         castorepb::node::Node::File(proto_file_node) => {
@@ -92,8 +121,9 @@ fn walk_node(
                 ))
             })?;
 
-            let blob_reader = match tokio_handle
-                .block_on(async { blob_service.open_read(&digest).await })
+            let blob_reader = match blob_service
+                .open_read(&digest)
+                .await
                 .map_err(RenderError::StoreError)?
             {
                 Some(blob_reader) => Ok(BufReader::new(blob_reader)),
@@ -107,8 +137,9 @@ fn walk_node(
                 .file(
                     proto_file_node.executable,
                     proto_file_node.size.into(),
-                    &mut tokio_util::io::SyncIoBridge::new(blob_reader),
+                    &mut blob_reader.compat(),
                 )
+                .await
                 .map_err(RenderError::NARWriterError)?;
         }
         castorepb::node::Node::Directory(proto_directory_node) => {
@@ -123,8 +154,9 @@ fn walk_node(
                 })?;
 
             // look it up with the directory service
-            match tokio_handle
-                .block_on(async { directory_service.get(&digest).await })
+            match directory_service
+                .get(&digest)
+                .await
                 .map_err(RenderError::StoreError)?
             {
                 // if it's None, that's an error!
@@ -136,27 +168,31 @@ fn walk_node(
                 }
                 Some(proto_directory) => {
                     // start a directory node
-                    let mut nar_node_directory =
-                        nar_node.directory().map_err(RenderError::NARWriterError)?;
+                    let mut nar_node_directory = nar_node
+                        .directory()
+                        .await
+                        .map_err(RenderError::NARWriterError)?;
 
                     // for each node in the directory, create a new entry with its name,
                     // and then invoke walk_node on that entry.
                     for proto_node in proto_directory.nodes() {
                         let child_node = nar_node_directory
                             .entry(proto_node.get_name())
+                            .await
                             .map_err(RenderError::NARWriterError)?;
                         walk_node(
-                            tokio_handle.clone(),
                             child_node,
                             &proto_node,
                             blob_service.clone(),
                             directory_service.clone(),
-                        )?;
+                        )
+                        .await?;
                     }
 
                     // close the directory
                     nar_node_directory
                         .close()
+                        .await
                         .map_err(RenderError::NARWriterError)?;
                 }
             }
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
index 485d7d115ff5..f555099877dc 100644
--- a/tvix/store/src/tests/nar_renderer.rs
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -4,16 +4,17 @@ use crate::tests::fixtures::*;
 use crate::tests::utils::*;
 use sha2::{Digest, Sha256};
 use std::io;
+use tokio::io::sink;
 use tvix_castore::proto::DirectoryNode;
 use tvix_castore::proto::FileNode;
 use tvix_castore::proto::{self as castorepb, SymlinkNode};
 
 #[tokio::test]
 async fn single_symlink() {
-    let buf: Vec<u8> = vec![];
+    let mut buf: Vec<u8> = vec![];
 
-    let buf = write_nar(
-        buf,
+    write_nar(
+        &mut buf,
         &castorepb::node::Node::Symlink(SymlinkNode {
             name: "doesntmatter".into(),
             target: "/nix/store/somewhereelse".into(),
@@ -31,10 +32,8 @@ async fn single_symlink() {
 /// Make sure the NARRenderer fails if a referred blob doesn't exist.
 #[tokio::test]
 async fn single_file_missing_blob() {
-    let buf: Vec<u8> = vec![];
-
     let e = write_nar(
-        buf,
+        sink(),
         &castorepb::node::Node::File(FileNode {
             name: "doesntmatter".into(),
             digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@@ -78,10 +77,8 @@ async fn single_file_wrong_blob_size() {
     let bs = blob_service.clone();
     // Test with a root FileNode of a too big size
     {
-        let buf: Vec<u8> = vec![];
-
         let e = write_nar(
-            buf,
+            sink(),
             &castorepb::node::Node::File(FileNode {
                 name: "doesntmatter".into(),
                 digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@@ -105,10 +102,8 @@ async fn single_file_wrong_blob_size() {
     let bs = blob_service.clone();
     // Test with a root FileNode of a too small size
     {
-        let buf: Vec<u8> = vec![];
-
         let e = write_nar(
-            buf,
+            sink(),
             &castorepb::node::Node::File(FileNode {
                 name: "doesntmatter".into(),
                 digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@@ -148,10 +143,10 @@ async fn single_file() {
         writer.close().await.unwrap()
     );
 
-    let buf: Vec<u8> = vec![];
+    let mut buf: Vec<u8> = vec![];
 
-    let buf = write_nar(
-        buf,
+    write_nar(
+        &mut buf,
         &castorepb::node::Node::File(FileNode {
             name: "doesntmatter".into(),
             digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@@ -192,13 +187,13 @@ async fn test_complicated() {
         .await
         .unwrap();
 
-    let buf: Vec<u8> = vec![];
+    let mut buf: Vec<u8> = vec![];
 
     let bs = blob_service.clone();
     let ds = directory_service.clone();
 
-    let buf = write_nar(
-        buf,
+    write_nar(
+        &mut buf,
         &castorepb::node::Node::Directory(DirectoryNode {
             name: "doesntmatter".into(),
             digest: DIRECTORY_COMPLICATED.digest().into(),