use nix_compat::nar::reader::r#async as nar_reader;
use sha2::Digest;
use tokio::{
io::{AsyncBufRead, AsyncRead},
sync::mpsc,
try_join,
};
use tvix_castore::{
blobservice::BlobService,
directoryservice::DirectoryService,
import::{
blobs::{self, ConcurrentBlobUploader},
ingest_entries, IngestionEntry, IngestionError,
},
    NamedNode, Node, PathBuf,
};

/// Ingests the contents from an [AsyncRead] providing NAR into the tvix store,
/// interacting with a [BlobService] and [DirectoryService].
/// Returns the castore root node, as well as the sha256 and size of the NAR
/// contents ingested.
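///
/// A minimal usage sketch, assuming `blob_service`, `directory_service` and an
/// async `reader` over NAR bytes are already in scope (hypothetical names, not
/// part of this crate):
/// ```ignore
/// let (root_node, nar_sha256, nar_size) =
///     ingest_nar_and_hash(blob_service, directory_service, &mut reader).await?;
/// println!("ingested {} NAR bytes", nar_size);
/// ```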
pub async fn ingest_nar_and_hash<R, BS, DS>(
blob_service: BS,
directory_service: DS,
r: &mut R,
) -> Result<(Node, [u8; 32], u64), IngestionError<Error>>
where
R: AsyncRead + Unpin + Send,
BS: BlobService + Clone + 'static,
DS: DirectoryService,
{
let mut nar_hash = sha2::Sha256::new();
let mut nar_size = 0;
// Assemble NarHash and NarSize as we read bytes.
let r = tokio_util::io::InspectReader::new(r, |b| {
nar_size += b.len() as u64;
use std::io::Write;
nar_hash.write_all(b).unwrap();
});
    // HACK: InspectReader doesn't implement AsyncBufRead.
    // Check whether this can be propagated through, so we can require our
    // input reader to be buffered instead of wrapping it here.
let mut r = tokio::io::BufReader::new(r);
let root_node = ingest_nar(blob_service, directory_service, &mut r).await?;
Ok((root_node, nar_hash.finalize().into(), nar_size))
}

/// Ingests the contents from an [AsyncRead] providing NAR into the tvix store,
/// interacting with a [BlobService] and [DirectoryService].
/// It returns the castore root node or an error.
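///
/// A minimal sketch, assuming `blob_service`, `directory_service` and in-memory
/// NAR bytes `nar_bytes` (hypothetical names). `std::io::Cursor` satisfies the
/// [AsyncBufRead] bound, as also used in the tests below:
/// ```ignore
/// let root_node =
///     ingest_nar(blob_service, directory_service, &mut Cursor::new(nar_bytes)).await?;
/// ```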
pub async fn ingest_nar<R, BS, DS>(
blob_service: BS,
directory_service: DS,
r: &mut R,
) -> Result<Node, IngestionError<Error>>
where
R: AsyncBufRead + Unpin + Send,
BS: BlobService + Clone + 'static,
DS: DirectoryService,
{
// open the NAR for reading.
// The NAR reader emits nodes in DFS preorder.
let root_node = nar_reader::open(r).await.map_err(Error::IO)?;
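
    // The producer below walks the NAR and sends one IngestionEntry per node into
    // a bounded channel, while ingest_entries consumes that stream and uploads the
    // resulting castore directories via the DirectoryService.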
let (tx, rx) = mpsc::channel(1);
let rx = tokio_stream::wrappers::ReceiverStream::new(rx);
let produce = async move {
let mut blob_uploader = ConcurrentBlobUploader::new(blob_service);
let res = produce_nar_inner(
&mut blob_uploader,
root_node,
"root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT.
tx.clone(),
)
.await;
if let Err(err) = blob_uploader.join().await {
tx.send(Err(err.into()))
.await
.map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
}
tx.send(res)
.await
.map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?;
Ok(())
};
let consume = ingest_entries(directory_service, rx);
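    // Drive producer and consumer concurrently; the first error from either side
    // is returned.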
let (_, node) = try_join!(produce, consume)?;
// remove the fake "root" name again
debug_assert_eq!(&node.get_name()[..], b"root");
Ok(node.rename("".into()))
}

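/// Produces the [IngestionEntry] for a single NAR node.
///
/// For directories this recurses into all children (the NAR reader yields them in
/// DFS preorder), sending each child's entry into `tx`; the entry for `node`
/// itself is returned to the caller rather than sent.
/// File contents are uploaded through the [ConcurrentBlobUploader].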
async fn produce_nar_inner<BS>(
blob_uploader: &mut ConcurrentBlobUploader<BS>,
node: nar_reader::Node<'_, '_>,
path: PathBuf,
tx: mpsc::Sender<Result<IngestionEntry, Error>>,
) -> Result<IngestionEntry, Error>
where
BS: BlobService + Clone + 'static,
{
Ok(match node {
nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target },
nar_reader::Node::File {
executable,
mut reader,
} => {
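            // The NAR wire format length-prefixes file contents, so the reader
            // knows the file's size before the bytes are streamed.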
let size = reader.len();
let digest = blob_uploader.upload(&path, size, &mut reader).await?;
IngestionEntry::Regular {
path,
size,
executable,
digest,
}
}
nar_reader::Node::Directory(mut dir_reader) => {
while let Some(entry) = dir_reader.next().await? {
let mut path = path.clone();
// valid NAR names are valid castore names
path.try_push(entry.name)
.expect("Tvix bug: failed to join name");
let entry = Box::pin(produce_nar_inner(
blob_uploader,
entry.node,
path,
tx.clone(),
))
.await?;
tx.send(Ok(entry)).await.map_err(|e| {
Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e))
})?;
}
IngestionEntry::Dir { path }
}
})
}

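/// Errors that can occur while ingesting a NAR.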
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
IO(#[from] std::io::Error),
#[error(transparent)]
BlobUpload(#[from] blobs::Error),
}

#[cfg(test)]
mod test {
use crate::nar::ingest_nar;
use std::io::Cursor;
    use std::sync::Arc;

use rstest::*;
use tokio_stream::StreamExt;
use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::fixtures::{
DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
HELLOWORLD_BLOB_DIGEST,
};
    use tvix_castore::{Directory, DirectoryNode, FileNode, Node, SymlinkNode};

use crate::tests::fixtures::{
blob_service, directory_service, NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD,
NAR_CONTENTS_SYMLINK,
    };

#[rstest]
#[tokio::test]
async fn single_symlink(
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) {
let root_node = ingest_nar(
blob_service,
directory_service,
&mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
)
.await
.expect("must parse");
assert_eq!(
Node::Symlink(
SymlinkNode::new(
"".into(), // name must be empty
"/nix/store/somewhereelse".into(),
)
.unwrap()
),
root_node
);
    }

#[rstest]
#[tokio::test]
async fn single_file(
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) {
let root_node = ingest_nar(
blob_service.clone(),
directory_service,
&mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
)
.await
.expect("must parse");
assert_eq!(
Node::File(
FileNode::new(
"".into(), // name must be empty
HELLOWORLD_BLOB_DIGEST.clone(),
HELLOWORLD_BLOB_CONTENTS.len() as u64,
false,
)
.unwrap()
),
root_node
);
// blobservice must contain the blob
assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
    }

#[rstest]
#[tokio::test]
async fn complicated(
blob_service: Arc<dyn BlobService>,
directory_service: Arc<dyn DirectoryService>,
) {
let root_node = ingest_nar(
blob_service.clone(),
directory_service.clone(),
&mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
)
.await
.expect("must parse");
assert_eq!(
Node::Directory(
DirectoryNode::new(
"".into(), // name must be empty
DIRECTORY_COMPLICATED.digest(),
DIRECTORY_COMPLICATED.size(),
)
.unwrap()
),
root_node,
);
// blobservice must contain the blob
assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
// directoryservice must contain the directories, at least with get_recursive.
let resp: Result<Vec<Directory>, _> = directory_service
.get_recursive(&DIRECTORY_COMPLICATED.digest())
.collect()
.await;
let directories = resp.unwrap();
assert_eq!(2, directories.len());
assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
}
}