From bcc00fba8f65c767f831ab05f15278d23ea4b322 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sat, 13 Apr 2024 19:17:58 +0300 Subject: refactor(tvix/castore/import): inline process_entry This did very little, and especially the part of relying on the outside caller to pass in a Directory if the type is a directory required having per-entry-type specific logic anyways. It's cleaner to just inline it. Change-Id: I997a8513ee91c67b0a2443cb5cd9e8700f69211e Reviewed-on: https://cl.tvl.fyi/c/depot/+/11414 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: Connor Brewster --- tvix/castore/src/import.rs | 183 ++++++++++++++++----------------------------- 1 file changed, 65 insertions(+), 118 deletions(-) (limited to 'tvix/castore/src/import.rs') diff --git a/tvix/castore/src/import.rs b/tvix/castore/src/import.rs index 976ee98f3b..2e1d4c2af3 100644 --- a/tvix/castore/src/import.rs +++ b/tvix/castore/src/import.rs @@ -1,5 +1,4 @@ use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryPutter; use crate::directoryservice::DirectoryService; use crate::proto::node::Node; use crate::proto::Directory; @@ -234,28 +233,71 @@ where invariant_checker.see(&entry); } - // FUTUREWORK: handle directory putting here, rather than piping it through process_entry. - let node = process_entry( - blob_service.clone(), - &mut directory_putter, - &entry, - // process_entry wants an Option in case the entry points to a directory. - // make sure to provide it. - // If the directory has contents, we already have it in - // `directories` because we iterate over depth in reverse order (deepest to - // shallowest). - if entry.file_type().is_dir() { - Some( - directories - .remove(entry.path()) - // In that case, it contained no children - .unwrap_or_default(), - ) - } else { - None - }, - ) - .await?; + let file_type = entry.file_type(); + + let node = if file_type.is_dir() { + // if the entry was a directory, use the directory_putter to upload the Directory. + let directory = directories + .remove(entry.path()) + // In that case, it contained no children + .unwrap_or_default(); + + let directory_size = directory.size(); + let directory_digest = directory.digest(); + + directory_putter.put(directory).await?; + + Node::Directory(DirectoryNode { + name: entry.file_name().as_bytes().to_owned().into(), + digest: directory_digest.into(), + size: directory_size, + }) + } else if file_type.is_symlink() { + let target: bytes::Bytes = std::fs::read_link(entry.path()) + .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))? + .as_os_str() + .as_bytes() + .to_owned() + .into(); + + Node::Symlink(SymlinkNode { + name: entry.file_name().as_bytes().to_owned().into(), + target, + }) + } else if file_type.is_file() { + let metadata = entry + .metadata() + .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?; + + let mut file = tokio::fs::File::open(entry.path()) + .await + .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?; + + let mut writer = blob_service.as_ref().open_write().await; + + if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { + return Err(Error::UnableToRead(entry.path().to_path_buf(), e)); + }; + + let digest = writer + .close() + .await + .map_err(|e| Error::UnableToRead(entry.path().to_path_buf(), e))?; + + Node::File(FileNode { + name: entry.file_name().as_bytes().to_vec().into(), + digest: digest.into(), + size: metadata.len(), + // If it's executable by the user, it'll become executable. + // This matches nix's dump() function behaviour. + executable: metadata.permissions().mode() & 64 != 0, + }) + } else { + return Err(Error::UnsupportedFileType( + entry.path().to_path_buf(), + file_type, + )); + }; if entry.depth() == 0 { // Make sure all the directories are flushed. @@ -282,98 +324,3 @@ where // unreachable, we already bailed out before if root doesn't exist. unreachable!("Tvix bug: no root node emitted during ingestion") } - -/// This processes a given [walkdir::DirEntry] and returns a -/// proto::node::Node, depending on the type of the entry. -/// -/// If the entry is a file, its contents are uploaded. -/// If the entry is a directory, the Directory is uploaded as well. -/// For this to work, it relies on the caller to provide the directory object -/// with the previously returned (child) nodes. -/// -/// It assumes to be called only if all children of it have already been processed. If the entry is -/// indeed a directory, it'll also upload that directory to the store. For this, the -/// so-far-assembled Directory object for this path needs to be passed in. -/// -/// It assumes the caller adds returned nodes to the directories it assembles. -#[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] -async fn process_entry<'a, BS>( - blob_service: BS, - directory_putter: &'a mut Box, - entry: &'a walkdir::DirEntry, - maybe_directory: Option, -) -> Result -where - BS: AsRef + Clone, -{ - let file_type = entry.file_type(); - - if file_type.is_dir() { - let directory = maybe_directory - .expect("tvix bug: must be called with some directory in the case of directory"); - let directory_digest = directory.digest(); - let directory_size = directory.size(); - - // upload this directory - directory_putter - .put(directory) - .await - .map_err(|e| Error::UploadDirectoryError(entry.path().to_path_buf(), e))?; - - return Ok(Node::Directory(DirectoryNode { - name: entry.file_name().as_bytes().to_owned().into(), - digest: directory_digest.into(), - size: directory_size, - })); - } - - if file_type.is_symlink() { - let target: bytes::Bytes = std::fs::read_link(entry.path()) - .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))? - .as_os_str() - .as_bytes() - .to_owned() - .into(); - - return Ok(Node::Symlink(SymlinkNode { - name: entry.file_name().as_bytes().to_owned().into(), - target, - })); - } - - if file_type.is_file() { - let metadata = entry - .metadata() - .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?; - - let mut file = tokio::fs::File::open(entry.path()) - .await - .map_err(|e| Error::UnableToOpen(entry.path().to_path_buf(), e))?; - - let mut writer = blob_service.as_ref().open_write().await; - - if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { - return Err(Error::UnableToRead(entry.path().to_path_buf(), e)); - }; - - let digest = writer - .close() - .await - .map_err(|e| Error::UnableToRead(entry.path().to_path_buf(), e))?; - - return Ok(Node::File(FileNode { - name: entry.file_name().as_bytes().to_vec().into(), - digest: digest.into(), - size: metadata.len(), - // If it's executable by the user, it'll become executable. - // This matches nix's dump() function behaviour. - executable: metadata.permissions().mode() & 64 != 0, - })); - } - - // Nix says things like: error: file '/home/raito/dev/code.tvl.fyi/tvix/glue/src/tests/import_fixtures/a_devnode' has an unsupported type - Err(Error::UnsupportedFileType( - entry.path().to_path_buf(), - file_type, - )) -} -- cgit 1.4.1