diff options
Diffstat (limited to 'users/picnoir/tvix-daemon/src/main.rs')
-rw-r--r-- | users/picnoir/tvix-daemon/src/main.rs | 116 |
1 files changed, 116 insertions, 0 deletions
diff --git a/users/picnoir/tvix-daemon/src/main.rs b/users/picnoir/tvix-daemon/src/main.rs new file mode 100644 index 0000000000..dc49b209e0 --- /dev/null +++ b/users/picnoir/tvix-daemon/src/main.rs @@ -0,0 +1,116 @@ +use clap::Parser; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio_listener::{self, SystemOptions, UserOptions}; +use tracing::{debug, error, info, instrument, Level}; + +use nix_compat::worker_protocol::{self, server_handshake_client, ClientSettings, Trust}; +use nix_compat::ProtocolVersion; + +#[derive(Parser, Debug)] +struct Cli { + /// Listening unix socket path + #[arg(short, long)] + socket: Option<String>, + /// Log verbosity level. Can be "error", "warn", "info", "debug", "trace", or a number 1-5 + #[arg(short, long, env)] + verbosity: Option<Level>, +} + +#[tokio::main] +#[instrument()] +async fn main() { + let args = Cli::parse(); + tracing_subscriber::fmt() + .compact() + .with_max_level( + args.verbosity + .unwrap_or_else(|| panic!("Can't parse log verbosity")), + ) + .try_init() + .unwrap(); + info!("Started Tvix daemon"); + let addr = args + .socket + .unwrap_or_else(|| "sd_listen_unix".to_string()) + .parse() + .expect("Invalid listening socket address"); + let system_options: SystemOptions = Default::default(); + let mut user_options: UserOptions = Default::default(); + user_options.recv_buffer_size = Some(1024); + user_options.send_buffer_size = Some(1024); + info!(user_options.send_buffer_size); + info!(user_options.recv_buffer_size); + let mut listener = tokio_listener::Listener::bind(&addr, &system_options, &user_options) + .await + .unwrap(); + info!(listener_address = ?listener, "Listening for incoming connections"); + while let Ok((conn, addr)) = listener.accept().await { + info!(addr = %addr, "Incoming connection"); + tokio::spawn(async move { worker(conn).await }); + } +} + +/// Structure used to hold the client socket connection and some +/// metadata about the connection. +#[derive(Debug)] +struct ClientConnection<R: AsyncReadExt + AsyncWriteExt + Unpin> { + pub conn: R, + pub version: ProtocolVersion, + pub client_settings: Option<ClientSettings>, +} + +/// Worker in charge to respond a Nix client using the Nix wire +/// protocol. +#[instrument()] +async fn worker<R>(mut conn: R) +where + R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug, +{ + match server_handshake_client(&mut conn, "2.18.2", Trust::Trusted).await { + Ok(client_protocol_version) => { + let mut client_connection = ClientConnection { + conn, + version: client_protocol_version, + client_settings: None, + }; + debug!("Client hanshake succeeded"); + debug!(client_protocol_version = ?client_protocol_version); + // TODO: implement logging. For now, we'll just send + // STDERR_LAST, which is good enough to get Nix respond to + // us. + client_connection + .conn + .write_u64_le(worker_protocol::STDERR_LAST) + .await + .unwrap(); + loop { + let op = worker_protocol::read_op(&mut client_connection.conn) + .await + .unwrap(); + match op { + worker_protocol::Operation::SetOptions => { + let settings = op_set_options(&mut client_connection).await.unwrap(); + client_connection.client_settings = Some(settings); + debug!(settings = ?client_connection.client_settings, "Received client settings"); + } + _ => { + error!(op = ?op, "Unimplemented operation"); + break; + } + } + } + } + Err(e) => error!("Client handshake failed: {}", e), + } +} + +async fn op_set_options<R>(conn: &mut ClientConnection<R>) -> std::io::Result<ClientSettings> +where + R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug, +{ + let settings = worker_protocol::read_client_settings(&mut conn.conn, conn.version).await?; + // The client expects us to send some logs when we're processing + // the settings. Sending STDERR_LAST signal we're done processing. + conn.conn.write_u64_le(worker_protocol::STDERR_LAST).await?; + Ok(settings) +} |