diff options
Diffstat (limited to 'tvix/store/src/nar')
-rw-r--r-- | tvix/store/src/nar/import.rs | 276 | ||||
-rw-r--r-- | tvix/store/src/nar/mod.rs | 53 | ||||
-rw-r--r-- | tvix/store/src/nar/renderer.rs | 222 |
3 files changed, 551 insertions, 0 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs new file mode 100644 index 0000000000..32c2f4e580 --- /dev/null +++ b/tvix/store/src/nar/import.rs @@ -0,0 +1,276 @@ +use nix_compat::nar::reader::r#async as nar_reader; +use sha2::Digest; +use tokio::{ + io::{AsyncBufRead, AsyncRead}, + sync::mpsc, + try_join, +}; +use tvix_castore::{ + blobservice::BlobService, + directoryservice::DirectoryService, + import::{ + blobs::{self, ConcurrentBlobUploader}, + ingest_entries, IngestionEntry, IngestionError, + }, + proto::{node::Node, NamedNode}, + PathBuf, +}; + +/// Ingests the contents from an [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// Returns a tuple of the castore root node, the sha256 digest of the NAR and +/// the size (in bytes) of the NAR contents ingested, in that order. +pub async fn ingest_nar_and_hash<R, BS, DS>( + blob_service: BS, + directory_service: DS, + r: &mut R, +) -> Result<(Node, [u8; 32], u64), IngestionError<Error>> +where + R: AsyncRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, +{ + let mut nar_hash = sha2::Sha256::new(); + let mut nar_size = 0; + + // Assemble NarHash and NarSize as we read bytes. + let r = tokio_util::io::InspectReader::new(r, |b| { + nar_size += b.len() as u64; + use std::io::Write; + nar_hash.write_all(b).unwrap(); + }); + + // HACK: InspectReader doesn't implement AsyncBufRead, but ingest_nar needs it. + // See if this can be propagated through and we can then require our input + // reader to be buffered too. + let mut r = tokio::io::BufReader::new(r); + + let root_node = ingest_nar(blob_service, directory_service, &mut r).await?; + + Ok((root_node, nar_hash.finalize().into(), nar_size)) +} + +/// Ingests the contents from an [AsyncBufRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// It returns the castore root node or an error. 
+pub async fn ingest_nar<R, BS, DS>( + blob_service: BS, + directory_service: DS, + r: &mut R, +) -> Result<Node, IngestionError<Error>> +where + R: AsyncBufRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, +{ + // open the NAR for reading. + // The NAR reader emits nodes in DFS preorder. + let root_node = nar_reader::open(r).await.map_err(Error::IO)?; + + let (tx, rx) = mpsc::channel(1); + let rx = tokio_stream::wrappers::ReceiverStream::new(rx); + + let produce = async move { + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); + + let res = produce_nar_inner( + &mut blob_uploader, + root_node, + "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT. + tx.clone(), + ) + .await; + + if let Err(err) = blob_uploader.join().await { + tx.send(Err(err.into())) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; + } + + tx.send(res) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; + + Ok(()) + }; + + let consume = ingest_entries(directory_service, rx); + + let (_, node) = try_join!(produce, consume)?; + + // remove the fake "root" name again, the returned root node is nameless + debug_assert_eq!(&node.get_name(), b"root"); + Ok(node.rename("".into())) +} + +async fn produce_nar_inner<BS>( + blob_uploader: &mut ConcurrentBlobUploader<BS>, + node: nar_reader::Node<'_, '_>, + path: PathBuf, + tx: mpsc::Sender<Result<IngestionEntry, Error>>, +) -> Result<IngestionEntry, Error> +where + BS: BlobService + Clone + 'static, +{ + Ok(match node { + nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target }, + nar_reader::Node::File { + executable, + mut reader, + } => { + let size = reader.len(); + let digest = blob_uploader.upload(&path, size, &mut reader).await?; + + IngestionEntry::Regular { + path, + size, + executable, + digest, + } + } + nar_reader::Node::Directory(mut dir_reader) => { + while let Some(entry) = 
dir_reader.next().await? { + let mut path = path.clone(); + + // valid NAR names are valid castore names + path.try_push(entry.name) + .expect("Tvix bug: failed to join name"); + + let entry = Box::pin(produce_nar_inner( + blob_uploader, + entry.node, + path, + tx.clone(), + )) + .await?; + + tx.send(Ok(entry)).await.map_err(|e| { + Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) + })?; + } + + IngestionEntry::Dir { path } + } + }) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + IO(#[from] std::io::Error), + + #[error(transparent)] + BlobUpload(#[from] blobs::Error), +} + +#[cfg(test)] +mod test { + use crate::nar::ingest_nar; + use std::io::Cursor; + use std::sync::Arc; + + use rstest::*; + use tokio_stream::StreamExt; + use tvix_castore::blobservice::BlobService; + use tvix_castore::directoryservice::DirectoryService; + use tvix_castore::fixtures::{ + DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS, + HELLOWORLD_BLOB_DIGEST, + }; + use tvix_castore::proto as castorepb; + + use crate::tests::fixtures::{ + blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD, + NAR_CONTENTS_SYMLINK, + }; + + #[rstest] + #[tokio::test] + async fn single_symlink( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) { + let root_node = ingest_nar( + blob_service, + directory_service, + &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), + ) + .await + .expect("must parse"); + + assert_eq!( + castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "".into(), // name must be empty + target: "/nix/store/somewhereelse".into(), + }), + root_node + ); + } + + #[rstest] + #[tokio::test] + async fn single_file( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) { + let root_node = ingest_nar( + blob_service.clone(), + directory_service, + &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), 
+ ) + .await + .expect("must parse"); + + assert_eq!( + castorepb::node::Node::File(castorepb::FileNode { + name: "".into(), // name must be empty + digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + size: HELLOWORLD_BLOB_CONTENTS.len() as u64, + executable: false, + }), + root_node + ); + + // blobservice must contain the blob + assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap()); + } + + #[rstest] + #[tokio::test] + async fn complicated( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) { + let root_node = ingest_nar( + blob_service.clone(), + directory_service.clone(), + &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), + ) + .await + .expect("must parse"); + + assert_eq!( + castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: "".into(), // name must be empty + digest: DIRECTORY_COMPLICATED.digest().into(), + size: DIRECTORY_COMPLICATED.size(), + }), + root_node, + ); + + // blobservice must contain the blob + assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap()); + + // directoryservice must contain the directories, at least with get_recursive. 
+ let resp: Result<Vec<castorepb::Directory>, _> = directory_service + .get_recursive(&DIRECTORY_COMPLICATED.digest()) + .collect() + .await; + + let directories = resp.unwrap(); + + assert_eq!(2, directories.len()); + assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]); + assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]); + } +} diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs new file mode 100644 index 0000000000..8cbb091f1a --- /dev/null +++ b/tvix/store/src/nar/mod.rs @@ -0,0 +1,53 @@ +use tonic::async_trait; +use tvix_castore::B3Digest; + +mod import; +mod renderer; +pub use import::ingest_nar; +pub use import::ingest_nar_and_hash; +pub use renderer::calculate_size_and_sha256; +pub use renderer::write_nar; +pub use renderer::SimpleRenderer; +use tvix_castore::proto as castorepb; + +#[async_trait] +pub trait NarCalculationService: Send + Sync { + /// Return the nar size and nar sha256 digest for a given root node. + /// This can be used to calculate NAR-based output paths. + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), tvix_castore::Error>; +} + +#[async_trait] +impl<A> NarCalculationService for A +where + A: AsRef<dyn NarCalculationService> + Send + Sync, +{ + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), tvix_castore::Error> { + self.as_ref().calculate_nar(root_node).await + } +} + +/// Errors that can be encountered while rendering NARs. 
+#[derive(Debug, thiserror::Error)] +pub enum RenderError { + #[error("failure talking to a backing store client: {0}")] + StoreError(#[source] std::io::Error), + + #[error("unable to find directory {}, referred from {:?}", .0, .1)] + DirectoryNotFound(B3Digest, bytes::Bytes), + + #[error("unable to find blob {}, referred from {:?}", .0, .1)] + BlobNotFound(B3Digest, bytes::Bytes), + + #[error("unexpected size in metadata for blob {}, referred from {:?} returned, expected {}, got {}", .0, .1, .2, .3)] + UnexpectedBlobMeta(B3Digest, bytes::Bytes, u32, u32), + + #[error("failure using the NAR writer: {0}")] + NARWriterError(std::io::Error), +} diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs new file mode 100644 index 0000000000..efd67671db --- /dev/null +++ b/tvix/store/src/nar/renderer.rs @@ -0,0 +1,222 @@ +use crate::utils::AsyncIoBridge; + +use super::{NarCalculationService, RenderError}; +use count_write::CountWrite; +use nix_compat::nar::writer::r#async as nar_writer; +use sha2::{Digest, Sha256}; +use tokio::io::{self, AsyncWrite, BufReader}; +use tonic::async_trait; +use tvix_castore::{ + blobservice::BlobService, + directoryservice::DirectoryService, + proto::{self as castorepb, NamedNode}, +}; + +pub struct SimpleRenderer<BS, DS> { + blob_service: BS, + directory_service: DS, +} + +impl<BS, DS> SimpleRenderer<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { + Self { + blob_service, + directory_service, + } + } +} + +#[async_trait] +impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS> +where + BS: BlobService + Clone, + DS: DirectoryService + Clone, +{ + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), tvix_castore::Error> { + calculate_size_and_sha256( + root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .await + .map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e))) + } +} + +/// 
Invoke [write_nar], and return the size and sha256 digest of the produced +/// NAR output. +pub async fn calculate_size_and_sha256<BS, DS>( + root_node: &castorepb::node::Node, + blob_service: BS, + directory_service: DS, +) -> Result<(u64, [u8; 32]), RenderError> +where + BS: BlobService + Send, + DS: DirectoryService + Send, +{ + let mut h = Sha256::new(); + let mut cw = CountWrite::from(&mut h); + + write_nar( + // The hasher doesn't speak async. It doesn't + // actually do any I/O, so it's fine to wrap. + AsyncIoBridge(&mut cw), + root_node, + blob_service, + directory_service, + ) + .await?; + + Ok((cw.count(), h.finalize().into())) +} + +/// Accepts a [castorepb::node::Node] pointing to the root of a (store) path, +/// and uses the passed blob_service and directory_service to perform the +/// necessary lookups as it traverses the structure. +/// The contents, in NAR serialization, are written to the passed [AsyncWrite]. +pub async fn write_nar<W, BS, DS>( + mut w: W, + proto_root_node: &castorepb::node::Node, + blob_service: BS, + directory_service: DS, +) -> Result<(), RenderError> +where + W: AsyncWrite + Unpin + Send, + BS: BlobService + Send, + DS: DirectoryService + Send, +{ + // Initialize NAR writer + let nar_root_node = nar_writer::open(&mut w) + .await + .map_err(RenderError::NARWriterError)?; + + walk_node( + nar_root_node, + proto_root_node, + blob_service, + directory_service, + ) + .await?; + + Ok(()) +} + +/// Process an intermediate node in the structure. +/// This consumes the node. 
+async fn walk_node<BS, DS>( + nar_node: nar_writer::Node<'_, '_>, + proto_node: &castorepb::node::Node, + blob_service: BS, + directory_service: DS, +) -> Result<(BS, DS), RenderError> +where + BS: BlobService + Send, + DS: DirectoryService + Send, +{ + match proto_node { + castorepb::node::Node::Symlink(proto_symlink_node) => { + nar_node + .symlink(&proto_symlink_node.target) + .await + .map_err(RenderError::NARWriterError)?; + } + castorepb::node::Node::File(proto_file_node) => { + let digest_len = proto_file_node.digest.len(); + let digest = proto_file_node.digest.clone().try_into().map_err(|_| { + RenderError::StoreError(io::Error::new( + io::ErrorKind::Other, + format!("invalid digest len {} in file node", digest_len), + )) + })?; + + let mut blob_reader = match blob_service + .open_read(&digest) + .await + .map_err(RenderError::StoreError)? + { + Some(blob_reader) => Ok(BufReader::new(blob_reader)), + None => Err(RenderError::NARWriterError(io::Error::new( + io::ErrorKind::NotFound, + format!("blob with digest {} not found", &digest), + ))), + }?; + + nar_node + .file( + proto_file_node.executable, + proto_file_node.size, + &mut blob_reader, + ) + .await + .map_err(RenderError::NARWriterError)?; + } + castorepb::node::Node::Directory(proto_directory_node) => { + let digest_len = proto_directory_node.digest.len(); + let digest = proto_directory_node + .digest + .clone() + .try_into() + .map_err(|_| { + RenderError::StoreError(io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid digest len {} in directory node", digest_len), + )) + })?; + + // look it up with the directory service + match directory_service + .get(&digest) + .await + .map_err(|e| RenderError::StoreError(e.into()))? + { + // if it's None, that's an error! 
+ None => Err(RenderError::DirectoryNotFound( + digest, + proto_directory_node.name.clone(), + ))?, + Some(proto_directory) => { + // start a directory node + let mut nar_node_directory = nar_node + .directory() + .await + .map_err(RenderError::NARWriterError)?; + + // We put blob_service, directory_service back here whenever we come up from + // the recursion. + let mut blob_service = blob_service; + let mut directory_service = directory_service; + + // for each node in the directory, create a new entry with its name, + // and then recurse on that entry. + for proto_node in proto_directory.nodes() { + let child_node = nar_node_directory + .entry(proto_node.get_name()) + .await + .map_err(RenderError::NARWriterError)?; + + (blob_service, directory_service) = Box::pin(walk_node( + child_node, + &proto_node, + blob_service, + directory_service, + )) + .await?; + } + + // close the directory + nar_node_directory + .close() + .await + .map_err(RenderError::NARWriterError)?; + + return Ok((blob_service, directory_service)); + } + } + } + } + + Ok((blob_service, directory_service)) +} |