about summary refs log tree commit diff
path: root/tvix/store
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store')
-rw-r--r--tvix/store/Cargo.toml2
-rw-r--r--tvix/store/src/nar/import.rs330
-rw-r--r--tvix/store/src/nar/mod.rs2
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs180
4 files changed, 177 insertions, 337 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 4c0e9be49aa2..7034a95f3808 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -5,6 +5,7 @@ edition = "2021"
 
 [dependencies]
 anyhow = "1.0.68"
+async-compression = { version = "0.4.9", features = ["tokio", "bzip2", "gzip", "xz", "zstd"]}
 async-stream = "0.3.5"
 blake3 = { version = "1.3.1", features = ["rayon", "std"] }
 bstr = "1.6.0"
@@ -41,7 +42,6 @@ url = "2.4.0"
 walkdir = "2.4.0"
 async-recursion = "1.0.5"
 reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false }
-xz2 = "0.1.7"
 
 [dependencies.tonic-reflection]
 optional = true
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 70f8137e8951..bfb65629e6b6 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -1,219 +1,120 @@
-use bytes::Bytes;
-use nix_compat::nar;
-use std::io::{self, BufRead};
-use tokio_util::io::SyncIoBridge;
-use tracing::warn;
+use async_recursion::async_recursion;
+use nix_compat::nar::reader::r#async as nar_reader;
+use tokio::{io::AsyncBufRead, sync::mpsc, try_join};
 use tvix_castore::{
     blobservice::BlobService,
-    directoryservice::{DirectoryPutter, DirectoryService},
-    proto::{self as castorepb},
-    B3Digest,
+    directoryservice::DirectoryService,
+    import::{ingest_entries, IngestionEntry, IngestionError},
+    proto::{node::Node, NamedNode},
+    PathBuf,
 };
 
-/// Accepts a reader providing a NAR.
-/// Will traverse it, uploading blobs to the given [BlobService], and
-/// directories to the given [DirectoryService].
-/// On success, the root node is returned.
-/// This function is not async (because the NAR reader is not)
-/// and calls [tokio::task::block_in_place] when interacting with backing
-/// services, so make sure to only call this with spawn_blocking.
-pub fn read_nar<R, BS, DS>(
-    r: &mut R,
+/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store,
+/// interacting with a [BlobService] and [DirectoryService].
+/// It returns the castore root node or an error.
+pub async fn ingest_nar<R, BS, DS>(
     blob_service: BS,
     directory_service: DS,
-) -> io::Result<castorepb::node::Node>
+    r: &mut R,
+) -> Result<Node, IngestionError<Error>>
 where
-    R: BufRead + Send,
+    R: AsyncBufRead + Unpin + Send,
     BS: BlobService + Clone,
     DS: DirectoryService,
 {
-    let handle = tokio::runtime::Handle::current();
+    // open the NAR for reading.
+    // The NAR reader emits nodes in DFS preorder.
+    let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
 
-    let directory_putter = directory_service.put_multiple_start();
+    let (tx, rx) = mpsc::channel(1);
+    let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
 
-    let node = nix_compat::nar::reader::open(r)?;
-    let (root_node, mut directory_putter) = process_node(
-        handle.clone(),
-        "".into(), // this is the root node, it has an empty name
-        node,
-        blob_service,
-        directory_putter,
-    )?;
+    let produce = async move {
+        let res = produce_nar_inner(
+            blob_service,
+            root_node,
+            "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
+            tx.clone(),
+        )
+        .await;
 
-    // In case the root node points to a directory, we need to close
-    // [directory_putter], and ensure the digest we got back from there matches
-    // what the root node is pointing to.
-    if let castorepb::node::Node::Directory(ref directory_node) = root_node {
-        // Close directory_putter to make sure all directories have been inserted.
-        let directory_putter_digest =
-            handle.block_on(handle.spawn(async move { directory_putter.close().await }))??;
-        let root_directory_node_digest: B3Digest =
-            directory_node.digest.clone().try_into().unwrap();
+        tx.send(res)
+            .await
+            .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
 
-        if directory_putter_digest != root_directory_node_digest {
-            warn!(
-                root_directory_node_digest = %root_directory_node_digest,
-                directory_putter_digest =%directory_putter_digest,
-                "directory digest mismatch",
-            );
-            return Err(io::Error::new(
-                io::ErrorKind::Other,
-                "directory digest mismatch",
-            ));
-        }
-    }
-    // In case it's not a Directory, [directory_putter] doesn't need to be
-    // closed (as we didn't end up uploading anything).
-    // It can just be dropped, as documented in its trait.
+        Ok(())
+    };
+
+    let consume = ingest_entries(directory_service, rx);
+
+    let (_, node) = try_join!(produce, consume)?;
 
-    Ok(root_node)
+    // remove the fake "root" name again
+    debug_assert_eq!(&node.get_name(), b"root");
+    Ok(node.rename("".into()))
 }
 
