From a11abc02e26186b698bfe5ba6271f28e512c6cf6 Mon Sep 17 00:00:00 2001 From: edef Date: Sun, 19 Nov 2023 06:56:03 +0000 Subject: feat(nix-compat/nar/reader): provide passthrough buffered I/O Allow taking advantage of the buffer of the underlying reader to avoid unnecessary copies of file data. We can't easily implement the methods of BufRead directly, since we have some extra I/O to perform in the final consume() invocation. That could be resolved at the cost of additional bookkeeping, but this will suffice for now. Change-Id: I8100cf0abd79e7469670b8596bd989be5db44a91 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10089 Reviewed-by: flokli Tested-by: BuildkiteCI --- tvix/nix-compat/src/nar/reader/mod.rs | 88 +++++++++++++++++++++++++++++------ tvix/store/src/nar/import.rs | 10 ++-- 2 files changed, 80 insertions(+), 18 deletions(-) (limited to 'tvix') diff --git a/tvix/nix-compat/src/nar/reader/mod.rs b/tvix/nix-compat/src/nar/reader/mod.rs index 4958a67de304..fa7ddc77f96c 100644 --- a/tvix/nix-compat/src/nar/reader/mod.rs +++ b/tvix/nix-compat/src/nar/reader/mod.rs @@ -7,7 +7,7 @@ use std::io::{ self, BufRead, ErrorKind::{InvalidData, UnexpectedEof}, - Read, + Read, Write, }; // Required reading for understanding this module. @@ -111,6 +111,62 @@ impl<'a, 'r> FileReader<'a, 'r> { } } +impl FileReader<'_, '_> { + /// Equivalent to [BufRead::fill_buf] + /// + /// We can't directly implement [BufRead], because [FileReader::consume] needs + /// to perform fallible I/O. + pub fn fill_buf(&mut self) -> io::Result<&[u8]> { + if self.is_empty() { + return Ok(&[]); + } + + let mut buf = self.reader.fill_buf()?; + + if buf.is_empty() { + return Err(UnexpectedEof.into()); + } + + if buf.len() as u64 > self.len { + buf = &buf[..self.len as usize]; + } + + Ok(buf) + } + + /// Analogous to [BufRead::consume], differing only in that it needs + /// to perform I/O in order to read padding and terminators. + pub fn consume(&mut self, n: usize) -> io::Result<()> { + if n == 0 { + return Ok(()); + } + + self.len = self + .len + .checked_sub(n as u64) + .expect("consumed bytes past EOF"); + + self.reader.consume(n); + + if self.is_empty() { + self.finish()?; + } + + Ok(()) + } + + /// Copy the (remaining) contents of the file into `dst`. + pub fn copy(&mut self, mut dst: impl Write) -> io::Result<()> { + while !self.is_empty() { + let buf = self.fill_buf()?; + let n = dst.write(buf)?; + self.consume(n)?; + } + + Ok(()) + } +} + impl Read for FileReader<'_, '_> { fn read(&mut self, mut buf: &mut [u8]) -> io::Result { if buf.is_empty() || self.is_empty() { @@ -128,24 +184,30 @@ impl Read for FileReader<'_, '_> { return Err(UnexpectedEof.into()); } - // If we've reached semantic EOF, consume and verify the padding and terminating TOK_PAR. - // Files are padded to 64 bits (8 bytes), just like any other byte string in the wire format. if self.is_empty() { - let pad = (self.pad & 7) as usize; + self.finish()?; + } - if pad != 0 { - let mut buf = [0; 8]; - self.reader.read_exact(&mut buf[pad..])?; + Ok(n) + } +} - if buf != [0; 8] { - return Err(InvalidData.into()); - } - } +impl FileReader<'_, '_> { + /// We've reached semantic EOF, consume and verify the padding and terminating TOK_PAR. + /// Files are padded to 64 bits (8 bytes), just like any other byte string in the wire format. + fn finish(&mut self) -> io::Result<()> { + let pad = (self.pad & 7) as usize; - read::token(self.reader, &wire::TOK_PAR)?; + if pad != 0 { + let mut buf = [0; 8]; + self.reader.read_exact(&mut buf[pad..])?; + + if buf != [0; 8] { + return Err(InvalidData.into()); + } } - Ok(n) + read::token(self.reader, &wire::TOK_PAR) } } diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 000fc05663b2..e9065a670d21 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -135,13 +135,13 @@ fn process_file_reader( // write the blob. let mut blob_writer = { - let mut dest = SyncIoBridge::new(blob_writer); - io::copy(&mut file_reader, &mut dest)?; + let mut dst = SyncIoBridge::new(blob_writer); - dest.shutdown()?; + file_reader.copy(&mut dst)?; + dst.shutdown()?; - // return back the blob_reader - dest.into_inner() + // return back the blob_writer + dst.into_inner() }; // close the blob_writer, retrieve the digest. -- cgit 1.4.1