about summary refs log tree commit diff
diff options
context:
space:
mode:
authoredef <edef@edef.eu>2023-11-19T06·56+0000
committeredef <edef@edef.eu>2023-11-19T09·53+0000
commita11abc02e26186b698bfe5ba6271f28e512c6cf6 (patch)
tree4011411150684458dda8026f5282c8e99376e4e1
parent785ff80c8b8adad22927eee3a0c9c7aaa60072b3 (diff)
feat(nix-compat/nar/reader): provide passthrough buffered I/O r/7035
Allow taking advantage of the buffer of the underlying reader to avoid
unnecessary copies of file data.

We can't easily implement the methods of BufRead directly, since we
have some extra I/O to perform in the final consume() invocation.

That could be resolved at the cost of additional bookkeeping, but this
will suffice for now.

Change-Id: I8100cf0abd79e7469670b8596bd989be5db44a91
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10089
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
-rw-r--r--tvix/nix-compat/src/nar/reader/mod.rs88
-rw-r--r--tvix/store/src/nar/import.rs10
2 files changed, 80 insertions, 18 deletions
diff --git a/tvix/nix-compat/src/nar/reader/mod.rs b/tvix/nix-compat/src/nar/reader/mod.rs
index 4958a67de3..fa7ddc77f9 100644
--- a/tvix/nix-compat/src/nar/reader/mod.rs
+++ b/tvix/nix-compat/src/nar/reader/mod.rs
@@ -7,7 +7,7 @@
 use std::io::{
     self, BufRead,
     ErrorKind::{InvalidData, UnexpectedEof},
-    Read,
+    Read, Write,
 };
 
 // Required reading for understanding this module.
@@ -111,6 +111,62 @@ impl<'a, 'r> FileReader<'a, 'r> {
     }
 }
 
+impl FileReader<'_, '_> {
+    /// Equivalent to [BufRead::fill_buf]
+    ///
+    /// We can't directly implement [BufRead], because [FileReader::consume] needs
+    /// to perform fallible I/O.
+    pub fn fill_buf(&mut self) -> io::Result<&[u8]> {
+        if self.is_empty() {
+            return Ok(&[]);
+        }
+
+        let mut buf = self.reader.fill_buf()?;
+
+        if buf.is_empty() {
+            return Err(UnexpectedEof.into());
+        }
+
+        if buf.len() as u64 > self.len {
+            buf = &buf[..self.len as usize];
+        }
+
+        Ok(buf)
+    }
+
+    /// Analogous to [BufRead::consume], differing only in that it needs
+    /// to perform I/O in order to read padding and terminators.
+    pub fn consume(&mut self, n: usize) -> io::Result<()> {
+        if n == 0 {
+            return Ok(());
+        }
+
+        self.len = self
+            .len
+            .checked_sub(n as u64)
+            .expect("consumed bytes past EOF");
+
+        self.reader.consume(n);
+
+        if self.is_empty() {
+            self.finish()?;
+        }
+
+        Ok(())
+    }
+
+    /// Copy the (remaining) contents of the file into `dst`.
+    pub fn copy(&mut self, mut dst: impl Write) -> io::Result<()> {
+        while !self.is_empty() {
+            let buf = self.fill_buf()?;
+            let n = dst.write(buf)?;
+            self.consume(n)?;
+        }
+
+        Ok(())
+    }
+}
+
 impl Read for FileReader<'_, '_> {
     fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
         if buf.is_empty() || self.is_empty() {
@@ -128,24 +184,30 @@ impl Read for FileReader<'_, '_> {
             return Err(UnexpectedEof.into());
         }
 
-        // If we've reached semantic EOF, consume and verify the padding and terminating TOK_PAR.
-        // Files are padded to 64 bits (8 bytes), just like any other byte string in the wire format.
         if self.is_empty() {
-            let pad = (self.pad & 7) as usize;
+            self.finish()?;
+        }
 
-            if pad != 0 {
-                let mut buf = [0; 8];
-                self.reader.read_exact(&mut buf[pad..])?;
+        Ok(n)
+    }
+}
 
-                if buf != [0; 8] {
-                    return Err(InvalidData.into());
-                }
-            }
+impl FileReader<'_, '_> {
+    /// We've reached semantic EOF, consume and verify the padding and terminating TOK_PAR.
+    /// Files are padded to 64 bits (8 bytes), just like any other byte string in the wire format.
+    fn finish(&mut self) -> io::Result<()> {
+        let pad = (self.pad & 7) as usize;
 
-            read::token(self.reader, &wire::TOK_PAR)?;
+        if pad != 0 {
+            let mut buf = [0; 8];
+            self.reader.read_exact(&mut buf[pad..])?;
+
+            if buf != [0; 8] {
+                return Err(InvalidData.into());
+            }
         }
 
-        Ok(n)
+        read::token(self.reader, &wire::TOK_PAR)
     }
 }
 
diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs
index 000fc05663..e9065a670d 100644
--- a/tvix/store/src/nar/import.rs
+++ b/tvix/store/src/nar/import.rs
@@ -135,13 +135,13 @@ fn process_file_reader(
 
     // write the blob.
     let mut blob_writer = {
-        let mut dest = SyncIoBridge::new(blob_writer);
-        io::copy(&mut file_reader, &mut dest)?;
+        let mut dst = SyncIoBridge::new(blob_writer);
 
-        dest.shutdown()?;
+        file_reader.copy(&mut dst)?;
+        dst.shutdown()?;
 
-        // return back the blob_reader
-        dest.into_inner()
+        // return back the blob_writer
+        dst.into_inner()
     };
 
     // close the blob_writer, retrieve the digest.