about summary refs log tree commit diff
diff options
context:
space:
mode:
authorPicnoir <picnoir@alternativebit.fr>2024-03-21T08·53+0100
committerpicnoir picnoir <picnoir@alternativebit.fr>2024-04-03T11·32+0000
commitc35a5ff611eed94c4cf32de4e26baca4fe38889e (patch)
treebf2e369d03cf9c1ebfa9daec11c4d42ab702eff2
parent08cc27cc20d88d1df034e5cb384bdfe694ef5295 (diff)
feat(users/picnoir/tvix-daemon): parse up to the operation r/7845
Using all the primitives recently implemented to nix-compat to reach
the point where the Nix client start to send us operation requests.

Using a small integration test script (or the VM test, but let's face
it, it's too slow to be useful), we manage to reach the point where
we're able to read a store operation:

    2024-03-21T18:53:27.624876Z  INFO tvix_daemon: Incoming connection addr=unix
    2024-03-21T18:53:27.625312Z  INFO worker:perform_init_handshake: tvix_daemon: Trust sent conn=Connection(unix) conn=Connection(unix)
    2024-03-21T18:53:27.625406Z  INFO worker: tvix_daemon: Client hanshake succeeded conn=Connection(unix)
    2024-03-21T18:53:27.625488Z  INFO worker: tvix_daemon: Operation received op=SetOptions conn=Connection(unix)

We had to take some shortcuts wrt. stderr/log management. The CPP Nix
codebase is a bit confusing in that area. I'll need to spend more time
reading this to fully understand what's happening there. For now,
sending the STDERR_LAST command to the client does the trick.

Change-Id: I9b0e20a52d885e64fe29188496aac5334de61edd
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11233
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
-rw-r--r--tvix/nix-compat/src/wire/worker_protocol.rs2
-rw-r--r--users/picnoir/tvix-daemon/src/main.rs43
2 files changed, 34 insertions, 11 deletions
diff --git a/tvix/nix-compat/src/wire/worker_protocol.rs b/tvix/nix-compat/src/wire/worker_protocol.rs
index 3d891068eab7..ed47aedbca46 100644
--- a/tvix/nix-compat/src/wire/worker_protocol.rs
+++ b/tvix/nix-compat/src/wire/worker_protocol.rs
@@ -6,6 +6,8 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
 
 use crate::wire::primitive;
 
+pub static STDERR_LAST: u64 = 0x616c7473;
+
 /// Worker Operation
 ///
 /// These operations are encoded as unsigned 64 bits before being sent
diff --git a/users/picnoir/tvix-daemon/src/main.rs b/users/picnoir/tvix-daemon/src/main.rs
index 87947cee31df..be03f95a5eb9 100644
--- a/users/picnoir/tvix-daemon/src/main.rs
+++ b/users/picnoir/tvix-daemon/src/main.rs
@@ -4,7 +4,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio_listener::{self, SystemOptions, UserOptions};
 use tracing::{debug, error, info, instrument};
 
-use nix_compat::wire::primitive;
+use nix_compat::wire::{bytes, primitive, worker_protocol};
 
 #[derive(Parser, Debug)]
 struct Cli {
@@ -26,13 +26,17 @@ async fn main() {
         .parse()
         .expect("Invalid listening socket address");
     let system_options: SystemOptions = Default::default();
-    let user_options: UserOptions = Default::default();
+    let mut user_options: UserOptions = Default::default();
+    user_options.recv_buffer_size = Some(1024);
+    user_options.send_buffer_size = Some(1024);
+    info!(user_options.send_buffer_size);
+    info!(user_options.recv_buffer_size);
     let mut listener = tokio_listener::Listener::bind(&addr, &system_options, &user_options)
         .await
         .unwrap();
-    info!("Listening for incoming connections on {:?}", listener);
+    info!(listener_address = ?listener, "Listening for incoming connections");
     while let Ok((conn, addr)) = listener.accept().await {
-        info!("Incoming connection from {addr}");
+        info!(addr = %addr, "Incoming connection");
         tokio::spawn(async move { worker(conn).await });
     }
 }
@@ -46,9 +50,16 @@ where
 {
     match perform_init_handshake(&mut conn).await {
         Ok(_) => {
-            // TODO: process request here, dispatch to operation
-            // handler.
-            info!("Handshake done, bye now");
+            info!("Client hanshake succeeded");
+            // TODO: implement logging. For now, we'll just send
+            // STDERR_LAST, which is good enough to get Nix respond to
+            // us.
+            primitive::write_u64(&mut conn, worker_protocol::STDERR_LAST)
+                .await
+                .unwrap();
+            //
+            let op = worker_protocol::read_op(&mut conn).await.unwrap();
+            info!(op = ?op, "Operation received");
         }
         Err(e) => error!("Client handshake failed: {}", e),
     }
@@ -106,7 +117,13 @@ where
             // good idea.
             debug!("write version");
             // Plain str padded to 64 bits.
-            conn.write(&"2.3.17\0\0".as_bytes()).await?;
+            bytes::write_bytes(&mut conn, "2.3.17".as_bytes()).await?;
+            conn.flush().await?;
+        }
+        if protocol_minor >= 35 {
+            worker_protocol::write_worker_trust_level(&mut conn, worker_protocol::Trust::Trusted)
+                .await?;
+            info!("Trust sent");
         }
         Ok(())
     }
@@ -137,9 +154,13 @@ mod integration_tests {
             .read(&vec![0; 8])
             // reservespace
             .read(&vec![0; 8])
-            // version
-            .write(&"2.3.17\0\0".as_bytes())
+            // version (size)
+            .write(&vec![0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
+            // version (data == 2.2.17 + padding)
+            .write(&vec![50, 46, 51, 46, 49, 55, 0, 0])
+            // Trusted (1 == client trusted
+            .write(&vec![1, 0, 0, 0, 0, 0, 0, 0])
             .build();
-        crate::worker(&mut test_conn).await;
+        crate::perform_init_handshake(&mut test_conn).await.unwrap();
     }
 }