Diffstat (limited to 'tvix')
61 files changed, 2873 insertions, 1204 deletions
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 51d47b05e3..715d08d345 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -1,4 +1,4 @@ -# This file was @generated by crate2nix 0.13.0 with the command: +# This file was @generated by crate2nix 0.14.0 with the command: # "generate" "--all-features" # See https://github.com/kolloch/crate2nix for more info. @@ -13771,7 +13771,7 @@ rec { "tonic-reflection" = [ "dep:tonic-reflection" ]; "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ]; }; - resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "tonic-reflection" "virtiofs" ]; + resolvedDefaultFeatures = [ "cloud" "default" "fs" "fuse" "integration" "tonic-reflection" "virtiofs" ]; }; "tvix-cli" = rec { crateName = "tvix-cli"; @@ -14489,7 +14489,7 @@ rec { "tonic-reflection" = [ "dep:tonic-reflection" "tvix-castore/tonic-reflection" ]; "virtiofs" = [ "tvix-castore/virtiofs" ]; }; - resolvedDefaultFeatures = [ "cloud" "default" "fuse" "otlp" "tonic-reflection" "virtiofs" ]; + resolvedDefaultFeatures = [ "cloud" "default" "fuse" "integration" "otlp" "tonic-reflection" "virtiofs" ]; }; "typenum" = rec { crateName = "typenum"; @@ -17085,8 +17085,9 @@ rec { # because we compiled those test binaries in the former and not the latter. # So all paths will expect source tree to be there and not in the build top directly. # For example: $NIX_BUILD_TOP := /build in general, if you ask yourself. - # TODO(raitobezarius): I believe there could be more edge cases if `crate.sourceRoot` - # do exist but it's very hard to reason about them, so let's wait until the first bug report. + # NOTE: There could be edge cases if `crate.sourceRoot` does exist but + # it's very hard to reason about them. + # Open a bug if you run into this! mkdir -p source/ cd source/ diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index f54bb2ddb5..2797ef08f2 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -112,3 +112,7 @@ virtiofs = [ ] fuse = ["fs"] tonic-reflection = ["dep:tonic-reflection"] +# Whether to run the integration tests. +# Requires the following packages in $PATH: +# cbtemulator, google-cloud-bigtable-tool +integration = [] diff --git a/tvix/castore/default.nix b/tvix/castore/default.nix index edc20ac79d..641d883760 100644 --- a/tvix/castore/default.nix +++ b/tvix/castore/default.nix @@ -1,12 +1,23 @@ { depot, pkgs, ... }: -depot.tvix.crates.workspaceMembers.tvix-castore.build.override { +(depot.tvix.crates.workspaceMembers.tvix-castore.build.override { runTests = true; testPreRun = '' export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; - export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}" ''; # enable some optional features. features = [ "default" "cloud" ]; -} +}).overrideAttrs (_: { + meta.ci.targets = [ "integration-tests" ]; + passthru.integration-tests = depot.tvix.crates.workspaceMembers.tvix-castore.build.override { + runTests = true; + testPreRun = '' + export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; + export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}" + ''; + + # enable some optional features. 
+ features = [ "default" "cloud" "integration" ]; + }; +}) diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index bee2fb15ae..0fdb24628f 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -115,7 +115,7 @@ impl BigtableDirectoryService { .stdout(Stdio::piped()) .kill_on_drop(true) .spawn() - .expect("failed to spwan emulator"); + .expect("failed to spawn emulator"); Retry::spawn( ExponentialBackoff::from_millis(20) diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index 31158d3a38..ae51df6376 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -144,7 +144,7 @@ mod tests { #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] /// A valid example for Bigtable #[cfg_attr( - feature = "cloud", + all(feature = "cloud", feature = "integration"), case::bigtable_valid_url( "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1", true @@ -152,7 +152,7 @@ mod tests { )] /// A valid example for Bigtable, specifying a custom channel size and timeout #[cfg_attr( - feature = "cloud", + all(feature = "cloud", feature = "integration"), case::bigtable_valid_url( "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1&channel_size=10&timeout=10", true @@ -160,7 +160,7 @@ mod tests { )] /// A invalid Bigtable example (missing fields) #[cfg_attr( - feature = "cloud", + all(feature = "cloud", feature = "integration"), case::bigtable_invalid_url("bigtable://instance-1", false) )] #[tokio::test] diff --git a/tvix/castore/src/directoryservice/tests/mod.rs b/tvix/castore/src/directoryservice/tests/mod.rs index 50c8a5c6d3..1b40d9feb0 100644 --- a/tvix/castore/src/directoryservice/tests/mod.rs +++ b/tvix/castore/src/directoryservice/tests/mod.rs @@ -26,7 +26,7 @@ use self::utils::make_grpc_directory_service_client; #[case::grpc(make_grpc_directory_service_client().await)] #[case::memory(directoryservice::from_addr("memory://").await.unwrap())] #[case::sled(directoryservice::from_addr("sled://").await.unwrap())] -#[cfg_attr(feature = "cloud", case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))] +#[cfg_attr(all(feature = "cloud", feature = "integration"), case::bigtable(directoryservice::from_addr("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await.unwrap()))] pub fn directory_services(#[case] directory_service: impl DirectoryService) {} /// Ensures asking for a directory that doesn't exist returns a Ok(None). diff --git a/tvix/castore/src/directoryservice/traverse.rs b/tvix/castore/src/directoryservice/traverse.rs index 573581edbd..8a668c868c 100644 --- a/tvix/castore/src/directoryservice/traverse.rs +++ b/tvix/castore/src/directoryservice/traverse.rs @@ -1,30 +1,20 @@ use super::DirectoryService; -use crate::{proto::NamedNode, B3Digest, Error}; -use std::os::unix::ffi::OsStrExt; +use crate::{proto::NamedNode, B3Digest, Error, Path}; use tracing::{instrument, warn}; /// This descends from a (root) node to the given (sub)path, returning the Node /// at that path, or none, if there's nothing at that path. 
-#[instrument(skip(directory_service))] +#[instrument(skip(directory_service, path), fields(%path))] pub async fn descend_to<DS>( directory_service: DS, root_node: crate::proto::node::Node, - path: &std::path::Path, + path: impl AsRef<Path> + std::fmt::Display, ) -> Result<Option<crate::proto::node::Node>, Error> where DS: AsRef<dyn DirectoryService>, { - // strip a possible `/` prefix from the path. - let path = { - if path.starts_with("/") { - path.strip_prefix("/").unwrap() - } else { - path - } - }; - let mut cur_node = root_node; - let mut it = path.components(); + let mut it = path.as_ref().components(); loop { match it.next() { @@ -59,9 +49,8 @@ where // look for first_component in the [Directory]. // FUTUREWORK: as the nodes() iterator returns in a sorted fashion, we // could stop as soon as e.name is larger than the search string. - let child_node = directory.nodes().find(|n| { - n.get_name() == first_component.as_os_str().as_bytes() - }); + let child_node = + directory.nodes().find(|n| n.get_name() == first_component); match child_node { // child node not found means there's no such element inside the directory. @@ -85,11 +74,10 @@ where #[cfg(test)] mod tests { - use std::path::PathBuf; - use crate::{ directoryservice, fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, + PathBuf, }; use super::descend_to; @@ -132,7 +120,7 @@ mod tests { let resp = descend_to( &directory_service, node_directory_complicated.clone(), - &PathBuf::from(""), + "".parse::<PathBuf>().unwrap(), ) .await .expect("must succeed"); @@ -145,7 +133,7 @@ mod tests { let resp = descend_to( &directory_service, node_directory_complicated.clone(), - &PathBuf::from("keep"), + "keep".parse::<PathBuf>().unwrap(), ) .await .expect("must succeed"); @@ -158,7 +146,7 @@ mod tests { let resp = descend_to( &directory_service, node_directory_complicated.clone(), - &PathBuf::from("keep/.keep"), + "keep/.keep".parse::<PathBuf>().unwrap(), ) .await .expect("must succeed"); @@ -166,25 +154,12 @@ mod tests { assert_eq!(Some(node_file_keep.clone()), resp); } - // traversal to `keep/.keep` should return the node for the .keep file - { - let resp = descend_to( - &directory_service, - node_directory_complicated.clone(), - &PathBuf::from("/keep/.keep"), - ) - .await - .expect("must succeed"); - - assert_eq!(Some(node_file_keep), resp); - } - // traversal to `void` should return None (doesn't exist) { let resp = descend_to( &directory_service, node_directory_complicated.clone(), - &PathBuf::from("void"), + "void".parse::<PathBuf>().unwrap(), ) .await .expect("must succeed"); @@ -192,12 +167,12 @@ mod tests { assert_eq!(None, resp); } - // traversal to `void` should return None (doesn't exist) + // traversal to `v/oid` should return None (doesn't exist) { let resp = descend_to( &directory_service, node_directory_complicated.clone(), - &PathBuf::from("//v/oid"), + "v/oid".parse::<PathBuf>().unwrap(), ) .await .expect("must succeed"); @@ -211,25 +186,12 @@ mod tests { let resp = descend_to( &directory_service, node_directory_complicated.clone(), - &PathBuf::from("keep/.keep/foo"), + "keep/.keep/foo".parse::<PathBuf>().unwrap(), ) .await .expect("must succeed"); assert_eq!(None, resp); } - - // traversal to a subpath of '/' should return the root node. 
- { - let resp = descend_to( - &directory_service, - node_directory_complicated.clone(), - &PathBuf::from("/"), - ) - .await - .expect("must succeed"); - - assert_eq!(Some(node_directory_complicated), resp); - } } } diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs index c22bd4b2eb..bdd4595434 100644 --- a/tvix/castore/src/fs/inodes.rs +++ b/tvix/castore/src/fs/inodes.rs @@ -57,16 +57,18 @@ impl InodeData { children.len() as u64 } }, - mode: match self { - InodeData::Regular(_, _, false) => libc::S_IFREG | 0o444, // no-executable files - InodeData::Regular(_, _, true) => libc::S_IFREG | 0o555, // executable files - InodeData::Symlink(_) => libc::S_IFLNK | 0o444, - InodeData::Directory(_) => libc::S_IFDIR | 0o555, - }, + mode: self.as_fuse_type() | self.mode(), ..Default::default() } } + fn mode(&self) -> u32 { + match self { + InodeData::Regular(_, _, false) | InodeData::Symlink(_) => 0o444, + InodeData::Regular(_, _, true) | InodeData::Directory(_) => 0o555, + } + } + pub fn as_fuse_entry(&self, inode: u64) -> fuse_backend_rs::api::filesystem::Entry { fuse_backend_rs::api::filesystem::Entry { inode, diff --git a/tvix/castore/src/fs/virtiofs.rs b/tvix/castore/src/fs/virtiofs.rs index 846270d285..d63e2f2bdd 100644 --- a/tvix/castore/src/fs/virtiofs.rs +++ b/tvix/castore/src/fs/virtiofs.rs @@ -34,6 +34,7 @@ enum Error { /// Invalid descriptor chain. InvalidDescriptorChain, /// Failed to handle filesystem requests. + #[allow(dead_code)] HandleRequests(fuse_backend_rs::Error), /// Failed to construct new vhost user daemon. NewDaemon, diff --git a/tvix/castore/src/import/archive.rs b/tvix/castore/src/import/archive.rs index adcfb871d5..0d21481d40 100644 --- a/tvix/castore/src/import/archive.rs +++ b/tvix/castore/src/import/archive.rs @@ -1,6 +1,8 @@ +//! Imports from an archive (tarballs) + +use std::collections::HashMap; use std::io::{Cursor, Write}; use std::sync::Arc; -use std::{collections::HashMap, path::PathBuf}; use petgraph::graph::{DiGraph, NodeIndex}; use petgraph::visit::{DfsPostOrder, EdgeRef}; @@ -15,10 +17,12 @@ use tracing::{instrument, warn, Level}; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; -use crate::import::{ingest_entries, Error as ImportError, IngestionEntry}; +use crate::import::{ingest_entries, IngestionEntry, IngestionError}; use crate::proto::node::Node; use crate::B3Digest; +type TarPathBuf = std::path::PathBuf; + /// Files smaller than this threshold, in bytes, are uploaded to the [BlobService] in the /// background. 
/// @@ -32,20 +36,42 @@ const MAX_TARBALL_BUFFER_SIZE: usize = 128 * 1024 * 1024; #[derive(Debug, thiserror::Error)] pub enum Error { - #[error("error reading archive entry: {0}")] - Io(#[from] std::io::Error), + #[error("unable to construct stream of entries: {0}")] + Entries(std::io::Error), + + #[error("unable to read next entry: {0}")] + NextEntry(std::io::Error), + + #[error("unable to read path for entry: {0}")] + PathRead(std::io::Error), + + #[error("unable to convert path {0} for entry: {1}")] + PathConvert(TarPathBuf, std::io::Error), + + #[error("unable to read size field for {0}: {1}")] + Size(TarPathBuf, std::io::Error), + + #[error("unable to read mode field for {0}: {1}")] + Mode(TarPathBuf, std::io::Error), + + #[error("unable to read link name field for {0}: {1}")] + LinkName(TarPathBuf, std::io::Error), + + #[error("unable to read blob contents for {0}: {1}")] + BlobRead(TarPathBuf, std::io::Error), + + // FUTUREWORK: proper error for blob finalize + #[error("unable to finalize blob {0}: {1}")] + BlobFinalize(TarPathBuf, std::io::Error), #[error("unsupported tar entry {0} type: {1:?}")] - UnsupportedTarEntry(PathBuf, tokio_tar::EntryType), + EntryType(TarPathBuf, tokio_tar::EntryType), #[error("symlink missing target {0}")] - MissingSymlinkTarget(PathBuf), + MissingSymlinkTarget(TarPathBuf), #[error("unexpected number of top level directory entries")] UnexpectedNumberOfTopLevelEntries, - - #[error("failed to import into castore {0}")] - Import(#[from] ImportError), } /// Ingests elements from the given tar [`Archive`] into the passed [`BlobService`] and @@ -55,10 +81,10 @@ pub async fn ingest_archive<BS, DS, R>( blob_service: BS, directory_service: DS, mut archive: Archive<R>, -) -> Result<Node, Error> +) -> Result<Node, IngestionError<Error>> where BS: BlobService + Clone + 'static, - DS: AsRef<dyn DirectoryService>, + DS: DirectoryService, R: AsyncRead + Unpin, { // Since tarballs can have entries in any arbitrary order, we need to @@ -71,16 +97,22 @@ where let semaphore = Arc::new(Semaphore::new(MAX_TARBALL_BUFFER_SIZE)); let mut async_blob_uploads: JoinSet<Result<(), Error>> = JoinSet::new(); - let mut entries_iter = archive.entries()?; - while let Some(mut entry) = entries_iter.try_next().await? { - let path: PathBuf = entry.path()?.into(); + let mut entries_iter = archive.entries().map_err(Error::Entries)?; + while let Some(mut entry) = entries_iter.try_next().await.map_err(Error::NextEntry)? { + let tar_path: TarPathBuf = entry.path().map_err(Error::PathRead)?.into(); + + // construct a castore PathBuf, which we use in the produced IngestionEntry. + let path = crate::path::PathBuf::from_host_path(tar_path.as_path(), true) + .map_err(|e| Error::PathConvert(tar_path.clone(), e))?; let header = entry.header(); let entry = match header.entry_type() { tokio_tar::EntryType::Regular | tokio_tar::EntryType::GNUSparse | tokio_tar::EntryType::Continuous => { - let header_size = header.size()?; + let header_size = header + .size() + .map_err(|e| Error::Size(tar_path.clone(), e))?; // If the blob is small enough, read it off the wire, compute the digest, // and upload it to the [BlobService] in the background. 
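The error rework above separates tar-side failures from castore-side ones. A hedged sketch of what a call site looks like under the new `IngestionError<Error>` return type (the service handles and the reader are assumptions; the bounds follow the signature in this diff):

```rust
// Sketch: calling the reworked `ingest_archive` and matching the split errors.
use tokio_tar::Archive;
use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use tvix_castore::import::{archive, IngestionError};

async fn import_tarball<R>(
    blob_service: impl BlobService + Clone + 'static,
    directory_service: impl DirectoryService,
    reader: R,
) where
    R: tokio::io::AsyncRead + Unpin,
{
    match archive::ingest_archive(blob_service, directory_service, Archive::new(reader)).await {
        Ok(root_node) => println!("imported root node: {:?}", root_node),
        // Tar parsing/IO failures now surface as the producer variant...
        Err(IngestionError::Producer(e)) => eprintln!("archive error: {}", e),
        // ...while directory upload failures keep their own variants.
        Err(e) => eprintln!("ingestion error: {}", e),
    }
}
```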
@@ -101,7 +133,9 @@ where .acquire_many_owned(header_size as u32) .await .unwrap(); - let size = tokio::io::copy(&mut reader, &mut buffer).await?; + let size = tokio::io::copy(&mut reader, &mut buffer) + .await + .map_err(|e| Error::Size(tar_path.clone(), e))?; let digest: B3Digest = hasher.finalize().as_bytes().into(); @@ -109,12 +143,18 @@ where let blob_service = blob_service.clone(); let digest = digest.clone(); async_blob_uploads.spawn({ + let tar_path = tar_path.clone(); async move { let mut writer = blob_service.open_write().await; - tokio::io::copy(&mut Cursor::new(buffer), &mut writer).await?; + tokio::io::copy(&mut Cursor::new(buffer), &mut writer) + .await + .map_err(|e| Error::BlobRead(tar_path.clone(), e))?; - let blob_digest = writer.close().await?; + let blob_digest = writer + .close() + .await + .map_err(|e| Error::BlobFinalize(tar_path, e))?; assert_eq!(digest, blob_digest, "Tvix bug: blob digest mismatch"); @@ -130,35 +170,50 @@ where } else { let mut writer = blob_service.open_write().await; - let size = tokio::io::copy(&mut entry, &mut writer).await?; + let size = tokio::io::copy(&mut entry, &mut writer) + .await + .map_err(|e| Error::BlobRead(tar_path.clone(), e))?; - let digest = writer.close().await?; + let digest = writer + .close() + .await + .map_err(|e| Error::BlobFinalize(tar_path.clone(), e))?; (size, digest) }; + let executable = entry + .header() + .mode() + .map_err(|e| Error::Mode(tar_path, e))? + & 64 + != 0; + IngestionEntry::Regular { path, size, - executable: entry.header().mode()? & 64 != 0, + executable, digest, } } tokio_tar::EntryType::Symlink => IngestionEntry::Symlink { target: entry - .link_name()? - .ok_or_else(|| Error::MissingSymlinkTarget(path.clone()))? - .into(), + .link_name() + .map_err(|e| Error::LinkName(tar_path.clone(), e))? + .ok_or_else(|| Error::MissingSymlinkTarget(tar_path.clone()))? + .into_owned() + .into_os_string() + .into_encoded_bytes(), path, }, // Push a bogus directory marker so we can make sure this directory gets // created. We don't know the digest and size until after reading the full // tarball. - tokio_tar::EntryType::Directory => IngestionEntry::Dir { path: path.clone() }, + tokio_tar::EntryType::Directory => IngestionEntry::Dir { path }, tokio_tar::EntryType::XGlobalHeader | tokio_tar::EntryType::XHeader => continue, - entry_type => return Err(Error::UnsupportedTarEntry(path, entry_type)), + entry_type => return Err(Error::EntryType(tar_path, entry_type).into()), }; nodes.add(entry)?; @@ -193,7 +248,7 @@ where /// An error is returned if this is not the case and ingestion will fail. struct IngestionEntryGraph { graph: DiGraph<IngestionEntry, ()>, - path_to_index: HashMap<PathBuf, NodeIndex>, + path_to_index: HashMap<crate::path::PathBuf, NodeIndex>, root_node: Option<NodeIndex>, } @@ -218,7 +273,7 @@ impl IngestionEntryGraph { /// and new nodes are not directories, the node is replaced and is disconnected from its /// children. pub fn add(&mut self, entry: IngestionEntry) -> Result<NodeIndex, Error> { - let path = entry.path().to_path_buf(); + let path = entry.path().to_owned(); let index = match self.path_to_index.get(entry.path()) { Some(&index) => { @@ -238,7 +293,7 @@ impl IngestionEntryGraph { // We expect archives to contain a single root node; if there is another root node // entry with a different path name, this is unsupported. 
if let Some(root_node) = self.root_node { - if self.get_node(root_node).path() != path { + if self.get_node(root_node).path() != &path { return Err(Error::UnexpectedNumberOfTopLevelEntries); } } @@ -247,7 +302,7 @@ impl IngestionEntryGraph { } else if let Some(parent_path) = path.parent() { // Recursively add the parent node until it hits the root node. let parent_index = self.add(IngestionEntry::Dir { - path: parent_path.to_path_buf(), + path: parent_path.to_owned(), })?; // Insert an edge from the parent directory to the child entry. @@ -332,23 +387,29 @@ mod test { lazy_static! { pub static ref EMPTY_DIGEST: B3Digest = blake3::hash(&[]).as_bytes().into(); - pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { path: "a".into() }; - pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { path: "b".into() }; - pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { path: "a/b".into() }; + pub static ref DIR_A: IngestionEntry = IngestionEntry::Dir { + path: "a".parse().unwrap() + }; + pub static ref DIR_B: IngestionEntry = IngestionEntry::Dir { + path: "b".parse().unwrap() + }; + pub static ref DIR_A_B: IngestionEntry = IngestionEntry::Dir { + path: "a/b".parse().unwrap() + }; pub static ref FILE_A: IngestionEntry = IngestionEntry::Regular { - path: "a".into(), + path: "a".parse().unwrap(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), }; pub static ref FILE_A_B: IngestionEntry = IngestionEntry::Regular { - path: "a/b".into(), + path: "a/b".parse().unwrap(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), }; pub static ref FILE_A_B_C: IngestionEntry = IngestionEntry::Regular { - path: "a/b/c".into(), + path: "a/b/c".parse().unwrap(), size: 0, executable: false, digest: EMPTY_DIGEST.clone(), diff --git a/tvix/castore/src/import/error.rs b/tvix/castore/src/import/error.rs index 15dd0664de..e3fba617e0 100644 --- a/tvix/castore/src/import/error.rs +++ b/tvix/castore/src/import/error.rs @@ -1,39 +1,20 @@ -use std::{fs::FileType, path::PathBuf}; +use super::PathBuf; use crate::Error as CastoreError; +/// Represents all error types that are emitted by ingest_entries. +/// It can represent errors uploading individual Directories and finalizing +/// the upload. +/// It also contains a generic error kind that'll carry ingestion-method +/// specific errors. 
#[derive(Debug, thiserror::Error)] -pub enum Error { +pub enum IngestionError<E: std::fmt::Display> { + #[error("error from producer: {0}")] + Producer(#[from] E), + #[error("failed to upload directory at {0}: {1}")] UploadDirectoryError(PathBuf, CastoreError), - #[error("invalid encoding encountered for entry {0:?}")] - InvalidEncoding(PathBuf), - - #[error("unable to stat {0}: {1}")] - UnableToStat(PathBuf, std::io::Error), - - #[error("unable to open {0}: {1}")] - UnableToOpen(PathBuf, std::io::Error), - - #[error("unable to read {0}: {1}")] - UnableToRead(PathBuf, std::io::Error), - - #[error("unsupported file {0} type: {1:?}")] - UnsupportedFileType(PathBuf, FileType), -} - -impl From<CastoreError> for Error { - fn from(value: CastoreError) -> Self { - match value { - CastoreError::InvalidRequest(_) => panic!("tvix bug"), - CastoreError::StorageError(_) => panic!("error"), - } - } -} - -impl From<Error> for std::io::Error { - fn from(value: Error) -> Self { - std::io::Error::new(std::io::ErrorKind::Other, value) - } + #[error("failed to finalize directory upload: {0}")] + FinalizeDirectoryUpload(CastoreError), } diff --git a/tvix/castore/src/import/fs.rs b/tvix/castore/src/import/fs.rs index 6709d4a127..9d3ecfe6ab 100644 --- a/tvix/castore/src/import/fs.rs +++ b/tvix/castore/src/import/fs.rs @@ -1,8 +1,11 @@ +//! Import from a real filesystem. + use futures::stream::BoxStream; use futures::StreamExt; +use std::fs::FileType; +use std::os::unix::ffi::OsStringExt; use std::os::unix::fs::MetadataExt; use std::os::unix::fs::PermissionsExt; -use std::path::Path; use tracing::instrument; use walkdir::DirEntry; use walkdir::WalkDir; @@ -10,13 +13,11 @@ use walkdir::WalkDir; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::proto::node::Node; +use crate::B3Digest; use super::ingest_entries; -use super::upload_blob_at_path; -use super::Error; use super::IngestionEntry; - -///! Imports that deal with a real filesystem. +use super::IngestionError; /// Ingests the contents at a given path into the tvix store, interacting with a [BlobService] and /// [DirectoryService]. It returns the root node or an error. 
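A short usage sketch of `ingest_path` under the new bounds shown in the hunk below (`BS: BlobService + Clone`, `DS: DirectoryService`); the "memory://" addresses and the local path are illustrative assumptions, as is the premise that the `from_addr` handles satisfy those bounds:

```rust
// Sketch: importing a directory tree with the reworked `ingest_path`.
use tvix_castore::import::fs::ingest_path;
use tvix_castore::{blobservice, directoryservice};

async fn import_example() {
    // Assumption: the returned service handles satisfy the
    // `BlobService + Clone` / `DirectoryService` bounds.
    let blob_service = blobservice::from_addr("memory://")
        .await
        .expect("blob service");
    let directory_service = directoryservice::from_addr("memory://")
        .await
        .expect("directory service");

    // `P: AsRef<std::path::Path> + Debug`, so a plain &str works.
    let root_node = ingest_path(blob_service, directory_service, "./some-dir")
        .await
        .expect("ingestion failed");
    println!("root node: {:?}", root_node);
}
```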
@@ -30,11 +31,11 @@ pub async fn ingest_path<BS, DS, P>( blob_service: BS, directory_service: DS, path: P, -) -> Result<Node, Error> +) -> Result<Node, IngestionError<Error>> where - P: AsRef<Path> + std::fmt::Debug, + P: AsRef<std::path::Path> + std::fmt::Debug, BS: BlobService + Clone, - DS: AsRef<dyn DirectoryService>, + DS: DirectoryService, { let iter = WalkDir::new(path.as_ref()) .follow_links(false) @@ -55,13 +56,13 @@ where pub fn dir_entries_to_ingestion_stream<'a, BS, I>( blob_service: BS, iter: I, - root: &'a Path, + root: &'a std::path::Path, ) -> BoxStream<'a, Result<IngestionEntry, Error>> where BS: BlobService + Clone + 'a, I: Iterator<Item = Result<DirEntry, walkdir::Error>> + Send + 'a, { - let prefix = root.parent().unwrap_or_else(|| Path::new("")); + let prefix = root.parent().unwrap_or_else(|| std::path::Path::new("")); Box::pin( futures::stream::iter(iter) @@ -72,7 +73,7 @@ where Ok(dir_entry) => { dir_entry_to_ingestion_entry(blob_service, &dir_entry, prefix).await } - Err(e) => Err(Error::UnableToStat( + Err(e) => Err(Error::Stat( prefix.to_path_buf(), e.into_io_error().expect("walkdir err must be some"), )), @@ -91,32 +92,37 @@ where pub async fn dir_entry_to_ingestion_entry<BS>( blob_service: BS, entry: &DirEntry, - prefix: &Path, + prefix: &std::path::Path, ) -> Result<IngestionEntry, Error> where BS: BlobService, { let file_type = entry.file_type(); - let path = entry + let fs_path = entry .path() .strip_prefix(prefix) - .expect("Tvix bug: failed to strip root path prefix") - .to_path_buf(); + .expect("Tvix bug: failed to strip root path prefix"); + + // convert to castore PathBuf + let path = crate::path::PathBuf::from_host_path(fs_path, false) + .unwrap_or_else(|e| panic!("Tvix bug: walkdir direntry cannot be parsed: {}", e)); if file_type.is_dir() { Ok(IngestionEntry::Dir { path }) } else if file_type.is_symlink() { let target = std::fs::read_link(entry.path()) - .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e))?; + .map_err(|e| Error::Stat(entry.path().to_path_buf(), e))? + .into_os_string() + .into_vec(); Ok(IngestionEntry::Symlink { path, target }) } else if file_type.is_file() { let metadata = entry .metadata() - .map_err(|e| Error::UnableToStat(entry.path().to_path_buf(), e.into()))?; + .map_err(|e| Error::Stat(entry.path().to_path_buf(), e.into()))?; - let digest = upload_blob_at_path(blob_service, entry.path().to_path_buf()).await?; + let digest = upload_blob(blob_service, entry.path().to_path_buf()).await?; Ok(IngestionEntry::Regular { path, @@ -127,6 +133,53 @@ where digest, }) } else { - Ok(IngestionEntry::Unknown { path, file_type }) + return Err(Error::FileType(fs_path.to_path_buf(), file_type)); } } + +/// Uploads the file at the provided [Path] to the [BlobService]. 
+#[instrument(skip(blob_service), fields(path), err)] +async fn upload_blob<BS>( + blob_service: BS, + path: impl AsRef<std::path::Path>, +) -> Result<B3Digest, Error> +where + BS: BlobService, +{ + let mut file = match tokio::fs::File::open(path.as_ref()).await { + Ok(file) => file, + Err(e) => return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)), + }; + + let mut writer = blob_service.open_write().await; + + if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { + return Err(Error::BlobRead(path.as_ref().to_path_buf(), e)); + }; + + let digest = writer + .close() + .await + .map_err(|e| Error::BlobFinalize(path.as_ref().to_path_buf(), e))?; + + Ok(digest) +} + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("unsupported file type at {0}: {1:?}")] + FileType(std::path::PathBuf, FileType), + + #[error("unable to stat {0}: {1}")] + Stat(std::path::PathBuf, std::io::Error), + + #[error("unable to open {0}: {1}")] + Open(std::path::PathBuf, std::io::Error), + + #[error("unable to read {0}: {1}")] + BlobRead(std::path::PathBuf, std::io::Error), + + // TODO: proper error for blob finalize + #[error("unable to finalize blob {0}: {1}")] + BlobFinalize(std::path::PathBuf, std::io::Error), +} diff --git a/tvix/castore/src/import/mod.rs b/tvix/castore/src/import/mod.rs index e9fdc750f8..7b42644a27 100644 --- a/tvix/castore/src/import/mod.rs +++ b/tvix/castore/src/import/mod.rs @@ -4,9 +4,9 @@ //! Specific implementations, such as ingesting from the filesystem, live in //! child modules. -use crate::blobservice::BlobService; use crate::directoryservice::DirectoryPutter; use crate::directoryservice::DirectoryService; +use crate::path::PathBuf; use crate::proto::node::Node; use crate::proto::Directory; use crate::proto::DirectoryNode; @@ -14,21 +14,14 @@ use crate::proto::FileNode; use crate::proto::SymlinkNode; use crate::B3Digest; use futures::{Stream, StreamExt}; -use std::fs::FileType; use tracing::Level; -#[cfg(target_family = "unix")] -use std::os::unix::ffi::OsStrExt; - -use std::{ - collections::HashMap, - path::{Path, PathBuf}, -}; +use std::collections::HashMap; use tracing::instrument; mod error; -pub use error::Error; +pub use error::IngestionError; pub mod archive; pub mod fs; @@ -51,10 +44,14 @@ pub mod fs; /// /// On success, returns the root node. #[instrument(skip_all, ret(level = Level::TRACE), err)] -pub async fn ingest_entries<DS, S>(directory_service: DS, mut entries: S) -> Result<Node, Error> +pub async fn ingest_entries<DS, S, E>( + directory_service: DS, + mut entries: S, +) -> Result<Node, IngestionError<E>> where - DS: AsRef<dyn DirectoryService>, - S: Stream<Item = Result<IngestionEntry, Error>> + Send + std::marker::Unpin, + DS: DirectoryService, + S: Stream<Item = Result<IngestionEntry, E>> + Send + std::marker::Unpin, + E: std::error::Error, { // For a given path, this holds the [Directory] structs as they are populated. let mut directories: HashMap<PathBuf, Directory> = HashMap::default(); @@ -68,20 +65,11 @@ where // we break the loop manually. .expect("Tvix bug: unexpected end of stream")?; - debug_assert!( - entry - .path() - .components() - .all(|x| matches!(x, std::path::Component::Normal(_))), - "path may only contain normal components" - ); - let name = entry .path() .file_name() // If this is the root node, it will have an empty name. .unwrap_or_default() - .as_bytes() .to_owned() .into(); @@ -102,9 +90,12 @@ where // If we don't have one yet (as that's the first one to upload), // initialize the putter. 
maybe_directory_putter - .get_or_insert_with(|| directory_service.as_ref().put_multiple_start()) + .get_or_insert_with(|| directory_service.put_multiple_start()) .put(directory) - .await?; + .await + .map_err(|e| { + IngestionError::UploadDirectoryError(entry.path().to_owned(), e) + })?; Node::Directory(DirectoryNode { name, @@ -114,7 +105,7 @@ where } IngestionEntry::Symlink { ref target, .. } => Node::Symlink(SymlinkNode { name, - target: target.as_os_str().as_bytes().to_owned().into(), + target: target.to_owned().into(), }), IngestionEntry::Regular { size, @@ -127,9 +118,6 @@ where size: *size, executable: *executable, }), - IngestionEntry::Unknown { path, file_type } => { - return Err(Error::UnsupportedFileType(path.clone(), *file_type)); - } }; if entry.path().components().count() == 1 { @@ -138,7 +126,7 @@ where // record node in parent directory, creating a new [Directory] if not there yet. directories - .entry(entry.path().parent().unwrap().to_path_buf()) + .entry(entry.path().parent().unwrap().to_owned()) .or_default() .add(node); }; @@ -152,7 +140,10 @@ where // they're all persisted to the backend. if let Some(mut directory_putter) = maybe_directory_putter { #[cfg_attr(not(debug_assertions), allow(unused))] - let root_directory_digest = directory_putter.close().await?; + let root_directory_digest = directory_putter + .close() + .await + .map_err(|e| IngestionError::FinalizeDirectoryUpload(e))?; #[cfg(debug_assertions)] { @@ -174,31 +165,6 @@ where Ok(root_node) } -/// Uploads the file at the provided [Path] the the [BlobService]. -#[instrument(skip(blob_service), fields(path), err)] -async fn upload_blob_at_path<BS>(blob_service: BS, path: PathBuf) -> Result<B3Digest, Error> -where - BS: BlobService, -{ - let mut file = match tokio::fs::File::open(&path).await { - Ok(file) => file, - Err(e) => return Err(Error::UnableToRead(path, e)), - }; - - let mut writer = blob_service.open_write().await; - - if let Err(e) = tokio::io::copy(&mut file, &mut writer).await { - return Err(Error::UnableToRead(path, e)); - }; - - let digest = writer - .close() - .await - .map_err(|e| Error::UnableToRead(path, e))?; - - Ok(digest) -} - #[derive(Debug, Clone, Eq, PartialEq)] pub enum IngestionEntry { Regular { @@ -209,24 +175,19 @@ pub enum IngestionEntry { }, Symlink { path: PathBuf, - target: PathBuf, + target: Vec<u8>, }, Dir { path: PathBuf, }, - Unknown { - path: PathBuf, - file_type: FileType, - }, } impl IngestionEntry { - fn path(&self) -> &Path { + fn path(&self) -> &PathBuf { match self { IngestionEntry::Regular { path, .. } => path, IngestionEntry::Symlink { path, .. } => path, IngestionEntry::Dir { path } => path, - IngestionEntry::Unknown { path, .. } => path, } } diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 1a7ac6b4b4..bdc533a8c5 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -9,6 +9,9 @@ pub mod fixtures; #[cfg(feature = "fs")] pub mod fs; +mod path; +pub use path::{Path, PathBuf}; + pub mod import; pub mod proto; pub mod tonic; diff --git a/tvix/castore/src/path.rs b/tvix/castore/src/path.rs new file mode 100644 index 0000000000..fcc2bd01fb --- /dev/null +++ b/tvix/castore/src/path.rs @@ -0,0 +1,446 @@ +//! Contains data structures to deal with Paths in the tvix-castore model. + +use std::{ + borrow::Borrow, + fmt::{self, Debug, Display}, + mem, + ops::Deref, + str::FromStr, +}; + +use bstr::ByteSlice; + +use crate::proto::validate_node_name; + +/// Represents a Path in the castore model. 
+/// These are always relative, and platform-independent, which distinguishes +/// them from the ones provided in the standard library. +#[derive(Eq, Hash, PartialEq)] +#[repr(transparent)] // SAFETY: Representation has to match [u8] +pub struct Path { + // As node names in the castore model cannot contain slashes, + // we use them as component separators here. + inner: [u8], +} + +#[allow(dead_code)] +impl Path { + // SAFETY: The empty path is valid. + pub const ROOT: &'static Path = unsafe { Path::from_bytes_unchecked(&[]) }; + + /// Convert a byte slice to a path, without checking validity. + const unsafe fn from_bytes_unchecked(bytes: &[u8]) -> &Path { + // SAFETY: &[u8] and &Path have the same representation. + unsafe { mem::transmute(bytes) } + } + + fn from_bytes(bytes: &[u8]) -> Option<&Path> { + if !bytes.is_empty() { + // Ensure all components are valid castore node names. + for component in bytes.split_str(b"/") { + validate_node_name(component).ok()?; + } + } + + // SAFETY: We have verified that the path contains no empty components. + Some(unsafe { Path::from_bytes_unchecked(bytes) }) + } + + pub fn into_boxed_bytes(self: Box<Path>) -> Box<[u8]> { + // SAFETY: Box<Path> and Box<[u8]> have the same representation. + unsafe { mem::transmute(self) } + } + + /// Returns the path without its final component, if there is one. + /// + /// Note that the parent of a bare file name is [Path::ROOT]. + /// [Path::ROOT] is the only path without a parent. + pub fn parent(&self) -> Option<&Path> { + // The root does not have a parent. + if self.inner.is_empty() { + return None; + } + + Some( + if let Some((parent, _file_name)) = self.inner.rsplit_once_str(b"/") { + // SAFETY: The parent of a valid Path is a valid Path. + unsafe { Path::from_bytes_unchecked(parent) } + } else { + // The parent of a bare file name is the root. + Path::ROOT + }, + ) + } + + /// Creates a PathBuf with `name` adjoined to self. + pub fn try_join(&self, name: &[u8]) -> Result<PathBuf, std::io::Error> { + let mut v = PathBuf::with_capacity(self.inner.len() + name.len() + 1); + v.inner.extend_from_slice(&self.inner); + v.try_push(name)?; + + Ok(v) + } + + /// Produces an iterator over the components of the path, which are + /// individual byte slices. + /// In case the path is empty, an empty iterator is returned. + pub fn components(&self) -> impl Iterator<Item = &[u8]> { + let mut iter = self.inner.split_str(&b"/"); + + // We don't want to return an empty element, consume it if it's the only one. + if self.inner.is_empty() { + let _ = iter.next(); + } + + iter + } + + /// Returns the final component of the Path, if there is one. + pub fn file_name(&self) -> Option<&[u8]> { + self.components().last() + } + + pub fn as_bytes(&self) -> &[u8] { + &self.inner + } +} + +impl Debug for Path { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Debug::fmt(self.inner.as_bstr(), f) + } +} + +impl Display for Path { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Display::fmt(self.inner.as_bstr(), f) + } +} + +impl AsRef<Path> for Path { + fn as_ref(&self) -> &Path { + self + } +} + +/// Represents an owned PathBuf in the castore model. +/// These are always relative, and platform-independent, which distinguishes +/// them from the ones provided in the standard library. +#[derive(Clone, Default, Eq, Hash, PartialEq)] +pub struct PathBuf { + inner: Vec<u8>, +} + +impl Deref for PathBuf { + type Target = Path; + + fn deref(&self) -> &Self::Target { + // SAFETY: PathBuf always contains a valid Path. 
unsafe { Path::from_bytes_unchecked(&self.inner) } + } +} + +impl AsRef<Path> for PathBuf { + fn as_ref(&self) -> &Path { + self + } +} + +impl ToOwned for Path { + type Owned = PathBuf; + + fn to_owned(&self) -> Self::Owned { + PathBuf { + inner: self.inner.to_owned(), + } + } +} + +impl Borrow<Path> for PathBuf { + fn borrow(&self) -> &Path { + self + } +} + +impl From<Box<Path>> for PathBuf { + fn from(value: Box<Path>) -> Self { + // SAFETY: Box<Path> is always a valid path. + unsafe { PathBuf::from_bytes_unchecked(value.into_boxed_bytes().into_vec()) } + } +} + +impl From<&Path> for PathBuf { + fn from(value: &Path) -> Self { + value.to_owned() + } +} + +impl FromStr for PathBuf { + type Err = std::io::Error; + + fn from_str(s: &str) -> Result<PathBuf, Self::Err> { + Ok(Path::from_bytes(s.as_bytes()) + .ok_or(std::io::ErrorKind::InvalidData)? + .to_owned()) + } +} + +impl Debug for PathBuf { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Debug::fmt(&**self, f) + } +} + +impl Display for PathBuf { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + Display::fmt(&**self, f) + } +} + +impl PathBuf { + pub fn new() -> PathBuf { + Self::default() + } + + pub fn with_capacity(capacity: usize) -> PathBuf { + // SAFETY: The empty path is a valid path. + Self { + inner: Vec::with_capacity(capacity), + } + } + + /// Adjoins `name` to self. + pub fn try_push(&mut self, name: &[u8]) -> Result<(), std::io::Error> { + validate_node_name(name).map_err(|_| std::io::ErrorKind::InvalidData)?; + + if !self.inner.is_empty() { + self.inner.push(b'/'); + } + + self.inner.extend_from_slice(name); + + Ok(()) + } + + /// Convert a byte vector to a PathBuf, without checking validity. + unsafe fn from_bytes_unchecked(bytes: Vec<u8>) -> PathBuf { + PathBuf { inner: bytes } + } + + /// Convert from a [&std::path::Path] to [Self]. + /// + /// - Self uses `/` as path separator. + /// - Absolute paths are always rejected, as are those with custom prefixes. + /// - Repeated separators are deduplicated. + /// - Occurrences of `.` are normalized away. + /// - A trailing slash is normalized away. + /// + /// A `canonicalize_dotdot` boolean controls whether `..` will get + /// canonicalized if possible, or should return an error. + /// + /// For more exotic paths, this conversion might produce different results + /// on different platforms, due to different underlying byte + /// representations, which is why it's restricted to unix for now. + #[cfg(unix)] + pub fn from_host_path( + host_path: &std::path::Path, + canonicalize_dotdot: bool, + ) -> Result<Self, std::io::Error> { + let mut p = PathBuf::with_capacity(host_path.as_os_str().len()); + + for component in host_path.components() { + match component { + std::path::Component::Prefix(_) | std::path::Component::RootDir => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "found disallowed prefix or rootdir", + )) + } + std::path::Component::CurDir => continue, // ignore + std::path::Component::ParentDir => { + if canonicalize_dotdot { + // Try popping the last element from the path being constructed. + // FUTUREWORK: pop method? + p = p + .parent() + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "found .. going too far up", + ) + })? + .to_owned(); + } else { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "found disallowed ..", + )); + } + } + std::path::Component::Normal(s) => { + // append the new component to the path being constructed. 
+ p.try_push(s.as_encoded_bytes()).map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "encountered invalid node in sub_path component", + ) + })? + } + } + } + + Ok(p) + } + + pub fn into_boxed_path(self) -> Box<Path> { + // SAFETY: Box<[u8]> and Box<Path> have the same representation, + // and PathBuf always contains a valid Path. + unsafe { mem::transmute(self.inner.into_boxed_slice()) } + } + + pub fn into_bytes(self) -> Vec<u8> { + self.inner + } +} + +#[cfg(test)] +mod test { + use super::{Path, PathBuf}; + use bstr::ByteSlice; + use rstest::rstest; + + // TODO: add some manual tests including invalid UTF-8 (hard to express + // with rstest) + + #[rstest] + #[case::empty("", 0)] + #[case("a", 1)] + #[case("a/b", 2)] + #[case("a/b/c", 3)] + // add two slightly more cursed variants. + // Technically nothing prevents us from representing this with castore, + // but maybe we want to disallow constructing paths like this as it's a + // bad idea. + #[case::cursed("C:\\a/b", 2)] + #[case::cursed("\\\\tvix-store", 1)] + pub fn from_str(#[case] s: &str, #[case] num_components: usize) { + let p: PathBuf = s.parse().expect("must parse"); + + assert_eq!(s.as_bytes(), p.as_bytes(), "inner bytes mismatch"); + assert_eq!( + num_components, + p.components().count(), + "number of components mismatch" + ); + } + + #[rstest] + #[case::absolute("/a/b")] + #[case::two_forward_slashes_start("//a/b")] + #[case::two_forward_slashes_middle("a/b//c/d")] + #[case::trailing_slash("a/b/")] + #[case::dot(".")] + #[case::dotdot("..")] + #[case::dot_start("./a")] + #[case::dotdot_start("../a")] + #[case::dot_middle("a/./b")] + #[case::dotdot_middle("a/../b")] + #[case::dot_end("a/b/.")] + #[case::dotdot_end("a/b/..")] + #[case::null("fo\0o")] + pub fn from_str_fail(#[case] s: &str) { + s.parse::<PathBuf>().expect_err("must fail"); + } + + #[rstest] + #[case("foo", "")] + #[case("foo/bar", "foo")] + #[case("foo2/bar2", "foo2")] + #[case("foo/bar/baz", "foo/bar")] + pub fn parent(#[case] p: PathBuf, #[case] exp_parent: PathBuf) { + assert_eq!(Some(&*exp_parent), p.parent()); + } + + #[rstest] + pub fn no_parent() { + assert!(Path::ROOT.parent().is_none()); + } + + #[rstest] + #[case("a", "b", "a/b")] + #[case("a", "b", "a/b")] + pub fn join_push(#[case] mut p: PathBuf, #[case] name: &str, #[case] exp_p: PathBuf) { + assert_eq!(exp_p, p.try_join(name.as_bytes()).expect("join failed")); + p.try_push(name.as_bytes()).expect("push failed"); + assert_eq!(exp_p, p); + } + + #[rstest] + #[case("a", "/")] + #[case("a", "")] + #[case("a", "b/c")] + #[case("", "/")] + #[case("", "")] + #[case("", "b/c")] + #[case("", ".")] + #[case("", "..")] + pub fn join_push_fail(#[case] mut p: PathBuf, #[case] name: &str) { + p.try_join(name.as_bytes()) + .expect_err("join succeeded unexpectedly"); + p.try_push(name.as_bytes()) + .expect_err("push succeeded unexpectedly"); + } + + #[rstest] + #[case::empty("", vec![])] + #[case("a", vec!["a"])] + #[case("a/b", vec!["a", "b"])] + #[case("a/b/c", vec!["a","b", "c"])] + pub fn components(#[case] p: PathBuf, #[case] exp_components: Vec<&str>) { + assert_eq!( + exp_components, + p.components() + .map(|x| x.to_str().unwrap()) + .collect::<Vec<_>>() + ); + } + + #[rstest] + #[case::empty("", "", false)] + #[case::path("a", "a", false)] + #[case::path2("a/b", "a/b", false)] + #[case::double_slash_middle("a//b", "a/b", false)] + #[case::dot(".", "", false)] + #[case::dot_start("./a/b", "a/b", false)] + #[case::dot_middle("a/./b", "a/b", false)] + #[case::dot_end("a/b/.", 
"a/b", false)] + #[case::trailing_slash("a/b/", "a/b", false)] + #[case::dotdot_canonicalize("a/..", "", true)] + #[case::dotdot_canonicalize2("a/../b", "b", true)] + #[cfg_attr(unix, case::faux_prefix("\\\\nix-store", "\\\\nix-store", false))] + #[cfg_attr(unix, case::faux_letter("C:\\foo.txt", "C:\\foo.txt", false))] + pub fn from_host_path( + #[case] host_path: std::path::PathBuf, + #[case] exp_path: PathBuf, + #[case] canonicalize_dotdot: bool, + ) { + let p = PathBuf::from_host_path(&host_path, canonicalize_dotdot).expect("must succeed"); + + assert_eq!(exp_path, p); + } + + #[rstest] + #[case::absolute("/", false)] + #[case::dotdot_root("..", false)] + #[case::dotdot_root_canonicalize("..", true)] + #[case::dotdot_root_no_canonicalize("a/..", false)] + #[case::invalid_name("foo/bar\0", false)] + // #[cfg_attr(windows, case::prefix("\\\\nix-store", false))] + // #[cfg_attr(windows, case::letter("C:\\foo.txt", false))] + pub fn from_host_path_fail( + #[case] host_path: std::path::PathBuf, + #[case] canonicalize_dotdot: bool, + ) { + PathBuf::from_host_path(&host_path, canonicalize_dotdot).expect_err("must fail"); + } +} diff --git a/tvix/castore/src/proto/mod.rs b/tvix/castore/src/proto/mod.rs index 39c1bcc6fa..5374e3ae5a 100644 --- a/tvix/castore/src/proto/mod.rs +++ b/tvix/castore/src/proto/mod.rs @@ -66,7 +66,7 @@ pub enum ValidateStatBlobResponseError { /// Checks a Node name for validity as an intermediate node. /// We disallow slashes, null bytes, '.', '..' and the empty string. -fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { +pub(crate) fn validate_node_name(name: &[u8]) -> Result<(), ValidateNodeError> { if name.is_empty() || name == b".." || name == b"." diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs index 436e895863..5635f446b9 100644 --- a/tvix/cli/src/main.rs +++ b/tvix/cli/src/main.rs @@ -5,6 +5,7 @@ use std::{fs, path::PathBuf}; use tracing::Level; use tracing_subscriber::fmt::writer::MakeWriterExt; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tracing_subscriber::{EnvFilter, Layer}; use tvix_build::buildservice; use tvix_eval::builtins::impure_builtins; use tvix_eval::observer::{DisassemblingObserver, TracingObserver}; @@ -229,7 +230,13 @@ fn main() { let subscriber = tracing_subscriber::registry().with( tracing_subscriber::fmt::Layer::new() .with_writer(std::io::stderr.with_max_level(level)) - .pretty(), + .compact() + .with_filter( + EnvFilter::builder() + .with_default_directive(level.into()) + .from_env() + .expect("invalid RUST_LOG"), + ), ); subscriber .try_init() diff --git a/tvix/crate-hashes.json b/tvix/crate-hashes.json index ca45e43176..2c1e740cb9 100644 --- a/tvix/crate-hashes.json +++ b/tvix/crate-hashes.json @@ -1,4 +1,4 @@ { - "bigtable_rs 0.2.9 (git+https://github.com/flokli/bigtable_rs?rev=0af404741dfc40eb9fa99cf4d4140a09c5c20df7#0af404741dfc40eb9fa99cf4d4140a09c5c20df7)": "1njjam1lx2xlnm7a41lga8601vmjgqz0fvc77x24gd04pc7avxll", - "wu-manber 0.1.0 (git+https://github.com/tvlfyi/wu-manber.git#0d5b22bea136659f7de60b102a7030e0daaa503d)": "1zhk83lbq99xzyjwphv2qrb8f8qgfqwa5bbbvyzm0z0bljsjv0pd" + "git+https://github.com/flokli/bigtable_rs?rev=0af404741dfc40eb9fa99cf4d4140a09c5c20df7#0.2.9": "1njjam1lx2xlnm7a41lga8601vmjgqz0fvc77x24gd04pc7avxll", + "git+https://github.com/tvlfyi/wu-manber.git#wu-manber@0.1.0": "1zhk83lbq99xzyjwphv2qrb8f8qgfqwa5bbbvyzm0z0bljsjv0pd" } \ No newline at end of file diff --git a/tvix/eval/docs/bindings.md b/tvix/eval/docs/bindings.md new file mode 100644 index 
0000000000..2b062cb13d --- /dev/null +++ b/tvix/eval/docs/bindings.md @@ -0,0 +1,133 @@ +Compilation of bindings +======================= + +Compilation of Nix bindings is one of the most mind-bending parts of Nix +evaluation. The implementation of just the compilation is currently almost 1000 +lines of code, excluding the various insane test cases we dreamt up for it. + +## What is a binding? + +In short, any attribute set or `let`-expression. Tvix currently does not treat +formals in function parameters (e.g. `{ name ? "fred" }: ...`) the same as these +bindings. + +They have two very difficult features: + +1. Keys can mutually refer to each other in `rec` sets or `let`-bindings, + including out of definition order. +2. Attribute sets can be nested, and parts of one attribute set can be defined + in multiple separate bindings. + +Tvix resolves as much of this logic statically (i.e. at compile-time) as +possible, but the procedure is quite complicated. + +## High-level concept + +The idea behind the way we compile bindings is to fully resolve nesting +statically, and use the usual mechanisms (i.e. recursion/thunking/value +capturing) for resolving dynamic values. + +This is done by compiling bindings in several phases: + +1. An initial compilation phase *only* for plain inherit statements (i.e. + `inherit name;`), *not* for namespaced inherits (i.e. `inherit (from) + name;`). + +2. A declaration-only phase, in which we use the compiler's scope tracking logic + to calculate the physical runtime stack indices (further referred to as + "stack slots" or just "slots") that all values will end up in. + + In this phase, whenever we encounter a nested attribute set, it is merged + into a custom data structure that acts like a synthetic AST node. + + This can be imagined similar to a rewrite like this: + + ```nix + # initial code: + { + a.b = 1; + a.c = 2; + } + + # rewritten form: + { + a = { + b = 1; + c = 2; + }; + } + ``` + + The rewrite applies to attribute sets and `let`-bindings alike. + + At the end of this phase, we know the stack slots of all namespaces for + inheriting from, all values inherited from them, and all values (and + optionally keys) of bindings at the current level. + + Only statically known keys are actually merged, so any dynamic keys that + conflict will lead to a "key already defined" error at runtime. + +3. A compilation phase, in which all values (and, when necessary, keys) are + actually compiled. In this phase the custom data structure used for merging + is encountered when compiling values. + + As this data structure acts like an AST node, the process begins recursively + for each nested attribute set. + +At the end of this process we have bytecode that leaves the required values (and +optionally keys) on the stack. In the case of attribute sets, a final operation +is emitted that constructs the actual attribute set structure at runtime. For +`let`-bindings a final operation is emitted that removes these locals from the +stack when the scope ends. + +## Moving parts + +WARNING: This documents the *current* implementation. If you only care about the +conceptual aspects, see above. + +There's a few types involved: + +* `PeekableAttrs`: peekable iterator over an attribute path (e.g. 
`a.b.c`) +* `BindingsKind`: enum defining the kind of bindings (attrs/recattrs/let) +* `AttributeSet`: struct holding the bindings kind, the AST nodes with inherits + (both namespaced and not), and an internal representation of bindings + (essentially a vector of tuples of the peekable attrs and the expression to + compile for the value). +* `Binding`: enum describing the kind of binding (namespaced inherit, attribute + set, plain binding of *any other value type*) +* `KeySlot`: enum describing the location in which a key slot is placed at + runtime (nowhere, statically known value in a slot, dynamic value in a slot) +* `TrackedBinding`: struct representing statically known information about a + single binding (its key slot, value slot and `Binding`) +* `TrackedBindings`: vector of tracked bindings, which implements logic for + merging attribute sets together + +And quite a few methods on `Compiler`: + +* `compile_bindings`: entry point for compiling anything that looks like a + binding, this calls out to the functions below. +* `compile_plain_inherits`: takes all inherits of a bindings node and compiles + the ones that are trivial to compile (i.e. just plain inherits without a + namespace). The `rnix` parser does not represent namespaced/plain inherits in + different nodes, so this function also aggregates the namespaced inherits and + returns them for further use +* `declare_namespaced_inherits`: passes over all namespaced inherits and + declares them on the locals stack, as well as inserts them into the provided + `TrackedBindings` +* `declare_bindings`: declares all regular key/value bindings in a bindings + scope, but without actually compiling their keys or values. + + There's a lot of heavy lifting going on here: + + 1. It invokes the various pieces of logic responsible for merging nested + attribute sets together, creating intermediate data structures in the value + slots of bindings that can be recursively processed the same way. + 2. It decides on the key slots of expressions based on the kind of bindings, + and the type of expression providing the key. +* `bind_values`: runs the actual compilation of values. Notably this function is + responsible for recursively compiling merged attribute sets when it encounters + a `Binding::Set` (on which it invokes `compile_bindings` itself). + +In addition to these, several methods (such as `compile_attr_set`, +`compile_let_in`, ...) invoke the binding-kind specific logic and then call out +to the functions above. diff --git a/tvix/eval/src/builtins/impure.rs b/tvix/eval/src/builtins/impure.rs index 18403fe5d8..c82b910f5f 100644 --- a/tvix/eval/src/builtins/impure.rs +++ b/tvix/eval/src/builtins/impure.rs @@ -37,7 +37,7 @@ mod impure_builtins { Ok(p) => p, }; let r = generators::request_open_file(&co, path).await; - Ok(hash_nix_string(algo.to_str()?, r).map(Value::from)?) + hash_nix_string(algo.to_str()?, r).map(Value::from) } #[builtin("pathExists")] diff --git a/tvix/eval/src/vm/mod.rs b/tvix/eval/src/vm/mod.rs index c10b79cd99..5c244cc3ca 100644 --- a/tvix/eval/src/vm/mod.rs +++ b/tvix/eval/src/vm/mod.rs @@ -1148,7 +1148,7 @@ where let mut captured_with_stack = frame .upvalues .with_stack() - .map(Clone::clone) + .cloned() // ... or make an empty one if there isn't one already. 
.unwrap_or_else(|| Vec::with_capacity(self.with_stack.len())); diff --git a/tvix/eval/tests/nix_oracle.rs b/tvix/eval/tests/nix_oracle.rs index 6bab75cfd9..5a5cc0a822 100644 --- a/tvix/eval/tests/nix_oracle.rs +++ b/tvix/eval/tests/nix_oracle.rs @@ -30,7 +30,14 @@ fn nix_eval(expr: &str, strictness: Strictness) -> String { .arg(format!("({expr})")) .env( "NIX_REMOTE", - format!("local?root={}", store_dir.path().display()), + format!( + "local?root={}", + store_dir + .path() + .canonicalize() + .expect("valid path") + .display() + ), ) .output() .unwrap(); diff --git a/tvix/glue/src/builtins/derivation.rs b/tvix/glue/src/builtins/derivation.rs index 8c7df96f91..a7742ae40a 100644 --- a/tvix/glue/src/builtins/derivation.rs +++ b/tvix/glue/src/builtins/derivation.rs @@ -457,55 +457,59 @@ pub(crate) mod derivation_builtins { drv.validate(false) .map_err(DerivationError::InvalidDerivation)?; - // Calculate the derivation_or_fod_hash for the current derivation. - // This one is still intermediate (so not added to known_paths) - let derivation_or_fod_hash_tmp = drv.derivation_or_fod_hash(|drv_path| { - known_paths - .get_hash_derivation_modulo(&drv_path.to_owned()) - .unwrap_or_else(|| panic!("{} not found", drv_path)) - .to_owned() - }); + // Calculate the hash_derivation_modulo for the current derivation. + debug_assert!( + drv.outputs.values().all(|output| { output.path.is_none() }), + "outputs should still be unset" + ); // Mutate the Derivation struct and set output paths - drv.calculate_output_paths(name, &derivation_or_fod_hash_tmp) - .map_err(DerivationError::InvalidDerivation)?; + drv.calculate_output_paths( + name, + // This one is still intermediate (so not added to known_paths), + // as the outputs are still unset. + &drv.hash_derivation_modulo(|drv_path| { + *known_paths + .get_hash_derivation_modulo(&drv_path.to_owned()) + .unwrap_or_else(|| panic!("{} not found", drv_path)) + }), + ) + .map_err(DerivationError::InvalidDerivation)?; let drv_path = drv .calculate_derivation_path(name) .map_err(DerivationError::InvalidDerivation)?; - // TODO: avoid cloning - known_paths.add_derivation(drv_path.clone(), drv.clone()); - - let mut new_attrs: Vec<(String, NixString)> = drv - .outputs - .into_iter() - .map(|(name, output)| { - ( - name.clone(), + // Assemble the attrset to return from this builtin. + let out = Value::Attrs(Box::new(NixAttrs::from_iter( + drv.outputs + .iter() + .map(|(name, output)| { + ( + name.clone(), + NixString::new_context_from( + NixContextElement::Single { + name: name.clone(), + derivation: drv_path.to_absolute_path(), + } + .into(), + output.path.as_ref().unwrap().to_absolute_path(), + ), + ) + }) + .chain(std::iter::once(( + "drvPath".to_owned(), NixString::new_context_from( - NixContextElement::Single { - name, - derivation: drv_path.to_absolute_path(), - } - .into(), - output.path.unwrap().to_absolute_path(), + NixContextElement::Derivation(drv_path.to_absolute_path()).into(), + drv_path.to_absolute_path(), ), - ) - }) - .collect(); - - new_attrs.push(( - "drvPath".to_string(), - NixString::new_context_from( - NixContextElement::Derivation(drv_path.to_absolute_path()).into(), - drv_path.to_absolute_path(), - ), - )); - - Ok(Value::Attrs(Box::new(NixAttrs::from_iter( - new_attrs.into_iter(), - )))) + ))), + ))); + + // Register the Derivation in known_paths. 
+ known_paths.add_derivation(drv_path, drv); + + Ok(out) } #[builtin("toFile")] diff --git a/tvix/glue/src/builtins/errors.rs b/tvix/glue/src/builtins/errors.rs index c05d366f13..f6d5745c56 100644 --- a/tvix/glue/src/builtins/errors.rs +++ b/tvix/glue/src/builtins/errors.rs @@ -6,6 +6,7 @@ use nix_compat::{ use reqwest::Url; use std::rc::Rc; use thiserror::Error; +use tvix_castore::import; /// Errors related to derivation construction #[derive(Debug, Error)] @@ -52,10 +53,7 @@ pub enum FetcherError { Io(#[from] std::io::Error), #[error(transparent)] - Import(#[from] tvix_castore::import::Error), - - #[error(transparent)] - ImportArchive(#[from] tvix_castore::import::archive::Error), + Import(#[from] tvix_castore::import::IngestionError<import::archive::Error>), #[error("Error calculating store path for fetcher output: {0}")] StorePath(#[from] BuildStorePathError), diff --git a/tvix/glue/src/builtins/import.rs b/tvix/glue/src/builtins/import.rs index 6814781df3..219695b69f 100644 --- a/tvix/glue/src/builtins/import.rs +++ b/tvix/glue/src/builtins/import.rs @@ -95,9 +95,9 @@ async fn filtered_ingest( ); ingest_entries(&state.directory_service, entries) .await - .map_err(|err| ErrorKind::IO { + .map_err(|e| ErrorKind::IO { path: Some(path.to_path_buf()), - error: Rc::new(err.into()), + error: Rc::new(std::io::Error::new(std::io::ErrorKind::Other, e)), }) }) } diff --git a/tvix/glue/src/builtins/mod.rs b/tvix/glue/src/builtins/mod.rs index 4081489e0e..0c7bcc880a 100644 --- a/tvix/glue/src/builtins/mod.rs +++ b/tvix/glue/src/builtins/mod.rs @@ -739,6 +739,7 @@ mod tests { false )] fn builtins_filter_source_unsupported_files(#[case] code: &str, #[case] exp_success: bool) { + use nix::errno::Errno; use nix::sys::stat; use nix::unistd; use std::os::unix::net::UnixListener; @@ -765,6 +766,15 @@ mod tests { stat::Mode::S_IRWXU, 0, ) + .inspect_err(|e| { + if *e == Errno::EPERM { + eprintln!( + "\ +Missing permissions to create a character device node with mknod(2). +Please run this test as root or set CAP_MKNOD." + ); + } + }) .expect("Failed to create a character device node"); let code_replaced = code.replace("@fixtures", &temp.path().to_string_lossy()); diff --git a/tvix/glue/src/decompression.rs b/tvix/glue/src/fetchers/decompression.rs index 11dc9d9835..f96fa60e34 100644 --- a/tvix/glue/src/decompression.rs +++ b/tvix/glue/src/fetchers/decompression.rs @@ -204,9 +204,9 @@ mod tests { } #[rstest] - #[case::gzip(include_bytes!("tests/blob.tar.gz"))] - #[case::bzip2(include_bytes!("tests/blob.tar.bz2"))] - #[case::xz(include_bytes!("tests/blob.tar.xz"))] + #[case::gzip(include_bytes!("../tests/blob.tar.gz"))] + #[case::bzip2(include_bytes!("../tests/blob.tar.bz2"))] + #[case::xz(include_bytes!("../tests/blob.tar.xz"))] #[tokio::test] async fn compressed_tar(#[case] data: &[u8]) { let reader = DecompressedReader::new(BufReader::new(data)); diff --git a/tvix/glue/src/fetchers.rs b/tvix/glue/src/fetchers/mod.rs index 7560c447d8..342dfd84e8 100644 --- a/tvix/glue/src/fetchers.rs +++ b/tvix/glue/src/fetchers/mod.rs @@ -17,7 +17,10 @@ use tvix_castore::{ use tvix_store::{pathinfoservice::PathInfoService, proto::PathInfo}; use url::Url; -use crate::{builtins::FetcherError, decompression::DecompressedReader}; +use crate::builtins::FetcherError; + +mod decompression; +use decompression::DecompressedReader; /// Representing options for doing a fetch. #[derive(Clone, Eq, PartialEq)] @@ -28,7 +31,8 @@ pub enum Fetch { URL(Url, Option<NixHash>), /// Fetch a tarball from the given URL and unpack. 
- /// The file must be a tape archive (.tar) compressed with gzip, bzip2 or xz. + /// The file must be a tape archive (.tar), optionally compressed with gzip, + /// bzip2 or xz. /// The top-level path component of the files in the tarball is removed, /// so it is best if the tarball contains a single directory at top level. /// Optionally, a sha256 digest can be provided to verify the unpacked @@ -56,10 +60,10 @@ fn redact_url(url: &Url) -> Url { impl std::fmt::Debug for Fetch { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Fetch::URL(url, nixhash) => { + Fetch::URL(url, exp_hash) => { let url = redact_url(url); - if let Some(nixhash) = nixhash { - write!(f, "URL [url: {}, exp_hash: Some({})]", &url, nixhash) + if let Some(exp_hash) = exp_hash { + write!(f, "URL [url: {}, exp_hash: Some({})]", &url, exp_hash) } else { write!(f, "URL [url: {}, exp_hash: None]", &url) } @@ -168,8 +172,8 @@ async fn hash<D: Digest + std::io::Write>( impl<BS, DS, PS> Fetcher<BS, DS, PS> where - BS: AsRef<(dyn BlobService + 'static)> + Clone + Send + Sync + 'static, - DS: AsRef<(dyn DirectoryService + 'static)>, + BS: BlobService + Clone + 'static, + DS: DirectoryService + Clone, PS: PathInfoService, { /// Ingest the data from a specified [Fetch]. @@ -178,7 +182,7 @@ where /// didn't match the previously communicated hash contained inside the FetchArgs. pub async fn ingest(&self, fetch: Fetch) -> Result<(Node, CAHash, u64), FetcherError> { match fetch { - Fetch::URL(url, exp_nixhash) => { + Fetch::URL(url, exp_hash) => { // Construct a AsyncRead reading from the data as its downloaded. let mut r = self.download(url.clone()).await?; @@ -188,7 +192,7 @@ where // Copy the contents from the download reader to the blob writer. // Calculate the digest of the file received, depending on the // communicated expected hash (or sha256 if none provided). 
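// ---------------------------------------------------------------------------
// [Editor's sketch] The "hash while copying" step described above, reduced to
// a blocking std::io version for illustration. The real code is async, writes
// into a castore blob writer, and picks the digest algorithm from the
// expected NixHash (defaulting to sha256); this sketch assumes the `sha2`
// crate and fixes the algorithm to sha256.
use sha2::{Digest, Sha256};
use std::io::{Read, Write};

/// Copy `r` into `w`, returning the sha256 digest and size of the data.
fn copy_and_hash<R: Read, W: Write>(
    r: &mut R,
    w: &mut W,
) -> std::io::Result<([u8; 32], u64)> {
    let mut hasher = Sha256::new();
    let mut buf = [0u8; 8192];
    let mut total = 0u64;
    loop {
        let n = r.read(&mut buf)?;
        if n == 0 {
            break;
        }
        hasher.update(&buf[..n]);
        w.write_all(&buf[..n])?;
        total += n as u64;
    }
    Ok((hasher.finalize().into(), total))
}
// A caller then compares the returned digest against the expected hash (if
// one was communicated) and fails with a HashMismatch-style error otherwise.
// ---------------------------------------------------------------------------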
- let (actual_nixhash, blob_size) = match exp_nixhash + let (actual_hash, blob_size) = match exp_hash .as_ref() .map(NixHash::algo) .unwrap_or_else(|| HashAlgo::Sha256) @@ -209,12 +213,12 @@ where )?, }; - if let Some(exp_nixhash) = exp_nixhash { - if exp_nixhash != actual_nixhash { + if let Some(exp_hash) = exp_hash { + if exp_hash != actual_hash { return Err(FetcherError::HashMismatch { url, - wanted: exp_nixhash, - got: actual_nixhash, + wanted: exp_hash, + got: actual_hash, }); } } @@ -227,7 +231,7 @@ where size: blob_size, executable: false, }), - CAHash::Flat(actual_nixhash), + CAHash::Flat(actual_hash), blob_size, )) } @@ -243,7 +247,7 @@ where // Ingest the archive, get the root node let node = tvix_castore::import::archive::ingest_archive( self.blob_service.clone(), - &self.directory_service, + self.directory_service.clone(), archive, ) .await?; @@ -379,12 +383,12 @@ mod tests { #[test] fn fetchurl_store_path() { let url = Url::parse("https://raw.githubusercontent.com/aaptel/notmuch-extract-patch/f732a53e12a7c91a06755ebfab2007adc9b3063b/notmuch-extract-patch").unwrap(); - let exp_nixhash = NixHash::Sha256( + let exp_hash = NixHash::Sha256( nixbase32::decode_fixed("0nawkl04sj7psw6ikzay7kydj3dhd0fkwghcsf5rzaw4bmp4kbax") .unwrap(), ); - let fetch = Fetch::URL(url, Some(exp_nixhash)); + let fetch = Fetch::URL(url, Some(exp_hash)); assert_eq!( "06qi00hylriyfm0nl827crgjvbax84mz-notmuch-extract-patch", &fetch diff --git a/tvix/glue/src/known_paths.rs b/tvix/glue/src/known_paths.rs index c95065592b..290c9d5b69 100644 --- a/tvix/glue/src/known_paths.rs +++ b/tvix/glue/src/known_paths.rs @@ -73,7 +73,7 @@ impl KnownPaths { } // compute the hash derivation modulo - let hash_derivation_modulo = drv.derivation_or_fod_hash(|drv_path| { + let hash_derivation_modulo = drv.hash_derivation_modulo(|drv_path| { self.get_hash_derivation_modulo(&drv_path.to_owned()) .unwrap_or_else(|| panic!("{} not found", drv_path)) .to_owned() diff --git a/tvix/glue/src/lib.rs b/tvix/glue/src/lib.rs index 8528f09e52..2e5a3be103 100644 --- a/tvix/glue/src/lib.rs +++ b/tvix/glue/src/lib.rs @@ -6,7 +6,6 @@ pub mod tvix_build; pub mod tvix_io; pub mod tvix_store_io; -mod decompression; #[cfg(test)] mod tests; diff --git a/tvix/glue/src/tests/mod.rs b/tvix/glue/src/tests/mod.rs index e66f484e3d..8e1572b6e3 100644 --- a/tvix/glue/src/tests/mod.rs +++ b/tvix/glue/src/tests/mod.rs @@ -14,6 +14,8 @@ use rstest::rstest; use crate::{ builtins::{add_derivation_builtins, add_fetcher_builtins, add_import_builtins}, + configure_nix_path, + tvix_io::TvixIO, tvix_store_io::TvixStoreIO, }; @@ -50,12 +52,17 @@ fn eval_test(code_path: PathBuf, expect_success: bool) { Arc::new(DummyBuildService::default()), tokio_runtime.handle().clone(), )); - let mut eval = tvix_eval::Evaluation::new(tvix_store_io.clone() as Rc<dyn EvalIO>, true); + // Wrap with TvixIO, so <nix/fetchurl.nix can be imported. 
+        let mut eval = tvix_eval::Evaluation::new(
+            Box::new(TvixIO::new(tvix_store_io.clone() as Rc<dyn EvalIO>)) as Box<dyn EvalIO>,
+            true,
+        );
         eval.strict = true;
         add_derivation_builtins(&mut eval, tvix_store_io.clone());
         add_fetcher_builtins(&mut eval, tvix_store_io.clone());
         add_import_builtins(&mut eval, tvix_store_io.clone());
+        configure_nix_path(&mut eval, &None);
 
         let result = eval.evaluate(code, Some(code_path.clone()));
 
         let failed = match result.value {
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index 1f709906de..7daefffe82 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -305,6 +305,9 @@ impl TvixStoreIO {
         };
 
         // now with the root_node and sub_path, descend to the node requested.
+        // We convert sub_path to the castore model here.
+        let sub_path = tvix_castore::PathBuf::from_host_path(sub_path, true)?;
+
         directoryservice::descend_to(&self.directory_service, root_node, sub_path)
             .await
             .map_err(|e| std::io::Error::new(io::ErrorKind::Other, e))
diff --git a/tvix/nar-bridge/pkg/http/narinfo_get.go b/tvix/nar-bridge/pkg/http/narinfo_get.go
index 98d85744d8..d43cb58078 100644
--- a/tvix/nar-bridge/pkg/http/narinfo_get.go
+++ b/tvix/nar-bridge/pkg/http/narinfo_get.go
@@ -96,37 +96,42 @@ func renderNarinfo(
 }
 
 func registerNarinfoGet(s *Server) {
-	// GET $outHash.narinfo looks up the PathInfo from the tvix-store,
-	// and then render a .narinfo file to the client.
-	// It will keep the PathInfo in the lookup map,
-	// so a subsequent GET /nar/ $narhash.nar request can find it.
-	s.handler.Get("/{outputhash:^["+nixbase32.Alphabet+"]{32}}.narinfo", func(w http.ResponseWriter, r *http.Request) {
-		defer r.Body.Close()
-
-		ctx := r.Context()
-		log := log.WithField("outputhash", chi.URLParamFromCtx(ctx, "outputhash"))
-
-		// parse the output hash sent in the request URL
-		outputHash, err := nixbase32.DecodeString(chi.URLParamFromCtx(ctx, "outputhash"))
-		if err != nil {
-			log.WithError(err).Error("unable to decode output hash from url")
-			w.WriteHeader(http.StatusBadRequest)
-			_, err := w.Write([]byte("unable to decode output hash from url"))
+	// GET/HEAD $outHash.narinfo looks up the PathInfo from the tvix-store,
+	// and, if it's a GET request, renders a .narinfo file to the client.
+	// In both cases it will keep the PathInfo in the lookup map,
+	// so a subsequent GET/HEAD /nar/ $narhash.nar request can find it.
+	genNarinfoHandler := func(isHead bool) func(w http.ResponseWriter, r *http.Request) {
+		return func(w http.ResponseWriter, r *http.Request) {
+			defer r.Body.Close()
+
+			ctx := r.Context()
+			log := log.WithField("outputhash", chi.URLParamFromCtx(ctx, "outputhash"))
+
+			// parse the output hash sent in the request URL
+			outputHash, err := nixbase32.DecodeString(chi.URLParamFromCtx(ctx, "outputhash"))
 			if err != nil {
-				log.WithError(err).Errorf("unable to write error message to client")
+				log.WithError(err).Error("unable to decode output hash from url")
+				w.WriteHeader(http.StatusBadRequest)
+				_, err := w.Write([]byte("unable to decode output hash from url"))
+				if err != nil {
+					log.WithError(err).Errorf("unable to write error message to client")
+				}
+
+				return
 			}
 
-			return
-		}
-
-		err = renderNarinfo(ctx, log, s.pathInfoServiceClient, &s.narDbMu, s.narDb, outputHash, w, false)
-		if err != nil {
-			if errors.Is(err, fs.ErrNotExist) {
-				w.WriteHeader(http.StatusNotFound)
-			} else {
-				log.WithError(err).Warn("unable to render narinfo")
-				w.WriteHeader(http.StatusInternalServerError)
+			err = renderNarinfo(ctx, log, s.pathInfoServiceClient, &s.narDbMu, s.narDb, outputHash, w, isHead)
+			if err != nil {
+				if errors.Is(err, fs.ErrNotExist) {
+					w.WriteHeader(http.StatusNotFound)
+				} else {
+					log.WithError(err).Warn("unable to render narinfo")
+					w.WriteHeader(http.StatusInternalServerError)
+				}
 			}
 		}
-	})
+	}
+
+	s.handler.Get("/{outputhash:^["+nixbase32.Alphabet+"]{32}}.narinfo", genNarinfoHandler(false))
+	s.handler.Head("/{outputhash:^["+nixbase32.Alphabet+"]{32}}.narinfo", genNarinfoHandler(true))
 }
diff --git a/tvix/nix-compat/src/derivation/mod.rs b/tvix/nix-compat/src/derivation/mod.rs
index 07da127ed0..6e12e3ea86 100644
--- a/tvix/nix-compat/src/derivation/mod.rs
+++ b/tvix/nix-compat/src/derivation/mod.rs
@@ -188,11 +188,12 @@ impl Derivation {
     /// `fixed:out:${algo}:${digest}:${fodPath}` string is hashed instead of
     /// the A-Term.
     ///
-    /// If the derivation is not a fixed derivation, it's up to the caller of
-    /// this function to provide a lookup function to lookup these calculation
-    /// results of parent derivations at `fn_get_derivation_or_fod_hash` (by
-    /// drv path).
-    pub fn derivation_or_fod_hash<F>(&self, fn_get_derivation_or_fod_hash: F) -> [u8; 32]
+    /// It's up to the caller of this function to provide an (infallible) lookup
+    /// function to query [hash_derivation_modulo] of direct input derivations,
+    /// by their [StorePathRef].
+    /// It will only be called in case the derivation is not a fixed-output
+    /// derivation.
+    pub fn hash_derivation_modulo<F>(&self, fn_lookup_hash_derivation_modulo: F) -> [u8; 32]
     where
         F: Fn(&StorePathRef) -> [u8; 32],
     {
@@ -200,16 +201,16 @@ impl Derivation {
         // Non-Fixed-output derivations return the sha256 digest of the ATerm
         // notation, but with all input_derivation paths replaced by a recursive
         // call to this function.
-        // We use fn_get_derivation_or_fod_hash here, so callers can precompute this.
+        // We call [fn_lookup_hash_derivation_modulo] rather than recursing
+        // ourselves, so callers can precompute this.
         self.fod_digest().unwrap_or({
-            // For each input_derivation, look up the
-            // derivation_or_fod_hash, and replace the derivation path with
-            // it's HEXLOWER digest.
+            // For each input_derivation, look up the hash derivation modulo,
+            // and replace the derivation path in the aterm with its HEXLOWER digest.
let aterm_bytes = self.to_aterm_bytes_with_replacements(&BTreeMap::from_iter( self.input_derivations .iter() .map(|(drv_path, output_names)| { - let hash = fn_get_derivation_or_fod_hash(&drv_path.into()); + let hash = fn_lookup_hash_derivation_modulo(&drv_path.into()); (hash, output_names.to_owned()) }), @@ -226,20 +227,22 @@ impl Derivation { /// and self.environment[$outputName] needs to be an empty string. /// /// Output path calculation requires knowledge of the - /// derivation_or_fod_hash [NixHash], which (in case of non-fixed-output - /// derivations) also requires knowledge of other hash_derivation_modulo - /// [NixHash]es. + /// [hash_derivation_modulo], which (in case of non-fixed-output + /// derivations) also requires knowledge of the [hash_derivation_modulo] of + /// input derivations (recursively). /// - /// We solve this by asking the caller of this function to provide the - /// hash_derivation_modulo of the current Derivation. + /// To avoid recursing and doing unnecessary calculation, we simply + /// ask the caller of this function to provide the result of the + /// [hash_derivation_modulo] call of the current [Derivation], + /// and leave it up to them to calculate it when needed. /// - /// On completion, self.environment[$outputName] and - /// self.outputs[$outputName].path are set to the calculated output path for all + /// On completion, `self.environment[$outputName]` and + /// `self.outputs[$outputName].path` are set to the calculated output path for all /// outputs. pub fn calculate_output_paths( &mut self, name: &str, - derivation_or_fod_hash: &[u8; 32], + hash_derivation_modulo: &[u8; 32], ) -> Result<(), DerivationError> { // The fingerprint and hash differs per output for (output_name, output) in self.outputs.iter_mut() { @@ -250,14 +253,14 @@ impl Derivation { let path_name = output_path_name(name, output_name); - // For fixed output derivation we use the per-output info, otherwise we use the - // derivation hash. + // For fixed output derivation we use [build_ca_path], otherwise we + // use [build_output_path] with [hash_derivation_modulo]. let abs_store_path = if let Some(ref hwm) = output.ca_hash { build_ca_path(&path_name, hwm, Vec::<String>::new(), false).map_err(|e| { DerivationError::InvalidOutputDerivationPath(output_name.to_string(), e) })? } else { - build_output_path(derivation_or_fod_hash, output_name, &path_name).map_err(|e| { + build_output_path(hash_derivation_modulo, output_name, &path_name).map_err(|e| { DerivationError::InvalidOutputDerivationPath( output_name.to_string(), store_path::BuildStorePathError::InvalidStorePath(e), diff --git a/tvix/nix-compat/src/derivation/tests/mod.rs b/tvix/nix-compat/src/derivation/tests/mod.rs index 63a65356bd..48d4e8926a 100644 --- a/tvix/nix-compat/src/derivation/tests/mod.rs +++ b/tvix/nix-compat/src/derivation/tests/mod.rs @@ -164,7 +164,7 @@ fn derivation_path(#[case] name: &str, #[case] expected_path: &str) { /// This trims all output paths from a Derivation struct, /// by setting outputs[$outputName].path and environment[$outputName] to the empty string. 
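// ---------------------------------------------------------------------------
// [Editor's sketch] How a caller typically satisfies the infallible lookup
// contract described above: memoize each derivation's hash once, in
// dependency order, so `hash_derivation_modulo` never has to recurse. This
// mirrors what KnownPaths does in tvix-glue; the cache type here is a
// simplification, keyed by the .drv store path as a string.
use std::collections::HashMap;

#[derive(Default)]
struct HashCache {
    memo: HashMap<String, [u8; 32]>,
}

impl HashCache {
    /// Register a derivation's hash after its inputs are already known.
    fn insert(&mut self, drv_path: String, hash: [u8; 32]) {
        self.memo.insert(drv_path, hash);
    }

    /// The lookup closure handed to `hash_derivation_modulo`: it may panic,
    /// because inputs must have been registered before their dependents.
    fn lookup(&self, drv_path: &str) -> [u8; 32] {
        *self
            .memo
            .get(drv_path)
            .unwrap_or_else(|| panic!("{} not found", drv_path))
    }
}
// ---------------------------------------------------------------------------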
-fn derivation_with_trimmed_output_paths(derivation: &Derivation) -> Derivation { +fn derivation_without_output_paths(derivation: &Derivation) -> Derivation { let mut trimmed_env = derivation.environment.clone(); let mut trimmed_outputs = derivation.outputs.clone(); @@ -191,13 +191,13 @@ fn derivation_with_trimmed_output_paths(derivation: &Derivation) -> Derivation { #[rstest] #[case::fixed_sha256("0hm2f1psjpcwg8fijsmr4wwxrx59s092-bar.drv", hex!("724f3e3634fce4cbbbd3483287b8798588e80280660b9a63fd13a1bc90485b33"))] #[case::fixed_sha1("ss2p4wmxijn652haqyd7dckxwl4c7hxx-bar.drv", hex!("c79aebd0ce3269393d4a1fde2cbd1d975d879b40f0bf40a48f550edc107fd5df"))] -fn derivation_or_fod_hash(#[case] drv_path: &str, #[case] expected_digest: [u8; 32]) { +fn hash_derivation_modulo_fixed(#[case] drv_path: &str, #[case] expected_digest: [u8; 32]) { // read in the fixture let json_bytes = fs::read(format!("{}/ok/{}.json", RESOURCES_PATHS, drv_path)).expect("unable to read JSON"); let drv: Derivation = serde_json::from_slice(&json_bytes).expect("must deserialize"); - let actual = drv.derivation_or_fod_hash(|_| panic!("must not be called")); + let actual = drv.hash_derivation_modulo(|_| panic!("must not be called")); assert_eq!(expected_digest, actual); } @@ -224,13 +224,13 @@ fn output_paths(#[case] name: &str, #[case] drv_path_str: &str) { ) .expect("must succeed"); - // create a version with trimmed output paths, simulating we constructed - // the struct. - let mut derivation = derivation_with_trimmed_output_paths(&expected_derivation); + // create a version without output paths, simulating we constructed the + // struct. + let mut derivation = derivation_without_output_paths(&expected_derivation); - // calculate the derivation_or_fod_hash of derivation + // calculate the hash_derivation_modulo of Derivation // We don't expect the lookup function to be called for most derivations. - let calculated_derivation_or_fod_hash = derivation.derivation_or_fod_hash(|parent_drv_path| { + let actual_hash_derivation_modulo = derivation.hash_derivation_modulo(|parent_drv_path| { // 4wvvbi4jwn0prsdxb7vs673qa5h9gr7x-foo.drv may lookup /nix/store/0hm2f1psjpcwg8fijsmr4wwxrx59s092-bar.drv // ch49594n9avinrf8ip0aslidkc4lxkqv-foo.drv may lookup /nix/store/ss2p4wmxijn652haqyd7dckxwl4c7hxx-bar.drv if name == "foo" @@ -255,9 +255,9 @@ fn output_paths(#[case] name: &str, #[case] drv_path_str: &str) { let drv: Derivation = serde_json::from_slice(&json_bytes).expect("must deserialize"); - // calculate derivation_or_fod_hash for each parent. + // calculate hash_derivation_modulo for each parent. // This may not trigger subsequent requests, as both parents are FOD. 
- drv.derivation_or_fod_hash(|_| panic!("must not lookup")) + drv.hash_derivation_modulo(|_| panic!("must not lookup")) } else { // we only expect this to be called in the "foo" testcase, for the "bar derivations" panic!("may only be called for foo testcase on bar derivations"); @@ -265,7 +265,7 @@ fn output_paths(#[case] name: &str, #[case] drv_path_str: &str) { }); derivation - .calculate_output_paths(name, &calculated_derivation_or_fod_hash) + .calculate_output_paths(name, &actual_hash_derivation_modulo) .unwrap(); // The derivation should now look like it was before @@ -343,7 +343,7 @@ fn output_path_construction() { // calculate bar output paths let bar_calc_result = bar_drv.calculate_output_paths( "bar", - &bar_drv.derivation_or_fod_hash(|_| panic!("is FOD, should not lookup")), + &bar_drv.hash_derivation_modulo(|_| panic!("is FOD, should not lookup")), ); assert!(bar_calc_result.is_ok()); @@ -360,8 +360,8 @@ fn output_path_construction() { // now construct foo, which requires bar_drv // Note how we refer to the output path, drv name and replacement_str (with calculated output paths) of bar. let bar_output_path = &bar_drv.outputs.get("out").expect("must exist").path; - let bar_drv_derivation_or_fod_hash = - bar_drv.derivation_or_fod_hash(|_| panic!("is FOD, should not lookup")); + let bar_drv_hash_derivation_modulo = + bar_drv.hash_derivation_modulo(|_| panic!("is FOD, should not lookup")); let bar_drv_path = bar_drv .calculate_derivation_path("bar") @@ -408,11 +408,11 @@ fn output_path_construction() { // calculate foo output paths let foo_calc_result = foo_drv.calculate_output_paths( "foo", - &foo_drv.derivation_or_fod_hash(|drv_path| { + &foo_drv.hash_derivation_modulo(|drv_path| { if drv_path.to_string() != "0hm2f1psjpcwg8fijsmr4wwxrx59s092-bar.drv" { panic!("lookup called with unexpected drv_path: {}", drv_path); } - bar_drv_derivation_or_fod_hash + bar_drv_hash_derivation_modulo }), ); assert!(foo_calc_result.is_ok()); diff --git a/tvix/nix-compat/src/nar/reader/async/mod.rs b/tvix/nix-compat/src/nar/reader/async/mod.rs new file mode 100644 index 0000000000..aaf00faf44 --- /dev/null +++ b/tvix/nix-compat/src/nar/reader/async/mod.rs @@ -0,0 +1,166 @@ +use std::{ + pin::Pin, + task::{self, Poll}, +}; + +use tokio::io::{self, AsyncBufRead, AsyncRead, ErrorKind::InvalidData}; + +// Required reading for understanding this module. +use crate::{ + nar::{self, wire::PadPar}, + wire::{self, BytesReader}, +}; + +mod read; +#[cfg(test)] +mod test; + +pub type Reader<'a> = dyn AsyncBufRead + Unpin + Send + 'a; + +/// Start reading a NAR file from `reader`. +pub async fn open<'a, 'r>(reader: &'a mut Reader<'r>) -> io::Result<Node<'a, 'r>> { + read::token(reader, &nar::wire::TOK_NAR).await?; + Node::new(reader).await +} + +pub enum Node<'a, 'r: 'a> { + Symlink { + target: Vec<u8>, + }, + File { + executable: bool, + reader: FileReader<'a, 'r>, + }, + Directory(DirReader<'a, 'r>), +} + +impl<'a, 'r: 'a> Node<'a, 'r> { + /// Start reading a [Node], matching the next [wire::Node]. + /// + /// Reading the terminating [wire::TOK_PAR] is done immediately for [Node::Symlink], + /// but is otherwise left to [DirReader] or [BytesReader]. + async fn new(reader: &'a mut Reader<'r>) -> io::Result<Self> { + Ok(match read::tag(reader).await? 
{ + nar::wire::Node::Sym => { + let target = wire::read_bytes(reader, 1..=nar::wire::MAX_TARGET_LEN).await?; + + if target.contains(&0) { + return Err(InvalidData.into()); + } + + read::token(reader, &nar::wire::TOK_PAR).await?; + + Node::Symlink { target } + } + tag @ (nar::wire::Node::Reg | nar::wire::Node::Exe) => Node::File { + executable: tag == nar::wire::Node::Exe, + reader: FileReader { + inner: BytesReader::new_internal(reader, ..).await?, + }, + }, + nar::wire::Node::Dir => Node::Directory(DirReader::new(reader)), + }) + } +} + +/// File contents, readable through the [AsyncRead] trait. +/// +/// It comes with some caveats: +/// * You must always read the entire file, unless you intend to abandon the entire archive reader. +/// * You must abandon the entire archive reader upon the first error. +/// +/// It's fine to read exactly `reader.len()` bytes without ever seeing an explicit EOF. +pub struct FileReader<'a, 'r> { + inner: BytesReader<&'a mut Reader<'r>, PadPar>, +} + +impl<'a, 'r> FileReader<'a, 'r> { + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + pub fn len(&self) -> u64 { + self.inner.len() + } +} + +impl<'a, 'r> AsyncRead for FileReader<'a, 'r> { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context, + buf: &mut io::ReadBuf, + ) -> Poll<io::Result<()>> { + Pin::new(&mut self.get_mut().inner).poll_read(cx, buf) + } +} + +/// A directory iterator, yielding a sequence of [Node]s. +/// It must be fully consumed before reading further from the [DirReader] that produced it, if any. +pub struct DirReader<'a, 'r> { + reader: &'a mut Reader<'r>, + /// Previous directory entry name. + /// We have to hang onto this to enforce name monotonicity. + prev_name: Option<Vec<u8>>, +} + +pub struct Entry<'a, 'r> { + pub name: Vec<u8>, + pub node: Node<'a, 'r>, +} + +impl<'a, 'r> DirReader<'a, 'r> { + fn new(reader: &'a mut Reader<'r>) -> Self { + Self { + reader, + prev_name: None, + } + } + + /// Read the next [Entry] from the directory. + /// + /// We explicitly don't implement [Iterator], since treating this as + /// a regular Rust iterator will surely lead you astray. + /// + /// * You must always consume the entire iterator, unless you abandon the entire archive reader. + /// * You must abandon the entire archive reader on the first error. + /// * You must abandon the directory reader upon the first [None]. + /// * Even if you know the amount of elements up front, you must keep reading until you encounter [None]. + pub async fn next(&mut self) -> io::Result<Option<Entry<'_, 'r>>> { + // COME FROM the previous iteration: if we've already read an entry, + // read its terminating TOK_PAR here. + if self.prev_name.is_some() { + read::token(self.reader, &nar::wire::TOK_PAR).await?; + } + + if let nar::wire::Entry::None = read::tag(self.reader).await? { + return Ok(None); + } + + let name = wire::read_bytes(self.reader, 1..=nar::wire::MAX_NAME_LEN).await?; + + if name.contains(&0) || name.contains(&b'/') || name == b"." || name == b".." { + return Err(InvalidData.into()); + } + + // Enforce strict monotonicity of directory entry names. 
+ match &mut self.prev_name { + None => { + self.prev_name = Some(name.clone()); + } + Some(prev_name) => { + if *prev_name >= name { + return Err(InvalidData.into()); + } + + name[..].clone_into(prev_name); + } + } + + read::token(self.reader, &nar::wire::TOK_NOD).await?; + + Ok(Some(Entry { + name, + node: Node::new(self.reader).await?, + })) + } +} diff --git a/tvix/nix-compat/src/nar/reader/async/read.rs b/tvix/nix-compat/src/nar/reader/async/read.rs new file mode 100644 index 0000000000..2adf894922 --- /dev/null +++ b/tvix/nix-compat/src/nar/reader/async/read.rs @@ -0,0 +1,69 @@ +use tokio::io::{ + self, AsyncReadExt, + ErrorKind::{InvalidData, UnexpectedEof}, +}; + +use crate::nar::wire::Tag; + +use super::Reader; + +/// Consume a known token from the reader. +pub async fn token<const N: usize>(reader: &mut Reader<'_>, token: &[u8; N]) -> io::Result<()> { + let mut buf = [0u8; N]; + + // This implements something similar to [AsyncReadExt::read_exact], but verifies that + // the input data matches the token while we read it. These two slices respectively + // represent the remaining token to be verified, and the remaining input buffer. + let mut token = &token[..]; + let mut buf = &mut buf[..]; + + while !token.is_empty() { + match reader.read(buf).await? { + 0 => { + return Err(UnexpectedEof.into()); + } + n => { + let (t, b); + (t, token) = token.split_at(n); + (b, buf) = buf.split_at_mut(n); + + if t != b { + return Err(InvalidData.into()); + } + } + } + } + + Ok(()) +} + +/// Consume a [Tag] from the reader. +pub async fn tag<T: Tag>(reader: &mut Reader<'_>) -> io::Result<T> { + let mut buf = T::make_buf(); + let buf = buf.as_mut(); + + // first read the known minimum lengthโฆ + reader.read_exact(&mut buf[..T::MIN]).await?; + + // then decide which tag we're expecting + let tag = T::from_u8(buf[T::OFF]).ok_or(InvalidData)?; + let (head, tail) = tag.as_bytes().split_at(T::MIN); + + // make sure what we've read so far is valid + if buf[..T::MIN] != *head { + return Err(InvalidData.into()); + } + + // โฆthen read the rest, if any + if !tail.is_empty() { + let rest = tail.len(); + reader.read_exact(&mut buf[..rest]).await?; + + // and make sure it's what we expect + if buf[..rest] != *tail { + return Err(InvalidData.into()); + } + } + + Ok(tag) +} diff --git a/tvix/nix-compat/src/nar/reader/async/test.rs b/tvix/nix-compat/src/nar/reader/async/test.rs new file mode 100644 index 0000000000..58bb651fca --- /dev/null +++ b/tvix/nix-compat/src/nar/reader/async/test.rs @@ -0,0 +1,310 @@ +use tokio::io::AsyncReadExt; + +mod nar { + pub use crate::nar::reader::r#async as reader; +} + +#[tokio::test] +async fn symlink() { + let mut f = std::io::Cursor::new(include_bytes!("../../tests/symlink.nar")); + let node = nar::reader::open(&mut f).await.unwrap(); + + match node { + nar::reader::Node::Symlink { target } => { + assert_eq!( + &b"/nix/store/somewhereelse"[..], + &target, + "target must match" + ); + } + _ => panic!("unexpected type"), + } +} + +#[tokio::test] +async fn file() { + let mut f = std::io::Cursor::new(include_bytes!("../../tests/helloworld.nar")); + let node = nar::reader::open(&mut f).await.unwrap(); + + match node { + nar::reader::Node::File { + executable, + mut reader, + } => { + assert!(!executable); + let mut buf = vec![]; + reader + .read_to_end(&mut buf) + .await + .expect("read must succeed"); + assert_eq!(&b"Hello World!"[..], &buf); + } + _ => panic!("unexpected type"), + } +} + +#[tokio::test] +async fn complicated() { + let mut f = 
std::io::Cursor::new(include_bytes!("../../tests/complicated.nar")); + let node = nar::reader::open(&mut f).await.unwrap(); + + match node { + nar::reader::Node::Directory(mut dir_reader) => { + // first entry is .keep, an empty regular file. + must_read_file( + ".keep", + dir_reader + .next() + .await + .expect("next must succeed") + .expect("must be some"), + ) + .await; + + // second entry is aa, a symlink to /nix/store/somewhereelse + must_be_symlink( + "aa", + "/nix/store/somewhereelse", + dir_reader + .next() + .await + .expect("next must be some") + .expect("must be some"), + ); + + { + // third entry is a directory called "keep" + let entry = dir_reader + .next() + .await + .expect("next must be some") + .expect("must be some"); + + assert_eq!(&b"keep"[..], &entry.name); + + match entry.node { + nar::reader::Node::Directory(mut subdir_reader) => { + { + // first entry is .keep, an empty regular file. + let entry = subdir_reader + .next() + .await + .expect("next must succeed") + .expect("must be some"); + + must_read_file(".keep", entry).await; + } + + // we must read the None + assert!( + subdir_reader + .next() + .await + .expect("next must succeed") + .is_none(), + "keep directory contains only .keep" + ); + } + _ => panic!("unexpected type for keep/.keep"), + } + }; + + // reading more entries yields None (and we actually must read until this) + assert!(dir_reader.next().await.expect("must succeed").is_none()); + } + _ => panic!("unexpected type"), + } +} + +#[tokio::test] +#[should_panic] +#[ignore = "TODO: async poisoning"] +async fn file_read_abandoned() { + let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar")); + let node = nar::reader::open(&mut f).await.unwrap(); + + match node { + nar::reader::Node::Directory(mut dir_reader) => { + // first entry is .keep, an empty regular file. + { + let entry = dir_reader + .next() + .await + .expect("next must succeed") + .expect("must be some"); + + assert_eq!(&b".keep"[..], &entry.name); + // don't bother to finish reading it. + }; + + // this should panic (not return an error), because we are meant to abandon the archive reader now. + assert!(dir_reader.next().await.expect("must succeed").is_none()); + } + _ => panic!("unexpected type"), + } +} + +#[tokio::test] +#[should_panic] +#[ignore = "TODO: async poisoning"] +async fn dir_read_abandoned() { + let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar")); + let node = nar::reader::open(&mut f).await.unwrap(); + + match node { + nar::reader::Node::Directory(mut dir_reader) => { + // first entry is .keep, an empty regular file. 
+ must_read_file( + ".keep", + dir_reader + .next() + .await + .expect("next must succeed") + .expect("must be some"), + ) + .await; + + // second entry is aa, a symlink to /nix/store/somewhereelse + must_be_symlink( + "aa", + "/nix/store/somewhereelse", + dir_reader + .next() + .await + .expect("next must be some") + .expect("must be some"), + ); + + { + // third entry is a directory called "keep" + let entry = dir_reader + .next() + .await + .expect("next must be some") + .expect("must be some"); + + assert_eq!(&b"keep"[..], &entry.name); + + match entry.node { + nar::reader::Node::Directory(_) => { + // don't finish using it, which poisons the archive reader + } + _ => panic!("unexpected type for keep/.keep"), + } + }; + + // this should panic, because we didn't finish reading the child subdirectory + assert!(dir_reader.next().await.expect("must succeed").is_none()); + } + _ => panic!("unexpected type"), + } +} + +#[tokio::test] +#[should_panic] +#[ignore = "TODO: async poisoning"] +async fn dir_read_after_none() { + let mut f = std::io::Cursor::new(include_bytes!("../../tests/complicated.nar")); + let node = nar::reader::open(&mut f).await.unwrap(); + + match node { + nar::reader::Node::Directory(mut dir_reader) => { + // first entry is .keep, an empty regular file. + must_read_file( + ".keep", + dir_reader + .next() + .await + .expect("next must succeed") + .expect("must be some"), + ) + .await; + + // second entry is aa, a symlink to /nix/store/somewhereelse + must_be_symlink( + "aa", + "/nix/store/somewhereelse", + dir_reader + .next() + .await + .expect("next must be some") + .expect("must be some"), + ); + + { + // third entry is a directory called "keep" + let entry = dir_reader + .next() + .await + .expect("next must be some") + .expect("must be some"); + + assert_eq!(&b"keep"[..], &entry.name); + + match entry.node { + nar::reader::Node::Directory(mut subdir_reader) => { + // first entry is .keep, an empty regular file. + must_read_file( + ".keep", + subdir_reader + .next() + .await + .expect("next must succeed") + .expect("must be some"), + ) + .await; + + // we must read the None + assert!( + subdir_reader + .next() + .await + .expect("next must succeed") + .is_none(), + "keep directory contains only .keep" + ); + } + _ => panic!("unexpected type for keep/.keep"), + } + }; + + // reading more entries yields None (and we actually must read until this) + assert!(dir_reader.next().await.expect("must succeed").is_none()); + + // this should panic, because we already got a none so we're meant to stop. 
+ dir_reader.next().await.unwrap(); + unreachable!() + } + _ => panic!("unexpected type"), + } +} + +async fn must_read_file(name: &'static str, entry: nar::reader::Entry<'_, '_>) { + assert_eq!(name.as_bytes(), &entry.name); + + match entry.node { + nar::reader::Node::File { + executable, + mut reader, + } => { + assert!(!executable); + assert_eq!(reader.read(&mut [0]).await.unwrap(), 0); + } + _ => panic!("unexpected type for {}", name), + } +} + +fn must_be_symlink( + name: &'static str, + exp_target: &'static str, + entry: nar::reader::Entry<'_, '_>, +) { + assert_eq!(name.as_bytes(), &entry.name); + + match entry.node { + nar::reader::Node::Symlink { target } => { + assert_eq!(exp_target.as_bytes(), &target); + } + _ => panic!("unexpected type for {}", name), + } +} diff --git a/tvix/nix-compat/src/nar/reader/mod.rs b/tvix/nix-compat/src/nar/reader/mod.rs index fa7ddc77f9..bddf175080 100644 --- a/tvix/nix-compat/src/nar/reader/mod.rs +++ b/tvix/nix-compat/src/nar/reader/mod.rs @@ -10,19 +10,50 @@ use std::io::{ Read, Write, }; +#[cfg(not(debug_assertions))] +use std::marker::PhantomData; + // Required reading for understanding this module. use crate::nar::wire; +#[cfg(feature = "async")] +pub mod r#async; + mod read; #[cfg(test)] mod test; pub type Reader<'a> = dyn BufRead + Send + 'a; +struct ArchiveReader<'a, 'r> { + inner: &'a mut Reader<'r>, + + /// In debug mode, also track when we need to abandon this archive reader. + /// The archive reader must be abandoned when: + /// * An error is encountered at any point + /// * A file or directory reader is dropped before being read entirely. + /// All of these checks vanish in release mode. + status: ArchiveReaderStatus<'a>, +} + +macro_rules! try_or_poison { + ($it:expr, $ex:expr) => { + match $ex { + Ok(x) => x, + Err(e) => { + $it.status.poison(); + return Err(e.into()); + } + } + }; +} /// Start reading a NAR file from `reader`. pub fn open<'a, 'r>(reader: &'a mut Reader<'r>) -> io::Result<Node<'a, 'r>> { read::token(reader, &wire::TOK_NAR)?; - Node::new(reader) + Node::new(ArchiveReader { + inner: reader, + status: ArchiveReaderStatus::top(), + }) } pub enum Node<'a, 'r> { @@ -41,21 +72,24 @@ impl<'a, 'r> Node<'a, 'r> { /// /// Reading the terminating [wire::TOK_PAR] is done immediately for [Node::Symlink], /// but is otherwise left to [DirReader] or [FileReader]. - fn new(reader: &'a mut Reader<'r>) -> io::Result<Self> { - Ok(match read::tag(reader)? { + fn new(mut reader: ArchiveReader<'a, 'r>) -> io::Result<Self> { + Ok(match read::tag(reader.inner)? { wire::Node::Sym => { - let target = read::bytes(reader, wire::MAX_TARGET_LEN)?; + let target = + try_or_poison!(reader, read::bytes(reader.inner, wire::MAX_TARGET_LEN)); if target.is_empty() || target.contains(&0) { + reader.status.poison(); return Err(InvalidData.into()); } - read::token(reader, &wire::TOK_PAR)?; + try_or_poison!(reader, read::token(reader.inner, &wire::TOK_PAR)); + reader.status.ready_parent(); // Immediately allow reading from parent again Node::Symlink { target } } tag @ (wire::Node::Reg | wire::Node::Exe) => { - let len = read::u64(reader)?; + let len = try_or_poison!(&mut reader, read::u64(reader.inner)); Node::File { executable: tag == wire::Node::Exe, @@ -74,10 +108,8 @@ impl<'a, 'r> Node<'a, 'r> { /// * You must abandon the entire archive reader upon the first error. /// /// It's fine to read exactly `reader.len()` bytes without ever seeing an explicit EOF. 
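// ---------------------------------------------------------------------------
// [Editor's sketch] Consuming a NAR with the synchronous reader while
// respecting the rules stated in these docs: every file is drained
// completely, and every directory is iterated until `next()` yields `None`.
// Error handling is simplified; the entry point would be
// `let node = nar::reader::open(&mut f)?;`. The `crate::` path assumes this
// sits inside nix-compat.
use std::io::{self, Read};
use crate::nar::reader::Node;

fn walk(node: Node, depth: usize) -> io::Result<()> {
    match node {
        Node::Symlink { target } => {
            println!("{:indent$}symlink -> {:?}", "", target, indent = depth);
        }
        Node::File { executable, mut reader } => {
            // Always read the file to the end; abandoning it mid-read
            // poisons the archive reader (checked in debug builds).
            let mut contents = Vec::new();
            reader.read_to_end(&mut contents)?;
            println!(
                "{:indent$}file, {} bytes, executable: {}",
                "", contents.len(), executable, indent = depth
            );
        }
        Node::Directory(mut dir) => {
            // Keep calling next() until None, even if the entry count is
            // known up front.
            while let Some(entry) = dir.next()? {
                println!("{:indent$}entry {:?}", "", entry.name, indent = depth);
                walk(entry.node, depth + 2)?;
            }
        }
    }
    Ok(())
}
// ---------------------------------------------------------------------------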
-/// -/// TODO(edef): enforce these in `#[cfg(debug_assertions)]` pub struct FileReader<'a, 'r> { - reader: &'a mut Reader<'r>, + reader: ArchiveReader<'a, 'r>, len: u64, /// Truncated original file length for padding computation. /// We only care about the 3 least significant bits; semantically, this is a u3. @@ -87,12 +119,13 @@ pub struct FileReader<'a, 'r> { impl<'a, 'r> FileReader<'a, 'r> { /// Instantiate a new reader, starting after [wire::TOK_REG] or [wire::TOK_EXE]. /// We handle the terminating [wire::TOK_PAR] on semantic EOF. - fn new(reader: &'a mut Reader<'r>, len: u64) -> io::Result<Self> { + fn new(mut reader: ArchiveReader<'a, 'r>, len: u64) -> io::Result<Self> { // For zero-length files, we have to read the terminating TOK_PAR // immediately, since FileReader::read may never be called; we've // already reached semantic EOF by definition. if len == 0 { - read::token(reader, &wire::TOK_PAR)?; + read::token(reader.inner, &wire::TOK_PAR)?; + reader.status.ready_parent(); } Ok(Self { @@ -121,9 +154,12 @@ impl FileReader<'_, '_> { return Ok(&[]); } - let mut buf = self.reader.fill_buf()?; + self.reader.check_correct(); + + let mut buf = try_or_poison!(self.reader, self.reader.inner.fill_buf()); if buf.is_empty() { + self.reader.status.poison(); return Err(UnexpectedEof.into()); } @@ -141,12 +177,14 @@ impl FileReader<'_, '_> { return Ok(()); } + self.reader.check_correct(); + self.len = self .len .checked_sub(n as u64) .expect("consumed bytes past EOF"); - self.reader.consume(n); + self.reader.inner.consume(n); if self.is_empty() { self.finish()?; @@ -159,7 +197,7 @@ impl FileReader<'_, '_> { pub fn copy(&mut self, mut dst: impl Write) -> io::Result<()> { while !self.is_empty() { let buf = self.fill_buf()?; - let n = dst.write(buf)?; + let n = try_or_poison!(self.reader, dst.write(buf)); self.consume(n)?; } @@ -173,14 +211,17 @@ impl Read for FileReader<'_, '_> { return Ok(0); } + self.reader.check_correct(); + if buf.len() as u64 > self.len { buf = &mut buf[..self.len as usize]; } - let n = self.reader.read(buf)?; + let n = try_or_poison!(self.reader, self.reader.inner.read(buf)); self.len -= n as u64; if n == 0 { + self.reader.status.poison(); return Err(UnexpectedEof.into()); } @@ -200,21 +241,27 @@ impl FileReader<'_, '_> { if pad != 0 { let mut buf = [0; 8]; - self.reader.read_exact(&mut buf[pad..])?; + try_or_poison!(self.reader, self.reader.inner.read_exact(&mut buf[pad..])); if buf != [0; 8] { + self.reader.status.poison(); return Err(InvalidData.into()); } } - read::token(self.reader, &wire::TOK_PAR) + try_or_poison!(self.reader, read::token(self.reader.inner, &wire::TOK_PAR)); + + // Done with reading this file, allow going back up the chain of readers + self.reader.status.ready_parent(); + + Ok(()) } } /// A directory iterator, yielding a sequence of [Node]s. /// It must be fully consumed before reading further from the [DirReader] that produced it, if any. pub struct DirReader<'a, 'r> { - reader: &'a mut Reader<'r>, + reader: ArchiveReader<'a, 'r>, /// Previous directory entry name. /// We have to hang onto this to enforce name monotonicity. prev_name: Option<Vec<u8>>, @@ -226,7 +273,7 @@ pub struct Entry<'a, 'r> { } impl<'a, 'r> DirReader<'a, 'r> { - fn new(reader: &'a mut Reader<'r>) -> Self { + fn new(reader: ArchiveReader<'a, 'r>) -> Self { Self { reader, prev_name: None, @@ -242,23 +289,27 @@ impl<'a, 'r> DirReader<'a, 'r> { /// * You must abandon the entire archive reader on the first error. /// * You must abandon the directory reader upon the first [None]. 
/// * Even if you know the amount of elements up front, you must keep reading until you encounter [None]. - /// - /// TODO(edef): enforce these in `#[cfg(debug_assertions)]` #[allow(clippy::should_implement_trait)] - pub fn next(&mut self) -> io::Result<Option<Entry>> { + pub fn next(&mut self) -> io::Result<Option<Entry<'_, 'r>>> { + self.reader.check_correct(); + // COME FROM the previous iteration: if we've already read an entry, // read its terminating TOK_PAR here. if self.prev_name.is_some() { - read::token(self.reader, &wire::TOK_PAR)?; + try_or_poison!(self.reader, read::token(self.reader.inner, &wire::TOK_PAR)); } // Determine if there are more entries to follow - if let wire::Entry::None = read::tag(self.reader)? { + if let wire::Entry::None = try_or_poison!(self.reader, read::tag(self.reader.inner)) { // We've reached the end of this directory. + self.reader.status.ready_parent(); return Ok(None); } - let name = read::bytes(self.reader, wire::MAX_NAME_LEN)?; + let name = try_or_poison!( + self.reader, + read::bytes(self.reader.inner, wire::MAX_NAME_LEN) + ); if name.is_empty() || name.contains(&0) @@ -266,6 +317,7 @@ impl<'a, 'r> DirReader<'a, 'r> { || name == b"." || name == b".." { + self.reader.status.poison(); return Err(InvalidData.into()); } @@ -276,6 +328,7 @@ impl<'a, 'r> DirReader<'a, 'r> { } Some(prev_name) => { if *prev_name >= name { + self.reader.status.poison(); return Err(InvalidData.into()); } @@ -283,11 +336,147 @@ impl<'a, 'r> DirReader<'a, 'r> { } } - read::token(self.reader, &wire::TOK_NOD)?; + try_or_poison!(self.reader, read::token(self.reader.inner, &wire::TOK_NOD)); Ok(Some(Entry { name, - node: Node::new(&mut self.reader)?, + // Don't need to worry about poisoning here: Node::new will do it for us if needed + node: Node::new(self.reader.child())?, })) } } + +/// We use a stack of statuses to: +/// * Share poisoned state across all objects from the same underlying reader, +/// so we can check they are abandoned when an error occurs +/// * Make sure only the most recently created object is read from, and is fully exhausted +/// before anything it was created from is used again. +enum ArchiveReaderStatus<'a> { + #[cfg(not(debug_assertions))] + None(PhantomData<&'a ()>), + #[cfg(debug_assertions)] + StackTop { poisoned: bool, ready: bool }, + #[cfg(debug_assertions)] + StackChild { + poisoned: &'a mut bool, + parent_ready: &'a mut bool, + ready: bool, + }, +} + +impl ArchiveReaderStatus<'_> { + fn top() -> Self { + #[cfg(debug_assertions)] + { + ArchiveReaderStatus::StackTop { + poisoned: false, + ready: true, + } + } + + #[cfg(not(debug_assertions))] + ArchiveReaderStatus::None(PhantomData) + } + + /// Poison all the objects sharing the same reader, to be used when an error occurs + fn poison(&mut self) { + match self { + #[cfg(not(debug_assertions))] + ArchiveReaderStatus::None(_) => {} + #[cfg(debug_assertions)] + ArchiveReaderStatus::StackTop { poisoned: x, .. } => *x = true, + #[cfg(debug_assertions)] + ArchiveReaderStatus::StackChild { poisoned: x, .. } => **x = true, + } + } + + /// Mark the parent as ready, allowing it to be used again and preventing this reference to the reader being used again. + fn ready_parent(&mut self) { + match self { + #[cfg(not(debug_assertions))] + ArchiveReaderStatus::None(_) => {} + #[cfg(debug_assertions)] + ArchiveReaderStatus::StackTop { ready, .. } => { + *ready = false; + } + #[cfg(debug_assertions)] + ArchiveReaderStatus::StackChild { + ready, + parent_ready, + .. 
+ } => { + *ready = false; + **parent_ready = true; + } + }; + } + + fn poisoned(&self) -> bool { + match self { + #[cfg(not(debug_assertions))] + ArchiveReaderStatus::None(_) => false, + #[cfg(debug_assertions)] + ArchiveReaderStatus::StackTop { poisoned, .. } => *poisoned, + #[cfg(debug_assertions)] + ArchiveReaderStatus::StackChild { poisoned, .. } => **poisoned, + } + } + + fn ready(&self) -> bool { + match self { + #[cfg(not(debug_assertions))] + ArchiveReaderStatus::None(_) => true, + #[cfg(debug_assertions)] + ArchiveReaderStatus::StackTop { ready, .. } => *ready, + #[cfg(debug_assertions)] + ArchiveReaderStatus::StackChild { ready, .. } => *ready, + } + } +} + +impl<'a, 'r> ArchiveReader<'a, 'r> { + /// Create a new child reader from this one. + /// In debug mode, this reader will panic if called before the new child is exhausted / calls `ready_parent` + fn child(&mut self) -> ArchiveReader<'_, 'r> { + ArchiveReader { + inner: self.inner, + #[cfg(not(debug_assertions))] + status: ArchiveReaderStatus::None(PhantomData), + #[cfg(debug_assertions)] + status: match &mut self.status { + ArchiveReaderStatus::StackTop { poisoned, ready } => { + *ready = false; + ArchiveReaderStatus::StackChild { + poisoned, + parent_ready: ready, + ready: true, + } + } + ArchiveReaderStatus::StackChild { + poisoned, ready, .. + } => { + *ready = false; + ArchiveReaderStatus::StackChild { + poisoned, + parent_ready: ready, + ready: true, + } + } + }, + } + } + + /// Check the reader is in the correct status. + /// Only does anything when debug assertions are on. + #[inline(always)] + fn check_correct(&self) { + assert!( + !self.status.poisoned(), + "Archive reader used after it was meant to be abandoned!" + ); + assert!( + self.status.ready(), + "Non-ready archive reader used! (Should've been reading from something else)" + ); + } +} diff --git a/tvix/nix-compat/src/nar/reader/test.rs b/tvix/nix-compat/src/nar/reader/test.rs index fd0d6a9f5a..02dc4767c9 100644 --- a/tvix/nix-compat/src/nar/reader/test.rs +++ b/tvix/nix-compat/src/nar/reader/test.rs @@ -46,75 +46,233 @@ fn complicated() { match node { nar::reader::Node::Directory(mut dir_reader) => { // first entry is .keep, an empty regular file. - let entry = dir_reader - .next() - .expect("next must succeed") - .expect("must be some"); - - assert_eq!(&b".keep"[..], &entry.name); - - match entry.node { - nar::reader::Node::File { - executable, - mut reader, - } => { - assert!(!executable); - assert_eq!(reader.read(&mut [0]).unwrap(), 0); + must_read_file( + ".keep", + dir_reader + .next() + .expect("next must succeed") + .expect("must be some"), + ); + + // second entry is aa, a symlink to /nix/store/somewhereelse + must_be_symlink( + "aa", + "/nix/store/somewhereelse", + dir_reader + .next() + .expect("next must be some") + .expect("must be some"), + ); + + { + // third entry is a directory called "keep" + let entry = dir_reader + .next() + .expect("next must be some") + .expect("must be some"); + + assert_eq!(&b"keep"[..], &entry.name); + + match entry.node { + nar::reader::Node::Directory(mut subdir_reader) => { + { + // first entry is .keep, an empty regular file. 
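// ---------------------------------------------------------------------------
// [Editor's note] The ArchiveReaderStatus machinery above is an instance of a
// general pattern: correctness-checking state that exists only under
// #[cfg(debug_assertions)] and compiles away entirely in release builds. A
// minimal, self-contained sketch of the same trick:
struct UseGuard {
    #[cfg(debug_assertions)]
    poisoned: bool,
}

impl UseGuard {
    fn new() -> Self {
        UseGuard {
            #[cfg(debug_assertions)]
            poisoned: false,
        }
    }

    /// Mark the guarded object as unusable (e.g. after an I/O error).
    fn poison(&mut self) {
        #[cfg(debug_assertions)]
        {
            self.poisoned = true;
        }
    }

    /// Panics in debug builds if the object is used after poisoning;
    /// a no-op in release builds.
    fn check(&self) {
        #[cfg(debug_assertions)]
        assert!(!self.poisoned, "used after it was meant to be abandoned");
    }
}
// ---------------------------------------------------------------------------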
+ let entry = subdir_reader + .next() + .expect("next must succeed") + .expect("must be some"); + + must_read_file(".keep", entry); + } + + // we must read the None + assert!( + subdir_reader.next().expect("next must succeed").is_none(), + "keep directory contains only .keep" + ); + } + _ => panic!("unexpected type for keep/.keep"), } - _ => panic!("unexpected type for .keep"), - } + }; + + // reading more entries yields None (and we actually must read until this) + assert!(dir_reader.next().expect("must succeed").is_none()); + } + _ => panic!("unexpected type"), + } +} + +#[test] +#[should_panic] +fn file_read_abandoned() { + let mut f = std::io::Cursor::new(include_bytes!("../tests/complicated.nar")); + let node = nar::reader::open(&mut f).unwrap(); + + match node { + nar::reader::Node::Directory(mut dir_reader) => { + // first entry is .keep, an empty regular file. + { + let entry = dir_reader + .next() + .expect("next must succeed") + .expect("must be some"); + + assert_eq!(&b".keep"[..], &entry.name); + // don't bother to finish reading it. + }; + + // this should panic (not return an error), because we are meant to abandon the archive reader now. + assert!(dir_reader.next().expect("must succeed").is_none()); + } + _ => panic!("unexpected type"), + } +} + +#[test] +#[should_panic] +fn dir_read_abandoned() { + let mut f = std::io::Cursor::new(include_bytes!("../tests/complicated.nar")); + let node = nar::reader::open(&mut f).unwrap(); + + match node { + nar::reader::Node::Directory(mut dir_reader) => { + // first entry is .keep, an empty regular file. + must_read_file( + ".keep", + dir_reader + .next() + .expect("next must succeed") + .expect("must be some"), + ); // second entry is aa, a symlink to /nix/store/somewhereelse - let entry = dir_reader - .next() - .expect("next must be some") - .expect("must be some"); + must_be_symlink( + "aa", + "/nix/store/somewhereelse", + dir_reader + .next() + .expect("next must be some") + .expect("must be some"), + ); - assert_eq!(&b"aa"[..], &entry.name); + { + // third entry is a directory called "keep" + let entry = dir_reader + .next() + .expect("next must be some") + .expect("must be some"); - match entry.node { - nar::reader::Node::Symlink { target } => { - assert_eq!(&b"/nix/store/somewhereelse"[..], &target); + assert_eq!(&b"keep"[..], &entry.name); + + match entry.node { + nar::reader::Node::Directory(_) => { + // don't finish using it, which poisons the archive reader + } + _ => panic!("unexpected type for keep/.keep"), } - _ => panic!("unexpected type for aa"), - } - - // third entry is a directory called "keep" - let entry = dir_reader - .next() - .expect("next must be some") - .expect("must be some"); - - assert_eq!(&b"keep"[..], &entry.name); - - match entry.node { - nar::reader::Node::Directory(mut subdir_reader) => { - // first entry is .keep, an empty regular file. - let entry = subdir_reader - .next() - .expect("next must succeed") - .expect("must be some"); - - // โฆ it contains a single .keep, an empty regular file. 
- assert_eq!(&b".keep"[..], &entry.name); - - match entry.node { - nar::reader::Node::File { - executable, - mut reader, - } => { - assert!(!executable); - assert_eq!(reader.read(&mut [0]).unwrap(), 0); - } - _ => panic!("unexpected type for keep/.keep"), + }; + + // this should panic, because we didn't finish reading the child subdirectory + assert!(dir_reader.next().expect("must succeed").is_none()); + } + _ => panic!("unexpected type"), + } +} + +#[test] +#[should_panic] +fn dir_read_after_none() { + let mut f = std::io::Cursor::new(include_bytes!("../tests/complicated.nar")); + let node = nar::reader::open(&mut f).unwrap(); + + match node { + nar::reader::Node::Directory(mut dir_reader) => { + // first entry is .keep, an empty regular file. + must_read_file( + ".keep", + dir_reader + .next() + .expect("next must succeed") + .expect("must be some"), + ); + + // second entry is aa, a symlink to /nix/store/somewhereelse + must_be_symlink( + "aa", + "/nix/store/somewhereelse", + dir_reader + .next() + .expect("next must be some") + .expect("must be some"), + ); + + { + // third entry is a directory called "keep" + let entry = dir_reader + .next() + .expect("next must be some") + .expect("must be some"); + + assert_eq!(&b"keep"[..], &entry.name); + + match entry.node { + nar::reader::Node::Directory(mut subdir_reader) => { + // first entry is .keep, an empty regular file. + must_read_file( + ".keep", + subdir_reader + .next() + .expect("next must succeed") + .expect("must be some"), + ); + + // we must read the None + assert!( + subdir_reader.next().expect("next must succeed").is_none(), + "keep directory contains only .keep" + ); } + _ => panic!("unexpected type for keep/.keep"), } - _ => panic!("unexpected type for keep/.keep"), - } + }; // reading more entries yields None (and we actually must read until this) assert!(dir_reader.next().expect("must succeed").is_none()); + + // this should panic, because we already got a none so we're meant to stop. 
+ dir_reader.next().unwrap(); + unreachable!() } _ => panic!("unexpected type"), } } + +fn must_read_file(name: &'static str, entry: nar::reader::Entry<'_, '_>) { + assert_eq!(name.as_bytes(), &entry.name); + + match entry.node { + nar::reader::Node::File { + executable, + mut reader, + } => { + assert!(!executable); + assert_eq!(reader.read(&mut [0]).unwrap(), 0); + } + _ => panic!("unexpected type for {}", name), + } +} + +fn must_be_symlink( + name: &'static str, + exp_target: &'static str, + entry: nar::reader::Entry<'_, '_>, +) { + assert_eq!(name.as_bytes(), &entry.name); + + match entry.node { + nar::reader::Node::Symlink { target } => { + assert_eq!(exp_target.as_bytes(), &target); + } + _ => panic!("unexpected type for {}", name), + } +} diff --git a/tvix/nix-compat/src/nar/wire/mod.rs b/tvix/nix-compat/src/nar/wire/mod.rs index b9e0212495..9e99b530ce 100644 --- a/tvix/nix-compat/src/nar/wire/mod.rs +++ b/tvix/nix-compat/src/nar/wire/mod.rs @@ -90,6 +90,23 @@ pub const TOK_DIR: [u8; 24] = *b"\x09\0\0\0\0\0\0\0directory\0\0\0\0\0\0\0"; pub const TOK_ENT: [u8; 48] = *b"\x05\0\0\0\0\0\0\0entry\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0name\0\0\0\0"; pub const TOK_NOD: [u8; 48] = *b"\x04\0\0\0\0\0\0\0node\0\0\0\0\x01\0\0\0\0\0\0\0(\0\0\0\0\0\0\0\x04\0\0\0\0\0\0\0type\0\0\0\0"; pub const TOK_PAR: [u8; 16] = *b"\x01\0\0\0\0\0\0\0)\0\0\0\0\0\0\0"; +#[cfg(feature = "async")] +const TOK_PAD_PAR: [u8; 24] = *b"\0\0\0\0\0\0\0\0\x01\0\0\0\0\0\0\0)\0\0\0\0\0\0\0"; + +#[cfg(feature = "async")] +#[derive(Debug)] +pub(crate) enum PadPar {} + +#[cfg(feature = "async")] +impl crate::wire::reader::Tag for PadPar { + const PATTERN: &'static [u8] = &TOK_PAD_PAR; + + type Buf = [u8; 24]; + + fn make_buf() -> Self::Buf { + [0; 24] + } +} #[test] fn tokens() { diff --git a/tvix/nix-compat/src/nar/wire/tag.rs b/tvix/nix-compat/src/nar/wire/tag.rs index 55b93f9985..4982a0d707 100644 --- a/tvix/nix-compat/src/nar/wire/tag.rs +++ b/tvix/nix-compat/src/nar/wire/tag.rs @@ -10,6 +10,7 @@ pub trait Tag: Sized { const MIN: usize; /// Minimal suitably sized buffer for reading the wire representation + /// /// HACK: This is a workaround for const generics limitations. type Buf: AsMut<[u8]> + Send; diff --git a/tvix/nix-compat/src/nix_daemon/worker_protocol.rs b/tvix/nix-compat/src/nix_daemon/worker_protocol.rs index 58a48d1bdd..7e3adc0db2 100644 --- a/tvix/nix-compat/src/nix_daemon/worker_protocol.rs +++ b/tvix/nix-compat/src/nix_daemon/worker_protocol.rs @@ -15,13 +15,34 @@ static WORKER_MAGIC_1: u64 = 0x6e697863; // "nixc" static WORKER_MAGIC_2: u64 = 0x6478696f; // "dxio" pub static STDERR_LAST: u64 = 0x616c7473; // "alts" +/// | Nix version | Protocol | +/// |-----------------|----------| +/// | 0.11 | 1.02 | +/// | 0.12 | 1.04 | +/// | 0.13 | 1.05 | +/// | 0.14 | 1.05 | +/// | 0.15 | 1.05 | +/// | 0.16 | 1.06 | +/// | 1.0 | 1.10 | +/// | 1.1 | 1.11 | +/// | 1.2 | 1.12 | +/// | 1.3 - 1.5.3 | 1.13 | +/// | 1.6 - 1.10 | 1.14 | +/// | 1.11 - 1.11.16 | 1.15 | +/// | 2.0 - 2.0.4 | 1.20 | +/// | 2.1 - 2.3.18 | 1.21 | +/// | 2.4 - 2.6.1 | 1.32 | +/// | 2.7.0 | 1.33 | +/// | 2.8.0 - 2.14.1 | 1.34 | +/// | 2.15.0 - 2.19.4 | 1.35 | +/// | 2.20.0 - 2.22.0 | 1.37 | static PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion::from_parts(1, 37); /// Max length of a Nix setting name/value. In bytes. /// /// This value has been arbitrarily choosen after looking the nix.conf /// manpage. Don't hesitate to increase it if it's too limiting. 
-pub static MAX_SETTING_SIZE: u64 = 1024; +pub static MAX_SETTING_SIZE: usize = 1024; /// Worker Operation /// @@ -131,30 +152,30 @@ pub async fn read_client_settings<R: AsyncReadExt + Unpin>( r: &mut R, client_version: ProtocolVersion, ) -> std::io::Result<ClientSettings> { - let keep_failed = wire::read_bool(r).await?; - let keep_going = wire::read_bool(r).await?; - let try_fallback = wire::read_bool(r).await?; - let verbosity_uint = wire::read_u64(r).await?; + let keep_failed = r.read_u64_le().await? != 0; + let keep_going = r.read_u64_le().await? != 0; + let try_fallback = r.read_u64_le().await? != 0; + let verbosity_uint = r.read_u64_le().await?; let verbosity = Verbosity::from_u64(verbosity_uint).ok_or_else(|| { Error::new( ErrorKind::InvalidData, format!("Can't convert integer {} to verbosity", verbosity_uint), ) })?; - let max_build_jobs = wire::read_u64(r).await?; - let max_silent_time = wire::read_u64(r).await?; - _ = wire::read_u64(r).await?; // obsolete useBuildHook - let verbose_build = wire::read_bool(r).await?; - _ = wire::read_u64(r).await?; // obsolete logType - _ = wire::read_u64(r).await?; // obsolete printBuildTrace - let build_cores = wire::read_u64(r).await?; - let use_substitutes = wire::read_bool(r).await?; + let max_build_jobs = r.read_u64_le().await?; + let max_silent_time = r.read_u64_le().await?; + _ = r.read_u64_le().await?; // obsolete useBuildHook + let verbose_build = r.read_u64_le().await? != 0; + _ = r.read_u64_le().await?; // obsolete logType + _ = r.read_u64_le().await?; // obsolete printBuildTrace + let build_cores = r.read_u64_le().await?; + let use_substitutes = r.read_u64_le().await? != 0; let mut overrides = HashMap::new(); if client_version.minor() >= 12 { - let num_overrides = wire::read_u64(r).await?; + let num_overrides = r.read_u64_le().await?; for _ in 0..num_overrides { - let name = wire::read_string(r, 0..MAX_SETTING_SIZE).await?; - let value = wire::read_string(r, 0..MAX_SETTING_SIZE).await?; + let name = wire::read_string(r, 0..=MAX_SETTING_SIZE).await?; + let value = wire::read_string(r, 0..=MAX_SETTING_SIZE).await?; overrides.insert(name, value); } } @@ -197,17 +218,17 @@ pub async fn server_handshake_client<'a, RW: 'a>( where &'a mut RW: AsyncReadExt + AsyncWriteExt + Unpin, { - let worker_magic_1 = wire::read_u64(&mut conn).await?; + let worker_magic_1 = conn.read_u64_le().await?; if worker_magic_1 != WORKER_MAGIC_1 { Err(std::io::Error::new( ErrorKind::InvalidData, format!("Incorrect worker magic number received: {}", worker_magic_1), )) } else { - wire::write_u64(&mut conn, WORKER_MAGIC_2).await?; - wire::write_u64(&mut conn, PROTOCOL_VERSION.into()).await?; + conn.write_u64_le(WORKER_MAGIC_2).await?; + conn.write_u64_le(PROTOCOL_VERSION.into()).await?; conn.flush().await?; - let client_version = wire::read_u64(&mut conn).await?; + let client_version = conn.read_u64_le().await?; // Parse into ProtocolVersion. let client_version: ProtocolVersion = client_version .try_into() @@ -220,14 +241,14 @@ where } if client_version.minor() >= 14 { // Obsolete CPU affinity. - let read_affinity = wire::read_u64(&mut conn).await?; + let read_affinity = conn.read_u64_le().await?; if read_affinity != 0 { - let _cpu_affinity = wire::read_u64(&mut conn).await?; + let _cpu_affinity = conn.read_u64_le().await?; }; } if client_version.minor() >= 11 { // Obsolete reserveSpace - let _reserve_space = wire::read_u64(&mut conn).await?; + let _reserve_space = conn.read_u64_le().await?; } if client_version.minor() >= 33 { // Nix version. 
We're plain lying, we're not Nix, but eh… @@ -245,7 +266,7 @@ where /// Read a worker [Operation] from the wire. pub async fn read_op<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<Operation> { - let op_number = wire::read_u64(r).await?; + let op_number = r.read_u64_le().await?; Operation::from_u64(op_number).ok_or(Error::new( ErrorKind::InvalidData, format!("Invalid OP number {}", op_number), @@ -278,8 +299,8 @@ where W: AsyncReadExt + AsyncWriteExt + Unpin, { match t { - Trust::Trusted => wire::write_u64(conn, 1).await, - Trust::NotTrusted => wire::write_u64(conn, 2).await, + Trust::Trusted => conn.write_u64_le(1).await, + Trust::NotTrusted => conn.write_u64_le(2).await, } } diff --git a/tvix/nix-compat/src/store_path/mod.rs b/tvix/nix-compat/src/store_path/mod.rs index ac9f1805e3..ff7ede77e1 100644 --- a/tvix/nix-compat/src/store_path/mod.rs +++ b/tvix/nix-compat/src/store_path/mod.rs @@ -303,8 +303,7 @@ impl Serialize for StorePathRef<'_> { } } -/// NAME_CHARS contains `true` for bytes that are valid in store path names, -/// not accounting for '.' being permitted only past the first character. +/// NAME_CHARS contains `true` for bytes that are valid in store path names. static NAME_CHARS: [bool; 256] = { let mut tbl = [false; 256]; let mut c = 0; @@ -332,10 +331,6 @@ pub(crate) fn validate_name(s: &(impl AsRef<[u8]> + ?Sized)) -> Result<&str, Err return Err(Error::InvalidLength); } - if s[0] == b'.' { - return Err(Error::InvalidName(s.to_vec(), 0)); - } - let mut valid = true; for &c in s { valid = valid && NAME_CHARS[c as usize]; @@ -446,15 +441,18 @@ mod tests { } } - /// This is the store path rejected when `nix-store --add`'ing an + /// This is the store path *accepted* when `nix-store --add`'ing an /// empty `.gitignore` file. /// - /// Nix 2.4 accidentally dropped this behaviour, but this is considered a bug. - /// See https://github.com/NixOS/nix/pull/9095. + /// Nix 2.4 accidentally permitted this behaviour, but the revert came + /// too late to beat Hyrum's law. It is now considered permissible. + /// + /// https://github.com/NixOS/nix/pull/9095 (revert) + /// https://github.com/NixOS/nix/pull/9867 (revert-of-revert) #[test] fn starts_with_dot() { StorePath::from_bytes(b"fli4bwscgna7lpm7v5xgnjxrxh0yc7ra-.gitignore") - .expect_err("must fail"); + .expect("must succeed"); } #[test] diff --git a/tvix/nix-compat/src/wire/bytes/mod.rs b/tvix/nix-compat/src/wire/bytes/mod.rs index 9ec8b3fa04..5ed5e15a64 100644 --- a/tvix/nix-compat/src/wire/bytes/mod.rs +++ b/tvix/nix-compat/src/wire/bytes/mod.rs @@ -1,16 +1,14 @@ use std::{ io::{Error, ErrorKind}, - ops::RangeBounds, + ops::RangeInclusive, }; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -mod reader; +pub(crate) mod reader; pub use reader::BytesReader; mod writer; pub use writer::BytesWriter; -use super::primitive; - /// 8 null bytes, used to write out padding. const EMPTY_BYTES: &[u8; 8] = &[0u8; 8]; @@ -35,24 +33,29 @@ const LEN_SIZE: usize = 8; /// /// This buffers the entire payload into memory, /// a streaming version is available at [crate::wire::bytes::BytesReader].
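Before the rewritten `read_bytes` below, a hedged illustration of the packet format described above. The test name is hypothetical; `write_bytes`, `read_bytes`, and their post-rewrite signatures are the ones from this file:

    #[tokio::test]
    async fn bytes_packet_layout() {
        // b"abc" on the wire: an 8-byte little-endian length, the payload,
        // then zero padding up to the next 8-byte boundary.
        let mut wire = Vec::new();
        write_bytes(&mut wire, b"abc").await.unwrap();

        assert_eq!(wire.len(), 16); // 8 (length) + 3 (payload) + 5 (padding)
        assert_eq!(&wire[..8], 3u64.to_le_bytes().as_slice());
        assert_eq!(&wire[8..11], b"abc".as_slice());
        assert_eq!(&wire[11..], [0u8; 5].as_slice());

        // Reading it back enforces the (inclusive) size bound on the payload.
        let mut rd = wire.as_slice();
        assert_eq!(read_bytes(&mut rd, 0..=1024).await.unwrap(), b"abc");
    }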
-pub async fn read_bytes<R, S>(r: &mut R, allowed_size: S) -> std::io::Result<Vec<u8>> +pub async fn read_bytes<R: ?Sized>( + r: &mut R, + allowed_size: RangeInclusive<usize>, +) -> std::io::Result<Vec<u8>> where R: AsyncReadExt + Unpin, - S: RangeBounds<u64>, { // read the length field - let len = primitive::read_u64(r).await?; - - if !allowed_size.contains(&len) { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "signalled package size not in allowed range", - )); - } + let len = r.read_u64_le().await?; + let len: usize = len + .try_into() + .ok() + .filter(|len| allowed_size.contains(len)) + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "signalled package size not in allowed range", + ) + })?; // calculate the total length, including padding. // byte packets are padded to 8 byte blocks each. - let padded_len = padding_len(len) as u64 + (len as u64); + let padded_len = padding_len(len as u64) as u64 + (len as u64); let mut limited_reader = r.take(padded_len); let mut buf = Vec::new(); @@ -61,13 +64,10 @@ where // make sure we got exactly the number of bytes, and not less. if s as u64 != padded_len { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "got less bytes than expected", - )); + return Err(std::io::ErrorKind::UnexpectedEof.into()); } - let (_content, padding) = buf.split_at(len as usize); + let (_content, padding) = buf.split_at(len); // ensure the padding is all zeroes. if !padding.iter().all(|e| *e == b'\0') { @@ -78,17 +78,19 @@ where } // return the data without the padding - buf.truncate(len as usize); + buf.truncate(len); Ok(buf) } /// Read a "bytes wire packet" from the AsyncRead and tries to parse as string. /// Internally uses [read_bytes]. /// Rejects reading more than `allowed_size` bytes of payload. -pub async fn read_string<R, S>(r: &mut R, allowed_size: S) -> std::io::Result<String> +pub async fn read_string<R>( + r: &mut R, + allowed_size: RangeInclusive<usize>, +) -> std::io::Result<String> where R: AsyncReadExt + Unpin, - S: RangeBounds<u64>, { let bytes = read_bytes(r, allowed_size).await?; String::from_utf8(bytes).map_err(|e| Error::new(ErrorKind::InvalidData, e)) @@ -108,7 +110,7 @@ pub async fn write_bytes<W: AsyncWriteExt + Unpin, B: AsRef<[u8]>>( b: B, ) -> std::io::Result<()> { // write the size packet. - primitive::write_u64(w, b.as_ref().len() as u64).await?; + w.write_u64_le(b.as_ref().len() as u64).await?; // write the payload w.write_all(b.as_ref()).await?; @@ -122,33 +124,10 @@ pub async fn write_bytes<W: AsyncWriteExt + Unpin, B: AsRef<[u8]>>( } /// Computes the number of bytes we should add to len (a length in -/// bytes) to be alined on 64 bits (8 bytes). +/// bytes) to be aligned on 64 bits (8 bytes). fn padding_len(len: u64) -> u8 { - let modulo = len % 8; - if modulo == 0 { - 0 - } else { - 8 - modulo as u8 - } -} - -/// Models the position inside a "bytes wire packet" that the reader or writer -/// is in. -/// It can be in three different stages, inside size, payload or padding fields. -/// The number tracks the number of bytes written inside the specific field. -/// There shall be no ambiguous states, at the end of a stage we immediately -/// move to the beginning of the next one: -/// - Size(LEN_SIZE) must be expressed as Payload(0) -/// - Payload(self.payload_len) must be expressed as Padding(0) -/// There's one exception - Size(LEN_SIZE) in the reader represents a failure -/// state we enter in case the allowed size doesn't match the allowed range.
-/// -/// Padding(padding_len) means we're at the end of the bytes wire packet. -#[derive(Clone, Debug, PartialEq, Eq)] -enum BytesPacketPosition { - Size(usize), - Payload(u64), - Padding(usize), + let aligned = len.wrapping_add(7) & !7; + aligned.wrapping_sub(len) as u8 } #[cfg(test)] @@ -160,7 +139,7 @@ mod tests { /// The maximum length of bytes packets we're willing to accept in the test /// cases. - const MAX_LEN: u64 = 1024; + const MAX_LEN: usize = 1024; #[tokio::test] async fn test_read_8_bytes() { @@ -171,10 +150,7 @@ mod tests { assert_eq!( &12345678u64.to_le_bytes(), - read_bytes(&mut mock, 0u64..MAX_LEN) - .await - .unwrap() - .as_slice() + read_bytes(&mut mock, 0..=MAX_LEN).await.unwrap().as_slice() ); } @@ -187,10 +163,7 @@ mod tests { assert_eq!( hex!("010203040506070809"), - read_bytes(&mut mock, 0u64..MAX_LEN) - .await - .unwrap() - .as_slice() + read_bytes(&mut mock, 0..=MAX_LEN).await.unwrap().as_slice() ); } @@ -202,10 +175,7 @@ mod tests { assert_eq!( hex!(""), - read_bytes(&mut mock, 0u64..MAX_LEN) - .await - .unwrap() - .as_slice() + read_bytes(&mut mock, 0..=MAX_LEN).await.unwrap().as_slice() ); } @@ -215,7 +185,7 @@ mod tests { async fn test_read_reject_too_large() { let mut mock = Builder::new().read(&100u64.to_le_bytes()).build(); - read_bytes(&mut mock, 10..10) + read_bytes(&mut mock, 10..=10) .await .expect_err("expect this to fail"); } @@ -251,4 +221,9 @@ mod tests { .build(); assert_ok!(write_bytes(&mut mock, &input).await) } + + #[test] + fn padding_len_u64_max() { + assert_eq!(padding_len(u64::MAX), 1); + } } diff --git a/tvix/nix-compat/src/wire/bytes/reader.rs b/tvix/nix-compat/src/wire/bytes/reader.rs deleted file mode 100644 index 9aea677645..0000000000 --- a/tvix/nix-compat/src/wire/bytes/reader.rs +++ /dev/null @@ -1,464 +0,0 @@ -use pin_project_lite::pin_project; -use std::{ - ops::RangeBounds, - task::{ready, Poll}, -}; -use tokio::io::AsyncRead; - -use super::{padding_len, BytesPacketPosition, LEN_SIZE}; - -pin_project! { - /// Reads a "bytes wire packet" from the underlying reader. - /// The format is the same as in [crate::wire::bytes::read_bytes], - /// however this structure provides a [AsyncRead] interface, - /// allowing to not having to pass around the entire payload in memory. - /// - /// After being constructed with the underlying reader and an allowed size, - /// subsequent requests to poll_read will return payload data until the end - /// of the packet is reached. - /// - /// Internally, it will first read over the size packet, filling payload_size, - /// ensuring it fits allowed_size, then return payload data. - /// It will only signal EOF (returning `Ok(())` without filling the buffer anymore) - /// when all padding has been successfully consumed too. - /// - /// This also means, it's important for a user to always read to the end, - /// and not just call read_exact - otherwise it might not skip over the - /// padding, and return garbage when reading the next packet. - /// - /// In case of an error due to size constraints, or in case of not reading - /// all the way to the end (and getting a EOF), the underlying reader is no - /// longer usable and might return garbage. - pub struct BytesReader<R, S> - where - R: AsyncRead, - S: RangeBounds<u64>, - - { - #[pin] - inner: R, - - allowed_size: S, - payload_size: [u8; 8], - state: BytesPacketPosition, - } -} - -impl<R, S> BytesReader<R, S> -where - R: AsyncRead + Unpin, - S: RangeBounds<u64>, -{ - /// Constructs a new BytesReader, using the underlying passed reader. 
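Stepping back to the `padding_len` rewrite above for a moment: a quick worked check of the branch-free arithmetic, using only values from that hunk:

    // padding_len(len) = ((len + 7) & !7) - len, with wrapping arithmetic:
    //
    //   len = 3:        (3 + 7) & !7 = 8;  8 - 3 = 5          // 5 padding bytes
    //   len = 8:        (8 + 7) & !7 = 8;  8 - 8 = 0          // already aligned
    //   len = u64::MAX: MAX.wrapping_add(7) = 6;  6 & !7 = 0;
    //                   0u64.wrapping_sub(MAX) = 1            // hence padding_len_u64_max
    //
    // The removed `8 - len % 8` form agrees on all inputs (u64::MAX % 8 == 7,
    // so 8 - 7 == 1 there too); the rewrite simply drops the zero-modulo branch.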
- pub fn new(r: R, allowed_size: S) -> Self { - Self { - inner: r, - allowed_size, - payload_size: [0; 8], - state: BytesPacketPosition::Size(0), - } - } -} -/// Returns an error if the passed usize is 0. -#[inline] -fn ensure_nonzero_bytes_read(bytes_read: usize) -> Result<usize, std::io::Error> { - if bytes_read == 0 { - Err(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "underlying reader returned EOF", - )) - } else { - Ok(bytes_read) - } -} - -impl<R, S> AsyncRead for BytesReader<R, S> -where - R: AsyncRead, - S: RangeBounds<u64>, -{ - fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> Poll<std::io::Result<()>> { - let mut this = self.project(); - - // Use a loop, so we can deal with (multiple) state transitions. - loop { - match *this.state { - BytesPacketPosition::Size(LEN_SIZE) => { - // used in case an invalid size was signalled. - Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "signalled package size not in allowed range", - ))? - } - BytesPacketPosition::Size(pos) => { - // try to read more of the size field. - // We wrap a ReadBuf around this.payload_size here, and set_filled. - let mut read_buf = tokio::io::ReadBuf::new(this.payload_size); - read_buf.advance(pos); - ready!(this.inner.as_mut().poll_read(cx, &mut read_buf))?; - - ensure_nonzero_bytes_read(read_buf.filled().len() - pos)?; - - let total_size_read = read_buf.filled().len(); - if total_size_read == LEN_SIZE { - // If the entire payload size was read, parse it - let payload_size = u64::from_le_bytes(*this.payload_size); - - if !this.allowed_size.contains(&payload_size) { - // If it's not in the allowed - // range, transition to failure mode - // `BytesPacketPosition::Size(LEN_SIZE)`, where only - // an error is returned. - *this.state = BytesPacketPosition::Size(LEN_SIZE) - } else if payload_size == 0 { - // If the payload size is 0, move on to reading padding directly. - *this.state = BytesPacketPosition::Padding(0) - } else { - // Else, transition to reading the payload. - *this.state = BytesPacketPosition::Payload(0) - } - } else { - // If we still need to read more of payload size, update - // our position in the state. - *this.state = BytesPacketPosition::Size(total_size_read) - } - } - BytesPacketPosition::Payload(pos) => { - let signalled_size = u64::from_le_bytes(*this.payload_size); - // We don't enter this match arm at all if we're expecting empty payload - debug_assert!(signalled_size > 0, "signalled size must be larger than 0"); - - // Read from the underlying reader into buf - // We cap the ReadBuf to the size of the payload, as we - // don't want to leak padding to the caller. - let bytes_read = ensure_nonzero_bytes_read({ - // Reducing these two u64 to usize on 32bits is fine - we - // only care about not reading too much, not too less. - let mut limited_buf = buf.take((signalled_size - pos) as usize); - ready!(this.inner.as_mut().poll_read(cx, &mut limited_buf))?; - limited_buf.filled().len() - })?; - - // SAFETY: we just did populate this, but through limited_buf. - unsafe { buf.assume_init(bytes_read) } - buf.advance(bytes_read); - - if pos + bytes_read as u64 == signalled_size { - // If we now read all payload, transition to padding - // state. - *this.state = BytesPacketPosition::Padding(0); - } else { - // if we didn't read everything yet, update our position - // in the state. - *this.state = BytesPacketPosition::Payload(pos + bytes_read as u64); - } - - // We return from poll_read here. 
- // This is important, as any error (or even Pending) from - // the underlying reader on the next read (be it padding or - // payload) would require us to roll back buf, as generally - // a AsyncRead::poll_read may not advance the buffer in case - // of a nonsuccessful read. - // It can't be misinterpreted as EOF, as we definitely *did* - // write something into buf if we come to here (we pass - // `ensure_nonzero_bytes_read`). - return Ok(()).into(); - } - BytesPacketPosition::Padding(pos) => { - // Consume whatever padding is left, ensuring it's all null - // bytes. Only return `Ready(Ok(()))` once we're past the - // padding (or in cases where polling the inner reader - // returns `Poll::Pending`). - let signalled_size = u64::from_le_bytes(*this.payload_size); - let total_padding_len = padding_len(signalled_size) as usize; - - let padding_len_remaining = total_padding_len - pos; - if padding_len_remaining != 0 { - // create a buffer only accepting the number of remaining padding bytes. - let mut buf = [0; 8]; - let mut padding_buf = tokio::io::ReadBuf::new(&mut buf); - let mut padding_buf = padding_buf.take(padding_len_remaining); - - // read into padding_buf. - ready!(this.inner.as_mut().poll_read(cx, &mut padding_buf))?; - let bytes_read = ensure_nonzero_bytes_read(padding_buf.filled().len())?; - - *this.state = BytesPacketPosition::Padding(pos + bytes_read); - - // ensure the bytes are not null bytes - if !padding_buf.filled().iter().all(|e| *e == b'\0') { - return Err(std::io::Error::new( - std::io::ErrorKind::InvalidData, - "padding is not all zeroes", - )) - .into(); - } - - // if we still have padding to read, run the loop again. - continue; - } - // return EOF - return Ok(()).into(); - } - } - } - } -} - -#[cfg(test)] -mod tests { - use std::time::Duration; - - use crate::wire::bytes::write_bytes; - use hex_literal::hex; - use lazy_static::lazy_static; - use rstest::rstest; - use tokio::io::AsyncReadExt; - use tokio_test::{assert_err, io::Builder}; - - use super::*; - - /// The maximum length of bytes packets we're willing to accept in the test - /// cases. - const MAX_LEN: u64 = 1024; - - lazy_static! { - pub static ref LARGE_PAYLOAD: Vec<u8> = (0..255).collect::<Vec<u8>>().repeat(4 * 1024); - } - - /// Helper function, calling the (simpler) write_bytes with the payload. - /// We use this to create data we want to read from the wire. - async fn produce_packet_bytes(payload: &[u8]) -> Vec<u8> { - let mut exp = vec![]; - write_bytes(&mut exp, payload).await.unwrap(); - exp - } - - /// Read bytes packets of various length, and ensure read_to_end returns the - /// expected payload. 
- #[rstest] - #[case::empty(&[])] // empty bytes packet - #[case::size_1b(&[0xff])] // 1 bytes payload - #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding) - #[case::size_9b( &hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding) - #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet - #[tokio::test] - async fn read_payload_correct(#[case] payload: &[u8]) { - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await) - .build(); - - let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64); - let mut buf = Vec::new(); - r.read_to_end(&mut buf).await.expect("must succeed"); - - assert_eq!(payload, &buf[..]); - } - - /// Fail if the bytes packet is larger than allowed - #[tokio::test] - async fn read_bigger_than_allowed_fail() { - let payload = LARGE_PAYLOAD.as_slice(); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet - .build(); - - let mut r = BytesReader::new(&mut mock, ..2048); - let mut buf = Vec::new(); - assert_err!(r.read_to_end(&mut buf).await); - } - - /// Fail if the bytes packet is smaller than allowed - #[tokio::test] - async fn read_smaller_than_allowed_fail() { - let payload = &[0x00, 0x01, 0x02]; - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet - .build(); - - let mut r = BytesReader::new(&mut mock, 1024..2048); - let mut buf = Vec::new(); - assert_err!(r.read_to_end(&mut buf).await); - } - - /// Fail if the padding is not all zeroes - #[tokio::test] - async fn read_fail_if_nonzero_padding() { - let payload = &[0x00, 0x01, 0x02]; - let mut packet_bytes = produce_packet_bytes(payload).await; - // Flip some bits in the padding - packet_bytes[12] = 0xff; - let mut mock = Builder::new().read(&packet_bytes).build(); // We stop reading after the faulty bit - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = Vec::new(); - - r.read_to_end(&mut buf).await.expect_err("must fail"); - } - - /// Start a 9 bytes payload packet, but have the underlying reader return - /// EOF in the middle of the size packet (after 4 bytes). - /// We should get an unexpected EOF error, already when trying to read the - /// first byte (of payload) - #[tokio::test] - async fn read_9b_eof_during_size() { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..4]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = [0u8; 1]; - - assert_eq!( - r.read_exact(&mut buf).await.expect_err("must fail").kind(), - std::io::ErrorKind::UnexpectedEof - ); - - assert_eq!(&[0], &buf, "buffer should stay empty"); - } - - /// Start a 9 bytes payload packet, but have the underlying reader return - /// EOF in the middle of the payload (4 bytes into the payload). - /// We should get an unexpected EOF error, after reading the first 4 bytes - /// (successfully). 
- #[tokio::test] - async fn read_9b_eof_during_payload() { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..8 + 4]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = [0; 9]; - - r.read_exact(&mut buf[..4]).await.expect("must succeed"); - - assert_eq!( - r.read_exact(&mut buf[4..=4]) - .await - .expect_err("must fail") - .kind(), - std::io::ErrorKind::UnexpectedEof - ); - } - - /// Start a 9 bytes payload packet, but return an error at various stages *after* the actual payload. - /// read_exact with a 9 bytes buffer is expected to succeed, but any further - /// read, as well as read_to_end are expected to fail. - #[rstest] - #[case::before_padding(8 + 9)] - #[case::during_padding(8 + 9 + 2)] - #[case::after_padding(8 + 9 + padding_len(9) as usize)] - #[tokio::test] - async fn read_9b_eof_after_payload(#[case] offset: usize) { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..offset]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = [0; 9]; - - // read_exact of the payload will succeed, but a subsequent read will - // return UnexpectedEof error. - r.read_exact(&mut buf).await.expect("should succeed"); - assert_eq!( - r.read_exact(&mut buf[4..=4]) - .await - .expect_err("must fail") - .kind(), - std::io::ErrorKind::UnexpectedEof - ); - - // read_to_end will fail. - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..8 + payload.len()]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = Vec::new(); - assert_eq!( - r.read_to_end(&mut buf).await.expect_err("must fail").kind(), - std::io::ErrorKind::UnexpectedEof - ); - } - - /// Start a 9 bytes payload packet, but return an error after a certain position. - /// Ensure that error is propagated. - #[rstest] - #[case::during_size(4)] - #[case::before_payload(8)] - #[case::during_payload(8 + 4)] - #[case::before_padding(8 + 4)] - #[case::during_padding(8 + 9 + 2)] - #[tokio::test] - async fn propagate_error_from_reader(#[case] offset: usize) { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..offset]) - .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo")) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = Vec::new(); - - let err = r.read_to_end(&mut buf).await.expect_err("must fail"); - assert_eq!( - err.kind(), - std::io::ErrorKind::Other, - "error kind must match" - ); - - assert_eq!( - err.into_inner().unwrap().to_string(), - "foo", - "error payload must contain foo" - ); - } - - /// If there's an error right after the padding, we don't propagate it, as - /// we're done reading. We just return EOF. - #[tokio::test] - async fn no_error_after_eof() { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await) - .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo")) - .build(); - - let mut r = BytesReader::new(&mut mock, ..MAX_LEN); - let mut buf = Vec::new(); - - r.read_to_end(&mut buf).await.expect("must succeed"); - assert_eq!(buf.as_slice(), payload); - } - - /// Introduce various stalls in various places of the packet, to ensure we - /// handle these cases properly, too. 
- #[rstest] - #[case::beginning(0)] - #[case::before_payload(8)] - #[case::during_payload(8 + 4)] - #[case::before_padding(8 + 4)] - #[case::during_padding(8 + 9 + 2)] - #[tokio::test] - async fn read_payload_correct_pending(#[case] offset: usize) { - let payload = &hex!("FF0102030405060708"); - let mut mock = Builder::new() - .read(&produce_packet_bytes(payload).await[..offset]) - .wait(Duration::from_nanos(0)) - .read(&produce_packet_bytes(payload).await[offset..]) - .build(); - - let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64); - let mut buf = Vec::new(); - r.read_to_end(&mut buf).await.expect("must succeed"); - - assert_eq!(payload, &buf[..]); - } -} diff --git a/tvix/nix-compat/src/wire/bytes/reader/mod.rs b/tvix/nix-compat/src/wire/bytes/reader/mod.rs new file mode 100644 index 0000000000..cd45f78a0c --- /dev/null +++ b/tvix/nix-compat/src/wire/bytes/reader/mod.rs @@ -0,0 +1,447 @@ +use std::{ + future::Future, + io, + ops::RangeBounds, + pin::Pin, + task::{self, ready, Poll}, +}; +use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; + +use trailer::{read_trailer, ReadTrailer, Trailer}; + +#[doc(hidden)] +pub use self::trailer::Pad; +pub(crate) use self::trailer::Tag; +mod trailer; + +/// Reads a "bytes wire packet" from the underlying reader. +/// The format is the same as in [crate::wire::bytes::read_bytes], +/// however this structure provides a [AsyncRead] interface, +/// allowing to not having to pass around the entire payload in memory. +/// +/// It is constructed by reading a size with [BytesReader::new], +/// and yields payload data until the end of the packet is reached. +/// +/// It will not return the final bytes before all padding has been successfully +/// consumed as well, but the full length of the reader must be consumed. +/// +/// If the data is not read all the way to the end, or an error is encountered, +/// the underlying reader is no longer usable and might return garbage. +#[derive(Debug)] +#[allow(private_bounds)] +pub struct BytesReader<R, T: Tag = Pad> { + state: State<R, T>, +} + +#[derive(Debug)] +enum State<R, T: Tag> { + /// Full 8-byte blocks are being read and released to the caller. + Body { + reader: Option<R>, + consumed: u64, + /// The total length of all user data contained in both the body and trailer. + user_len: u64, + }, + /// The trailer is in the process of being read. + ReadTrailer(ReadTrailer<R, T>), + /// The trailer has been fully read and validated, + /// and data can now be released to the caller. + ReleaseTrailer { consumed: u8, data: Trailer }, +} + +impl<R> BytesReader<R> +where + R: AsyncRead + Unpin, +{ + /// Constructs a new BytesReader, using the underlying passed reader. + pub async fn new<S: RangeBounds<u64>>(reader: R, allowed_size: S) -> io::Result<Self> { + BytesReader::new_internal(reader, allowed_size).await + } +} + +#[allow(private_bounds)] +impl<R, T: Tag> BytesReader<R, T> +where + R: AsyncRead + Unpin, +{ + /// Constructs a new BytesReader, using the underlying passed reader. + pub(crate) async fn new_internal<S: RangeBounds<u64>>( + mut reader: R, + allowed_size: S, + ) -> io::Result<Self> { + let size = reader.read_u64_le().await?; + + if !allowed_size.contains(&size) { + return Err(io::Error::new(io::ErrorKind::InvalidData, "invalid size")); + } + + Ok(Self { + state: State::Body { + reader: Some(reader), + consumed: 0, + user_len: size, + }, + }) + } + + /// Returns whether there is any remaining data to be read. 
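To make the state machine above concrete, a short walkthrough for a packet with `user_len = 12`, matching the arithmetic in `poll_read` below:

    // Body:           streams the aligned prefix, 12 & !7 = 8 payload bytes.
    // ReadTrailer:    reads one final 8-byte block, 12 & 7 = 4 user bytes
    //                 followed by 4 padding bytes that must all be zero.
    // ReleaseTrailer: hands those last 4 user bytes to the caller.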
+ pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Remaining data length, ie not including data already read. + /// + /// If the size has not been read yet, this is [None]. + pub fn len(&self) -> u64 { + match self.state { + State::Body { + consumed, user_len, .. + } => user_len - consumed, + State::ReadTrailer(ref fut) => fut.len() as u64, + State::ReleaseTrailer { consumed, ref data } => data.len() as u64 - consumed as u64, + } + } +} + +#[allow(private_bounds)] +impl<R: AsyncRead + Unpin, T: Tag> AsyncRead for BytesReader<R, T> { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut task::Context, + buf: &mut ReadBuf, + ) -> Poll<io::Result<()>> { + let this = &mut self.state; + + loop { + match this { + State::Body { + reader, + consumed, + user_len, + } => { + let body_len = *user_len & !7; + let remaining = body_len - *consumed; + + let reader = if remaining == 0 { + let reader = reader.take().unwrap(); + let user_len = (*user_len & 7) as u8; + *this = State::ReadTrailer(read_trailer(reader, user_len)); + continue; + } else { + reader.as_mut().unwrap() + }; + + let mut bytes_read = 0; + ready!(with_limited(buf, remaining, |buf| { + let ret = Pin::new(reader).poll_read(cx, buf); + bytes_read = buf.initialized().len(); + ret + }))?; + + *consumed += bytes_read as u64; + + return if bytes_read != 0 { + Ok(()) + } else { + Err(io::ErrorKind::UnexpectedEof.into()) + } + .into(); + } + State::ReadTrailer(fut) => { + *this = State::ReleaseTrailer { + consumed: 0, + data: ready!(Pin::new(fut).poll(cx))?, + }; + } + State::ReleaseTrailer { consumed, data } => { + let data = &data[*consumed as usize..]; + let data = &data[..usize::min(data.len(), buf.remaining())]; + + buf.put_slice(data); + *consumed += data.len() as u8; + + return Ok(()).into(); + } + } + } + } +} + +/// Make a limited version of `buf`, consisting only of up to `n` bytes of the unfilled section, and call `f` with it. +/// After `f` returns, we propagate the filled cursor advancement back to `buf`. +fn with_limited<R>(buf: &mut ReadBuf, n: u64, f: impl FnOnce(&mut ReadBuf) -> R) -> R { + let mut nbuf = buf.take(n.try_into().unwrap_or(usize::MAX)); + let ptr = nbuf.initialized().as_ptr(); + let ret = f(&mut nbuf); + + // SAFETY: `ReadBuf::take` only returns the *unfilled* section of `buf`, + // so anything filled is new, initialized data. + // + // We verify that `nbuf` still points to the same buffer, + // so we're sure it hasn't been swapped out. + unsafe { + // ensure our buffer hasn't been swapped out + assert_eq!(nbuf.initialized().as_ptr(), ptr); + + let n = nbuf.filled().len(); + buf.assume_init(n); + buf.advance(n); + } + + ret +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::wire::bytes::{padding_len, write_bytes}; + use hex_literal::hex; + use lazy_static::lazy_static; + use rstest::rstest; + use tokio::io::AsyncReadExt; + use tokio_test::io::Builder; + + use super::*; + + /// The maximum length of bytes packets we're willing to accept in the test + /// cases. + const MAX_LEN: u64 = 1024; + + lazy_static! { + pub static ref LARGE_PAYLOAD: Vec<u8> = (0..255).collect::<Vec<u8>>().repeat(4 * 1024); + } + + /// Helper function, calling the (simpler) write_bytes with the payload. + /// We use this to create data we want to read from the wire. 
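Before the test cases, a hedged usage sketch of the new constructor-driven API (the helper name is illustrative; `BytesReader::new` and its semantics are as defined above):

    // The length prefix is consumed inside `new`, so an out-of-range size
    // fails fast, before any payload is read. The reader must then be driven
    // to EOF, or the trailing padding is never validated.
    async fn read_one_packet<R: AsyncRead + Unpin>(conn: &mut R) -> io::Result<Vec<u8>> {
        let mut r = BytesReader::new(conn, ..=1024).await?;
        let mut payload = Vec::with_capacity(r.len() as usize);
        r.read_to_end(&mut payload).await?;
        Ok(payload)
    }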
+ async fn produce_packet_bytes(payload: &[u8]) -> Vec<u8> { + let mut exp = vec![]; + write_bytes(&mut exp, payload).await.unwrap(); + exp + } + + /// Read bytes packets of various length, and ensure read_to_end returns the + /// expected payload. + #[rstest] + #[case::empty(&[])] // empty bytes packet + #[case::size_1b(&[0xff])] // 1 bytes payload + #[case::size_8b(&hex!("0001020304050607"))] // 8 bytes payload (no padding) + #[case::size_9b(&hex!("000102030405060708"))] // 9 bytes payload (7 bytes padding) + #[case::size_1m(LARGE_PAYLOAD.as_slice())] // larger bytes packet + #[tokio::test] + async fn read_payload_correct(#[case] payload: &[u8]) { + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await) + .build(); + + let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64) + .await + .unwrap(); + let mut buf = Vec::new(); + r.read_to_end(&mut buf).await.expect("must succeed"); + + assert_eq!(payload, &buf[..]); + } + + /// Fail if the bytes packet is larger than allowed + #[tokio::test] + async fn read_bigger_than_allowed_fail() { + let payload = LARGE_PAYLOAD.as_slice(); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet + .build(); + + assert_eq!( + BytesReader::new(&mut mock, ..2048) + .await + .unwrap_err() + .kind(), + io::ErrorKind::InvalidData + ); + } + + /// Fail if the bytes packet is smaller than allowed + #[tokio::test] + async fn read_smaller_than_allowed_fail() { + let payload = &[0x00, 0x01, 0x02]; + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[0..8]) // We stop reading after the size packet + .build(); + + assert_eq!( + BytesReader::new(&mut mock, 1024..2048) + .await + .unwrap_err() + .kind(), + io::ErrorKind::InvalidData + ); + } + + /// Fail if the padding is not all zeroes + #[tokio::test] + async fn read_fail_if_nonzero_padding() { + let payload = &[0x00, 0x01, 0x02]; + let mut packet_bytes = produce_packet_bytes(payload).await; + // Flip some bits in the padding + packet_bytes[12] = 0xff; + let mut mock = Builder::new().read(&packet_bytes).build(); // We stop reading after the faulty bit + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap(); + let mut buf = Vec::new(); + + r.read_to_end(&mut buf).await.expect_err("must fail"); + } + + /// Start a 9 bytes payload packet, but have the underlying reader return + /// EOF in the middle of the size packet (after 4 bytes). + /// We should get an unexpected EOF error, already when trying to read the + /// first byte (of payload) + #[tokio::test] + async fn read_9b_eof_during_size() { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..4]) + .build(); + + assert_eq!( + BytesReader::new(&mut mock, ..MAX_LEN) + .await + .expect_err("must fail") + .kind(), + io::ErrorKind::UnexpectedEof + ); + } + + /// Start a 9 bytes payload packet, but have the underlying reader return + /// EOF in the middle of the payload (4 bytes into the payload). + /// We should get an unexpected EOF error, after reading the first 4 bytes + /// (successfully). 
+ #[tokio::test] + async fn read_9b_eof_during_payload() { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..8 + 4]) + .build(); + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap(); + let mut buf = [0; 9]; + + r.read_exact(&mut buf[..4]).await.expect("must succeed"); + + assert_eq!( + r.read_exact(&mut buf[4..=4]) + .await + .expect_err("must fail") + .kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + /// Start a 9 bytes payload packet, but don't supply the necessary padding. + /// This is expected to always fail before returning the final data. + #[rstest] + #[case::before_padding(8 + 9)] + #[case::during_padding(8 + 9 + 2)] + #[case::after_padding(8 + 9 + padding_len(9) as usize - 1)] + #[tokio::test] + async fn read_9b_eof_after_payload(#[case] offset: usize) { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..offset]) + .build(); + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap(); + + // read_exact of the payload *body* will succeed, but a subsequent read will + // return UnexpectedEof error. + assert_eq!(r.read_exact(&mut [0; 8]).await.unwrap(), 8); + assert_eq!( + r.read_exact(&mut [0]).await.unwrap_err().kind(), + std::io::ErrorKind::UnexpectedEof + ); + } + + /// Start a 9 bytes payload packet, but return an error after a certain position. + /// Ensure that error is propagated. + #[rstest] + #[case::during_size(4)] + #[case::before_payload(8)] + #[case::during_payload(8 + 4)] + #[case::before_padding(8 + 4)] + #[case::during_padding(8 + 9 + 2)] + #[tokio::test] + async fn propagate_error_from_reader(#[case] offset: usize) { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..offset]) + .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo")) + .build(); + + // Either length reading or data reading can fail, depending on which test case we're in. + let err: io::Error = async { + let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await?; + let mut buf = Vec::new(); + + r.read_to_end(&mut buf).await?; + + Ok(()) + } + .await + .expect_err("must fail"); + + assert_eq!( + err.kind(), + std::io::ErrorKind::Other, + "error kind must match" + ); + + assert_eq!( + err.into_inner().unwrap().to_string(), + "foo", + "error payload must contain foo" + ); + } + + /// If there's an error right after the padding, we don't propagate it, as + /// we're done reading. We just return EOF. + #[tokio::test] + async fn no_error_after_eof() { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await) + .read_error(std::io::Error::new(std::io::ErrorKind::Other, "foo")) + .build(); + + let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap(); + let mut buf = Vec::new(); + + r.read_to_end(&mut buf).await.expect("must succeed"); + assert_eq!(buf.as_slice(), payload); + } + + /// Introduce various stalls in various places of the packet, to ensure we + /// handle these cases properly, too. 
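One more hedged example before the stall cases announced above (hypothetical test name; helpers and constants as defined in this module), showing how the final partial block is withheld until its padding verifies:

    #[tokio::test]
    async fn trailer_withheld_until_padding_checks() {
        let payload = &hex!("FF0102030405060708"); // 9 bytes: 8 body + 1 trailer
        let mut mock = Builder::new()
            .read(&produce_packet_bytes(payload).await)
            .build();

        let mut r = BytesReader::new(&mut mock, ..MAX_LEN).await.unwrap();
        assert_eq!(r.len(), 9);

        let mut body = [0; 8];
        r.read_exact(&mut body).await.unwrap(); // the aligned body streams through
        assert_eq!(r.len(), 1);                 // the ninth byte is still pending

        let mut last = [0; 1];
        r.read_exact(&mut last).await.unwrap(); // padding validated, byte released
        assert_eq!(last, [0x08]);
        assert!(r.is_empty());
    }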
+ #[rstest] + #[case::beginning(0)] + #[case::before_payload(8)] + #[case::during_payload(8 + 4)] + #[case::before_padding(8 + 4)] + #[case::during_padding(8 + 9 + 2)] + #[tokio::test] + async fn read_payload_correct_pending(#[case] offset: usize) { + let payload = &hex!("FF0102030405060708"); + let mut mock = Builder::new() + .read(&produce_packet_bytes(payload).await[..offset]) + .wait(Duration::from_nanos(0)) + .read(&produce_packet_bytes(payload).await[offset..]) + .build(); + + let mut r = BytesReader::new(&mut mock, ..=LARGE_PAYLOAD.len() as u64) + .await + .unwrap(); + let mut buf = Vec::new(); + r.read_to_end(&mut buf).await.expect("must succeed"); + + assert_eq!(payload, &buf[..]); + } +} diff --git a/tvix/nix-compat/src/wire/bytes/reader/trailer.rs b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs new file mode 100644 index 0000000000..0b0c7b1355 --- /dev/null +++ b/tvix/nix-compat/src/wire/bytes/reader/trailer.rs @@ -0,0 +1,198 @@ +use std::{ + fmt::Debug, + future::Future, + marker::PhantomData, + ops::Deref, + pin::Pin, + task::{self, ready, Poll}, +}; + +use tokio::io::{self, AsyncRead, ReadBuf}; + +/// Trailer represents up to 7 bytes of data read as part of the trailer block(s) +#[derive(Debug)] +pub(crate) struct Trailer { + data_len: u8, + buf: [u8; 7], +} + +impl Deref for Trailer { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + &self.buf[..self.data_len as usize] + } +} + +/// Tag defines a "trailer tag": specific, fixed bytes that must follow wire data. +pub(crate) trait Tag { + /// The expected suffix + /// + /// The first 7 bytes may be ignored, and it must be an 8-byte aligned size. + const PATTERN: &'static [u8]; + + /// Suitably sized buffer for reading [Self::PATTERN] + /// + /// HACK: This is a workaround for const generics limitations. 
+ type Buf: AsRef<[u8]> + AsMut<[u8]> + Debug + Unpin; + + /// Make an instance of [Self::Buf] + fn make_buf() -> Self::Buf; +} + +#[derive(Debug)] +pub enum Pad {} + +impl Tag for Pad { + const PATTERN: &'static [u8] = &[0; 8]; + + type Buf = [u8; 8]; + + fn make_buf() -> Self::Buf { + [0; 8] + } +} + +#[derive(Debug)] +pub(crate) struct ReadTrailer<R, T: Tag> { + reader: R, + data_len: u8, + filled: u8, + buf: T::Buf, + _phantom: PhantomData<fn(T) -> T>, +} + +/// read_trailer returns a [Future] that reads a trailer with a given [Tag] from `reader` +pub(crate) fn read_trailer<R: AsyncRead + Unpin, T: Tag>( + reader: R, + data_len: u8, +) -> ReadTrailer<R, T> { + assert!(data_len < 8, "payload in trailer must be less than 8 bytes"); + + let buf = T::make_buf(); + assert_eq!(buf.as_ref().len(), T::PATTERN.len()); + assert_eq!(T::PATTERN.len() % 8, 0); + + ReadTrailer { + reader, + data_len, + filled: if data_len != 0 { 0 } else { 8 }, + buf, + _phantom: PhantomData, + } +} + +impl<R, T: Tag> ReadTrailer<R, T> { + pub fn len(&self) -> u8 { + self.data_len + } +} + +impl<R: AsyncRead + Unpin, T: Tag> Future for ReadTrailer<R, T> { + type Output = io::Result<Trailer>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> { + let this = &mut *self; + + loop { + if this.filled >= this.data_len { + let check_range = || this.data_len as usize..this.filled as usize; + + if this.buf.as_ref()[check_range()] != T::PATTERN[check_range()] { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "invalid trailer", + )) + .into(); + } + } + + if this.filled as usize == T::PATTERN.len() { + let mut buf = [0; 7]; + buf.copy_from_slice(&this.buf.as_ref()[..7]); + + return Ok(Trailer { + data_len: this.data_len, + buf, + }) + .into(); + } + + let mut buf = ReadBuf::new(this.buf.as_mut()); + buf.advance(this.filled as usize); + + ready!(Pin::new(&mut this.reader).poll_read(cx, &mut buf))?; + + this.filled = { + let prev_filled = this.filled; + let filled = buf.filled().len() as u8; + + if filled == prev_filled { + return Err(io::ErrorKind::UnexpectedEof.into()).into(); + } + + filled + }; + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + + #[tokio::test] + async fn unexpected_eof() { + let reader = tokio_test::io::Builder::new() + .read(&[0xed]) + .wait(Duration::ZERO) + .read(&[0xef, 0x00]) + .build(); + + assert_eq!( + read_trailer::<_, Pad>(reader, 2).await.unwrap_err().kind(), + io::ErrorKind::UnexpectedEof + ); + } + + #[tokio::test] + async fn invalid_padding() { + let reader = tokio_test::io::Builder::new() + .read(&[0xed]) + .wait(Duration::ZERO) + .read(&[0xef, 0x01, 0x00]) + .wait(Duration::ZERO) + .build(); + + assert_eq!( + read_trailer::<_, Pad>(reader, 2).await.unwrap_err().kind(), + io::ErrorKind::InvalidData + ); + } + + #[tokio::test] + async fn success() { + let reader = tokio_test::io::Builder::new() + .read(&[0xed]) + .wait(Duration::ZERO) + .read(&[0xef, 0x00]) + .wait(Duration::ZERO) + .read(&[0x00, 0x00, 0x00, 0x00, 0x00]) + .build(); + + assert_eq!( + &*read_trailer::<_, Pad>(reader, 2).await.unwrap(), + &[0xed, 0xef] + ); + } + + #[tokio::test] + async fn no_padding() { + assert!(read_trailer::<_, Pad>(io::empty(), 0) + .await + .unwrap() + .is_empty()); + } +} diff --git a/tvix/nix-compat/src/wire/bytes/writer.rs b/tvix/nix-compat/src/wire/bytes/writer.rs index 347934b3dc..f5632771e9 100644 --- a/tvix/nix-compat/src/wire/bytes/writer.rs +++ b/tvix/nix-compat/src/wire/bytes/writer.rs @@ -3,7 +3,7 @@ use 
std::task::{ready, Poll}; use tokio::io::AsyncWrite; -use super::{padding_len, BytesPacketPosition, EMPTY_BYTES, LEN_SIZE}; +use super::{padding_len, EMPTY_BYTES, LEN_SIZE}; pin_project! { /// Writes a "bytes wire packet" to the underlying writer. @@ -41,6 +41,22 @@ pin_project! { } } +/// Models the position inside a "bytes wire packet" that the writer is in. +/// It can be in three different stages, inside size, payload or padding fields. +/// The number tracks the number of bytes written inside the specific field. +/// There shall be no ambiguous states, at the end of a stage we immediately +/// move to the beginning of the next one: +/// - Size(LEN_SIZE) must be expressed as Payload(0) +/// - Payload(self.payload_len) must be expressed as Padding(0) +/// +/// Padding(padding_len) means we're at the end of the bytes wire packet. +#[derive(Clone, Debug, PartialEq, Eq)] +enum BytesPacketPosition { + Size(usize), + Payload(u64), + Padding(usize), +} + impl<W> BytesWriter<W> where W: AsyncWrite, diff --git a/tvix/nix-compat/src/wire/mod.rs b/tvix/nix-compat/src/wire/mod.rs index 65c053d58e..a197e3a1f4 100644 --- a/tvix/nix-compat/src/wire/mod.rs +++ b/tvix/nix-compat/src/wire/mod.rs @@ -3,6 +3,3 @@ mod bytes; pub use bytes::*; - -mod primitive; -pub use primitive::*; diff --git a/tvix/nix-compat/src/wire/primitive.rs b/tvix/nix-compat/src/wire/primitive.rs deleted file mode 100644 index ee0f5fc427..0000000000 --- a/tvix/nix-compat/src/wire/primitive.rs +++ /dev/null @@ -1,74 +0,0 @@ -// SPDX-FileCopyrightText: 2023 embr <git@liclac.eu> -// -// SPDX-License-Identifier: EUPL-1.2 - -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; - -#[allow(dead_code)] -/// Read a u64 from the AsyncRead (little endian). -pub async fn read_u64<R: AsyncReadExt + Unpin>(r: &mut R) -> std::io::Result<u64> { - r.read_u64_le().await -} - -/// Write a u64 to the AsyncWrite (little endian). -pub async fn write_u64<W: AsyncWrite + Unpin>(w: &mut W, v: u64) -> std::io::Result<()> { - w.write_u64_le(v).await -} - -#[allow(dead_code)] -/// Read a boolean from the AsyncRead, encoded as u64 (>0 is true). -pub async fn read_bool<R: AsyncRead + Unpin>(r: &mut R) -> std::io::Result<bool> { - Ok(read_u64(r).await? > 0) -} - -#[allow(dead_code)] -/// Write a boolean to the AsyncWrite, encoded as u64 (>0 is true). -pub async fn write_bool<W: AsyncWrite + Unpin>(w: &mut W, v: bool) -> std::io::Result<()> { - write_u64(w, if v { 1u64 } else { 0u64 }).await -} - -#[cfg(test)] -mod tests { - use super::*; - use tokio_test::io::Builder; - - // Integers. - #[tokio::test] - async fn test_read_u64() { - let mut mock = Builder::new().read(&1234567890u64.to_le_bytes()).build(); - assert_eq!(1234567890u64, read_u64(&mut mock).await.unwrap()); - } - #[tokio::test] - async fn test_write_u64() { - let mut mock = Builder::new().write(&1234567890u64.to_le_bytes()).build(); - write_u64(&mut mock, 1234567890).await.unwrap(); - } - - // Booleans. 
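With `primitive.rs` deleted, the boolean helpers above (and their tests below) go away too; callers now spell the u64 encoding out inline, as `read_client_settings` earlier in this commit already does. A hedged restatement of that convention, with illustrative variable names:

    // Daemon wire booleans are u64s: any nonzero value decodes to `true`,
    // and `true`/`false` encode as 1/0 on the write side.
    let keep_failed = r.read_u64_le().await? != 0;
    w.write_u64_le(if flag { 1 } else { 0 }).await?;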
- #[tokio::test] - async fn test_read_bool_0() { - let mut mock = Builder::new().read(&0u64.to_le_bytes()).build(); - assert!(!read_bool(&mut mock).await.unwrap()); - } - #[tokio::test] - async fn test_read_bool_1() { - let mut mock = Builder::new().read(&1u64.to_le_bytes()).build(); - assert!(read_bool(&mut mock).await.unwrap()); - } - #[tokio::test] - async fn test_read_bool_2() { - let mut mock = Builder::new().read(&2u64.to_le_bytes()).build(); - assert!(read_bool(&mut mock).await.unwrap()); - } - - #[tokio::test] - async fn test_write_bool_false() { - let mut mock = Builder::new().write(&0u64.to_le_bytes()).build(); - write_bool(&mut mock, false).await.unwrap(); - } - #[tokio::test] - async fn test_write_bool_true() { - let mut mock = Builder::new().write(&1u64.to_le_bytes()).build(); - write_bool(&mut mock, true).await.unwrap(); - } -} diff --git a/tvix/shell.nix b/tvix/shell.nix index 422f1c8dd4..f0d8ab1657 100644 --- a/tvix/shell.nix +++ b/tvix/shell.nix @@ -29,12 +29,10 @@ pkgs.mkShell { pkgs.cargo pkgs.cargo-machete pkgs.cargo-expand - pkgs.cbtemulator pkgs.clippy pkgs.evans pkgs.fuse pkgs.go - pkgs.google-cloud-bigtable-tool pkgs.grpcurl pkgs.hyperfine pkgs.mdbook diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index b549eeb7f5..4c0e9be49a 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -74,3 +74,7 @@ fuse = ["tvix-castore/fuse"] otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"] tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"] virtiofs = ["tvix-castore/virtiofs"] +# Whether to run the integration tests. +# Requires the following packages in $PATH: +# cbtemulator, google-cloud-bigtable-tool +integration = [] diff --git a/tvix/store/default.nix b/tvix/store/default.nix index f30923ac27..ad47994f24 100644 --- a/tvix/store/default.nix +++ b/tvix/store/default.nix @@ -26,7 +26,6 @@ in runTests = true; testPreRun = '' export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt - export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}" ''; # enable some optional features. @@ -34,7 +33,20 @@ in # virtiofs feature currently fails to build on Darwin. ++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs"; }).overrideAttrs (_: { + meta.ci.targets = [ "integration-tests" ]; meta.ci.extraSteps = { import-docs = (mkImportCheck "tvix/store/docs" ./docs); }; + passthru.integration-tests = depot.tvix.crates.workspaceMembers.tvix-store.build.override { + runTests = true; + testPreRun = '' + export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt + export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}" + ''; + + # enable some optional features. + features = [ "default" "cloud" "integration" ] + # virtiofs feature currently fails to build on Darwin. + ++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs"; + }; }) diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 8262a9e98c..fa30501e78 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -56,10 +56,6 @@ use tvix_store::proto::FILE_DESCRIPTOR_SET; #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { - /// Whether to log in JSON - #[arg(long)] - json: bool, - /// Whether to configure OTLP. Set --otlp=false to disable. 
#[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))] otlp: bool, @@ -82,7 +78,11 @@ enum Commands { #[arg(long, short = 'l')] listen_address: Option<String>, - #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] + #[arg( + long, + env, + default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store" + )] blob_service_addr: String, #[arg( @@ -218,33 +218,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let level = cli.log_level.unwrap_or(Level::INFO); // Set up the tracing subscriber. - let subscriber = tracing_subscriber::registry() - .with( - cli.json.then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .json() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ), - ) - .with( - (!cli.json).then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .pretty() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), + let subscriber = tracing_subscriber::registry().with( + tracing_subscriber::fmt::Layer::new() + .with_writer(std::io::stderr) + .compact() + .with_filter( + EnvFilter::builder() + .with_default_directive(level.into()) + .from_env() + .expect("invalid RUST_LOG"), ), - ); + ); // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI) // then init the registry. diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index 7b6aeb824e..2331fd77ea 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -114,10 +114,12 @@ pub async fn import_path_as_nar_ca<BS, DS, PS, P>( where P: AsRef<Path> + std::fmt::Debug, BS: BlobService + Clone, - DS: AsRef<dyn DirectoryService>, + DS: DirectoryService, PS: AsRef<dyn PathInfoService>, { - let root_node = ingest_path(blob_service, directory_service, path.as_ref()).await?; + let root_node = ingest_path(blob_service, directory_service, path.as_ref()) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; // Ask the PathInfoService for the NAR size and sha256 let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?; diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 6f4dcdea5d..70f8137e89 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -24,19 +24,19 @@ pub fn read_nar<R, BS, DS>( ) -> io::Result<castorepb::node::Node> where R: BufRead + Send, - BS: AsRef<dyn BlobService>, - DS: AsRef<dyn DirectoryService>, + BS: BlobService + Clone, + DS: DirectoryService, { let handle = tokio::runtime::Handle::current(); - let directory_putter = directory_service.as_ref().put_multiple_start(); + let directory_putter = directory_service.put_multiple_start(); let node = nix_compat::nar::reader::open(r)?; - let (root_node, mut directory_putter, _) = process_node( + let (root_node, mut directory_putter) = process_node( handle.clone(), "".into(), // this is the root node, it has an empty name node, - &blob_service, + blob_service, directory_putter, )?; @@ -80,9 +80,9 @@ fn process_node<BS>( node: nar::reader::Node, blob_service: BS, directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)> +) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>)> where - BS: AsRef<dyn BlobService>, + BS: BlobService + 
Clone, { Ok(match node { nar::reader::Node::Symlink { target } => ( @@ -91,7 +91,6 @@ where target: target.into(), }), directory_putter, - blob_service, ), nar::reader::Node::File { executable, reader } => ( castorepb::node::Node::File(process_file_reader( @@ -99,19 +98,17 @@ where name, reader, executable, - &blob_service, + blob_service, )?), directory_putter, - blob_service, ), nar::reader::Node::Directory(dir_reader) => { - let (directory_node, directory_putter, blob_service_back) = + let (directory_node, directory_putter) = process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; ( castorepb::node::Node::Directory(directory_node), directory_putter, - blob_service_back, ) } }) @@ -127,13 +124,13 @@ fn process_file_reader<BS>( blob_service: BS, ) -> io::Result<castorepb::FileNode> where - BS: AsRef<dyn BlobService>, + BS: BlobService, { // store the length. If we read any other length, reading will fail. let expected_len = file_reader.len(); // prepare writing a new blob. - let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await }); + let blob_writer = handle.block_on(async { blob_service.open_write().await }); // write the blob. let mut blob_writer = { @@ -168,24 +165,22 @@ fn process_dir_reader<BS>( mut dir_reader: nar::reader::DirReader, blob_service: BS, directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)> +) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>)> where - BS: AsRef<dyn BlobService>, + BS: BlobService + Clone, { let mut directory = castorepb::Directory::default(); let mut directory_putter = directory_putter; - let mut blob_service = blob_service; while let Some(entry) = dir_reader.next()? { - let (node, directory_putter_back, blob_service_back) = process_node( + let (node, directory_putter_back) = process_node( handle.clone(), entry.name.into(), entry.node, - blob_service, + blob_service.clone(), directory_putter, )?; - blob_service = blob_service_back; directory_putter = directory_putter_back; match node { @@ -213,7 +208,6 @@ where size: directory_size, }, directory_putter, - blob_service, )) } diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs index f49ef475eb..6fb52abbfd 100644 --- a/tvix/store/src/pathinfoservice/bigtable.rs +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -116,7 +116,7 @@ impl BigtablePathInfoService { .stdout(Stdio::piped()) .kill_on_drop(true) .spawn() - .expect("failed to spwan emulator"); + .expect("failed to spawn emulator"); Retry::spawn( ExponentialBackoff::from_millis(20) diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 1ff822ad35..f22884ca47 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -208,7 +208,7 @@ mod tests { #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] /// A valid example for Bigtable. 
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 1ff822ad35..f22884ca47 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -208,7 +208,7 @@ mod tests {
     #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)]
     /// A valid example for Bigtable.
     #[cfg_attr(
-        feature = "cloud",
+        all(feature = "cloud", feature = "integration"),
         case::bigtable_valid(
             "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1",
             true
@@ -216,7 +216,7 @@ mod tests {
     )]
     /// An invalid example for Bigtable, missing fields
     #[cfg_attr(
-        feature = "cloud",
+        all(feature = "cloud", feature = "integration"),
         case::bigtable_invalid_missing_fields("bigtable://instance-1", false)
     )]
     #[tokio::test]
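
As in the castore tests, the Bigtable cases are now compiled only when both the `cloud` and `integration` cargo features are enabled, matching the new `integration = []` feature; running them locally would look roughly like `cargo test --features cloud,integration`, with cbtemulator and google-cloud-bigtable-tool on $PATH. A reduced sketch of the gating idiom on plain `#[test]` functions (the rstest `case::` form above works the same way; names here are illustrative):

    // #[cfg(...)] compiles the test out entirely unless every feature
    // listed in all(...) is enabled.
    #[cfg(all(feature = "cloud", feature = "integration"))]
    #[test]
    fn bigtable_emulator_roundtrip() {
        // spawn cbtemulator, exercise the service, ...
    }

    // #[cfg_attr(...)] applies an attribute conditionally; this variant keeps
    // the test visible but marks it #[ignore] unless both features are on.
    #[cfg_attr(not(all(feature = "cloud", feature = "integration")), ignore)]
    #[test]
    fn bigtable_roundtrip() {
        // ...
    }
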
{ + "failed to insert PathInfo: {}", e + }) + })?; + + Ok(path_info) } + #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))] async fn calculate_nar( &self, root_node: &castorepb::node::Node, @@ -114,27 +109,17 @@ where } fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { - Box::pin(iter(self.db.iter().values().map(|v| match v { - Ok(data) => { - // we retrieved some bytes - match PathInfo::decode(&*data) { - Ok(path_info) => Ok(path_info), - Err(e) => { - warn!("failed to decode stored PathInfo: {}", e); - Err(Error::StorageError(format!( - "failed to decode stored PathInfo: {}", - e - ))) - } - } - } - Err(e) => { + Box::pin(iter(self.db.iter().values().map(|v| { + let data = v.map_err(|e| { warn!("failed to retrieve PathInfo: {}", e); - Err(Error::StorageError(format!( - "failed to retrieve PathInfo: {}", - e - ))) - } + Error::StorageError(format!("failed to retrieve PathInfo: {}", e)) + })?; + + let path_info = PathInfo::decode(&*data).map_err(|e| { + warn!("failed to decode stored PathInfo: {}", e); + Error::StorageError(format!("failed to decode stored PathInfo: {}", e)) + })?; + Ok(path_info) }))) } } diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs index c9b9a06377..9719371592 100644 --- a/tvix/store/src/pathinfoservice/tests/mod.rs +++ b/tvix/store/src/pathinfoservice/tests/mod.rs @@ -51,7 +51,7 @@ pub async fn make_path_info_service(uri: &str) -> BSDSPS { #[case::memory(make_path_info_service("memory://").await)] #[case::grpc(make_grpc_path_info_service_client().await)] #[case::sled(make_path_info_service("sled://").await)] -#[cfg_attr(feature = "cloud", case::bigtable(make_path_info_service("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await))] +#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_path_info_service("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await))] pub fn path_info_services( #[case] services: ( impl BlobService, |