about summary refs log tree commit diff
path: root/tvix/store/src/nar/import.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/nar/import.rs')
-rw-r--r--tvix/store/src/nar/import.rs366
1 files changed, 124 insertions, 242 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 6f4dcdea5d..36122d419d 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,225 +1,132 @@
-use bytes::Bytes;
-use nix_compat::nar;
-use std::io::{self, BufRead};
-use tokio_util::io::SyncIoBridge;
-use tracing::warn;
+use nix_compat::nar::reader::r#async as nar_reader;
+use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
 use tvix_castore::{
     blobservice::BlobService,
-    directoryservice::{DirectoryPutter, DirectoryService},
-    proto::{self as castorepb},
-    B3Digest,
+    directoryservice::DirectoryService,
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
+    proto::{node::Node, NamedNode},
+    PathBuf,
 };
 
-/// Accepts a reader providing a NAR.
-/// Will traverse it, uploading blobs to the given [BlobService], and
-/// directories to the given [DirectoryService].
-/// On success, the root node is returned.
-/// This function is not async (because the NAR reader is not)
-/// and calls [tokio::task::block_in_place] when interacting with backing
-/// services, so make sure to only call this with spawn_blocking.
-pub fn read_nar<R, BS, DS>(
-    r: &mut R,
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// It returns the castore root node or an error.
+pub async fn ingest_nar<R, BS, DS>(
     blob_service: BS,
     directory_service: DS,
-) -> io::Result<castorepb::node::Node>
+    r: &mut R,
+) -> Result<Node, IngestionError<Error>>
 where
-    R: BufRead + Send,
-    BS: AsRef<dyn BlobService>,
-    DS: AsRef<dyn DirectoryService>,
+    R: AsyncBufRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
 {
-    let handle = tokio::runtime::Handle::current();
-
-    let directory_putter = directory_service.as_ref().put_multiple_start();
-
-    let node = nix_compat::nar::reader::open(r)?;
-    let (root_node, mut directory_putter, _) = process_node(
-        handle.clone(),
-        "".into(), // this is the root node, it has an empty name
-        node,
-        &blob_service,
-        directory_putter,
-    )?;
-
-    // In case the root node points to a directory, we need to close
-    // [directory_putter], and ensure the digest we got back from there matches
-    // what the root node is pointing to.
-    if let castorepb::node::Node::Directory(ref directory_node) = root_node {
-        // Close directory_putter to make sure all directories have been inserted.
-        let directory_putter_digest =
-            handle.block_on(handle.spawn(async move { directory_putter.close().await }))??;
-        let root_directory_node_digest: B3Digest =
-            directory_node.digest.clone().try_into().unwrap();
-
-        if directory_putter_digest != root_directory_node_digest {
-            warn!(
-                root_directory_node_digest = %root_directory_node_digest,
-                directory_putter_digest =%directory_putter_digest,
-                "directory digest mismatch",
-            );
-            return Err(io::Error::new(
-                io::ErrorKind::Other,
-                "directory digest mismatch",
-            ));
-        }
-    }
-    // In case it's not a Directory, [directory_putter] doesn't need to be
-    // closed (as we didn't end up uploading anything).
-    // It can just be dropped, as documented in its trait.
-
-    Ok(root_node)
-}
+    // open the NAR for reading.
+    // The NAR reader emits nodes in DFS preorder.
+    let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
 
-/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node].
-/// It does so by handling all three kinds, and recursing for directories.
-///
-/// [DirectoryPutter] is passed around, so a single instance of it can be used,
-/// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_node<BS>(
-    handle: tokio::runtime::Handle,
-    name: bytes::Bytes,
-    node: nar::reader::Node,
-    blob_service: BS,
-    directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)>
-where
-    BS: AsRef<dyn BlobService>,
-{
-    Ok(match node {
-        nar::reader::Node::Symlink { target } => (
-            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
-                name,
-                target: target.into(),
-            }),
-            directory_putter,
-            blob_service,
-        ),
-        nar::reader::Node::File { executable, reader } => (
-            castorepb::node::Node::File(process_file_reader(
-                handle,
-                name,
-                reader,
-                executable,
-                &blob_service,
-            )?),
-            directory_putter,
-            blob_service,
-        ),
-        nar::reader::Node::Directory(dir_reader) => {
-            let (directory_node, directory_putter, blob_service_back) =
-                process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
-
-            (
-                castorepb::node::Node::Directory(directory_node),
-                directory_putter,
-                blob_service_back,
-            )
-        }
-    })
-}
-
-/// Given a name and [nar::reader::FileReader], this ingests the file into the
-/// passed [BlobService] and returns a [castorepb::FileNode].
-fn process_file_reader<BS>(
-    handle: tokio::runtime::Handle,
-    name: Bytes,
-    mut file_reader: nar::reader::FileReader,
-    executable: bool,
-    blob_service: BS,
-) -> io::Result<castorepb::FileNode>
-where
-    BS: AsRef<dyn BlobService>,
-{
-    // store the length. If we read any other length, reading will fail.
-    let expected_len = file_reader.len();
+    let (tx, rx) = mpsc::channel(1);
+    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
 
-    // prepare writing a new blob.
-    let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await });
+    let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
 
