about summary refs log tree commit diff
path: root/tvix/store/src/nar/import.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/nar/import.rs')
-rw-r--r--tvix/store/src/nar/import.rs109
1 files changed, 76 insertions, 33 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 3d7c50014aa8..b9a15fe71384 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,15 +1,56 @@
 use nix_compat::nar::reader::r#async as nar_reader;
-use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
+use sha2::Digest;
+use tokio::{
+    io::{AsyncBufRead, AsyncRead},
+    sync::mpsc,
+    try_join,
+};
 use tvix_castore::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
-    import::{ingest_entries, IngestionEntry, IngestionError},
-    proto::{node::Node, NamedNode},
-    PathBuf,
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
+    Node, PathBuf,
 };
 
 /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
 /// interacting with a [BlobService] and [DirectoryService].
+/// Returns the castore root node, as well as the sha256 and size of the NAR
+/// contents ingested.
+pub async fn ingest_nar_and_hash<R, BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+    r: &mut R,
+) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
+where
+    R: AsyncRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
+{
+    let mut nar_hash = sha2::Sha256::new();
+    let mut nar_size = 0;
+
+    // Assemble NarHash and NarSize as we read bytes.
+    let r = tokio_util::io::InspectReader::new(r, |b| {
+        nar_size += b.len() as u64;
+        use std::io::Write;
+        nar_hash.write_all(b).unwrap();
+    });
+
+    // HACK: InspectReader doesn't implement AsyncBufRead.
+    // See if this can be propagated through and we can then require our input
+    // reader to be buffered too.
+    let mut r = tokio::io::BufReader::new(r);
+
+    let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
+
+    Ok((root_node, nar_hash.finalize().into(), nar_size))
+}
+
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
 /// It returns the castore root node or an error.
 pub async fn ingest_nar<R, BS, DS>(
     blob_service: BS,
@@ -18,7 +59,7 @@ pub async fn ingest_nar<R, BS, DS>(
 ) -> Result<Node, IngestionError<Error>>
 where
     R: AsyncBufRead + Unpin + Send,
-    BS: BlobService + Clone,
+    BS: BlobService + Clone + 'static,
     DS: DirectoryService,
 {
     // open the NAR for reading.
@@ -29,14 +70,22 @@ where
     let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
 
     let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
+
         let res = produce_nar_inner(
-            blob_service,
+            &mut blob_uploader,
             root_node,
             "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
             tx.clone(),
         )
         .await;
 
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
+
         tx.send(res)
             .await
             .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
@@ -48,19 +97,17 @@ where
 
     let (_, node) = try_join!(produce, consume)?;
 
-    // remove the fake "root" name again
-    debug_assert_eq!(&node.get_name(), b"root");
-    Ok(node.rename("".into()))
+    Ok(node)
 }
 
 async fn produce_nar_inner<BS>(
-    blob_service: BS,
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
     node: nar_reader::Node<'_, '_>,
     path: PathBuf,
     tx: mpsc::Sender<Result<IngestionEntry, Error>>,
 ) -> Result<IngestionEntry, Error>
 where
-    BS: BlobService + Clone,
+    BS: BlobService + Clone + 'static,
 {
     Ok(match node {
         nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
@@ -68,12 +115,8 @@ where
             executable,
             mut reader,
         } => {
-            let (digest, size) = {
-                let mut blob_writer = blob_service.open_write().await;
-                let size = tokio::io::copy_buf(&mut reader, &mut blob_writer).await?;
-
-                (blob_writer.close().await?, size)
-            };
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
 
             IngestionEntry::Regular {
                 path,
@@ -91,7 +134,7 @@ where
                     .expect("Tvix bug: failed to join name");
 
                 let entry = Box::pin(produce_nar_inner(
-                    blob_service.clone(),
+                    blob_uploader,
                     entry.node,
                     path,
                     tx.clone(),
@@ -112,6 +155,9 @@ where
 pub enum Error {
     #[error(transparent)]
     IO(#[from] std::io::Error),
+
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
 }
 
 #[cfg(test)]
@@ -128,7 +174,7 @@ mod test {
         DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
         HELLOWORLD_BLOB_DIGEST,
     };
-    use tvix_castore::proto as castorepb;
+    use tvix_castore::{Directory, Node};
 
     use crate::tests::fixtures::{
         blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD,
@@ -150,10 +196,9 @@ mod test {
         .expect("must parse");
 
         assert_eq!(
-            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
-                name: "".into(), // name must be empty
-                target: "/nix/store/somewhereelse".into(),
-            }),
+            Node::Symlink {
+                target: "/nix/store/somewhereelse".try_into().unwrap()
+            },
             root_node
         );
     }
@@ -173,12 +218,11 @@ mod test {
         .expect("must parse");
 
         assert_eq!(
-            castorepb::node::Node::File(castorepb::FileNode {
-                name: "".into(), // name must be empty
-                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+            Node::File {
+                digest: HELLOWORLD_BLOB_DIGEST.clone(),
                 size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
                 executable: false,
-            }),
+            },
             root_node
         );
 
@@ -201,11 +245,10 @@ mod test {
         .expect("must parse");
 
         assert_eq!(
-            castorepb::node::Node::Directory(castorepb::DirectoryNode {
-                name: "".into(), // name must be empty
-                digest: DIRECTORY_COMPLICATED.digest().into(),
-                size: DIRECTORY_COMPLICATED.size(),
-            }),
+            Node::Directory {
+                digest: DIRECTORY_COMPLICATED.digest(),
+                size: DIRECTORY_COMPLICATED.size()
+            },
             root_node,
         );
 
@@ -213,7 +256,7 @@ mod test {
         assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
 
         // directoryservice must contain the directories, at least with get_recursive.
-        let resp: Result<Vec<castorepb::Directory>, _> = directory_service
+        let resp: Result<Vec<Directory>, _> = directory_service
             .get_recursive(&DIRECTORY_COMPLICATED.digest())
             .collect()
             .await;