about summary refs log tree commit diff
path: root/tvix/glue/src/fetchers/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/glue/src/fetchers/mod.rs')
-rw-r--r--tvix/glue/src/fetchers/mod.rs106
1 files changed, 96 insertions, 10 deletions
diff --git a/tvix/glue/src/fetchers/mod.rs b/tvix/glue/src/fetchers/mod.rs
index 0ebc5fd3a638..376a4cca634c 100644
--- a/tvix/glue/src/fetchers/mod.rs
+++ b/tvix/glue/src/fetchers/mod.rs
@@ -6,8 +6,8 @@ use nix_compat::{
 };
 use sha1::Sha1;
 use sha2::{digest::Output, Digest, Sha256, Sha512};
-use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite};
-use tokio_util::io::InspectReader;
+use tokio::io::{AsyncBufRead, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
+use tokio_util::io::{InspectReader, InspectWriter};
 use tracing::warn;
 use tvix_castore::{
     blobservice::BlobService,
@@ -381,7 +381,7 @@ where
                 });
 
                 // Ingest the NAR, get the root node.
-                let (root_node, actual_nar_sha256, actual_nar_size) =
+                let (root_node, _actual_nar_sha256, actual_nar_size) =
                     tvix_store::nar::ingest_nar_and_hash(
                         self.blob_service.clone(),
                         self.directory_service.clone(),
@@ -416,14 +416,99 @@ where
                         got: actual_hash,
                     });
                 }
-
                 Ok((
                     root_node,
-                    CAHash::Nar(NixHash::Sha256(actual_nar_sha256)),
+                    // use a CAHash::Nar with the algo from the input.
+                    CAHash::Nar(exp_hash),
                     actual_nar_size,
                 ))
             }
-            Fetch::Executable { url: _, hash: _ } => todo!(),
+            Fetch::Executable {
+                url,
+                hash: exp_hash,
+            } => {
+                // Construct a AsyncRead reading from the data as its downloaded.
+                let mut r = self.download(url.clone()).await?;
+
+                // Construct a AsyncWrite to write into the BlobService.
+                let mut blob_writer = self.blob_service.open_write().await;
+
+                // Copy the contents from the download reader to the blob writer.
+                let file_size = tokio::io::copy(&mut r, &mut blob_writer).await?;
+                let blob_digest = blob_writer.close().await?;
+
+                // Render the NAR representation on-the-fly into a hash function with
+                // the same algo as our expected hash.
+                // We cannot do this upfront, as we don't know the actual size.
+                // FUTUREWORK: make opportunistic use of Content-Length header?
+
+                let w = tokio::io::sink();
+                // Construct the hash function.
+                let mut hasher: Box<dyn DynDigest + Send> = match exp_hash.algo() {
+                    HashAlgo::Md5 => Box::new(Md5::new()),
+                    HashAlgo::Sha1 => Box::new(Sha1::new()),
+                    HashAlgo::Sha256 => Box::new(Sha256::new()),
+                    HashAlgo::Sha512 => Box::new(Sha512::new()),
+                };
+
+                let mut nar_size: u64 = 0;
+                let mut w = InspectWriter::new(w, |d| {
+                    hasher.update(d);
+                    nar_size += d.len() as u64;
+                });
+
+                {
+                    let node = nix_compat::nar::writer::r#async::open(&mut w).await?;
+
+                    let blob_reader = self
+                        .blob_service
+                        .open_read(&blob_digest)
+                        .await?
+                        .expect("Tvix bug: just-uploaded blob not found");
+
+                    node.file(true, file_size, &mut BufReader::new(blob_reader))
+                        .await?;
+
+                    w.flush().await?;
+                }
+
+                // finalize the hasher.
+                let actual_hash = {
+                    match exp_hash.algo() {
+                        HashAlgo::Md5 => {
+                            NixHash::Md5(hasher.finalize().to_vec().try_into().unwrap())
+                        }
+                        HashAlgo::Sha1 => {
+                            NixHash::Sha1(hasher.finalize().to_vec().try_into().unwrap())
+                        }
+                        HashAlgo::Sha256 => {
+                            NixHash::Sha256(hasher.finalize().to_vec().try_into().unwrap())
+                        }
+                        HashAlgo::Sha512 => {
+                            NixHash::Sha512(hasher.finalize().to_vec().try_into().unwrap())
+                        }
+                    }
+                };
+
+                if exp_hash != actual_hash {
+                    return Err(FetcherError::HashMismatch {
+                        url,
+                        wanted: exp_hash,
+                        got: actual_hash,
+                    });
+                }
+
+                // Construct and return the FileNode describing the downloaded contents,
+                // make it executable.
+                let root_node = Node::File(FileNode {
+                    name: vec![].into(),
+                    digest: blob_digest.into(),
+                    size: file_size,
+                    executable: true,
+                });
+
+                Ok((root_node, CAHash::Nar(actual_hash), file_size))
+            }
             Fetch::Git() => todo!(),
         }
     }
@@ -441,7 +526,7 @@ where
         // Fetch file, return the (unnamed) (File)Node of its contents, ca hash and filesize.
         let (node, ca_hash, size) = self.ingest(fetch).await?;
 
-        // Calculate the store path to return later, which is done with the ca_hash.
+        // Calculate the store path to return, by calculating from ca_hash.
         let store_path = build_ca_path(name, &ca_hash, Vec::<String>::new(), false)?;
 
         // Rename the node name to match the Store Path.
@@ -450,14 +535,15 @@ where
         // If the resulting hash is not a CAHash::Nar, we also need to invoke
         // `calculate_nar` to calculate this representation, as it's required in
         // the [PathInfo].
+        // FUTUREWORK: allow ingest() to return multiple hashes, or have it feed
+        // nar_calculation_service too?
         let (nar_size, nar_sha256) = match &ca_hash {
-            CAHash::Flat(_nix_hash) => self
+            CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256),
+            CAHash::Nar(_) | CAHash::Flat(_) => self
                 .nar_calculation_service
                 .calculate_nar(&node)
                 .await
                 .map_err(|e| FetcherError::Io(e.into()))?,
-            CAHash::Nar(NixHash::Sha256(nar_sha256)) => (size, *nar_sha256),
-            CAHash::Nar(_) => unreachable!("Tvix bug: fetch returned non-sha256 CAHash::Nar"),
             CAHash::Text(_) => unreachable!("Tvix bug: fetch returned CAHash::Text"),
         };