about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
Diffstat (limited to 'tvix')
-rw-r--r--tvix/store/src/nar/mod.rs1
-rw-r--r--tvix/store/src/nar/seekable.rs422
-rw-r--r--tvix/store/src/tests/mod.rs1
-rw-r--r--tvix/store/src/tests/nar_renderer_seekable.rs111
4 files changed, 535 insertions, 0 deletions
diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs
index da798bbf3a3c..86505bcb0c07 100644
--- a/tvix/store/src/nar/mod.rs
+++ b/tvix/store/src/nar/mod.rs
@@ -3,6 +3,7 @@ use tvix_castore::B3Digest;
 
 mod import;
 mod renderer;
+pub mod seekable;
 pub use import::ingest_nar;
 pub use import::ingest_nar_and_hash;
 pub use renderer::calculate_size_and_sha256;
diff --git a/tvix/store/src/nar/seekable.rs b/tvix/store/src/nar/seekable.rs
new file mode 100644
index 000000000000..951c1dfc0198
--- /dev/null
+++ b/tvix/store/src/nar/seekable.rs
@@ -0,0 +1,422 @@
+use std::{
+    cmp::min,
+    io,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+
+use super::RenderError;
+
+use bytes::{BufMut, Bytes};
+
+use nix_compat::nar::writer::sync as nar_writer;
+use tvix_castore::blobservice::{BlobReader, BlobService};
+use tvix_castore::directoryservice::{
+    DirectoryGraph, DirectoryService, RootToLeavesValidator, ValidatedDirectoryGraph,
+};
+use tvix_castore::Directory;
+use tvix_castore::{B3Digest, Node};
+
+use futures::future::{BoxFuture, FusedFuture, TryMaybeDone};
+use futures::FutureExt;
+use futures::TryStreamExt;
+
+use tokio::io::AsyncSeekExt;
+
+#[derive(Debug)]
+struct BlobRef {
+    digest: B3Digest,
+    size: u64,
+}
+
+#[derive(Debug)]
+enum Data {
+    Literal(Bytes),
+    Blob(BlobRef),
+}
+
+impl Data {
+    pub fn len(&self) -> u64 {
+        match self {
+            Data::Literal(data) => data.len() as u64,
+            Data::Blob(BlobRef { size, .. }) => *size,
+        }
+    }
+}
+
+pub struct Reader<B: BlobService> {
+    segments: Vec<(u64, Data)>,
+    position_bytes: u64,
+    position_index: usize,
+    blob_service: Arc<B>,
+    seeking: bool,
+    current_blob: TryMaybeDone<BoxFuture<'static, io::Result<Box<dyn BlobReader>>>>,
+}
+
+/// Used during construction.
+/// Converts the current buffer (passed as `cur_segment`) into a `Data::Literal` segment and
+/// inserts it into `self.segments`.
+fn flush_segment(segments: &mut Vec<(u64, Data)>, offset: &mut u64, cur_segment: Vec<u8>) {
+    let segment_size = cur_segment.len();
+    segments.push((*offset, Data::Literal(cur_segment.into())));
+    *offset += segment_size as u64;
+}
+
+/// Used during construction.
+/// Recursively walks the node and its children, and fills `segments` with the appropriate
+/// `Data::Literal` and `Data::Blob` elements.
+fn walk_node(
+    segments: &mut Vec<(u64, Data)>,
+    offset: &mut u64,
+    get_directory: &impl Fn(&B3Digest) -> Directory,
+    node: Node,
+    // Includes a reference to the current segment's buffer
+    nar_node: nar_writer::Node<'_, Vec<u8>>,
+) -> Result<(), RenderError> {
+    match node {
+        tvix_castore::Node::Symlink { target } => {
+            nar_node
+                .symlink(target.as_ref())
+                .map_err(RenderError::NARWriterError)?;
+        }
+        tvix_castore::Node::File {
+            digest,
+            size,
+            executable,
+        } => {
+            let (cur_segment, skip) = nar_node
+                .file_manual_write(executable, size)
+                .map_err(RenderError::NARWriterError)?;
+
+            // Flush the segment up until the beginning of the blob
+            flush_segment(segments, offset, std::mem::take(cur_segment));
+
+            // Insert the blob segment
+            segments.push((*offset, Data::Blob(BlobRef { digest, size })));
+            *offset += size;
+
+            // Close the file node
+            // We **intentionally** do not write the file contents anywhere.
+            // Instead we have stored the blob reference in a Data::Blob segment,
+            // and the poll_read implementation will take care of serving the
+            // appropriate blob at this offset.
+            skip.close(cur_segment)
+                .map_err(RenderError::NARWriterError)?;
+        }
+        tvix_castore::Node::Directory { digest, .. } => {
+            let directory = get_directory(&digest);
+
+            // start a directory node
+            let mut nar_node_directory =
+                nar_node.directory().map_err(RenderError::NARWriterError)?;
+
+            // for each node in the directory, create a new entry with its name,
+            // and then recurse on that entry.
+            for (name, node) in directory.nodes() {
+                let child_node = nar_node_directory
+                    .entry(name.as_ref())
+                    .map_err(RenderError::NARWriterError)?;
+
+                walk_node(segments, offset, get_directory, node.clone(), child_node)?;
+            }
+
+            // close the directory
+            nar_node_directory
+                .close()
+                .map_err(RenderError::NARWriterError)?;
+        }
+    }
+    Ok(())
+}
+
+impl<B: BlobService + 'static> Reader<B> {
+    /// Creates a new seekable NAR renderer for the given castore root node.
+    ///
+    /// This function pre-fetches the directory closure using `get_recursive()` and assembles the
+    /// NAR structure, except the file contents which are stored as 'holes' with references to a blob
+    /// of a specific BLAKE3 digest and known size. The AsyncRead implementation will then switch
+    /// between serving the precomputed literal segments, and the appropriate blob for the file
+    /// contents.
+    pub async fn new(
+        root_node: Node,
+        blob_service: B,
+        directory_service: impl DirectoryService,
+    ) -> Result<Self, RenderError> {
+        let maybe_directory_closure = match &root_node {
+            // If this is a directory, resolve all subdirectories
+            Node::Directory { digest, .. } => {
+                let mut closure = DirectoryGraph::with_order(
+                    RootToLeavesValidator::new_with_root_digest(digest.clone()),
+                );
+                let mut stream = directory_service.get_recursive(digest);
+                while let Some(dir) = stream
+                    .try_next()
+                    .await
+                    .map_err(|e| RenderError::StoreError(e.into()))?
+                {
+                    closure.add(dir).map_err(|e| {
+                        RenderError::StoreError(
+                            tvix_castore::Error::StorageError(e.to_string()).into(),
+                        )
+                    })?;
+                }
+                Some(closure.validate().map_err(|e| {
+                    RenderError::StoreError(tvix_castore::Error::StorageError(e.to_string()).into())
+                })?)
+            }
+            // If the top-level node is a file or a symlink, just pass it on
+            Node::File { .. } => None,
+            Node::Symlink { .. } => None,
+        };
+
+        Self::new_with_directory_closure(root_node, blob_service, maybe_directory_closure)
+    }
+
+    /// Creates a new seekable NAR renderer for the given castore root node.
+    /// This version of the instantiation does not perform any I/O and as such is not async.
+    /// However it requires all directories to be passed as a ValidatedDirectoryGraph.
+    ///
+    /// panics if the directory closure is not the closure of the root node
+    pub fn new_with_directory_closure(
+        root_node: Node,
+        blob_service: B,
+        directory_closure: Option<ValidatedDirectoryGraph>,
+    ) -> Result<Self, RenderError> {
+        let directories = directory_closure
+            .map(|directory_closure| {
+                let mut directories: Vec<(B3Digest, Directory)> = vec![];
+                for dir in directory_closure.drain_root_to_leaves() {
+                    let digest = dir.digest();
+                    let pos = directories
+                        .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| {
+                            digest.as_slice()
+                        })
+                        .expect_err("duplicate directory"); // DirectoryGraph checks this
+                    directories.insert(pos, (digest, dir));
+                }
+                directories
+            })
+            .unwrap_or_default();
+
+        let mut segments = vec![];
+        let mut cur_segment: Vec<u8> = vec![];
+        let mut offset = 0;
+
+        let nar_node = nar_writer::open(&mut cur_segment).map_err(RenderError::NARWriterError)?;
+
+        walk_node(
+            &mut segments,
+            &mut offset,
+            &|digest| {
+                directories
+                    .binary_search_by_key(&digest.as_slice(), |(digest, _dir)| digest.as_slice())
+                    .map(|pos| directories[pos].clone())
+                    .expect("missing directory") // DirectoryGraph checks this
+                    .1
+            },
+            root_node,
+            nar_node,
+        )?;
+        // Flush the final segment
+        flush_segment(&mut segments, &mut offset, std::mem::take(&mut cur_segment));
+
+        Ok(Reader {
+            segments,
+            position_bytes: 0,
+            position_index: 0,
+            blob_service: blob_service.into(),
+            seeking: false,
+            current_blob: TryMaybeDone::Gone,
+        })
+    }
+
+    pub fn stream_len(&self) -> u64 {
+        self.segments
+            .last()
+            .map(|&(off, ref data)| off + data.len())
+            .expect("no segment found")
+    }
+}
+
+impl<B: BlobService + 'static> tokio::io::AsyncSeek for Reader<B> {
+    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
+        let stream_len = Reader::stream_len(&self);
+
+        let this = &mut *self;
+        if this.seeking {
+            return Err(io::Error::new(io::ErrorKind::Other, "Already seeking"));
+        }
+        this.seeking = true;
+
+        // TODO(edef): be sane about overflows
+        let pos = match pos {
+            io::SeekFrom::Start(n) => n,
+            io::SeekFrom::End(n) => (stream_len as i64 + n) as u64,
+            io::SeekFrom::Current(n) => (this.position_bytes as i64 + n) as u64,
+        };
+
+        let prev_position_bytes = this.position_bytes;
+        let prev_position_index = this.position_index;
+
+        this.position_bytes = min(pos, stream_len);
+        this.position_index = match this
+            .segments
+            .binary_search_by_key(&this.position_bytes, |&(off, _)| off)
+        {
+            Ok(idx) => idx,
+            Err(idx) => idx - 1,
+        };
+
+        let Some((offset, Data::Blob(BlobRef { digest, .. }))) =
+            this.segments.get(this.position_index)
+        else {
+            // If not seeking into a blob, we clear the active blob reader and then we're done
+            this.current_blob = TryMaybeDone::Gone;
+            return Ok(());
+        };
+        let offset_in_segment = this.position_bytes - offset;
+
+        if prev_position_bytes == this.position_bytes {
+            // position has not changed. do nothing
+        } else if prev_position_index == this.position_index {
+            // seeking within the same segment, re-use the blob reader
+            let mut prev = std::mem::replace(&mut this.current_blob, TryMaybeDone::Gone);
+            this.current_blob = futures::future::try_maybe_done(
+                (async move {
+                    let mut reader = Pin::new(&mut prev).take_output().unwrap();
+                    reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
+                    Ok(reader)
+                })
+                .boxed(),
+            );
+        } else {
+            // seek to a different segment
+            let blob_service = this.blob_service.clone();
+            let digest = digest.clone();
+            this.current_blob = futures::future::try_maybe_done(
+                (async move {
+                    let mut reader =
+                        blob_service
+                            .open_read(&digest)
+                            .await?
+                            .ok_or(io::Error::new(
+                                io::ErrorKind::NotFound,
+                                RenderError::BlobNotFound(digest.clone(), Default::default()),
+                            ))?;
+                    if offset_in_segment != 0 {
+                        reader.seek(io::SeekFrom::Start(offset_in_segment)).await?;
+                    }
+                    Ok(reader)
+                })
+                .boxed(),
+            );
+        };
+
+        Ok(())
+    }
+    fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<u64>> {
+        let this = &mut *self;
+
+        if !this.current_blob.is_terminated() {
+            futures::ready!(this.current_blob.poll_unpin(cx))?;
+        }
+        this.seeking = false;
+
+        Poll::Ready(Ok(this.position_bytes))
+    }
+}
+
+impl<B: BlobService + 'static> tokio::io::AsyncRead for Reader<B> {
+    fn poll_read(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context,
+        buf: &mut tokio::io::ReadBuf,
+    ) -> Poll<io::Result<()>> {
+        let this = &mut *self;
+
+        let Some(&(offset, ref segment)) = this.segments.get(this.position_index) else {
+            return Poll::Ready(Ok(())); // EOF
+        };
+
+        let prev_read_buf_pos = buf.filled().len();
+        match segment {
+            Data::Literal(data) => {
+                let offset_in_segment = this.position_bytes - offset;
+                let offset_in_segment = usize::try_from(offset_in_segment).unwrap();
+                let remaining_data = data.len() - offset_in_segment;
+                let read_size = std::cmp::min(remaining_data, buf.remaining());
+                buf.put(&data[offset_in_segment..offset_in_segment + read_size]);
+            }
+            Data::Blob(BlobRef { size, .. }) => {
+                futures::ready!(this.current_blob.poll_unpin(cx))?;
+                this.seeking = false;
+                let blob = Pin::new(&mut this.current_blob)
+                    .output_mut()
+                    .expect("missing blob");
+                futures::ready!(Pin::new(blob).poll_read(cx, buf))?;
+                let read_length = buf.filled().len() - prev_read_buf_pos;
+                let maximum_expected_read_length = (offset + size) - this.position_bytes;
+                let is_eof = read_length == 0;
+                let too_much_returned = read_length as u64 > maximum_expected_read_length;
+                match (is_eof, too_much_returned) {
+                    (true, false) => {
+                        return Poll::Ready(Err(io::Error::new(
+                            io::ErrorKind::UnexpectedEof,
+                            "blob short read",
+                        )))
+                    }
+                    (false, true) => {
+                        buf.set_filled(prev_read_buf_pos);
+                        return Poll::Ready(Err(io::Error::new(
+                            io::ErrorKind::InvalidInput,
+                            "blob continued to yield data beyond end",
+                        )));
+                    }
+                    _ => {}
+                }
+            }
+        };
+        let new_read_buf_pos = buf.filled().len();
+        this.position_bytes += (new_read_buf_pos - prev_read_buf_pos) as u64;
+
+        let prev_position_index = this.position_index;
+        while {
+            if let Some(&(offset, ref segment)) = this.segments.get(this.position_index) {
+                (this.position_bytes - offset) >= segment.len()
+            } else {
+                false
+            }
+        } {
+            this.position_index += 1;
+        }
+        if prev_position_index != this.position_index {
+            let Some((_offset, Data::Blob(BlobRef { digest, .. }))) =
+                this.segments.get(this.position_index)
+            else {
+                // If the next segment is not a blob, we clear the active blob reader and then we're done
+                this.current_blob = TryMaybeDone::Gone;
+                return Poll::Ready(Ok(()));
+            };
+
+            // The next segment is a blob, open the BlobReader
+            let blob_service = this.blob_service.clone();
+            let digest = digest.clone();
+            this.current_blob = futures::future::try_maybe_done(
+                (async move {
+                    let reader = blob_service
+                        .open_read(&digest)
+                        .await?
+                        .ok_or(io::Error::new(
+                            io::ErrorKind::NotFound,
+                            RenderError::BlobNotFound(digest.clone(), Default::default()),
+                        ))?;
+                    Ok(reader)
+                })
+                .boxed(),
+            );
+        }
+
+        Poll::Ready(Ok(()))
+    }
+}
diff --git a/tvix/store/src/tests/mod.rs b/tvix/store/src/tests/mod.rs
index 1e7fc3f6b451..c7e6fee0cc6c 100644
--- a/tvix/store/src/tests/mod.rs
+++ b/tvix/store/src/tests/mod.rs
@@ -1,2 +1,3 @@
 pub mod fixtures;
 mod nar_renderer;