-    // write the blob.
-    let mut blob_writer = {
-        let mut dst = SyncIoBridge::new(blob_writer);
+        let res = produce_nar_inner(
+            &mut blob_uploader,
+            root_node,
+            "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
+            tx.clone(),
+        )
+        .await;
+
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
 
-        file_reader.copy(&mut dst)?;
-        dst.shutdown()?;
+        tx.send(res)
+            .await
+            .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
 
-        // return back the blob_writer
-        dst.into_inner()
+        Ok(())
     };
 
-    // close the blob_writer, retrieve the digest.
-    let blob_digest = handle.block_on(async { blob_writer.close().await })?;
+    let consume = ingest_entries(directory_service, rx);
 
-    Ok(castorepb::FileNode {
-        name,
-        digest: blob_digest.into(),
-        size: expected_len,
-        executable,
-    })
+    let (_, node) = try_join!(produce, consume)?;
+
+    // remove the fake "root" name again
+    debug_assert_eq!(&node.get_name(), b"root");
+    Ok(node.rename("".into()))
 }
 
-/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode].
-/// It uses [process_node] to iterate over all children.
-///
-/// [DirectoryPutter] is passed around, so a single instance of it can be used,
-/// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_dir_reader<BS>(
-    handle: tokio::runtime::Handle,
-    name: Bytes,
-    mut dir_reader: nar::reader::DirReader,
-    blob_service: BS,
-    directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)>
+async fn produce_nar_inner<BS>(
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
+    node: nar_reader::Node<'_, '_>,
+    path: PathBuf,
+    tx: mpsc::Sender<Result<IngestionEntry, Error>>,
+) -> Result<IngestionEntry, Error>
 where
-    BS: AsRef<dyn BlobService>,
+    BS: BlobService + Clone + 'static,
 {
-    let mut directory = castorepb::Directory::default();
-
-    let mut directory_putter = directory_putter;
-    let mut blob_service = blob_service;
-    while let Some(entry) = dir_reader.next()? {
-        let (node, directory_putter_back, blob_service_back) = process_node(
-            handle.clone(),
-            entry.name.into(),
-            entry.node,
-            blob_service,
-            directory_putter,
-        )?;
-
-        blob_service = blob_service_back;
-        directory_putter = directory_putter_back;
-
-        match node {
-            castorepb::node::Node::Directory(node) => directory.directories.push(node),
-            castorepb::node::Node::File(node) => directory.files.push(node),
-            castorepb::node::Node::Symlink(node) => directory.symlinks.push(node),
+    Ok(match node {
+        nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
+        nar_reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
+
+            IngestionEntry::Regular {
+                path,
+                size,
+                executable,
+                digest,
+            }
         }
-    }
+        nar_reader::Node::Directory(mut dir_reader) => {
+            while let Some(entry) = dir_reader.next().await? {
+                let mut path = path.clone();
+
+                // valid NAR names are valid castore names
+                path.try_push(entry.name)
+                    .expect("Tvix bug: failed to join name");
+
+                let entry = Box::pin(produce_nar_inner(
+                    blob_uploader,
+                    entry.node,
+                    path,
+                    tx.clone(),
+                ))
+                .await?;
+
+                tx.send(Ok(entry)).await.map_err(|e| {
+                    Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
+                })?;
+            }
+
+            IngestionEntry::Dir { path }
+        }
+    })
+}
 
-    // calculate digest and size.
-    let directory_digest = directory.digest();
-    let directory_size = directory.size();
-
-    // upload the directory. This is a bit more verbose, as we want to get back
-    // directory_putter for later reuse.
-    let directory_putter = handle.block_on(handle.spawn(async move {
-        directory_putter.put(directory).await?;
-        Ok::<_, io::Error>(directory_putter)
-    }))??;
-
-    Ok((
-        castorepb::DirectoryNode {
-            name,
-            digest: directory_digest.into(),
-            size: directory_size,
-        },
-        directory_putter,
-        blob_service,
-    ))
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
+
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
 }
 
 #[cfg(test)]
 mod test {
-    use crate::nar::read_nar;
+    use crate::nar::ingest_nar;
     use std::io::Cursor;
     use std::sync::Arc;
 
@@ -244,19 +151,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking(|| {
-                read_nar(
-                    &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
-                    blob_service,
-                    directory_service,
-                )
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@@ -273,22 +174,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking({
-                let blob_service = blob_service.clone();
-                move || {
-                    read_nar(
-                        &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
-                        blob_service,
-                        directory_service,
-                    )
-                }
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::File(castorepb::FileNode {
@@ -310,23 +202,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking({
-                let blob_service = blob_service.clone();
-                let directory_service = directory_service.clone();
-                || {
-                    read_nar(
-                        &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
-                        blob_service,
-                        directory_service,
-                    )
-                }
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::Directory(castorepb::DirectoryNode {