about summary refs log tree commit diff
path: root/tvix/nix-compat
diff options
context:
space:
mode:
authorVova Kryachko <v.kryachko@gmail.com>2024-11-24T18·33-0500
committerVladimir Kryachko <v.kryachko@gmail.com>2024-11-24T22·21+0000
commite9acde3c42f56e21e4cdacdf2386b62cbb9032e4 (patch)
treee0057807beb964c3843e49a065081bb492e144f5 /tvix/nix-compat
parent8ef9ba82a8b15312b4ddd16c030124ec1fd685a4 (diff)
feat(tvix/nix-daemon): New operation AddToStoreNar r/8964
This operation is particularly used when invoking the following
nix commands:

```
nix-store --add-fixed some-path
nix-store --add-fixed --recursive some-path
```

Change-Id: I0f9b129c838c00e10415881f1e6e0d7bc1d7a3a6
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12800
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/nix-compat')
-rw-r--r--tvix/nix-compat/src/nix_daemon/handler.rs69
-rw-r--r--tvix/nix-compat/src/nix_daemon/mod.rs22
-rw-r--r--tvix/nix-compat/src/nix_daemon/types.rs130
3 files changed, 209 insertions, 12 deletions
diff --git a/tvix/nix-compat/src/nix_daemon/handler.rs b/tvix/nix-compat/src/nix_daemon/handler.rs
index 65c5c2d60d08..4f43612114d8 100644
--- a/tvix/nix-compat/src/nix_daemon/handler.rs
+++ b/tvix/nix-compat/src/nix_daemon/handler.rs
@@ -1,4 +1,4 @@
-use std::{future::Future, sync::Arc};
+use std::{future::Future, ops::DerefMut, sync::Arc};
 
 use bytes::Bytes;
 use tokio::{
@@ -8,7 +8,8 @@ use tokio::{
 use tracing::{debug, warn};
 
 use super::{
-    types::QueryValidPaths,
+    framing::{NixFramedReader, StderrReadFramedReader},
+    types::{AddToStoreNarRequest, QueryValidPaths},
     worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST},
     NixDaemonIO,
 };
@@ -120,7 +121,7 @@ where
                 Ok(operation) => match operation {
                     Operation::IsValidPath => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(io.is_valid_path(&path)).await?
+                        Self::handle(&self.writer, io.is_valid_path(&path)).await?
                     }
                     // Note this operation does not currently delegate to NixDaemonIO,
                     // The general idea is that we will pass relevant ClientSettings
@@ -128,23 +129,23 @@ where
                     // For now we just store the settings in the NixDaemon for future use.
                     Operation::SetOptions => {
                         self.client_settings = self.reader.read_value().await?;
-                        self.handle(async { Ok(()) }).await?
+                        Self::handle(&self.writer, async { Ok(()) }).await?
                     }
                     Operation::QueryPathInfo => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(io.query_path_info(&path)).await?
+                        Self::handle(&self.writer, io.query_path_info(&path)).await?
                     }
                     Operation::QueryPathFromHashPart => {
                         let hash: Bytes = self.reader.read_value().await?;
-                        self.handle(io.query_path_from_hash_part(&hash)).await?
+                        Self::handle(&self.writer, io.query_path_from_hash_part(&hash)).await?
                     }
                     Operation::QueryValidPaths => {
                         let query: QueryValidPaths = self.reader.read_value().await?;
-                        self.handle(io.query_valid_paths(&query)).await?
+                        Self::handle(&self.writer, io.query_valid_paths(&query)).await?
                     }
                     Operation::QueryValidDerivers => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(io.query_valid_derivers(&path)).await?
+                        Self::handle(&self.writer, io.query_valid_derivers(&path)).await?
                     }
                     // FUTUREWORK: These are just stubs that return an empty list.
                     // It's important not to return an error for the local-overlay:// store
