about summary refs log tree commit diff
path: root/tvix/store/src/nar/import.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/nar/import.rs')
-rw-r--r--tvix/store/src/nar/import.rs227
1 files changed, 227 insertions, 0 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
new file mode 100644
index 0000000000..3d7c50014a
--- /dev/null
+++ b/tvix/store/src/nar/import.rs
@@ -0,0 +1,227 @@
+use nix_compat::nar::reader::r#async as nar_reader;
+use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    import::{ingest_entries, IngestionEntry, IngestionError},
+    proto::{node::Node, NamedNode},
+    PathBuf,
+};
+
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// It returns the castore root node or an error.
+pub async fn ingest_nar<R, BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+    r: &mut R,
+) -> Result<Node, IngestionError<Error>>
+where
+    R: AsyncBufRead + Unpin + Send,
+    BS: BlobService + Clone,
+    DS: DirectoryService,
+{
+    // open the NAR for reading.
+    // The NAR reader emits nodes in DFS preorder.
+    let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
+
+    let (tx, rx) = mpsc::channel(1);
+    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
+
+    let produce = async move {
+        let res = produce_nar_inner(
+            blob_service,
+            root_node,
+            "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
+            tx.clone(),
+        )
+        .await;
+
+        tx.send(res)
+            .await
+            .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+
+        Ok(())
+    };
+
+    let consume = ingest_entries(directory_service, rx);
+
+    let (_, node) = try_join!(produce, consume)?;
+
+    // remove the fake "root" name again
+    debug_assert_eq!(&node.get_name(), b"root");
+    Ok(node.rename("".into()))
+}
+
+async fn produce_nar_inner<BS>(
+    blob_service: BS,
+    node: nar_reader::Node<'_, '_>,
+    path: PathBuf,
+    tx: mpsc::Sender<Result<IngestionEntry, Error>>,
+) -> Result<IngestionEntry, Error>
+where
+    BS: BlobService + Clone,
+{
+    Ok(match node {
+        nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
+        nar_reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            let (digest, size) = {
+                let mut blob_writer = blob_service.open_write().await;
+                let size = tokio::io::copy_buf(&mut reader, &mut blob_writer).await?;
+
+                (blob_writer.close().await?, size)
+            };
+
+            IngestionEntry::Regular {
+                path,
+                size,
+                executable,
+                digest,
+            }
+        }
+        nar_reader::Node::Directory(mut dir_reader) => {
+            while let Some(entry) = dir_reader.next().await? {
+                let mut path = path.clone();
+
+                // valid NAR names are valid castore names
+                path.try_push(entry.name)
+                    .expect("Tvix bug: failed to join name");
+
+                let entry = Box::pin(produce_nar_inner(
+                    blob_service.clone(),
+                    entry.node,
+                    path,
+                    tx.clone(),
+                ))
+                .await?;
+
+                tx.send(Ok(entry)).await.map_err(|e| {
+                    Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
+                })?;
+            }
+
+            IngestionEntry::Dir { path }
+        }
+    })
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
+}
+
+#[cfg(test)]
+mod test {
+    use crate::nar::ingest_nar;
+    use std::io::Cursor;
+    use std::sync::Arc;
+
+    use rstest::*;
+    use tokio_stream::StreamExt;
+    use tvix_castore::blobservice::BlobService;
+    use tvix_castore::directoryservice::DirectoryService;
+    use tvix_castore::fixtures::{
+        DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
+        HELLOWORLD_BLOB_DIGEST,
+    };
+    use tvix_castore::proto as castorepb;
+
+    use crate::tests::fixtures::{
+        blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD,
+        NAR_CONTENTS_SYMLINK,
+    };
+
+    #[rstest]
+    #[tokio::test]
+    async fn single_symlink(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+                name: "".into(), // name must be empty
+                target: "/nix/store/somewhereelse".into(),
+            }),
+            root_node
+        );
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn single_file(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::File(castorepb::FileNode {
+                name: "".into(), // name must be empty
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+                executable: false,
+            }),
+            root_node
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn complicated(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: "".into(), // name must be empty
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            root_node,
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
+
+        // directoryservice must contain the directories, at least with get_recursive.
+        let resp: Result<Vec<castorepb::Directory>, _> = directory_service
+            .get_recursive(&DIRECTORY_COMPLICATED.digest())
+            .collect()
+            .await;
+
+        let directories = resp.unwrap();
+
+        assert_eq!(2, directories.len());
+        assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
+        assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
+    }
+}