about summary refs log tree commit diff
path: root/tvix/nix-compat/src/nix_daemon/handler.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/nix-compat/src/nix_daemon/handler.rs')
-rw-r--r--tvix/nix-compat/src/nix_daemon/handler.rs69
1 files changed, 58 insertions, 11 deletions
diff --git a/tvix/nix-compat/src/nix_daemon/handler.rs b/tvix/nix-compat/src/nix_daemon/handler.rs
index 65c5c2d60d08..4f43612114d8 100644
--- a/tvix/nix-compat/src/nix_daemon/handler.rs
+++ b/tvix/nix-compat/src/nix_daemon/handler.rs
@@ -1,4 +1,4 @@
-use std::{future::Future, sync::Arc};
+use std::{future::Future, ops::DerefMut, sync::Arc};
 
 use bytes::Bytes;
 use tokio::{
@@ -8,7 +8,8 @@ use tokio::{
 use tracing::{debug, warn};
 
 use super::{
-    types::QueryValidPaths,
+    framing::{NixFramedReader, StderrReadFramedReader},
+    types::{AddToStoreNarRequest, QueryValidPaths},
     worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST},
     NixDaemonIO,
 };
@@ -120,7 +121,7 @@ where
                 Ok(operation) => match operation {
                     Operation::IsValidPath => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(io.is_valid_path(&path)).await?
+                        Self::handle(&self.writer, io.is_valid_path(&path)).await?
                     }
                     // Note this operation does not currently delegate to NixDaemonIO,
                     // The general idea is that we will pass relevant ClientSettings
@@ -128,23 +129,23 @@ where
                     // For now we just store the settings in the NixDaemon for future use.
                     Operation::SetOptions => {
                         self.client_settings = self.reader.read_value().await?;
-                        self.handle(async { Ok(()) }).await?
+                        Self::handle(&self.writer, async { Ok(()) }).await?
                     }
                     Operation::QueryPathInfo => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(io.query_path_info(&path)).await?
+                        Self::handle(&self.writer, io.query_path_info(&path)).await?
                     }
                     Operation::QueryPathFromHashPart => {
                         let hash: Bytes = self.reader.read_value().await?;
-                        self.handle(io.query_path_from_hash_part(&hash)).await?
+                        Self::handle(&self.writer, io.query_path_from_hash_part(&hash)).await?
                     }
                     Operation::QueryValidPaths => {
                         let query: QueryValidPaths = self.reader.read_value().await?;
-                        self.handle(io.query_valid_paths(&query)).await?
+                        Self::handle(&self.writer, io.query_valid_paths(&query)).await?
                     }
                     Operation::QueryValidDerivers => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(io.query_valid_derivers(&path)).await?
+                        Self::handle(&self.writer, io.query_valid_derivers(&path)).await?
                     }
                     // FUTUREWORK: These are just stubs that return an empty list.
                     // It's important not to return an error for the local-overlay:// store
@@ -154,7 +155,7 @@ where
                     // invariants.
                     Operation::QueryReferrers | Operation::QueryRealisation => {
                         let _: String = self.reader.read_value().await?;
-                        self.handle(async move {
+                        Self::handle(&self.writer, async move {
                             warn!(
                                 ?operation,
                                 "This operation is not implemented. Returning empty result..."
@@ -163,6 +164,41 @@ where
                         })
                         .await?
                     }
+                    Operation::AddToStoreNar => {
+                        let request: AddToStoreNarRequest = self.reader.read_value().await?;
+                        let minor_version = self.protocol_version.minor();
+                        match minor_version {
+                            ..21 => {
+                                // Before protocol version 1.21, the nar is sent unframed, so we just
+                                // pass the reader directly to the operation.
+                                Self::handle(
+                                    &self.writer,
+                                    self.io.add_to_store_nar(request, &mut self.reader),
+                                )
+                                .await?
+                            }
+                            21..23 => {
+                                // Protocol versions 1.21 .. 1.23 use STDERR_READ protocol, see logging.md#stderr_read.
+                                Self::handle(&self.writer, async {
+                                    let mut writer = self.writer.lock().await;
+                                    let mut reader = StderrReadFramedReader::new(
+                                        &mut self.reader,
+                                        writer.deref_mut(),
+                                    );
+                                    self.io.add_to_store_nar(request, &mut reader).await
+                                })
+                                .await?
+                            }
+                            23.. => {
+                                // Starting at protocol version 1.23, the framed protocol is used, see serialization.md#framed
+                                let mut framed = NixFramedReader::new(&mut self.reader);
+                                Self::handle(&self.writer, async {
+                                    self.io.add_to_store_nar(request, &mut framed).await
+                                })
+                                .await?
+                            }
+                        }
+                    }
                     _ => {
                         return Err(std::io::Error::other(format!(
                             "Operation {operation:?} is not implemented"
@@ -188,14 +224,14 @@ where
     /// This is a helper method, awaiting on the passed in future and then
     /// handling log lines/activities as described above.
     async fn handle<T>(
-        &mut self,
+        writer: &Arc<Mutex<NixWriter<WriteHalf<RW>>>>,
         future: impl Future<Output = std::io::Result<T>>,
     ) -> Result<(), std::io::Error>
     where
         T: NixSerialize + Send,
     {
         let result = future.await;
-        let mut writer = self.writer.lock().await;
+        let mut writer = writer.lock().await;
 
         match result {
             Ok(r) => {
@@ -244,6 +280,17 @@ mod tests {
         ) -> Result<Option<UnkeyedValidPathInfo>> {
             Ok(None)
         }
+
+        async fn add_to_store_nar<R>(
+            &self,
+            _request: crate::nix_daemon::types::AddToStoreNarRequest,
+            _reader: &mut R,
+        ) -> Result<()>
+        where
+            R: tokio::io::AsyncRead + Send + Unpin,
+        {
+            Ok(())
+        }
     }
 
     #[tokio::test]