about summary refs log tree commit diff
path: root/tvix/castore/src/import/archive.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-05-01T10·52+0300
committerclbot <clbot@tvl.fyi>2024-05-02T15·26+0000
commit516c6dc572581872167851f0a68afc9025ca1350 (patch)
tree2049788097482d242fbd2fee65f39fe1e38507c6 /tvix/castore/src/import/archive.rs
parentabc0553eb8a0015bc277f73c44b0b73424b76aae (diff)
refactor(tvix/castore/import): use crate Path[Buf] in IngestionEntry r/8067
This explicitly splits ingestion-method-specific path types from the
castore types.

Change-Id: Ia3b16105fadb8d52927a4ed79dc4b34efdf4311b
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11563
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/import/archive.rs')
-rw-r--r--tvix/castore/src/import/archive.rs81
1 files changed, 48 insertions, 33 deletions
diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs
index 2da8b945d948..fb8ef9a50b48 100644
--- a/tvix/castore/src/import/archive.rs
+++ b/tvix/castore/src/import/archive.rs
@@ -1,8 +1,8 @@
 //! Imports from an archive (tarballs)
 
+use std::collections::HashMap;
 use std::io::{Cursor, Write};
 use std::sync::Arc;
-use std::{collections::HashMap, path::PathBuf};
 
 use petgraph::graph::{DiGraph, NodeIndex};
 use petgraph::visit::{DfsPostOrder, EdgeRef};
@@ -21,6 +21,8 @@ use crate::import::{ingest_entries, IngestionEntry, IngestionError};
 use crate::proto::node::Node;
 use crate::B3Digest;
 
+type TarPathBuf = std::path::PathBuf;
+
 /// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the
 /// background.
 ///
@@ -41,29 +43,32 @@ pub enum Error {
     NextEntry(std::io::Error),
 
     #[error("unable to read path for entry: {0}")]
-    Path(std::io::Error),
+    PathRead(std::io::Error),
+
+    #[error("unable to convert path {0} for entry: {1}")]
+    PathConvert(TarPathBuf, std::io::Error),
 
     #[error("unable to read size field for {0}: {1}")]
-    Size(PathBuf, std::io::Error),
+    Size(TarPathBuf, std::io::Error),
 
     #[error("unable to read mode field for {0}: {1}")]
-    Mode(PathBuf, std::io::Error),
+    Mode(TarPathBuf, std::io::Error),
 
     #[error("unable to read link name field for {0}: {1}")]
-    LinkName(PathBuf, std::io::Error),
+    LinkName(TarPathBuf, std::io::Error),
 
     #[error("unable to read blob contents for {0}: {1}")]
-    BlobRead(PathBuf, std::io::Error),
+    BlobRead(TarPathBuf, std::io::Error),
 
-    // TODO: proper error for blob finalize
+    // FUTUREWORK: proper error for blob finalize
     #[error("unable to finalize blob {0}: {1}")]
-    BlobFinalize(PathBuf, std::io::Error),
+    BlobFinalize(TarPathBuf, std::io::Error),
 
     #[error("unsupported tar entry {0} type: {1:?}")]
-    EntryType(PathBuf, tokio_tar::EntryType),
+    EntryType(TarPathBuf, tokio_tar::EntryType),
 
     #[error("symlink missing target {0}")]
-    MissingSymlinkTarget(PathBuf),
+    MissingSymlinkTarget(TarPathBuf),
 
     #[error("unexpected number of top level directory entries")]
     UnexpectedNumberOfTopLevelEntries,
@@ -94,7 +99,11 @@ where
 
     let mut entries_iter = archive.entries().map_err(Error::Entries)?;
     while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? {
-        let path: PathBuf = entry.path().map_err(Error::Path)?.into();
+        let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into();
+
+        // construct a castore PathBuf, which we use in the produced IngestionEntry.
+        let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true)
+            .map_err(|e| Error::PathConvert(tar_path.clone(), e))?;
 
         let header = entry.header();
         let entry = match header.entry_type() {
@@ -103,7 +112,7 @@ where
             | tokio_tar::EntryType::Continuous => {
                 let header_size = header
                     .size()
-                    .map_err(|e| Error::Size(path.to_path_buf(), e))?;
+                    .map_err(|e| Error::Size(tar_path.clone(), e))?;
 
                 // If the blob is small enough, read it off the wire, compute the digest,
                 // and upload it to the [BlobService] in the background.
@@ -126,7 +135,7 @@ where
                         .unwrap();
                     let size = tokio::io::copy(&mut reader, &mut buffer)
                         .await
-                        .map_err(|e| Error::Size(path.to_path_buf(), e))?;
+                        .map_err(|e| Error::Size(tar_path.clone(), e))?;
 
                     let digest: B3Digest = hasher.finalize().as_bytes().into();
 
@@ -134,18 +143,18 @@ where
                         let blob_service = blob_service.clone();
                         let digest = digest.clone();
                         async_blob_uploads.spawn({
-                            let path = path.clone();
+                            let tar_path = tar_path.clone();
                             async move {
                                 let mut writer = blob_service.open_write().await;
 
                                 tokio::io::copy(&mut Cursor::new(buffer), &mut writer)
                                     .await
-                                    .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?;
+                                    .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
 
                                 let blob_digest = writer
                                     .close()
                                     .await
-                                    .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?;
+                                    .map_err(|e| Error::BlobFinalize(tar_path, e))?;
 
                                 assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch");
 
@@ -163,12 +172,12 @@ where
 
                     let size = tokio::io::copy(&mut entry, &mut writer)
                         .await
-                        .map_err(|e| Error::BlobRead(path.to_path_buf(), e))?;
+                        .map_err(|e| Error::BlobRead(tar_path.clone(), e))?;
 
                     let digest = writer
                         .close()
                         .await
-                        .map_err(|e| Error::BlobFinalize(path.to_path_buf(), e))?;
+                        .map_err(|e| Error::BlobFinalize(tar_path.clone(), e))?;
 
                     (size, digest)
                 };
@@ -176,7 +185,7 @@ where
                 let executable = entry
                     .header()
                     .mode()
-                    .map_err(|e| Error::Mode(path.to_path_buf(), e))?
+                    .map_err(|e| Error::Mode(tar_path, e))?
                     & 64
                     != 0;
 
@@ -190,8 +199,8 @@ where
             tokio_tar::EntryType::Symlink => IngestionEntry::Symlink {
                 target: entry
                     .link_name()
-                    .map_err(|e| Error::LinkName(path.to_path_buf(), e))?
-                    .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))?
+                    .map_err(|e| Error::LinkName(tar_path.clone(), e))?
+                    .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))?
                     .into_owned()
                     .into_os_string()
                     .into_encoded_bytes(),
@@ -200,11 +209,11 @@ where
             // Push a bogus directory marker so we can make sure this directoy gets
             // created. We don't know the digest and size until after reading the full
             // tarball.
-            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() },
+            tokio_tar::EntryType::Directory => IngestionEntry::Dir { path },
 
             tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue,
 
-            entry_type => return Err(Error::EntryType(path, entry_type).into()),
+            entry_type => return Err(Error::EntryType(tar_path, entry_type).into()),
         };
 
         nodes.add(entry)?;
