diff options
-rw-r--r-- | tvix/store/src/nar/import.rs | 34 |
1 files changed, 22 insertions, 12 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 3d7c50014aa8..36122d419d00 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -3,7 +3,10 @@ use tokio::{io::AsyncBufRead, sync::mpsc, try_join}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, - import::{ingest_entries, IngestionEntry, IngestionError}, + import::{ + blobs::{self, ConcurrentBlobUploader}, + ingest_entries, IngestionEntry, IngestionError, + }, proto::{node::Node, NamedNode}, PathBuf, }; @@ -18,7 +21,7 @@ pub async fn ingest_nar<R, BS, DS>( ) -> Result<Node, IngestionError<Error>> where R: AsyncBufRead + Unpin + Send, - BS: BlobService + Clone, + BS: BlobService + Clone + 'static, DS: DirectoryService, { // open the NAR for reading. @@ -29,14 +32,22 @@ where let rx = tokio_stream::wrappers::ReceiverStream::new(rx); let produce = async move { + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); + let res = produce_nar_inner( - blob_service, + &mut blob_uploader, root_node, "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT. tx.clone(), ) .await; + if let Err(err) = blob_uploader.join().await { + tx.send(Err(err.into())) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; + } + tx.send(res) .await .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; @@ -54,13 +65,13 @@ where } async fn produce_nar_inner<BS>( - blob_service: BS, + blob_uploader: &mut ConcurrentBlobUploader<BS>, node: nar_reader::Node<'_, '_>, path: PathBuf, tx: mpsc::Sender<Result<IngestionEntry, Error>>, ) -> Result<IngestionEntry, Error> where - BS: BlobService + Clone, + BS: BlobService + Clone + 'static, { Ok(match node { nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target }, @@ -68,12 +79,8 @@ where executable, mut reader, } => { - let (digest, size) = { - let mut blob_writer = blob_service.open_write().await; - let size = tokio::io::copy_buf(&mut reader, &mut blob_writer).await?; - - (blob_writer.close().await?, size) - }; + let size = reader.len(); + let digest = blob_uploader.upload(&path, size, &mut reader).await?; IngestionEntry::Regular { path, @@ -91,7 +98,7 @@ where .expect("Tvix bug: failed to join name"); let entry = Box::pin(produce_nar_inner( - blob_service.clone(), + blob_uploader, entry.node, path, tx.clone(), @@ -112,6 +119,9 @@ where pub enum Error { #[error(transparent)] IO(#[from] std::io::Error), + + #[error(transparent)] + BlobUpload(#[from] blobs::Error), } #[cfg(test)] |