diff options
Diffstat (limited to 'tvix/store/src/nar/seekable.rs')
-rw-r--r-- | tvix/store/src/nar/seekable.rs | 422 |
1 files changed, 422 insertions, 0 deletions
diff --git a/tvix/store/src/nar/seekable.rs b/tvix/store/src/nar/seekable.rs new file mode 100644 index 000000000000..951c1dfc0198 --- /dev/null +++ b/tvix/store/src/nar/seekable.rs @@ -0,0 +1,422 @@ +use std::{ + cmp::min, + io, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use super::RenderError; + +use bytes::{BufMut, Bytes}; + +use nix_compat::nar::writer::sync as nar_writer; +use tvix_castore::blobservice::{BlobReader, BlobService}; +use tvix_castore::directoryservice::{ + DirectoryGraph, DirectoryService, RootToLeavesValidator, ValidatedDirectoryGraph, +}; +use tvix_castore::Directory; +use tvix_castore::{B3Digest, Node}; + +use futures::future::{BoxFuture, FusedFuture, TryMaybeDone}; +use futures::FutureExt; +use futures::TryStreamExt; + +use tokio::io::AsyncSeekExt; + +#[derive(Debug)] +struct BlobRef { + digest: B3Digest, + size: u64, +} + +#[derive(Debug)] +enum Data { + Literal(Bytes), + Blob(BlobRef), +} + +impl Data { + pub fn len(&self) -> u64 { + match self { + Data::Literal(data) => data.len() as u64, + Data::Blob(BlobRef { size, .. }) => *size, + } + } +} + +pub struct Reader<B: BlobService> { + segments: Vec<(u64, Data)>, + position_bytes: u64, + position_index: usize, + blob_service: Arc<B>, + seeking: bool, + current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>, +} + +/// Used during construction. +/// Converts the current buffer (passed as `cur_segment`) into a `Data::Literal` segment and +/// inserts it into `self.segments`. +fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) { + let segment_size = cur_segment.len(); + segments.push((*offset, Data::Literal(cur_segment.into()))); + *offset += segment_size as u64; +} + +/// Used during construction. +/// Recursively walks the node and its children, and fills `segments` with the appropriate +/// `Data::Literal` and `Data::Blob` elements. +fn walk_node( + segments: &mut Vec<(u64, Data)>, + offset: &mut u64, + get_directory: &impl Fn(&B3Digest) -> Directory, + node: Node, + // Includes a reference to the current segment's buffer + nar_node: nar_writer::Node<'_, Vec<u8>>, +) -> Result<(), RenderError> { + match node { + tvix_castore::Node::Symlink { target } => { + nar_node + .symlink(target.as_ref()) + .map_err(RenderError::NARWriterError)?; + } + tvix_castore::Node::File { + digest, + size, + executable, + } => { + let (cur_segment, skip) = nar_node + .file_manual_write(executable, size) + .map_err(RenderError::NARWriterError)?; + + // Flush the segment up until the beginning of the blob + flush_segment(segments, offset, std::mem::take(cur_segment)); + + // Insert the blob segment + segments.push((*offset, Data::Blob(BlobRef { digest, size }))); + *offset += size; + + // Close the file node + // We **intentionally** do not write the file contents anywhere. + // Instead we have stored the blob reference in a Data::Blob segment, + // and the poll_read implementation will take care of serving the + // appropriate blob at this offset. + skip.close(cur_segment) + .map_err(RenderError::NARWriterError)?; + } + tvix_castore::Node::Directory { digest, .. } => { + let directory = get_directory(&digest); + + // start a directory node + let mut nar_node_directory = + nar_node.directory().map_err(RenderError::NARWriterError)?; + + // for each node in the directory, create a new entry with its name, + // and then recurse on that entry. + for (name, node) in directory.nodes() { + let child_node = nar_node_directory + .entry(name.as_ref()) + .map_err(RenderError::NARWriterError)?; + + walk_node(segments, offset, get_directory, node.clone(), child_node)?; + } + + // close the directory + nar_node_directory + .close() + .map_err(RenderError::NARWriterError)?; + } + } + Ok(()) +} + +impl<B: BlobService + 'static> Reader<B> { + /// Creates a new seekable NAR renderer for the given castore root node. + /// + /// This function pre-fetches the directory closure using `get_recursive()` and assembles the + /// NAR structure, except the file contents which are stored as 'holes' with references to a blob + /// of a specific BLAKE3 digest and known size. The AsyncRead implementation will then switch + /// between serving the precomputed literal segments, and the appropriate blob for the file + /// contents. + pub async fn new( + root_node: Node, + blob_service: B, + directory_service: impl DirectoryService, + ) -> Result<Self, RenderError> { + let maybe_directory_closure = match &root_node { + // If this is a directory, resolve all subdirectories + Node::Directory { digest, .. } => { + let mut closure = DirectoryGraph::with_order( + RootToLeavesValidator::new_with_root_digest(digest.clone()), + ); + let mut stream = directory_service.get_recursive(digest); + while let Some(dir) = stream + .try_next() + .await + .map_err(|e| RenderError::StoreError(e.into()))? + { + closure.add(dir).map_err(|e| { + RenderError::StoreError( + tvix_castore::Error::StorageError(e.to_string()).into(), + ) + })?; + } + Some(closure.validate().map_err(|e| { + RenderError::StoreError(tvix_castore::Error::StorageError(e.to_string()).into()) + })?) + } + // If the top-level node is a file or a symlink, just pass it on + Node::File { .. } => None, + Node::Symlink { .. } => None, + }; + + Self::new_with_directory_closure(root_node, blob_service, maybe_directory_closure) + } + + /// Creates a new seekable NAR renderer for the given castore root node. + /// This version of the instantiation does not perform any I/O and as such is not async. + /// However it requires all directories to be passed as a ValidatedDirectoryGraph. + /// + /// panics if the directory closure is not the closure of the root node + pub fn new_with_directory_closure( + root_node: Node, + blob_service: B, + directory_closure: Option<ValidatedDirectoryGraph>, + ) -> Result<Self, RenderError> { + let directories = directory_closure + .map(|directory_closure| { + let mut directories: Vec<(B3Digest, Directory)> = vec![]; + for dir in directory_closure.drain_root_to_leaves() { + let digest = dir.digest(); + let pos = directories + .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| { + digest.as_slice() + }) + .expect_err("duplicate directory"); // DirectoryGraph checks this + directories.insert(pos, (digest, dir)); + } + directories + }) + .unwrap_or_default(); + + let mut segments = vec![]; + let mut cur_segment: Vec<u8> = vec![]; + let mut offset = 0; + + let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?; + + walk_node( + &mut segments, + &mut offset, + &|digest| { + directories + .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| digest.as_slice()) + .map(|pos| directories[pos].clone()) + .expect("missing directory") // DirectoryGraph checks this + .1 + }, + root_node, + nar_node, + )?; + // Flush the final segment + flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment)); + + Ok(Reader { + segments, + position_bytes: 0, + position_index: 0, + blob_service: blob_service.into(), + seeking: false, + current_blob: TryMaybeDone::Gone, + }) + } + + pub fn stream_len(&self) -> u64 { + self.segments + .last() + .map(|&(off, ref data)| off + data.len()) + .expect("no segment found") + } +} + +impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> { + fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> { + let stream_len = Reader::stream_len(&self); + + let this = &mut *self; + if this.seeking { + return Err(io::Error::new(io::ErrorKind::Other, "Already seeking")); + } + this.seeking = true; + + // TODO(edef): be sane about overflows + let pos = match pos { + io::SeekFrom::Start(n) => n, + io::SeekFrom::End(n) => (stream_len as i64 + n) as u64, + io::SeekFrom::Current(n) => (this.position_bytes as i64 + n) as u64, + }; + + let prev_position_bytes = this.position_bytes; + let prev_position_index = this.position_index; + + this.position_bytes = min(pos, stream_len); + this.position_index = match this + .segments + .binary_search_by_key(&this.position_bytes, |&(off, _)| off) + { + Ok(idx) => idx, + Err(idx) => idx - 1, + }; + + let Some((offset, Data::Blob(BlobRef { digest, .. }))) = + this.segments.get(this.position_index) + else { + // If not seeking into a blob, we clear the active blob reader and then we're done + this.current_blob = TryMaybeDone::Gone; + return Ok(()); + }; + let offset_in_segment = this.position_bytes - offset; + + if prev_position_bytes == this.position_bytes { + // position has not changed. do nothing + } else if prev_position_index == this.position_index { + // seeking within the same segment, re-use the blob reader + let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone); + this.current_blob = futures::future::try_maybe_done( + (async move { + let mut reader = Pin::new(&mut prev).take_output().unwrap(); + reader.seek(io::SeekFrom::Start(offset_in_segment)).await?; + Ok(reader) + }) + .boxed(), + ); + } else { + // seek to a different segment + let blob_service = this.blob_service.clone(); + let digest = digest.clone(); + this.current_blob = futures::future::try_maybe_done( + (async move { + let mut reader = + blob_service + .open_read(&digest) + .await? + .ok_or(io::Error::new( + io::ErrorKind::NotFound, + RenderError::BlobNotFound(digest.clone(), Default::default()), + ))?; + if offset_in_segment != 0 { + reader.seek(io::SeekFrom::Start(offset_in_segment)).await?; + } + Ok(reader) + }) + .boxed(), + ); + }; + + Ok(()) + } + fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> { + let this = &mut *self; + + if !this.current_blob.is_terminated() { + futures::ready!(this.current_blob.poll_unpin(cx))?; + } + this.seeking = false; + + Poll::Ready(Ok(this.position_bytes)) + } +} + +impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &mut tokio::io::ReadBuf, + ) -> Poll<io::Result<()>> { + let this = &mut *self; + + let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else { + return Poll::Ready(Ok(())); // EOF + }; + + let prev_read_buf_pos = buf.filled().len(); + match segment { + Data::Literal(data) => { + let offset_in_segment = this.position_bytes - offset; + let offset_in_segment = usize::try_from(offset_in_segment).unwrap(); + let remaining_data = data.len() - offset_in_segment; + let read_size = std::cmp::min(remaining_data, buf.remaining()); + buf.put(&data[offset_in_segment..offset_in_segment + read_size]); + } + Data::Blob(BlobRef { size, .. }) => { + futures::ready!(this.current_blob.poll_unpin(cx))?; + this.seeking = false; + let blob = Pin::new(&mut this.current_blob) + .output_mut() + .expect("missing blob"); + futures::ready!(Pin::new(blob).poll_read(cx, buf))?; + let read_length = buf.filled().len() - prev_read_buf_pos; + let maximum_expected_read_length = (offset + size) - this.position_bytes; + let is_eof = read_length == 0; + let too_much_returned = read_length as u64 > maximum_expected_read_length; + match (is_eof, too_much_returned) { + (true, false) => { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "blob short read", + ))) + } + (false, true) => { + buf.set_filled(prev_read_buf_pos); + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidInput, + "blob continued to yield data beyond end", + ))); + } + _ => {} + } + } + }; + let new_read_buf_pos = buf.filled().len(); + this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64; + + let prev_position_index = this.position_index; + while { + if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) { + (this.position_bytes - offset) >= segment.len() + } else { + false + } + } { + this.position_index += 1; + } + if prev_position_index != this.position_index { + let Some((_offset, Data::Blob(BlobRef { digest, .. }))) = + this.segments.get(this.position_index) + else { + // If the next segment is not a blob, we clear the active blob reader and then we're done + this.current_blob = TryMaybeDone::Gone; + return Poll::Ready(Ok(())); + }; + + // The next segment is a blob, open the BlobReader + let blob_service = this.blob_service.clone(); + let digest = digest.clone(); + this.current_blob = futures::future::try_maybe_done( + (async move { + let reader = blob_service + .open_read(&digest) + .await? + .ok_or(io::Error::new( + io::ErrorKind::NotFound, + RenderError::BlobNotFound(digest.clone(), Default::default()), + ))?; + Ok(reader) + }) + .boxed(), + ); + } + + Poll::Ready(Ok(())) + } +} |