diff options
Diffstat (limited to 'tvix/castore/src/import/mod.rs')
-rw-r--r-- | tvix/castore/src/import/mod.rs | 247 |
1 files changed, 178 insertions, 69 deletions
diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index e9fdc750f8..c57c5bcada 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -4,9 +4,9 @@ //! Specific implementations, such as ingesting from the filesystem, live in //! child modules. -use crate::blobservice::BlobService; use crate::directoryservice::DirectoryPutter; use crate::directoryservice::DirectoryService; +use crate::path::{Path, PathBuf}; use crate::proto::node::Node; use crate::proto::Directory; use crate::proto::DirectoryNode; @@ -14,23 +14,19 @@ use crate::proto::FileNode; use crate::proto::SymlinkNode; use crate::B3Digest; use futures::{Stream, StreamExt}; -use std::fs::FileType; +use tracing::Span; +use tracing_indicatif::span_ext::IndicatifSpanExt; use tracing::Level; -#[cfg(target_family = "unix")] -use std::os::unix::ffi::OsStrExt; - -use std::{ - collections::HashMap, - path::{Path, PathBuf}, -}; +use std::collections::HashMap; use tracing::instrument; mod error; -pub use error::Error; +pub use error::IngestionError; pub mod archive; +pub mod blobs; pub mod fs; /// Ingests [IngestionEntry] from the given stream into a the passed [DirectoryService]. @@ -50,16 +46,22 @@ pub mod fs; /// map and upload it to the [DirectoryService] through a lazily created [DirectoryPutter]. /// /// On success, returns the root node. -#[instrument(skip_all, ret(level = Level::TRACE), err)] -pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error> +#[instrument(skip_all, fields(indicatif.pb_show=1), ret(level = Level::TRACE), err)] +pub async fn ingest_entries<DS, S, E>( + directory_service: DS, + mut entries: S, +) -> Result<Node, IngestionError<E>> where - DS: AsRef<dyn DirectoryService>, - S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin, + DS: DirectoryService, + S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin, + E: std::error::Error, { // For a given path, this holds the [Directory] structs as they are populated. let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); let mut maybe_directory_putter: Option<Box<dyn DirectoryPutter>> = None; + Span::current().pb_start(); + let root_node = loop { let mut entry = entries .next() @@ -68,20 +70,11 @@ where // we break the loop manually. .expect("Tvix bug: unexpected end of stream")?; - debug_assert!( - entry - .path() - .components() - .all(|x| matches!(x, std::path::Component::Normal(_))), - "path may only contain normal components" - ); - let name = entry .path() .file_name() // If this is the root node, it will have an empty name. .unwrap_or_default() - .as_bytes() .to_owned() .into(); @@ -89,7 +82,8 @@ where IngestionEntry::Dir { .. } => { // If the entry is a directory, we traversed all its children (and // populated it in `directories`). - // If we don't have it in there, it's an empty directory. + // If we don't have it in directories, it's a directory without + // children. let directory = directories .remove(entry.path()) // In that case, it contained no children @@ -102,9 +96,12 @@ where // If we don't have one yet (as that's the first one to upload), // initialize the putter. maybe_directory_putter - .get_or_insert_with(|| directory_service.as_ref().put_multiple_start()) + .get_or_insert_with(|| directory_service.put_multiple_start()) .put(directory) - .await?; + .await + .map_err(|e| { + IngestionError::UploadDirectoryError(entry.path().to_owned(), e) + })?; Node::Directory(DirectoryNode { name, @@ -114,7 +111,7 @@ where } IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode { name, - target: target.as_os_str().as_bytes().to_owned().into(), + target: target.to_owned().into(), }), IngestionEntry::Regular { size, @@ -127,23 +124,27 @@ where size: *size, executable: *executable, }), - IngestionEntry::Unknown { path, file_type } => { - return Err(Error::UnsupportedFileType(path.clone(), *file_type)); - } }; - if entry.path().components().count() == 1 { + let parent = entry + .path() + .parent() + .expect("Tvix bug: got entry with root node"); + + if parent == crate::Path::ROOT { break node; + } else { + // record node in parent directory, creating a new [Directory] if not there yet. + directories.entry(parent.to_owned()).or_default().add(node); } - - // record node in parent directory, creating a new [Directory] if not there yet. - directories - .entry(entry.path().parent().unwrap().to_path_buf()) - .or_default() - .add(node); }; assert!( + entries.count().await == 0, + "Tvix bug: left over elements in the stream" + ); + + assert!( directories.is_empty(), "Tvix bug: left over directories after processing ingestion stream" ); @@ -152,7 +153,10 @@ where // they're all persisted to the backend. if let Some(mut directory_putter) = maybe_directory_putter { #[cfg_attr(not(debug_assertions), allow(unused))] - let root_directory_digest = directory_putter.close().await?; + let root_directory_digest = directory_putter + .close() + .await + .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?; #[cfg(debug_assertions)] { @@ -174,31 +178,6 @@ where Ok(root_node) } -/// Uploads the file at the provided [Path] the the [BlobService]. -#[instrument(skip(blob_service), fields(path), err)] -async fn upload_blob_at_path<BS>(blob_service: BS, path: PathBuf) -> Result<B3Digest, Error> -where - BS: BlobService, -{ - let mut file = match tokio::fs::File::open(&path).await { - Ok(file) => file, - Err(e) => return Err(Error::UnableToRead(path, e)), - }; - - let mut writer = blob_service.open_write().await; - - if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { - return Err(Error::UnableToRead(path, e)); - }; - - let digest = writer - .close() - .await - .map_err(|e| Error::UnableToRead(path, e))?; - - Ok(digest) -} - #[derive(Debug, Clone, Eq, PartialEq)] pub enum IngestionEntry { Regular { @@ -209,15 +188,11 @@ pub enum IngestionEntry { }, Symlink { path: PathBuf, - target: PathBuf, + target: Vec<u8>, }, Dir { path: PathBuf, }, - Unknown { - path: PathBuf, - file_type: FileType, - }, } impl IngestionEntry { @@ -226,7 +201,6 @@ impl IngestionEntry { IngestionEntry::Regular { path, .. } => path, IngestionEntry::Symlink { path, .. } => path, IngestionEntry::Dir { path } => path, - IngestionEntry::Unknown { path, .. } => path, } } @@ -234,3 +208,138 @@ impl IngestionEntry { matches!(self, IngestionEntry::Dir { .. }) } } + +#[cfg(test)] +mod test { + use rstest::rstest; + + use crate::fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP, EMPTY_BLOB_DIGEST}; + use crate::proto::node::Node; + use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode}; + use crate::{directoryservice::MemoryDirectoryService, fixtures::DUMMY_DIGEST}; + + use super::ingest_entries; + use super::IngestionEntry; + + #[rstest] + #[case::single_file(vec![IngestionEntry::Regular { + path: "foo".parse().unwrap(), + size: 42, + executable: true, + digest: DUMMY_DIGEST.clone(), + }], + Node::File(FileNode { name: "foo".into(), digest: DUMMY_DIGEST.clone().into(), size: 42, executable: true } + ))] + #[case::single_symlink(vec![IngestionEntry::Symlink { + path: "foo".parse().unwrap(), + target: b"blub".into(), + }], + Node::Symlink(SymlinkNode { name: "foo".into(), target: "blub".into()}) + )] + #[case::single_dir(vec![IngestionEntry::Dir { + path: "foo".parse().unwrap(), + }], + Node::Directory(DirectoryNode { name: "foo".into(), digest: Directory::default().digest().into(), size: Directory::default().size()}) + )] + #[case::dir_with_keep(vec![ + IngestionEntry::Regular { + path: "foo/.keep".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_BLOB_DIGEST.clone(), + }, + IngestionEntry::Dir { + path: "foo".parse().unwrap(), + }, + ], + Node::Directory(DirectoryNode { name: "foo".into(), digest: DIRECTORY_WITH_KEEP.digest().into(), size: DIRECTORY_WITH_KEEP.size() }) + )] + /// This is intentionally a bit unsorted, though it still satisfies all + /// requirements we have on the order of elements in the stream. + #[case::directory_complicated(vec![ + IngestionEntry::Regular { + path: "blub/.keep".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_BLOB_DIGEST.clone(), + }, + IngestionEntry::Regular { + path: "blub/keep/.keep".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_BLOB_DIGEST.clone(), + }, + IngestionEntry::Dir { + path: "blub/keep".parse().unwrap(), + }, + IngestionEntry::Symlink { + path: "blub/aa".parse().unwrap(), + target: b"/nix/store/somewhereelse".into(), + }, + IngestionEntry::Dir { + path: "blub".parse().unwrap(), + }, + ], + Node::Directory(DirectoryNode { name: "blub".into(), digest: DIRECTORY_COMPLICATED.digest().into(), size:DIRECTORY_COMPLICATED.size() }) + )] + #[tokio::test] + async fn test_ingestion(#[case] entries: Vec<IngestionEntry>, #[case] exp_root_node: Node) { + let directory_service = MemoryDirectoryService::default(); + + let root_node = ingest_entries( + directory_service.clone(), + futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)), + ) + .await + .expect("must succeed"); + + assert_eq!(exp_root_node, root_node, "root node should match"); + } + + #[rstest] + #[should_panic] + #[case::empty_entries(vec![])] + #[should_panic] + #[case::missing_intermediate_dir(vec![ + IngestionEntry::Regular { + path: "blub/.keep".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_BLOB_DIGEST.clone(), + }, + ])] + #[should_panic] + #[case::leaf_after_parent(vec![ + IngestionEntry::Dir { + path: "blub".parse().unwrap(), + }, + IngestionEntry::Regular { + path: "blub/.keep".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_BLOB_DIGEST.clone(), + }, + ])] + #[should_panic] + #[case::root_in_entry(vec![ + IngestionEntry::Regular { + path: ".keep".parse().unwrap(), + size: 0, + executable: false, + digest: EMPTY_BLOB_DIGEST.clone(), + }, + IngestionEntry::Dir { + path: "".parse().unwrap(), + }, + ])] + #[tokio::test] + async fn test_ingestion_fail(#[case] entries: Vec<IngestionEntry>) { + let directory_service = MemoryDirectoryService::default(); + + let _ = ingest_entries( + directory_service.clone(), + futures::stream::iter(entries.into_iter().map(Ok::<_, std::io::Error>)), + ) + .await; + } +} |