+mod nar_renderer_seekable;
diff --git a/tvix/store/src/tests/nar_renderer_seekable.rs b/tvix/store/src/tests/nar_renderer_seekable.rs
new file mode 100644
index 000000000000..233b95d0b036
--- /dev/null
+++ b/tvix/store/src/tests/nar_renderer_seekable.rs
@@ -0,0 +1,111 @@
+use crate::nar::seekable::Reader;
+use crate::tests::fixtures::blob_service_with_contents as blob_service;
+use crate::tests::fixtures::directory_service_with_contents as directory_service;
+use crate::tests::fixtures::*;
+use rstest::*;
+use rstest_reuse::*;
+use std::io;
+use std::pin::Pin;
+use std::sync::Arc;
+use tokio::io::{AsyncReadExt, AsyncSeek, AsyncSeekExt};
+use tvix_castore::blobservice::BlobService;
+use tvix_castore::directoryservice::DirectoryService;
+use tvix_castore::Node;
+
+#[apply(castore_fixtures_template)]
+#[tokio::test]
+async fn read_to_end(
+    #[future] blob_service: Arc<dyn BlobService>,
+    #[future] directory_service: Arc<dyn DirectoryService>,
+    #[case] test_input: &Node,
+    #[case] test_output: Result<Result<&Vec<u8>, io::ErrorKind>, crate::nar::RenderError>,
+) {
+    let reader_result = Reader::new(
+        test_input.clone(),
+        // don't put anything in the stores, as we don't actually do any requests.
+        blob_service.await,
+        directory_service.await,
+    )
+    .await;
+
+    match (reader_result, test_output) {
+        (Ok(_), Err(_)) => panic!("creating reader should have failed but succeeded"),
+        (Err(err), Ok(_)) => panic!("creating reader should have succeeded but failed: {}", err),
+        (Err(reader_err), Err(expected_err)) => {
+            assert_eq!(format!("{}", reader_err), format!("{}", expected_err));
+        }
+        (Ok(mut reader), Ok(expected_read_result)) => {
+            let mut buf: Vec<u8> = vec![];
+            let read_result = reader.read_to_end(&mut buf).await;
+
+            match (read_result, expected_read_result) {
+                (Ok(_), Err(_)) => panic!("read_to_end should have failed but succeeded"),
+                (Err(err), Ok(_)) => {
+                    panic!("read_to_end should have succeeded but failed: {}", err)
+                }
+                (Err(read_err), Err(expected_read_err)) => {
+                    assert_eq!(read_err.kind(), expected_read_err);
+                }
+                (Ok(_n), Ok(expected_read_result)) => {
+                    assert_eq!(buf, expected_read_result.to_vec());
+                }
+            }
+        }
+    }
+}
+
+#[rstest]
+#[tokio::test]
+/// Check that the Reader does not allow starting a seek while another seek is running
+/// If this is not prevented, it might lead to futures piling up on the heap
+async fn seek_twice(
+    #[future] blob_service: Arc<dyn BlobService>,
+    #[future] directory_service: Arc<dyn DirectoryService>,
+) {
+    let mut reader = Reader::new(
+        CASTORE_NODE_COMPLICATED.clone(),
+        // don't put anything in the stores, as we don't actually do any requests.
+        blob_service.await,
+        directory_service.await,
+    )
+    .await
+    .expect("must succeed");
+
+    Pin::new(&mut reader)
+        .start_seek(io::SeekFrom::Start(1))
+        .expect("must succeed");
+    let seek_err = Pin::new(&mut reader)
+        .start_seek(io::SeekFrom::Start(2))
+        .expect_err("must fail");
+
+    assert_eq!(seek_err.kind(), io::ErrorKind::Other);
+    assert_eq!(seek_err.to_string(), "Already seeking".to_string());
+}
+
+#[rstest]
+#[tokio::test]
+async fn seek(
+    #[future] blob_service: Arc<dyn BlobService>,
+    #[future] directory_service: Arc<dyn DirectoryService>,
+) {
+    let mut reader = Reader::new(
+        CASTORE_NODE_HELLOWORLD.clone(),
+        // don't put anything in the stores, as we don't actually do any requests.
+        blob_service.await,
+        directory_service.await,
+    )
+    .await
+    .expect("must succeed");
+
+    let mut buf = [0u8; 10];
+
+    for position in [
+        io::SeekFrom::Start(0x65), // Just before the file contents
+        io::SeekFrom::Start(0x68), // Seek back the file contents
+        io::SeekFrom::Start(0x70), // Just before the end of the file contents
+    ] {
+        let n = reader.seek(position).await.expect("seek") as usize;
+        reader.read_exact(&mut buf).await.expect("read_exact");
+        assert_eq!(NAR_CONTENTS_HELLOWORLD[n..n + 10], buf);
+    }
+}