diff options
Diffstat (limited to 'tvix/store/src/nar/import.rs')
-rw-r--r-- | tvix/store/src/nar/import.rs | 330 |
1 files changed, 103 insertions, 227 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 70f8137e8951..bfb65629e6b6 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,219 +1,120 @@ -use bytes::Bytes; -use nix_compat::nar; -use std::io::{self, BufRead}; -use tokio_util::io::SyncIoBridge; -use tracing::warn; +use async_recursion::async_recursion; +use nix_compat::nar::reader::r#async as nar_reader; +use tokio::{io::AsyncBufRead, sync::mpsc, try_join}; use tvix_castore::{ blobservice::BlobService, - directoryservice::{DirectoryPutter, DirectoryService}, - proto::{self as castorepb}, - B3Digest, + directoryservice::DirectoryService, + import::{ingest_entries, IngestionEntry, IngestionError}, + proto::{node::Node, NamedNode}, + PathBuf, }; -/// Accepts a reader providing a NAR. -/// Will traverse it, uploading blobs to the given [BlobService], and -/// directories to the given [DirectoryService]. -/// On success, the root node is returned. -/// This function is not async (because the NAR reader is not) -/// and calls [tokio::task::block_in_place] when interacting with backing -/// services, so make sure to only call this with spawn_blocking. -pub fn read_nar<R, BS, DS>( - r: &mut R, +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// It returns the castore root node or an error. +pub async fn ingest_nar<R, BS, DS>( blob_service: BS, directory_service: DS, -) -> io::Result<castorepb::node::Node> + r: &mut R, +) -> Result<Node, IngestionError<Error>> where - R: BufRead + Send, + R: AsyncBufRead + Unpin + Send, BS: BlobService + Clone, DS: DirectoryService, { - let handle = tokio::runtime::Handle::current(); + // open the NAR for reading. + // The NAR reader emits nodes in DFS preorder. + let root_node = nar_reader::open(r).await.map_err(Error::IO)?; - let directory_putter = directory_service.put_multiple_start(); + let (tx, rx) = mpsc::channel(1); + let rx = tokio_stream::wrappers::ReceiverStream::new(rx); - let node = nix_compat::nar::reader::open(r)?; - let (root_node, mut directory_putter) = process_node( - handle.clone(), - "".into(), // this is the root node, it has an empty name - node, - blob_service, - directory_putter, - )?; + let produce = async move { + let res = produce_nar_inner( + blob_service, + root_node, + "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT. + tx.clone(), + ) + .await; - // In case the root node points to a directory, we need to close - // [directory_putter], and ensure the digest we got back from there matches - // what the root node is pointing to. - if let castorepb::node::Node::Directory(ref directory_node) = root_node { - // Close directory_putter to make sure all directories have been inserted. - let directory_putter_digest = - handle.block_on(handle.spawn(async move { directory_putter.close().await }))??; - let root_directory_node_digest: B3Digest = - directory_node.digest.clone().try_into().unwrap(); + tx.send(res) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; - if directory_putter_digest != root_directory_node_digest { - warn!( - root_directory_node_digest = %root_directory_node_digest, - directory_putter_digest =%directory_putter_digest, - "directory digest mismatch", - ); - return Err(io::Error::new( - io::ErrorKind::Other, - "directory digest mismatch", - )); - } - } - // In case it's not a Directory, [directory_putter] doesn't need to be - // closed (as we didn't end up uploading anything). - // It can just be dropped, as documented in its trait. + Ok(()) + }; + + let consume = ingest_entries(directory_service, rx); + + let (_, node) = try_join!(produce, consume)?; - Ok(root_node) + // remove the fake "root" name again + debug_assert_eq!(&node.get_name(), b"root"); + Ok(node.rename("".into())) } -/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node]. -/// It does so by handling all three kinds, and recursing for directories. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. -fn process_node<BS>( - handle: tokio::runtime::Handle, - name: bytes::Bytes, - node: nar::reader::Node, +#[async_recursion] +async fn produce_nar_inner<'a: 'async_recursion, 'r: 'async_recursion, BS>( blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)> + node: nar_reader::Node<'a, 'r>, + path: PathBuf, + tx: mpsc::Sender<Result<IngestionEntry, Error>>, +) -> Result<IngestionEntry, Error> where BS: BlobService + Clone, { Ok(match node { - nar::reader::Node::Symlink { target } => ( - castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name, - target: target.into(), - }), - directory_putter, - ), - nar::reader::Node::File { executable, reader } => ( - castorepb::node::Node::File(process_file_reader( - handle, - name, - reader, + nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target }, + nar_reader::Node::File { + executable, + mut reader, + } => { + let (digest, size) = { + let mut blob_writer = blob_service.open_write().await; + // TODO(edef): fix the AsyncBufRead implementation of nix_compat::wire::BytesReader + let size = tokio::io::copy(&mut reader, &mut blob_writer).await?; + + (blob_writer.close().await?, size) + }; + + IngestionEntry::Regular { + path, + size, executable, - blob_service, - )?), - directory_putter, - ), - nar::reader::Node::Directory(dir_reader) => { - let (directory_node, directory_putter) = - process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; - - ( - castorepb::node::Node::Directory(directory_node), - directory_putter, - ) + digest, + } } - }) -} + nar_reader::Node::Directory(mut dir_reader) => { + while let Some(entry) = dir_reader.next().await? { + let mut path = path.clone(); -/// Given a name and [nar::reader::FileReader], this ingests the file into the -/// passed [BlobService] and returns a [castorepb::FileNode]. -fn process_file_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut file_reader: nar::reader::FileReader, - executable: bool, - blob_service: BS, -) -> io::Result<castorepb::FileNode> -where - BS: BlobService, -{ - // store the length. If we read any other length, reading will fail. - let expected_len = file_reader.len(); + // valid NAR names are valid castore names + path.try_push(&entry.name) + .expect("Tvix bug: failed to join name"); - // prepare writing a new blob. - let blob_writer = handle.block_on(async { blob_service.open_write().await }); + let entry = + produce_nar_inner(blob_service.clone(), entry.node, path, tx.clone()).await?; - // write the blob. - let mut blob_writer = { - let mut dst = SyncIoBridge::new(blob_writer); + tx.send(Ok(entry)).await.map_err(|e| { + Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) + })?; + } - file_reader.copy(&mut dst)?; - dst.shutdown()?; - - // return back the blob_writer - dst.into_inner() - }; - - // close the blob_writer, retrieve the digest. - let blob_digest = handle.block_on(async { blob_writer.close().await })?; - - Ok(castorepb::FileNode { - name, - digest: blob_digest.into(), - size: expected_len, - executable, + IngestionEntry::Dir { path } + } }) } -/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode]. -/// It uses [process_node] to iterate over all children. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. -fn process_dir_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut dir_reader: nar::reader::DirReader, - blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)> -where - BS: BlobService + Clone, -{ - let mut directory = castorepb::Directory::default(); - - let mut directory_putter = directory_putter; - while let Some(entry) = dir_reader.next()? { - let (node, directory_putter_back) = process_node( - handle.clone(), - entry.name.into(), - entry.node, - blob_service.clone(), - directory_putter, - )?; - - directory_putter = directory_putter_back; - - match node { - castorepb::node::Node::Directory(node) => directory.directories.push(node), - castorepb::node::Node::File(node) => directory.files.push(node), - castorepb::node::Node::Symlink(node) => directory.symlinks.push(node), - } - } - - // calculate digest and size. - let directory_digest = directory.digest(); - let directory_size = directory.size(); - - // upload the directory. This is a bit more verbose, as we want to get back - // directory_putter for later reuse. - let directory_putter = handle.block_on(handle.spawn(async move { - directory_putter.put(directory).await?; - Ok::<_, io::Error>(directory_putter) - }))??; - - Ok(( - castorepb::DirectoryNode { - name, - digest: directory_digest.into(), - size: directory_size, - }, - directory_putter, - )) +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + IO(#[from] std::io::Error), } #[cfg(test)] mod test { - use crate::nar::read_nar; + use crate::nar::ingest_nar; use std::io::Cursor; use std::sync::Arc; @@ -238,19 +139,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking(|| { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), - blob_service, - directory_service, - ) - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service, + directory_service, + &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Symlink(castorepb::SymlinkNode { @@ -267,22 +162,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - move || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service, + &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::File(castorepb::FileNode { @@ -304,23 +190,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - let directory_service = directory_service.clone(); - || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service.clone(), + &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Directory(castorepb::DirectoryNode { |