about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/store/src/nar/import.rs41
-rw-r--r--tvix/store/src/nar/mod.rs1
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs27
3 files changed, 46 insertions, 23 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 36122d419d00..32c2f4e58061 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,5 +1,10 @@
 use nix_compat::nar::reader::r#async as nar_reader;
-use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
+use sha2::Digest;
+use tokio::{
+    io::{AsyncBufRead, AsyncRead},
+    sync::mpsc,
+    try_join,
+};
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
@@ -13,6 +18,40 @@ use tvix_castore::{
 
 /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
 /// interacting with a [BlobService] and [DirectoryService].
+/// Returns the castore root node, as well as the sha256 and size of the NAR
+/// contents ingested.
+pub async fn ingest_nar_and_hash<R, BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+    r: &mut R,
+) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
+where
+    R: AsyncRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
+{
+    let mut nar_hash = sha2::Sha256::new();
+    let mut nar_size = 0;
+
+    // Assemble NarHash and NarSize as we read bytes.
+    let r = tokio_util::io::InspectReader::new(r, |b| {
+        nar_size += b.len() as u64;
+        use std::io::Write;
+        nar_hash.write_all(b).unwrap();
+    });
+
+    // HACK: InspectReader doesn't implement AsyncBufRead.
+    // See if this can be propagated through and we can then require our input
+    // reader to be buffered too.
+    let mut r = tokio::io::BufReader::new(r);
+
+    let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
+
+    Ok((root_node, nar_hash.finalize().into(), nar_size))
+}
+
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
 /// It returns the castore root node or an error.
 pub async fn ingest_nar<R, BS, DS>(
     blob_service: BS,
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
index 164748a655e8..8cbb091f1aab 100644
--- a/tvix/store/src/nar/mod.rs
+++ b/tvix/store/src/nar/mod.rs
@@ -4,6 +4,7 @@ use tvix_castore::B3Digest;
 mod import;
 mod renderer;
 pub use import::ingest_nar;
+pub use import::ingest_nar_and_hash;
 pub use renderer::calculate_size_and_sha256;
 pub use renderer::write_nar;
 pub use renderer::SimpleRenderer;
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index cccd4805c6ca..1dd7da4831b4 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -1,3 +1,5 @@
+use super::PathInfoService;
+use crate::{nar::ingest_nar_and_hash, proto::PathInfo};
 use futures::{stream::BoxStream, TryStreamExt};
 use nix_compat::{
     narinfo::{self, NarInfo},
@@ -5,20 +7,13 @@ use nix_compat::{
     nixhash::NixHash,
 };
 use reqwest::StatusCode;
-use sha2::Digest;
-use std::io::{self, Write};
-use tokio::io::{AsyncRead, BufReader};
-use tokio_util::io::InspectReader;
+use tokio::io::{self, AsyncRead};
 use tonic::async_trait;
 use tracing::{debug, instrument, warn};
 use tvix_castore::{
     blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
 };
 
-use crate::proto::PathInfo;
-
-use super::PathInfoService;
-
 /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
 /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
 /// Store Model.
@@ -178,7 +173,7 @@ where
         }));
 
         // handle decompression, depending on the compression field.
-        let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
+        let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
             Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
             Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
                 as Box<dyn AsyncRead + Send + Unpin>,
@@ -194,19 +189,8 @@ where
                 )));
             }
         };
-        let mut nar_hash = sha2::Sha256::new();
-        let mut nar_size = 0;
-
-        // Assemble NarHash and NarSize as we read bytes.
-        let r = InspectReader::new(r, |b| {
-            nar_size += b.len() as u64;
-            nar_hash.write_all(b).unwrap();
-        });
-
-        // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors.
-        let mut r = BufReader::new(r);
 
-        let root_node = crate::nar::ingest_nar(
+        let (root_node, nar_hash, nar_size) = ingest_nar_and_hash(
             self.blob_service.clone(),
             self.directory_service.clone(),
             &mut r,
@@ -226,7 +210,6 @@ where
                 "NarSize mismatch".to_string(),
             ))?;
         }
-        let nar_hash: [u8; 32] = nar_hash.finalize().into();
         if narinfo.nar_hash != nar_hash {
             warn!(
                 narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),