about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/nix-compat/src/nix_daemon/handler.rs69
-rw-r--r--tvix/nix-compat/src/nix_daemon/mod.rs22
-rw-r--r--tvix/nix-compat/src/nix_daemon/types.rs130
-rw-r--r--tvix/nix-daemon/src/bin/nix-daemon.rs8
-rw-r--r--tvix/nix-daemon/src/lib.rs79
5 files changed, 289 insertions, 19 deletions
diff --git a/tvix/nix-compat/src/nix_daemon/handler.rs b/tvix/nix-compat/src/nix_daemon/handler.rs
index 65c5c2d60d08..4f43612114d8 100644
--- a/tvix/nix-compat/src/nix_daemon/handler.rs
+++ b/tvix/nix-compat/src/nix_daemon/handler.rs
@@ -1,4 +1,4 @@
-use std::{future::Future, sync::Arc};
+use std::{future::Future, ops::DerefMut, sync::Arc};
 
 use bytes::Bytes;
 use tokio::{
@@ -8,7 +8,8 @@ use tokio::{
 use tracing::{debug, warn};
 
 use super::{
-    types::QueryValidPaths,
+    framing::{NixFramedReader, StderrReadFramedReader},
+    types::{AddToStoreNarRequest, QueryValidPaths},
     worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST},
     NixDaemonIO,
 };
@@ -120,7 +121,7 @@ where
                 Ok(operation) => match operation {
                     Operation::IsValidPath => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(io.is_valid_path(&path)).await?
+                        Self::handle(&self.writer, io.is_valid_path(&path)).await?
                     }
                     // Note this operation does not currently delegate to NixDaemonIO,
                     // The general idea is that we will pass relevant ClientSettings
@@ -128,23 +129,23 @@ where
                     // For now we just store the settings in the NixDaemon for future use.
                     Operation::SetOptions => {
                         self.client_settings = self.reader.read_value().await?;
-                        self.handle(async { Ok(()) }).await?
+                        Self::handle(&self.writer, async { Ok(()) }).await?
                     }
                     Operation::QueryPathInfo => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(io.query_path_info(&path)).await?
+                        Self::handle(&self.writer, io.query_path_info(&path)).await?
                     }
                     Operation::QueryPathFromHashPart => {
                         let hash: Bytes = self.reader.read_value().await?;
-                        self.handle(io.query_path_from_hash_part(&hash)).await?
+                        Self::handle(&self.writer, io.query_path_from_hash_part(&hash)).await?
                     }
                     Operation::QueryValidPaths => {
                         let query: QueryValidPaths = self.reader.read_value().await?;
-                        self.handle(io.query_valid_paths(&query)).await?
+                        Self::handle(&self.writer, io.query_valid_paths(&query)).await?
                     }
                     Operation::QueryValidDerivers => {
                         let path: StorePath<String> = self.reader.read_value().await?;
-                        self.handle(io.query_valid_derivers(&path)).await?
+                        Self::handle(&self.writer, io.query_valid_derivers(&path)).await?
                     }
                     // FUTUREWORK: These are just stubs that return an empty list.
                     // It's important not to return an error for the local-overlay:// store
