diff options
-rw-r--r-- | tvix/nix-compat/src/wire/worker_protocol.rs | 106 | ||||
-rw-r--r-- | users/picnoir/tvix-daemon/Cargo.lock | 7 | ||||
-rw-r--r-- | users/picnoir/tvix-daemon/Cargo.nix | 33 | ||||
-rw-r--r-- | users/picnoir/tvix-daemon/Cargo.toml | 1 | ||||
-rw-r--r-- | users/picnoir/tvix-daemon/src/main.rs | 138 |
5 files changed, 121 insertions, 164 deletions
diff --git a/tvix/nix-compat/src/wire/worker_protocol.rs b/tvix/nix-compat/src/wire/worker_protocol.rs index 121b9b2ea5cb..a9d3bd85478b 100644 --- a/tvix/nix-compat/src/wire/worker_protocol.rs +++ b/tvix/nix-compat/src/wire/worker_protocol.rs @@ -7,15 +7,15 @@ use enum_primitive_derive::Primitive; use num_traits::{FromPrimitive, ToPrimitive}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use crate::wire::primitive; +use crate::wire::{bytes, primitive}; use super::bytes::read_string; -pub static WORKER_MAGIC_1: u64 = 0x6e697863; // "nixc" -pub static WORKER_MAGIC_2: u64 = 0x6478696f; // "dxio" +static WORKER_MAGIC_1: u64 = 0x6e697863; // "nixc" +static WORKER_MAGIC_2: u64 = 0x6478696f; // "dxio" pub static STDERR_LAST: u64 = 0x616c7473; // "alts" -/// Protocol version (1.35) -pub static PROTOCOL_VERSION: u64 = 1 << 8 | 35; +/// Protocol version (1.37) +static PROTOCOL_VERSION: [u8; 8] = [37, 1, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]; /// Max length of a Nix setting name/value. In bytes. /// @@ -172,6 +172,75 @@ pub async fn read_client_settings<R: AsyncReadExt + Unpin>( }) } +/// Performs the initial handshake the server is sending to a connecting client. +/// +/// During the handshake, the client first send a magic u64, to which +/// the daemon needs to respond with another magic u64. Then, the +/// daemon retrieve the client version, and discard a bunch of now +/// obsolete data. +/// +/// # Arguments +/// +/// * conn: connection with the Nix client. +/// * nix_version: semantic version of the Nix daemon. "2.18.2" for +/// instance. +/// * trusted: trust level of the Nix client. +/// +/// # Return +/// +/// The protocol version of a client encoded as a u64. +pub async fn server_handshake_client<'a, RW: 'a>( + mut conn: &'a mut RW, + nix_version: &str, + trusted: Trust, +) -> std::io::Result<u64> +where + &'a mut RW: AsyncReadExt + AsyncWriteExt + Unpin, +{ + let worker_magic_1 = primitive::read_u64(&mut conn).await?; + if worker_magic_1 != WORKER_MAGIC_1 { + Err(std::io::Error::new( + ErrorKind::InvalidData, + format!("Incorrect worker magic number received: {}", worker_magic_1), + )) + } else { + primitive::write_u64(&mut conn, WORKER_MAGIC_2).await?; + conn.write_all(&PROTOCOL_VERSION).await?; + conn.flush().await?; + let client_version = primitive::read_u64(&mut conn).await?; + if client_version < 0x10a { + return Err(Error::new( + ErrorKind::Unsupported, + format!("The nix client version {} is too old", client_version), + )); + } + let protocol_minor = client_version & 0x00ff; + let _protocol_major = client_version & 0xff00; + if protocol_minor >= 14 { + // Obsolete CPU affinity. + let read_affinity = primitive::read_u64(&mut conn).await?; + if read_affinity != 0 { + let _cpu_affinity = primitive::read_u64(&mut conn).await?; + }; + } + if protocol_minor >= 11 { + // Obsolete reserveSpace + let _reserve_space = primitive::read_u64(&mut conn).await?; + } + if protocol_minor >= 33 { + // Nix version. We're plain lying, we're not Nix, but eh… + // Setting it to the 2.3 lineage. Not 100% sure this is a + // good idea. + bytes::write_bytes(&mut conn, nix_version).await?; + conn.flush().await?; + } + if protocol_minor >= 35 { + write_worker_trust_level(&mut conn, trusted).await?; + } + Ok(protocol_minor) + } +} + /// Read a worker [Operation] from the wire. pub async fn read_op<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<Operation> { let op_number = primitive::read_u64(r).await?; @@ -204,7 +273,7 @@ pub enum Trust { /// decided not to implement it here. pub async fn write_worker_trust_level<W>(conn: &mut W, t: Trust) -> std::io::Result<()> where - W: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug, + W: AsyncReadExt + AsyncWriteExt + Unpin, { match t { Trust::Trusted => primitive::write_u64(conn, 1).await, @@ -219,6 +288,31 @@ mod tests { use tokio_test::io::Builder; #[tokio::test] + async fn test_init_hanshake() { + let mut test_conn = tokio_test::io::Builder::new() + .read(&WORKER_MAGIC_1.to_le_bytes()) + .write(&WORKER_MAGIC_2.to_le_bytes()) + .write(&PROTOCOL_VERSION) + // Let's say the client is in sync with the daemon + // protocol-wise + .read(&PROTOCOL_VERSION) + // cpu affinity + .read(&[0; 8]) + // reservespace + .read(&[0; 8]) + // version (size) + .write(&[0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + // version (data == 2.18.2 + padding) + .write(&[50, 46, 49, 56, 46, 50, 0, 0]) + // Trusted (1 == client trusted + .write(&[1, 0, 0, 0, 0, 0, 0, 0]) + .build(); + server_handshake_client(&mut test_conn, "2.18.2", Trust::Trusted) + .await + .unwrap(); + } + + #[tokio::test] async fn test_read_client_settings_without_overrides() { // Client settings bits captured from a Nix 2.3.17 run w/ sockdump (protocol version 21). let wire_bits = hex!( diff --git a/users/picnoir/tvix-daemon/Cargo.lock b/users/picnoir/tvix-daemon/Cargo.lock index 5522c63032a2..b3b7e43af5c8 100644 --- a/users/picnoir/tvix-daemon/Cargo.lock +++ b/users/picnoir/tvix-daemon/Cargo.lock @@ -66,12 +66,6 @@ dependencies = [ ] [[package]] -name = "anyhow" -version = "1.0.81" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" - -[[package]] name = "async-stream" version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1350,7 +1344,6 @@ dependencies = [ name = "tvix-daemon" version = "0.1.0" dependencies = [ - "anyhow", "clap", "nix-compat", "tokio", diff --git a/users/picnoir/tvix-daemon/Cargo.nix b/users/picnoir/tvix-daemon/Cargo.nix index aa3faf4a8986..8dc9b541757b 100644 --- a/users/picnoir/tvix-daemon/Cargo.nix +++ b/users/picnoir/tvix-daemon/Cargo.nix @@ -1,5 +1,5 @@ -# This file was @generated by crate2nix 0.13.0 with the command: -# "generate" "--all-features" +# This file was @generated by crate2nix 0.12.0 with the command: +# "generate" # See https://github.com/kolloch/crate2nix for more info. { nixpkgs ? <nixpkgs> @@ -229,20 +229,6 @@ rec { ]; }; - "anyhow" = rec { - crateName = "anyhow"; - version = "1.0.81"; - edition = "2018"; - sha256 = "0ivj2k7vajai9hc11lhjp73n6rqs1ykg6wbjjjl1mz9adj580lh9"; - authors = [ - "David Tolnay <dtolnay@gmail.com>" - ]; - features = { - "backtrace" = [ "dep:backtrace" ]; - "default" = [ "std" ]; - }; - resolvedDefaultFeatures = [ "default" "std" ]; - }; "async-stream" = rec { crateName = "async-stream"; version = "0.3.5"; @@ -4160,10 +4146,6 @@ rec { else ./.; dependencies = [ { - name = "anyhow"; - packageId = "anyhow"; - } - { name = "clap"; packageId = "clap"; features = [ "derive" "env" ]; @@ -5211,7 +5193,6 @@ rec { ( _: { buildTests = true; - release = false; } ); # If the user hasn't set any pre/post commands, we don't want to @@ -5236,16 +5217,6 @@ rec { # recreate a file hierarchy as when running tests with cargo # the source for test data - # It's necessary to locate the source in $NIX_BUILD_TOP/source/ - # instead of $NIX_BUILD_TOP/ - # because we compiled those test binaries in the former and not the latter. - # So all paths will expect source tree to be there and not in the build top directly. - # For example: $NIX_BUILD_TOP := /build in general, if you ask yourself. - # TODO(raitobezarius): I believe there could be more edge cases if `crate.sourceRoot` - # do exist but it's very hard to reason about them, so let's wait until the first bug report. - mkdir -p source/ - cd source/ - ${pkgs.buildPackages.xorg.lndir}/bin/lndir ${crate.src} # build outputs diff --git a/users/picnoir/tvix-daemon/Cargo.toml b/users/picnoir/tvix-daemon/Cargo.toml index 00a7b7c10ada..091116f5b5d6 100644 --- a/users/picnoir/tvix-daemon/Cargo.toml +++ b/users/picnoir/tvix-daemon/Cargo.toml @@ -9,7 +9,6 @@ tokio-listener = "0.3.1" tokio = { version = "1.36.0", features = ["full"] } tracing-subscriber = "0.3.18" tracing = "0.1.40" -anyhow = "1.0.81" clap = { version = "4.5.3", features = ["derive", "env"] } [dev-dependencies] diff --git a/users/picnoir/tvix-daemon/src/main.rs b/users/picnoir/tvix-daemon/src/main.rs index 398c8af01fcc..82220637c683 100644 --- a/users/picnoir/tvix-daemon/src/main.rs +++ b/users/picnoir/tvix-daemon/src/main.rs @@ -1,12 +1,11 @@ -use anyhow::anyhow; use clap::Parser; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_listener::{self, SystemOptions, UserOptions}; use tracing::{debug, error, info, instrument, Level}; use nix_compat::wire::{ - bytes, primitive, - worker_protocol::{self, ClientSettings}, + primitive, + worker_protocol::{self, server_handshake_client, ClientSettings, Trust}, }; #[derive(Parser, Debug)] @@ -19,15 +18,6 @@ struct Cli { verbosity: Option<Level>, } -/// Structure used to hold the client socket connection and some -/// metadata about the connection. -#[derive(Debug)] -struct ClientConnection<R: AsyncReadExt + AsyncWriteExt + Unpin> { - conn: R, - version_minor: u64, - client_settings: Option<ClientSettings>, -} - #[tokio::main] #[instrument()] async fn main() { @@ -62,6 +52,15 @@ async fn main() { } } +/// Structure used to hold the client socket connection and some +/// metadata about the connection. +#[derive(Debug)] +struct ClientConnection<R: AsyncReadExt + AsyncWriteExt + Unpin> { + pub conn: R, + pub version_minor: u64, + pub client_settings: Option<ClientSettings>, +} + /// Worker in charge to respond a Nix client using the Nix wire /// protocol. #[instrument()] @@ -69,9 +68,15 @@ async fn worker<R>(mut conn: R) where R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug, { - match perform_init_handshake(&mut conn).await { - Ok(mut client_connection) => { + match server_handshake_client(&mut conn, "2.18.2", Trust::Trusted).await { + Ok(client_protocol_version) => { + let mut client_connection = ClientConnection { + conn, + version_minor: client_protocol_version, + client_settings: None, + }; debug!("Client hanshake succeeded"); + debug!(client_protocol_version = ?client_protocol_version); // TODO: implement logging. For now, we'll just send // STDERR_LAST, which is good enough to get Nix respond to // us. @@ -110,108 +115,3 @@ where primitive::write_u64(&mut conn.conn, worker_protocol::STDERR_LAST).await?; Ok(settings) } - -/// Performs the initial handshake. During the handshake, the client -/// will first send a magic u64, to which the daemon needs to respond -/// with another magic u64. -/// -/// We then retrieve the client version, and discard a bunch of now -/// obsolete data. -#[instrument()] -async fn perform_init_handshake<'a, R: 'a>( - mut conn: &'a mut R, -) -> anyhow::Result<ClientConnection<&'a mut R>> -where - &'a mut R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug, -{ - let worker_magic_1 = primitive::read_u64(&mut conn).await?; - debug!("Hello read"); - if worker_magic_1 != worker_protocol::WORKER_MAGIC_1 { - Err(anyhow!( - "Invalid client hello received: {:?}, expected {:?}", - worker_magic_1, - worker_protocol::WORKER_MAGIC_1 - )) - } else { - primitive::write_u64(&mut conn, worker_protocol::WORKER_MAGIC_2).await?; - primitive::write_u64(&mut conn, worker_protocol::PROTOCOL_VERSION).await?; - conn.flush().await?; - debug!("Hello responded"); - let client_version = primitive::read_u64(&mut conn).await?; - debug!("Version read"); - if client_version < 0x10a { - return Err(anyhow!("The nix client version is too old")); - } - let protocol_minor = client_version & 0x00ff; - let protocol_major = client_version & 0xff00; - debug!(client.version = %client_version, client.minor = %protocol_minor, client.major = %protocol_major); - if protocol_minor >= 14 { - debug!("read cpu affinity"); - // Obsolete CPU affinity. - let read_affinity = primitive::read_u64(&mut conn).await?; - if read_affinity != 0 { - skip_8_bytes(&mut conn).await?; - }; - } - if protocol_minor >= 11 { - // Obsolete reserveSpace - debug!("read reservespace"); - skip_8_bytes(&mut conn).await?; - } - if protocol_minor >= 33 { - // Nix version. We're plain lying, we're not Nix, but eh… - // Setting it to the 2.3 lineage. Not 100% sure this is a - // good idea. - debug!("write version"); - // Plain str padded to 64 bits. - bytes::write_bytes(&mut conn, "2.3.17").await?; - conn.flush().await?; - } - if protocol_minor >= 35 { - worker_protocol::write_worker_trust_level(&mut conn, worker_protocol::Trust::Trusted) - .await?; - info!("Trust sent"); - } - Ok(ClientConnection { - conn, - version_minor: protocol_minor, - client_settings: None, - }) - } -} - -async fn skip_8_bytes<R>(conn: &mut R) -> anyhow::Result<()> -where - R: AsyncReadExt + Unpin + std::fmt::Debug, -{ - let mut _discard_buffer = [0; 8]; - conn.read_exact(&mut _discard_buffer).await?; - Ok(()) -} - -#[cfg(test)] -mod integration_tests { - use nix_compat::wire::worker_protocol; - #[tokio::test] - async fn test_init_handshake() { - let mut test_conn = tokio_test::io::Builder::new() - .read(&worker_protocol::WORKER_MAGIC_1.to_le_bytes()) - .write(&worker_protocol::WORKER_MAGIC_2.to_le_bytes()) - .write(&worker_protocol::PROTOCOL_VERSION.to_le_bytes()) - // Let's say the client is in sync with the daemon - // protocol-wise - .read(&worker_protocol::PROTOCOL_VERSION.to_le_bytes()) - // cpu affinity - .read(&vec![0; 8]) - // reservespace - .read(&vec![0; 8]) - // version (size) - .write(&vec![0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) - // version (data == 2.2.17 + padding) - .write(&vec![50, 46, 51, 46, 49, 55, 0, 0]) - // Trusted (1 == client trusted - .write(&vec![1, 0, 0, 0, 0, 0, 0, 0]) - .build(); - crate::perform_init_handshake(&mut test_conn).await.unwrap(); - } -} |