about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/nix-compat/src/wire/worker_protocol.rs106
-rw-r--r--users/picnoir/tvix-daemon/Cargo.lock7
-rw-r--r--users/picnoir/tvix-daemon/Cargo.nix33
-rw-r--r--users/picnoir/tvix-daemon/Cargo.toml1
-rw-r--r--users/picnoir/tvix-daemon/src/main.rs138
5 files changed, 121 insertions, 164 deletions
diff --git a/tvix/nix-compat/src/wire/worker_protocol.rs b/tvix/nix-compat/src/wire/worker_protocol.rs
index 121b9b2ea5cb..a9d3bd85478b 100644
--- a/tvix/nix-compat/src/wire/worker_protocol.rs
+++ b/tvix/nix-compat/src/wire/worker_protocol.rs
@@ -7,15 +7,15 @@ use enum_primitive_derive::Primitive;
 use num_traits::{FromPrimitive, ToPrimitive};
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
-use crate::wire::primitive;
+use crate::wire::{bytes, primitive};
 
 use super::bytes::read_string;
 
-pub static WORKER_MAGIC_1: u64 = 0x6e697863; // "nixc"
-pub static WORKER_MAGIC_2: u64 = 0x6478696f; // "dxio"
+static WORKER_MAGIC_1: u64 = 0x6e697863; // "nixc"
+static WORKER_MAGIC_2: u64 = 0x6478696f; // "dxio"
 pub static STDERR_LAST: u64 = 0x616c7473; // "alts"
-/// Protocol version (1.35)
-pub static PROTOCOL_VERSION: u64 = 1 << 8 | 35;
+/// Protocol version (1.37)
+static PROTOCOL_VERSION: [u8; 8] = [37, 1, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];
 
 /// Max length of a Nix setting name/value. In bytes.
 ///
@@ -172,6 +172,75 @@ pub async fn read_client_settings<R: AsyncReadExt + Unpin>(
     })
 }
 
