about summary refs log tree commit diff
path: root/tvix/store
diff options
Diffstat (limited to 'tvix/store')
5 files changed, 263 insertions, 16 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 22a7cf19f087..08045e62c125 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -23,7 +23,9 @@ serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
 serde_with = { workspace = true }
 serde_qs = { workspace = true }
+sha1 = { workspace = true }
 sha2 = { workspace = true }
+md-5 = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
 tokio-listener = { workspace = true, features = ["clap", "multi-listener", "sd_listen", "tonic012"] }
@@ -61,6 +63,7 @@ rstest = { workspace = true }
 rstest_reuse = { workspace = true }
 tempfile = { workspace = true }
 tokio-retry = { workspace = true }
+hex-literal = { workspace = true }
 default = ["cloud", "fuse", "otlp", "tonic-reflection"]
diff --git a/tvix/store/src/nar/hashing_reader.rs b/tvix/store/src/nar/hashing_reader.rs
new file mode 100644
index 000000000000..feb90e8d378e
--- /dev/null
+++ b/tvix/store/src/nar/hashing_reader.rs
@@ -0,0 +1,101 @@
+use std::{
+    io::Result,
+    pin::Pin,
+    task::{ready, Context, Poll},
+use md5::{digest::DynDigest, Digest};
+use nix_compat::nixhash::{HashAlgo, NixHash};
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, ReadBuf};
+pin_project! {
+    /// AsyncRead implementation with a type-erased hasher.
+    ///
+    /// After it's read the bytes from the underlying reader, it can
+    /// produce the NixHash value corresponding to the Digest that it's been
+    /// constructed with.
+    ///
+    /// Because we are type-erasing the underlying Digest, it uses dynamic dispatch
+    /// and boxing. While it may seem like it could be slow, in practice it's used
+    /// in IO-bound workloads so the slowdown should be negligible.
+    ///
+    /// On the other hand it greatly improves ergonomics of using different hashing
+    /// algorithms and retrieving the corresponding NixHash values.
+    pub struct HashingReader<R> {
+        #[pin]
+        reader: R,
+        digest: Box<dyn ToHash>,
+    }
+/// Utility trait that simplifies digesting different hashes.
+/// The main benefit is that each corresponding impl produces its corresponding
+/// NixHash value as opposed to a lower level byte slice.
+trait ToHash: DynDigest + Send {
+    fn consume(self: Box<Self>) -> NixHash;
+impl ToHash for sha1::Sha1 {
+    fn consume(self: Box<Self>) -> NixHash {
+        NixHash::Sha1(self.finalize().to_vec().try_into().expect("Tvix bug"))
+    }
+impl ToHash for sha2::Sha256 {
+    fn consume(self: Box<Self>) -> NixHash {
+        NixHash::Sha256(self.finalize().to_vec().try_into().expect("Tvix bug"))
+    }
+impl ToHash for sha2::Sha512 {
+    fn consume(self: Box<Self>) -> NixHash {
+        NixHash::Sha512(Box::new(
+            self.finalize().to_vec().try_into().expect("Tvix bug"),
+        ))
+    }
+impl ToHash for md5::Md5 {
+    fn consume(self: Box<Self>) -> NixHash {
+        NixHash::Md5(self.finalize().to_vec().try_into().expect("Tvix bug"))
+    }
+impl<R> HashingReader<R> {
+    /// Given a NixHash, creates a HashingReader that uses the same hashing algorithm.
+    pub fn new_with_algo(algo: HashAlgo, reader: R) -> Self {
+        match algo {
+            HashAlgo::Md5 => HashingReader::new::<md5::Md5>(reader),
+            HashAlgo::Sha1 => HashingReader::new::<sha1::Sha1>(reader),
+            HashAlgo::Sha256 => HashingReader::new::<sha2::Sha256>(reader),
+            HashAlgo::Sha512 => HashingReader::new::<sha2::Sha512>(reader),
+        }
+    }
+    fn new<D: ToHash + Digest + 'static>(reader: R) -> Self {
+        HashingReader {
+            reader,
+            digest: Box::new(D::new()),
+        }
+    }
+    /// Returns the [`NixHash`] of the data that's been read from this reader.
+    pub fn consume(self) -> NixHash {
+        self.digest.consume()
+    }
+impl<R: AsyncRead> AsyncRead for HashingReader<R> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<Result<()>> {
+        let me = self.project();
+        let filled_length = buf.filled().len();
+        ready!(me.reader.poll_read(cx, buf))?;
+        me.digest.update(&buf.filled()[filled_length..]);
+        Poll::Ready(Ok(()))
+    }
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index b9a15fe71384..93dfec2d90a6 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,4 +1,7 @@
-use nix_compat::nar::reader::r#async as nar_reader;
+use nix_compat::{
+    nar::reader::r#async as nar_reader,
+    nixhash::{CAHash, NixHash},
 use sha2::Digest;
 use tokio::{
     io::{AsyncBufRead, AsyncRead},
@@ -15,6 +18,24 @@ use tvix_castore::{
     Node, PathBuf,
+use super::hashing_reader::HashingReader;
+/// Represents errors that can happen during nar ingestion.
+#[derive(Debug, thiserror::Error)]
+pub enum NarIngestionError {
+    #[error("{0}")]
+    IngestionError(#[from] IngestionError<Error>),
+    #[error("Hash mismatch, expected: {expected}, got: {actual}.")]
+    HashMismatch { expected: NixHash, actual: NixHash },
+    #[error("Expected the nar to contain a single file.")]
+    TypeMismatch,
+    #[error("Ingestion failed: {0}")]
+    Io(#[from] std::io::Error),
 /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
 /// interacting with a [BlobService] and [DirectoryService].
 /// Returns the castore root node, as well as the sha256 and size of the NAR
@@ -23,7 +44,8 @@ pub async fn ingest_nar_and_hash<R, BS, DS>(
     blob_service: BS,
     directory_service: DS,
     r: &mut R,
-) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
+    expected_cahash: &Option<CAHash>,
+) -> Result<(Node, [u8; 32], u64), NarIngestionError>
     R: AsyncRead + Unpin + Send,
     BS: BlobService + Clone + 'static,
@@ -33,20 +55,65 @@ where
     let mut nar_size = 0;
     // Assemble NarHash and NarSize as we read bytes.
-    let r = tokio_util::io::InspectReader::new(r, |b| {
+    let mut r = tokio_util::io::InspectReader::new(r, |b| {
         nar_size += b.len() as u64;
-        use std::io::Write;
-        nar_hash.write_all(b).unwrap();
+        nar_hash.update(b);
-    // HACK: InspectReader doesn't implement AsyncBufRead.
-    // See if this can be propagated through and we can then require our input
-    // reader to be buffered too.
-    let mut r = tokio::io::BufReader::new(r);
-    let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
-    Ok((root_node, nar_hash.finalize().into(), nar_size))
+    match expected_cahash {
+        Some(CAHash::Nar(expected_hash)) => {
+            // We technically don't need the Sha256 hasher as we are already computing the nar hash with the reader above,
+            // but it makes the control flow more uniform and easier to understand.
+            let mut ca_reader = HashingReader::new_with_algo(expected_hash.algo(), &mut r);
+            let mut r = tokio::io::BufReader::new(&mut ca_reader);
+            let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
+            let actual_hash = ca_reader.consume();
+            if actual_hash != *expected_hash {
+                return Err(NarIngestionError::HashMismatch {
+                    expected: expected_hash.clone(),
+                    actual: actual_hash,
+                });
+            }
+            Ok((root_node, nar_hash.finalize().into(), nar_size))
+        }
+        Some(CAHash::Flat(expected_hash)) => {
+            let mut r = tokio::io::BufReader::new(&mut r);
+            let root_node = ingest_nar(blob_service.clone(), directory_service, &mut r).await?;
+            match &root_node {
+                Node::File { digest, .. } => match blob_service.open_read(digest).await? {
+                    Some(blob_reader) => {
+                        let mut ca_reader =
+                            HashingReader::new_with_algo(expected_hash.algo(), blob_reader);
+                        tokio::io::copy(&mut ca_reader, &mut tokio::io::empty()).await?;
+                        let actual_hash = ca_reader.consume();
+                        if actual_hash != *expected_hash {
+                            return Err(NarIngestionError::HashMismatch {
+                                expected: expected_hash.clone(),
+                                actual: actual_hash,
+                            });
+                        }
+                        Ok((root_node, nar_hash.finalize().into(), nar_size))
+                    }
+                    None => Err(NarIngestionError::Io(std::io::Error::other(
+                        "Ingested data not found",
+                    ))),
+                },
+                _ => Err(NarIngestionError::TypeMismatch),
+            }
+        }
+        // We either got CAHash::Text, or no CAHash at all, so we just don't do any additional
+        // hash calculation/validation.
+        // FUTUREWORK: We should figure out what to do with CAHash::Text, according to nix-cpp
+        // they don't handle it either:
+        // https://github.com/NixOS/nix/blob/3e9cc78eb5e5c4f1e762e201856273809fd92e71/src/libstore/local-store.cc#L1099-L1133
+        _ => {
+            let mut r = tokio::io::BufReader::new(&mut r);
+            let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
+            Ok((root_node, nar_hash.finalize().into(), nar_size))
+        }
+    }
 /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
@@ -162,10 +229,12 @@ pub enum Error {
 mod test {
-    use crate::nar::ingest_nar;
+    use crate::nar::{ingest_nar, ingest_nar_and_hash, NarIngestionError};
     use std::io::Cursor;
     use std::sync::Arc;
+    use hex_literal::hex;
+    use nix_compat::nixhash::{CAHash, NixHash};
     use rstest::*;
     use tokio_stream::StreamExt;
     use tvix_castore::blobservice::BlobService;
@@ -267,4 +336,77 @@ mod test {
         assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
         assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
+    #[rstest]
+    #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("fbd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("ff5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("fd076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone(), )]
+    #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("f24eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
+    #[tokio::test]
+    async fn ingest_with_cahash_mismatch(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        #[case] ca_hash: Option<CAHash>,
+        #[case] nar_content: &Vec<u8>,
+    ) {
+        let err = ingest_nar_and_hash(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(nar_content),
+            &ca_hash,
+        )
+        .await
+        .expect_err("Ingestion should have failed");
+        assert!(
+            matches!(err, NarIngestionError::HashMismatch { .. }),
+            "CAHash should have mismatched"
+        );
+    }
+    #[rstest]
+    #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("1f5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("ed076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone())]
+    #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
+    #[tokio::test]
+    async fn ingest_with_cahash_correct(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        #[case] ca_hash: Option<CAHash>,
+        #[case] nar_content: &Vec<u8>,
+    ) {
+        let _ = ingest_nar_and_hash(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(nar_content),
+            &ca_hash,
+        )
+        .await
+        .expect("CAHash should have matched");
+    }
+    #[rstest]
+    #[case::nar_sha256(Some(CAHash::Flat(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::nar_symlink_sha1(Some(CAHash::Flat(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
+    #[tokio::test]
+    async fn ingest_with_flat_non_file(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        #[case] ca_hash: Option<CAHash>,
+        #[case] nar_content: &Vec<u8>,
+    ) {
+        let err = ingest_nar_and_hash(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(nar_content),
+            &ca_hash,
+        )
+        .await
+        .expect_err("Ingestion should have failed");
+        assert!(
+            matches!(err, NarIngestionError::TypeMismatch),
+            "Flat cahash should only be allowed for single file nars"
+        );
+    }
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
index 86505bcb0c07..4bf60ba44993 100644
--- a/tvix/store/src/nar/mod.rs
+++ b/tvix/store/src/nar/mod.rs
@@ -1,11 +1,11 @@
 use tonic::async_trait;
 use tvix_castore::B3Digest;
+mod hashing_reader;
 mod import;
 mod renderer;
 pub mod seekable;
-pub use import::ingest_nar;
-pub use import::ingest_nar_and_hash;
+pub use import::{ingest_nar, ingest_nar_and_hash, NarIngestionError};
 pub use renderer::calculate_size_and_sha256;
 pub use renderer::write_nar;
 pub use renderer::SimpleRenderer;
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index af7580b1c61f..e9b83dcf3551 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -204,6 +204,7 @@ where
             &mut r,
+            &narinfo.ca,
         .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;