about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/Cargo.lock3
-rw-r--r--tvix/Cargo.nix12
-rw-r--r--tvix/glue/src/fetchers/mod.rs52
-rw-r--r--tvix/nar-bridge/src/nar.rs20
-rw-r--r--tvix/store/Cargo.toml3
-rw-r--r--tvix/store/src/nar/hashing_reader.rs101
-rw-r--r--tvix/store/src/nar/import.rs170
-rw-r--r--tvix/store/src/nar/mod.rs4
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs1
9 files changed, 302 insertions, 64 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index 4b4f45cbe630..80e31a8ec9d4 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -4792,8 +4792,10 @@ dependencies = [
  "ed25519",
  "ed25519-dalek",
  "futures",
+ "hex-literal",
  "hyper-util",
  "lru",
+ "md-5",
  "mimalloc",
  "nix-compat",
  "parking_lot",
@@ -4809,6 +4811,7 @@ dependencies = [
  "serde_json",
  "serde_qs",
  "serde_with",
+ "sha1",
  "sha2",
  "tempfile",
  "thiserror",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index 313ec256c491..42f793d1d881 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -16032,6 +16032,10 @@ rec {
             packageId = "lru";
           }
           {
+            name = "md-5";
+            packageId = "md-5";
+          }
+          {
             name = "mimalloc";
             packageId = "mimalloc";
           }
@@ -16086,6 +16090,10 @@ rec {
             packageId = "serde_with";
           }
           {
+            name = "sha1";
+            packageId = "sha1";
+          }
+          {
             name = "sha2";
             packageId = "sha2";
           }
@@ -16184,6 +16192,10 @@ rec {
             packageId = "async-process";
           }
           {
+            name = "hex-literal";
+            packageId = "hex-literal";
+          }
+          {
             name = "rstest";
             packageId = "rstest";
           }
diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs
index aeb1bd3d082c..8dd4fe8439fe 100644
--- a/tvix/glue/src/fetchers/mod.rs
+++ b/tvix/glue/src/fetchers/mod.rs
@@ -12,7 +12,7 @@ use tracing::{instrument, warn, Span};
 use tracing_indicatif::span_ext::IndicatifSpanExt;
 use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Node};
 use tvix_store::{
-    nar::NarCalculationService,
+    nar::{NarCalculationService, NarIngestionError},
     pathinfoservice::{PathInfo, PathInfoService},
 };
 use url::Url;
@@ -396,18 +396,7 @@ where
                 let r = self.download(url.clone()).await?;
 
                 // Pop compression.
-                let r = DecompressedReader::new(r);
-
-                // Wrap the reader, calculating our own hash.
-                let mut hasher: Box<dyn DynDigest + Send> = match exp_hash.algo() {
-                    HashAlgo::Md5 => Box::new(Md5::new()),
-                    HashAlgo::Sha1 => Box::new(Sha1::new()),
-                    HashAlgo::Sha256 => Box::new(Sha256::new()),
-                    HashAlgo::Sha512 => Box::new(Sha512::new()),
-                };
-                let mut r = tokio_util::io::InspectReader::new(r, |b| {
-                    hasher.update(b);
-                });
+                let mut r = DecompressedReader::new(r);
 
                 // Ingest the NAR, get the root node.
                 let (root_node, _actual_nar_sha256, actual_nar_size) =
@@ -415,36 +404,19 @@ where
                         self.blob_service.clone(),
                         self.directory_service.clone(),
                         &mut r,
+                        &Some(CAHash::Nar(exp_hash.clone())),
                     )
                     .await