+/// Performs the initial handshake the server is sending to a connecting client.
+///
+/// During the handshake, the client first send a magic u64, to which
+/// the daemon needs to respond with another magic u64. Then, the
+/// daemon retrieve the client version, and discard a bunch of now
+/// obsolete data.
+///
+/// # Arguments
+///
+/// * conn: connection with the Nix client.
+/// * nix_version: semantic version of the Nix daemon. "2.18.2" for
+///   instance.
+/// * trusted: trust level of the Nix client.
+///
+/// # Return
+///
+/// The protocol version of a client encoded as a u64.
+pub async fn server_handshake_client<'a, RW: 'a>(
+    mut conn: &'a mut RW,
+    nix_version: &str,
+    trusted: Trust,
+) -> std::io::Result<u64>
+where
+    &'a mut RW: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+    let worker_magic_1 = primitive::read_u64(&mut conn).await?;
+    if worker_magic_1 != WORKER_MAGIC_1 {
+        Err(std::io::Error::new(
+            ErrorKind::InvalidData,
+            format!("Incorrect worker magic number received: {}", worker_magic_1),
+        ))
+    } else {
+        primitive::write_u64(&mut conn, WORKER_MAGIC_2).await?;
+        conn.write_all(&PROTOCOL_VERSION).await?;
+        conn.flush().await?;
+        let client_version = primitive::read_u64(&mut conn).await?;
+        if client_version < 0x10a {
+            return Err(Error::new(
+                ErrorKind::Unsupported,
+                format!("The nix client version {} is too old", client_version),
+            ));
+        }
+        let protocol_minor = client_version & 0x00ff;
+        let _protocol_major = client_version & 0xff00;
+        if protocol_minor >= 14 {
+            // Obsolete CPU affinity.
+            let read_affinity = primitive::read_u64(&mut conn).await?;
+            if read_affinity != 0 {
+                let _cpu_affinity = primitive::read_u64(&mut conn).await?;
+            };
+        }
+        if protocol_minor >= 11 {
+            // Obsolete reserveSpace
+            let _reserve_space = primitive::read_u64(&mut conn).await?;
+        }
+        if protocol_minor >= 33 {
+            // Nix version. We're plain lying, we're not Nix, but eh…
+            // Setting it to the 2.3 lineage. Not 100% sure this is a
+            // good idea.
+            bytes::write_bytes(&mut conn, nix_version).await?;
+            conn.flush().await?;
+        }
+        if protocol_minor >= 35 {
+            write_worker_trust_level(&mut conn, trusted).await?;
+        }
+        Ok(protocol_minor)
+    }
+}
+
 /// Read a worker [Operation] from the wire.
 pub async fn read_op<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<Operation> {
     let op_number = primitive::read_u64(r).await?;
@@ -204,7 +273,7 @@ pub enum Trust {
 /// decided not to implement it here.
 pub async fn write_worker_trust_level<W>(conn: &mut W, t: Trust) -> std::io::Result<()>
 where
-    W: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
+    W: AsyncReadExt + AsyncWriteExt + Unpin,
 {
     match t {
         Trust::Trusted => primitive::write_u64(conn, 1).await,
@@ -219,6 +288,31 @@ mod tests {
     use tokio_test::io::Builder;
 
     #[tokio::test]
+    async fn test_init_hanshake() {
+        let mut test_conn = tokio_test::io::Builder::new()
+            .read(&WORKER_MAGIC_1.to_le_bytes())
+            .write(&WORKER_MAGIC_2.to_le_bytes())
+            .write(&PROTOCOL_VERSION)
+            // Let's say the client is in sync with the daemon
+            // protocol-wise
+            .read(&PROTOCOL_VERSION)
+            // cpu affinity
+            .read(&[0; 8])
+            // reservespace
+            .read(&[0; 8])
+            // version (size)
+            .write(&[0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
+            // version (data == 2.18.2 + padding)
+            .write(&[50, 46, 49, 56, 46, 50, 0, 0])
+            // Trusted (1 == client trusted
+            .write(&[1, 0, 0, 0, 0, 0, 0, 0])
+            .build();
+        server_handshake_client(&mut test_conn, "2.18.2", Trust::Trusted)
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
     async fn test_read_client_settings_without_overrides() {
         // Client settings bits captured from a Nix 2.3.17 run w/ sockdump (protocol version 21).
         let wire_bits = hex!(
diff --git a/users/picnoir/tvix-daemon/Cargo.lock b/users/picnoir/tvix-daemon/Cargo.lock
index 5522c63032a2..b3b7e43af5c8 100644
--- a/users/picnoir/tvix-daemon/Cargo.lock
+++ b/users/picnoir/tvix-daemon/Cargo.lock
@@ -66,12 +66,6 @@ dependencies = [
 ]
 
 [[package]]
-name = "anyhow"
-version = "1.0.81"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247"
-
-[[package]]
 name = "async-stream"
 version = "0.3.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1350,7 +1344,6 @@ dependencies = [
 name = "tvix-daemon"
 version = "0.1.0"
 dependencies = [
- "anyhow",
  "clap",
  "nix-compat",
  "tokio",
diff --git a/users/picnoir/tvix-daemon/Cargo.nix b/users/picnoir/tvix-daemon/Cargo.nix
index aa3faf4a8986..8dc9b541757b 100644
--- a/users/picnoir/tvix-daemon/Cargo.nix
+++ b/users/picnoir/tvix-daemon/Cargo.nix
@@ -1,5 +1,5 @@
-# This file was @generated by crate2nix 0.13.0 with the command:
-#   "generate" "--all-features"
+# This file was @generated by crate2nix 0.12.0 with the command:
+#   "generate"
 # See https://github.com/kolloch/crate2nix for more info.
 
 { nixpkgs ? <nixpkgs>
@@ -229,20 +229,6 @@ rec {
         ];
 
       };
-      "anyhow" = rec {
-        crateName = "anyhow";
-        version = "1.0.81";
-        edition = "2018";
-        sha256 = "0ivj2k7vajai9hc11lhjp73n6rqs1ykg6wbjjjl1mz9adj580lh9";
-        authors = [
-          "David Tolnay <dtolnay@gmail.com>"
-        ];
-        features = {
-          "backtrace" = [ "dep:backtrace" ];
-          "default" = [ "std" ];
-        };
-        resolvedDefaultFeatures = [ "default" "std" ];
-      };
       "async-stream" = rec {
         crateName = "async-stream";
         version = "0.3.5";
@@ -4160,10 +4146,6 @@ rec {
           else ./.;
         dependencies = [
           {
-            name = "anyhow";
-            packageId = "anyhow";
-          }
-          {
             name = "clap";
             packageId = "clap";
             features = [ "derive" "env" ];
@@ -5211,7 +5193,6 @@ rec {
               (
                 _: {
                   buildTests = true;
-                  release = false;
                 }
               );
             # If the user hasn't set any pre/post commands, we don't want to
@@ -5236,16 +5217,6 @@ rec {
             # recreate a file hierarchy as when running tests with cargo
 
             # the source for test data
-            # It's necessary to locate the source in $NIX_BUILD_TOP/source/
-            # instead of $NIX_BUILD_TOP/
-            # because we compiled those test binaries in the former and not the latter.
-            # So all paths will expect source tree to be there and not in the build top directly.
-            # For example: $NIX_BUILD_TOP := /build in general, if you ask yourself.
-            # TODO(raitobezarius): I believe there could be more edge cases if `crate.sourceRoot`
-            # do exist but it's very hard to reason about them, so let's wait until the first bug report.
-            mkdir -p source/
-            cd source/
-
             ${pkgs.buildPackages.xorg.lndir}/bin/lndir ${crate.src}
 
             # build outputs
diff --git a/users/picnoir/tvix-daemon/Cargo.toml b/users/picnoir/tvix-daemon/Cargo.toml
index 00a7b7c10ada..091116f5b5d6 100644
--- a/users/picnoir/tvix-daemon/Cargo.toml
+++ b/users/picnoir/tvix-daemon/Cargo.toml
@@ -9,7 +9,6 @@ tokio-listener = "0.3.1"
 tokio = { version = "1.36.0", features = ["full"] }
 tracing-subscriber = "0.3.18"
 tracing = "0.1.40"
-anyhow = "1.0.81"
 clap = { version = "4.5.3", features = ["derive", "env"] }
 
 [dev-dependencies]
diff --git a/users/picnoir/tvix-daemon/src/main.rs b/users/picnoir/tvix-daemon/src/main.rs
index 398c8af01fcc..82220637c683 100644
--- a/users/picnoir/tvix-daemon/src/main.rs
+++ b/users/picnoir/tvix-daemon/src/main.rs
@@ -1,12 +1,11 @@
-use anyhow::anyhow;
 use clap::Parser;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio_listener::{self, SystemOptions, UserOptions};
 use tracing::{debug, error, info, instrument, Level};
 
 use nix_compat::wire::{
-    bytes, primitive,
-    worker_protocol::{self, ClientSettings},
+    primitive,
+    worker_protocol::{self, server_handshake_client, ClientSettings, Trust},
 };
 
 #[derive(Parser, Debug)]
@@ -19,15 +18,6 @@ struct Cli {
     verbosity: Option<Level>,
 }
 
-/// Structure used to hold the client socket connection and some
-/// metadata about the connection.
-#[derive(Debug)]
-struct ClientConnection<R: AsyncReadExt + AsyncWriteExt + Unpin> {
-    conn: R,
-    version_minor: u64,
-    client_settings: Option<ClientSettings>,
-}
-
 #[tokio::main]
 #[instrument()]
 async fn main() {
@@ -62,6 +52,15 @@ async fn main() {
     }
 }
 
+/// Structure used to hold the client socket connection and some
+/// metadata about the connection.
+#[derive(Debug)]
+struct ClientConnection<R: AsyncReadExt + AsyncWriteExt + Unpin> {
+    pub conn: R,
+    pub version_minor: u64,
+    pub client_settings: Option<ClientSettings>,
+}
+
 /// Worker in charge to respond a Nix client using the Nix wire
 /// protocol.
 #[instrument()]
@@ -69,9 +68,15 @@ async fn worker<R>(mut conn: R)
 where
     R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
 {
-    match perform_init_handshake(&mut conn).await {
-        Ok(mut client_connection) => {
+    match server_handshake_client(&mut conn, "2.18.2", Trust::Trusted).await {
+        Ok(client_protocol_version) => {
+            let mut client_connection = ClientConnection {
+                conn,
+                version_minor: client_protocol_version,
+                client_settings: None,
+            };
             debug!("Client hanshake succeeded");
+            debug!(client_protocol_version = ?client_protocol_version);
             // TODO: implement logging. For now, we'll just send
             // STDERR_LAST, which is good enough to get Nix respond to
             // us.
@@ -110,108 +115,3 @@ where
     primitive::write_u64(&mut conn.conn, worker_protocol::STDERR_LAST).await?;
     Ok(settings)
 }
-
-/// Performs the initial handshake. During the handshake, the client
-/// will first send a magic u64, to which the daemon needs to respond
-/// with another magic u64.
-///
-/// We then retrieve the client version, and discard a bunch of now
-/// obsolete data.
-#[instrument()]
-async fn perform_init_handshake<'a, R: 'a>(
-    mut conn: &'a mut R,
-) -> anyhow::Result<ClientConnection<&'a mut R>>
-where
-    &'a mut R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
-{
-    let worker_magic_1 = primitive::read_u64(&mut conn).await?;
-    debug!("Hello read");
-    if worker_magic_1 != worker_protocol::WORKER_MAGIC_1 {
-        Err(anyhow!(
-            "Invalid client hello received: {:?}, expected {:?}",
-            worker_magic_1,
-            worker_protocol::WORKER_MAGIC_1
-        ))
-    } else {
-        primitive::write_u64(&mut conn, worker_protocol::WORKER_MAGIC_2).await?;
-        primitive::write_u64(&mut conn, worker_protocol::PROTOCOL_VERSION).await?;
-        conn.flush().await?;
-        debug!("Hello responded");
-        let client_version = primitive::read_u64(&mut conn).await?;
-        debug!("Version read");
-        if client_version < 0x10a {
-            return Err(anyhow!("The nix client version is too old"));
-        }
-        let protocol_minor = client_version & 0x00ff;
-        let protocol_major = client_version & 0xff00;
-        debug!(client.version = %client_version, client.minor = %protocol_minor, client.major = %protocol_major);
-        if protocol_minor >= 14 {
-            debug!("read cpu affinity");
-            // Obsolete CPU affinity.
-            let read_affinity = primitive::read_u64(&mut conn).await?;
-            if read_affinity != 0 {
-                skip_8_bytes(&mut conn).await?;
-            };
-        }
-        if protocol_minor >= 11 {
-            // Obsolete reserveSpace
-            debug!("read reservespace");
-            skip_8_bytes(&mut conn).await?;
-        }
-        if protocol_minor >= 33 {
-            // Nix version. We're plain lying, we're not Nix, but eh…
-            // Setting it to the 2.3 lineage. Not 100% sure this is a
-            // good idea.
-            debug!("write version");
-            // Plain str padded to 64 bits.
-            bytes::write_bytes(&mut conn, "2.3.17").await?;
-            conn.flush().await?;
-        }
-        if protocol_minor >= 35 {
-            worker_protocol::write_worker_trust_level(&mut conn, worker_protocol::Trust::Trusted)
-                .await?;
-            info!("Trust sent");
-        }
-        Ok(ClientConnection {
-            conn,
-            version_minor: protocol_minor,
-            client_settings: None,
-        })
-    }
-}
-
-async fn skip_8_bytes<R>(conn: &mut R) -> anyhow::Result<()>
-where
-    R: AsyncReadExt + Unpin + std::fmt::Debug,
-{
-    let mut _discard_buffer = [0; 8];
-    conn.read_exact(&mut _discard_buffer).await?;
-    Ok(())
-}
-
-#[cfg(test)]
-mod integration_tests {
-    use nix_compat::wire::worker_protocol;
-    #[tokio::test]
-    async fn test_init_handshake() {
-        let mut test_conn = tokio_test::io::Builder::new()
-            .read(&worker_protocol::WORKER_MAGIC_1.to_le_bytes())
-            .write(&worker_protocol::WORKER_MAGIC_2.to_le_bytes())
-            .write(&worker_protocol::PROTOCOL_VERSION.to_le_bytes())
-            // Let's say the client is in sync with the daemon
-            // protocol-wise
-            .read(&worker_protocol::PROTOCOL_VERSION.to_le_bytes())
-            // cpu affinity
-            .read(&vec![0; 8])
-            // reservespace
-            .read(&vec![0; 8])
-            // version (size)
-            .write(&vec![0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
-            // version (data == 2.2.17 + padding)
-            .write(&vec![50, 46, 51, 46, 49, 55, 0, 0])
-            // Trusted (1 == client trusted
-            .write(&vec![1, 0, 0, 0, 0, 0, 0, 0])
-            .build();
-        crate::perform_init_handshake(&mut test_conn).await.unwrap();
-    }
-}