@@ -239,7 +248,7 @@ where
 /// An error is returned if this is not the case and ingestion will fail.
 struct IngestionEntryGraph {
     graph: DiGraph<IngestionEntry, ()>,
-    path_to_index: HashMap<PathBuf, NodeIndex>,
+    path_to_index: HashMap<crate::path::PathBuf, NodeIndex>,
     root_node: Option<NodeIndex>,
 }
 
@@ -264,7 +273,7 @@ impl IngestionEntryGraph {
     /// and new nodes are not directories, the node is replaced and is disconnected from its
     /// children.
     pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> {
-        let path = entry.path().to_path_buf();
+        let path = entry.path().to_owned();
 
         let index = match self.path_to_index.get(entry.path()) {
             Some(&index) => {
@@ -284,7 +293,7 @@ impl IngestionEntryGraph {
             // We expect archives to contain a single root node, if there is another root node
             // entry with a different path name, this is unsupported.
             if let Some(root_node) = self.root_node {
-                if self.get_node(root_node).path() != path {
+                if self.get_node(root_node).path() != &path {
                     return Err(Error::UnexpectedNumberOfTopLevelEntries);
                 }
             }
@@ -293,7 +302,7 @@ impl IngestionEntryGraph {
         } else if let Some(parent_path) = path.parent() {
             // Recursively add the parent node until it hits the root node.
             let parent_index = self.add(IngestionEntry::Dir {
-                path: parent_path.to_path_buf(),
+                path: parent_path.to_owned(),
             })?;
 
             // Insert an edge from the parent directory to the child entry.
@@ -378,23 +387,29 @@ mod test {
 
     lazy_static! {
         pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into();
-        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() };
-        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() };
-        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() };
+        pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir {
+            path: "a".parse().unwrap()
+        };
+        pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir {
+            path: "b".parse().unwrap()
+        };
+        pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir {
+            path: "a/b".parse().unwrap()
+        };
         pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular {
-            path: "a".into(),
+            path: "a".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b".into(),
+            path: "a/b".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),
         };
         pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular {
-            path: "a/b/c".into(),
+            path: "a/b/c".parse().unwrap(),
             size: 0,
             executable: false,
             digest: EMPTY_DIGEST.clone(),