diff options
-rw-r--r-- | tvix/store/src/nar/import.rs | 41 | ||||
-rw-r--r-- | tvix/store/src/nar/mod.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/nix_http.rs | 27 |
3 files changed, 46 insertions, 23 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 36122d419d00..32c2f4e58061 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,5 +1,10 @@ use nix_compat::nar::reader::r#async as nar_reader; -use tokio::{io::AsyncBufRead, sync::mpsc, try_join}; +use sha2::Digest; +use tokio::{ + io::{AsyncBufRead, AsyncRead}, + sync::mpsc, + try_join, +}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, @@ -13,6 +18,40 @@ use tvix_castore::{ /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, /// interacting with a [BlobService] and [DirectoryService]. +/// Returns the castore root node, as well as the sha256 and size of the NAR +/// contents ingested. +pub async fn ingest_nar_and_hash<R, BS, DS>( + blob_service: BS, + directory_service: DS, + r: &mut R, +) -> Result<(Node, [u8; 32], u64), IngestionError<Error>> +where + R: AsyncRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, +{ + let mut nar_hash = sha2::Sha256::new(); + let mut nar_size = 0; + + // Assemble NarHash and NarSize as we read bytes. + let r = tokio_util::io::InspectReader::new(r, |b| { + nar_size += b.len() as u64; + use std::io::Write; + nar_hash.write_all(b).unwrap(); + }); + + // HACK: InspectReader doesn't implement AsyncBufRead. + // See if this can be propagated through and we can then require our input + // reader to be buffered too. + let mut r = tokio::io::BufReader::new(r); + + let root_node = ingest_nar(blob_service, directory_service, &mut r).await?; + + Ok((root_node, nar_hash.finalize().into(), nar_size)) +} + +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. /// It returns the castore root node or an error. pub async fn ingest_nar<R, BS, DS>( blob_service: BS, diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs index 164748a655e8..8cbb091f1aab 100644 --- a/tvix/store/src/nar/mod.rs +++ b/tvix/store/src/nar/mod.rs @@ -4,6 +4,7 @@ use tvix_castore::B3Digest; mod import; mod renderer; pub use import::ingest_nar; +pub use import::ingest_nar_and_hash; pub use renderer::calculate_size_and_sha256; pub use renderer::write_nar; pub use renderer::SimpleRenderer; diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index cccd4805c6ca..1dd7da4831b4 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -1,3 +1,5 @@ +use super::PathInfoService; +use crate::{nar::ingest_nar_and_hash, proto::PathInfo}; use futures::{stream::BoxStream, TryStreamExt}; use nix_compat::{ narinfo::{self, NarInfo}, @@ -5,20 +7,13 @@ use nix_compat::{ nixhash::NixHash, }; use reqwest::StatusCode; -use sha2::Digest; -use std::io::{self, Write}; -use tokio::io::{AsyncRead, BufReader}; -use tokio_util::io::InspectReader; +use tokio::io::{self, AsyncRead}; use tonic::async_trait; use tracing::{debug, instrument, warn}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, }; -use crate::proto::PathInfo; - -use super::PathInfoService; - /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix /// Store Model. @@ -178,7 +173,7 @@ where })); // handle decompression, depending on the compression field. - let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression { + let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression { Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>, Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r)) as Box<dyn AsyncRead + Send + Unpin>, @@ -194,19 +189,8 @@ where ))); } }; - let mut nar_hash = sha2::Sha256::new(); - let mut nar_size = 0; - - // Assemble NarHash and NarSize as we read bytes. - let r = InspectReader::new(r, |b| { - nar_size += b.len() as u64; - nar_hash.write_all(b).unwrap(); - }); - - // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors. - let mut r = BufReader::new(r); - let root_node = crate::nar::ingest_nar( + let (root_node, nar_hash, nar_size) = ingest_nar_and_hash( self.blob_service.clone(), self.directory_service.clone(), &mut r, @@ -226,7 +210,6 @@ where "NarSize mismatch".to_string(), ))?; } - let nar_hash: [u8; 32] = nar_hash.finalize().into(); if narinfo.nar_hash != nar_hash { warn!( narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash), |