about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- tvix/nix-compat/src/nar/reader/mod.rs 88
-rw-r--r-- tvix/store/src/nar/import.rs 10
2 files changed, 80 insertions, 18 deletions
diff --git a/tvix/nix-compat/src/nar/reader/mod.rs b/tvix/nix-compat/src/nar/reader/mod.rs
index 4958a67de304..fa7ddc77f96c 100644
--- a/tvix/nix-compat/src/nar/reader/mod.rs
+++ b/tvix/nix-compat/src/nar/reader/mod.rs
@@ -7,7 +7,7 @@
 use std::io::{
     self, BufRead,
     ErrorKind::{InvalidData, UnexpectedEof},
-    Read,
+    Read, Write,
 };
 
 // Required reading for understanding this module.
@@ -111,6 +111,62 @@ impl<'a, 'r> FileReader<'a, 'r> {
     }
 }
 
+impl FileReader<'_, '_> {
+    /// Equivalent to [BufRead::fill_buf]: returns buffered file contents without consuming them.
+    ///
+    /// We can't directly implement [BufRead], because [FileReader::consume] needs
+    /// to perform fallible I/O. Fails with `UnexpectedEof` if the input ends before the file does.
+    pub fn fill_buf(&mut self) -> io::Result<&[u8]> {
+        if self.is_empty() {
+            return Ok(&[]);
+        }
+
+        let mut buf = self.reader.fill_buf()?;
+        // The underlying reader is exhausted although file bytes are still outstanding.
+        if buf.is_empty() {
+            return Err(UnexpectedEof.into());
+        }
+        // Never expose what follows the file contents; the cast is lossless as `len < buf.len()`.
+        if buf.len() as u64 > self.len {
+            buf = &buf[..self.len as usize];
+        }
+
+        Ok(buf)
+    }
+
+    /// Analogous to [BufRead::consume], differing only in that it needs
+    /// to perform I/O in order to read padding and terminators.
+    /// Panics if `n` exceeds the number of bytes remaining in the file.
+    pub fn consume(&mut self, n: usize) -> io::Result<()> {
+        if n == 0 {
+            return Ok(());
+        }
+
+        self.len = self
+            .len
+            .checked_sub(n as u64)
+            .expect("consumed bytes past EOF");
+        self.reader.consume(n);
+        // Reaching the end of the contents means padding and terminator must be read and verified.
+        if self.is_empty() {
+            self.finish()?;
+        }
+        Ok(())
+    }
+
+    /// Copy the (remaining) contents of the file into `dst`.
+    pub fn copy(&mut self, mut dst: impl Write) -> io::Result<()> {
+        while !self.is_empty() {
+            let buf = self.fill_buf()?;
+            let n = buf.len();
+            // `write_all` fails with `WriteZero` on `Ok(0)`; a bare `write` would spin forever.
+            dst.write_all(buf)?;
+            self.consume(n)?;
+        }
+        Ok(())
+    }
+}
+
 impl Read for FileReader<'_, '_> {
     fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
         if buf.is_empty() || self.is_empty() {
@@ -128,24 +184,30 @@ impl Read for FileReader<'_, '_> {
             return Err(UnexpectedEof.into());
         }
 
-        // If we've reached semantic EOF, consume and verify the padding and terminating TOK_PAR.
-        // Files are padded to 64 bits (8 bytes), just like any other byte string in the wire format.
         if self.is_empty() {
-            let pad = (self.pad & 7) as usize;
+            self.finish()?;
+        }
 
-            if pad != 0 {
-                let mut buf = [0; 8];
-                self.reader.read_exact(&mut buf[pad..])?;
+        Ok(n)
+    }
+}
 
-                if buf != [0; 8] {
-                    return Err(InvalidData.into());
-                }
-            }
+impl FileReader<'_, '_> {
+    /// Called once semantic EOF is reached: consumes and verifies the padding and the terminating TOK_PAR.
+    /// Files are padded to 64 bits (8 bytes), just like any other byte string in the wire format.
+    fn finish(&mut self) -> io::Result<()> {
+        let pad = (self.pad & 7) as usize; // low three bits; presumably the content length mod 8 — confirm where `pad` is set
 
-            read::token(self.reader, &wire::TOK_PAR)?;
+        if pad != 0 {
+            let mut buf = [0; 8];
+            self.reader.read_exact(&mut buf[pad..])?; // fills only the tail; bytes before `pad` keep their initial zero
+
+            if buf != [0; 8] { // hence this checks that every byte just read is zero
+                return Err(InvalidData.into());
+            }
         }
 
-        Ok(n)
+        read::token(self.reader, &wire::TOK_PAR)
     }
 }
 
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 000fc05663b2..e9065a670d21 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -135,13 +135,13 @@ fn process_file_reader(
 
     // write the blob.
     let mut blob_writer = {
-        let mut dest = SyncIoBridge::new(blob_writer);
-        io::copy(&mut file_reader, &mut dest)?;
+        let mut dst = SyncIoBridge::new(blob_writer);
 
-        dest.shutdown()?;
+        file_reader.copy(&mut dst)?;
+        dst.shutdown()?;
 
-        // return back the blob_reader
-        dest.into_inner()
+        // return back the blob_writer
+        dst.into_inner()
     };
 
     // close the blob_writer, retrieve the digest.