diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/nix-compat/src/nix_daemon/handler.rs | 69 | ||||
-rw-r--r-- | tvix/nix-compat/src/nix_daemon/mod.rs | 22 | ||||
-rw-r--r-- | tvix/nix-compat/src/nix_daemon/types.rs | 130 | ||||
-rw-r--r-- | tvix/nix-daemon/src/bin/nix-daemon.rs | 8 | ||||
-rw-r--r-- | tvix/nix-daemon/src/lib.rs | 79 |
5 files changed, 289 insertions, 19 deletions
diff --git a/tvix/nix-compat/src/nix_daemon/handler.rs b/tvix/nix-compat/src/nix_daemon/handler.rs index 65c5c2d60d08..4f43612114d8 100644 --- a/tvix/nix-compat/src/nix_daemon/handler.rs +++ b/tvix/nix-compat/src/nix_daemon/handler.rs @@ -1,4 +1,4 @@ -use std::{future::Future, sync::Arc}; +use std::{future::Future, ops::DerefMut, sync::Arc}; use bytes::Bytes; use tokio::{ @@ -8,7 +8,8 @@ use tokio::{ use tracing::{debug, warn}; use super::{ - types::QueryValidPaths, + framing::{NixFramedReader, StderrReadFramedReader}, + types::{AddToStoreNarRequest, QueryValidPaths}, worker_protocol::{server_handshake_client, ClientSettings, Operation, Trust, STDERR_LAST}, NixDaemonIO, }; @@ -120,7 +121,7 @@ where Ok(operation) => match operation { Operation::IsValidPath => { let path: StorePath<String> = self.reader.read_value().await?; - self.handle(io.is_valid_path(&path)).await? + Self::handle(&self.writer, io.is_valid_path(&path)).await? } // Note this operation does not currently delegate to NixDaemonIO, // The general idea is that we will pass relevant ClientSettings @@ -128,23 +129,23 @@ where // For now we just store the settings in the NixDaemon for future use. Operation::SetOptions => { self.client_settings = self.reader.read_value().await?; - self.handle(async { Ok(()) }).await? + Self::handle(&self.writer, async { Ok(()) }).await? } Operation::QueryPathInfo => { let path: StorePath<String> = self.reader.read_value().await?; - self.handle(io.query_path_info(&path)).await? + Self::handle(&self.writer, io.query_path_info(&path)).await? } Operation::QueryPathFromHashPart => { let hash: Bytes = self.reader.read_value().await?; - self.handle(io.query_path_from_hash_part(&hash)).await? + Self::handle(&self.writer, io.query_path_from_hash_part(&hash)).await? } Operation::QueryValidPaths => { let query: QueryValidPaths = self.reader.read_value().await?; - self.handle(io.query_valid_paths(&query)).await? + Self::handle(&self.writer, io.query_valid_paths(&query)).await? } Operation::QueryValidDerivers => { let path: StorePath<String> = self.reader.read_value().await?; - self.handle(io.query_valid_derivers(&path)).await? + Self::handle(&self.writer, io.query_valid_derivers(&path)).await? } // FUTUREWORK: These are just stubs that return an empty list. // It's important not to return an error for the local-overlay:// store @@ -154,7 +155,7 @@ where // invariants. Operation::QueryReferrers | Operation::QueryRealisation => { let _: String = self.reader.read_value().await?; - self.handle(async move { + Self::handle(&self.writer, async move { warn!( ?operation, "This operation is not implemented. Returning empty result..." @@ -163,6 +164,41 @@ where }) .await? } + Operation::AddToStoreNar => { + let request: AddToStoreNarRequest = self.reader.read_value().await?; + let minor_version = self.protocol_version.minor(); + match minor_version { + ..21 => { + // Before protocol version 1.21, the nar is sent unframed, so we just + // pass the reader directly to the operation. + Self::handle( + &self.writer, + self.io.add_to_store_nar(request, &mut self.reader), + ) + .await? + } + 21..23 => { + // Protocol versions 1.21 .. 1.23 use STDERR_READ protocol, see logging.md#stderr_read. + Self::handle(&self.writer, async { + let mut writer = self.writer.lock().await; + let mut reader = StderrReadFramedReader::new( + &mut self.reader, + writer.deref_mut(), + ); + self.io.add_to_store_nar(request, &mut reader).await + }) + .await? + } + 23.. => { + // Starting at protocol version 1.23, the framed protocol is used, see serialization.md#framed + let mut framed = NixFramedReader::new(&mut self.reader); + Self::handle(&self.writer, async { + self.io.add_to_store_nar(request, &mut framed).await + }) + .await? + } + } + } _ => { return Err(std::io::Error::other(format!( "Operation {operation:?} is not implemented" @@ -188,14 +224,14 @@ where /// This is a helper method, awaiting on the passed in future and then /// handling log lines/activities as described above. async fn handle<T>( - &mut self, + writer: &Arc<Mutex<NixWriter<WriteHalf<RW>>>>, future: impl Future<Output = std::io::Result<T>>, ) -> Result<(), std::io::Error> where T: NixSerialize + Send, { let result = future.await; - let mut writer = self.writer.lock().await; + let mut writer = writer.lock().await; match result { Ok(r) => { @@ -244,6 +280,17 @@ mod tests { ) -> Result<Option<UnkeyedValidPathInfo>> { Ok(None) } + + async fn add_to_store_nar<R>( + &self, + _request: crate::nix_daemon::types::AddToStoreNarRequest, + _reader: &mut R, + ) -> Result<()> + where + R: tokio::io::AsyncRead + Send + Unpin, + { + Ok(()) + } } #[tokio::test] diff --git a/tvix/nix-compat/src/nix_daemon/mod.rs b/tvix/nix-compat/src/nix_daemon/mod.rs index ce56934896be..b1fd15c04ed1 100644 --- a/tvix/nix-compat/src/nix_daemon/mod.rs +++ b/tvix/nix-compat/src/nix_daemon/mod.rs @@ -3,8 +3,9 @@ pub mod worker_protocol; use std::io::Result; use futures::future::try_join_all; +use tokio::io::AsyncRead; use tracing::warn; -use types::{QueryValidPaths, UnkeyedValidPathInfo}; +use types::{AddToStoreNarRequest, QueryValidPaths, UnkeyedValidPathInfo}; use crate::store_path::StorePath; @@ -60,6 +61,14 @@ pub trait NixDaemonIO: Sync { Ok(result) } } + + fn add_to_store_nar<R>( + &self, + request: AddToStoreNarRequest, + reader: &mut R, + ) -> impl std::future::Future<Output = Result<()>> + Send + where + R: AsyncRead + Send + Unpin; } #[cfg(test)] @@ -89,6 +98,17 @@ mod tests { ) -> std::io::Result<Option<UnkeyedValidPathInfo>> { Ok(None) } + + async fn add_to_store_nar<R>( + &self, + _request: super::types::AddToStoreNarRequest, + _reader: &mut R, + ) -> std::io::Result<()> + where + R: tokio::io::AsyncRead + Send + Unpin, + { + Ok(()) + } } #[tokio::test] diff --git a/tvix/nix-compat/src/nix_daemon/types.rs b/tvix/nix-compat/src/nix_daemon/types.rs index bf7b1e6f6e58..60d2304c9f7a 100644 --- a/tvix/nix-compat/src/nix_daemon/types.rs +++ b/tvix/nix-compat/src/nix_daemon/types.rs @@ -1,3 +1,4 @@ +use crate::wire::de::Error; use crate::{ narinfo::Signature, nixhash::CAHash, @@ -73,6 +74,21 @@ impl NixError { nix_compat_derive::nix_serialize_remote!(#[nix(display)] Signature<String>); +impl NixDeserialize for Signature<String> { + async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error> + where + R: ?Sized + NixRead + Send, + { + let value: Option<String> = reader.try_read_value().await?; + match value { + Some(value) => Ok(Some( + Signature::<String>::parse(&value).map_err(R::Error::invalid_data)?, + )), + None => Ok(None), + } + } +} + impl NixSerialize for CAHash { async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error> where @@ -94,6 +110,42 @@ impl NixSerialize for Option<CAHash> { } } +impl NixDeserialize for CAHash { + async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error> + where + R: ?Sized + NixRead + Send, + { + let value: Option<String> = reader.try_read_value().await?; + match value { + Some(value) => Ok(Some(CAHash::from_nix_hex_str(&value).ok_or_else(|| { + R::Error::invalid_data(format!("Invalid cahash {}", value)) + })?)), + None => Ok(None), + } + } +} + +impl NixDeserialize for Option<CAHash> { + async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error> + where + R: ?Sized + NixRead + Send, + { + let value: Option<String> = reader.try_read_value().await?; + match value { + Some(value) => { + if value.is_empty() { + Ok(None) + } else { + Ok(Some(Some(CAHash::from_nix_hex_str(&value).ok_or_else( + || R::Error::invalid_data(format!("Invalid cahash {}", value)), + )?))) + } + } + None => Ok(None), + } + } +} + impl NixSerialize for Option<UnkeyedValidPathInfo> { async fn serialize<W>(&self, writer: &mut W) -> Result<(), W::Error> where @@ -125,6 +177,27 @@ impl NixDeserialize for StorePath<String> { } } +impl NixDeserialize for Option<StorePath<String>> { + async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error> + where + R: ?Sized + NixRead + Send, + { + use crate::wire::de::Error; + if let Some(buf) = reader.try_read_bytes().await? { + if buf.is_empty() { + Ok(Some(None)) + } else { + let result = StorePath::<String>::from_absolute_path(&buf); + result + .map(|r| Some(Some(r))) + .map_err(R::Error::invalid_data) + } + } else { + Ok(Some(None)) + } + } +} + // Custom implementation since Display does not use absolute paths. impl<S> NixSerialize for StorePath<S> where @@ -174,3 +247,60 @@ pub struct QueryValidPaths { #[nix(version = "27..")] pub substitute: bool, } + +/// newtype wrapper for the byte array that correctly implements NixSerialize, NixDeserialize. +#[derive(Debug)] +pub struct NarHash([u8; 32]); + +impl std::ops::Deref for NarHash { + type Target = [u8; 32]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl NixDeserialize for NarHash { + async fn try_deserialize<R>(reader: &mut R) -> Result<Option<Self>, R::Error> + where + R: ?Sized + NixRead + Send, + { + if let Some(bytes) = reader.try_read_bytes().await? { + let result = data_encoding::HEXLOWER + .decode(bytes.as_ref()) + .map_err(R::Error::invalid_data)?; + Ok(Some(NarHash(result.try_into().map_err(|_| { + R::Error::invalid_data("incorrect length") + })?))) + } else { + Ok(None) + } + } +} + +/// Request type for [super::worker_protocol::Operation::AddToStoreNar] +#[derive(NixDeserialize, Debug)] +pub struct AddToStoreNarRequest { + // - path :: [StorePath][se-StorePath] + pub path: StorePath<String>, + // - deriver :: [OptStorePath][se-OptStorePath] + pub deriver: Option<StorePath<String>>, + // - narHash :: [NARHash][se-NARHash] - always sha256 + pub nar_hash: NarHash, + // - references :: [Set][se-Set] of [StorePath][se-StorePath] + pub references: Vec<StorePath<String>>, + // - registrationTime :: [Time][se-Time] + pub registration_time: u64, + // - narSize :: [UInt64][se-UInt64] + pub nar_size: u64, + // - ultimate :: [Bool64][se-Bool64] + pub ultimate: bool, + // - signatures :: [Set][se-Set] of [Signature][se-Signature] + pub signatures: Vec<Signature<String>>, + // - ca :: [OptContentAddress][se-OptContentAddress] + pub ca: Option<CAHash>, + // - repair :: [Bool64][se-Bool64] + pub repair: bool, + // - dontCheckSigs :: [Bool64][se-Bool64] + pub dont_check_sigs: bool, +} diff --git a/tvix/nix-daemon/src/bin/nix-daemon.rs b/tvix/nix-daemon/src/bin/nix-daemon.rs index 0bb323c994bc..c44ebd925554 100644 --- a/tvix/nix-daemon/src/bin/nix-daemon.rs +++ b/tvix/nix-daemon/src/bin/nix-daemon.rs @@ -60,7 +60,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { } async fn run(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { - let (_blob_service, _directory_service, path_info_service, _nar_calculation_service) = + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = construct_services(cli.service_addrs).await?; let listen_address = cli.listen_args.listen_address.unwrap_or_else(|| { @@ -76,7 +76,11 @@ async fn run(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { ) .await?; - let io = Arc::new(TvixDaemon::new(path_info_service)); + let io = Arc::new(TvixDaemon::new( + blob_service, + directory_service, + path_info_service, + )); while let Ok((connection, _)) = listener.accept().await { let io = io.clone(); diff --git a/tvix/nix-daemon/src/lib.rs b/tvix/nix-daemon/src/lib.rs index e508d0750c9b..9a69ced1e5eb 100644 --- a/tvix/nix-daemon/src/lib.rs +++ b/tvix/nix-daemon/src/lib.rs @@ -4,20 +4,35 @@ use std::{ }; use nix_compat::{ - nix_daemon::{types::UnkeyedValidPathInfo, NixDaemonIO}, + nix_daemon::{ + types::{AddToStoreNarRequest, UnkeyedValidPathInfo}, + NixDaemonIO, + }, nixbase32, - store_path::StorePath, + store_path::{build_ca_path, StorePath}, }; -use tvix_store::{path_info::PathInfo, pathinfoservice::PathInfoService}; +use tracing::warn; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; +use tvix_store::{nar::ingest_nar_and_hash, path_info::PathInfo, pathinfoservice::PathInfoService}; #[allow(dead_code)] pub struct TvixDaemon { + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, path_info_service: Arc<dyn PathInfoService>, } impl TvixDaemon { - pub fn new(path_info_service: Arc<dyn PathInfoService>) -> Self { - Self { path_info_service } + pub fn new( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + path_info_service: Arc<dyn PathInfoService>, + ) -> Self { + Self { + blob_service, + directory_service, + path_info_service, + } } } @@ -48,6 +63,60 @@ impl NixDaemonIO for TvixDaemon { None => Ok(None), } } + + async fn add_to_store_nar<R>(&self, request: AddToStoreNarRequest, reader: &mut R) -> Result<()> + where + R: tokio::io::AsyncRead + Send + Unpin, + { + let (root_node, nar_sha256, nar_size) = ingest_nar_and_hash( + self.blob_service.clone(), + self.directory_service.clone(), + reader, + &request.ca, + ) + .await + .map_err(|e| Error::other(e.to_string()))?; + + if nar_size != request.nar_size || nar_sha256 != *request.nar_hash { + warn!( + nar_hash.expected = nixbase32::encode(&*request.nar_hash), + nar_hash.actual = nixbase32::encode(&nar_sha256), + "nar hash mismatch" + ); + return Err(Error::other( + "ingested nar ended up different from what was specified in the request", + )); + } + + if let Some(cahash) = &request.ca { + let actual_path: StorePath<String> = build_ca_path( + request.path.name(), + cahash, + request.references.iter().map(|p| p.to_absolute_path()), + false, + ) + .map_err(Error::other)?; + if actual_path != request.path { + return Err(Error::other("path mismatch")); + } + } + + let path_info = PathInfo { + store_path: request.path, + node: root_node, + references: request.references, + nar_size, + nar_sha256, + signatures: request.signatures, + deriver: request.deriver, + ca: request.ca, + }; + self.path_info_service + .put(path_info) + .await + .map_err(|e| Error::other(e.to_string()))?; + Ok(()) + } } // PathInfo lives in the tvix-store crate, but does not depend on nix-compat's wire feature, |