diff options
Diffstat (limited to 'tvix/store/src/nar/import.rs')
-rw-r--r-- | tvix/store/src/nar/import.rs | 366 |
1 files changed, 124 insertions, 242 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 6f4dcdea5d..36122d419d 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,225 +1,132 @@ -use bytes::Bytes; -use nix_compat::nar; -use std::io::{self, BufRead}; -use tokio_util::io::SyncIoBridge; -use tracing::warn; +use nix_compat::nar::reader::r#async as nar_reader; +use tokio::{io::AsyncBufRead, sync::mpsc, try_join}; use tvix_castore::{ blobservice::BlobService, - directoryservice::{DirectoryPutter, DirectoryService}, - proto::{self as castorepb}, - B3Digest, + directoryservice::DirectoryService, + import::{ + blobs::{self, ConcurrentBlobUploader}, + ingest_entries, IngestionEntry, IngestionError, + }, + proto::{node::Node, NamedNode}, + PathBuf, }; -/// Accepts a reader providing a NAR. -/// Will traverse it, uploading blobs to the given [BlobService], and -/// directories to the given [DirectoryService]. -/// On success, the root node is returned. -/// This function is not async (because the NAR reader is not) -/// and calls [tokio::task::block_in_place] when interacting with backing -/// services, so make sure to only call this with spawn_blocking. -pub fn read_nar<R, BS, DS>( - r: &mut R, +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// It returns the castore root node or an error. +pub async fn ingest_nar<R, BS, DS>( blob_service: BS, directory_service: DS, -) -> io::Result<castorepb::node::Node> + r: &mut R, +) -> Result<Node, IngestionError<Error>> where - R: BufRead + Send, - BS: AsRef<dyn BlobService>, - DS: AsRef<dyn DirectoryService>, + R: AsyncBufRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, { - let handle = tokio::runtime::Handle::current(); - - let directory_putter = directory_service.as_ref().put_multiple_start(); - - let node = nix_compat::nar::reader::open(r)?; - let (root_node, mut directory_putter, _) = process_node( - handle.clone(), - "".into(), // this is the root node, it has an empty name - node, - &blob_service, - directory_putter, - )?; - - // In case the root node points to a directory, we need to close - // [directory_putter], and ensure the digest we got back from there matches - // what the root node is pointing to. - if let castorepb::node::Node::Directory(ref directory_node) = root_node { - // Close directory_putter to make sure all directories have been inserted. - let directory_putter_digest = - handle.block_on(handle.spawn(async move { directory_putter.close().await }))??; - let root_directory_node_digest: B3Digest = - directory_node.digest.clone().try_into().unwrap(); - - if directory_putter_digest != root_directory_node_digest { - warn!( - root_directory_node_digest = %root_directory_node_digest, - directory_putter_digest =%directory_putter_digest, - "directory digest mismatch", - ); - return Err(io::Error::new( - io::ErrorKind::Other, - "directory digest mismatch", - )); - } - } - // In case it's not a Directory, [directory_putter] doesn't need to be - // closed (as we didn't end up uploading anything). - // It can just be dropped, as documented in its trait. - - Ok(root_node) -} + // open the NAR for reading. + // The NAR reader emits nodes in DFS preorder. + let root_node = nar_reader::open(r).await.map_err(Error::IO)?; -/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node]. -/// It does so by handling all three kinds, and recursing for directories. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. -fn process_node<BS>( - handle: tokio::runtime::Handle, - name: bytes::Bytes, - node: nar::reader::Node, - blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)> -where - BS: AsRef<dyn BlobService>, -{ - Ok(match node { - nar::reader::Node::Symlink { target } => ( - castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name, - target: target.into(), - }), - directory_putter, - blob_service, - ), - nar::reader::Node::File { executable, reader } => ( - castorepb::node::Node::File(process_file_reader( - handle, - name, - reader, - executable, - &blob_service, - )?), - directory_putter, - blob_service, - ), - nar::reader::Node::Directory(dir_reader) => { - let (directory_node, directory_putter, blob_service_back) = - process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; - - ( - castorepb::node::Node::Directory(directory_node), - directory_putter, - blob_service_back, - ) - } - }) -} - -/// Given a name and [nar::reader::FileReader], this ingests the file into the -/// passed [BlobService] and returns a [castorepb::FileNode]. -fn process_file_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut file_reader: nar::reader::FileReader, - executable: bool, - blob_service: BS, -) -> io::Result<castorepb::FileNode> -where - BS: AsRef<dyn BlobService>, -{ - // store the length. If we read any other length, reading will fail. - let expected_len = file_reader.len(); + let (tx, rx) = mpsc::channel(1); + let rx = tokio_stream::wrappers::ReceiverStream::new(rx); - // prepare writing a new blob. - let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await }); + let produce = async move { + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); - // write the blob. - let mut blob_writer = { - let mut dst = SyncIoBridge::new(blob_writer); + let res = produce_nar_inner( + &mut blob_uploader, + root_node, + "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT. + tx.clone(), + ) + .await; + + if let Err(err) = blob_uploader.join().await { + tx.send(Err(err.into())) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; + } - file_reader.copy(&mut dst)?; - dst.shutdown()?; + tx.send(res) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; - // return back the blob_writer - dst.into_inner() + Ok(()) }; - // close the blob_writer, retrieve the digest. - let blob_digest = handle.block_on(async { blob_writer.close().await })?; + let consume = ingest_entries(directory_service, rx); - Ok(castorepb::FileNode { - name, - digest: blob_digest.into(), - size: expected_len, - executable, - }) + let (_, node) = try_join!(produce, consume)?; + + // remove the fake "root" name again + debug_assert_eq!(&node.get_name(), b"root"); + Ok(node.rename("".into())) } -/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode]. -/// It uses [process_node] to iterate over all children. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. -fn process_dir_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut dir_reader: nar::reader::DirReader, - blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)> +async fn produce_nar_inner<BS>( + blob_uploader: &mut ConcurrentBlobUploader<BS>, + node: nar_reader::Node<'_, '_>, + path: PathBuf, + tx: mpsc::Sender<Result<IngestionEntry, Error>>, +) -> Result<IngestionEntry, Error> where - BS: AsRef<dyn BlobService>, + BS: BlobService + Clone + 'static, { - let mut directory = castorepb::Directory::default(); - - let mut directory_putter = directory_putter; - let mut blob_service = blob_service; - while let Some(entry) = dir_reader.next()? { - let (node, directory_putter_back, blob_service_back) = process_node( - handle.clone(), - entry.name.into(), - entry.node, - blob_service, - directory_putter, - )?; - - blob_service = blob_service_back; - directory_putter = directory_putter_back; - - match node { - castorepb::node::Node::Directory(node) => directory.directories.push(node), - castorepb::node::Node::File(node) => directory.files.push(node), - castorepb::node::Node::Symlink(node) => directory.symlinks.push(node), + Ok(match node { + nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target }, + nar_reader::Node::File { + executable, + mut reader, + } => { + let size = reader.len(); + let digest = blob_uploader.upload(&path, size, &mut reader).await?; + + IngestionEntry::Regular { + path, + size, + executable, + digest, + } } - } + nar_reader::Node::Directory(mut dir_reader) => { + while let Some(entry) = dir_reader.next().await? { + let mut path = path.clone(); + + // valid NAR names are valid castore names + path.try_push(entry.name) + .expect("Tvix bug: failed to join name"); + + let entry = Box::pin(produce_nar_inner( + blob_uploader, + entry.node, + path, + tx.clone(), + )) + .await?; + + tx.send(Ok(entry)).await.map_err(|e| { + Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) + })?; + } + + IngestionEntry::Dir { path } + } + }) +} - // calculate digest and size. - let directory_digest = directory.digest(); - let directory_size = directory.size(); - - // upload the directory. This is a bit more verbose, as we want to get back - // directory_putter for later reuse. - let directory_putter = handle.block_on(handle.spawn(async move { - directory_putter.put(directory).await?; - Ok::<_, io::Error>(directory_putter) - }))??; - - Ok(( - castorepb::DirectoryNode { - name, - digest: directory_digest.into(), - size: directory_size, - }, - directory_putter, - blob_service, - )) +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + IO(#[from] std::io::Error), + + #[error(transparent)] + BlobUpload(#[from] blobs::Error), } #[cfg(test)] mod test { - use crate::nar::read_nar; + use crate::nar::ingest_nar; use std::io::Cursor; use std::sync::Arc; @@ -244,19 +151,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking(|| { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), - blob_service, - directory_service, - ) - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service, + directory_service, + &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Symlink(castorepb::SymlinkNode { @@ -273,22 +174,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - move || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service, + &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::File(castorepb::FileNode { @@ -310,23 +202,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - let directory_service = directory_service.clone(); - || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service.clone(), + &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Directory(castorepb::DirectoryNode { |