-                    .map_err(|e| FetcherError::Io(std::io::Error::other(e.to_string())))?;
-
-                // finalize the hasher.
-                let actual_hash = {
-                    match exp_hash.algo() {
-                        HashAlgo::Md5 => {
-                            NixHash::Md5(hasher.finalize().to_vec().try_into().unwrap())
-                        }
-                        HashAlgo::Sha1 => {
-                            NixHash::Sha1(hasher.finalize().to_vec().try_into().unwrap())
+                    .map_err(|e| match e {
+                        NarIngestionError::HashMismatch { expected, actual } => {
+                            FetcherError::HashMismatch {
+                                url,
+                                wanted: expected,
+                                got: actual,
+                            }
                         }
-                        HashAlgo::Sha256 => {
-                            NixHash::Sha256(hasher.finalize().to_vec().try_into().unwrap())
-                        }
-                        HashAlgo::Sha512 => {
-                            NixHash::Sha512(hasher.finalize().to_vec().try_into().unwrap())
-                        }
-                    }
-                };
-
-                // Ensure the hash matches.
-                if exp_hash != actual_hash {
-                    return Err(FetcherError::HashMismatch {
-                        url,
-                        wanted: exp_hash,
-                        got: actual_hash,
-                    });
-                }
+                        _ => FetcherError::Io(std::io::Error::other(e.to_string())),
+                    })?;
                 Ok((
                     root_node,
                     // use a CAHash::Nar with the algo from the input.
diff --git a/tvix/nar-bridge/src/nar.rs b/tvix/nar-bridge/src/nar.rs
index f9b50fd6bc42..c351c9e4bdda 100644
--- a/tvix/nar-bridge/src/nar.rs
+++ b/tvix/nar-bridge/src/nar.rs
@@ -173,14 +173,18 @@ pub async fn put(
     }));
 
     // ingest the NAR
-    let (root_node, nar_hash_actual, nar_size) =
-        ingest_nar_and_hash(blob_service.clone(), directory_service.clone(), &mut r)
-            .await
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
-            .map_err(|e| {
-                warn!(err=%e, "failed to ingest nar");
-                StatusCode::INTERNAL_SERVER_ERROR
-            })?;
+    let (root_node, nar_hash_actual, nar_size) = ingest_nar_and_hash(
+        blob_service.clone(),
+        directory_service.clone(),
+        &mut r,
+        &None,
+    )
+    .await
+    .map_err(io::Error::other)
+    .map_err(|e| {
+        warn!(err=%e, "failed to ingest nar");
+        StatusCode::INTERNAL_SERVER_ERROR
+    })?;
 
     let s = Span::current();
     s.record("nar_hash.expected", nixbase32::encode(&nar_hash_expected));
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 22a7cf19f087..08045e62c125 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -23,7 +23,9 @@ serde = { workspace = true, features = ["derive"] }
 serde_json = { workspace = true }
 serde_with = { workspace = true }
 serde_qs = { workspace = true }
+sha1 = { workspace = true }
 sha2 = { workspace = true }
+md-5 = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] }
 tokio-listener = { workspace = true, features = ["clap", "multi-listener", "sd_listen", "tonic012"] }
@@ -61,6 +63,7 @@ rstest = { workspace = true }
 rstest_reuse = { workspace = true }
 tempfile = { workspace = true }
 tokio-retry = { workspace = true }
+hex-literal = { workspace = true }
 
 [features]
 default = ["cloud", "fuse", "otlp", "tonic-reflection"]
