diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.nix | 10 | ||||
-rw-r--r-- | tvix/nix-compat/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/nix-compat/src/nix_daemon/handler.rs | 31 | ||||
-rw-r--r-- | tvix/nix-compat/src/nix_daemon/mod.rs | 197 | ||||
-rw-r--r-- | tvix/nix-compat/src/nix_daemon/types.rs | 14 | ||||
-rw-r--r-- | tvix/nix-daemon/src/lib.rs | 21 |
6 files changed, 264 insertions, 12 deletions
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 5a6628af6dd5..857138412736 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -7223,6 +7223,11 @@ rec { packageId = "enum-primitive-derive"; } { + name = "futures"; + packageId = "futures"; + optional = true; + } + { name = "glob"; packageId = "glob"; } @@ -7332,14 +7337,15 @@ rec { features = { "async" = [ "tokio" ]; "bytes" = [ "dep:bytes" ]; - "daemon" = [ "tokio" "nix-compat-derive" ]; + "daemon" = [ "tokio" "nix-compat-derive" "futures" ]; "default" = [ "async" "daemon" "wire" "nix-compat-derive" ]; + "futures" = [ "dep:futures" ]; "nix-compat-derive" = [ "dep:nix-compat-derive" ]; "pin-project-lite" = [ "dep:pin-project-lite" ]; "tokio" = [ "dep:tokio" ]; "wire" = [ "tokio" "pin-project-lite" "bytes" ]; }; - resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "nix-compat-derive" "pin-project-lite" "test" "tokio" "wire" ]; + resolvedDefaultFeatures = [ "async" "bytes" "daemon" "default" "futures" "nix-compat-derive" "pin-project-lite" "test" "tokio" "wire" ]; }; "nix-compat-derive" = rec { crateName = "nix-compat-derive"; diff --git a/tvix/nix-compat/Cargo.toml b/tvix/nix-compat/Cargo.toml index e9b44ddb7adf..160eb2c20c16 100644 --- a/tvix/nix-compat/Cargo.toml +++ b/tvix/nix-compat/Cargo.toml @@ -10,7 +10,7 @@ async = ["tokio"] wire = ["tokio", "pin-project-lite", "bytes"] # nix-daemon protocol handling -daemon = ["tokio", "nix-compat-derive"] +daemon = ["tokio", "nix-compat-derive", "futures"] test = [] # Enable all features by default. @@ -23,6 +23,7 @@ data-encoding = { workspace = true } ed25519 = { workspace = true } ed25519-dalek = { workspace = true } enum-primitive-derive = { workspace = true } +futures = { workspace = true, optional = true } glob = { workspace = true } mimalloc = { workspace = true } nom = { workspace = true } diff --git a/tvix/nix-compat/src/nix_daemon/handler.rs b/tvix/nix-compat/src/nix_daemon/handler.rs index 7ac281ec2ceb..a61a4810c807 100644 --- a/tvix/nix-compat/src/nix_daemon/handler.rs +++ b/tvix/nix-compat/src/nix_daemon/handler.rs @@ -1,5 +1,6 @@ use std::{future::Future, sync::Arc}; +use bytes::Bytes; use tokio::{ io::{split, AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf}, sync::Mutex, @@ -7,6 +8,7 @@ use tokio::{ use tracing::debug; use super::{ + types::QueryValidPaths, worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST}, NixDaemonIO, }; @@ -114,7 +116,16 @@ where loop { let op_code = self.reader.read_number().await?; match TryInto::<Operation>::try_into(op_code) { + // Note: please keep operations sorted in ascending order of their numerical op number. Ok(operation) => match operation { + Operation::IsValidPath => { + let path: StorePath<String> = self.reader.read_value().await?; + self.handle(io.is_valid_path(&path)).await? + } + // Note this operation does not currently delegate to NixDaemonIO, + // The general idea is that we will pass relevant ClientSettings + // into individual NixDaemonIO method calls if the need arises. + // For now we just store the settings in the NixDaemon for future use. Operation::SetOptions => { self.client_settings = self.reader.read_value().await?; self.handle(async { Ok(()) }).await? @@ -123,10 +134,17 @@ where let path: StorePath<String> = self.reader.read_value().await?; self.handle(io.query_path_info(&path)).await? } - Operation::IsValidPath => { + Operation::QueryPathFromHashPart => { + let hash: Bytes = self.reader.read_value().await?; + self.handle(io.query_path_from_hash_part(&hash)).await? + } + Operation::QueryValidPaths => { + let query: QueryValidPaths = self.reader.read_value().await?; + self.handle(io.query_valid_paths(&query)).await? + } + Operation::QueryValidDerivers => { let path: StorePath<String> = self.reader.read_value().await?; - self.handle(async { Ok(io.query_path_info(&path).await?.is_some()) }) - .await? + self.handle(io.query_valid_derivers(&path)).await? } _ => { return Err(std::io::Error::other(format!( @@ -202,6 +220,13 @@ mod tests { ) -> Result<Option<UnkeyedValidPathInfo>> { Ok(None) } + + async fn query_path_from_hash_part( + &self, + _hash: &[u8], + ) -> Result<Option<UnkeyedValidPathInfo>> { + Ok(None) + } } #[tokio::test] diff --git a/tvix/nix-compat/src/nix_daemon/mod.rs b/tvix/nix-compat/src/nix_daemon/mod.rs index d6e60aa9a4dc..e475263d2302 100644 --- a/tvix/nix-compat/src/nix_daemon/mod.rs +++ b/tvix/nix-compat/src/nix_daemon/mod.rs @@ -2,7 +2,9 @@ pub mod worker_protocol; use std::io::Result; -use types::UnkeyedValidPathInfo; +use futures::future::try_join_all; +use tracing::warn; +use types::{QueryValidPaths, UnkeyedValidPathInfo}; use crate::store_path::StorePath; @@ -10,9 +12,200 @@ pub mod handler; pub mod types; /// Represents all possible operations over the nix-daemon protocol. -pub trait NixDaemonIO { +pub trait NixDaemonIO: Sync { + fn is_valid_path( + &self, + path: &StorePath<String>, + ) -> impl std::future::Future<Output = Result<bool>> + Send { + async move { Ok(self.query_path_info(path).await?.is_some()) } + } + fn query_path_info( &self, path: &StorePath<String>, ) -> impl std::future::Future<Output = Result<Option<UnkeyedValidPathInfo>>> + Send; + + fn query_path_from_hash_part( + &self, + hash: &[u8], + ) -> impl std::future::Future<Output = Result<Option<UnkeyedValidPathInfo>>> + Send; + + fn query_valid_paths( + &self, + request: &QueryValidPaths, + ) -> impl std::future::Future<Output = Result<Vec<UnkeyedValidPathInfo>>> + Send { + async move { + if request.substitute { + warn!("tvix does not yet support substitution, ignoring the 'substitute' flag..."); + } + // Using try_join_all here to avoid returning partial results to the client. + // The only reason query_path_info can fail is due to transient IO errors, + // so we return such errors to the client as opposed to only returning paths + // that succeeded. + let result = + try_join_all(request.paths.iter().map(|path| self.query_path_info(path))).await?; + let result: Vec<UnkeyedValidPathInfo> = result.into_iter().flatten().collect(); + Ok(result) + } + } + + fn query_valid_derivers( + &self, + path: &StorePath<String>, + ) -> impl std::future::Future<Output = Result<Vec<StorePath<String>>>> + Send { + async move { + let result = self.query_path_info(path).await?; + let result: Vec<_> = result.into_iter().filter_map(|info| info.deriver).collect(); + Ok(result) + } + } +} + +#[cfg(test)] +mod tests { + + use crate::{nix_daemon::types::QueryValidPaths, store_path::StorePath}; + + use super::{types::UnkeyedValidPathInfo, NixDaemonIO}; + + // Very simple mock + // Unable to use mockall as it does not support unboxed async traits. + pub struct MockNixDaemonIO { + query_path_info_result: Option<UnkeyedValidPathInfo>, + } + + impl NixDaemonIO for MockNixDaemonIO { + async fn query_path_info( + &self, + _path: &StorePath<String>, + ) -> std::io::Result<Option<UnkeyedValidPathInfo>> { + Ok(self.query_path_info_result.clone()) + } + + async fn query_path_from_hash_part( + &self, + _hash: &[u8], + ) -> std::io::Result<Option<UnkeyedValidPathInfo>> { + Ok(None) + } + } + + #[tokio::test] + async fn test_is_valid_path_returns_true() { + let path = + StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes()) + .unwrap(); + let io = MockNixDaemonIO { + query_path_info_result: Some(UnkeyedValidPathInfo::default()), + }; + + let result = io + .is_valid_path(&path) + .await + .expect("expected to get a non-empty response"); + assert!(result, "expected to get true"); + } + + #[tokio::test] + async fn test_is_valid_path_returns_false() { + let path = + StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes()) + .unwrap(); + let io = MockNixDaemonIO { + query_path_info_result: None, + }; + + let result = io + .is_valid_path(&path) + .await + .expect("expected to get a non-empty response"); + assert!(!result, "expected to get false"); + } + + #[tokio::test] + async fn test_query_valid_paths_returns_empty_response() { + let path = + StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes()) + .unwrap(); + let io = MockNixDaemonIO { + query_path_info_result: None, + }; + + let result = io + .query_valid_paths(&QueryValidPaths { + paths: vec![path], + substitute: false, + }) + .await + .expect("expected to get a non-empty response"); + assert_eq!(result, vec![], "expected to get empty response"); + } + + #[tokio::test] + async fn test_query_valid_paths_returns_non_empty_response() { + let path = + StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes()) + .unwrap(); + let io = MockNixDaemonIO { + query_path_info_result: Some(UnkeyedValidPathInfo::default()), + }; + + let result = io + .query_valid_paths(&QueryValidPaths { + paths: vec![path], + substitute: false, + }) + .await + .expect("expected to get a non-empty response"); + assert_eq!( + result, + vec![UnkeyedValidPathInfo::default()], + "expected to get non empty response" + ); + } + + #[tokio::test] + async fn test_query_valid_derivers_returns_empty_response() { + let path = + StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes()) + .unwrap(); + let io = MockNixDaemonIO { + query_path_info_result: None, + }; + + let result = io + .query_valid_derivers(&path) + .await + .expect("expected to get a non-empty response"); + assert_eq!(result, vec![], "expected to get empty response"); + } + + #[tokio::test] + async fn test_query_valid_derivers_returns_non_empty_response() { + let path = + StorePath::<String>::from_bytes("z6r3bn5l51679pwkvh9nalp6c317z34m-hello".as_bytes()) + .unwrap(); + let deriver = StorePath::<String>::from_bytes( + "z6r3bn5l51679pwkvh9nalp6c317z34m-hello.drv".as_bytes(), + ) + .unwrap(); + let io = MockNixDaemonIO { + query_path_info_result: Some(UnkeyedValidPathInfo { + deriver: Some(deriver.clone()), + nar_hash: "".to_owned(), + references: vec![], + registration_time: 0, + nar_size: 1, + ultimate: true, + signatures: vec![], + ca: None, + }), + }; + + let result = io + .query_valid_derivers(&path) + .await + .expect("expected to get a non-empty response"); + assert_eq!(result, vec![deriver], "expected to get non empty response"); + } } diff --git a/tvix/nix-compat/src/nix_daemon/types.rs b/tvix/nix-compat/src/nix_daemon/types.rs index db7d26321e3e..bf7b1e6f6e58 100644 --- a/tvix/nix-compat/src/nix_daemon/types.rs +++ b/tvix/nix-compat/src/nix_daemon/types.rs @@ -152,7 +152,7 @@ impl NixSerialize for Option<StorePath<String>> { } } -#[derive(NixSerialize, Debug)] +#[derive(NixSerialize, Debug, Clone, Default, PartialEq)] pub struct UnkeyedValidPathInfo { pub deriver: Option<StorePath<String>>, pub nar_hash: String, @@ -164,5 +164,13 @@ pub struct UnkeyedValidPathInfo { pub ca: Option<CAHash>, } -#[cfg(test)] -mod tests {} +/// Request tupe for [super::worker_protocol::Operation::QueryValidPaths] +#[derive(NixDeserialize)] +pub struct QueryValidPaths { + // Paths to query + pub paths: Vec<StorePath<String>>, + + // Whether to try and substitute the paths. + #[nix(version = "27..")] + pub substitute: bool, +} diff --git a/tvix/nix-daemon/src/lib.rs b/tvix/nix-daemon/src/lib.rs index 89bfbf9b3dc0..e508d0750c9b 100644 --- a/tvix/nix-daemon/src/lib.rs +++ b/tvix/nix-daemon/src/lib.rs @@ -1,4 +1,7 @@ -use std::{io::Result, sync::Arc}; +use std::{ + io::{Error, Result}, + sync::Arc, +}; use nix_compat::{ nix_daemon::{types::UnkeyedValidPathInfo, NixDaemonIO}, @@ -25,6 +28,22 @@ impl NixDaemonIO for TvixDaemon { path: &StorePath<String>, ) -> Result<Option<UnkeyedValidPathInfo>> { match self.path_info_service.get(*path.digest()).await? { + Some(path_info) => { + if path_info.store_path.name() == path.name() { + Ok(Some(into_unkeyed_path_info(path_info))) + } else { + Ok(None) + } + } + None => Ok(None), + } + } + + async fn query_path_from_hash_part(&self, hash: &[u8]) -> Result<Option<UnkeyedValidPathInfo>> { + let digest = hash + .try_into() + .map_err(|_| Error::other("invalid digest length"))?; + match self.path_info_service.get(digest).await? { Some(path_info) => Ok(Some(into_unkeyed_path_info(path_info))), None => Ok(None), } |