about summary refs log tree commit diff
path: root/tvix/store/src/nar
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/nar')
-rw-r--r--tvix/store/src/nar/import.rs276
-rw-r--r--tvix/store/src/nar/mod.rs53
-rw-r--r--tvix/store/src/nar/renderer.rs222
3 files changed, 551 insertions, 0 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
new file mode 100644
index 0000000000..32c2f4e580
--- /dev/null
+++ b/tvix/store/src/nar/import.rs
@@ -0,0 +1,276 @@
+use nix_compat::nar::reader::r#async as nar_reader;
+use sha2::Digest;
+use tokio::{
+    io::{AsyncBufRead, AsyncRead},
+    sync::mpsc,
+    try_join,
+};
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    import::{
+        blobs::{self, ConcurrentBlobUploader},
+        ingest_entries, IngestionEntry, IngestionError,
+    },
+    proto::{node::Node, NamedNode},
+    PathBuf,
+};
+
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// Returns the castore root node, as well as the sha256 and size of the NAR
+/// contents ingested.
+pub async fn ingest_nar_and_hash<R, BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+    r: &mut R,
+) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
+where
+    R: AsyncRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
+{
+    let mut nar_hash = sha2::Sha256::new();
+    let mut nar_size = 0;
+
+    // Assemble NarHash and NarSize as we read bytes.
+    let r = tokio_util::io::InspectReader::new(r, |b| {
+        nar_size += b.len() as u64;
+        use std::io::Write;
+        nar_hash.write_all(b).unwrap();
+    });
+
+    // HACK: InspectReader doesn't implement AsyncBufRead.
+    // See if this can be propagated through and we can then require our input
+    // reader to be buffered too.
+    let mut r = tokio::io::BufReader::new(r);
+
+    let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
+
+    Ok((root_node, nar_hash.finalize().into(), nar_size))
+}
+
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// It returns the castore root node or an error.
+pub async fn ingest_nar<R, BS, DS>(
+    blob_service: BS,
+    directory_service: DS,
+    r: &mut R,
+) -> Result<Node, IngestionError<Error>>
+where
+    R: AsyncBufRead + Unpin + Send,
+    BS: BlobService + Clone + 'static,
+    DS: DirectoryService,
+{
+    // open the NAR for reading.
+    // The NAR reader emits nodes in DFS preorder.
+    let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
+
+    let (tx, rx) = mpsc::channel(1);
+    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
+
+    let produce = async move {
+        let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
+
+        let res = produce_nar_inner(
+            &mut blob_uploader,
+            root_node,
+            "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
+            tx.clone(),
+        )
+        .await;
+
+        if let Err(err) = blob_uploader.join().await {
+            tx.send(Err(err.into()))
+                .await
+                .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+        }
+
+        tx.send(res)
+            .await
+            .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
+
+        Ok(())
+    };
+
+    let consume = ingest_entries(directory_service, rx);
+
+    let (_, node) = try_join!(produce, consume)?;
+
+    // remove the fake "root" name again
+    debug_assert_eq!(&node.get_name(), b"root");
+    Ok(node.rename("".into()))
+}
+
+async fn produce_nar_inner<BS>(
+    blob_uploader: &mut ConcurrentBlobUploader<BS>,
+    node: nar_reader::Node<'_, '_>,
+    path: PathBuf,
+    tx: mpsc::Sender<Result<IngestionEntry, Error>>,
+) -> Result<IngestionEntry, Error>
+where
+    BS: BlobService + Clone + 'static,
+{
+    Ok(match node {
+        nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
+        nar_reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            let size = reader.len();
+            let digest = blob_uploader.upload(&path, size, &mut reader).await?;
+
+            IngestionEntry::Regular {
+                path,
+                size,
+                executable,
+                digest,
+            }
+        }
+        nar_reader::Node::Directory(mut dir_reader) => {
+            while let Some(entry) = dir_reader.next().await? {
+                let mut path = path.clone();
+
+                // valid NAR names are valid castore names
+                path.try_push(entry.name)
+                    .expect("Tvix bug: failed to join name");
+
+                let entry = Box::pin(produce_nar_inner(
+                    blob_uploader,
+                    entry.node,
+                    path,
+                    tx.clone(),
+                ))
+                .await?;
+
+                tx.send(Ok(entry)).await.map_err(|e| {
+                    Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
+                })?;
+            }
+
+            IngestionEntry::Dir { path }
+        }
+    })
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
+
+    #[error(transparent)]
+    BlobUpload(#[from] blobs::Error),
+}
+
+#[cfg(test)]
+mod test {
+    use crate::nar::ingest_nar;
+    use std::io::Cursor;
+    use std::sync::Arc;
+
+    use rstest::*;
+    use tokio_stream::StreamExt;
+    use tvix_castore::blobservice::BlobService;
+    use tvix_castore::directoryservice::DirectoryService;
+    use tvix_castore::fixtures::{
+        DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
+        HELLOWORLD_BLOB_DIGEST,
+    };
+    use tvix_castore::proto as castorepb;
+
+    use crate::tests::fixtures::{
+        blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD,
+        NAR_CONTENTS_SYMLINK,
+    };
+
+    #[rstest]
+    #[tokio::test]
+    async fn single_symlink(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+                name: "".into(), // name must be empty
+                target: "/nix/store/somewhereelse".into(),
+            }),
+            root_node
+        );
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn single_file(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::File(castorepb::FileNode {
+                name: "".into(), // name must be empty
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+                executable: false,
+            }),
+            root_node
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn complicated(
+        blob_service: Arc<dyn BlobService>,
+        directory_service: Arc<dyn DirectoryService>,
+    ) {
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+        )
+        .await
+        .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: "".into(), // name must be empty
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            root_node,
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
+
+        // directoryservice must contain the directories, at least with get_recursive.
+        let resp: Result<Vec<castorepb::Directory>, _> = directory_service
+            .get_recursive(&DIRECTORY_COMPLICATED.digest())
+            .collect()
+            .await;
+
+        let directories = resp.unwrap();
+
+        assert_eq!(2, directories.len());
+        assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
+        assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
+    }
+}
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
new file mode 100644
index 0000000000..8cbb091f1a
--- /dev/null
+++ b/tvix/store/src/nar/mod.rs
@@ -0,0 +1,53 @@
+use tonic::async_trait;
+use tvix_castore::B3Digest;
+
+mod import;
+mod renderer;
+pub use import::ingest_nar;
+pub use import::ingest_nar_and_hash;
+pub use renderer::calculate_size_and_sha256;
+pub use renderer::write_nar;
+pub use renderer::SimpleRenderer;
+use tvix_castore::proto as castorepb;
+
+#[async_trait]
+pub trait NarCalculationService: Send + Sync {
+    /// Return the nar size and nar sha256 digest for a given root node.
+    /// This can be used to calculate NAR-based output paths.
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error>;
+}
+
+#[async_trait]
+impl<A> NarCalculationService for A
+where
+    A: AsRef<dyn NarCalculationService> + Send + Sync,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        self.as_ref().calculate_nar(root_node).await
+    }
+}
+
+/// Errors that can encounter while rendering NARs.
+#[derive(Debug, thiserror::Error)]
+pub enum RenderError {
+    #[error("failure talking to a backing store client: {0}")]
+    StoreError(#[source] std::io::Error),
+
+    #[error("unable to find directory {}, referred from {:?}", .0, .1)]
+    DirectoryNotFound(B3Digest, bytes::Bytes),
+
+    #[error("unable to find blob {}, referred from {:?}", .0, .1)]
+    BlobNotFound(B3Digest, bytes::Bytes),
+
+    #[error("unexpected size in metadata for blob {}, referred from {:?} returned, expected {}, got {}", .0, .1, .2, .3)]
+    UnexpectedBlobMeta(B3Digest, bytes::Bytes, u32, u32),
+
+    #[error("failure using the NAR writer: {0}")]
+    NARWriterError(std::io::Error),
+}
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
new file mode 100644
index 0000000000..efd67671db
--- /dev/null
+++ b/tvix/store/src/nar/renderer.rs
@@ -0,0 +1,222 @@
+use crate::utils::AsyncIoBridge;
+
+use super::{NarCalculationService, RenderError};
+use count_write::CountWrite;
+use nix_compat::nar::writer::r#async as nar_writer;
+use sha2::{Digest, Sha256};
+use tokio::io::{self, AsyncWrite, BufReader};
+use tonic::async_trait;
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    proto::{self as castorepb, NamedNode},
+};
+
+pub struct SimpleRenderer<BS, DS> {
+    blob_service: BS,
+    directory_service: DS,
+}
+
+impl<BS, DS> SimpleRenderer<BS, DS> {
+    pub fn new(blob_service: BS, directory_service: DS) -> Self {
+        Self {
+            blob_service,
+            directory_service,
+        }
+    }
+}
+
+#[async_trait]
+impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS>
+where
+    BS: BlobService + Clone,
+    DS: DirectoryService + Clone,
+{
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), tvix_castore::Error> {
+        calculate_size_and_sha256(
+            root_node,
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+        )
+        .await
+        .map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e)))
+    }
+}
+
+/// Invoke [write_nar], and return the size and sha256 digest of the produced
+/// NAR output.
+pub async fn calculate_size_and_sha256<BS, DS>(
+    root_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(u64, [u8; 32]), RenderError>
+where
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+{
+    let mut h = Sha256::new();
+    let mut cw = CountWrite::from(&mut h);
+
+    write_nar(
+        // The hasher doesn't speak async. It doesn't
+        // actually do any I/O, so it's fine to wrap.
+        AsyncIoBridge(&mut cw),
+        root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok((cw.count(), h.finalize().into()))
+}
+
+/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path,
+/// and uses the passed blob_service and directory_service to perform the
+/// necessary lookups as it traverses the structure.
+/// The contents in NAR serialization are writen to the passed [AsyncWrite].
+pub async fn write_nar<W, BS, DS>(
+    mut w: W,
+    proto_root_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(), RenderError>
+where
+    W: AsyncWrite + Unpin + Send,
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+{
+    // Initialize NAR writer
+    let nar_root_node = nar_writer::open(&mut w)
+        .await
+        .map_err(RenderError::NARWriterError)?;
+
+    walk_node(
+        nar_root_node,
+        proto_root_node,
+        blob_service,
+        directory_service,
+    )
+    .await?;
+
+    Ok(())
+}
+
+/// Process an intermediate node in the structure.
+/// This consumes the node.
+async fn walk_node<BS, DS>(
+    nar_node: nar_writer::Node<'_, '_>,
+    proto_node: &castorepb::node::Node,
+    blob_service: BS,
+    directory_service: DS,
+) -> Result<(BS, DS), RenderError>
+where
+    BS: BlobService + Send,
+    DS: DirectoryService + Send,
+{
+    match proto_node {
+        castorepb::node::Node::Symlink(proto_symlink_node) => {
+            nar_node
+                .symlink(&proto_symlink_node.target)
+                .await
+                .map_err(RenderError::NARWriterError)?;
+        }
+        castorepb::node::Node::File(proto_file_node) => {
+            let digest_len = proto_file_node.digest.len();
+            let digest = proto_file_node.digest.clone().try_into().map_err(|_| {
+                RenderError::StoreError(io::Error::new(
+                    io::ErrorKind::Other,
+                    format!("invalid digest len {} in file node", digest_len),
+                ))
+            })?;
+
+            let mut blob_reader = match blob_service
+                .open_read(&digest)
+                .await
+                .map_err(RenderError::StoreError)?
+            {
+                Some(blob_reader) => Ok(BufReader::new(blob_reader)),
+                None => Err(RenderError::NARWriterError(io::Error::new(
+                    io::ErrorKind::NotFound,
+                    format!("blob with digest {} not found", &digest),
+                ))),
+            }?;
+
+            nar_node
+                .file(
+                    proto_file_node.executable,
+                    proto_file_node.size,
+                    &mut blob_reader,
+                )
+                .await
+                .map_err(RenderError::NARWriterError)?;
+        }
+        castorepb::node::Node::Directory(proto_directory_node) => {
+            let digest_len = proto_directory_node.digest.len();
+            let digest = proto_directory_node
+                .digest
+                .clone()
+                .try_into()
+                .map_err(|_| {
+                    RenderError::StoreError(io::Error::new(
+                        io::ErrorKind::InvalidData,
+                        format!("invalid digest len {} in directory node", digest_len),
+                    ))
+                })?;
+
+            // look it up with the directory service
+            match directory_service
+                .get(&digest)
+                .await
+                .map_err(|e| RenderError::StoreError(e.into()))?
+            {
+                // if it's None, that's an error!
+                None => Err(RenderError::DirectoryNotFound(
+                    digest,
+                    proto_directory_node.name.clone(),
+                ))?,
+                Some(proto_directory) => {
+                    // start a directory node
+                    let mut nar_node_directory = nar_node
+                        .directory()
+                        .await
+                        .map_err(RenderError::NARWriterError)?;
+
+                    // We put blob_service, directory_service back here whenever we come up from
+                    // the recursion.
+                    let mut blob_service = blob_service;
+                    let mut directory_service = directory_service;
+
+                    // for each node in the directory, create a new entry with its name,
+                    // and then recurse on that entry.
+                    for proto_node in proto_directory.nodes() {
+                        let child_node = nar_node_directory
+                            .entry(proto_node.get_name())
+                            .await
+                            .map_err(RenderError::NARWriterError)?;
+
+                        (blob_service, directory_service) = Box::pin(walk_node(
+                            child_node,
+                            &proto_node,
+                            blob_service,
+                            directory_service,
+                        ))
+                        .await?;
+                    }
+
+                    // close the directory
+                    nar_node_directory
+                        .close()
+                        .await
+                        .map_err(RenderError::NARWriterError)?;
+
+                    return Ok((blob_service, directory_service));
+                }
+            }
+        }
+    }
+
+    Ok((blob_service, directory_service))
+}