about summary refs log tree commit diff
diff options
context:
space:
mode:
authorVova Kryachko <v.kryachko@gmail.com>2024-11-12T04·15-0500
committerVladimir Kryachko <v.kryachko@gmail.com>2024-11-13T21·21+0000
commitfa9c067dc9b631363659b55053f5f6d4428eb8a4 (patch)
tree16b36ee54159658b33943296a5d1fda147c65635
parent6aada9106209d431407c3a45f466ef59b3cff504 (diff)
feat(nix-daemon): Implement more nix daemon operations. r/8917
In particular QueryPathFromHashPart, QueryValidPaths, QueryValidDerivers

Change-Id: Ie6ad83cec5ce9580044b85e201e4e23394f87075
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12762
Tested-by: BuildkiteCI
Reviewed-by: edef <edef@edef.eu>
Reviewed-by: flokli <flokli@flokli.de>
-rw-r--r--tvix/Cargo.nix10
-rw-r--r--tvix/nix-compat/Cargo.toml3
-rw-r--r--tvix/nix-compat/src/nix_daemon/handler.rs31
-rw-r--r--tvix/nix-compat/src/nix_daemon/mod.rs197
-rw-r--r--tvix/nix-compat/src/nix_daemon/types.rs14
-rw-r--r--tvix/nix-daemon/src/lib.rs21
-rw-r--r--users/edef/crunch-v2/Cargo.lock1
-rw-r--r--users/edef/crunch-v2/Cargo.nix14
-rw-r--r--users/edef/fetchroots/Cargo.lock1
-rw-r--r--users/edef/fetchroots/Cargo.nix14
-rw-r--r--users/edef/narinfo2parquet/Cargo.lock37
-rw-r--r--users/edef/narinfo2parquet/Cargo.nix50
-rw-r--r--users/edef/weave/Cargo.lock1
-rw-r--r--users/edef/weave/Cargo.nix14
-rw-r--r--users/picnoir/tvix-daemon/Cargo.lock38
-rw-r--r--users/picnoir/tvix-daemon/Cargo.nix152
16 files changed, 538 insertions, 60 deletions
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 5a6628af6dd5..857138412736 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -7223,6 +7223,11 @@ rec {
             packageId = "enum-primitive-derive";
           }
           {
+            name = "futures";
+            packageId = "futures";
+            optional = true;
+          }
+          {
             name = "glob";
             packageId = "glob";
           }
