use std::{
cmp::min,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use super::RenderError;
use bytes::{BufMut, Bytes};
use nix_compat::nar::writer::sync as nar_writer;
use tvix_castore::blobservice::{BlobReader, BlobService};
use tvix_castore::directoryservice::{
DirectoryGraph, DirectoryService, RootToLeavesValidator, ValidatedDirectoryGraph,
};
use tvix_castore::{B3Digest, Directory, Node};
use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
use futures::FutureExt;
use futures::TryStreamExt;
use tokio::io::AsyncSeekExt;
#[derive(Debug)]
struct BlobRef {
digest: B3Digest,
size: u64,
}
#[derive(Debug)]
enum Data {
Literal(Bytes),
Blob(BlobRef),
}
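// A NAR stream is represented as a sequence of segments. For example, a single
// regular file renders as three segments: the literal NAR framing up to the
// file contents, a `Data::Blob` "hole" standing in for the contents themselves,
// and the literal framing that follows them.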
impl Data {
pub fn len(&self) -> u64 {
match self {
Data::Literal(data) => data.len() as u64,
Data::Blob(BlobRef { size, .. }) => *size,
}
}
}
pub struct Reader<B: BlobService> {
    /// The rendered NAR, as a list of segments sorted by start offset.
    /// Always ends in a `Data::Literal`, since construction flushes the
    /// final buffer as a literal segment.
    segments: Vec<(u64, Data)>,
    /// Absolute read position in the NAR stream, in bytes.
    position_bytes: u64,
    /// Index into `segments` of the segment currently being served.
    position_index: usize,
    blob_service: Arc<B>,
    /// Whether a seek is in progress (set by `start_seek`, cleared once
    /// `poll_complete` finishes).
    seeking: bool,
    /// The maybe-pending open of the blob backing the current (or upcoming)
    /// `Data::Blob` segment; `Gone` while a literal segment is being served.
    current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
}
/// Used during construction.
/// Converts the current buffer (passed as `cur_segment`) into a `Data::Literal`
/// segment, appends it to `segments`, and advances `offset` by its length.
fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
let segment_size = cur_segment.len();
segments.push((*offset, Data::Literal(cur_segment.into())));
*offset += segment_size as u64;
}
/// Used during construction.
/// Recursively walks the node and its children, and fills `segments` with the appropriate
/// `Data::Literal` and `Data::Blob` elements.
fn walk_node(
segments: &mut Vec<(u64, Data)>,
offset: &mut u64,
get_directory: &impl Fn(&B3Digest) -> Directory,
node: Node,
    // The NAR writer node, which borrows the buffer of the literal segment
    // currently being assembled
nar_node: nar_writer::Node<'_, Vec<u8>>,
) -> Result<(), RenderError> {
match node {
tvix_castore::Node::Symlink { target } => {
nar_node
.symlink(target.as_ref())
.map_err(RenderError::NARWriterError)?;
}
tvix_castore::Node::File {
digest,
size,
executable,
} => {
let (cur_segment, skip) = nar_node
.file_manual_write(executable, size)
.map_err(RenderError::NARWriterError)?;
// Flush the segment up until the beginning of the blob
flush_segment(segments, offset, std::mem::take(cur_segment));
// Insert the blob segment
segments.push((*offset, Data::Blob(BlobRef { digest, size })));
*offset += size;
// Close the file node
// We **intentionally** do not write the file contents anywhere.
// Instead we have stored the blob reference in a Data::Blob segment,
// and the poll_read implementation will take care of serving the
// appropriate blob at this offset.
skip.close(cur_segment)
.map_err(RenderError::NARWriterError)?;
}
tvix_castore::Node::Directory { digest, .. } => {
let directory = get_directory(&digest);
// start a directory node
let mut nar_node_directory =
nar_node.directory().map_err(RenderError::NARWriterError)?;
// for each node in the directory, create a new entry with its name,
// and then recurse on that entry.
for (name, node) in directory.nodes() {
let child_node = nar_node_directory
.entry(name.as_ref())
.map_err(RenderError::NARWriterError)?;
walk_node(segments, offset, get_directory, node.clone(), child_node)?;
}
// close the directory
nar_node_directory
.close()
.map_err(RenderError::NARWriterError)?;
}
}
Ok(())
}
impl<B: BlobService + 'static> Reader<B> {
/// Creates a new seekable NAR renderer for the given castore root node.
///
    /// This function prefetches the directory closure using `get_recursive()`
    /// and assembles the NAR structure, except for the file contents, which are
    /// stored as 'holes': references to a blob with a specific BLAKE3 digest
    /// and a known size. The `AsyncRead` implementation then switches between
    /// serving the precomputed literal segments and the appropriate blobs for
    /// the file contents.
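    ///
    /// A minimal usage sketch (the `root_node`, `blob_service` and
    /// `directory_service` bindings are assumed to be in scope, and error
    /// handling is elided):
    ///
    /// ```ignore
    /// use tokio::io::{AsyncReadExt, AsyncSeekExt};
    ///
    /// let mut reader = Reader::new(root_node, blob_service, directory_service).await?;
    ///
    /// // Seek to an arbitrary offset, then read the remainder of the NAR.
    /// reader.seek(std::io::SeekFrom::Start(1024)).await?;
    /// let mut rest = Vec::new();
    /// reader.read_to_end(&mut rest).await?;
    /// ```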
pub async fn new(
root_node: Node,
blob_service: B,
directory_service: impl DirectoryService,
) -> Result<Self, RenderError> {
let maybe_directory_closure = match &root_node {
// If this is a directory, resolve all subdirectories
Node::Directory { digest, .. } => {
let mut closure = DirectoryGraph::with_order(
RootToLeavesValidator::new_with_root_digest(digest.clone()),
);
let mut stream = directory_service.get_recursive(digest);
while let Some(dir) = stream
.try_next()
.await
.map_err(|e| RenderError::StoreError(e.into()))?
{
closure.add(dir).map_err(|e| {
RenderError::StoreError(
tvix_castore::Error::StorageError(e.to_string()).into(),
)
})?;
}
Some(closure.validate().map_err(|e| {
RenderError::StoreError(tvix_castore::Error::StorageError(e.to_string()).into())
})?)
}
// If the top-level node is a file or a symlink, just pass it on
Node::File { .. } => None,
Node::Symlink { .. } => None,
};
Self::new_with_directory_closure(root_node, blob_service, maybe_directory_closure)
}
    /// Creates a new seekable NAR renderer for the given castore root node.
    ///
    /// Unlike [`Reader::new`], this constructor performs no I/O and is
    /// therefore not async. In exchange, it requires all directories to be
    /// passed upfront as a [`ValidatedDirectoryGraph`].
    ///
    /// Panics if the directory closure is not the closure of the root node.
pub fn new_with_directory_closure(
root_node: Node,
blob_service: B,
directory_closure: Option<ValidatedDirectoryGraph>,
) -> Result<Self, RenderError> {
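        // Flatten the directory closure into a vector sorted by digest, so
        // that the `get_directory` callback passed to `walk_node` below can
        // find directories via binary search.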
let directories = directory_closure
.map(|directory_closure| {
let mut directories: Vec<(B3Digest, Directory)> = vec![];
for dir in directory_closure.drain_root_to_leaves() {
let digest = dir.digest();
let pos = directories
.binary_search_by_key(&digest.as_slice(), |(digest, _dir)| {
digest.as_slice()
})
.expect_err("duplicate directory"); // DirectoryGraph checks this
directories.insert(pos, (digest, dir));
}
directories
})
.unwrap_or_default();
let mut segments = vec![];
let mut cur_segment: Vec<u8> = vec![];
let mut offset = 0;
let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;
walk_node(
&mut segments,
&mut offset,
&|digest| {
directories
.binary_search_by_key(&digest.as_slice(), |(digest, _dir)| digest.as_slice())
.map(|pos| directories[pos].clone())
.expect("missing directory") // DirectoryGraph checks this
.1
},
root_node,
nar_node,
)?;
// Flush the final segment
flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));
Ok(Reader {
segments,
position_bytes: 0,
position_index: 0,
blob_service: blob_service.into(),
seeking: false,
current_blob: TryMaybeDone::Gone,
})
}
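    /// Returns the total size of the NAR stream, in bytes.
    ///
    /// This is known upfront: every segment's size was computed during
    /// construction. Panics if `segments` is empty, which cannot happen,
    /// since construction always flushes a final literal segment.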
pub fn stream_len(&self) -> u64 {
self.segments
.last()
.map(|&(off, ref data)| off + data.len())
.expect("no segment found")
}
}
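// Seeking follows tokio's two-phase AsyncSeek protocol: `start_seek` computes
// the target position and, if it lands inside a blob segment, constructs a
// future that opens (or rewinds) the corresponding blob reader;
// `poll_complete` then drives that future to completion.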
impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
let stream_len = Reader::stream_len(&self);
let this = &mut *self;
if this.seeking {
return Err(io::Error::new(io::ErrorKind::Other, "Already seeking"));
}
this.seeking = true;
// TODO(edef): be sane about overflows
let pos = match pos {
io::SeekFrom::Start(n) => n,
io::SeekFrom::End(n) => (stream_len as i64 + n) as u64,
io::SeekFrom::Current(n) => (this.position_bytes as i64 + n) as u64,
};
let prev_position_bytes = this.position_bytes;
let prev_position_index = this.position_index;
this.position_bytes = min(pos, stream_len);
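        // Locate the segment containing the new position: an exact hit in the
        // binary search means the position is at the start of a segment, while
        // a miss means it lies inside the segment just before the insertion
        // point.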
this.position_index = match this
.segments
.binary_search_by_key(&this.position_bytes, |&(off, _)| off)
{
Ok(idx) => idx,
Err(idx) => idx - 1,
};
let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
this.segments.get(this.position_index)
else {
// If not seeking into a blob, we clear the active blob reader and then we're done
this.current_blob = TryMaybeDone::Gone;
return Ok(());
};
let offset_in_segment = this.position_bytes - offset;
if prev_position_bytes == this.position_bytes {
// position has not changed. do nothing
} else if prev_position_index == this.position_index {
// seeking within the same segment, re-use the blob reader
let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
this.current_blob = futures::future::try_maybe_done(
(async move {
let mut reader = Pin::new(&mut prev).take_output().unwrap();
reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
Ok(reader)
})
.boxed(),
);
} else {
// seek to a different segment
let blob_service = this.blob_service.clone();
let digest = digest.clone();
this.current_blob = futures::future::try_maybe_done(
(async move {
let mut reader =
blob_service
.open_read(&digest)
.await?
.ok_or(io::Error::new(
io::ErrorKind::NotFound,
RenderError::BlobNotFound(digest.clone(), Default::default()),
))?;
if offset_in_segment != 0 {
reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
}
Ok(reader)
})
.boxed(),
);
};
Ok(())
}
fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
let this = &mut *self;
if !this.current_blob.is_terminated() {
futures::ready!(this.current_blob.poll_unpin(cx))?;
}
this.seeking = false;
Poll::Ready(Ok(this.position_bytes))
}
}
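// Reads are served from the segment at `position_index`: literal segments are
// copied straight from memory, while blob segments poll `current_blob` and
// forward the read to the blob reader. When the position crosses into a new
// blob segment, the future that opens its reader is set up here so the next
// call can drive it.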
impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut tokio::io::ReadBuf,
) -> Poll<io::Result<()>> {
let this = &mut *self;
let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
return Poll::Ready(Ok(())); // EOF
};
let prev_read_buf_pos = buf.filled().len();
match segment {
Data::Literal(data) => {
let offset_in_segment = this.position_bytes - offset;
let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
let remaining_data = data.len() - offset_in_segment;
let read_size = std::cmp::min(remaining_data, buf.remaining());
buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
}
Data::Blob(BlobRef { size, .. }) => {
futures::ready!(this.current_blob.poll_unpin(cx))?;
this.seeking = false;
let blob = Pin::new(&mut this.current_blob)
.output_mut()
.expect("missing blob");
futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
let read_length = buf.filled().len() - prev_read_buf_pos;
let maximum_expected_read_length = (offset + size) - this.position_bytes;
let is_eof = read_length == 0;
let too_much_returned = read_length as u64 > maximum_expected_read_length;
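                // A zero-length read before the end of the blob means the
                // backing blob is shorter than the size recorded in the node;
                // extra data means it is longer. Both would corrupt the NAR,
                // so surface an error instead.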
match (is_eof, too_much_returned) {
(true, false) => {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"blob short read",
)))
}
(false, true) => {
buf.set_filled(prev_read_buf_pos);
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::InvalidInput,
"blob continued to yield data beyond end",
)));
}
_ => {}
}
}
};
let new_read_buf_pos = buf.filled().len();
this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;
let prev_position_index = this.position_index;
        // Advance past any segments that the new position has fully consumed
        // (a segment of length `n` covers offsets `off..off + n`).
        while let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
            if this.position_bytes - offset < segment.len() {
                break;
            }
            this.position_index += 1;
        }
if prev_position_index != this.position_index {
let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
this.segments.get(this.position_index)
else {
// If the next segment is not a blob, we clear the active blob reader and then we're done
this.current_blob = TryMaybeDone::Gone;
return Poll::Ready(Ok(()));
};
// The next segment is a blob, open the BlobReader
let blob_service = this.blob_service.clone();
let digest = digest.clone();
this.current_blob = futures::future::try_maybe_done(
(async move {
let reader = blob_service
.open_read(&digest)
.await?
.ok_or(io::Error::new(
io::ErrorKind::NotFound,
RenderError::BlobNotFound(digest.clone(), Default::default()),
))?;
Ok(reader)
})
.boxed(),
);
}
Poll::Ready(Ok(()))
}
}