diff options
Diffstat (limited to 'tvix/store/src')
-rw-r--r-- | tvix/store/src/nar/import.rs | 330 | ||||
-rw-r--r-- | tvix/store/src/nar/mod.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/nix_http.rs | 180 |
3 files changed, 176 insertions, 336 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 70f8137e8951..bfb65629e6b6 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,219 +1,120 @@ -use bytes::Bytes; -use nix_compat::nar; -use std::io::{self, BufRead}; -use tokio_util::io::SyncIoBridge; -use tracing::warn; +use async_recursion::async_recursion; +use nix_compat::nar::reader::r#async as nar_reader; +use tokio::{io::AsyncBufRead, sync::mpsc, try_join}; use tvix_castore::{ blobservice::BlobService, - directoryservice::{DirectoryPutter, DirectoryService}, - proto::{self as castorepb}, - B3Digest, + directoryservice::DirectoryService, + import::{ingest_entries, IngestionEntry, IngestionError}, + proto::{node::Node, NamedNode}, + PathBuf, }; -/// Accepts a reader providing a NAR. -/// Will traverse it, uploading blobs to the given [BlobService], and -/// directories to the given [DirectoryService]. -/// On success, the root node is returned. -/// This function is not async (because the NAR reader is not) -/// and calls [tokio::task::block_in_place] when interacting with backing -/// services, so make sure to only call this with spawn_blocking. -pub fn read_nar<R, BS, DS>( - r: &mut R, +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// It returns the castore root node or an error. +pub async fn ingest_nar<R, BS, DS>( blob_service: BS, directory_service: DS, -) -> io::Result<castorepb::node::Node> + r: &mut R, +) -> Result<Node, IngestionError<Error>> where - R: BufRead + Send, + R: AsyncBufRead + Unpin + Send, BS: BlobService + Clone, DS: DirectoryService, { - let handle = tokio::runtime::Handle::current(); + // open the NAR for reading. + // The NAR reader emits nodes in DFS preorder. + let root_node = nar_reader::open(r).await.map_err(Error::IO)?; - let directory_putter = directory_service.put_multiple_start(); + let (tx, rx) = mpsc::channel(1); + let rx = tokio_stream::wrappers::ReceiverStream::new(rx); - let node = nix_compat::nar::reader::open(r)?; - let (root_node, mut directory_putter) = process_node( - handle.clone(), - "".into(), // this is the root node, it has an empty name - node, - blob_service, - directory_putter, - )?; + let produce = async move { + let res = produce_nar_inner( + blob_service, + root_node, + "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT. + tx.clone(), + ) + .await; - // In case the root node points to a directory, we need to close - // [directory_putter], and ensure the digest we got back from there matches - // what the root node is pointing to. - if let castorepb::node::Node::Directory(ref directory_node) = root_node { - // Close directory_putter to make sure all directories have been inserted. - let directory_putter_digest = - handle.block_on(handle.spawn(async move { directory_putter.close().await }))??; - let root_directory_node_digest: B3Digest = - directory_node.digest.clone().try_into().unwrap(); + tx.send(res) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; - if directory_putter_digest != root_directory_node_digest { - warn!( - root_directory_node_digest = %root_directory_node_digest, - directory_putter_digest =%directory_putter_digest, - "directory digest mismatch", - ); - return Err(io::Error::new( - io::ErrorKind::Other, - "directory digest mismatch", - )); - } - } - // In case it's not a Directory, [directory_putter] doesn't need to be - // closed (as we didn't end up uploading anything). - // It can just be dropped, as documented in its trait. + Ok(()) + }; + + let consume = ingest_entries(directory_service, rx); + + let (_, node) = try_join!(produce, consume)?; - Ok(root_node) + // remove the fake "root" name again + debug_assert_eq!(&node.get_name(), b"root"); + Ok(node.rename("".into())) } -/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node]. -/// It does so by handling all three kinds, and recursing for directories. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. -fn process_node<BS>( - handle: tokio::runtime::Handle, - name: bytes::Bytes, - node: nar::reader::Node, +#[async_recursion] +async fn produce_nar_inner<'a: 'async_recursion, 'r: 'async_recursion, BS>( blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)> + node: nar_reader::Node<'a, 'r>, + path: PathBuf, + tx: mpsc::Sender<Result<IngestionEntry, Error>>, +) -> Result<IngestionEntry, Error> where BS: BlobService + Clone, { Ok(match node { - nar::reader::Node::Symlink { target } => ( - castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name, - target: target.into(), - }), - directory_putter, - ), - nar::reader::Node::File { executable, reader } => ( - castorepb::node::Node::File(process_file_reader( - handle, - name, - reader, + nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target }, + nar_reader::Node::File { + executable, + mut reader, + } => { + let (digest, size) = { + let mut blob_writer = blob_service.open_write().await; + // TODO(edef): fix the AsyncBufRead implementation of nix_compat::wire::BytesReader + let size = tokio::io::copy(&mut reader, &mut blob_writer).await?; + + (blob_writer.close().await?, size) + }; + + IngestionEntry::Regular { + path, + size, executable, - blob_service, - )?), - directory_putter, - ), - nar::reader::Node::Directory(dir_reader) => { - let (directory_node, directory_putter) = - process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; - - ( - castorepb::node::Node::Directory(directory_node), - directory_putter, - ) + digest, + } } - }) -} + nar_reader::Node::Directory(mut dir_reader) => { + while let Some(entry) = dir_reader.next().await? { + let mut path = path.clone(); -/// Given a name and [nar::reader::FileReader], this ingests the file into the -/// passed [BlobService] and returns a [castorepb::FileNode]. -fn process_file_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut file_reader: nar::reader::FileReader, - executable: bool, - blob_service: BS, -) -> io::Result<castorepb::FileNode> -where - BS: BlobService, -{ - // store the length. If we read any other length, reading will fail. - let expected_len = file_reader.len(); + // valid NAR names are valid castore names + path.try_push(&entry.name) + .expect("Tvix bug: failed to join name"); - // prepare writing a new blob. - let blob_writer = handle.block_on(async { blob_service.open_write().await }); + let entry = + produce_nar_inner(blob_service.clone(), entry.node, path, tx.clone()).await?; - // write the blob. - let mut blob_writer = { - let mut dst = SyncIoBridge::new(blob_writer); + tx.send(Ok(entry)).await.map_err(|e| { + Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) + })?; + } - file_reader.copy(&mut dst)?; - dst.shutdown()?; - - // return back the blob_writer - dst.into_inner() - }; - - // close the blob_writer, retrieve the digest. - let blob_digest = handle.block_on(async { blob_writer.close().await })?; - - Ok(castorepb::FileNode { - name, - digest: blob_digest.into(), - size: expected_len, - executable, + IngestionEntry::Dir { path } + } }) } -/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode]. -/// It uses [process_node] to iterate over all children. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. -fn process_dir_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut dir_reader: nar::reader::DirReader, - blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)> -where - BS: BlobService + Clone, -{ - let mut directory = castorepb::Directory::default(); - - let mut directory_putter = directory_putter; - while let Some(entry) = dir_reader.next()? { - let (node, directory_putter_back) = process_node( - handle.clone(), - entry.name.into(), - entry.node, - blob_service.clone(), - directory_putter, - )?; - - directory_putter = directory_putter_back; - - match node { - castorepb::node::Node::Directory(node) => directory.directories.push(node), - castorepb::node::Node::File(node) => directory.files.push(node), - castorepb::node::Node::Symlink(node) => directory.symlinks.push(node), - } - } - - // calculate digest and size. - let directory_digest = directory.digest(); - let directory_size = directory.size(); - - // upload the directory. This is a bit more verbose, as we want to get back - // directory_putter for later reuse. - let directory_putter = handle.block_on(handle.spawn(async move { - directory_putter.put(directory).await?; - Ok::<_, io::Error>(directory_putter) - }))??; - - Ok(( - castorepb::DirectoryNode { - name, - digest: directory_digest.into(), - size: directory_size, - }, - directory_putter, - )) +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + IO(#[from] std::io::Error), } #[cfg(test)] mod test { - use crate::nar::read_nar; + use crate::nar::ingest_nar; use std::io::Cursor; use std::sync::Arc; @@ -238,19 +139,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking(|| { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), - blob_service, - directory_service, - ) - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service, + directory_service, + &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Symlink(castorepb::SymlinkNode { @@ -267,22 +162,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - move || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service, + &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::File(castorepb::FileNode { @@ -304,23 +190,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - let directory_service = directory_service.clone(); - || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service.clone(), + &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Directory(castorepb::DirectoryNode { diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs index 49bb92fb0f0f..4d5101f9d558 100644 --- a/tvix/store/src/nar/mod.rs +++ b/tvix/store/src/nar/mod.rs @@ -2,7 +2,7 @@ use tvix_castore::B3Digest; mod import; mod renderer; -pub use import::read_nar; +pub use import::ingest_nar; pub use renderer::calculate_size_and_sha256; pub use renderer::write_nar; diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index bdb0e2c3cba7..234340592e89 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -1,5 +1,3 @@ -use std::io::{self, BufRead, Read, Write}; - use data_encoding::BASE64; use futures::{stream::BoxStream, TryStreamExt}; use nix_compat::{ @@ -8,7 +6,10 @@ use nix_compat::{ nixhash::NixHash, }; use reqwest::StatusCode; -use sha2::{digest::FixedOutput, Digest, Sha256}; +use sha2::Digest; +use std::io::{self, Write}; +use tokio::io::{AsyncRead, BufReader}; +use tokio_util::io::InspectReader; use tonic::async_trait; use tracing::{debug, instrument, warn}; use tvix_castore::{ @@ -171,85 +172,83 @@ where ))); } - // get an AsyncRead of the response body. - let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| { + // get a reader of the response body. + let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| { let e = e.without_url(); warn!(e=%e, "failed to get response body"); io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) })); - let sync_r = tokio_util::io::SyncIoBridge::new(async_r); - // handle decompression, by wrapping the reader. - let sync_r: Box<dyn BufRead + Send> = match narinfo.compression { - Some("none") => Box::new(sync_r), - Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))), - Some(comp) => { - return Err(Error::InvalidRequest( - format!("unsupported compression: {}", comp).to_string(), - )) - } - None => { - return Err(Error::InvalidRequest( - "unsupported compression: bzip2".to_string(), - )) + // handle decompression, depending on the compression field. + let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression { + Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>, + Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some(comp_str) => { + return Err(Error::StorageError(format!( + "unsupported compression: {comp_str}" + ))); } }; - - let res = tokio::task::spawn_blocking({ - let blob_service = self.blob_service.clone(); - let directory_service = self.directory_service.clone(); - move || -> io::Result<_> { - // Wrap the reader once more, so we can calculate NarSize and NarHash - let mut sync_r = io::BufReader::new(NarReader::from(sync_r)); - let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?; - - let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner(); - - Ok((root_node, nar_hash, nar_size)) - } - }) + let mut nar_hash = sha2::Sha256::new(); + let mut nar_size = 0; + + // Assemble NarHash and NarSize as we read bytes. + let r = InspectReader::new(r, |b| { + nar_size += b.len() as u64; + nar_hash.write_all(b).unwrap(); + }); + + // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors. + let mut r = BufReader::new(r); + + let root_node = crate::nar::ingest_nar( + self.blob_service.clone(), + self.directory_service.clone(), + &mut r, + ) .await - .unwrap(); - - match res { - Ok((root_node, nar_hash, nar_size)) => { - // ensure the ingested narhash and narsize do actually match. - if narinfo.nar_size != nar_size { - warn!( - narinfo.nar_size = narinfo.nar_size, - http.nar_size = nar_size, - "NARSize mismatch" - ); - Err(io::Error::new( - io::ErrorKind::InvalidData, - "NarSize mismatch".to_string(), - ))?; - } - if narinfo.nar_hash != nar_hash { - warn!( - narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash), - http.nar_hash = %NixHash::Sha256(nar_hash), - "NarHash mismatch" - ); - Err(io::Error::new( - io::ErrorKind::InvalidData, - "NarHash mismatch".to_string(), - ))?; - } - - Ok(Some(PathInfo { - node: Some(castorepb::Node { - // set the name of the root node to the digest-name of the store path. - node: Some( - root_node.rename(narinfo.store_path.to_string().to_owned().into()), - ), - }), - references: pathinfo.references, - narinfo: pathinfo.narinfo, - })) - } - Err(e) => Err(e.into()), + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + // ensure the ingested narhash and narsize do actually match. + if narinfo.nar_size != nar_size { + warn!( + narinfo.nar_size = narinfo.nar_size, + http.nar_size = nar_size, + "NARSize mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarSize mismatch".to_string(), + ))?; } + let nar_hash: [u8; 32] = nar_hash.finalize().into(); + if narinfo.nar_hash != nar_hash { + warn!( + narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash), + http.nar_hash = %NixHash::Sha256(nar_hash), + "NarHash mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarHash mismatch".to_string(), + ))?; + } + + Ok(Some(PathInfo { + node: Some(castorepb::Node { + // set the name of the root node to the digest-name of the store path. + node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())), + }), + references: pathinfo.references, + narinfo: pathinfo.narinfo, + })) } #[instrument(skip_all, fields(path_info=?_path_info))] @@ -277,38 +276,3 @@ where })) } } - -/// Small helper reader implementing [std::io::Read]. -/// It can be used to wrap another reader, counts the number of bytes read -/// and the sha256 digest of the contents. -struct NarReader<R: Read> { - r: R, - - sha256: sha2::Sha256, - bytes_read: u64, -} - -impl<R: Read> NarReader<R> { - pub fn from(inner: R) -> Self { - Self { - r: inner, - sha256: Sha256::new(), - bytes_read: 0, - } - } - - /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read. - pub fn into_inner(self) -> (R, [u8; 32], u64) { - (self.r, self.sha256.finalize_fixed().into(), self.bytes_read) - } -} - -impl<R: Read> Read for NarReader<R> { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.r.read(buf).map(|n| { - self.bytes_read += n as u64; - self.sha256.write_all(&buf[..n]).unwrap(); - n - }) - } -} |