author    | edef <edef@edef.eu>   | 2023-10-09T21·06+0000
committer | clbot <clbot@tvl.fyi> | 2023-10-10T17·33+0000
commit    | d23fe6ee20eab7d9809bb0a762ab05bb4fcb8840 (patch)
tree      | a78516aa567cc8afce739b8c1f759011d0b27744
parent    | 8b35d97b4b4b166f89537634661990e3bcc9755a (diff)
feat(tvix/store): use tvix_compat::nar::writer::async r/6769
Change-Id: Iad36872244df6f2225a2884f6b20cacd8f918b31
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9619
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: edef <edef@edef.eu>
-rw-r--r-- | tvix/Cargo.lock                      |  13
-rw-r--r-- | tvix/Cargo.nix                       |  45
-rw-r--r-- | tvix/store/Cargo.toml                |   5
-rw-r--r-- | tvix/store/src/nar/renderer.rs       | 134
-rw-r--r-- | tvix/store/src/tests/nar_renderer.rs |  31
5 files changed, 156 insertions, 72 deletions
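In short, the commit swaps the blocking NAR renderer (a std::io::Write sink driven inside spawn_blocking) for nix-compat's async writer: write_nar now takes any writer satisfying AsyncWrite + Unpin + Send and returns Result<(), RenderError> instead of handing the writer back. The sketch below is not code from this change; write_nar_like is a hypothetical stand-in that only mirrors the new writer bound (assuming tokio with the rt-multi-thread, macros and io-util features) to illustrate why the updated tests can pass &mut Vec<u8> or tokio::io::sink() directly.

```rust
use tokio::io::{sink, AsyncWrite, AsyncWriteExt};

// Hypothetical stand-in that only copies the *writer bound* of the new
// `write_nar` (`W: AsyncWrite + Unpin + Send`); the real function also takes
// the root node plus the blob/directory services and renders actual NAR data.
async fn write_nar_like<W: AsyncWrite + Unpin + Send>(mut w: W) -> std::io::Result<()> {
    // The real renderer wraps `w` with `.compat_write()` and hands it to
    // nix-compat's async NAR writer; here we just emit placeholder bytes.
    w.write_all(b"placeholder, not real NAR output").await?;
    w.flush().await
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // `&mut Vec<u8>` satisfies the bound, as in the updated tests ...
    let mut buf: Vec<u8> = vec![];
    write_nar_like(&mut buf).await?;

    // ... and so does `tokio::io::sink()` when only the error matters.
    write_nar_like(sink()).await?;

    println!("wrote {} bytes", buf.len());
    Ok(())
}
```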
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index d59e82b3f4a4..23eba3000513 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -106,6 +106,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"

 [[package]]
+name = "async-recursion"
+version = "1.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
+dependencies = [
+ "proc-macro2 1.0.67",
+ "quote 1.0.26",
+ "syn 2.0.16",
+]
+
+[[package]]
 name = "async-stream"
 version = "0.3.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2492,6 +2503,7 @@ checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d"
 dependencies = [
  "bytes",
  "futures-core",
+ "futures-io",
  "futures-sink",
  "pin-project-lite",
  "tokio",
@@ -2798,6 +2810,7 @@ name = "tvix-store"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-recursion",
  "async-stream",
  "blake3",
  "bytes",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 5409a5b767c8..a13f130371e8 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -381,6 +381,35 @@ rec {
       "serde" = [ "dep:serde" ];
     };
   };
+  "async-recursion" = rec {
+    crateName = "async-recursion";
+    version = "1.0.5";
+    edition = "2018";
+    sha256 = "1l2vlgyaa9a2dd0y1vbqyppzsvpdr1y4rar4gn1qi68pl5dmmmaz";
+    procMacro = true;
+    authors = [
+      "Robert Usher <266585+dcchut@users.noreply.github.com>"
+    ];
+    dependencies = [
+      {
+        name = "proc-macro2";
+        packageId = "proc-macro2 1.0.67";
+        usesDefaultFeatures = false;
+      }
+      {
+        name = "quote";
+        packageId = "quote 1.0.26";
+        usesDefaultFeatures = false;
+      }
+      {
+        name = "syn";
+        packageId = "syn 2.0.16";
+        usesDefaultFeatures = false;
+        features = [ "full" "parsing" "printing" "proc-macro" "clone-impls" ];
+      }
+    ];
+
+  };
   "async-stream" = rec {
     crateName = "async-stream";
     version = "0.3.5";
@@ -2420,7 +2449,7 @@ rec {
     features = {
       "default" = [ "std" ];
     };
-    resolvedDefaultFeatures = [ "std" ];
+    resolvedDefaultFeatures = [ "default" "std" ];
   };
   "futures-macro" = rec {
     crateName = "futures-macro";
@@ -7291,6 +7320,11 @@ rec {
         packageId = "futures-core";
       }
       {
+        name = "futures-io";
+        packageId = "futures-io";
+        optional = true;
+      }
+      {
         name = "futures-sink";
         packageId = "futures-sink";
       }
@@ -7333,7 +7367,7 @@ rec {
       "time" = [ "tokio/time" "slab" ];
       "tracing" = [ "dep:tracing" ];
     };
-    resolvedDefaultFeatures = [ "codec" "default" "io" "io-util" "tracing" ];
+    resolvedDefaultFeatures = [ "codec" "compat" "default" "futures-io" "io" "io-util" "tracing" ];
   };
   "toml" = rec {
     crateName = "toml";
@@ -8475,6 +8509,10 @@ rec {
         packageId = "anyhow";
       }
       {
+        name = "async-recursion";
+        packageId = "async-recursion";
+      }
+      {
         name = "async-stream";
         packageId = "async-stream";
       }
@@ -8521,6 +8559,7 @@ rec {
       {
         name = "nix-compat";
         packageId = "nix-compat";
+        features = [ "async" ];
       }
       {
         name = "parking_lot";
@@ -8564,7 +8603,7 @@ rec {
       {
         name = "tokio-util";
         packageId = "tokio-util";
-        features = [ "io" "io-util" ];
+        features = [ "io" "io-util" "compat" ];
       }
       {
         name = "tonic";
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 43421ef4a302..a995c60c5b39 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -13,7 +13,7 @@ count-write = "0.1.0"
 data-encoding = "2.3.3"
 futures = "0.3.28"
 lazy_static = "1.4.0"
-nix-compat = { path = "../nix-compat" }
+nix-compat = { path = "../nix-compat", features = ["async"] }
 parking_lot = "0.12.1"
 pin-project-lite = "0.2.13"
 prost = "0.12.1"
@@ -21,7 +21,7 @@ sha2 = "0.10.6"
 sled = { version = "0.34.7", features = ["compression"] }
 thiserror = "1.0.38"
 tokio-stream = { version = "0.1.14", features = ["fs"] }
-tokio-util = { version = "0.7.9", features = ["io", "io-util"] }
+tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] }
 tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
 tonic = "0.10.2"
 tower = "0.4.13"
@@ -31,6 +31,7 @@ tvix-castore = { path = "../castore" }
 url = "2.4.0"
 walkdir = "2.4.0"
 tokio-listener = { version = "0.2.1" }
+async-recursion = "1.0.5"

 [dependencies.fuse-backend-rs]
 optional = true
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index 55dce911ee1a..6e98d2902df6 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -1,9 +1,15 @@
 use super::RenderError;
+use async_recursion::async_recursion;
 use count_write::CountWrite;
-use nix_compat::nar;
+use nix_compat::nar::writer::r#async as nar_writer;
 use sha2::{Digest, Sha256};
-use std::{io, sync::Arc};
-use tokio::{io::BufReader, task::spawn_blocking};
+use std::{
+    pin::Pin,
+    sync::Arc,
+    task::{self, Poll},
+};
+use tokio::io::{self, AsyncWrite, BufReader};
+use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
 use tracing::warn;
 use tvix_castore::{
     blobservice::BlobService,
@@ -19,57 +25,79 @@ pub async fn calculate_size_and_sha256(
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
 ) -> Result<(u64, [u8; 32]), RenderError> {
-    let h = Sha256::new();
-    let cw = CountWrite::from(h);
+    let mut h = Sha256::new();
+    let mut cw = CountWrite::from(&mut h);
+
+    write_nar(
+        // The hasher doesn't speak async. It doesn't
+        // actually do any I/O, so it's fine to wrap.
+        AsyncIoBridge(&mut cw),
+        root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok((cw.count(), h.finalize().into()))
+}
+
+/// The inverse of [tokio_util::io::SyncIoBridge].
+/// Don't use this with anything that actually does blocking I/O.
+struct AsyncIoBridge<T>(T);
+
+impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
+    fn poll_write(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+        buf: &[u8],
+    ) -> Poll<io::Result<usize>> {
+        Poll::Ready(self.get_mut().0.write(buf))
+    }

-    let cw = write_nar(cw, root_node, blob_service, directory_service).await?;
+    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
+        Poll::Ready(self.get_mut().0.flush())
+    }

-    Ok((cw.count(), cw.into_inner().finalize().into()))
+    fn poll_shutdown(
+        self: Pin<&mut Self>,
+        _cx: &mut task::Context<'_>,
+    ) -> Poll<Result<(), io::Error>> {
+        Poll::Ready(Ok(()))
+    }
 }

 /// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
 /// and uses the passed blob_service and directory_service to perform the
 /// necessary lookups as it traverses the structure.
-/// The contents in NAR serialization are writen to the passed [std::io::Write].
-///
-/// The writer is passed back in the return value. This is done because async Rust
-/// lacks scoped blocking tasks, so we need to transfer ownership of the writer
-/// internally.
-///
-/// # Panics
-/// This will panic if called outside the context of a Tokio runtime.
-pub async fn write_nar<W: std::io::Write + Send + 'static>(
-    mut w: W,
+/// The contents in NAR serialization are writen to the passed [AsyncWrite].
+pub async fn write_nar<W: AsyncWrite + Unpin + Send>(
+    w: W,
     proto_root_node: &castorepb::node::Node,
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
-) -> Result<W, RenderError> {
-    let tokio_handle = tokio::runtime::Handle::current();
-    let proto_root_node = proto_root_node.clone();
-
-    spawn_blocking(move || {
-        // Initialize NAR writer
-        let nar_root_node = nar::writer::open(&mut w).map_err(RenderError::NARWriterError)?;
-
-        walk_node(
-            tokio_handle,
-            nar_root_node,
-            &proto_root_node,
-            blob_service,
-            directory_service,
-        )?;
-
-        Ok(w)
-    })
-    .await
-    .unwrap()
+) -> Result<(), RenderError> {
+    // Initialize NAR writer
+    let mut w = w.compat_write();
+    let nar_root_node = nar_writer::open(&mut w)
+        .await
+        .map_err(RenderError::NARWriterError)?;
+
+    walk_node(
+        nar_root_node,
+        &proto_root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok(())
 }

 /// Process an intermediate node in the structure.
 /// This consumes the node.
-fn walk_node(
-    tokio_handle: tokio::runtime::Handle,
-    nar_node: nar::writer::Node,
+#[async_recursion]
+async fn walk_node(
+    nar_node: nar_writer::Node<'async_recursion, '_>,
     proto_node: &castorepb::node::Node,
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
@@ -78,6 +106,7 @@ fn walk_node(
         castorepb::node::Node::Symlink(proto_symlink_node) => {
             nar_node
                 .symlink(&proto_symlink_node.target)
+                .await
                 .map_err(RenderError::NARWriterError)?;
         }
         castorepb::node::Node::File(proto_file_node) => {
@@ -92,8 +121,9 @@ fn walk_node(
                     ))
                 })?;

-            let blob_reader = match tokio_handle
-                .block_on(async { blob_service.open_read(&digest).await })
+            let blob_reader = match blob_service
+                .open_read(&digest)
+                .await
                 .map_err(RenderError::StoreError)?
             {
                 Some(blob_reader) => Ok(BufReader::new(blob_reader)),
@@ -107,8 +137,9 @@ fn walk_node(
                 .file(
                     proto_file_node.executable,
                     proto_file_node.size.into(),
-                    &mut tokio_util::io::SyncIoBridge::new(blob_reader),
+                    &mut blob_reader.compat(),
                 )
+                .await
                 .map_err(RenderError::NARWriterError)?;
         }
         castorepb::node::Node::Directory(proto_directory_node) => {
@@ -123,8 +154,9 @@ fn walk_node(
                 })?;

             // look it up with the directory service
-            match tokio_handle
-                .block_on(async { directory_service.get(&digest).await })
+            match directory_service
+                .get(&digest)
+                .await
                 .map_err(RenderError::StoreError)?
             {
                 // if it's None, that's an error!
@@ -136,27 +168,31 @@ fn walk_node(
                 }
                 Some(proto_directory) => {
                     // start a directory node
-                    let mut nar_node_directory =
-                        nar_node.directory().map_err(RenderError::NARWriterError)?;
+                    let mut nar_node_directory = nar_node
+                        .directory()
+                        .await
+                        .map_err(RenderError::NARWriterError)?;

                     // for each node in the directory, create a new entry with its name,
                     // and then invoke walk_node on that entry.
                     for proto_node in proto_directory.nodes() {
                         let child_node = nar_node_directory
                             .entry(proto_node.get_name())
+                            .await
                             .map_err(RenderError::NARWriterError)?;
                         walk_node(
-                            tokio_handle.clone(),
                             child_node,
                             &proto_node,
                             blob_service.clone(),
                             directory_service.clone(),
-                        )?;
+                        )
+                        .await?;
                     }

                     // close the directory
                     nar_node_directory
                         .close()
+                        .await
                         .map_err(RenderError::NARWriterError)?;
                 }
             }
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
index 485d7d115ff5..f555099877dc 100644
--- a/tvix/store/src/tests/nar_renderer.rs
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -4,16 +4,17 @@ use crate::tests::fixtures::*;
 use crate::tests::utils::*;
 use sha2::{Digest, Sha256};
 use std::io;
+use tokio::io::sink;
 use tvix_castore::proto::DirectoryNode;
 use tvix_castore::proto::FileNode;
 use tvix_castore::proto::{self as castorepb, SymlinkNode};

 #[tokio::test]
 async fn single_symlink() {
-    let buf: Vec<u8> = vec![];
+    let mut buf: Vec<u8> = vec![];

-    let buf = write_nar(
-        buf,
+    write_nar(
+        &mut buf,
         &castorepb::node::Node::Symlink(SymlinkNode {
             name: "doesntmatter".into(),
             target: "/nix/store/somewhereelse".into(),
@@ -31,10 +32,8 @@ async fn single_symlink() {
 /// Make sure the NARRenderer fails if a referred blob doesn't exist.
 #[tokio::test]
 async fn single_file_missing_blob() {
-    let buf: Vec<u8> = vec![];
-
     let e = write_nar(
-        buf,
+        sink(),
         &castorepb::node::Node::File(FileNode {
             name: "doesntmatter".into(),
             digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@@ -78,10 +77,8 @@ async fn single_file_wrong_blob_size() {
     let bs = blob_service.clone();
     // Test with a root FileNode of a too big size
     {
-        let buf: Vec<u8> = vec![];
-
         let e = write_nar(
-            buf,
+            sink(),
             &castorepb::node::Node::File(FileNode {
                 name: "doesntmatter".into(),
                 digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@@ -105,10 +102,8 @@ async fn single_file_wrong_blob_size() {
     let bs = blob_service.clone();
     // Test with a root FileNode of a too small size
     {
-        let buf: Vec<u8> = vec![];
-
         let e = write_nar(
-            buf,
+            sink(),
             &castorepb::node::Node::File(FileNode {
                 name: "doesntmatter".into(),
                 digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@@ -148,10 +143,10 @@ async fn single_file() {
         writer.close().await.unwrap()
     );

-    let buf: Vec<u8> = vec![];
+    let mut buf: Vec<u8> = vec![];

-    let buf = write_nar(
-        buf,
+    write_nar(
+        &mut buf,
         &castorepb::node::Node::File(FileNode {
             name: "doesntmatter".into(),
             digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
@@ -192,13 +187,13 @@ async fn test_complicated() {
         .await
         .unwrap();

-    let buf: Vec<u8> = vec![];
+    let mut buf: Vec<u8> = vec![];

     let bs = blob_service.clone();
     let ds = directory_service.clone();

-    let buf = write_nar(
-        buf,
+    write_nar(
+        &mut buf,
         &castorepb::node::Node::Directory(DirectoryNode {
             name: "doesntmatter".into(),
             digest: DIRECTORY_COMPLICATED.digest().into(),
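Beyond the signature change, the interesting new piece in renderer.rs is AsyncIoBridge, which lets calculate_size_and_sha256 keep feeding the synchronous Sha256/CountWrite pair without spawn_blocking, on the grounds that hashing never blocks on I/O. The standalone sketch below restates that pattern outside the renderer; the Vec<u8> payload and the main function are illustrative only, and it assumes tokio with the rt-multi-thread, macros and io-util features.

```rust
use std::{
    io::Write,
    pin::Pin,
    task::{self, Poll},
};
use tokio::io::{self, AsyncWrite, AsyncWriteExt};

/// Same shape as the `AsyncIoBridge` added above: expose a synchronous,
/// never-blocking `std::io::Write` through tokio's `AsyncWrite` by answering
/// every poll with `Poll::Ready`.
struct AsyncIoBridge<T>(T);

impl<W: Write + Unpin> AsyncWrite for AsyncIoBridge<W> {
    fn poll_write(
        self: Pin<&mut Self>,
        _cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        Poll::Ready(self.get_mut().0.write(buf))
    }

    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(self.get_mut().0.flush())
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        _cx: &mut task::Context<'_>,
    ) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() -> io::Result<()> {
    // A Vec<u8> stands in for the Sha256/CountWrite pair the renderer wraps.
    let mut bridge = AsyncIoBridge(Vec::new());
    bridge.write_all(b"bytes an async producer would emit").await?;
    assert_eq!(bridge.0, b"bytes an async producer would emit".to_vec());
    Ok(())
}
```

The bridge is only sound because the wrapped writer completes immediately; anything that can genuinely block should still go through tokio_util::io::SyncIoBridge on a blocking task, as the doc comment in the commit itself warns.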