Diffstat (limited to 'tvix/store/src/import.rs')
-rw-r--r--  tvix/store/src/import.rs | 54
1 file changed, 16 insertions, 38 deletions
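
The diff below drops the separate ChunkService from the import code path: instead of chunking file contents up front and uploading blob metadata, process_entry now streams the file straight into a writer obtained from the BlobService. As a hedged illustration (not part of the commit), the new upload step boils down to the following, reusing this module's Error type and the BlobService/BlobWriter API exactly as they appear in the diff; the helper name upload_blob is hypothetical:

use crate::blobservice::{BlobService, BlobWriter};
use std::{fs::File, io, path::PathBuf};

// Hypothetical helper mirroring the blob upload step in process_entry.
fn upload_blob<BS: BlobService>(
    blob_service: &mut BS,
    entry_path: PathBuf,
) -> Result<Vec<u8>, Error> {
    let mut file = File::open(entry_path.clone())
        .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;

    // open_write() hands back a BlobWriter, which implements io::Write.
    let mut writer = blob_service.open_write()?;

    // Stream the file into the writer without buffering it all in memory.
    io::copy(&mut file, &mut writer).map_err(|e| Error::UnableToRead(entry_path, e))?;

    // close() finalizes the blob and returns its digest.
    let digest = writer.close()?;
    Ok(digest.to_vec())
}
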
diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs
index e62097ec468d..bf80eb4b71b9 100644
--- a/tvix/store/src/import.rs
+++ b/tvix/store/src/import.rs
@@ -1,19 +1,17 @@
-use crate::{chunkservice::read_all_and_chunk, directoryservice::DirectoryPutter, proto};
+use crate::{blobservice::BlobService, directoryservice::DirectoryService};
+use crate::{blobservice::BlobWriter, directoryservice::DirectoryPutter, proto};
 use std::{
     collections::HashMap,
     fmt::Debug,
     fs,
     fs::File,
+    io,
     os::unix::prelude::PermissionsExt,
     path::{Path, PathBuf},
 };
 use tracing::instrument;
 use walkdir::WalkDir;
 
-use crate::{
-    blobservice::BlobService, chunkservice::ChunkService, directoryservice::DirectoryService,
-};
-
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("failed to upload directory at {0}: {1}")]
@@ -57,9 +55,8 @@ impl From<super::Error> for Error {
 //
 // It assumes the caller adds returned nodes to the directories it assembles.
 #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))]
-fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: DirectoryPutter>(
+fn process_entry<BS: BlobService, DP: DirectoryPutter>(
     blob_service: &mut BS,
-    chunk_service: &mut CS,
     directory_putter: &mut DP,
     entry: &walkdir::DirEntry,
     maybe_directory: Option<proto::Directory>,
@@ -112,23 +109,16 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire
             .metadata()
             .map_err(|e| Error::UnableToStat(entry_path.clone(), e.into()))?;
 
-        let file = File::open(entry_path.clone())
+        let mut file = File::open(entry_path.clone())
             .map_err(|e| Error::UnableToOpen(entry_path.clone(), e))?;
 
-        let (blob_digest, blob_meta) = read_all_and_chunk(chunk_service, file)?;
-
-        // upload blobmeta if not there yet
-        if blob_service
-            .stat(&proto::StatBlobRequest {
-                digest: blob_digest.to_vec(),
-                include_chunks: false,
-                include_bao: false,
-            })?
-            .is_none()
-        {
-            // upload blobmeta
-            blob_service.put(&blob_digest, blob_meta)?;
-        }
+        let mut writer = blob_service.open_write()?;
+
+        if let Err(e) = io::copy(&mut file, &mut writer) {
+            return Err(Error::UnableToRead(entry_path, e));
+        };
+
+        let digest = writer.close()?;
 
         return Ok(proto::node::Node::File(proto::FileNode {
             name: entry
@@ -136,7 +126,7 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire
                 .to_str()
                 .map(|s| Ok(s.to_owned()))
                 .unwrap_or(Err(Error::InvalidEncoding(entry.path().to_path_buf())))?,
-            digest: blob_digest,
+            digest: digest.to_vec(),
             size: metadata.len() as u32,
             // If it's executable by the user, it'll become executable.
             // This matches nix's dump() function behaviour.
@@ -152,15 +142,9 @@ fn process_entry<BS: BlobService, CS: ChunkService + std::marker::Sync, DP: Dire
 /// to the PathInfoService.
 //
 // returns the root node, or an error.
-#[instrument(skip(blob_service, chunk_service, directory_service), fields(path=?p))]
-pub fn import_path<
-    BS: BlobService,
-    CS: ChunkService + std::marker::Sync,
-    DS: DirectoryService,
-    P: AsRef<Path> + Debug,
->(
+#[instrument(skip(blob_service, directory_service), fields(path=?p))]
+pub fn import_path<BS: BlobService, DS: DirectoryService, P: AsRef<Path> + Debug>(
     blob_service: &mut BS,
-    chunk_service: &mut CS,
     directory_service: &mut DS,
     p: P,
 ) -> Result<proto::node::Node, Error> {
@@ -212,13 +196,7 @@ pub fn import_path<
             }
         };
 
-        let node = process_entry(
-            blob_service,
-            chunk_service,
-            &mut directory_putter,
-            &entry,
-            maybe_directory,
-        )?;
+        let node = process_entry(blob_service, &mut directory_putter, &entry, maybe_directory)?;
 
         if entry.depth() == 0 {
             return Ok(node);
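
With the chunk service gone, callers of import_path only need a BlobService and a DirectoryService. Below is a minimal caller-side sketch under the new signature; the MemoryBlobService and MemoryDirectoryService implementations, their Default constructors, and the crate/module paths in the use lines are assumptions for illustration, not part of this diff:

use tvix_store::blobservice::memory::MemoryBlobService; // hypothetical concrete impl
use tvix_store::directoryservice::memory::MemoryDirectoryService; // hypothetical concrete impl
use tvix_store::import::import_path;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut blob_service = MemoryBlobService::default();
    let mut directory_service = MemoryDirectoryService::default();

    // No more &mut chunk_service argument; the path can be anything AsRef<Path> + Debug.
    let root_node = import_path(&mut blob_service, &mut directory_service, "/some/path")?;
    println!("imported root node: {:?}", root_node);
    Ok(())
}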