diff options
Diffstat (limited to 'tvix/store/src/nar/import.rs')
-rw-r--r-- | tvix/store/src/nar/import.rs | 109 |
1 file changed, 76 insertions, 33 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 3d7c50014aa8..b9a15fe71384 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,15 +1,56 @@ use nix_compat::nar::reader::r#async as nar_reader; -use tokio::{io::AsyncBufRead, sync::mpsc, try_join}; +use sha2::Digest; +use tokio::{ + io::{AsyncBufRead, AsyncRead}, + sync::mpsc, + try_join, +}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, - import::{ingest_entries, IngestionEntry, IngestionError}, - proto::{node::Node, NamedNode}, - PathBuf, + import::{ + blobs::{self, ConcurrentBlobUploader}, + ingest_entries, IngestionEntry, IngestionError, + }, + Node, PathBuf, }; /// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, /// interacting with a [BlobService] and [DirectoryService]. +/// Returns the castore root node, as well as the sha256 and size of the NAR +/// contents ingested. +pub async fn ingest_nar_and_hash<R, BS, DS>( + blob_service: BS, + directory_service: DS, + r: &mut R, +) -> Result<(Node, [u8; 32], u64), IngestionError<Error>> +where + R: AsyncRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, +{ + let mut nar_hash = sha2::Sha256::new(); + let mut nar_size = 0; + + // Assemble NarHash and NarSize as we read bytes. + let r = tokio_util::io::InspectReader::new(r, |b| { + nar_size += b.len() as u64; + use std::io::Write; + nar_hash.write_all(b).unwrap(); + }); + + // HACK: InspectReader doesn't implement AsyncBufRead. + // See if this can be propagated through and we can then require our input + // reader to be buffered too. + let mut r = tokio::io::BufReader::new(r); + + let root_node = ingest_nar(blob_service, directory_service, &mut r).await?; + + Ok((root_node, nar_hash.finalize().into(), nar_size)) +} + +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. 
/// It returns the castore root node or an error. pub async fn ingest_nar<R, BS, DS>( blob_service: BS, @@ -18,7 +59,7 @@ pub async fn ingest_nar<R, BS, DS>( ) -> Result<Node, IngestionError<Error>> where R: AsyncBufRead + Unpin + Send, - BS: BlobService + Clone, + BS: BlobService + Clone + 'static, DS: DirectoryService, { // open the NAR for reading. @@ -29,14 +70,22 @@ where let rx = tokio_stream::wrappers::ReceiverStream::new(rx); let produce = async move { + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); + let res = produce_nar_inner( - blob_service, + &mut blob_uploader, root_node, "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT. tx.clone(), ) .await; + if let Err(err) = blob_uploader.join().await { + tx.send(Err(err.into())) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; + } + tx.send(res) .await .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; @@ -48,19 +97,17 @@ where let (_, node) = try_join!(produce, consume)?; - // remove the fake "root" name again - debug_assert_eq!(&node.get_name(), b"root"); - Ok(node.rename("".into())) + Ok(node) } async fn produce_nar_inner<BS>( - blob_service: BS, + blob_uploader: &mut ConcurrentBlobUploader<BS>, node: nar_reader::Node<'_, '_>, path: PathBuf, tx: mpsc::Sender<Result<IngestionEntry, Error>>, ) -> Result<IngestionEntry, Error> where - BS: BlobService + Clone, + BS: BlobService + Clone + 'static, { Ok(match node { nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target }, @@ -68,12 +115,8 @@ where executable, mut reader, } => { - let (digest, size) = { - let mut blob_writer = blob_service.open_write().await; - let size = tokio::io::copy_buf(&mut reader, &mut blob_writer).await?; - - (blob_writer.close().await?, size) - }; + let size = reader.len(); + let digest = blob_uploader.upload(&path, size, &mut reader).await?; IngestionEntry::Regular { path, @@ -91,7 
+134,7 @@ where .expect("Tvix bug: failed to join name"); let entry = Box::pin(produce_nar_inner( - blob_service.clone(), + blob_uploader, entry.node, path, tx.clone(), @@ -112,6 +155,9 @@ where pub enum Error { #[error(transparent)] IO(#[from] std::io::Error), + + #[error(transparent)] + BlobUpload(#[from] blobs::Error), } #[cfg(test)] @@ -128,7 +174,7 @@ mod test { DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS, HELLOWORLD_BLOB_DIGEST, }; - use tvix_castore::proto as castorepb; + use tvix_castore::{Directory, Node}; use crate::tests::fixtures::{ blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD, @@ -150,10 +196,9 @@ mod test { .expect("must parse"); assert_eq!( - castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name: "".into(), // name must be empty - target: "/nix/store/somewhereelse".into(), - }), + Node::Symlink { + target: "/nix/store/somewhereelse".try_into().unwrap() + }, root_node ); } @@ -173,12 +218,11 @@ mod test { .expect("must parse"); assert_eq!( - castorepb::node::Node::File(castorepb::FileNode { - name: "".into(), // name must be empty - digest: HELLOWORLD_BLOB_DIGEST.clone().into(), + Node::File { + digest: HELLOWORLD_BLOB_DIGEST.clone(), size: HELLOWORLD_BLOB_CONTENTS.len() as u64, executable: false, - }), + }, root_node ); @@ -201,11 +245,10 @@ mod test { .expect("must parse"); assert_eq!( - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: "".into(), // name must be empty - digest: DIRECTORY_COMPLICATED.digest().into(), - size: DIRECTORY_COMPLICATED.size(), - }), + Node::Directory { + digest: DIRECTORY_COMPLICATED.digest(), + size: DIRECTORY_COMPLICATED.size() + }, root_node, ); @@ -213,7 +256,7 @@ mod test { assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap()); // directoryservice must contain the directories, at least with get_recursive. 
- let resp: Result<Vec<castorepb::Directory>, _> = directory_service + let resp: Result<Vec<Directory>, _> = directory_service .get_recursive(&DIRECTORY_COMPLICATED.digest()) .collect() .await; |