author | Florian Klink <flokli@flokli.de> | 2023-11-04T11·33+0200
committer | flokli <flokli@flokli.de> | 2023-11-05T12·23+0000
commit | cbd72b56b24b9525e5a5fb59429eb01cfec39920 (patch)
tree | b1541cf78533a9c7f02acd3769fd8c9849034e4a /tvix/store/src/nar/import.rs
parent | 6d7e619b8fa7090b2e3dfced5e5f094980ea2c0a (diff)
feat(tvix/store/nar): add read_nar method r/6951
This adds a function that can ingest a NAR file into a given blob_service and directory_service, returning the root node.

Passing around the directory_putter is a bit annoying, but at least for this linear "reading through a NAR" case, we don't need it to be clone or sync.

Change-Id: Ia26d596f5dffc6dbd69aa39a82b59bba41685a9c
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9915
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
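For context, the new function is meant to be driven from a blocking context, since it blocks on the backing services internally. A minimal usage sketch, mirroring the tests added in this change (the import path and the helper name ingest_nar_bytes are assumptions, and the service handles are whatever the caller already holds):

use std::io::Cursor;
use std::sync::Arc;

use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::proto as castorepb;

use crate::nar::read_nar; // assumed import path, as used by the tests in this change

// Hedged sketch (not part of this change): ingest an in-memory NAR and
// return its root node.
async fn ingest_nar_bytes(
    nar_bytes: Vec<u8>,
    blob_service: Arc<dyn BlobService>,
    directory_service: Arc<dyn DirectoryService>,
) -> std::io::Result<castorepb::node::Node> {
    // read_nar is synchronous and blocks on the backing services,
    // so it is run inside spawn_blocking rather than on the async executor.
    tokio::runtime::Handle::current()
        .spawn_blocking(move || {
            read_nar(&mut Cursor::new(nar_bytes), blob_service, directory_service)
        })
        .await
        .expect("blocking task panicked")
}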
Diffstat (limited to 'tvix/store/src/nar/import.rs')
-rw-r--r-- | tvix/store/src/nar/import.rs | 336
1 file changed, 336 insertions, 0 deletions
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
new file mode 100644
index 000000000000..a9a9cc12d8e1
--- /dev/null
+++ b/tvix/store/src/nar/import.rs
@@ -0,0 +1,336 @@
+use std::{
+    io::{self, Read},
+    sync::Arc,
+};
+
+use bytes::Bytes;
+use nix_compat::nar;
+use tokio_util::io::SyncIoBridge;
+use tracing::warn;
+use tvix_castore::{
+    blobservice::BlobService,
+    directoryservice::{DirectoryPutter, DirectoryService},
+    proto::{self as castorepb},
+    B3Digest,
+};
+
+/// Accepts a reader providing a NAR.
+/// Will traverse it, uploading blobs to the given [BlobService], and
+/// directories to the given [DirectoryService].
+/// On success, the root node is returned.
+/// This function is not async (because the NAR reader is not): it blocks
+/// on the current tokio runtime handle when interacting with backing
+/// services, so make sure to only call it from within spawn_blocking.
+pub fn read_nar<R: Read + Send>(
+    r: &mut R,
+    blob_service: Arc<dyn BlobService>,
+    directory_service: Arc<dyn DirectoryService>,
+) -> io::Result<castorepb::node::Node> {
+    let handle = tokio::runtime::Handle::current();
+
+    let directory_putter = directory_service.put_multiple_start();
+
+    let node = nix_compat::nar::reader::open(r)?;
+    let (root_node, mut directory_putter) = process_node(
+        handle.clone(),
+        "".into(), // this is the root node, it has an empty name
+        node,
+        blob_service,
+        directory_putter,
+    )?;
+
+    // In case the root node points to a directory, we need to close
+    // [directory_putter], and ensure the digest we got back from there matches
+    // what the root node is pointing to.
+    if let castorepb::node::Node::Directory(ref directory_node) = root_node {
+        // Close directory_putter to make sure all directories have been inserted.
+        let directory_putter_digest =
+            handle.block_on(handle.spawn(async move { directory_putter.close().await }))??;
+        let root_directory_node_digest: B3Digest =
+            directory_node.digest.clone().try_into().unwrap();
+
+        if directory_putter_digest != root_directory_node_digest {
+            warn!(
+                root_directory_node_digest = %root_directory_node_digest,
+                directory_putter_digest = %directory_putter_digest,
+                "directory digest mismatch",
+            );
+            return Err(io::Error::new(
+                io::ErrorKind::Other,
+                "directory digest mismatch",
+            ));
+        }
+    }
+    // In case it's not a Directory, [directory_putter] doesn't need to be
+    // closed (as we didn't end up uploading anything).
+    // It can just be dropped, as documented in its trait.
+
+    Ok(root_node)
+}
+
+/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node].
+/// It does so by handling all three kinds, and recursing for directories.
+///
+/// [DirectoryPutter] is passed around, so a single instance of it can be used,
+/// which is sufficient, as this reads through the whole NAR linearly.
+fn process_node(
+    handle: tokio::runtime::Handle,
+    name: bytes::Bytes,
+    node: nar::reader::Node,
+    blob_service: Arc<dyn BlobService>,
+    directory_putter: Box<dyn DirectoryPutter>,
+) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)> {
+    Ok(match node {
+        nar::reader::Node::Symlink { target } => (
+            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+                name,
+                target: target.into(),
+            }),
+            directory_putter,
+        ),
+        nar::reader::Node::File { executable, reader } => (
+            castorepb::node::Node::File(process_file_reader(
+                handle,
+                name,
+                reader,
+                executable,
+                blob_service,
+            )?),
+            directory_putter,
+        ),
+        nar::reader::Node::Directory(dir_reader) => {
+            let (directory_node, directory_putter) = process_dir_reader(
+                handle,
+                name,
+                dir_reader,
+                blob_service.clone(),
+                directory_putter,
+            )?;
+
+            (
+                castorepb::node::Node::Directory(directory_node),
+                directory_putter,
+            )
+        }
+    })
+}
+
+/// Given a name and [nar::reader::FileReader], this ingests the file into the
+/// passed [BlobService] and returns a [castorepb::FileNode].
+fn process_file_reader(
+    handle: tokio::runtime::Handle,
+    name: Bytes,
+    mut file_reader: nar::reader::FileReader,
+    executable: bool,
+    blob_service: Arc<dyn BlobService>,
+) -> io::Result<castorepb::FileNode> {
+    // store the length. If we read any other length, reading will fail.
+    let expected_len = file_reader.len();
+
+    // prepare writing a new blob.
+    let blob_writer = handle.block_on(handle.spawn({
+        let blob_service = blob_service.clone();
+        async move { blob_service.open_write().await }
+    }))?;
+
+    // write the blob.
+    let mut blob_writer = {
+        let mut dest = SyncIoBridge::new(blob_writer);
+        io::copy(&mut file_reader, &mut dest)?;
+
+        dest.shutdown()?;
+
+        // return back the blob_writer
+        dest.into_inner()
+    };
+
+    // close the blob_writer, retrieve the digest.
+    let blob_digest = handle.block_on(handle.spawn(async move { blob_writer.close().await }))??;
+
+    Ok(castorepb::FileNode {
+        name,
+        digest: blob_digest.into(),
+        size: expected_len,
+        executable,
+    })
+}
+
+/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode].
+/// It uses [process_node] to iterate over all children.
+///
+/// [DirectoryPutter] is passed around, so a single instance of it can be used,
+/// which is sufficient, as this reads through the whole NAR linearly.
+fn process_dir_reader(
+    handle: tokio::runtime::Handle,
+    name: Bytes,
+    mut dir_reader: nar::reader::DirReader,
+    blob_service: Arc<dyn BlobService>,
+    directory_putter: Box<dyn DirectoryPutter>,
+) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)> {
+    let mut directory = castorepb::Directory::default();
+
+    let mut directory_putter = directory_putter;
+    while let Some(entry) = dir_reader.next()? {
+        let (node, directory_putter_back) = process_node(
+            handle.clone(),
+            entry.name.into(),
+            entry.node,
+            blob_service.clone(),
+            directory_putter,
+        )?;
+
+        directory_putter = directory_putter_back;
+
+        match node {
+            castorepb::node::Node::Directory(node) => directory.directories.push(node),
+            castorepb::node::Node::File(node) => directory.files.push(node),
+            castorepb::node::Node::Symlink(node) => directory.symlinks.push(node),
+        }
+    }
+
+    // calculate digest and size.
+    let directory_digest = directory.digest();
+    let directory_size = directory.size();
+
+    // upload the directory. This is a bit more verbose, as we want to get back
+    // directory_putter for later reuse.
+    let directory_putter = handle.block_on(handle.spawn(async move {
+        directory_putter.put(directory).await?;
+        Ok::<_, io::Error>(directory_putter)
+    }))??;
+
+    Ok((
+        castorepb::DirectoryNode {
+            name,
+            digest: directory_digest.into(),
+            size: directory_size,
+        },
+        directory_putter,
+    ))
+}
+
+#[cfg(test)]
+mod test {
+    use crate::nar::read_nar;
+    use std::io::Cursor;
+
+    use tokio_stream::StreamExt;
+    use tvix_castore::fixtures::{
+        DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST, HELLOWORLD_BLOB_CONTENTS,
+        HELLOWORLD_BLOB_DIGEST,
+    };
+    use tvix_castore::proto as castorepb;
+    use tvix_castore::utils::{gen_blob_service, gen_directory_service};
+
+    use crate::tests::fixtures::{
+        NAR_CONTENTS_COMPLICATED, NAR_CONTENTS_HELLOWORLD, NAR_CONTENTS_SYMLINK,
+    };
+
+    #[tokio::test]
+    async fn single_symlink() {
+        let handle = tokio::runtime::Handle::current();
+
+        let root_node = handle
+            .spawn_blocking(|| {
+                read_nar(
+                    &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()),
+                    gen_blob_service(),
+                    gen_directory_service(),
+                )
+            })
+            .await
+            .unwrap()
+            .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Symlink(castorepb::SymlinkNode {
+                name: "".into(), // name must be empty
+                target: "/nix/store/somewhereelse".into(),
+            }),
+            root_node
+        );
+    }
+
+    #[tokio::test]
+    async fn single_file() {
+        let blob_service = gen_blob_service();
+
+        let handle = tokio::runtime::Handle::current();
+
+        let root_node = handle
+            .spawn_blocking({
+                let blob_service = blob_service.clone();
+                || {
+                    read_nar(
+                        &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()),
+                        blob_service,
+                        gen_directory_service(),
+                    )
+                }
+            })
+            .await
+            .unwrap()
+            .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::File(castorepb::FileNode {
+                name: "".into(), // name must be empty
+                digest: HELLOWORLD_BLOB_DIGEST.clone().into(),
+                size: HELLOWORLD_BLOB_CONTENTS.len() as u64,
+                executable: false,
+            }),
+            root_node
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).await.unwrap());
+    }
+
+    #[tokio::test]
+    async fn complicated() {
+        let blob_service = gen_blob_service();
+        let directory_service = gen_directory_service();
+
+        let handle = tokio::runtime::Handle::current();
+
+        let root_node = handle
+            .spawn_blocking({
+                let blob_service = blob_service.clone();
+                let directory_service = directory_service.clone();
+                || {
+                    read_nar(
+                        &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()),
+                        blob_service,
+                        directory_service,
+                    )
+                }
+            })
+            .await
+            .unwrap()
+            .expect("must parse");
+
+        assert_eq!(
+            castorepb::node::Node::Directory(castorepb::DirectoryNode {
+                name: "".into(), // name must be empty
+                digest: DIRECTORY_COMPLICATED.digest().into(),
+                size: DIRECTORY_COMPLICATED.size(),
+            }),
+            root_node,
+        );
+
+        // blobservice must contain the blob
+        assert!(blob_service.has(&EMPTY_BLOB_DIGEST).await.unwrap());
+
+        // directoryservice must contain the directories, at least with get_recursive.
+        let resp: Result<Vec<castorepb::Directory>, _> = directory_service
+            .get_recursive(&DIRECTORY_COMPLICATED.digest())
+            .collect()
+            .await;
+
+        let directories = resp.unwrap();
+
+        assert_eq!(2, directories.len());
+        assert_eq!(DIRECTORY_COMPLICATED.clone(), directories[0]);
+        assert_eq!(DIRECTORY_WITH_KEEP.clone(), directories[1]);
+    }
+}