-/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node].
-/// It does so by handling all three kinds, and recursing for directories.
-///
-/// [DirectoryPutter] is passed around, so a single instance of it can be used,
-/// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_node<BS>(
-    handle: tokio::runtime::Handle,
-    name: bytes::Bytes,
-    node: nar::reader::Node,
+#[async_recursion]
+async fn produce_nar_inner<'a: 'async_recursion, 'r: 'async_recursion, BS>(
     blob_service: BS,
-    directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)>
+    node: nar_reader::Node<'a, 'r>,
+    path: PathBuf,
+    tx: mpsc::Sender<Result<IngestionEntry, Error>>,
+) -> Result<IngestionEntry, Error>
 where
     BS: BlobService + Clone,
 {
     Ok(match node {
-        nar::reader::Node::Symlink { target } => (
-            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
-                name,
-                target: target.into(),
-            }),
-            directory_putter,
-        ),
-        nar::reader::Node::File { executable, reader } => (
-            castorepb::node::Node::File(process_file_reader(
-                handle,
-                name,
-                reader,
+        nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
+        nar_reader::Node::File {
+            executable,
+            mut reader,
+        } => {
+            let (digest, size) = {
+                let mut blob_writer = blob_service.open_write().await;
+                // TODO(edef): fix the AsyncBufRead implementation of nix_compat::wire::BytesReader
+                let size = tokio::io::copy(&mut reader, &mut blob_writer).await?;
+
+                (blob_writer.close().await?, size)
+            };
+
+            IngestionEntry::Regular {
+                path,
+                size,
                 executable,
-                blob_service,
-            )?),
-            directory_putter,
-        ),
-        nar::reader::Node::Directory(dir_reader) => {
-            let (directory_node, directory_putter) =
-                process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?;
-
-            (
-                castorepb::node::Node::Directory(directory_node),
-                directory_putter,
-            )
+                digest,
+            }
         }
-    })
-}
+        nar_reader::Node::Directory(mut dir_reader) => {
+            while let Some(entry) = dir_reader.next().await? {
+                let mut path = path.clone();
 
-/// Given a name and [nar::reader::FileReader], this ingests the file into the
-/// passed [BlobService] and returns a [castorepb::FileNode].
-fn process_file_reader<BS>(
-    handle: tokio::runtime::Handle,
-    name: Bytes,
-    mut file_reader: nar::reader::FileReader,
-    executable: bool,
-    blob_service: BS,
-) -> io::Result<castorepb::FileNode>
-where
-    BS: BlobService,
-{
-    // store the length. If we read any other length, reading will fail.
-    let expected_len = file_reader.len();
+                // valid NAR names are valid castore names
+                path.try_push(&entry.name)
+                    .expect("Tvix bug: failed to join name");
 
-    // prepare writing a new blob.
-    let blob_writer = handle.block_on(async { blob_service.open_write().await });
+                let entry =
+                    produce_nar_inner(blob_service.clone(), entry.node, path, tx.clone()).await?;
 
-    // write the blob.
-    let mut blob_writer = {
-        let mut dst = SyncIoBridge::new(blob_writer);
+                tx.send(Ok(entry)).await.map_err(|e| {
+                    Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
+                })?;
+            }
 
-        file_reader.copy(&mut dst)?;
-        dst.shutdown()?;
-
-        // return back the blob_writer
-        dst.into_inner()
-    };
-
-    // close the blob_writer, retrieve the digest.
-    let blob_digest = handle.block_on(async { blob_writer.close().await })?;
-
-    Ok(castorepb::FileNode {
-        name,
-        digest: blob_digest.into(),
-        size: expected_len,
-        executable,
+            IngestionEntry::Dir { path }
+        }
     })
 }
 
