diff options
author | Florian Klink <flokli@flokli.de> | 2023-11-27T17·43+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-11-28T16·08+0000 |
commit | 7e8719be9146ed370038cb51a87c53ebea388ec3 (patch) | |
tree | 762912cadcbd9ebc0b49f2a047f0b4ff76e41bdd | |
parent | fce9c0e99ef219a97bd8ba14cf0eed6c998a1c80 (diff) |
feat(tvix/store/pathinfosvc/nix_http): check Nar{Size,Hash} matches r/7081
Ensure the initially communicated NarHash/NarSize from the NarInfo matches what we read, and don't return a PathInfo message if there's a mismatch. Also move the buffering layer around a bit. Change-Id: I68c60ecfaf0f9cd5edacea648437ecb0c9729251 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10148 Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
-rw-r--r-- | tvix/store/src/pathinfoservice/nix_http.rs | 97 |
1 files changed, 84 insertions, 13 deletions
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index 49cd515ce913..26e516a90d4f 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -1,5 +1,5 @@ use std::{ - io::{self, BufRead}, + io::{self, BufRead, Read, Write}, pin::Pin, sync::Arc, }; @@ -8,6 +8,7 @@ use data_encoding::BASE64; use futures::{Stream, TryStreamExt}; use nix_compat::{narinfo::NarInfo, nixbase32}; use reqwest::StatusCode; +use sha2::{digest::FixedOutput, Digest, Sha256}; use tonic::async_trait; use tracing::{debug, instrument, warn}; use tvix_castore::{ @@ -147,12 +148,12 @@ impl PathInfoService for NixHTTPPathInfoService { warn!(e=%e, "failed to get response body"); io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) })); - let sync_r = std::io::BufReader::new(tokio_util::io::SyncIoBridge::new(async_r)); + let sync_r = tokio_util::io::SyncIoBridge::new(async_r); // handle decompression, by wrapping the reader. - let mut sync_r: Box<dyn BufRead + Send> = match narinfo.compression { + let sync_r: Box<dyn BufRead + Send> = match narinfo.compression { Some("none") => Box::new(sync_r), - Some("xz") => Box::new(std::io::BufReader::new(xz2::read::XzDecoder::new(sync_r))), + Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))), Some(comp) => { return Err(Error::InvalidRequest( format!("unsupported compression: {}", comp).to_string(), @@ -168,20 +169,55 @@ impl PathInfoService for NixHTTPPathInfoService { let res = tokio::task::spawn_blocking({ let blob_service = self.blob_service.clone(); let directory_service = self.directory_service.clone(); - move || crate::nar::read_nar(&mut sync_r, blob_service, directory_service) + move || -> io::Result<_> { + // Wrap the reader once more, so we can calculate NarSize and NarHash + let mut sync_r = io::BufReader::new(NarReader::from(sync_r)); + let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?; + + let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner(); + + Ok((root_node, nar_hash, nar_size)) + } }) .await .unwrap(); match res { - Ok(root_node) => Ok(Some(PathInfo { - node: Some(castorepb::Node { - // set the name of the root node to the digest-name of the store path. - node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())), - }), - references: pathinfo.references, - narinfo: pathinfo.narinfo, - })), + Ok((root_node, nar_hash, nar_size)) => { + // ensure the ingested narhash and narsize do actually match. + if narinfo.nar_size != nar_size { + warn!( + narinfo.nar_size = narinfo.nar_size, + http.nar_size = nar_size, + "NARSize mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarSize mismatch".to_string(), + ))?; + } else if narinfo.nar_hash != nar_hash { + warn!( + narinfo.nar_hash = BASE64.encode(&narinfo.nar_hash), + http.nar_hash = BASE64.encode(&nar_hash), + "NarHash mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarHash mismatch".to_string(), + ))?; + } + + Ok(Some(PathInfo { + node: Some(castorepb::Node { + // set the name of the root node to the digest-name of the store path. + node: Some( + root_node.rename(narinfo.store_path.to_string().to_owned().into()), + ), + }), + references: pathinfo.references, + narinfo: pathinfo.narinfo, + })) + } Err(e) => Err(e.into()), } } @@ -211,3 +247,38 @@ impl PathInfoService for NixHTTPPathInfoService { })) } } + +/// Small helper reader implementing [std::io::Read]. +/// It can be used to wrap another reader, counts the number of bytes read +/// and the sha256 digest of the contents. +struct NarReader<R: Read> { + r: R, + + sha256: sha2::Sha256, + bytes_read: u64, +} + +impl<R: Read> NarReader<R> { + pub fn from(inner: R) -> Self { + Self { + r: inner, + sha256: Sha256::new(), + bytes_read: 0, + } + } + + /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read. + pub fn into_inner(self) -> (R, [u8; 32], u64) { + (self.r, self.sha256.finalize_fixed().into(), self.bytes_read) + } +} + +impl<R: Read> Read for NarReader<R> { + fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { + self.r.read(buf).map(|n| { + self.bytes_read += n as u64; + self.sha256.write_all(&buf[..n]).unwrap(); + n + }) + } +} |