From c35a5ff611eed94c4cf32de4e26baca4fe38889e Mon Sep 17 00:00:00 2001 From: Picnoir Date: Thu, 21 Mar 2024 09:53:23 +0100 Subject: feat(users/picnoir/tvix-daemon): parse up to the operation Using all the primitives recently implemented to nix-compat to reach the point where the Nix client start to send us operation requests. Using a small integration test script (or the VM test, but let's face it, it's too slow to be useful), we manage to reach the point where we're able to read a store operation: 2024-03-21T18:53:27.624876Z INFO tvix_daemon: Incoming connection addr=unix 2024-03-21T18:53:27.625312Z INFO worker:perform_init_handshake: tvix_daemon: Trust sent conn=Connection(unix) conn=Connection(unix) 2024-03-21T18:53:27.625406Z INFO worker: tvix_daemon: Client hanshake succeeded conn=Connection(unix) 2024-03-21T18:53:27.625488Z INFO worker: tvix_daemon: Operation received op=SetOptions conn=Connection(unix) We had to take some shortcuts wrt. stderr/log management. The CPP Nix codebase is a bit confusing in that area. I'll need to spend more time reading this to fully understand what's happening there. For now, sending the STDERR_LAST command to the client does the trick. Change-Id: I9b0e20a52d885e64fe29188496aac5334de61edd Reviewed-on: https://cl.tvl.fyi/c/depot/+/11233 Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/nix-compat/src/wire/worker_protocol.rs | 2 ++ users/picnoir/tvix-daemon/src/main.rs | 43 +++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 11 deletions(-) diff --git a/tvix/nix-compat/src/wire/worker_protocol.rs b/tvix/nix-compat/src/wire/worker_protocol.rs index 3d891068eab7..ed47aedbca46 100644 --- a/tvix/nix-compat/src/wire/worker_protocol.rs +++ b/tvix/nix-compat/src/wire/worker_protocol.rs @@ -6,6 +6,8 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use crate::wire::primitive; +pub static STDERR_LAST: u64 = 0x616c7473; + /// Worker Operation /// /// These operations are encoded as unsigned 64 bits before being sent diff --git a/users/picnoir/tvix-daemon/src/main.rs b/users/picnoir/tvix-daemon/src/main.rs index 87947cee31df..be03f95a5eb9 100644 --- a/users/picnoir/tvix-daemon/src/main.rs +++ b/users/picnoir/tvix-daemon/src/main.rs @@ -4,7 +4,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_listener::{self, SystemOptions, UserOptions}; use tracing::{debug, error, info, instrument}; -use nix_compat::wire::primitive; +use nix_compat::wire::{bytes, primitive, worker_protocol}; #[derive(Parser, Debug)] struct Cli { @@ -26,13 +26,17 @@ async fn main() { .parse() .expect("Invalid listening socket address"); let system_options: SystemOptions = Default::default(); - let user_options: UserOptions = Default::default(); + let mut user_options: UserOptions = Default::default(); + user_options.recv_buffer_size = Some(1024); + user_options.send_buffer_size = Some(1024); + info!(user_options.send_buffer_size); + info!(user_options.recv_buffer_size); let mut listener = tokio_listener::Listener::bind(&addr, &system_options, &user_options) .await .unwrap(); - info!("Listening for incoming connections on {:?}", listener); + info!(listener_address = ?listener, "Listening for incoming connections"); while let Ok((conn, addr)) = listener.accept().await { - info!("Incoming connection from {addr}"); + info!(addr = %addr, "Incoming connection"); tokio::spawn(async move { worker(conn).await }); } } @@ -46,9 +50,16 @@ where { match perform_init_handshake(&mut conn).await { Ok(_) => { - // TODO: process request here, dispatch to operation - // handler. - info!("Handshake done, bye now"); + info!("Client hanshake succeeded"); + // TODO: implement logging. For now, we'll just send + // STDERR_LAST, which is good enough to get Nix respond to + // us. + primitive::write_u64(&mut conn, worker_protocol::STDERR_LAST) + .await + .unwrap(); + // + let op = worker_protocol::read_op(&mut conn).await.unwrap(); + info!(op = ?op, "Operation received"); } Err(e) => error!("Client handshake failed: {}", e), } @@ -106,7 +117,13 @@ where // good idea. debug!("write version"); // Plain str padded to 64 bits. - conn.write(&"2.3.17\0\0".as_bytes()).await?; + bytes::write_bytes(&mut conn, "2.3.17".as_bytes()).await?; + conn.flush().await?; + } + if protocol_minor >= 35 { + worker_protocol::write_worker_trust_level(&mut conn, worker_protocol::Trust::Trusted) + .await?; + info!("Trust sent"); } Ok(()) } @@ -137,9 +154,13 @@ mod integration_tests { .read(&vec![0; 8]) // reservespace .read(&vec![0; 8]) - // version - .write(&"2.3.17\0\0".as_bytes()) + // version (size) + .write(&vec![0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + // version (data == 2.2.17 + padding) + .write(&vec![50, 46, 51, 46, 49, 55, 0, 0]) + // Trusted (1 == client trusted + .write(&vec![1, 0, 0, 0, 0, 0, 0, 0]) .build(); - crate::worker(&mut test_conn).await; + crate::perform_init_handshake(&mut test_conn).await.unwrap(); } } -- cgit 1.4.1