diff options
author | Picnoir <picnoir@alternativebit.fr> | 2024-04-05T09·35+0200 |
---|---|---|
committer | picnoir picnoir <picnoir@alternativebit.fr> | 2024-04-07T20·38+0000 |
commit | efd8ab5e0b595648acad8a96d0b4f0d49fddee8f (patch) | |
tree | cf21ab0b24801c9d7a578726f30a6dbd6ef585e9 /users | |
parent | 199f9b0a79de0fb0fd57ce9307b36390339ee7e7 (diff) |
feat(users/picnoir/tvix-daemon): implement set_options operation r/7868
The protocol is more stateful than I initially thought. We need to keep track to a bunch of things, including but not limited to: the client settings, the client version. I moved things around a bit to keep this state along with the client socket. Change-Id: Ibd34fbe7821c20a460934ea1af0719f5de46e491 Reviewed-on: https://cl.tvl.fyi/c/depot/+/11359 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'users')
-rw-r--r-- | users/picnoir/tvix-daemon/src/main.rs | 61 |
1 files changed, 52 insertions, 9 deletions
diff --git a/users/picnoir/tvix-daemon/src/main.rs b/users/picnoir/tvix-daemon/src/main.rs index 807b4791fe38..595991b31e02 100644 --- a/users/picnoir/tvix-daemon/src/main.rs +++ b/users/picnoir/tvix-daemon/src/main.rs @@ -4,7 +4,10 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_listener::{self, SystemOptions, UserOptions}; use tracing::{debug, error, info, instrument, Level}; -use nix_compat::wire::{bytes, primitive, worker_protocol}; +use nix_compat::wire::{ + bytes, primitive, + worker_protocol::{self, ClientSettings}, +}; #[derive(Parser, Debug)] struct Cli { @@ -16,6 +19,15 @@ struct Cli { verbosity: Option<Level>, } +/// Structure used to hold the client socket connection and some +/// metadata about the connection. +#[derive(Debug)] +struct ClientConnection<R: AsyncReadExt + AsyncWriteExt + Unpin> { + conn: R, + version_minor: u64, + client_settings: Option<ClientSettings>, +} + #[tokio::main] #[instrument()] async fn main() { @@ -58,22 +70,47 @@ where R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug, { match perform_init_handshake(&mut conn).await { - Ok(_) => { - info!("Client hanshake succeeded"); + Ok(mut client_connection) => { + debug!("Client hanshake succeeded"); // TODO: implement logging. For now, we'll just send // STDERR_LAST, which is good enough to get Nix respond to // us. - primitive::write_u64(&mut conn, worker_protocol::STDERR_LAST) + primitive::write_u64(&mut client_connection.conn, worker_protocol::STDERR_LAST) .await .unwrap(); - // - let op = worker_protocol::read_op(&mut conn).await.unwrap(); - info!(op = ?op, "Operation received"); + loop { + let op = worker_protocol::read_op(&mut client_connection.conn) + .await + .unwrap(); + match op { + worker_protocol::Operation::SetOptions => { + let settings = op_set_options(&mut client_connection).await.unwrap(); + client_connection.client_settings = Some(settings); + debug!(settings = ?client_connection.client_settings, "Received client settings"); + } + _ => { + error!(op = ?op, "Unimplemented operation"); + break; + } + } + } } Err(e) => error!("Client handshake failed: {}", e), } } +async fn op_set_options<R>(conn: &mut ClientConnection<R>) -> std::io::Result<ClientSettings> +where + R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug, +{ + let settings = + worker_protocol::read_client_settings(&mut conn.conn, conn.version_minor).await?; + // The client expects us to send some logs when we're processing + // the settings. Sending STDERR_LAST signal we're done processing. + primitive::write_u64(&mut conn.conn, worker_protocol::STDERR_LAST).await?; + Ok(settings) +} + /// Performs the initial handshake. During the handshake, the client /// will first send a magic u64, to which the daemon needs to respond /// with another magic u64. @@ -81,7 +118,9 @@ where /// We then retrieve the client version, and discard a bunch of now /// obsolete data. #[instrument()] -async fn perform_init_handshake<'a, R: 'a>(mut conn: &'a mut R) -> anyhow::Result<()> +async fn perform_init_handshake<'a, R: 'a>( + mut conn: &'a mut R, +) -> anyhow::Result<ClientConnection<&'a mut R>> where &'a mut R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug, { @@ -134,7 +173,11 @@ where .await?; info!("Trust sent"); } - Ok(()) + Ok(ClientConnection { + conn, + version_minor: protocol_minor, + client_settings: None, + }) } } |