@@ -154,7 +155,7 @@ where
                     // invariants.
                     Operation::QueryReferrers | Operation::QueryRealisation => {
                         let _: String = self.reader.read_value().await?;
-                        self.handle(async move {
+                        Self::handle(&self.writer, async move {
                             warn!(
                                 ?operation,
                                 "This operation is not implemented. Returning empty result..."
@@ -163,6 +164,41 @@ where
                         })
                         .await?
                     }
+                    Operation::AddToStoreNar => {
+                        let request: AddToStoreNarRequest = self.reader.read_value().await?;
+                        let minor_version = self.protocol_version.minor();
+                        match minor_version {
+                            ..21 => {
+                                // Before protocol version 1.21, the nar is sent unframed, so we just
+                                // pass the reader directly to the operation.
+                                Self::handle(
+                                    &self.writer,
+                                    self.io.add_to_store_nar(request, &mut self.reader),
+                                )
+                                .await?
+                            }
+                            21..23 => {
+                                // Protocol versions 1.21 .. 1.23 use STDERR_READ protocol, see logging.md#stderr_read.
+                                Self::handle(&self.writer, async {
+                                    let mut writer = self.writer.lock().await;
+                                    let mut reader = StderrReadFramedReader::new(
+                                        &mut self.reader,
+                                        writer.deref_mut(),
+                                    );
+                                    self.io.add_to_store_nar(request, &mut reader).await
+                                })
+                                .await?
+                            }
+                            23.. => {
+                                // Starting at protocol version 1.23, the framed protocol is used, see serialization.md#framed
+                                let mut framed = NixFramedReader::new(&mut self.reader);
+                                Self::handle(&self.writer, async {
+                                    self.io.add_to_store_nar(request, &mut framed).await
+                                })
+                                .await?
+                            }
+                        }
+                    }
                     _ => {
                         return Err(std::io::Error::other(format!(
                             "Operation {operation:?} is not implemented"
@@ -188,14 +224,14 @@ where
     /// This is a helper method, awaiting on the passed in future and then
     /// handling log lines/activities as described above.
     async fn handle<T>(
-        &mut self,
+        writer: &Arc<Mutex<NixWriter<WriteHalf<RW>>>>,
         future: impl Future<Output = std::io::Result<T>>,
     ) -> Result<(), std::io::Error>
     where
         T: NixSerialize + Send,
     {
         let result = future.await;
-        let mut writer = self.writer.lock().await;
+        let mut writer = writer.lock().await;
 
         match result {
             Ok(r) => {
@@ -244,6 +280,17 @@ mod tests {
         ) -> Result<Option<UnkeyedValidPathInfo>> {
             Ok(None)
         }
+
+        async fn add_to_store_nar<R>(
+            &self,
+            _request: crate::nix_daemon::types::AddToStoreNarRequest,
+            _reader: &mut R,
+        ) -> Result<()>
+        where
+            R: tokio::io::AsyncRead + Send + Unpin,
+        {
+            Ok(())
+        }
     }
 
     #[tokio::test]
diff --git a/tvix/nix-compat/src/nix_daemon/mod.rs b/tvix/nix-compat/src/nix_daemon/mod.rs
index ce56934896be..b1fd15c04ed1 100644
--- a/tvix/nix-compat/src/nix_daemon/mod.rs
+++ b/tvix/nix-compat/src/nix_daemon/mod.rs
@@ -3,8 +3,9 @@ pub mod worker_protocol;
 use std::io::Result;
 
 use futures::future::try_join_all;
+use tokio::io::AsyncRead;
 use tracing::warn;
-use types::{QueryValidPaths, UnkeyedValidPathInfo};
+use types::{AddToStoreNarRequest, QueryValidPaths, UnkeyedValidPathInfo};
 
 use crate::store_path::StorePath;
 
@@ -60,6 +61,14 @@ pub trait NixDaemonIO: Sync {
             Ok(result)
         }
     }
+
+    fn add_to_store_nar<R>(
+        &self,
+        request: AddToStoreNarRequest,
+        reader: &mut R,
+    ) -> impl std::future::Future<Output = Result<()>> + Send
+    where
+        R: AsyncRead + Send + Unpin;
 }
 
 #[cfg(test)]
@@ -89,6 +98,17 @@ mod tests {
         ) -> std::io::Result<Option<UnkeyedValidPathInfo>> {
             Ok(None)
         }
+
+        async fn add_to_store_nar<R>(
+            &self,
+            _request: super::types::AddToStoreNarRequest,
+            _reader: &mut R,
+        ) -> std::io::Result<()>
+        where
+            R: tokio::io::AsyncRead + Send + Unpin,
+        {
+            Ok(())
+        }
     }
 
     #[tokio::test]
diff --git a/tvix/nix-compat/src/nix_daemon/types.rs b/tvix/nix-compat/src/nix_daemon/types.rs
index bf7b1e6f6e58..60d2304c9f7a 100644
--- a/tvix/nix-compat/src/nix_daemon/types.rs
+++ b/tvix/nix-compat/src/nix_daemon/types.rs
@@ -1,3 +1,4 @@
+use crate::wire::de::Error;
 use crate::{
     narinfo::Signature,
     nixhash::CAHash,
@@ -73,6 +74,21 @@ impl NixError {
 
 nix_compat_derive::nix_serialize_remote!(#[nix(display)] Signature<String>);
 
+impl NixDeserialize for Signature<String> {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        let value: Option<String> = reader.try_read_value().await?;
+        match value {
+            Some(value) => Ok(Some(
+                Signature::<String>::parse(&value).map_err(R::Error::invalid_data)?,
+            )),
+            None => Ok(None),
+        }
+    }
+}
+
 impl NixSerialize for CAHash {
     async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
     where
@@ -94,6 +110,42 @@ impl NixSerialize for Option<CAHash> {
     }
 }
 
+impl NixDeserialize for CAHash {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        let value: Option<String> = reader.try_read_value().await?;
+        match value {
+            Some(value) => Ok(Some(CAHash::from_nix_hex_str(&value).ok_or_else(|| {
+                R::Error::invalid_data(format!("Invalid cahash {}", value))
+            })?)),
+            None => Ok(None),
+        }
+    }
+}
+
+impl NixDeserialize for Option<CAHash> {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        let value: Option<String> = reader.try_read_value().await?;
+        match value {
+            Some(value) => {
+                if value.is_empty() {
+                    Ok(None)
+                } else {
+                    Ok(Some(Some(CAHash::from_nix_hex_str(&value).ok_or_else(
+                        || R::Error::invalid_data(format!("Invalid cahash {}", value)),
+                    )?)))
+                }
+            }
+            None => Ok(None),
+        }
+    }
+}
+
 impl NixSerialize for Option<UnkeyedValidPathInfo> {
     async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error>
     where
@@ -125,6 +177,27 @@ impl NixDeserialize for StorePath<String> {
     }
 }
 
+impl NixDeserialize for Option<StorePath<String>> {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        use crate::wire::de::Error;
+        if let Some(buf) = reader.try_read_bytes().await? {
+            if buf.is_empty() {
+                Ok(Some(None))
+            } else {
+                let result = StorePath::<String>::from_absolute_path(&buf);
+                result
+                    .map(|r| Some(Some(r)))
+                    .map_err(R::Error::invalid_data)
+            }
+        } else {
+            Ok(Some(None))
+        }
+    }
+}
+
 // Custom implementation since Display does not use absolute paths.
 impl<S> NixSerialize for StorePath<S>
 where
@@ -174,3 +247,60 @@ pub struct QueryValidPaths {
     #[nix(version = "27..")]
     pub substitute: bool,
 }
+
+/// newtype wrapper for the byte array that correctly implements NixSerialize, NixDeserialize.
+#[derive(Debug)]
+pub struct NarHash([u8; 32]);
+
+impl std::ops::Deref for NarHash {
+    type Target = [u8; 32];
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl NixDeserialize for NarHash {
+    async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error>
+    where
+        R: ?Sized + NixRead + Send,
+    {
+        if let Some(bytes) = reader.try_read_bytes().await? {
+            let result = data_encoding::HEXLOWER
+                .decode(bytes.as_ref())
+                .map_err(R::Error::invalid_data)?;
+            Ok(Some(NarHash(result.try_into().map_err(|_| {
+                R::Error::invalid_data("incorrect length")
+            })?)))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/// Request type for [super::worker_protocol::Operation::AddToStoreNar]
+#[derive(NixDeserialize, Debug)]
+pub struct AddToStoreNarRequest {
+    // - path :: [StorePath][se-StorePath]
+    pub path: StorePath<String>,
+    // - deriver :: [OptStorePath][se-OptStorePath]
+    pub deriver: Option<StorePath<String>>,
+    // - narHash :: [NARHash][se-NARHash] - always sha256
+    pub nar_hash: NarHash,
+    // - references :: [Set][se-Set] of [StorePath][se-StorePath]
+    pub references: Vec<StorePath<String>>,
+    // - registrationTime :: [Time][se-Time]
+    pub registration_time: u64,
+    // - narSize :: [UInt64][se-UInt64]
+    pub nar_size: u64,
+    // - ultimate :: [Bool64][se-Bool64]
+    pub ultimate: bool,
+    // - signatures :: [Set][se-Set] of [Signature][se-Signature]
+    pub signatures: Vec<Signature<String>>,
+    // - ca :: [OptContentAddress][se-OptContentAddress]
+    pub ca: Option<CAHash>,
+    // - repair :: [Bool64][se-Bool64]
+    pub repair: bool,
+    // - dontCheckSigs :: [Bool64][se-Bool64]
+    pub dont_check_sigs: bool,
+}
diff --git a/tvix/nix-daemon/src/bin/nix-daemon.rs b/tvix/nix-daemon/src/bin/nix-daemon.rs
index 0bb323c994bc..c44ebd925554 100644
--- a/tvix/nix-daemon/src/bin/nix-daemon.rs
+++ b/tvix/nix-daemon/src/bin/nix-daemon.rs
@@ -60,7 +60,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
 }
 
 async fn run(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-    let (_blob_service, _directory_service, path_info_service, _nar_calculation_service) =
+    let (blob_service, directory_service, path_info_service, _nar_calculation_service) =
         construct_services(cli.service_addrs).await?;
 
     let listen_address = cli.listen_args.listen_address.unwrap_or_else(|| {
@@ -76,7 +76,11 @@ async fn run(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     )
     .await?;
 
-    let io = Arc::new(TvixDaemon::new(path_info_service));
+    let io = Arc::new(TvixDaemon::new(
+        blob_service,
+        directory_service,
+        path_info_service,
+    ));
 
     while let Ok((connection, _)) = listener.accept().await {
         let io = io.clone();
diff --git a/tvix/nix-daemon/src/lib.rs b/tvix/nix-daemon/src/lib.rs
index e508d0750c9b..9a69ced1e5eb 100644
--- a/tvix/nix-daemon/src/lib.rs
+++ b/tvix/nix-daemon/src/lib.rs
@@ -4,20 +4,35 @@ use std::{
 };
 
 use nix_compat::{
-    nix_daemon::{types::UnkeyedValidPathInfo, NixDaemonIO},
+    nix_daemon::{
+        types::{AddToStoreNarRequest, UnkeyedValidPathInfo},
+        NixDaemonIO,
+    },
     nixbase32,
-    store_path::StorePath,
+    store_path::{build_ca_path, StorePath},
 };
-use tvix_store::{path_info::PathInfo, pathinfoservice::PathInfoService};
+use tracing::warn;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+use tvix_store::{nar::ingest_nar_and_hash, path_info::PathInfo, pathinfoservice::PathInfoService};
 
 #[allow(dead_code)]
 pub struct TvixDaemon {
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
     path_info_service: Arc<dyn PathInfoService>,
 }
 
 impl TvixDaemon {
-    pub fn new(path_info_service: Arc<dyn PathInfoService>) -> Self {
-        Self { path_info_service }
+    pub fn new(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        path_info_service: Arc<dyn PathInfoService>,
+    ) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+            path_info_service,
+        }
     }
 }
 
@@ -48,6 +63,60 @@ impl NixDaemonIO for TvixDaemon {
             None => Ok(None),
         }
     }
+
+    async fn add_to_store_nar<R>(&self, request: AddToStoreNarRequest, reader: &mut R) -> Result<()>
+    where
+        R: tokio::io::AsyncRead + Send + Unpin,
+    {
+        let (root_node, nar_sha256, nar_size) = ingest_nar_and_hash(
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+            reader,
+            &request.ca,
+        )
+        .await
+        .map_err(|e| Error::other(e.to_string()))?;
+
+        if nar_size != request.nar_size || nar_sha256 != *request.nar_hash {
+            warn!(
+                nar_hash.expected = nixbase32::encode(&*request.nar_hash),
+                nar_hash.actual = nixbase32::encode(&nar_sha256),
+                "nar hash mismatch"
+            );
+            return Err(Error::other(
+                "ingested nar ended up different from what was specified in the request",
+            ));
+        }
+
+        if let Some(cahash) = &request.ca {
+            let actual_path: StorePath<String> = build_ca_path(
+                request.path.name(),
+                cahash,
+                request.references.iter().map(|p| p.to_absolute_path()),
+                false,
+            )
+            .map_err(Error::other)?;
+            if actual_path != request.path {
+                return Err(Error::other("path mismatch"));
+            }
+        }
+
+        let path_info = PathInfo {
+            store_path: request.path,
+            node: root_node,
+            references: request.references,
+            nar_size,
+            nar_sha256,
+            signatures: request.signatures,
+            deriver: request.deriver,
+            ca: request.ca,
+        };
+        self.path_info_service
+            .put(path_info)
+            .await
+            .map_err(|e| Error::other(e.to_string()))?;
+        Ok(())
+    }
 }
 
 // PathInfo lives in the tvix-store crate, but does not depend on nix-compat's wire feature,