-/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode].
-/// It uses [process_node] to iterate over all children.
-///
-/// [DirectoryPutter] is passed around, so a single instance of it can be used,
-/// which is sufficient, as this reads through the whole NAR linerarly.
-fn process_dir_reader<BS>(
-    handle: tokio::runtime::Handle,
-    name: Bytes,
-    mut dir_reader: nar::reader::DirReader,
-    blob_service: BS,
-    directory_putter: Box<dyn DirectoryPutter>,
-) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)>
-where
-    BS: BlobService + Clone,
-{
-    let mut directory = castorepb::Directory::default();
-
-    let mut directory_putter = directory_putter;
-    while let Some(entry) = dir_reader.next()? {
-        let (node, directory_putter_back) = process_node(
-            handle.clone(),
-            entry.name.into(),
-            entry.node,
-            blob_service.clone(),
-            directory_putter,
-        )?;
-
-        directory_putter = directory_putter_back;
-
-        match node {
-            castorepb::node::Node::Directory(node) => directory.directories.push(node),
-            castorepb::node::Node::File(node) => directory.files.push(node),
-            castorepb::node::Node::Symlink(node) => directory.symlinks.push(node),
-        }
-    }
-
-    // calculate digest and size.
-    let directory_digest = directory.digest();
-    let directory_size = directory.size();
-
-    // upload the directory. This is a bit more verbose, as we want to get back
-    // directory_putter for later reuse.
-    let directory_putter = handle.block_on(handle.spawn(async move {
-        directory_putter.put(directory).await?;
-        Ok::<_, io::Error>(directory_putter)
-    }))??;
-
-    Ok((
-        castorepb::DirectoryNode {
-            name,
-            digest: directory_digest.into(),
-            size: directory_size,
-        },
-        directory_putter,
-    ))
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
 }
 
 #[cfg(test)]
 mod test {
-    use crate::nar::read_nar;
+    use crate::nar::ingest_nar;
     use std::io::Cursor;
     use std::sync::Arc;
 
@@ -238,19 +139,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking(|| {
-                read_nar(
-                    &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
-                    blob_service,
-                    directory_service,
-                )
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service,
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::Symlink(castorepb::SymlinkNode {
@@ -267,22 +162,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking({
-                let blob_service = blob_service.clone();
-                move || {
-                    read_nar(
-                        &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
-                        blob_service,
-                        directory_service,
-                    )
-                }
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service,
+            &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::File(castorepb::FileNode {
@@ -304,23 +190,13 @@ mod test {
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
     ) {
-        let handle = tokio::runtime::Handle::current();
-
-        let root_node = handle
-            .spawn_blocking({
-                let blob_service = blob_service.clone();
-                let directory_service = directory_service.clone();
-                || {
-                    read_nar(
-                        &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
-                        blob_service,
-                        directory_service,
-                    )
-                }
-            })
-            .await
-            .unwrap()
-            .expect("must parse");
+        let root_node = ingest_nar(
+            blob_service.clone(),
+            directory_service.clone(),
+            &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+        )
+        .await
+        .expect("must parse");
 
         assert_eq!(
             castorepb::node::Node::Directory(castorepb::DirectoryNode {
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
index 49bb92fb0f0f..4d5101f9d558 100644
--- a/tvix/store/src/nar/mod.rs
+++ b/tvix/store/src/nar/mod.rs
@@ -2,7 +2,7 @@ use tvix_castore::B3Digest;
 
 mod import;
 mod renderer;
-pub use import::read_nar;
+pub use import::ingest_nar;
 pub use renderer::calculate_size_and_sha256;
 pub use renderer::write_nar;
 
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index bdb0e2c3cba7..234340592e89 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -1,5 +1,3 @@
-use std::io::{self, BufRead, Read, Write};
-
 use data_encoding::BASE64;
 use futures::{stream::BoxStream, TryStreamExt};
 use nix_compat::{
@@ -8,7 +6,10 @@ use nix_compat::{
     nixhash::NixHash,
 };
 use reqwest::StatusCode;
-use sha2::{digest::FixedOutput, Digest, Sha256};
+use sha2::Digest;
+use std::io::{self, Write};
+use tokio::io::{AsyncRead, BufReader};
+use tokio_util::io::InspectReader;
 use tonic::async_trait;
 use tracing::{debug, instrument, warn};
 use tvix_castore::{
@@ -171,85 +172,83 @@ where
             )));
         }
 
-        // get an AsyncRead of the response body.
-        let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
+        // get a reader of the response body.
+        let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| {
             let e = e.without_url();
             warn!(e=%e, "failed to get response body");
             io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())
         }));
-        let sync_r = tokio_util::io::SyncIoBridge::new(async_r);
 
-        // handle decompression, by wrapping the reader.
-        let sync_r: Box<dyn BufRead + Send> = match narinfo.compression {
-            Some("none") => Box::new(sync_r),
-            Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))),
-            Some(comp) => {
-                return Err(Error::InvalidRequest(
-                    format!("unsupported compression: {}", comp).to_string(),
-                ))
-            }
-            None => {
-                return Err(Error::InvalidRequest(
-                    "unsupported compression: bzip2".to_string(),
-                ))
+        // handle decompression, depending on the compression field.
+        let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression {
+            Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>,
+            Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r))
+                as Box<dyn AsyncRead + Send + Unpin>,
+            Some(comp_str) => {
+                return Err(Error::StorageError(format!(
+                    "unsupported compression: {comp_str}"
+                )));
             }
         };
-
-        let res = tokio::task::spawn_blocking({
-            let blob_service = self.blob_service.clone();
-            let directory_service = self.directory_service.clone();
-            move || -> io::Result<_> {
-                // Wrap the reader once more, so we can calculate NarSize and NarHash
-                let mut sync_r = io::BufReader::new(NarReader::from(sync_r));
-                let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?;
-
-                let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner();
-
-                Ok((root_node, nar_hash, nar_size))
-            }
-        })
+        let mut nar_hash = sha2::Sha256::new();
+        let mut nar_size = 0;
+
+        // Assemble NarHash and NarSize as we read bytes.
+        let r = InspectReader::new(r, |b| {
+            nar_size += b.len() as u64;
+            nar_hash.write_all(b).unwrap();
+        });
+
+        // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors.
+        let mut r = BufReader::new(r);
+
+        let root_node = crate::nar::ingest_nar(
+            self.blob_service.clone(),
+            self.directory_service.clone(),
+            &mut r,
+        )
         .await
-        .unwrap();
-
-        match res {
-            Ok((root_node, nar_hash, nar_size)) => {
-                // ensure the ingested narhash and narsize do actually match.
-                if narinfo.nar_size != nar_size {
-                    warn!(
-                        narinfo.nar_size = narinfo.nar_size,
-                        http.nar_size = nar_size,
-                        "NARSize mismatch"
-                    );
-                    Err(io::Error::new(
-                        io::ErrorKind::InvalidData,
-                        "NarSize mismatch".to_string(),
-                    ))?;
-                }
-                if narinfo.nar_hash != nar_hash {
-                    warn!(
-                        narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
-                        http.nar_hash = %NixHash::Sha256(nar_hash),
-                        "NarHash mismatch"
-                    );
-                    Err(io::Error::new(
-                        io::ErrorKind::InvalidData,
-                        "NarHash mismatch".to_string(),
-                    ))?;
-                }
-
-                Ok(Some(PathInfo {
-                    node: Some(castorepb::Node {
-                        // set the name of the root node to the digest-name of the store path.
-                        node: Some(
-                            root_node.rename(narinfo.store_path.to_string().to_owned().into()),
-                        ),
-                    }),
-                    references: pathinfo.references,
-                    narinfo: pathinfo.narinfo,
-                }))
-            }
-            Err(e) => Err(e.into()),
+        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
+
+        // ensure the ingested narhash and narsize do actually match.
+        if narinfo.nar_size != nar_size {
+            warn!(
+                narinfo.nar_size = narinfo.nar_size,
+                http.nar_size = nar_size,
+                "NARSize mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarSize mismatch".to_string(),
+            ))?;
         }
+        let nar_hash: [u8; 32] = nar_hash.finalize().into();
+        if narinfo.nar_hash != nar_hash {
+            warn!(
+                narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash),
+                http.nar_hash = %NixHash::Sha256(nar_hash),
+                "NarHash mismatch"
+            );
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "NarHash mismatch".to_string(),
+            ))?;
+        }
+
+        Ok(Some(PathInfo {
+            node: Some(castorepb::Node {
+                // set the name of the root node to the digest-name of the store path.
+                node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())),
+            }),
+            references: pathinfo.references,
+            narinfo: pathinfo.narinfo,
+        }))
     }
 
     #[instrument(skip_all, fields(path_info=?_path_info))]
@@ -277,38 +276,3 @@ where
         }))
     }
 }
-
-/// Small helper reader implementing [std::io::Read].
-/// It can be used to wrap another reader, counts the number of bytes read
-/// and the sha256 digest of the contents.
-struct NarReader<R: Read> {
-    r: R,
-
-    sha256: sha2::Sha256,
-    bytes_read: u64,
-}
-
-impl<R: Read> NarReader<R> {
-    pub fn from(inner: R) -> Self {
-        Self {
-            r: inner,
-            sha256: Sha256::new(),
-            bytes_read: 0,
-        }
-    }
-
-    /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read.
-    pub fn into_inner(self) -> (R, [u8; 32], u64) {
-        (self.r, self.sha256.finalize_fixed().into(), self.bytes_read)
-    }
-}
-
-impl<R: Read> Read for NarReader<R> {
-    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-        self.r.read(buf).map(|n| {
-            self.bytes_read += n as u64;
-            self.sha256.write_all(&buf[..n]).unwrap();
-            n
-        })
-    }
-}