From 45cf7ae657086993cedaa7c72b813e319e805484 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Wed, 10 Apr 2024 16:33:02 +0300 Subject: refactor(tvix/nix-compat): move nar writer to tokio There's little reason to keep the nar writer using Async{Read,Write} traits from futures, while everything else async in tvix (and nix-compat) uses tokio. Change-Id: I8cd1efcd0dd5bb76471de997603c7b701a5095de Reviewed-on: https://cl.tvl.fyi/c/depot/+/11391 Tested-by: BuildkiteCI Reviewed-by: raitobezarius Reviewed-by: Brian Olsen --- tvix/Cargo.lock | 1 - tvix/Cargo.nix | 11 +--- tvix/nix-compat/Cargo.toml | 3 +- tvix/nix-compat/src/nar/writer/async.rs | 4 +- tvix/nix-compat/src/nar/writer/test.rs | 103 +++++++++++++++----------------- tvix/store/src/nar/renderer.rs | 8 +-- 6 files changed, 57 insertions(+), 73 deletions(-) diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 80b70579b4..1fff246e86 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -2187,7 +2187,6 @@ dependencies = [ "ed25519-dalek", "enum-primitive-derive", "futures", - "futures-util", "glob", "hex-literal", "lazy_static", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 216dd2c6e1..f4bc00bb3c 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -6717,12 +6717,6 @@ rec { name = "enum-primitive-derive"; packageId = "enum-primitive-derive"; } - { - name = "futures-util"; - packageId = "futures-util"; - optional = true; - features = [ "io" ]; - } { name = "glob"; packageId = "glob"; @@ -6814,13 +6808,12 @@ rec { } ]; features = { - "async" = [ "futures-util" ]; - "futures-util" = [ "dep:futures-util" ]; + "async" = [ "tokio" ]; "pin-project-lite" = [ "dep:pin-project-lite" ]; "tokio" = [ "dep:tokio" ]; "wire" = [ "tokio" "pin-project-lite" ]; }; - resolvedDefaultFeatures = [ "async" "futures-util" "pin-project-lite" "tokio" "wire" ]; + resolvedDefaultFeatures = [ "async" "pin-project-lite" "tokio" "wire" ]; }; "nom" = rec { crateName = "nom"; diff --git a/tvix/nix-compat/Cargo.toml b/tvix/nix-compat/Cargo.toml index 8056409428..674734a0e9 100644 --- a/tvix/nix-compat/Cargo.toml +++ b/tvix/nix-compat/Cargo.toml @@ -5,7 +5,7 @@ edition = "2021" [features] # async NAR writer -async = ["futures-util"] +async = ["tokio"] # code emitting low-level packets used in the daemon protocol. wire = ["tokio", "pin-project-lite"] @@ -16,7 +16,6 @@ data-encoding = "2.3.3" ed25519 = "2.2.3" ed25519-dalek = "2.1.0" enum-primitive-derive = "0.3.0" -futures-util = { version = "0.3.30", features = ["io"], optional = true } glob = "0.3.0" nom = "7.1.3" num-traits = "0.2.18" diff --git a/tvix/nix-compat/src/nar/writer/async.rs b/tvix/nix-compat/src/nar/writer/async.rs index 11aefab9cb..a2ce68fc3c 100644 --- a/tvix/nix-compat/src/nar/writer/async.rs +++ b/tvix/nix-compat/src/nar/writer/async.rs @@ -10,7 +10,7 @@ //! //! ```rust //! # futures::executor::block_on(async { -//! # use futures::io::BufReader; +//! # use tokio::io::BufReader; //! # let some_file: Vec = vec![0, 1, 2, 3, 4]; //! //! // Output location to write the NAR to. @@ -31,7 +31,6 @@ //! ``` use crate::nar::wire; -use futures_util::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt}; use std::{ io::{ self, @@ -39,6 +38,7 @@ use std::{ }, pin::Pin, }; +use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt}; /// Convenience type alias for types implementing [`AsyncWrite`]. pub type Writer<'a> = dyn AsyncWrite + Unpin + Send + 'a; diff --git a/tvix/nix-compat/src/nar/writer/test.rs b/tvix/nix-compat/src/nar/writer/test.rs index 7b1dd1a2a9..d7f18a49af 100644 --- a/tvix/nix-compat/src/nar/writer/test.rs +++ b/tvix/nix-compat/src/nar/writer/test.rs @@ -11,16 +11,14 @@ fn symlink() { } #[cfg(feature = "async")] -#[test] -fn symlink_async() { +#[tokio::test] +async fn symlink_async() { let mut buf = vec![]; - futures::executor::block_on(async { - let node = nar::writer::r#async::open(&mut buf).await.unwrap(); - node.symlink("/nix/store/somewhereelse".as_bytes()) - .await - .unwrap(); - }); + let node = nar::writer::r#async::open(&mut buf).await.unwrap(); + node.symlink("/nix/store/somewhereelse".as_bytes()) + .await + .unwrap(); assert_eq!(include_bytes!("../tests/symlink.nar"), buf.as_slice()); } @@ -42,22 +40,22 @@ fn file() { } #[cfg(feature = "async")] -#[test] -fn file_async() { +#[tokio::test] +async fn file_async() { + use std::io::Cursor; + let mut buf = vec![]; - futures::executor::block_on(async { - let node = nar::writer::r#async::open(&mut buf).await.unwrap(); + let node = nar::writer::r#async::open(&mut buf).await.unwrap(); - let file_contents = "Hello World!".to_string(); - node.file( - false, - file_contents.len() as u64, - &mut futures::io::Cursor::new(file_contents), - ) - .await - .unwrap(); - }); + let file_contents = "Hello World!".to_string(); + node.file( + false, + file_contents.len() as u64, + &mut Cursor::new(file_contents), + ) + .await + .unwrap(); assert_eq!(include_bytes!("../tests/helloworld.nar"), buf.as_slice()); } @@ -93,41 +91,38 @@ fn complicated() { } #[cfg(feature = "async")] -#[test] -fn complicated_async() { +#[tokio::test] +async fn complicated_async() { + use std::io::Cursor; + let mut buf = vec![]; - futures::executor::block_on(async { - let node = nar::writer::r#async::open(&mut buf).await.unwrap(); - - let mut dir_node = node.directory().await.unwrap(); - - let e = dir_node.entry(".keep".as_bytes()).await.unwrap(); - e.file(false, 0, &mut futures::io::Cursor::new([])) - .await - .expect("read .keep must succeed"); - - let e = dir_node.entry("aa".as_bytes()).await.unwrap(); - e.symlink("/nix/store/somewhereelse".as_bytes()) - .await - .expect("symlink must succeed"); - - let e = dir_node.entry("keep".as_bytes()).await.unwrap(); - let mut subdir_node = e.directory().await.expect("directory must succeed"); - - let e_sub = subdir_node - .entry(".keep".as_bytes()) - .await - .expect("subdir entry must succeed"); - e_sub - .file(false, 0, &mut futures::io::Cursor::new([])) - .await - .unwrap(); - - // close the subdir, and then the dir, which is required. - subdir_node.close().await.unwrap(); - dir_node.close().await.unwrap(); - }); + let node = nar::writer::r#async::open(&mut buf).await.unwrap(); + + let mut dir_node = node.directory().await.unwrap(); + + let e = dir_node.entry(".keep".as_bytes()).await.unwrap(); + e.file(false, 0, &mut Cursor::new([])) + .await + .expect("read .keep must succeed"); + + let e = dir_node.entry("aa".as_bytes()).await.unwrap(); + e.symlink("/nix/store/somewhereelse".as_bytes()) + .await + .expect("symlink must succeed"); + + let e = dir_node.entry("keep".as_bytes()).await.unwrap(); + let mut subdir_node = e.directory().await.expect("directory must succeed"); + + let e_sub = subdir_node + .entry(".keep".as_bytes()) + .await + .expect("subdir entry must succeed"); + e_sub.file(false, 0, &mut Cursor::new([])).await.unwrap(); + + // close the subdir, and then the dir, which is required. + subdir_node.close().await.unwrap(); + dir_node.close().await.unwrap(); assert_eq!(include_bytes!("../tests/complicated.nar"), buf.as_slice()); } diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index 36d184f3b6..0816b8e973 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -6,7 +6,6 @@ use count_write::CountWrite; use nix_compat::nar::writer::r#async as nar_writer; use sha2::{Digest, Sha256}; use tokio::io::{self, AsyncWrite, BufReader}; -use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, @@ -45,7 +44,7 @@ where /// necessary lookups as it traverses the structure. /// The contents in NAR serialization are writen to the passed [AsyncWrite]. pub async fn write_nar( - w: W, + mut w: W, proto_root_node: &castorepb::node::Node, blob_service: BS, directory_service: DS, @@ -56,7 +55,6 @@ where DS: DirectoryService + Send, { // Initialize NAR writer - let mut w = w.compat_write(); let nar_root_node = nar_writer::open(&mut w) .await .map_err(RenderError::NARWriterError)?; @@ -101,7 +99,7 @@ where )) })?; - let blob_reader = match blob_service + let mut blob_reader = match blob_service .open_read(&digest) .await .map_err(RenderError::StoreError)? @@ -117,7 +115,7 @@ where .file( proto_file_node.executable, proto_file_node.size, - &mut blob_reader.compat(), + &mut blob_reader, ) .await .map_err(RenderError::NARWriterError)?; -- cgit 1.4.1