about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-11-27T17·43+0200
committerflokli <flokli@flokli.de>2023-11-28T16·08+0000
commit7e8719be9146ed370038cb51a87c53ebea388ec3 (patch)
tree762912cadcbd9ebc0b49f2a047f0b4ff76e41bdd
parentfce9c0e99ef219a97bd8ba14cf0eed6c998a1c80 (diff)
feat(tvix/store/pathinfosvc/nix_http): check Nar{Size,Hash} matches r/7081
Ensure the initially communicated NarHash/NarSize from the NarInfo
matches what we read, and don't return a PathInfo message if there's
a mismatch.

Also move the buffering layer around a bit.

Change-Id: I68c60ecfaf0f9cd5edacea648437ecb0c9729251
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10148
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs97
1 files changed, 84 insertions, 13 deletions
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index 49cd515ce913..26e516a90d4f 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -1,5 +1,5 @@
 use std::{
-    io::{self, BufRead},
+    io::{self, BufRead, Read, Write},
     pin::Pin,
     sync::Arc,
 };
@@ -8,6 +8,7 @@ use data_encoding::BASE64;
 use futures::{Stream, TryStreamExt};
 use nix_compat::{narinfo::NarInfo, nixbase32};
 use reqwest::StatusCode;
+use sha2::{digest::FixedOutput, Digest, Sha256};
 use tonic::async_trait;
 use tracing::{debug, instrument, warn};
 use tvix_castore::{
@@ -147,12 +148,12 @@ impl PathInfoService for NixHTTPPathInfoService {
             warn!(e=%e, "failed to get response body");
             io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
         }));
-        let sync_r = std::io::BufReader::new(tokio_util::io::SyncIoBridge::new(async_r));
+        let sync_r = tokio_util::io::SyncIoBridge::new(async_r);
 
         // handle decompression, by wrapping the reader.
-        let mut sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
+        let sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
             Some("none") => Box::new(sync_r),
-            Some("xz") => Box::new(std::io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
+            Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
             Some(comp) => {
                 return Err(Error::InvalidRequest(
                     format!("unsupported compression: {}", comp).to_string(),
@@ -168,20 +169,55 @@ impl PathInfoService for NixHTTPPathInfoService {
         let res = tokio::task::spawn_blocking({
             let blob_service = self.blob_service.clone();
             let directory_service = self.directory_service.clone();
-            move || crate::nar::read_nar(&mut sync_r, blob_service, directory_service)
+            move || -> io::Result<_> {
+                // Wrap the reader once more, so we can calculate NarSize and NarHash
+                let mut sync_r = io::BufReader::new(NarReader::from(sync_r));
+                let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?;
+
+                let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner();
+
+                Ok((root_node, nar_hash, nar_size))
+            }
         })
         .await
         .unwrap();
 
         match res {
-            Ok(root_node) => Ok(Some(PathInfo {
-                node: Some(castorepb::Node {
-                    // set the name of the root node to the digest-name of the store path.
-                    node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
-                }),
-                references: pathinfo.references,
-                narinfo: pathinfo.narinfo,
-            })),
+            Ok((root_node, nar_hash, nar_size)) => {
+                // ensure the ingested narhash and narsize do actually match.
+                if narinfo.nar_size != nar_size {
+                    warn!(
+                        narinfo.nar_size = narinfo.nar_size,
+                        http.nar_size = nar_size,
+                        "NARSize mismatch"
+                    );
+                    Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "NarSize mismatch".to_string(),
+                    ))?;
+                } else if narinfo.nar_hash != nar_hash {
+                    warn!(
+                        narinfo.nar_hash = BASE64.encode(&narinfo.nar_hash),
+                        http.nar_hash = BASE64.encode(&nar_hash),
+                        "NarHash mismatch"
+                    );
+                    Err(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        "NarHash mismatch".to_string(),
+                    ))?;
+                }
+
+                Ok(Some(PathInfo {
+                    node: Some(castorepb::Node {
+                        // set the name of the root node to the digest-name of the store path.
+                        node: Some(
+                            root_node.rename(narinfo.store_path.to_string().to_owned().into()),
+                        ),
+                    }),
+                    references: pathinfo.references,
+                    narinfo: pathinfo.narinfo,
+                }))
+            }
             Err(e) => Err(e.into()),
         }
     }
@@ -211,3 +247,38 @@ impl PathInfoService for NixHTTPPathInfoService {
         }))
     }
 }
+
+/// Small helper reader implementing [std::io::Read].
+/// It can be used to wrap another reader, counts the number of bytes read
+/// and the sha256 digest of the contents.
+struct NarReader<R: Read> {
+    r: R,
+
+    sha256: sha2::Sha256,
+    bytes_read: u64,
+}
+
+impl<R: Read> NarReader<R> {
+    pub fn from(inner: R) -> Self {
+        Self {
+            r: inner,
+            sha256: Sha256::new(),
+            bytes_read: 0,
+        }
+    }
+
+    /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read.
+    pub fn into_inner(self) -> (R, [u8; 32], u64) {
+        (self.r, self.sha256.finalize_fixed().into(), self.bytes_read)
+    }
+}
+
+impl<R: Read> Read for NarReader<R> {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        self.r.read(buf).map(|n| {
+            self.bytes_read += n as u64;
+            self.sha256.write_all(&buf[..n]).unwrap();
+            n
+        })
+    }
+}