diff options
-rw-r--r-- | tvix/Cargo.lock | 3 | ||||
-rw-r--r-- | tvix/Cargo.nix | 12 | ||||
-rw-r--r-- | tvix/glue/src/fetchers/mod.rs | 52 | ||||
-rw-r--r-- | tvix/nar-bridge/src/nar.rs | 20 | ||||
-rw-r--r-- | tvix/store/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/store/src/nar/hashing_reader.rs | 101 | ||||
-rw-r--r-- | tvix/store/src/nar/import.rs | 170 | ||||
-rw-r--r-- | tvix/store/src/nar/mod.rs | 4 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/nix_http.rs | 1 |
9 files changed, 302 insertions, 64 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 4b4f45cbe630..80e31a8ec9d4 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -4792,8 +4792,10 @@ dependencies = [ "ed25519", "ed25519-dalek", "futures", + "hex-literal", "hyper-util", "lru", + "md-5", "mimalloc", "nix-compat", "parking_lot", @@ -4809,6 +4811,7 @@ dependencies = [ "serde_json", "serde_qs", "serde_with", + "sha1", "sha2", "tempfile", "thiserror", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 313ec256c491..42f793d1d881 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -16032,6 +16032,10 @@ rec { packageId = "lru"; } { + name = "md-5"; + packageId = "md-5"; + } + { name = "mimalloc"; packageId = "mimalloc"; } @@ -16086,6 +16090,10 @@ rec { packageId = "serde_with"; } { + name = "sha1"; + packageId = "sha1"; + } + { name = "sha2"; packageId = "sha2"; } @@ -16184,6 +16192,10 @@ rec { packageId = "async-process"; } { + name = "hex-literal"; + packageId = "hex-literal"; + } + { name = "rstest"; packageId = "rstest"; } diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs index aeb1bd3d082c..8dd4fe8439fe 100644 --- a/tvix/glue/src/fetchers/mod.rs +++ b/tvix/glue/src/fetchers/mod.rs @@ -12,7 +12,7 @@ use tracing::{instrument, warn, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Node}; use tvix_store::{ - nar::NarCalculationService, + nar::{NarCalculationService, NarIngestionError}, pathinfoservice::{PathInfo, PathInfoService}, }; use url::Url; @@ -396,18 +396,7 @@ where let r = self.download(url.clone()).await?; // Pop compression. - let r = DecompressedReader::new(r); - - // Wrap the reader, calculating our own hash. - let mut hasher: Box<dyn DynDigest + Send> = match exp_hash.algo() { - HashAlgo::Md5 => Box::new(Md5::new()), - HashAlgo::Sha1 => Box::new(Sha1::new()), - HashAlgo::Sha256 => Box::new(Sha256::new()), - HashAlgo::Sha512 => Box::new(Sha512::new()), - }; - let mut r = tokio_util::io::InspectReader::new(r, |b| { - hasher.update(b); - }); + let mut r = DecompressedReader::new(r); // Ingest the NAR, get the root node. let (root_node, _actual_nar_sha256, actual_nar_size) = @@ -415,36 +404,19 @@ where self.blob_service.clone(), self.directory_service.clone(), &mut r, + &Some(CAHash::Nar(exp_hash.clone())), ) .await - .map_err(|e| FetcherError::Io(std::io::Error::other(e.to_string())))?; - - // finalize the hasher. - let actual_hash = { - match exp_hash.algo() { - HashAlgo::Md5 => { - NixHash::Md5(hasher.finalize().to_vec().try_into().unwrap()) - } - HashAlgo::Sha1 => { - NixHash::Sha1(hasher.finalize().to_vec().try_into().unwrap()) + .map_err(|e| match e { + NarIngestionError::HashMismatch { expected, actual } => { + FetcherError::HashMismatch { + url, + wanted: expected, + got: actual, + } } - HashAlgo::Sha256 => { - NixHash::Sha256(hasher.finalize().to_vec().try_into().unwrap()) - } - HashAlgo::Sha512 => { - NixHash::Sha512(hasher.finalize().to_vec().try_into().unwrap()) - } - } - }; - - // Ensure the hash matches. - if exp_hash != actual_hash { - return Err(FetcherError::HashMismatch { - url, - wanted: exp_hash, - got: actual_hash, - }); - } + _ => FetcherError::Io(std::io::Error::other(e.to_string())), + })?; Ok(( root_node, // use a CAHash::Nar with the algo from the input. diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs index f9b50fd6bc42..c351c9e4bdda 100644 --- a/tvix/nar-bridge/src/nar.rs +++ b/tvix/nar-bridge/src/nar.rs @@ -173,14 +173,18 @@ pub async fn put( })); // ingest the NAR - let (root_node, nar_hash_actual, nar_size) = - ingest_nar_and_hash(blob_service.clone(), directory_service.clone(), &mut r) - .await - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - .map_err(|e| { - warn!(err=%e, "failed to ingest nar"); - StatusCode::INTERNAL_SERVER_ERROR - })?; + let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash( + blob_service.clone(), + directory_service.clone(), + &mut r, + &None, + ) + .await + .map_err(io::Error::other) + .map_err(|e| { + warn!(err=%e, "failed to ingest nar"); + StatusCode::INTERNAL_SERVER_ERROR + })?; let s = Span::current(); s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected)); diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 22a7cf19f087..08045e62c125 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -23,7 +23,9 @@ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } serde_with = { workspace = true } serde_qs = { workspace = true } +sha1 = { workspace = true } sha2 = { workspace = true } +md-5 = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } tokio-listener = { workspace = true, features = ["clap", "multi-listener", "sd_listen", "tonic012"] } @@ -61,6 +63,7 @@ rstest = { workspace = true } rstest_reuse = { workspace = true } tempfile = { workspace = true } tokio-retry = { workspace = true } +hex-literal = { workspace = true } [features] default = ["cloud", "fuse", "otlp", "tonic-reflection"] diff --git a/tvix/store/src/nar/hashing_reader.rs b/tvix/store/src/nar/hashing_reader.rs new file mode 100644 index 000000000000..feb90e8d378e --- /dev/null +++ b/tvix/store/src/nar/hashing_reader.rs @@ -0,0 +1,101 @@ +use std::{ + io::Result, + pin::Pin, + task::{ready, Context, Poll}, +}; + +use md5::{digest::DynDigest, Digest}; +use nix_compat::nixhash::{HashAlgo, NixHash}; +use pin_project_lite::pin_project; +use tokio::io::{AsyncRead, ReadBuf}; + +pin_project! { + /// AsyncRead implementation with a type-erased hasher. + /// + /// After it's read the bytes from the underlying reader, it can + /// produce the NixHash value corresponding to the Digest that it's been + /// constructed with. + /// + /// Because we are type-erasing the underlying Digest, it uses dynamic dispatch + /// and boxing. While it may seem like it could be slow, in practice it's used + /// in IO-bound workloads so the slowdown should be negligible. + /// + /// On the other hand it greatly improves ergonomics of using different hashing + /// algorithms and retrieving the corresponding NixHash values. + pub struct HashingReader<R> { + #[pin] + reader: R, + digest: Box<dyn ToHash>, + } +} + +/// Utility trait that simplifies digesting different hashes. +/// +/// The main benefit is that each corresponding impl produces its corresponding +/// NixHash value as opposed to a lower level byte slice. +trait ToHash: DynDigest + Send { + fn consume(self: Box<Self>) -> NixHash; +} + +impl ToHash for sha1::Sha1 { + fn consume(self: Box<Self>) -> NixHash { + NixHash::Sha1(self.finalize().to_vec().try_into().expect("Tvix bug")) + } +} + +impl ToHash for sha2::Sha256 { + fn consume(self: Box<Self>) -> NixHash { + NixHash::Sha256(self.finalize().to_vec().try_into().expect("Tvix bug")) + } +} + +impl ToHash for sha2::Sha512 { + fn consume(self: Box<Self>) -> NixHash { + NixHash::Sha512(Box::new( + self.finalize().to_vec().try_into().expect("Tvix bug"), + )) + } +} + +impl ToHash for md5::Md5 { + fn consume(self: Box<Self>) -> NixHash { + NixHash::Md5(self.finalize().to_vec().try_into().expect("Tvix bug")) + } +} + +impl<R> HashingReader<R> { + /// Given a NixHash, creates a HashingReader that uses the same hashing algorithm. + pub fn new_with_algo(algo: HashAlgo, reader: R) -> Self { + match algo { + HashAlgo::Md5 => HashingReader::new::<md5::Md5>(reader), + HashAlgo::Sha1 => HashingReader::new::<sha1::Sha1>(reader), + HashAlgo::Sha256 => HashingReader::new::<sha2::Sha256>(reader), + HashAlgo::Sha512 => HashingReader::new::<sha2::Sha512>(reader), + } + } + fn new<D: ToHash + Digest + 'static>(reader: R) -> Self { + HashingReader { + reader, + digest: Box::new(D::new()), + } + } + + /// Returns the [`NixHash`] of the data that's been read from this reader. + pub fn consume(self) -> NixHash { + self.digest.consume() + } +} + +impl<R: AsyncRead> AsyncRead for HashingReader<R> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll<Result<()>> { + let me = self.project(); + let filled_length = buf.filled().len(); + ready!(me.reader.poll_read(cx, buf))?; + me.digest.update(&buf.filled()[filled_length..]); + Poll::Ready(Ok(())) + } +} diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index b9a15fe71384..93dfec2d90a6 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,4 +1,7 @@ -use nix_compat::nar::reader::r#async as nar_reader; +use nix_compat::{ + nar::reader::r#async as nar_reader, + nixhash::{CAHash, NixHash}, +}; use sha2::Digest; use tokio::{ io::{AsyncBufRead, AsyncRead}, @@ -15,6 +18,24 @@ use tvix_castore::{ Node, PathBuf, }; +use super::hashing_reader::HashingReader; + +/// Represents errors that can happen during nar ingestion. +#[derive(Debug, thiserror::Error)] +pub enum NarIngestionError { + #[error("{0}")] + IngestionError(#[from] IngestionError<Error>), + + #[error("Hash mismatch, expected: {expected}, got: {actual}.")] + HashMismatch { expected: NixHash, actual: NixHash }, + + #[error("Expected the nar to contain a single file.")] + TypeMismatch, + + #[error("Ingestion failed: {0}")] + Io(#[from] std::io::Error), +} + /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, /// interacting with a [BlobService] and [DirectoryService]. /// Returns the castore root node, as well as the sha256 and size of the NAR @@ -23,7 +44,8 @@ pub async fn ingest_nar_and_hash<R, BS, DS>( blob_service: BS, directory_service: DS, r: &mut R, -) -> Result<(Node, [u8; 32], u64), IngestionError<Error>> + expected_cahash: &Option<CAHash>, +) -> Result<(Node, [u8; 32], u64), NarIngestionError> where R: AsyncRead + Unpin + Send, BS: BlobService + Clone + 'static, @@ -33,20 +55,65 @@ where let mut nar_size = 0; // Assemble NarHash and NarSize as we read bytes. - let r = tokio_util::io::InspectReader::new(r, |b| { + let mut r = tokio_util::io::InspectReader::new(r, |b| { nar_size += b.len() as u64; - use std::io::Write; - nar_hash.write_all(b).unwrap(); + nar_hash.update(b); }); - // HACK: InspectReader doesn't implement AsyncBufRead. - // See if this can be propagated through and we can then require our input - // reader to be buffered too. - let mut r = tokio::io::BufReader::new(r); - - let root_node = ingest_nar(blob_service, directory_service, &mut r).await?; - - Ok((root_node, nar_hash.finalize().into(), nar_size)) + match expected_cahash { + Some(CAHash::Nar(expected_hash)) => { + // We technically don't need the Sha256 hasher as we are already computing the nar hash with the reader above, + // but it makes the control flow more uniform and easier to understand. + let mut ca_reader = HashingReader::new_with_algo(expected_hash.algo(), &mut r); + let mut r = tokio::io::BufReader::new(&mut ca_reader); + let root_node = ingest_nar(blob_service, directory_service, &mut r).await?; + let actual_hash = ca_reader.consume(); + + if actual_hash != *expected_hash { + return Err(NarIngestionError::HashMismatch { + expected: expected_hash.clone(), + actual: actual_hash, + }); + } + Ok((root_node, nar_hash.finalize().into(), nar_size)) + } + Some(CAHash::Flat(expected_hash)) => { + let mut r = tokio::io::BufReader::new(&mut r); + let root_node = ingest_nar(blob_service.clone(), directory_service, &mut r).await?; + match &root_node { + Node::File { digest, .. } => match blob_service.open_read(digest).await? { + Some(blob_reader) => { + let mut ca_reader = + HashingReader::new_with_algo(expected_hash.algo(), blob_reader); + tokio::io::copy(&mut ca_reader, &mut tokio::io::empty()).await?; + let actual_hash = ca_reader.consume(); + + if actual_hash != *expected_hash { + return Err(NarIngestionError::HashMismatch { + expected: expected_hash.clone(), + actual: actual_hash, + }); + } + Ok((root_node, nar_hash.finalize().into(), nar_size)) + } + None => Err(NarIngestionError::Io(std::io::Error::other( + "Ingested data not found", + ))), + }, + _ => Err(NarIngestionError::TypeMismatch), + } + } + // We either got CAHash::Text, or no CAHash at all, so we just don't do any additional + // hash calculation/validation. + // FUTUREWORK: We should figure out what to do with CAHash::Text, according to nix-cpp + // they don't handle it either: + // https://github.com/NixOS/nix/blob/3e9cc78eb5e5c4f1e762e201856273809fd92e71/src/libstore/local-store.cc#L1099-L1133 + _ => { + let mut r = tokio::io::BufReader::new(&mut r); + let root_node = ingest_nar(blob_service, directory_service, &mut r).await?; + Ok((root_node, nar_hash.finalize().into(), nar_size)) + } + } } /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, @@ -162,10 +229,12 @@ pub enum Error { #[cfg(test)] mod test { - use crate::nar::ingest_nar; + use crate::nar::{ingest_nar, ingest_nar_and_hash, NarIngestionError}; use std::io::Cursor; use std::sync::Arc; + use hex_literal::hex; + use nix_compat::nixhash::{CAHash, NixHash}; use rstest::*; use tokio_stream::StreamExt; use tvix_castore::blobservice::BlobService; @@ -267,4 +336,77 @@ mod test { assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]); assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]); } + + #[rstest] + #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("fbd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())] + #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("ff5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())] + #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("fd076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone(), )] + #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("f24eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())] + #[tokio::test] + async fn ingest_with_cahash_mismatch( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + #[case] ca_hash: Option<CAHash>, + #[case] nar_content: &Vec<u8>, + ) { + let err = ingest_nar_and_hash( + blob_service.clone(), + directory_service.clone(), + &mut Cursor::new(nar_content), + &ca_hash, + ) + .await + .expect_err("Ingestion should have failed"); + assert!( + matches!(err, NarIngestionError::HashMismatch { .. }), + "CAHash should have mismatched" + ); + } + + #[rstest] + #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())] + #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("1f5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())] + #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("ed076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone())] + #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())] + #[tokio::test] + async fn ingest_with_cahash_correct( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + #[case] ca_hash: Option<CAHash>, + #[case] nar_content: &Vec<u8>, + ) { + let _ = ingest_nar_and_hash( + blob_service.clone(), + directory_service, + &mut Cursor::new(nar_content), + &ca_hash, + ) + .await + .expect("CAHash should have matched"); + } + + #[rstest] + #[case::nar_sha256(Some(CAHash::Flat(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())] + #[case::nar_symlink_sha1(Some(CAHash::Flat(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())] + #[tokio::test] + async fn ingest_with_flat_non_file( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + #[case] ca_hash: Option<CAHash>, + #[case] nar_content: &Vec<u8>, + ) { + let err = ingest_nar_and_hash( + blob_service, + directory_service, + &mut Cursor::new(nar_content), + &ca_hash, + ) + .await + .expect_err("Ingestion should have failed"); + + assert!( + matches!(err, NarIngestionError::TypeMismatch), + "Flat cahash should only be allowed for single file nars" + ); + } } diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs index 86505bcb0c07..4bf60ba44993 100644 --- a/tvix/store/src/nar/mod.rs +++ b/tvix/store/src/nar/mod.rs @@ -1,11 +1,11 @@ use tonic::async_trait; use tvix_castore::B3Digest; +mod hashing_reader; mod import; mod renderer; pub mod seekable; -pub use import::ingest_nar; -pub use import::ingest_nar_and_hash; +pub use import::{ingest_nar, ingest_nar_and_hash, NarIngestionError}; pub use renderer::calculate_size_and_sha256; pub use renderer::write_nar; pub use renderer::SimpleRenderer; diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index af7580b1c61f..e9b83dcf3551 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -204,6 +204,7 @@ where self.blob_service.clone(), self.directory_service.clone(), &mut r, + &narinfo.ca, ) .await .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; |