@@ -7332,14 +7337,15 @@ rec {
         features = {
           "async" = [ "tokio" ];
           "bytes" = [ "dep:bytes" ];
-          "daemon" = [ "tokio" "nix-compat-derive" ];
+          "daemon" = [ "tokio" "nix-compat-derive" "futures" ];
           "default" = [ "async" "daemon" "wire" "nix-compat-derive" ];
+          "futures" = [ "dep:futures" ];
           "nix-compat-derive" = [ "dep:nix-compat-derive" ];
           "pin-project-lite" = [ "dep:pin-project-lite" ];
           "tokio" = [ "dep:tokio" ];
           "wire" = [ "tokio" "pin-project-lite" "bytes" ];
         };
-        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "nix-compat-derive" "pin-project-lite" "test" "tokio" "wire" ];
+        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "futures" "nix-compat-derive" "pin-project-lite" "test" "tokio" "wire" ];
       };
       "nix-compat-derive" = rec {
         crateName = "nix-compat-derive";
diff --git a/tvix/nix-compat/Cargo.toml b/tvix/nix-compat/Cargo.toml
index e9b44ddb7adf..160eb2c20c16 100644
--- a/tvix/nix-compat/Cargo.toml
+++ b/tvix/nix-compat/Cargo.toml
@@ -10,7 +10,7 @@ async = ["tokio"]
 wire = ["tokio", "pin-project-lite", "bytes"]
 
 # nix-daemon protocol handling
-daemon = ["tokio", "nix-compat-derive"]
+daemon = ["tokio", "nix-compat-derive", "futures"]
 test = []
 
 # Enable all features by default.
@@ -23,6 +23,7 @@ data-encoding = { workspace = true }
 ed25519 = { workspace = true }
 ed25519-dalek = { workspace = true }
 enum-primitive-derive = { workspace = true }
+futures = { workspace = true, optional = true }
 glob = { workspace = true }
 mimalloc = { workspace = true }
 nom = { workspace = true }
diff --git a/tvix/nix-compat/src/nix_daemon/handler.rs b/tvix/nix-compat/src/nix_daemon/handler.rs
index 7ac281ec2ceb..a61a4810c807 100644
--- a/tvix/nix-compat/src/nix_daemon/handler.rs
+++ b/tvix/nix-compat/src/nix_daemon/handler.rs
@@ -1,5 +1,6 @@
 use std::{future::Future, sync::Arc};
 
+use bytes::Bytes;
 use tokio::{
     io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf},
     sync::Mutex,
@@ -7,6 +8,7 @@ use tokio::{
 use tracing::debug;
 
 use super::{
+    types::QueryValidPaths,
     worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST},
     NixDaemonIO,
 };
@@ -114,7 +116,16 @@ where
         loop {
             let op_code = self.reader.read_number().await?;
             match TryInto::<Operation>::try_into(op_code) {
+                // Note: please keep operations sorted in ascending order of their numerical op number.
                 Ok(operation) => match operation {
+                    Operation::IsValidPath => {
+                        let path: StorePath<String> = self.reader.read_value().await?;
+                        self.handle(io.is_valid_path(&path)).await?
+                    }
+                    // Note this operation does not currently delegate to NixDaemonIO,
+                    // The general idea is that we will pass relevant ClientSettings
+                    // into individual NixDaemonIO method calls if the need arises.
+                    // For now we just store the settings in the NixDaemon for future use.
                     Operation::SetOptions => {
                         self.client_settings = self.reader.read_value().await?;
                         self.handle(async { Ok(()) }).await?
@@ -123,10 +134,17 @@ where
                         let path: StorePath<String> = self.reader.read_value().await?;
                         self.handle(io.query_path_info(&path)).await?
                     }
-                    Operation::IsValidPath => {
+                    Operation::QueryPathFromHashPart => {
+                        let hash: Bytes = self.reader.read_value().await?;
+                        self.handle(io.query_path_from_hash_part(&hash)).await?
+                    }
+                    Operation::QueryValidPaths => {
+                        let query: QueryValidPaths = self.reader.read_value().await?;
+                        self.handle(io.query_valid_paths(&query)).await?
+                    }
+                    Operation::QueryValidDerivers => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(async { Ok(io.query_path_info(&path).await?.is_some()) })
-                            .await?
+                        self.handle(io.query_valid_derivers(&path)).await?
                     }
                     _ => {
                         return Err(std::io::Error::other(format!(
@@ -202,6 +220,13 @@ mod tests {
         ) -> Result<Option<UnkeyedValidPathInfo>> {
             Ok(None)
         }
+
+        async fn query_path_from_hash_part(
+            &self,
+            _hash: &[u8],
+        ) -> Result<Option<UnkeyedValidPathInfo>> {
+            Ok(None)
+        }
     }
 
     #[tokio::test]
diff --git a/tvix/nix-compat/src/nix_daemon/mod.rs b/tvix/nix-compat/src/nix_daemon/mod.rs
index d6e60aa9a4dc..e475263d2302 100644
--- a/tvix/nix-compat/src/nix_daemon/mod.rs
+++ b/tvix/nix-compat/src/nix_daemon/mod.rs
@@ -2,7 +2,9 @@ pub mod worker_protocol;
 
 use std::io::Result;
 
-use types::UnkeyedValidPathInfo;
+use futures::future::try_join_all;
+use tracing::warn;
+use types::{QueryValidPaths, UnkeyedValidPathInfo};
 
 use crate::store_path::StorePath;
 
@@ -10,9 +12,200 @@ pub mod handler;
 pub mod types;
 
 /// Represents all possible operations over the nix-daemon protocol.
-pub trait NixDaemonIO {
+pub trait NixDaemonIO: Sync {
+    fn is_valid_path(
+        &self,
+        path: &StorePath<String>,
+    ) -> impl std::future::Future<Output = Result<bool>> + Send {
+        async move { Ok(self.query_path_info(path).await?.is_some()) }
+    }
+
     fn query_path_info(
         &self,
         path: &StorePath<String>,
     ) -> impl std::future::Future<Output = Result<Option<UnkeyedValidPathInfo>>> + Send;
+
+    fn query_path_from_hash_part(
+        &self,
+        hash: &[u8],
+    ) -> impl std::future::Future<Output = Result<Option<UnkeyedValidPathInfo>>> + Send;
+
+    fn query_valid_paths(
+        &self,
+        request: &QueryValidPaths,
+    ) -> impl std::future::Future<Output = Result<Vec<UnkeyedValidPathInfo>>> + Send {
+        async move {
+            if request.substitute {
+                warn!("tvix does not yet support substitution, ignoring the 'substitute' flag...");
+            }
+            // Using try_join_all here to avoid returning partial results to the client.
+            // The only reason query_path_info can fail is due to transient IO errors,
+            // so we return such errors to the client as opposed to only returning paths
+            // that succeeded.
+            let result =
+                try_join_all(request.paths.iter().map(|path| self.query_path_info(path))).await?;
+            let result: Vec<UnkeyedValidPathInfo> = result.into_iter().flatten().collect();
+            Ok(result)
+        }
+    }
+
+    fn query_valid_derivers(
+        &self,
+        path: &StorePath<String>,
+    ) -> impl std::future::Future<Output = Result<Vec<StorePath<String>>>> + Send {
+        async move {
+            let result = self.query_path_info(path).await?;
+            let result: Vec<_> = result.into_iter().filter_map(|info| info.deriver).collect();
+            Ok(result)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+
+    use crate::{nix_daemon::types::QueryValidPaths, store_path::StorePath};
+
+    use super::{types::UnkeyedValidPathInfo, NixDaemonIO};
+
+    // Very simple mock
+    // Unable to use mockall as it does not support unboxed async traits.
+    pub struct MockNixDaemonIO {
+        query_path_info_result: Option<UnkeyedValidPathInfo>,
+    }
+
+    impl NixDaemonIO for MockNixDaemonIO {
+        async fn query_path_info(
+            &self,
+            _path: &StorePath<String>,
+        ) -> std::io::Result<Option<UnkeyedValidPathInfo>> {
+            Ok(self.query_path_info_result.clone())
+        }
+
+        async fn query_path_from_hash_part(
+            &self,
+            _hash: &[u8],
+        ) -> std::io::Result<Option<UnkeyedValidPathInfo>> {
+            Ok(None)
+        }
+    }
+
+    #[tokio::test]
+    async fn test_is_valid_path_returns_true() {
+        let path =
+            StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
+                .unwrap();
+        let io = MockNixDaemonIO {
+            query_path_info_result: Some(UnkeyedValidPathInfo::default()),
+        };
+
+        let result = io
+            .is_valid_path(&path)
+            .await
+            .expect("expected to get a non-empty response");
+        assert!(result, "expected to get true");
+    }
+
+    #[tokio::test]
+    async fn test_is_valid_path_returns_false() {
+        let path =
+            StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
+                .unwrap();
+        let io = MockNixDaemonIO {
+            query_path_info_result: None,
+        };
+
+        let result = io
+            .is_valid_path(&path)
+            .await
+            .expect("expected to get a non-empty response");
+        assert!(!result, "expected to get false");
+    }
+
+    #[tokio::test]
+    async fn test_query_valid_paths_returns_empty_response() {
+        let path =
+            StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
+                .unwrap();
+        let io = MockNixDaemonIO {
+            query_path_info_result: None,
+        };
+
+        let result = io
+            .query_valid_paths(&QueryValidPaths {
+                paths: vec![path],
+                substitute: false,
+            })
+            .await
+            .expect("expected to get a non-empty response");
+        assert_eq!(result, vec![], "expected to get empty response");
+    }
+
+    #[tokio::test]
+    async fn test_query_valid_paths_returns_non_empty_response() {
+        let path =
+            StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
+                .unwrap();
+        let io = MockNixDaemonIO {
+            query_path_info_result: Some(UnkeyedValidPathInfo::default()),
+        };
+
+        let result = io
+            .query_valid_paths(&QueryValidPaths {
+                paths: vec![path],
+                substitute: false,
+            })
+            .await
+            .expect("expected to get a non-empty response");
+        assert_eq!(
+            result,
+            vec![UnkeyedValidPathInfo::default()],
+            "expected to get non empty response"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_query_valid_derivers_returns_empty_response() {
+        let path =
+            StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
+                .unwrap();
+        let io = MockNixDaemonIO {
+            query_path_info_result: None,
+        };
+
+        let result = io
+            .query_valid_derivers(&path)
+            .await
+            .expect("expected to get a non-empty response");
+        assert_eq!(result, vec![], "expected to get empty response");
+    }
+
+    #[tokio::test]
+    async fn test_query_valid_derivers_returns_non_empty_response() {
+        let path =
+            StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes())
+                .unwrap();
+        let deriver = StorePath::<String>::from_bytes(
+            "z6r3bn5l51679pwkvh9nalp6c317z34m-hello.drv".as_bytes(),
+        )
+        .unwrap();
+        let io = MockNixDaemonIO {
+            query_path_info_result: Some(UnkeyedValidPathInfo {
+                deriver: Some(deriver.clone()),
+                nar_hash: "".to_owned(),
+                references: vec![],
+                registration_time: 0,
+                nar_size: 1,
+                ultimate: true,
+                signatures: vec![],
+                ca: None,
+            }),
+        };
+
+        let result = io
+            .query_valid_derivers(&path)
+            .await
+            .expect("expected to get a non-empty response");
+        assert_eq!(result, vec![deriver], "expected to get non empty response");
+    }
 }
diff --git a/tvix/nix-compat/src/nix_daemon/types.rs b/tvix/nix-compat/src/nix_daemon/types.rs
index db7d26321e3e..bf7b1e6f6e58 100644
--- a/tvix/nix-compat/src/nix_daemon/types.rs
+++ b/tvix/nix-compat/src/nix_daemon/types.rs
@@ -152,7 +152,7 @@ impl NixSerialize for Option<StorePath<String>> {
     }
 }
 
-#[derive(NixSerialize, Debug)]
+#[derive(NixSerialize, Debug, Clone, Default, PartialEq)]
 pub struct UnkeyedValidPathInfo {
     pub deriver: Option<StorePath<String>>,
     pub nar_hash: String,
@@ -164,5 +164,13 @@ pub struct UnkeyedValidPathInfo {
     pub ca: Option<CAHash>,
 }
 
-#[cfg(test)]
-mod tests {}
+/// Request tupe for [super::worker_protocol::Operation::QueryValidPaths]
+#[derive(NixDeserialize)]
+pub struct QueryValidPaths {
+    // Paths to query
+    pub paths: Vec<StorePath<String>>,
+
+    // Whether to try and substitute the paths.
+    #[nix(version = "27..")]
+    pub substitute: bool,
+}
diff --git a/tvix/nix-daemon/src/lib.rs b/tvix/nix-daemon/src/lib.rs
index 89bfbf9b3dc0..e508d0750c9b 100644
--- a/tvix/nix-daemon/src/lib.rs
+++ b/tvix/nix-daemon/src/lib.rs
@@ -1,4 +1,7 @@
-use std::{io::Result, sync::Arc};
+use std::{
+    io::{Error, Result},
+    sync::Arc,
+};
 
 use nix_compat::{
     nix_daemon::{types::UnkeyedValidPathInfo, NixDaemonIO},
@@ -25,6 +28,22 @@ impl NixDaemonIO for TvixDaemon {
         path: &StorePath<String>,
     ) -> Result<Option<UnkeyedValidPathInfo>> {
         match self.path_info_service.get(*path.digest()).await? {
+            Some(path_info) => {
+                if path_info.store_path.name() == path.name() {
+                    Ok(Some(into_unkeyed_path_info(path_info)))
+                } else {
+                    Ok(None)
+                }
+            }
+            None => Ok(None),
+        }
+    }
+
+    async fn query_path_from_hash_part(&self, hash: &[u8]) -> Result<Option<UnkeyedValidPathInfo>> {
+        let digest = hash
+            .try_into()
+            .map_err(|_| Error::other("invalid digest length"))?;
+        match self.path_info_service.get(digest).await? {
             Some(path_info) => Ok(Some(into_unkeyed_path_info(path_info))),
             None => Ok(None),
         }
diff --git a/users/edef/crunch-v2/Cargo.lock b/users/edef/crunch-v2/Cargo.lock
index 5a167c6d79f9..9d36769b2e6f 100644
--- a/users/edef/crunch-v2/Cargo.lock
+++ b/users/edef/crunch-v2/Cargo.lock
@@ -1425,6 +1425,7 @@ dependencies = [
  "ed25519",
  "ed25519-dalek",
  "enum-primitive-derive",
+ "futures",
  "glob",
  "mimalloc",
  "nix-compat-derive",
diff --git a/users/edef/crunch-v2/Cargo.nix b/users/edef/crunch-v2/Cargo.nix
index 502d2ea497cd..c27e4aa2f79b 100644
--- a/users/edef/crunch-v2/Cargo.nix
+++ b/users/edef/crunch-v2/Cargo.nix
@@ -4121,6 +4121,11 @@ rec {
             packageId = "enum-primitive-derive";
           }
           {
+            name = "futures";
+            packageId = "futures";
+            optional = true;
+          }
+          {
             name = "glob";
             packageId = "glob";
           }
@@ -4180,6 +4185,10 @@ rec {
         ];
         devDependencies = [
           {
+            name = "futures";
+            packageId = "futures";
+          }
+          {
             name = "mimalloc";
             packageId = "mimalloc";
           }
@@ -4191,14 +4200,15 @@ rec {
         features = {
           "async" = [ "tokio" ];
           "bytes" = [ "dep:bytes" ];
-          "daemon" = [ "tokio" "nix-compat-derive" ];
+          "daemon" = [ "tokio" "nix-compat-derive" "futures" ];
           "default" = [ "async" "daemon" "wire" "nix-compat-derive" ];
+          "futures" = [ "dep:futures" ];
           "nix-compat-derive" = [ "dep:nix-compat-derive" ];
           "pin-project-lite" = [ "dep:pin-project-lite" ];
           "tokio" = [ "dep:tokio" ];
           "wire" = [ "tokio" "pin-project-lite" "bytes" ];
         };
-        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
+        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "futures" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
       };
       "nix-compat-derive" = rec {
         crateName = "nix-compat-derive";
diff --git a/users/edef/fetchroots/Cargo.lock b/users/edef/fetchroots/Cargo.lock
index 3f6b463d8ec0..cb3906594528 100644
--- a/users/edef/fetchroots/Cargo.lock
+++ b/users/edef/fetchroots/Cargo.lock
@@ -1694,6 +1694,7 @@ dependencies = [
  "ed25519",
  "ed25519-dalek",
  "enum-primitive-derive",
+ "futures",
  "glob",
  "mimalloc",
  "nix-compat-derive",
diff --git a/users/edef/fetchroots/Cargo.nix b/users/edef/fetchroots/Cargo.nix
index 8049acaadc71..825b12ebdfae 100644
--- a/users/edef/fetchroots/Cargo.nix
+++ b/users/edef/fetchroots/Cargo.nix
@@ -5523,6 +5523,11 @@ rec {
             packageId = "enum-primitive-derive";
           }
           {
+            name = "futures";
+            packageId = "futures";
+            optional = true;
+          }
+          {
             name = "glob";
             packageId = "glob";
           }
@@ -5582,6 +5587,10 @@ rec {
         ];
         devDependencies = [
           {
+            name = "futures";
+            packageId = "futures";
+          }
+          {
             name = "mimalloc";
             packageId = "mimalloc";
           }
@@ -5593,14 +5602,15 @@ rec {
         features = {
           "async" = [ "tokio" ];
           "bytes" = [ "dep:bytes" ];
-          "daemon" = [ "tokio" "nix-compat-derive" ];
+          "daemon" = [ "tokio" "nix-compat-derive" "futures" ];
           "default" = [ "async" "daemon" "wire" "nix-compat-derive" ];
+          "futures" = [ "dep:futures" ];
           "nix-compat-derive" = [ "dep:nix-compat-derive" ];
           "pin-project-lite" = [ "dep:pin-project-lite" ];
           "tokio" = [ "dep:tokio" ];
           "wire" = [ "tokio" "pin-project-lite" "bytes" ];
         };
-        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
+        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "futures" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
       };
       "nix-compat-derive" = rec {
         crateName = "nix-compat-derive";
diff --git a/users/edef/narinfo2parquet/Cargo.lock b/users/edef/narinfo2parquet/Cargo.lock
index 7ebfe1350278..72825b414be4 100644
--- a/users/edef/narinfo2parquet/Cargo.lock
+++ b/users/edef/narinfo2parquet/Cargo.lock
@@ -572,9 +572,9 @@ checksum = "ee1b05cbd864bcaecbd3455d6d967862d446e4ebfc3c2e5e5b9841e53cba6673"
 
 [[package]]
 name = "futures"
-version = "0.3.29"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335"
+checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -587,9 +587,9 @@ dependencies = [
 
 [[package]]
 name = "futures-channel"
-version = "0.3.29"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb"
+checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
 dependencies = [
  "futures-core",
  "futures-sink",
@@ -597,15 +597,15 @@ dependencies = [
 
 [[package]]
 name = "futures-core"
-version = "0.3.29"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c"
+checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
 
 [[package]]
 name = "futures-executor"
-version = "0.3.29"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc"
+checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
 dependencies = [
  "futures-core",
  "futures-task",
@@ -614,15 +614,15 @@ dependencies = [
 
 [[package]]
 name = "futures-io"
-version = "0.3.29"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa"
+checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
 
 [[package]]
 name = "futures-macro"
-version = "0.3.29"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb"
+checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -631,21 +631,21 @@ dependencies = [
 
 [[package]]
 name = "futures-sink"
-version = "0.3.29"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817"
+checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
 
 [[package]]
 name = "futures-task"
-version = "0.3.29"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2"
+checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988"
 
 [[package]]
 name = "futures-util"
-version = "0.3.29"
+version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104"
+checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
 dependencies = [
  "futures-channel",
  "futures-core",
@@ -969,6 +969,7 @@ dependencies = [
  "ed25519",
  "ed25519-dalek",
  "enum-primitive-derive",
+ "futures",
  "glob",
  "mimalloc",
  "nix-compat-derive",
diff --git a/users/edef/narinfo2parquet/Cargo.nix b/users/edef/narinfo2parquet/Cargo.nix
index 05ce99a9029c..3448aa99f786 100644
--- a/users/edef/narinfo2parquet/Cargo.nix
+++ b/users/edef/narinfo2parquet/Cargo.nix
@@ -1682,9 +1682,9 @@ rec {
       };
       "futures" = rec {
         crateName = "futures";
-        version = "0.3.29";
+        version = "0.3.31";
         edition = "2018";
-        sha256 = "0dak2ilpcmyjrb1j54fzy9hlw6vd10vqljq9gd59pbrq9dqr00ns";
+        sha256 = "0xh8ddbkm9jy8kc5gbvjp9a4b6rqqxvc8471yb2qaz5wm2qhgg35";
         dependencies = [
           {
             name = "futures-channel";
@@ -1743,9 +1743,9 @@ rec {
       };
       "futures-channel" = rec {
         crateName = "futures-channel";
-        version = "0.3.29";
+        version = "0.3.31";
         edition = "2018";
-        sha256 = "1jxsifvrbqzdadk0svbax71cba5d3qg3wgjq8i160mxmd1kdckgz";
+        sha256 = "040vpqpqlbk099razq8lyn74m0f161zd0rp36hciqrwcg2zibzrd";
         libName = "futures_channel";
         dependencies = [
           {
@@ -1771,9 +1771,9 @@ rec {
       };
       "futures-core" = rec {
         crateName = "futures-core";
-        version = "0.3.29";
+        version = "0.3.31";
         edition = "2018";
-        sha256 = "1308bpj0g36nhx2y6bl4mm6f1gnh9xyvvw2q2wpdgnb6dv3247gb";
+        sha256 = "0gk6yrxgi5ihfanm2y431jadrll00n5ifhnpx090c2f2q1cr1wh5";
         libName = "futures_core";
         features = {
           "default" = [ "std" ];
@@ -1784,9 +1784,9 @@ rec {
       };
       "futures-executor" = rec {
         crateName = "futures-executor";
-        version = "0.3.29";
+        version = "0.3.31";
         edition = "2018";
-        sha256 = "1g4pjni0sw28djx6mlcfz584abm2lpifz86cmng0kkxh7mlvhkqg";
+        sha256 = "17vcci6mdfzx4gbk0wx64chr2f13wwwpvyf3xd5fb1gmjzcx2a0y";
         libName = "futures_executor";
         dependencies = [
           {
@@ -1815,9 +1815,9 @@ rec {
       };
       "futures-io" = rec {
         crateName = "futures-io";
-        version = "0.3.29";
+        version = "0.3.31";
         edition = "2018";
-        sha256 = "1ajsljgny3zfxwahba9byjzclrgvm1ypakca8z854k2w7cb4mwwb";
+        sha256 = "1ikmw1yfbgvsychmsihdkwa8a1knank2d9a8dk01mbjar9w1np4y";
         libName = "futures_io";
         features = {
           "default" = [ "std" ];
@@ -1826,9 +1826,9 @@ rec {
       };
       "futures-macro" = rec {
         crateName = "futures-macro";
-        version = "0.3.29";
+        version = "0.3.31";
         edition = "2018";
-        sha256 = "1nwd18i8kvpkdfwm045hddjli0n96zi7pn6f99zi9c74j7ym7cak";
+        sha256 = "0l1n7kqzwwmgiznn0ywdc5i24z72zvh9q1dwps54mimppi7f6bhn";
         procMacro = true;
         libName = "futures_macro";
         dependencies = [
@@ -1850,9 +1850,9 @@ rec {
       };
       "futures-sink" = rec {
         crateName = "futures-sink";
-        version = "0.3.29";
+        version = "0.3.31";
         edition = "2018";
-        sha256 = "05q8jykqddxzp8nwf00wjk5m5mqi546d7i8hsxma7hiqxrw36vg3";
+        sha256 = "1xyly6naq6aqm52d5rh236snm08kw8zadydwqz8bip70s6vzlxg5";
         libName = "futures_sink";
         features = {
           "default" = [ "std" ];
@@ -1862,9 +1862,9 @@ rec {
       };
       "futures-task" = rec {
         crateName = "futures-task";
-        version = "0.3.29";
+        version = "0.3.31";
         edition = "2018";
-        sha256 = "1qmsss8rb5ppql4qvd4r70h9gpfcpd0bg2b3qilxrnhdkc397lgg";
+        sha256 = "124rv4n90f5xwfsm9qw6y99755y021cmi5dhzh253s920z77s3zr";
         libName = "futures_task";
         features = {
           "default" = [ "std" ];
@@ -1874,9 +1874,9 @@ rec {
       };
       "futures-util" = rec {
         crateName = "futures-util";
-        version = "0.3.29";
+        version = "0.3.31";
         edition = "2018";
-        sha256 = "0141rkqh0psj4h8x8lgsl1p29dhqr7z2wcixkcbs60z74kb2d5d1";
+        sha256 = "10aa1ar8bgkgbr4wzxlidkqkcxf77gffyj8j7768h831pcaq784z";
         libName = "futures_util";
         dependencies = [
           {
@@ -2847,6 +2847,11 @@ rec {
             packageId = "enum-primitive-derive";
           }
           {
+            name = "futures";
+            packageId = "futures";
+            optional = true;
+          }
+          {
             name = "glob";
             packageId = "glob";
           }
@@ -2906,6 +2911,10 @@ rec {
         ];
         devDependencies = [
           {
+            name = "futures";
+            packageId = "futures";
+          }
+          {
             name = "mimalloc";
             packageId = "mimalloc";
           }
@@ -2917,14 +2926,15 @@ rec {
         features = {
           "async" = [ "tokio" ];
           "bytes" = [ "dep:bytes" ];
-          "daemon" = [ "tokio" "nix-compat-derive" ];
+          "daemon" = [ "tokio" "nix-compat-derive" "futures" ];
           "default" = [ "async" "daemon" "wire" "nix-compat-derive" ];
+          "futures" = [ "dep:futures" ];
           "nix-compat-derive" = [ "dep:nix-compat-derive" ];
           "pin-project-lite" = [ "dep:pin-project-lite" ];
           "tokio" = [ "dep:tokio" ];
           "wire" = [ "tokio" "pin-project-lite" "bytes" ];
         };
-        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
+        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "futures" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
       };
       "nix-compat-derive" = rec {
         crateName = "nix-compat-derive";
diff --git a/users/edef/weave/Cargo.lock b/users/edef/weave/Cargo.lock
index 192b029683ef..6b565854c264 100644
--- a/users/edef/weave/Cargo.lock
+++ b/users/edef/weave/Cargo.lock
@@ -1008,6 +1008,7 @@ dependencies = [
  "ed25519",
  "ed25519-dalek",
  "enum-primitive-derive",
+ "futures",
  "glob",
  "mimalloc",
  "nix-compat-derive",
diff --git a/users/edef/weave/Cargo.nix b/users/edef/weave/Cargo.nix
index 61f7170d050d..ea1d8a1b2ef3 100644
--- a/users/edef/weave/Cargo.nix
+++ b/users/edef/weave/Cargo.nix
@@ -2930,6 +2930,11 @@ rec {
             packageId = "enum-primitive-derive";
           }
           {
+            name = "futures";
+            packageId = "futures";
+            optional = true;
+          }
+          {
             name = "glob";
             packageId = "glob";
           }
@@ -2989,6 +2994,10 @@ rec {
         ];
         devDependencies = [
           {
+            name = "futures";
+            packageId = "futures";
+          }
+          {
             name = "mimalloc";
             packageId = "mimalloc";
           }
@@ -3000,14 +3009,15 @@ rec {
         features = {
           "async" = [ "tokio" ];
           "bytes" = [ "dep:bytes" ];
-          "daemon" = [ "tokio" "nix-compat-derive" ];
+          "daemon" = [ "tokio" "nix-compat-derive" "futures" ];
           "default" = [ "async" "daemon" "wire" "nix-compat-derive" ];
+          "futures" = [ "dep:futures" ];
           "nix-compat-derive" = [ "dep:nix-compat-derive" ];
           "pin-project-lite" = [ "dep:pin-project-lite" ];
           "tokio" = [ "dep:tokio" ];
           "wire" = [ "tokio" "pin-project-lite" "bytes" ];
         };
-        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
+        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "futures" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
       };
       "nix-compat-derive" = rec {
         crateName = "nix-compat-derive";
diff --git a/users/picnoir/tvix-daemon/Cargo.lock b/users/picnoir/tvix-daemon/Cargo.lock
index 8731e7cbe68b..5acddf235d12 100644
--- a/users/picnoir/tvix-daemon/Cargo.lock
+++ b/users/picnoir/tvix-daemon/Cargo.lock
@@ -430,12 +430,28 @@ dependencies = [
 ]
 
 [[package]]
+name = "futures"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "futures-executor",
+ "futures-io",
+ "futures-sink",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
 name = "futures-channel"
 version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10"
 dependencies = [
  "futures-core",
+ "futures-sink",
 ]
 
 [[package]]
@@ -445,6 +461,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
 
 [[package]]
+name = "futures-executor"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
+dependencies = [
+ "futures-core",
+ "futures-task",
+ "futures-util",
+]
+
+[[package]]
+name = "futures-io"
+version = "0.3.31"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6"
+
+[[package]]
 name = "futures-macro"
 version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -473,9 +506,13 @@ version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
 dependencies = [
+ "futures-channel",
  "futures-core",
+ "futures-io",
  "futures-macro",
+ "futures-sink",
  "futures-task",
+ "memchr",
  "pin-project-lite",
  "pin-utils",
  "slab",
@@ -755,6 +792,7 @@ dependencies = [
  "ed25519",
  "ed25519-dalek",
  "enum-primitive-derive",
+ "futures",
  "glob",
  "mimalloc",
  "nix-compat-derive",
diff --git a/users/picnoir/tvix-daemon/Cargo.nix b/users/picnoir/tvix-daemon/Cargo.nix
index 04fa57091bf0..82cbeca5d50f 100644
--- a/users/picnoir/tvix-daemon/Cargo.nix
+++ b/users/picnoir/tvix-daemon/Cargo.nix
@@ -1368,6 +1368,67 @@ rec {
         };
         resolvedDefaultFeatures = [ "alloc" "default" "std" ];
       };
+      "futures" = rec {
+        crateName = "futures";
+        version = "0.3.31";
+        edition = "2018";
+        sha256 = "0xh8ddbkm9jy8kc5gbvjp9a4b6rqqxvc8471yb2qaz5wm2qhgg35";
+        dependencies = [
+          {
+            name = "futures-channel";
+            packageId = "futures-channel";
+            usesDefaultFeatures = false;
+            features = [ "sink" ];
+          }
+          {
+            name = "futures-core";
+            packageId = "futures-core";
+            usesDefaultFeatures = false;
+          }
+          {
+            name = "futures-executor";
+            packageId = "futures-executor";
+            optional = true;
+            usesDefaultFeatures = false;
+          }
+          {
+            name = "futures-io";
+            packageId = "futures-io";
+            usesDefaultFeatures = false;
+          }
+          {
+            name = "futures-sink";
+            packageId = "futures-sink";
+            usesDefaultFeatures = false;
+          }
+          {
+            name = "futures-task";
+            packageId = "futures-task";
+            usesDefaultFeatures = false;
+          }
+          {
+            name = "futures-util";
+            packageId = "futures-util";
+            usesDefaultFeatures = false;
+            features = [ "sink" ];
+          }
+        ];
+        features = {
+          "alloc" = [ "futures-core/alloc" "futures-task/alloc" "futures-sink/alloc" "futures-channel/alloc" "futures-util/alloc" ];
+          "async-await" = [ "futures-util/async-await" "futures-util/async-await-macro" ];
+          "bilock" = [ "futures-util/bilock" ];
+          "compat" = [ "std" "futures-util/compat" ];
+          "default" = [ "std" "async-await" "executor" ];
+          "executor" = [ "std" "futures-executor/std" ];
+          "futures-executor" = [ "dep:futures-executor" ];
+          "io-compat" = [ "compat" "futures-util/io-compat" ];
+          "std" = [ "alloc" "futures-core/std" "futures-task/std" "futures-io/std" "futures-sink/std" "futures-util/std" "futures-util/io" "futures-util/channel" ];
+          "thread-pool" = [ "executor" "futures-executor/thread-pool" ];
+          "unstable" = [ "futures-core/unstable" "futures-task/unstable" "futures-channel/unstable" "futures-io/unstable" "futures-util/unstable" ];
+          "write-all-vectored" = [ "futures-util/write-all-vectored" ];
+        };
+        resolvedDefaultFeatures = [ "alloc" "async-await" "default" "executor" "futures-executor" "std" ];
+      };
       "futures-channel" = rec {
         crateName = "futures-channel";
         version = "0.3.31";
@@ -1380,6 +1441,12 @@ rec {
             packageId = "futures-core";
             usesDefaultFeatures = false;
           }
+          {
+            name = "futures-sink";
+            packageId = "futures-sink";
+            optional = true;
+            usesDefaultFeatures = false;
+          }
         ];
         features = {
           "alloc" = [ "futures-core/alloc" ];
@@ -1388,7 +1455,7 @@ rec {
           "sink" = [ "futures-sink" ];
           "std" = [ "alloc" "futures-core/std" ];
         };
-        resolvedDefaultFeatures = [ "alloc" "default" "std" ];
+        resolvedDefaultFeatures = [ "alloc" "default" "futures-sink" "sink" "std" ];
       };
       "futures-core" = rec {
         crateName = "futures-core";
@@ -1403,6 +1470,48 @@ rec {
         };
         resolvedDefaultFeatures = [ "alloc" "default" "std" ];
       };
+      "futures-executor" = rec {
+        crateName = "futures-executor";
+        version = "0.3.31";
+        edition = "2018";
+        sha256 = "17vcci6mdfzx4gbk0wx64chr2f13wwwpvyf3xd5fb1gmjzcx2a0y";
+        libName = "futures_executor";
+        dependencies = [
+          {
+            name = "futures-core";
+            packageId = "futures-core";
+            usesDefaultFeatures = false;
+          }
+          {
+            name = "futures-task";
+            packageId = "futures-task";
+            usesDefaultFeatures = false;
+          }
+          {
+            name = "futures-util";
+            packageId = "futures-util";
+            usesDefaultFeatures = false;
+          }
+        ];
+        features = {
+          "default" = [ "std" ];
+          "num_cpus" = [ "dep:num_cpus" ];
+          "std" = [ "futures-core/std" "futures-task/std" "futures-util/std" ];
+          "thread-pool" = [ "std" "num_cpus" ];
+        };
+        resolvedDefaultFeatures = [ "std" ];
+      };
+      "futures-io" = rec {
+        crateName = "futures-io";
+        version = "0.3.31";
+        edition = "2018";
+        sha256 = "1ikmw1yfbgvsychmsihdkwa8a1knank2d9a8dk01mbjar9w1np4y";
+        libName = "futures_io";
+        features = {
+          "default" = [ "std" ];
+        };
+        resolvedDefaultFeatures = [ "std" ];
+      };
       "futures-macro" = rec {
         crateName = "futures-macro";
         version = "0.3.31";
@@ -1459,22 +1568,47 @@ rec {
         libName = "futures_util";
         dependencies = [
           {
+            name = "futures-channel";
+            packageId = "futures-channel";
+            optional = true;
+            usesDefaultFeatures = false;
+            features = [ "std" ];
+          }
+          {
             name = "futures-core";
             packageId = "futures-core";
             usesDefaultFeatures = false;
           }
           {
+            name = "futures-io";
+            packageId = "futures-io";
+            optional = true;
+            usesDefaultFeatures = false;
+            features = [ "std" ];
+          }
+          {
             name = "futures-macro";
             packageId = "futures-macro";
             optional = true;
             usesDefaultFeatures = false;
           }
           {
+            name = "futures-sink";
+            packageId = "futures-sink";
+            optional = true;
+            usesDefaultFeatures = false;
+          }
+          {
             name = "futures-task";
             packageId = "futures-task";
             usesDefaultFeatures = false;
           }
           {
+            name = "memchr";
+            packageId = "memchr";
+            optional = true;
+          }
+          {
             name = "pin-project-lite";
             packageId = "pin-project-lite";
           }
@@ -1510,7 +1644,7 @@ rec {
           "unstable" = [ "futures-core/unstable" "futures-task/unstable" ];
           "write-all-vectored" = [ "io" ];
         };
-        resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "default" "futures-macro" "slab" "std" ];
+        resolvedDefaultFeatures = [ "alloc" "async-await" "async-await-macro" "channel" "default" "futures-channel" "futures-io" "futures-macro" "futures-sink" "io" "memchr" "sink" "slab" "std" ];
       };
       "generic-array" = rec {
         crateName = "generic-array";
@@ -2336,6 +2470,11 @@ rec {
             packageId = "enum-primitive-derive";
           }
           {
+            name = "futures";
+            packageId = "futures";
+            optional = true;
+          }
+          {
             name = "glob";
             packageId = "glob";
           }
@@ -2395,6 +2534,10 @@ rec {
         ];
         devDependencies = [
           {
+            name = "futures";
+            packageId = "futures";
+          }
+          {
             name = "mimalloc";
             packageId = "mimalloc";
           }
@@ -2406,14 +2549,15 @@ rec {
         features = {
           "async" = [ "tokio" ];
           "bytes" = [ "dep:bytes" ];
-          "daemon" = [ "tokio" "nix-compat-derive" ];
+          "daemon" = [ "tokio" "nix-compat-derive" "futures" ];
           "default" = [ "async" "daemon" "wire" "nix-compat-derive" ];
+          "futures" = [ "dep:futures" ];
           "nix-compat-derive" = [ "dep:nix-compat-derive" ];
           "pin-project-lite" = [ "dep:pin-project-lite" ];
           "tokio" = [ "dep:tokio" ];
           "wire" = [ "tokio" "pin-project-lite" "bytes" ];
         };
-        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
+        resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "futures" "nix-compat-derive" "pin-project-lite" "tokio" "wire" ];
       };
       "nix-compat-derive" = rec {
         crateName = "nix-compat-derive";