diff --git a/tvix/store/src/nar/hashing_reader.rs b/tvix/store/src/nar/hashing_reader.rs
new file mode 100644
index 000000000000..feb90e8d378e
--- /dev/null
+++ b/tvix/store/src/nar/hashing_reader.rs
@@ -0,0 +1,101 @@
+use std::{
+    io::Result,
+    pin::Pin,
+    task::{ready, Context, Poll},
+};
+
+use md5::{digest::DynDigest, Digest};
+use nix_compat::nixhash::{HashAlgo, NixHash};
+use pin_project_lite::pin_project;
+use tokio::io::{AsyncRead, ReadBuf};
+
+pin_project! {
+    /// AsyncRead implementation with a type-erased hasher.
+    ///
+    /// After it's read the bytes from the underlying reader, it can
+    /// produce the NixHash value corresponding to the Digest that it's been
+    /// constructed with.
+    ///
+    /// Because we are type-erasing the underlying Digest, it uses dynamic dispatch
+    /// and boxing. While it may seem like it could be slow, in practice it's used
+    /// in IO-bound workloads so the slowdown should be negligible.
+    ///
+    /// On the other hand it greatly improves ergonomics of using different hashing
+    /// algorithms and retrieving the corresponding NixHash values.
+    pub struct HashingReader<R> {
+        #[pin]
+        reader: R,
+        digest: Box<dyn ToHash>,
+    }
+}
+
+/// Utility trait that simplifies digesting different hashes.
+///
+/// The main benefit is that each corresponding impl produces its corresponding
+/// NixHash value as opposed to a lower level byte slice.
+trait ToHash: DynDigest + Send {
+    fn consume(self: Box<Self>) -> NixHash;
+}
+
+impl ToHash for sha1::Sha1 {
+    fn consume(self: Box<Self>) -> NixHash {
+        NixHash::Sha1(self.finalize().to_vec().try_into().expect("Tvix bug"))
+    }
+}
+
+impl ToHash for sha2::Sha256 {
+    fn consume(self: Box<Self>) -> NixHash {
+        NixHash::Sha256(self.finalize().to_vec().try_into().expect("Tvix bug"))
+    }
+}
+
+impl ToHash for sha2::Sha512 {
+    fn consume(self: Box<Self>) -> NixHash {
+        NixHash::Sha512(Box::new(
+            self.finalize().to_vec().try_into().expect("Tvix bug"),
+        ))
+    }
+}
+
+impl ToHash for md5::Md5 {
+    fn consume(self: Box<Self>) -> NixHash {
+        NixHash::Md5(self.finalize().to_vec().try_into().expect("Tvix bug"))
+    }
+}
+
+impl<R> HashingReader<R> {
+    /// Given a NixHash, creates a HashingReader that uses the same hashing algorithm.
+    pub fn new_with_algo(algo: HashAlgo, reader: R) -> Self {
+        match algo {
+            HashAlgo::Md5 => HashingReader::new::<md5::Md5>(reader),
+            HashAlgo::Sha1 => HashingReader::new::<sha1::Sha1>(reader),
+            HashAlgo::Sha256 => HashingReader::new::<sha2::Sha256>(reader),
+            HashAlgo::Sha512 => HashingReader::new::<sha2::Sha512>(reader),
+        }
+    }
+    fn new<D: ToHash + Digest + 'static>(reader: R) -> Self {
+        HashingReader {
+            reader,
+            digest: Box::new(D::new()),
+        }
+    }
+
+    /// Returns the [`NixHash`] of the data that's been read from this reader.
+    pub fn consume(self) -> NixHash {
+        self.digest.consume()
+    }
+}
+
+impl<R: AsyncRead> AsyncRead for HashingReader<R> {
+    fn poll_read(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+        buf: &mut ReadBuf<'_>,
+    ) -> Poll<Result<()>> {
+        let me = self.project();
+        let filled_length = buf.filled().len();
+        ready!(me.reader.poll_read(cx, buf))?;
+        me.digest.update(&buf.filled()[filled_length..]);
+        Poll::Ready(Ok(()))
+    }
+}
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index b9a15fe71384..93dfec2d90a6 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,4 +1,7 @@
-use nix_compat::nar::reader::r#async as nar_reader;
+use nix_compat::{
+    nar::reader::r#async as nar_reader,
+    nixhash::{CAHash, NixHash},
+};
 use sha2::Digest;
 use tokio::{
     io::{AsyncBufRead, AsyncRead},
@@ -15,6 +18,24 @@ use tvix_castore::{
     Node, PathBuf,
 };
 
+use super::hashing_reader::HashingReader;
+
+/// Represents errors that can happen during nar ingestion.
+#[derive(Debug, thiserror::Error)]
+pub enum NarIngestionError {
+    #[error("{0}")]
+    IngestionError(#[from] IngestionError<Error>),
+
+    #[error("Hash mismatch, expected: {expected}, got: {actual}.")]
+    HashMismatch { expected: NixHash, actual: NixHash },
+
+    #[error("Expected the nar to contain a single file.")]
+    TypeMismatch,
+
+    #[error("Ingestion failed: {0}")]
+    Io(#[from] std::io::Error),
+}
+
 /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
 /// interacting with a [BlobService] and [DirectoryService].
 /// Returns the castore root node, as well as the sha256 and size of the NAR
@@ -23,7 +44,8 @@ pub async fn ingest_nar_and_hash<R, BS, DS>(
     blob_service: BS,
     directory_service: DS,
     r: &mut R,
-) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
+    expected_cahash: &Option<CAHash>,
+) -> Result<(Node, [u8; 32], u64), NarIngestionError>
 where
     R: AsyncRead + Unpin + Send,
     BS: BlobService + Clone + 'static,
@@ -33,20 +55,65 @@ where
     let mut nar_size = 0;
 
     // Assemble NarHash and NarSize as we read bytes.
-    let r = tokio_util::io::InspectReader::new(r, |b| {
+    let mut r = tokio_util::io::InspectReader::new(r, |b| {
         nar_size += b.len() as u64;
-        use std::io::Write;
-        nar_hash.write_all(b).unwrap();
+        nar_hash.update(b);
     });
 
-    // HACK: InspectReader doesn't implement AsyncBufRead.
-    // See if this can be propagated through and we can then require our input
-    // reader to be buffered too.
-    let mut r = tokio::io::BufReader::new(r);
-
-    let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
-
-    Ok((root_node, nar_hash.finalize().into(), nar_size))
+    match expected_cahash {
+        Some(CAHash::Nar(expected_hash)) => {
+            // We technically don't need the Sha256 hasher as we are already computing the nar hash with the reader above,
+            // but it makes the control flow more uniform and easier to understand.
+            let mut ca_reader = HashingReader::new_with_algo(expected_hash.algo(), &mut r);
+            let mut r = tokio::io::BufReader::new(&mut ca_reader);
+            let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
+            let actual_hash = ca_reader.consume();
+
+            if actual_hash != *expected_hash {
+                return Err(NarIngestionError::HashMismatch {
+                    expected: expected_hash.clone(),
+                    actual: actual_hash,
+                });
+            }
+            Ok((root_node, nar_hash.finalize().into(), nar_size))
+        }
+        Some(CAHash::Flat(expected_hash)) => {
+            let mut r = tokio::io::BufReader::new(&mut r);
+            let root_node = ingest_nar(blob_service.clone(), directory_service, &mut r).await?;
+            match &root_node {
+                Node::File { digest, .. } => match blob_service.open_read(digest).await? {
+                    Some(blob_reader) => {
+                        let mut ca_reader =
+                            HashingReader::new_with_algo(expected_hash.algo(), blob_reader);
+                        tokio::io::copy(&mut ca_reader, &mut tokio::io::empty()).await?;
+                        let actual_hash = ca_reader.consume();
+
+                        if actual_hash != *expected_hash {
+                            return Err(NarIngestionError::HashMismatch {
+                                expected: expected_hash.clone(),
+                                actual: actual_hash,
+                            });
+                        }
+                        Ok((root_node, nar_hash.finalize().into(), nar_size))
+                    }
+                    None => Err(NarIngestionError::Io(std::io::Error::other(
+                        "Ingested data not found",
+                    ))),
+                },
+                _ => Err(NarIngestionError::TypeMismatch),
+            }
+        }
+        // We either got CAHash::Text, or no CAHash at all, so we just don't do any additional
+        // hash calculation/validation.
+        // FUTUREWORK: We should figure out what to do with CAHash::Text, according to nix-cpp
+        // they don't handle it either:
+        // https://github.com/NixOS/nix/blob/3e9cc78eb5e5c4f1e762e201856273809fd92e71/src/libstore/local-store.cc#L1099-L1133
+        _ => {
+            let mut r = tokio::io::BufReader::new(&mut r);
+            let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
+            Ok((root_node, nar_hash.finalize().into(), nar_size))
+        }
+    }
 }
 
 /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
@@ -162,10 +229,12 @@ pub enum Error {
 
 #[cfg(test)]
 mod test {
-    use crate::nar::ingest_nar;
+    use crate::nar::{ingest_nar, ingest_nar_and_hash, NarIngestionError};
     use std::io::Cursor;
     use std::sync::Arc;
 
+    use hex_literal::hex;
+    use nix_compat::nixhash::{CAHash, NixHash};
     use rstest::*;
     use tokio_stream::StreamExt;
     use tvix_castore::blobservice::BlobService;
@@ -267,4 +336,77 @@ mod test {
         assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
         assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
     }
+
+    #[rstest]
+    #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("fbd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("ff5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("fd076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone(), )]
+    #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("f24eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
+    #[tokio::test]
+    async fn ingest_with_cahash_mismatch(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        #[case] ca_hash: Option<CAHash>,
+        #[case] nar_content: &Vec<u8>,
+    ) {
+        let err = ingest_nar_and_hash(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(nar_content),
+            &ca_hash,
+        )
+        .await
+        .expect_err("Ingestion should have failed");
+        assert!(
+            matches!(err, NarIngestionError::HashMismatch { .. }),
+            "CAHash should have mismatched"
+        );
+    }
+
+    #[rstest]
+    #[case::nar_sha256(Some(CAHash::Nar(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::nar_sha512(Some(CAHash::Nar(NixHash::Sha512(Box::new(hex!("1f5d43941411f35f09211f8596b426ee6e4dd3af1639e0ed2273cbe44b818fc4a59e3af02a057c5b18fbfcf435497de5f1994206c137f469b3df674966a922f0"))))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::flat_md5(Some(CAHash::Flat(NixHash::Md5(hex!("ed076287532e86365e841e92bfc50d8c")))), &NAR_CONTENTS_HELLOWORLD.clone())]
+    #[case::nar_symlink_sha1(Some(CAHash::Nar(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
+    #[tokio::test]
+    async fn ingest_with_cahash_correct(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        #[case] ca_hash: Option<CAHash>,
+        #[case] nar_content: &Vec<u8>,
+    ) {
+        let _ = ingest_nar_and_hash(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(nar_content),
+            &ca_hash,
+        )
+        .await
+        .expect("CAHash should have matched");
+    }
+
+    #[rstest]
+    #[case::nar_sha256(Some(CAHash::Flat(NixHash::Sha256(hex!("ebd52279a8df024c9fd5718de4103bf5e760dc7f2cf49044ee7dea87ab16911a")))), &NAR_CONTENTS_COMPLICATED.clone())]
+    #[case::nar_symlink_sha1(Some(CAHash::Flat(NixHash::Sha1(hex!("424eeaaa9cc016bab030bf007cb1be6483e7ba9e")))), &NAR_CONTENTS_SYMLINK.clone())]
+    #[tokio::test]
+    async fn ingest_with_flat_non_file(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+        #[case] ca_hash: Option<CAHash>,
+        #[case] nar_content: &Vec<u8>,
+    ) {
+        let err = ingest_nar_and_hash(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(nar_content),
+            &ca_hash,
+        )
+        .await
+        .expect_err("Ingestion should have failed");
+
+        assert!(
+            matches!(err, NarIngestionError::TypeMismatch),
+            "Flat cahash should only be allowed for single file nars"
+        );
+    }
 }
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
index 86505bcb0c07..4bf60ba44993 100644
--- a/tvix/store/src/nar/mod.rs
+++ b/tvix/store/src/nar/mod.rs
@@ -1,11 +1,11 @@
 use tonic::async_trait;
 use tvix_castore::B3Digest;
 
+mod hashing_reader;
 mod import;
 mod renderer;
 pub mod seekable;
-pub use import::ingest_nar;
-pub use import::ingest_nar_and_hash;
+pub use import::{ingest_nar, ingest_nar_and_hash, NarIngestionError};
 pub use renderer::calculate_size_and_sha256;
 pub use renderer::write_nar;
 pub use renderer::SimpleRenderer;
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index af7580b1c61f..e9b83dcf3551 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -204,6 +204,7 @@ where
             self.blob_service.clone(),
             self.directory_service.clone(),
             &mut r,
+            &narinfo.ca,
         )
         .await
         .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;