about summary refs log tree commit diff
diff options
context:
space:
mode:
authorPicnoir <picnoir@alternativebit.fr>2024-04-05T09·35+0200
committerpicnoir picnoir <picnoir@alternativebit.fr>2024-04-07T20·38+0000
commitefd8ab5e0b595648acad8a96d0b4f0d49fddee8f (patch)
treecf21ab0b24801c9d7a578726f30a6dbd6ef585e9
parent199f9b0a79de0fb0fd57ce9307b36390339ee7e7 (diff)
feat(users/picnoir/tvix-daemon): implement set_options operation r/7868
The protocol is more stateful than I initially thought. We need to
keep track to a bunch of things, including but not limited to: the
client settings, the client version. I moved things around a bit to
keep this state along with the client socket.

Change-Id: Ibd34fbe7821c20a460934ea1af0719f5de46e491
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11359
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
-rw-r--r--users/picnoir/tvix-daemon/src/main.rs61
1 files changed, 52 insertions, 9 deletions
diff --git a/users/picnoir/tvix-daemon/src/main.rs b/users/picnoir/tvix-daemon/src/main.rs
index 807b4791fe38..595991b31e02 100644
--- a/users/picnoir/tvix-daemon/src/main.rs
+++ b/users/picnoir/tvix-daemon/src/main.rs
@@ -4,7 +4,10 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio_listener::{self, SystemOptions, UserOptions};
 use tracing::{debug, error, info, instrument, Level};
 
-use nix_compat::wire::{bytes, primitive, worker_protocol};
+use nix_compat::wire::{
+    bytes, primitive,
+    worker_protocol::{self, ClientSettings},
+};
 
 #[derive(Parser, Debug)]
 struct Cli {
@@ -16,6 +19,15 @@ struct Cli {
     verbosity: Option<Level>,
 }
 
+/// Structure used to hold the client socket connection and some
+/// metadata about the connection.
+#[derive(Debug)]
+struct ClientConnection<R: AsyncReadExt + AsyncWriteExt + Unpin> {
+    conn: R,
+    version_minor: u64,
+    client_settings: Option<ClientSettings>,
+}
+
 #[tokio::main]
 #[instrument()]
 async fn main() {
@@ -58,22 +70,47 @@ where
     R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
 {
     match perform_init_handshake(&mut conn).await {
-        Ok(_) => {
-            info!("Client hanshake succeeded");
+        Ok(mut client_connection) => {
+            debug!("Client hanshake succeeded");
             // TODO: implement logging. For now, we'll just send
             // STDERR_LAST, which is good enough to get Nix respond to
             // us.
-            primitive::write_u64(&mut conn, worker_protocol::STDERR_LAST)
+            primitive::write_u64(&mut client_connection.conn, worker_protocol::STDERR_LAST)
                 .await
                 .unwrap();
-            //
-            let op = worker_protocol::read_op(&mut conn).await.unwrap();
-            info!(op = ?op, "Operation received");
+            loop {
+                let op = worker_protocol::read_op(&mut client_connection.conn)
+                    .await
+                    .unwrap();
+                match op {
+                    worker_protocol::Operation::SetOptions => {
+                        let settings = op_set_options(&mut client_connection).await.unwrap();
+                        client_connection.client_settings = Some(settings);
+                        debug!(settings = ?client_connection.client_settings, "Received client settings");
+                    }
+                    _ => {
+                        error!(op = ?op, "Unimplemented operation");
+                        break;
+                    }
+                }
+            }
         }
         Err(e) => error!("Client handshake failed: {}", e),
     }
 }
 
+async fn op_set_options<R>(conn: &mut ClientConnection<R>) -> std::io::Result<ClientSettings>
+where
+    R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
+{
+    let settings =
+        worker_protocol::read_client_settings(&mut conn.conn, conn.version_minor).await?;
+    // The client expects us to send some logs when we're processing
+    // the settings. Sending STDERR_LAST signal we're done processing.
+    primitive::write_u64(&mut conn.conn, worker_protocol::STDERR_LAST).await?;
+    Ok(settings)
+}
+
 /// Performs the initial handshake. During the handshake, the client
 /// will first send a magic u64, to which the daemon needs to respond
 /// with another magic u64.
@@ -81,7 +118,9 @@ where
 /// We then retrieve the client version, and discard a bunch of now
 /// obsolete data.
 #[instrument()]
-async fn perform_init_handshake<'a, R: 'a>(mut conn: &'a mut R) -> anyhow::Result<()>
+async fn perform_init_handshake<'a, R: 'a>(
+    mut conn: &'a mut R,
+) -> anyhow::Result<ClientConnection<&'a mut R>>
 where
     &'a mut R: AsyncReadExt + AsyncWriteExt + Unpin + std::fmt::Debug,
 {
@@ -134,7 +173,11 @@ where
                 .await?;
             info!("Trust sent");
         }
-        Ok(())
+        Ok(ClientConnection {
+            conn,
+            version_minor: protocol_minor,
+            client_settings: None,
+        })
     }
 }