@@ -154,7 +155,7 @@ where
                     // invariants.
                     Operation::QueryReferrers | Operation::QueryRealisation => {
                         let _: String = self.reader.read_value().await?;
-                        self.handle(async move {
+                        Self::handle(&self.writer, async move {
                             warn!(
                                 ?operation,
                                 "This operation is not implemented. Returning empty result..."
@@ -163,6 +164,41 @@ where
                         })
                         .await?
                     }
+                    Operation::AddToStoreNar => {
+                        let request: AddToStoreNarRequest = self.reader.read_value().await?;
+                        let minor_version = self.protocol_version.minor();
+                        match minor_version {
+                            ..21 => {
+                                // Before protocol version 1.21, the nar is sent unframed, so we just
+                                // pass the reader directly to the operation.
+                                Self::handle(
+                                    &self.writer,
+                                    self.io.add_to_store_nar(request, &mut self.reader),
+                                )
+                                .await?
+                            }
+                            21..23 => {
+                                // Protocol versions 1.21 .. 1.23 use STDERR_READ protocol, see logging.md#stderr_read.
+                                Self::handle(&self.writer, async {
+                                    let mut writer = self.writer.lock().await;
+                                    let mut reader = StderrReadFramedReader::new(
+                                        &mut self.reader,
+                                        writer.deref_mut(),
+                                    );
+                                    self.io.add_to_store_nar(request, &mut reader).await
+                                })
+                                .await?
+                            }
+                            23.. => {
+                                // Starting at protocol version 1.23, the framed protocol is used, see serialization.md#framed
+                                let mut framed = NixFramedReader::new(&mut self.reader);
+                                Self::handle(&self.writer, async {
+                                    self.io.add_to_store_nar(request, &mut framed).await
+                                })
+                                .await?
+                            }
+                        }
+                    }
                     _ => {
                         return Err(std::io::Error::other(format!(
                             "Operation {operation:?} is not implemented"
@@ -188,14 +224,14 @@ where
     /// This is a helper method, awaiting on the passed in future and then
     /// handling log lines/activities as described above.
     async fn handle<T>(
-        &mut self,
+        writer: &Arc<Mutex<NixWriter<WriteHalf<RW>>>>,
         future: impl Future<Output = std::io::Result<T>>,
     ) -> Result<(), std::io::Error>
     where
         T: NixSerialize + Send,
     {
         let result = future.await;
-        let mut writer = self.writer.lock().await;
+        let mut writer = writer.lock().await;
 
         match result {
             Ok(r) => {
@@ -244,6 +280,17 @@ mod tests {
         ) -> Result<Option<UnkeyedValidPathInfo>> {
             Ok(None)
         }
+
+        async fn add_to_store_nar<R>(
+            &self,
+            _request: crate::nix_daemon::types::AddToStoreNarRequest,
+            _reader: &mut R,
+        ) -> Result<()>
+        where
+            R: tokio::io::AsyncRead + Send + Unpin,
+        {
+            Ok(())
+        }
     }
 
     #[tokio::test]
diff --git a/tvix/nix-compat/src/nix_daemon/mod.rs b/tvix/nix-compat/src/nix_daemon/mod.rs
index ce56934896be..b1fd15c04ed1 100644
--- a/tvix/nix-compat/src/nix_daemon/mod.rs
+++ b/tvix/nix-compat/src/nix_daemon/mod.rs
@@ -3,8 +3,9 @@ pub mod worker_protocol;
 use std::io::Result;
 
 use futures::future::try_join_all;
+use tokio::io::AsyncRead;
 use tracing::warn;
-use types::{QueryValidPaths, UnkeyedValidPathInfo};
+use types::{AddToStoreNarRequest, QueryValidPaths, UnkeyedValidPathInfo};
 
 use crate::store_path::StorePath;
 
@@ -60,6 +61,14 @@ pub trait NixDaemonIO: Sync {
             Ok(result)
         }
     }
+
+    fn add_to_store_nar<R>(
+        &self,
+        request: AddToStoreNarRequest,
+        reader: &mut R,
+    ) -> impl std::future::Future<Output = Result<()>> + Send
+    where
+        R: AsyncRead + Send + Unpin;
 }
 
 #[cfg(test)]
@@ -89,6 +98,17 @@ mod tests {
         ) -> std::io::Result<Option<UnkeyedValidPathInfo>> {
             Ok(None)
         }
+
+        async fn add_to_store_nar<R>(
+            &self,
+            _request: super::types::AddToStoreNarRequest,
+            _reader: &mut R,
+        ) -> std::io::Result<()>
+        where
+            R: tokio::io::AsyncRead + Send + Unpin,
+        {
+            Ok(())
+        }
     }
 
     #[tokio::test]
diff --git a/tvix/nix-compat/src/nix_daemon/types.rs b/tvix/nix-compat/src/nix_daemon/types.rs
index bf7b1e6f6e58..60d2304c9f7a 100644
--- a/tvix/nix-compat/src/nix_daemon/types.rs
+++ b/tvix/nix-compat/src/nix_daemon/types.rs
@@ -1,3 +1,4 @@
+use crate::wire::de::Error;
 use crate::{
     narinfo::Signature,
     nixhash::CAHash,
@@ -73,6 +74,21 @@ impl NixError {
 
 nix_compat_derive::nix_serialize_remote!(#[nix(display)] Signature<String>);
 
+impl NixDeserialize for Signature<String> {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        let value: Option<String> = reader.try_read_value().await?;
+        match value {
+            Some(value) => Ok(Some(
+                Signature::<String>::parse(&value).map_err(R::Error::invalid_data)?,
+            )),
+            None => Ok(None),
+        }
+    }
+}
+
 impl NixSerialize for CAHash {
     async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
     where
@@ -94,6 +110,42 @@ impl NixSerialize for Option<CAHash> {
     }
 }
 
+impl NixDeserialize for CAHash {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        let value: Option<String> = reader.try_read_value().await?;
+        match value {
+            Some(value) => Ok(Some(CAHash::from_nix_hex_str(&value).ok_or_else(|| {
+                R::Error::invalid_data(format!("Invalid cahash {}", value))
+            })?)),
+            None => Ok(None),
+        }
+    }
+}
+
+impl NixDeserialize for Option<CAHash> {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        let value: Option<String> = reader.try_read_value().await?;
+        match value {
+            Some(value) => {
+                if value.is_empty() {
+                    Ok(None)
+                } else {
+                    Ok(Some(Some(CAHash::from_nix_hex_str(&value).ok_or_else(
+                        || R::Error::invalid_data(format!("Invalid cahash {}", value)),
+                    )?)))
+                }
+            }
+            None => Ok(None),
+        }
+    }
+}
+
 impl NixSerialize for Option<UnkeyedValidPathInfo> {
     async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
     where
@@ -125,6 +177,27 @@ impl NixDeserialize for StorePath<String> {
     }
 }
 
+impl NixDeserialize for Option<StorePath<String>> {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        use crate::wire::de::Error;
+        if let Some(buf) = reader.try_read_bytes().await? {
+            if buf.is_empty() {
+                Ok(Some(None))
+            } else {
+                let result = StorePath::<String>::from_absolute_path(&buf);
+                result
+                    .map(|r| Some(Some(r)))
+                    .map_err(R::Error::invalid_data)
+            }
+        } else {
+            Ok(Some(None))
+        }
+    }
+}
+
 // Custom implementation since Display does not use absolute paths.
 impl<S> NixSerialize for StorePath<S>
 where
@@ -174,3 +247,60 @@ pub struct QueryValidPaths {
     #[nix(version = "27..")]
     pub substitute: bool,
 }
+
+/// newtype wrapper for the byte array that correctly implements NixSerialize, NixDeserialize.
+#[derive(Debug)]
+pub struct NarHash([u8; 32]);
+
+impl std::ops::Deref for NarHash {
+    type Target = [u8; 32];
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl NixDeserialize for NarHash {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        if let Some(bytes) = reader.try_read_bytes().await? {
+            let result = data_encoding::HEXLOWER
+                .decode(bytes.as_ref())
+                .map_err(R::Error::invalid_data)?;
+            Ok(Some(NarHash(result.try_into().map_err(|_| {
+                R::Error::invalid_data("incorrect length")
+            })?)))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Request type for [super::worker_protocol::Operation::AddToStoreNar]
+#[derive(NixDeserialize, Debug)]
+pub struct AddToStoreNarRequest {
+    // - path :: [StorePath][se-StorePath]
+    pub path: StorePath<String>,
+    // - deriver :: [OptStorePath][se-OptStorePath]
+    pub deriver: Option<StorePath<String>>,
+    // - narHash :: [NARHash][se-NARHash] - always sha256
+    pub nar_hash: NarHash,
+    // - references :: [Set][se-Set] of [StorePath][se-StorePath]
+    pub references: Vec<StorePath<String>>,
+    // - registrationTime :: [Time][se-Time]
+    pub registration_time: u64,
+    // - narSize :: [UInt64][se-UInt64]
+    pub nar_size: u64,
+    // - ultimate :: [Bool64][se-Bool64]
+    pub ultimate: bool,
+    // - signatures :: [Set][se-Set] of [Signature][se-Signature]
+    pub signatures: Vec<Signature<String>>,
+    // - ca :: [OptContentAddress][se-OptContentAddress]
+    pub ca: Option<CAHash>,
+    // - repair :: [Bool64][se-Bool64]
+    pub repair: bool,
+    // - dontCheckSigs :: [Bool64][se-Bool64]
+    pub dont_check_sigs: bool,
+}