Diffstat (limited to 'tvix/store')
25 files changed, 1659 insertions, 1016 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 367e63c21e..dc6126724f 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] anyhow = "1.0.68" +async-compression = { version = "0.4.9", features = ["tokio", "bzip2", "gzip", "xz", "zstd"]} async-stream = "0.3.5" blake3 = { version = "1.3.1", features = ["rayon", "std"] } bstr = "1.6.0" @@ -17,47 +18,64 @@ lazy_static = "1.4.0" nix-compat = { path = "../nix-compat", features = ["async"] } pin-project-lite = "0.2.13" prost = "0.12.1" -opentelemetry = { version = "0.21.0", optional = true} -opentelemetry-otlp = { version = "0.14.0", optional = true } -opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"], optional = true} +serde = { version = "1.0.197", features = [ "derive" ] } +serde_json = "1.0" +serde_with = "3.7.0" +serde_qs = "0.12.0" sha2 = "0.10.6" sled = { version = "0.34.7" } thiserror = "1.0.38" tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } -tokio-listener = { version = "0.3.2", features = [ "tonic011" ] } +tokio-listener = { version = "0.4.1", features = [ "tonic011" ] } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } tonic = { version = "0.11.0", features = ["tls", "tls-roots"] } tower = "0.4.13" -tracing = "0.1.37" -tracing-opentelemetry = "0.22.0" -tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] } tvix-castore = { path = "../castore" } url = "2.4.0" walkdir = "2.4.0" -async-recursion = "1.0.5" reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false } -xz2 = "0.1.7" +lru = "0.12.3" +parking_lot = "0.12.2" +tvix-tracing = { path = "../tracing" } +tracing = "0.1.40" +tracing-indicatif = "0.3.6" [dependencies.tonic-reflection] optional = true version = "0.11.0" +[dependencies.bigtable_rs] +optional = true +# https://github.com/liufuyang/bigtable_rs/pull/72 +git = "https://github.com/flokli/bigtable_rs" +rev = "0af404741dfc40eb9fa99cf4d4140a09c5c20df7" + [build-dependencies] prost-build = "0.12.1" tonic-build = "0.11.0" [dev-dependencies] -rstest = "0.18.2" +async-process = "2.1.0" +rstest = "0.19.0" rstest_reuse = "0.6.0" -test-case = "3.3.1" tempfile = "3.3.0" tokio-retry = "0.3.0" [features] default = ["cloud", "fuse", "otlp", "tonic-reflection"] -cloud = ["tvix-castore/cloud"] +cloud = [ + "dep:bigtable_rs", + "tvix-castore/cloud" +] fuse = ["tvix-castore/fuse"] -otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"] +otlp = ["tvix-tracing/otlp"] tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"] virtiofs = ["tvix-castore/virtiofs"] +# Whether to run the integration tests. +# Requires the following packages in $PATH: +# cbtemulator, google-cloud-bigtable-tool +integration = [] + +[lints] +workspace = true diff --git a/tvix/store/default.nix b/tvix/store/default.nix index 2c07cdf2b3..78b499114c 100644 --- a/tvix/store/default.nix +++ b/tvix/store/default.nix @@ -1,4 +1,4 @@ -{ depot, pkgs, ... }: +{ depot, pkgs, lib, ... 
}: let mkImportCheck = p: expectedPath: { @@ -22,17 +22,35 @@ let }; in -(depot.tvix.crates.workspaceMembers.tvix-store.build.override { +(depot.tvix.crates.workspaceMembers.tvix-store.build.override (old: { runTests = true; testPreRun = '' - export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; + export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt ''; - - # virtiofs feature currently fails to build on Darwin. - # we however can ship it for non-darwin. - features = if pkgs.stdenv.isDarwin then [ "default" ] else [ "default" "virtiofs" ]; -}).overrideAttrs (_: { - meta.ci.extraSteps = { - import-docs = (mkImportCheck "tvix/store/docs" ./docs); + features = old.features + # virtiofs feature currently fails to build on Darwin + ++ lib.optional pkgs.stdenv.isLinux "virtiofs"; +})).overrideAttrs (old: rec { + meta.ci = { + targets = [ "integration-tests" ] ++ lib.filter (x: lib.hasPrefix "with-features" x || x == "no-features") (lib.attrNames passthru); + extraSteps.import-docs = (mkImportCheck "tvix/store/docs" ./docs); + }; + passthru = (depot.tvix.utils.mkFeaturePowerset { + inherit (old) crateName; + features = ([ "cloud" "fuse" "otlp" "tonic-reflection" ] + # virtiofs feature currently fails to build on Darwin + ++ lib.optional pkgs.stdenv.isLinux "virtiofs"); + override.testPreRun = '' + export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt + ''; + }) // { + integration-tests = depot.tvix.crates.workspaceMembers.${old.crateName}.build.override (old: { + runTests = true; + testPreRun = '' + export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt; + export PATH="$PATH:${pkgs.lib.makeBinPath [ pkgs.cbtemulator pkgs.google-cloud-bigtable-tool ]}" + ''; + features = old.features ++ [ "integration" ]; + }); }; }) diff --git a/tvix/store/docs/api.md b/tvix/store/docs/api.md index c1dacc89a5..c5a5c477aa 100644 --- a/tvix/store/docs/api.md +++ b/tvix/store/docs/api.md @@ -5,7 +5,7 @@ This document outlines the design of the API exposed by tvix-castore and tvix- store, as well as other implementations of this store protocol. This document is meant to be read side-by-side with -[castore.md](../../tvix-castore/docs/castore.md) which describes the data model +[castore.md](../../castore/docs/data-model.md) which describes the data model in more detail. The store API has four main consumers: @@ -218,7 +218,7 @@ This is useful for people running a Tvix-only system, or running builds on a In a system with Nix installed, we can't simply manually "extract" things to `/nix/store`, as Nix assumes to own all writes to this location. In these use cases, we're probably better off exposing a tvix-store as a local -binary cache (that's what `//tvix/nar-bridge` does). +binary cache (that's what `//tvix/nar-bridge-go` does). 
Assuming we are in an environment where we control `/nix/store` exclusively, a "realize to disk" would either "extract" things from the `tvix-store` to a diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 27a67b7c91..03c699b893 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -2,17 +2,23 @@ use clap::Parser; use clap::Subcommand; use futures::future::try_join_all; +use futures::StreamExt; +use futures::TryStreamExt; +use nix_compat::path_info::ExportedPathInfo; +use serde::Deserialize; +use serde::Serialize; use std::path::PathBuf; use std::sync::Arc; use tokio_listener::Listener; use tokio_listener::SystemOptions; use tokio_listener::UserOptions; use tonic::transport::Server; -use tracing::info; -use tracing::Level; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::Layer; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tracing::{info, info_span, instrument, Level, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt as _; +use tvix_castore::import::fs::ingest_path; +use tvix_store::nar::NarCalculationService; +use tvix_store::proto::NarInfo; +use tvix_store::proto::PathInfo; use tvix_castore::proto::blob_service_server::BlobServiceServer; use tvix_castore::proto::directory_service_server::DirectoryServiceServer; @@ -28,15 +34,6 @@ use tvix_store::pathinfoservice::make_fs; #[cfg(feature = "fuse")] use tvix_castore::fs::fuse::FuseDaemon; -#[cfg(feature = "otlp")] -use opentelemetry::KeyValue; -#[cfg(feature = "otlp")] -use opentelemetry_sdk::{ - resource::{ResourceDetector, SdkProvidedResourceDetector}, - trace::BatchConfig, - Resource, -}; - #[cfg(feature = "virtiofs")] use tvix_castore::fs::virtiofs::start_virtiofs_daemon; @@ -48,10 +45,6 @@ use tvix_store::proto::FILE_DESCRIPTOR_SET; #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { - /// Whether to log in JSON - #[arg(long)] - json: bool, - /// Whether to configure OTLP. Set --otlp=false to disable. #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))] otlp: bool, @@ -60,8 +53,8 @@ struct Cli { /// It's also possible to set `RUST_LOG` according to /// `tracing_subscriber::filter::EnvFilter`, which will always have /// priority. - #[arg(long)] - log_level: Option<Level>, + #[arg(long, default_value_t=Level::INFO)] + log_level: Level, #[command(subcommand)] command: Commands, @@ -74,7 +67,11 @@ enum Commands { #[arg(long, short = 'l')] listen_address: Option<String>, - #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] + #[arg( + long, + env, + default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store" + )] blob_service_addr: String, #[arg( @@ -101,6 +98,30 @@ enum Commands { #[arg(long, env, default_value = "grpc+http://[::1]:8000")] path_info_service_addr: String, }, + + /// Copies a list of store paths on the system into tvix-store. + Copy { + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// A path pointing to a JSON file produced by the Nix + /// `__structuredAttrs` containing reference graph information provided + /// by the `exportReferencesGraph` feature. 
+ /// + /// This can be used to invoke tvix-store inside a Nix derivation + /// copying to a Tvix store (or outside, if the JSON file is copied + /// out). + /// + /// Currently limited to the `closure` key inside that JSON file. + #[arg(value_name = "NIX_ATTRS_JSON_FILE", env = "NIX_ATTRS_JSON_FILE")] + reference_graph_path: PathBuf, + }, /// Mounts a tvix-store at the given mountpoint #[cfg(feature = "fuse")] Mount { @@ -131,6 +152,10 @@ enum Commands { /// (exhaustive) listing. #[clap(long, short, action)] list_root: bool, + + #[arg(long, default_value_t = true)] + /// Whether to expose blob and directory digests as extended attributes. + show_xattr: bool, }, /// Starts a tvix-store virtiofs daemon at the given socket path. #[cfg(feature = "virtiofs")] @@ -153,6 +178,10 @@ enum Commands { /// (exhaustive) listing. #[clap(long, short, action)] list_root: bool, + + #[arg(long, default_value_t = true)] + /// Whether to expose blob and directory digests as extended attributes. + show_xattr: bool, }, } @@ -171,87 +200,22 @@ fn default_threads() -> usize { } #[tokio::main] +#[instrument(fields(indicatif.pb_show=1))] async fn main() -> Result<(), Box<dyn std::error::Error>> { let cli = Cli::parse(); - // configure log settings - let level = cli.log_level.unwrap_or(Level::INFO); - - // Set up the tracing subscriber. - let subscriber = tracing_subscriber::registry() - .with( - cli.json.then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .json() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ), - ) - .with( - (!cli.json).then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .pretty() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ), - ); - - // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI) - // then init the registry. - // If the feature is feature-flagged out, just init without adding the layer. - // It's necessary to do this separately, as every with() call chains the - // layer into the type of the registry. #[cfg(feature = "otlp")] { - let subscriber = if cli.otlp { - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().tonic()) - .with_batch_config(BatchConfig::default()) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource({ - // use SdkProvidedResourceDetector.detect to detect resources, - // but replace the default service name with our default. - // https://github.com/open-telemetry/opentelemetry-rust/issues/1298 - let resources = - SdkProvidedResourceDetector.detect(std::time::Duration::from_secs(0)); - // SdkProvidedResourceDetector currently always sets - // `service.name`, but we don't like its default. 
- if resources.get("service.name".into()).unwrap() == "unknown_service".into() { - resources.merge(&Resource::new([KeyValue::new( - "service.name", - "tvix.store", - )])) - } else { - resources - } - })) - .install_batch(opentelemetry_sdk::runtime::Tokio)?; - - // Create a tracing layer with the configured tracer - let layer = tracing_opentelemetry::layer().with_tracer(tracer); - - subscriber.with(Some(layer)) + if cli.otlp { + tvix_tracing::init_with_otlp(cli.log_level, "tvix.store")?; } else { - subscriber.with(None) - }; - - subscriber.try_init()?; + tvix_tracing::init(cli.log_level)?; + } } - // Init the registry (when otlp is not enabled) #[cfg(not(feature = "otlp"))] { - subscriber.try_init()?; + tvix_tracing::init(cli.log_level)?; } match cli.command { @@ -262,7 +226,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { path_info_service_addr, } => { // initialize stores - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -287,6 +251,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { )) .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( Arc::from(path_info_service), + nar_calculation_service, ))); #[cfg(feature = "tonic-reflection")] @@ -316,7 +281,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { path_info_service_addr, } => { // FUTUREWORK: allow flat for single files? - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -324,16 +289,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ) .await?; - // Arc the PathInfoService, as we clone it . + // Arc PathInfoService and NarCalculationService, as we clone it . 
let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + let nar_calculation_service: Arc<dyn NarCalculationService> = + nar_calculation_service.into(); + + let root_span = { + let s = Span::current(); + s.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + s.pb_set_message("Importing paths"); + s.pb_set_length(paths.len() as u64); + s.pb_start(); + s + }; let tasks = paths .into_iter() .map(|path| { + let paths_span = root_span.clone(); tokio::task::spawn({ let blob_service = blob_service.clone(); let directory_service = directory_service.clone(); let path_info_service = path_info_service.clone(); + let nar_calculation_service = nar_calculation_service.clone(); async move { if let Ok(name) = tvix_store::import::path_to_name(&path) { @@ -343,6 +321,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { blob_service, directory_service, path_info_service, + nar_calculation_service, ) .await; if let Ok(output_path) = resp { @@ -350,6 +329,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { println!("{}", output_path.to_absolute_path()); } } + paths_span.pb_inc(1); } }) }) @@ -357,6 +337,119 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { try_join_all(tasks).await?; } + Commands::Copy { + blob_service_addr, + directory_service_addr, + path_info_service_addr, + reference_graph_path, + } => { + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = + tvix_store::utils::construct_services( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + ) + .await?; + + // Parse the file at reference_graph_path. + let reference_graph_json = tokio::fs::read(&reference_graph_path).await?; + + #[derive(Deserialize, Serialize)] + struct ReferenceGraph<'a> { + #[serde(borrow)] + closure: Vec<ExportedPathInfo<'a>>, + } + + let reference_graph: ReferenceGraph<'_> = + serde_json::from_slice(reference_graph_json.as_slice())?; + + // Arc the PathInfoService, as we clone it . + let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + + let lookups_span = info_span!( + "lookup pathinfos", + "indicatif.pb_show" = tracing::field::Empty + ); + lookups_span.pb_set_length(reference_graph.closure.len() as u64); + lookups_span.pb_set_style(&tvix_tracing::PB_PROGRESS_STYLE); + lookups_span.pb_start(); + + // From our reference graph, lookup all pathinfos that might exist. + let elems: Vec<_> = futures::stream::iter(reference_graph.closure) + .map(|elem| { + let path_info_service = path_info_service.clone(); + async move { + let resp = path_info_service + .get(*elem.path.digest()) + .await + .map(|resp| (elem, resp)); + + Span::current().pb_inc(1); + resp + } + }) + .buffer_unordered(50) + // Filter out all that are already uploaded. + // TODO: check if there's a better combinator for this + .try_filter_map(|(elem, path_info)| { + std::future::ready(if path_info.is_none() { + Ok(Some(elem)) + } else { + Ok(None) + }) + }) + .try_collect() + .await?; + + // Run ingest_path on all of them. + let uploads: Vec<_> = futures::stream::iter(elems) + .map(|elem| { + // Map to a future returning the root node, alongside with the closure info. + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + async move { + // Ingest the given path. 
+ + ingest_path( + blob_service, + directory_service, + PathBuf::from(elem.path.to_absolute_path()), + ) + .await + .map(|root_node| (elem, root_node)) + } + }) + .buffer_unordered(10) + .try_collect() + .await?; + + // Insert them into the PathInfoService. + // FUTUREWORK: do this properly respecting the reference graph. + for (elem, root_node) in uploads { + // Create and upload a PathInfo pointing to the root_node, + // annotated with information we have from the reference graph. + let path_info = PathInfo { + node: Some(tvix_castore::proto::Node { + node: Some(root_node), + }), + references: Vec::from_iter( + elem.references.iter().map(|e| e.digest().to_vec().into()), + ), + narinfo: Some(NarInfo { + nar_size: elem.nar_size, + nar_sha256: elem.nar_sha256.to_vec().into(), + signatures: vec![], + reference_names: Vec::from_iter( + elem.references.iter().map(|e| e.to_string()), + ), + deriver: None, + ca: None, + }), + }; + + path_info_service.put(path_info).await?; + } + } #[cfg(feature = "fuse")] Commands::Mount { dest, @@ -366,8 +459,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { list_root, threads, allow_other, + show_xattr, } => { - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -381,6 +475,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { directory_service, Arc::from(path_info_service), list_root, + show_xattr, ); info!(mount_path=?dest, "mounting"); @@ -406,8 +501,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { directory_service_addr, path_info_service_addr, list_root, + show_xattr, } => { - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -421,6 +517,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { directory_service, Arc::from(path_info_service), list_root, + show_xattr, ); info!(socket_path=?socket, "starting virtiofs-daemon"); diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index f2e7bdfe5d..888380bca9 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,16 +1,32 @@ use std::path::Path; use tracing::{debug, instrument}; use tvix_castore::{ - blobservice::BlobService, directoryservice::DirectoryService, proto::node::Node, B3Digest, + blobservice::BlobService, directoryservice::DirectoryService, import::fs::ingest_path, + proto::node::Node, B3Digest, }; -use nix_compat::store_path::{self, StorePath}; +use nix_compat::{ + nixhash::{CAHash, NixHash}, + store_path::{self, StorePath}, +}; use crate::{ + nar::NarCalculationService, pathinfoservice::PathInfoService, proto::{nar_info, NarInfo, PathInfo}, }; +impl From<CAHash> for nar_info::Ca { + fn from(value: CAHash) -> Self { + let hash_type: nar_info::ca::Hash = (&value).into(); + let digest: bytes::Bytes = value.hash().to_string().into(); + nar_info::Ca { + r#type: hash_type.into(), + digest, + } + } +} + pub fn log_node(node: &Node, path: &Path) { match node { Node::Directory(directory_node) => { @@ -54,13 +70,20 @@ pub fn path_to_name(path: &Path) -> std::io::Result<&str> { }) } -/// Takes the NAR size, SHA-256 of the NAR representation and the root node. -/// Returns the path information object for a content addressed NAR-style (recursive) object. 
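An aside on the `Copy` command wired up above: the file it reads is the JSON that Nix's `__structuredAttrs`/`exportReferencesGraph` machinery writes. Below is a minimal parsing sketch; the structs are hypothetical, trimmed-down stand-ins for nix-compat's `ExportedPathInfo` (the camelCase field names are assumptions based on typical `exportReferencesGraph` output, and the real type carries more fields):

use serde::Deserialize;

// Hypothetical stand-in for nix_compat::path_info::ExportedPathInfo.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ClosureEntry {
    path: String,
    nar_size: u64,
    nar_hash: String,
    references: Vec<String>,
}

#[derive(Debug, Deserialize)]
struct ReferenceGraph {
    closure: Vec<ClosureEntry>,
}

fn main() -> Result<(), serde_json::Error> {
    // Dummy data; the shape is assumed from exportReferencesGraph output.
    let data = r#"{
        "closure": [{
            "path": "/nix/store/00000000000000000000000000000000-example",
            "narSize": 1234,
            "narHash": "sha256:0000000000000000000000000000000000000000000000000000",
            "references": []
        }]
    }"#;
    let graph: ReferenceGraph = serde_json::from_str(data)?;
    println!("{} store path(s) in closure", graph.closure.len());
    Ok(())
}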
+/// Takes the NAR size, SHA-256 of the NAR representation, the root node and optionally +/// a CA hash information. +/// +/// Returns the path information object for a NAR-style object. /// /// This [`PathInfo`] can be further filled for signatures, deriver or verified for the expected /// hashes. #[inline] -pub fn derive_nar_ca_path_info(nar_size: u64, nar_sha256: [u8; 32], root_node: Node) -> PathInfo { +pub fn derive_nar_ca_path_info( + nar_size: u64, + nar_sha256: [u8; 32], + ca: Option<CAHash>, + root_node: Node, +) -> PathInfo { // assemble the [crate::proto::PathInfo] object. PathInfo { node: Some(tvix_castore::proto::Node { @@ -74,10 +97,7 @@ pub fn derive_nar_ca_path_info(nar_size: u64, nar_sha256: [u8; 32], root_node: N signatures: vec![], reference_names: vec![], deriver: None, - ca: Some(nar_info::Ca { - r#type: nar_info::ca::Hash::NarSha256.into(), - digest: nar_sha256.to_vec().into(), - }), + ca: ca.map(|ca_hash| ca_hash.into()), }), } } @@ -85,24 +105,27 @@ pub fn derive_nar_ca_path_info(nar_size: u64, nar_sha256: [u8; 32], root_node: N /// Ingest the given path `path` and register the resulting output path in the /// [`PathInfoService`] as a recursive fixed output NAR. #[instrument(skip_all, fields(store_name=name, path=?path), err)] -pub async fn import_path_as_nar_ca<BS, DS, PS, P>( +pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>( path: P, name: &str, blob_service: BS, directory_service: DS, path_info_service: PS, + nar_calculation_service: NS, ) -> Result<StorePath, std::io::Error> where P: AsRef<Path> + std::fmt::Debug, - BS: AsRef<dyn BlobService> + Clone, - DS: AsRef<dyn DirectoryService>, + BS: BlobService + Clone, + DS: DirectoryService, PS: AsRef<dyn PathInfoService>, + NS: NarCalculationService, { - let root_node = - tvix_castore::import::ingest_path(blob_service, directory_service, &path).await?; + let root_node = ingest_path(blob_service, directory_service, path.as_ref()) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - // Ask the PathInfoService for the NAR size and sha256 - let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?; + // Ask for the NAR size and sha256 + let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?; // Calculate the output path. This might still fail, as some names are illegal. // FUTUREWORK: express the `name` at the type level to be valid and move the conversion @@ -118,7 +141,12 @@ where let root_node = root_node.rename(output_path.to_string().into_bytes().into()); log_node(&root_node, path.as_ref()); - let path_info = derive_nar_ca_path_info(nar_size, nar_sha256, root_node); + let path_info = derive_nar_ca_path_info( + nar_size, + nar_sha256, + Some(CAHash::Nar(NixHash::Sha256(nar_sha256))), + root_node, + ); // This new [`PathInfo`] that we get back from there might contain additional signatures or // information set by the service itself. 
In this function, we silently swallow it because @@ -133,21 +161,22 @@ mod tests { use std::{ffi::OsStr, path::PathBuf}; use crate::import::path_to_name; - use test_case::test_case; + use rstest::rstest; - #[test_case("a/b/c", "c"; "simple path")] - #[test_case("a/b/../c", "c"; "simple path containing ..")] - #[test_case("a/b/../c/d/../e", "e"; "path containing multiple ..")] + #[rstest] + #[case::simple_path("a/b/c", "c")] + #[case::simple_path_containing_dotdot("a/b/../c", "c")] + #[case::path_containing_multiple_dotdot("a/b/../c/d/../e", "e")] - fn test_path_to_name(path: &str, expected_name: &str) { + fn test_path_to_name(#[case] path: &str, #[case] expected_name: &str) { let path: PathBuf = path.into(); assert_eq!(path_to_name(&path).expect("must succeed"), expected_name); } - #[test_case(b"a/b/.."; "path ending in ..")] - #[test_case(b"\xf8\xa1\xa1\xa1\xa1"; "non unicode path")] - - fn test_invalid_path_to_name(invalid_path: &[u8]) { + #[rstest] + #[case::path_ending_in_dotdot(b"a/b/..")] + #[case::non_unicode_path(b"\xf8\xa1\xa1\xa1\xa1")] + fn test_invalid_path_to_name(#[case] invalid_path: &[u8]) { let path: PathBuf = unsafe { OsStr::from_encoded_bytes_unchecked(invalid_path) }.into(); path_to_name(&path).expect_err("must fail"); } diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 6f4dcdea5d..32c2f4e580 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,225 +1,171 @@ -use bytes::Bytes; -use nix_compat::nar; -use std::io::{self, BufRead}; -use tokio_util::io::SyncIoBridge; -use tracing::warn; +use nix_compat::nar::reader::r#async as nar_reader; +use sha2::Digest; +use tokio::{ + io::{AsyncBufRead, AsyncRead}, + sync::mpsc, + try_join, +}; use tvix_castore::{ blobservice::BlobService, - directoryservice::{DirectoryPutter, DirectoryService}, - proto::{self as castorepb}, - B3Digest, + directoryservice::DirectoryService, + import::{ + blobs::{self, ConcurrentBlobUploader}, + ingest_entries, IngestionEntry, IngestionError, + }, + proto::{node::Node, NamedNode}, + PathBuf, }; -/// Accepts a reader providing a NAR. -/// Will traverse it, uploading blobs to the given [BlobService], and -/// directories to the given [DirectoryService]. -/// On success, the root node is returned. -/// This function is not async (because the NAR reader is not) -/// and calls [tokio::task::block_in_place] when interacting with backing -/// services, so make sure to only call this with spawn_blocking. -pub fn read_nar<R, BS, DS>( - r: &mut R, +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// Returns the castore root node, as well as the sha256 and size of the NAR +/// contents ingested. 
+pub async fn ingest_nar_and_hash<R, BS, DS>( blob_service: BS, directory_service: DS, -) -> io::Result<castorepb::node::Node> + r: &mut R, +) -> Result<(Node, [u8; 32], u64), IngestionError<Error>> where - R: BufRead + Send, - BS: AsRef<dyn BlobService>, - DS: AsRef<dyn DirectoryService>, + R: AsyncRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, { - let handle = tokio::runtime::Handle::current(); - - let directory_putter = directory_service.as_ref().put_multiple_start(); - - let node = nix_compat::nar::reader::open(r)?; - let (root_node, mut directory_putter, _) = process_node( - handle.clone(), - "".into(), // this is the root node, it has an empty name - node, - &blob_service, - directory_putter, - )?; - - // In case the root node points to a directory, we need to close - // [directory_putter], and ensure the digest we got back from there matches - // what the root node is pointing to. - if let castorepb::node::Node::Directory(ref directory_node) = root_node { - // Close directory_putter to make sure all directories have been inserted. - let directory_putter_digest = - handle.block_on(handle.spawn(async move { directory_putter.close().await }))??; - let root_directory_node_digest: B3Digest = - directory_node.digest.clone().try_into().unwrap(); - - if directory_putter_digest != root_directory_node_digest { - warn!( - root_directory_node_digest = %root_directory_node_digest, - directory_putter_digest =%directory_putter_digest, - "directory digest mismatch", - ); - return Err(io::Error::new( - io::ErrorKind::Other, - "directory digest mismatch", - )); - } - } - // In case it's not a Directory, [directory_putter] doesn't need to be - // closed (as we didn't end up uploading anything). - // It can just be dropped, as documented in its trait. + let mut nar_hash = sha2::Sha256::new(); + let mut nar_size = 0; - Ok(root_node) -} + // Assemble NarHash and NarSize as we read bytes. + let r = tokio_util::io::InspectReader::new(r, |b| { + nar_size += b.len() as u64; + use std::io::Write; + nar_hash.write_all(b).unwrap(); + }); -/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node]. -/// It does so by handling all three kinds, and recursing for directories. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. -fn process_node<BS>( - handle: tokio::runtime::Handle, - name: bytes::Bytes, - node: nar::reader::Node, - blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)> -where - BS: AsRef<dyn BlobService>, -{ - Ok(match node { - nar::reader::Node::Symlink { target } => ( - castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name, - target: target.into(), - }), - directory_putter, - blob_service, - ), - nar::reader::Node::File { executable, reader } => ( - castorepb::node::Node::File(process_file_reader( - handle, - name, - reader, - executable, - &blob_service, - )?), - directory_putter, - blob_service, - ), - nar::reader::Node::Directory(dir_reader) => { - let (directory_node, directory_putter, blob_service_back) = - process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; - - ( - castorepb::node::Node::Directory(directory_node), - directory_putter, - blob_service_back, - ) - } - }) + // HACK: InspectReader doesn't implement AsyncBufRead. 
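A note on the `InspectReader` used above: it taps every chunk as it is read, so NarHash and NarSize fall out of a single streaming pass, with no extra buffering of the NAR. A self-contained sketch of the same pattern, using only tokio, tokio-util, sha2 and data-encoding (no tvix types; the input bytes are a dummy):

use sha2::{Digest, Sha256};
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut hasher = Sha256::new();
    let mut size: u64 = 0;

    // Tap each chunk as it passes through the reader.
    let inner = std::io::Cursor::new(b"nix-archive-1".to_vec());
    let mut reader = tokio_util::io::InspectReader::new(inner, |chunk| {
        size += chunk.len() as u64;
        hasher.update(chunk);
    });

    // Drain the reader; in ingest_nar_and_hash the NAR parser sits here instead.
    let mut sink = Vec::new();
    reader.read_to_end(&mut sink).await?;
    drop(reader); // release the closure's borrows of hasher and size

    let digest: [u8; 32] = hasher.finalize().into();
    println!("size={} sha256={}", size, data_encoding::HEXLOWER.encode(&digest));
    Ok(())
}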
+ // See if this can be propagated through and we can then require our input + // reader to be buffered too. + let mut r = tokio::io::BufReader::new(r); + + let root_node = ingest_nar(blob_service, directory_service, &mut r).await?; + + Ok((root_node, nar_hash.finalize().into(), nar_size)) } -/// Given a name and [nar::reader::FileReader], this ingests the file into the -/// passed [BlobService] and returns a [castorepb::FileNode]. -fn process_file_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut file_reader: nar::reader::FileReader, - executable: bool, +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// It returns the castore root node or an error. +pub async fn ingest_nar<R, BS, DS>( blob_service: BS, -) -> io::Result<castorepb::FileNode> + directory_service: DS, + r: &mut R, +) -> Result<Node, IngestionError<Error>> where - BS: AsRef<dyn BlobService>, + R: AsyncBufRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, { - // store the length. If we read any other length, reading will fail. - let expected_len = file_reader.len(); + // open the NAR for reading. + // The NAR reader emits nodes in DFS preorder. + let root_node = nar_reader::open(r).await.map_err(Error::IO)?; - // prepare writing a new blob. - let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await }); + let (tx, rx) = mpsc::channel(1); + let rx = tokio_stream::wrappers::ReceiverStream::new(rx); - // write the blob. - let mut blob_writer = { - let mut dst = SyncIoBridge::new(blob_writer); + let produce = async move { + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); - file_reader.copy(&mut dst)?; - dst.shutdown()?; + let res = produce_nar_inner( + &mut blob_uploader, + root_node, + "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT. + tx.clone(), + ) + .await; + + if let Err(err) = blob_uploader.join().await { + tx.send(Err(err.into())) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; + } + + tx.send(res) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; - // return back the blob_writer - dst.into_inner() + Ok(()) }; - // close the blob_writer, retrieve the digest. - let blob_digest = handle.block_on(async { blob_writer.close().await })?; + let consume = ingest_entries(directory_service, rx); - Ok(castorepb::FileNode { - name, - digest: blob_digest.into(), - size: expected_len, - executable, - }) + let (_, node) = try_join!(produce, consume)?; + + // remove the fake "root" name again + debug_assert_eq!(&node.get_name(), b"root"); + Ok(node.rename("".into())) } -/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode]. -/// It uses [process_node] to iterate over all children. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. 
-fn process_dir_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut dir_reader: nar::reader::DirReader, - blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)> +async fn produce_nar_inner<BS>( + blob_uploader: &mut ConcurrentBlobUploader<BS>, + node: nar_reader::Node<'_, '_>, + path: PathBuf, + tx: mpsc::Sender<Result<IngestionEntry, Error>>, +) -> Result<IngestionEntry, Error> where - BS: AsRef<dyn BlobService>, + BS: BlobService + Clone + 'static, { - let mut directory = castorepb::Directory::default(); - - let mut directory_putter = directory_putter; - let mut blob_service = blob_service; - while let Some(entry) = dir_reader.next()? { - let (node, directory_putter_back, blob_service_back) = process_node( - handle.clone(), - entry.name.into(), - entry.node, - blob_service, - directory_putter, - )?; - - blob_service = blob_service_back; - directory_putter = directory_putter_back; - - match node { - castorepb::node::Node::Directory(node) => directory.directories.push(node), - castorepb::node::Node::File(node) => directory.files.push(node), - castorepb::node::Node::Symlink(node) => directory.symlinks.push(node), + Ok(match node { + nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target }, + nar_reader::Node::File { + executable, + mut reader, + } => { + let size = reader.len(); + let digest = blob_uploader.upload(&path, size, &mut reader).await?; + + IngestionEntry::Regular { + path, + size, + executable, + digest, + } } - } + nar_reader::Node::Directory(mut dir_reader) => { + while let Some(entry) = dir_reader.next().await? { + let mut path = path.clone(); + + // valid NAR names are valid castore names + path.try_push(entry.name) + .expect("Tvix bug: failed to join name"); + + let entry = Box::pin(produce_nar_inner( + blob_uploader, + entry.node, + path, + tx.clone(), + )) + .await?; + + tx.send(Ok(entry)).await.map_err(|e| { + Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) + })?; + } + + IngestionEntry::Dir { path } + } + }) +} - // calculate digest and size. - let directory_digest = directory.digest(); - let directory_size = directory.size(); - - // upload the directory. This is a bit more verbose, as we want to get back - // directory_putter for later reuse. 
- let directory_putter = handle.block_on(handle.spawn(async move { - directory_putter.put(directory).await?; - Ok::<_, io::Error>(directory_putter) - }))??; - - Ok(( - castorepb::DirectoryNode { - name, - digest: directory_digest.into(), - size: directory_size, - }, - directory_putter, - blob_service, - )) +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + IO(#[from] std::io::Error), + + #[error(transparent)] + BlobUpload(#[from] blobs::Error), } #[cfg(test)] mod test { - use crate::nar::read_nar; + use crate::nar::ingest_nar; use std::io::Cursor; use std::sync::Arc; @@ -244,19 +190,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking(|| { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), - blob_service, - directory_service, - ) - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service, + directory_service, + &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Symlink(castorepb::SymlinkNode { @@ -273,22 +213,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - move || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service, + &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::File(castorepb::FileNode { @@ -310,23 +241,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - let directory_service = directory_service.clone(); - || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service.clone(), + &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Directory(castorepb::DirectoryNode { diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs index 49bb92fb0f..8cbb091f1a 100644 --- a/tvix/store/src/nar/mod.rs +++ b/tvix/store/src/nar/mod.rs @@ -1,10 +1,37 @@ +use tonic::async_trait; use tvix_castore::B3Digest; mod import; mod renderer; -pub use import::read_nar; +pub use import::ingest_nar; +pub use import::ingest_nar_and_hash; pub use renderer::calculate_size_and_sha256; pub use renderer::write_nar; +pub use renderer::SimpleRenderer; +use tvix_castore::proto as castorepb; + +#[async_trait] +pub trait NarCalculationService: Send + Sync { + /// Return the nar size and nar sha256 digest for a given root node. + /// This can be used to calculate NAR-based output paths. 
+ async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), tvix_castore::Error>; +} + +#[async_trait] +impl<A> NarCalculationService for A +where + A: AsRef<dyn NarCalculationService> + Send + Sync, +{ + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), tvix_castore::Error> { + self.as_ref().calculate_nar(root_node).await + } +} /// Errors that can encounter while rendering NARs. #[derive(Debug, thiserror::Error)] diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index 9ac363ff57..efd67671db 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -1,20 +1,51 @@ -use super::RenderError; -use async_recursion::async_recursion; +use crate::utils::AsyncIoBridge; + +use super::{NarCalculationService, RenderError}; use count_write::CountWrite; use nix_compat::nar::writer::r#async as nar_writer; use sha2::{Digest, Sha256}; -use std::{ - pin::Pin, - task::{self, Poll}, -}; use tokio::io::{self, AsyncWrite, BufReader}; -use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; +use tonic::async_trait; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, proto::{self as castorepb, NamedNode}, }; +pub struct SimpleRenderer<BS, DS> { + blob_service: BS, + directory_service: DS, +} + +impl<BS, DS> SimpleRenderer<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { + Self { + blob_service, + directory_service, + } + } +} + +#[async_trait] +impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS> +where + BS: BlobService + Clone, + DS: DirectoryService + Clone, +{ + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), tvix_castore::Error> { + calculate_size_and_sha256( + root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .await + .map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e))) + } +} + /// Invoke [write_nar], and return the size and sha256 digest of the produced /// NAR output. pub async fn calculate_size_and_sha256<BS, DS>( @@ -42,37 +73,12 @@ where Ok((cw.count(), h.finalize().into())) } -/// The inverse of [tokio_util::io::SyncIoBridge]. -/// Don't use this with anything that actually does blocking I/O. -struct AsyncIoBridge<T>(T); - -impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> { - fn poll_write( - self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, - buf: &[u8], - ) -> Poll<io::Result<usize>> { - Poll::Ready(self.get_mut().0.write(buf)) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { - Poll::Ready(self.get_mut().0.flush()) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - _cx: &mut task::Context<'_>, - ) -> Poll<Result<(), io::Error>> { - Poll::Ready(Ok(())) - } -} - /// Accepts a [castorepb::node::Node] pointing to the root of a (store) path, /// and uses the passed blob_service and directory_service to perform the /// necessary lookups as it traverses the structure. /// The contents in NAR serialization are writen to the passed [AsyncWrite]. 
pub async fn write_nar<W, BS, DS>( - w: W, + mut w: W, proto_root_node: &castorepb::node::Node, blob_service: BS, directory_service: DS, @@ -83,7 +89,6 @@ where DS: DirectoryService + Send, { // Initialize NAR writer - let mut w = w.compat_write(); let nar_root_node = nar_writer::open(&mut w) .await .map_err(RenderError::NARWriterError)?; @@ -101,9 +106,8 @@ where /// Process an intermediate node in the structure. /// This consumes the node. -#[async_recursion] async fn walk_node<BS, DS>( - nar_node: nar_writer::Node<'async_recursion, '_>, + nar_node: nar_writer::Node<'_, '_>, proto_node: &castorepb::node::Node, blob_service: BS, directory_service: DS, @@ -128,7 +132,7 @@ where )) })?; - let blob_reader = match blob_service + let mut blob_reader = match blob_service .open_read(&digest) .await .map_err(RenderError::StoreError)? @@ -144,7 +148,7 @@ where .file( proto_file_node.executable, proto_file_node.size, - &mut blob_reader.compat(), + &mut blob_reader, ) .await .map_err(RenderError::NARWriterError)?; @@ -193,9 +197,13 @@ where .await .map_err(RenderError::NARWriterError)?; - (blob_service, directory_service) = - walk_node(child_node, &proto_node, blob_service, directory_service) - .await?; + (blob_service, directory_service) = Box::pin(walk_node( + child_node, + &proto_node, + blob_service, + directory_service, + )) + .await?; } // close the directory diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs new file mode 100644 index 0000000000..707a686c0a --- /dev/null +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -0,0 +1,412 @@ +use super::PathInfoService; +use crate::proto; +use crate::proto::PathInfo; +use async_stream::try_stream; +use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2}; +use bytes::Bytes; +use data_encoding::HEXLOWER; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use prost::Message; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DurationSeconds}; +use tonic::async_trait; +use tracing::{instrument, trace}; +use tvix_castore::Error; + +/// There should not be more than 10 MiB in a single cell. +/// https://cloud.google.com/bigtable/docs/schema-design#cells +const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; + +/// Provides a [DirectoryService] implementation using +/// [Bigtable](https://cloud.google.com/bigtable/docs/) +/// as an underlying K/V store. +/// +/// # Data format +/// We use Bigtable as a plain K/V store. +/// The row key is the digest of the store path, in hexlower. +/// Inside the row, we currently have a single column/cell, again using the +/// hexlower store path digest. +/// Its value is the PathInfo message, serialized in canonical protobuf. +/// We currently only populate this column. +/// +/// Listing is ranging over all rows, and calculate_nar is returning a +/// "unimplemented" error. +#[derive(Clone)] +pub struct BigtablePathInfoService { + client: bigtable::BigTable, + params: BigtableParameters, + + #[cfg(test)] + #[allow(dead_code)] + /// Holds the temporary directory containing the unix socket, and the + /// spawned emulator process. + emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, +} + +/// Represents configuration of [BigtablePathInfoService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. 
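Pulling the renderer pieces together: `SimpleRenderer` is the local-rendering `NarCalculationService`, obtaining NAR size and digest via `calculate_size_and_sha256`. A hedged usage sketch, assuming the in-memory castore services can be built with `Default` and that `Arc<dyn BlobService>` satisfies the service bounds (the tests in this commit pass `Arc<dyn BlobService>` the same way):

use std::sync::Arc;
use tvix_castore::blobservice::{BlobService, MemoryBlobService};
use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryService};
use tvix_castore::proto as castorepb;
use tvix_store::nar::{NarCalculationService, SimpleRenderer};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let blob_service: Arc<dyn BlobService> = Arc::new(MemoryBlobService::default());
    let directory_service: Arc<dyn DirectoryService> = Arc::new(MemoryDirectoryService::default());

    let nar_calc = SimpleRenderer::new(blob_service, directory_service);

    // A bare symlink is the simplest node: no blob or directory lookups needed.
    let root_node = castorepb::node::Node::Symlink(castorepb::SymlinkNode {
        name: "".into(),
        target: "/nix/store/somewhereelse".into(),
    });

    let (nar_size, nar_sha256) = nar_calc.calculate_nar(&root_node).await?;
    println!("nar_size={} sha256={}", nar_size, data_encoding::HEXLOWER.encode(&nar_sha256));
    Ok(())
}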
+#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +impl BigtableParameters { + #[cfg(test)] + pub fn default_for_tests() -> Self { + Self { + project_id: "project-1".into(), + instance_name: "instance-1".into(), + is_read_only: false, + channel_size: default_channel_size(), + timeout: default_timeout(), + table_name: "table-1".into(), + family_name: "cf1".into(), + app_profile_id: default_app_profile_id(), + } + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} + +impl BigtablePathInfoService { + #[cfg(not(test))] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + let connection = bigtable::BigTableConnection::new( + ¶ms.project_id, + ¶ms.instance_name, + params.is_read_only, + params.channel_size, + params.timeout, + ) + .await?; + + Ok(Self { + client: connection.client(), + params, + }) + } + + #[cfg(test)] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + use std::time::Duration; + + use async_process::{Command, Stdio}; + use tempfile::TempDir; + use tokio_retry::{strategy::ExponentialBackoff, Retry}; + + let tmpdir = TempDir::new().unwrap(); + + let socket_path = tmpdir.path().join("cbtemulator.sock"); + + let emulator_process = Command::new("cbtemulator") + .arg("-address") + .arg(socket_path.clone()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .expect("failed to spawn emulator"); + + Retry::spawn( + ExponentialBackoff::from_millis(20) + .max_delay(Duration::from_secs(1)) + .take(3), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // populate the emulator + for cmd in &[ + vec!["createtable", ¶ms.table_name], + vec!["createfamily", ¶ms.table_name, ¶ms.family_name], + ] { + Command::new("cbt") + .args({ + let mut args = vec![ + "-instance", + ¶ms.instance_name, + "-project", + ¶ms.project_id, + ]; + args.extend_from_slice(cmd); + args + }) + .env( + "BIGTABLE_EMULATOR_HOST", + format!("unix://{}", socket_path.to_string_lossy()), + ) + .output() + .await + .expect("failed to run cbt setup command"); + } + + let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator( + &format!("unix://{}", socket_path.to_string_lossy()), + ¶ms.project_id, + ¶ms.instance_name, + false, + None, + )?; + + Ok(Self { + client: connection.client(), + params, + emulator: (tmpdir, emulator_process).into(), + }) + } +} + +/// Derives the row/column key for a given output path. +/// We use hexlower encoding, also because it can't be misinterpreted as RE2. 
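To make the key scheme concrete before `derive_pathinfo_key` below: for a 20-byte store path digest, row key and column qualifier are the same 40-character lowercase hex string. A tiny illustration with a dummy digest:

use data_encoding::HEXLOWER;

fn main() {
    let digest = [0xab; 20]; // dummy store path digest
    // 40 chars of lowercase hex, used as both row key and column qualifier.
    println!("{}", HEXLOWER.encode(&digest));
}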
+fn derive_pathinfo_key(digest: &[u8; 20]) -> String { + HEXLOWER.encode(digest) +} + +#[async_trait] +impl PathInfoService for BigtablePathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let mut client = self.client.clone(); + let path_info_key = derive_pathinfo_key(&digest); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + rows_limit: 1, + rows: Some(bigtable_v2::RowSet { + row_keys: vec![path_info_key.clone().into()], + row_ranges: vec![], + }), + // Filter selected family name, and column qualifier matching the digest. + // The latter is to ensure we don't fail once we start adding more metadata. + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::Chain( + bigtable_v2::row_filter::Chain { + filters: vec![ + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + ), + ), + }, + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + path_info_key.clone().into(), + ), + ), + }, + ], + }, + )), + }), + ..Default::default() + }; + + let mut response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + if response.len() != 1 { + if response.len() > 1 { + // This shouldn't happen, we limit number of rows to 1 + return Err(Error::StorageError( + "got more than one row from bigtable".into(), + )); + } + // else, this is simply a "not found". + return Ok(None); + } + + let (row_key, mut cells) = response.pop().unwrap(); + if row_key != path_info_key.as_bytes() { + // This shouldn't happen, we requested this row key. + return Err(Error::StorageError( + "got wrong row key from bigtable".into(), + )); + } + + let cell = cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !cells.is_empty() { + return Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + )); + } + + // We also require the qualifier to be correct in the filter above, + // so this shouldn't happen. 
+ if path_info_key.as_bytes() != cell.qualifier { + return Err(Error::StorageError("unexpected cell qualifier".into())); + } + + // Try to parse the value into a PathInfo message + let path_info = proto::PathInfo::decode(Bytes::from(cell.value)) + .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?; + + let store_path = path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?; + + if store_path.digest() != &digest { + return Err(Error::StorageError("PathInfo has unexpected digest".into())); + } + + Ok(Some(path_info)) + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("pathinfo failed validation: {}", e)))?; + + let mut client = self.client.clone(); + let path_info_key = derive_pathinfo_key(store_path.digest()); + + let data = path_info.encode_to_vec(); + if data.len() as u64 > CELL_SIZE_LIMIT { + return Err(Error::StorageError( + "PathInfo exceeds cell limit on Bigtable".into(), + )); + } + + let resp = client + .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest { + table_name: client.get_full_table_name(&self.params.table_name), + app_profile_id: self.params.app_profile_id.to_string(), + row_key: path_info_key.clone().into(), + predicate_filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + path_info_key.clone().into(), + )), + }), + // If the column was already found, do nothing. + true_mutations: vec![], + // Else, do the insert. + false_mutations: vec![ + // https://cloud.google.com/bigtable/docs/writes + bigtable_v2::Mutation { + mutation: Some(bigtable_v2::mutation::Mutation::SetCell( + bigtable_v2::mutation::SetCell { + family_name: self.params.family_name.to_string(), + column_qualifier: path_info_key.clone().into(), + timestamp_micros: -1, // use server time to fill timestamp + value: data, + }, + )), + }, + ], + }) + .await + .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?; + + if resp.predicate_matched { + trace!("already existed") + } + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let mut client = self.client.clone(); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + )), + }), + ..Default::default() + }; + + let stream = try_stream! { + // TODO: add pagination, we don't want to hold all of this in memory. + let response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + for (row_key, mut cells) in response { + let cell = cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // The cell must have the same qualifier as the row key + if row_key != cell.qualifier { + Err(Error::StorageError("unexpected cell qualifier".into()))?; + } + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !cells.is_empty() { + + Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + ))? 
+                }
+
+                // Try to parse the value into a PathInfo message.
+                let path_info = proto::PathInfo::decode(Bytes::from(cell.value))
+                    .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?;
+
+                // Validate the containing PathInfo, ensure its StorePath digest
+                // matches row key.
+                let store_path = path_info
+                    .validate()
+                    .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?;
+
+                if store_path.digest().as_slice() != row_key.as_slice() {
+                    Err(Error::StorageError("PathInfo has unexpected digest".into()))?
+                }
+
+
+                yield path_info
+            }
+        };
+
+        Box::pin(stream)
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs
new file mode 100644
index 0000000000..664144ef49
--- /dev/null
+++ b/tvix/store/src/pathinfoservice/combinators.rs
@@ -0,0 +1,111 @@
+use crate::proto::PathInfo;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use tonic::async_trait;
+use tracing::{debug, instrument};
+use tvix_castore::Error;
+
+use super::PathInfoService;
+
+/// Asks near first, if not found, asks far.
+/// If found in there, returns it, and *inserts* it into
+/// near.
+/// There is no negative cache.
+/// Inserts and listings are not implemented for now.
+pub struct Cache<PS1, PS2> {
+    near: PS1,
+    far: PS2,
+}
+
+impl<PS1, PS2> Cache<PS1, PS2> {
+    pub fn new(near: PS1, far: PS2) -> Self {
+        Self { near, far }
+    }
+}
+
+#[async_trait]
+impl<PS1, PS2> PathInfoService for Cache<PS1, PS2>
+where
+    PS1: PathInfoService,
+    PS2: PathInfoService,
+{
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
+    async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
+        match self.near.get(digest).await? {
+            Some(path_info) => {
+                debug!("serving from cache");
+                Ok(Some(path_info))
+            }
+            None => {
+                debug!("not found in near, asking remote…");
+                match self.far.get(digest).await? {
+                    None => Ok(None),
+                    Some(path_info) => {
+                        debug!("found in remote, adding to cache");
+                        self.near.put(path_info.clone()).await?;
+                        Ok(Some(path_info))
+                    }
+                }
+            }
+        }
+    }
+
+    async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> {
+        Err(Error::StorageError("unimplemented".to_string()))
+    }
+
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
+        Box::pin(tokio_stream::once(Err(Error::StorageError(
+            "unimplemented".to_string(),
+        ))))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::num::NonZeroUsize;
+
+    use crate::{
+        pathinfoservice::{LruPathInfoService, MemoryPathInfoService, PathInfoService},
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+
+    const PATH_INFO_DIGEST: [u8; 20] = [0; 20];
+
+    /// Helper function setting up an instance of a "far" and "near"
+    /// PathInfoService.
+    async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, MemoryPathInfoService> {
+        // Create an instance of a "far" PathInfoService.
+        let far = MemoryPathInfoService::default();
+
+        // … and an instance of a "near" PathInfoService.
+        let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+
+        // create a Pathinfoservice combining the two and return it.
+        super::Cache::new(near, far)
+    }
+
+    /// Getting from the far backend is gonna insert it into the near one.
+    #[tokio::test]
+    async fn test_populate_cache() {
+        let svc = create_pathinfoservice().await;
+
+        // query the PathInfo, things should not be there.
+        assert!(svc.get(PATH_INFO_DIGEST).await.unwrap().is_none());
+
+        // insert it into the far one.
+        svc.far.put(PATH_INFO_WITH_NARINFO.clone()).await.unwrap();
+
+        // now try getting it again; it should succeed.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+
+        // peek near; it should now be there.
+        assert_eq!(
+            Some(PATH_INFO_WITH_NARINFO.clone()),
+            svc.near.get(PATH_INFO_DIGEST).await.unwrap()
+        );
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 6109054d7a..455909e7f2 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -37,7 +37,8 @@ pub async fn from_addr(
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
 ) -> Result<Box<dyn PathInfoService>, Error> {
-    let url =
+    #[allow(unused_mut)]
+    let mut url =
         Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
 
     let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
@@ -46,7 +47,7 @@ pub async fn from_addr(
             if url.has_host() || !url.path().is_empty() {
                 return Err(Error::StorageError("invalid url".to_string()));
             }
-            Box::new(MemoryPathInfoService::new(blob_service, directory_service))
+            Box::<MemoryPathInfoService>::default()
         }
         "sled" => {
             // sled doesn't support host, and a path can be provided (otherwise
@@ -64,10 +65,10 @@ pub async fn from_addr(
 
             // TODO: expose other parameters as URL parameters?
 
             Box::new(if url.path().is_empty() {
-                SledPathInfoService::new_temporary(blob_service, directory_service)
+                SledPathInfoService::new_temporary()
                     .map_err(|e| Error::StorageError(e.to_string()))?
             } else {
-                SledPathInfoService::new(url.path(), blob_service, directory_service)
+                SledPathInfoService::new(url.path())
                     .map_err(|e| Error::StorageError(e.to_string()))?
             })
         }
@@ -108,6 +109,30 @@ pub async fn from_addr(
                 PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
             Box::new(GRPCPathInfoService::from_client(client))
         }
+        #[cfg(feature = "cloud")]
+        "bigtable" => {
+            use super::bigtable::BigtableParameters;
+            use super::BigtablePathInfoService;
+
+            // parse the instance name from the hostname.
+            let instance_name = url
+                .host_str()
+                .ok_or_else(|| Error::StorageError("instance name missing".into()))?
+                .to_string();
+
+            // … but add it to the query string now, so we just need to parse that.
+            url.query_pairs_mut()
+                .append_pair("instance_name", &instance_name);
+
+            let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
+                .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
+
+            Box::new(
+                BigtablePathInfoService::connect(params)
+                    .await
+                    .map_err(|e| Error::StorageError(e.to_string()))?,
+            )
+        }
         _ => Err(Error::StorageError(format!(
             "unknown scheme: {}",
             url.scheme()
@@ -121,9 +146,9 @@ pub async fn from_addr(
 mod tests {
     use super::from_addr;
     use lazy_static::lazy_static;
+    use rstest::rstest;
     use std::sync::Arc;
     use tempfile::TempDir;
-    use test_case::test_case;
     use tvix_castore::{
         blobservice::{BlobService, MemoryBlobService},
         directoryservice::{DirectoryService, MemoryDirectoryService},
@@ -136,52 +161,66 @@ mod tests {
     // the gRPC tests below don't fail, because we connect lazily.
 
+    #[rstest]
     /// This uses an unsupported scheme.
-    #[test_case("http://foo.example/test", false; "unsupported scheme")]
+    #[case::unsupported_scheme("http://foo.example/test", false)]
     /// This configures sled in temporary mode.
- #[test_case("sled://", true; "sled valid temporary")] + #[case::sled_temporary("sled://", true)] /// This configures sled with /, which should fail. - #[test_case("sled:///", false; "sled invalid root")] + #[case::sled_invalid_root("sled:///", false)] /// This configures sled with a host, not path, which should fail. - #[test_case("sled://foo.example", false; "sled invalid host")] + #[case::sled_invalid_host("sled://foo.example", false)] /// This configures sled with a valid path path, which should succeed. - #[test_case(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true; "sled valid path")] + #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)] /// This configures sled with a host, and a valid path path, which should fail. - #[test_case(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false; "sled invalid host with valid path")] + #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)] /// This correctly sets the scheme, and doesn't set a path. - #[test_case("memory://", true; "memory valid")] + #[case::memory_valid("memory://", true)] /// This sets a memory url host to `foo` - #[test_case("memory://foo", false; "memory invalid host")] + #[case::memory_invalid_host("memory://foo", false)] /// This sets a memory url path to "/", which is invalid. - #[test_case("memory:///", false; "memory invalid root path")] + #[case::memory_invalid_root_path("memory:///", false)] /// This sets a memory url path to "/foo", which is invalid. - #[test_case("memory:///foo", false; "memory invalid root path foo")] + #[case::memory_invalid_root_path_foo("memory:///foo", false)] /// Correct Scheme for the cache.nixos.org binary cache. - #[test_case("nix+https://cache.nixos.org", true; "correct nix+https")] + #[case::correct_nix_https("nix+https://cache.nixos.org", true)] /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL). - #[test_case("nix+http://cache.nixos.org", true; "correct nix+http")] + #[case::correct_nix_http("nix+http://cache.nixos.org", true)] /// Correct Scheme for Nix HTTP Binary cache, with a subpath. - #[test_case("nix+http://192.0.2.1/foo", true; "correct nix http with subpath")] + #[case::correct_nix_http_with_subpath("nix+http://192.0.2.1/foo", true)] /// Correct Scheme for Nix HTTP Binary cache, with a subpath and port. 
- #[test_case("nix+http://[::1]:8080/foo", true; "correct nix http with subpath and port")] + #[case::correct_nix_http_with_subpath_and_port("nix+http://[::1]:8080/foo", true)] /// Correct Scheme for the cache.nixos.org binary cache, and correct trusted public key set - #[test_case("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true; "correct nix+https with trusted-public-key")] + #[case::correct_nix_https_with_trusted_public_key("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true)] /// Correct Scheme for the cache.nixos.org binary cache, and two correct trusted public keys set - #[test_case("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true; "correct nix+https with two trusted-public-key")] + #[case::correct_nix_https_with_two_trusted_public_keys("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true)] /// Correct scheme to connect to a unix socket. - #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")] + #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)] /// Correct scheme for unix socket, but setting a host too, which is invalid. - #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")] + #[case::grpc_invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)] /// Correct scheme to connect to localhost, with port 12345 - #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")] + #[case::grpc_valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)] /// Correct scheme to connect to localhost over http, without specifying a port. - #[test_case("grpc+http://localhost", true; "grpc valid http host without port")] + #[case::grpc_valid_http_host_without_port("grpc+http://localhost", true)] /// Correct scheme to connect to localhost over http, without specifying a port. - #[test_case("grpc+https://localhost", true; "grpc valid https host without port")] + #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)] /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. - #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")] + #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] + /// A valid example for Bigtable. 
+ #[cfg_attr( + all(feature = "cloud", feature = "integration"), + case::bigtable_valid( + "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1", + true + ) + )] + /// An invalid example for Bigtable, missing fields + #[cfg_attr( + all(feature = "cloud", feature = "integration"), + case::bigtable_invalid_missing_fields("bigtable://instance-1", false) + )] #[tokio::test] - async fn test_from_addr_tokio(uri_str: &str, exp_succeed: bool) { + async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default()); let directory_service: Arc<dyn DirectoryService> = Arc::from(MemoryDirectoryService::default()); diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs index 45d59fd0bc..aa64b1c01f 100644 --- a/tvix/store/src/pathinfoservice/fs/mod.rs +++ b/tvix/store/src/pathinfoservice/fs/mod.rs @@ -17,6 +17,7 @@ pub fn make_fs<BS, DS, PS>( directory_service: DS, path_info_service: PS, list_root: bool, + show_xattr: bool, ) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>> where BS: AsRef<dyn BlobService> + Send + Clone + 'static, @@ -28,6 +29,7 @@ where directory_service, RootNodesWrapper(path_info_service), list_root, + show_xattr, ) } diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 7d740429cf..f6a356cf18 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -1,8 +1,13 @@ use super::PathInfoService; -use crate::proto::{self, ListPathInfoRequest, PathInfo}; +use crate::{ + nar::NarCalculationService, + proto::{self, ListPathInfoRequest, PathInfo}, +}; use async_stream::try_stream; use futures::stream::BoxStream; +use nix_compat::nixbase32; use tonic::{async_trait, transport::Channel, Code}; +use tracing::instrument; use tvix_castore::{proto as castorepb, Error}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. @@ -25,6 +30,7 @@ impl GRPCPathInfoService { #[async_trait] impl PathInfoService for GRPCPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let path_info = self .grpc_client @@ -51,6 +57,7 @@ impl PathInfoService for GRPCPathInfoService { } } + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { let path_info = self .grpc_client @@ -63,29 +70,7 @@ impl PathInfoService for GRPCPathInfoService { Ok(path_info) } - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - let path_info = self - .grpc_client - .clone() - .calculate_nar(castorepb::Node { - node: Some(root_node.clone()), - }) - .await - .map_err(|e| Error::StorageError(e.to_string()))? 
- .into_inner(); - - let nar_sha256: [u8; 32] = path_info - .nar_sha256 - .to_vec() - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - - Ok((path_info.nar_size, nar_sha256)) - } - + #[instrument(level = "trace", skip_all)] fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { let mut grpc_client = self.grpc_client.clone(); @@ -120,88 +105,47 @@ impl PathInfoService for GRPCPathInfoService { } } +#[async_trait] +impl NarCalculationService for GRPCPathInfoService { + #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))] + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + let path_info = self + .grpc_client + .clone() + .calculate_nar(castorepb::Node { + node: Some(root_node.clone()), + }) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + let nar_sha256: [u8; 32] = path_info + .nar_sha256 + .to_vec() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + + Ok((path_info.nar_size, nar_sha256)) + } +} + #[cfg(test)] mod tests { - use std::sync::Arc; - use std::time::Duration; - - use rstest::*; - use tempfile::TempDir; - use tokio::net::UnixListener; - use tokio_retry::strategy::ExponentialBackoff; - use tokio_retry::Retry; - use tokio_stream::wrappers::UnixListenerStream; - use tvix_castore::blobservice::BlobService; - use tvix_castore::directoryservice::DirectoryService; - - use crate::pathinfoservice::MemoryPathInfoService; - use crate::proto::path_info_service_client::PathInfoServiceClient; - use crate::proto::GRPCPathInfoServiceWrapper; - use crate::tests::fixtures::{self, blob_service, directory_service}; - - use super::GRPCPathInfoService; - use super::PathInfoService; + use crate::pathinfoservice::tests::make_grpc_path_info_service_client; + use crate::pathinfoservice::PathInfoService; + use crate::tests::fixtures; /// This ensures connecting via gRPC works as expected. 
- #[rstest] #[tokio::test] - async fn test_valid_unix_path_ping_pong( - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, - ) { - let tmpdir = TempDir::new().unwrap(); - let socket_path = tmpdir.path().join("daemon"); - - let path_clone = socket_path.clone(); - - // Spin up a server - tokio::spawn(async { - let uds = UnixListener::bind(path_clone).unwrap(); - let uds_stream = UnixListenerStream::new(uds); - - // spin up a new server - let mut server = tonic::transport::Server::builder(); - let router = server.add_service( - crate::proto::path_info_service_server::PathInfoServiceServer::new( - GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new( - blob_service, - directory_service, - )) - as Box<dyn PathInfoService>), - ), - ); - router.serve_with_incoming(uds_stream).await - }); - - // wait for the socket to be created - Retry::spawn( - ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), - || async { - if socket_path.exists() { - Ok(()) - } else { - Err(()) - } - }, - ) - .await - .expect("failed to wait for socket"); - - // prepare a client - let grpc_client = { - let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) - .expect("must parse"); - let client = PathInfoServiceClient::new( - tvix_castore::tonic::channel_from_url(&url) - .await - .expect("must succeed"), - ); - - GRPCPathInfoService::from_client(client) - }; + async fn test_valid_unix_path_ping_pong() { + let (_blob_service, _directory_service, path_info_service) = + make_grpc_path_info_service_client().await; - let path_info = grpc_client - .get(fixtures::DUMMY_OUTPUT_HASH) + let path_info = path_info_service + .get(fixtures::DUMMY_PATH_DIGEST) .await .expect("must not be error"); diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs new file mode 100644 index 0000000000..da674f497a --- /dev/null +++ b/tvix/store/src/pathinfoservice/lru.rs @@ -0,0 +1,128 @@ +use async_stream::try_stream; +use futures::stream::BoxStream; +use lru::LruCache; +use nix_compat::nixbase32; +use std::num::NonZeroUsize; +use std::sync::Arc; +use tokio::sync::RwLock; +use tonic::async_trait; +use tracing::instrument; + +use crate::proto::PathInfo; +use tvix_castore::Error; + +use super::PathInfoService; + +pub struct LruPathInfoService { + lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>, +} + +impl LruPathInfoService { + pub fn with_capacity(capacity: NonZeroUsize) -> Self { + Self { + lru: Arc::new(RwLock::new(LruCache::new(capacity))), + } + } +} + +#[async_trait] +impl PathInfoService for LruPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + Ok(self.lru.write().await.get(&digest).cloned()) + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // call validate + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("invalid PathInfo: {}", e)))?; + + self.lru + .write() + .await + .put(*store_path.digest(), path_info.clone()); + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let lru = self.lru.clone(); + Box::pin(try_stream! 
{
+            let lru = lru.read().await;
+            let it = lru.iter();
+
+            for (_k, v) in it {
+                yield v.clone()
+            }
+        })
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::num::NonZeroUsize;
+
+    use crate::{
+        pathinfoservice::{LruPathInfoService, PathInfoService},
+        proto::PathInfo,
+        tests::fixtures::PATH_INFO_WITH_NARINFO,
+    };
+    use lazy_static::lazy_static;
+    use tvix_castore::proto as castorepb;
+
+    lazy_static! {
+        static ref PATHINFO_1: PathInfo = PATH_INFO_WITH_NARINFO.clone();
+        static ref PATHINFO_1_DIGEST: [u8; 20] = [0; 20];
+        static ref PATHINFO_2: PathInfo = {
+            let mut p = PATHINFO_1.clone();
+            let root_node = p.node.as_mut().unwrap();
+            if let castorepb::Node { node: Some(node) } = root_node {
+                let n = node.to_owned();
+                *node = n.rename("11111111111111111111111111111111-dummy2".into());
+            } else {
+                unreachable!()
+            }
+            p
+        };
+        static ref PATHINFO_2_DIGEST: [u8; 20] = *(PATHINFO_2.validate().unwrap()).digest();
+    }
+
+    #[tokio::test]
+    async fn evict() {
+        let svc = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap());
+
+        // pathinfo_1 should not be there yet.
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+
+        // insert it
+        svc.put(PATHINFO_1.clone()).await.expect("no error");
+
+        // now it should be there.
+        assert_eq!(
+            Some(PATHINFO_1.clone()),
+            svc.get(*PATHINFO_1_DIGEST).await.expect("no error")
+        );
+
+        // insert pathinfo_2. This will evict pathinfo_1.
+        svc.put(PATHINFO_2.clone()).await.expect("no error");
+
+        // now pathinfo_2 should be there.
+        assert_eq!(
+            Some(PATHINFO_2.clone()),
+            svc.get(*PATHINFO_2_DIGEST).await.expect("no error")
+        );
+
+        // … but pathinfo_1 should not be anymore.
+        assert!(svc
+            .get(*PATHINFO_1_DIGEST)
+            .await
+            .expect("no error")
+            .is_none());
+    }
+}
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index f8435dbbf8..3de3221df2 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -1,40 +1,24 @@
 use super::PathInfoService;
-use crate::{nar::calculate_size_and_sha256, proto::PathInfo};
-use futures::stream::{iter, BoxStream};
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
+use crate::proto::PathInfo;
+use async_stream::try_stream;
+use futures::stream::BoxStream;
+use nix_compat::nixbase32;
+use std::{collections::HashMap, sync::Arc};
+use tokio::sync::RwLock;
 use tonic::async_trait;
-use tvix_castore::proto as castorepb;
+use tracing::instrument;
 use tvix_castore::Error;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
-pub struct MemoryPathInfoService<BS, DS> {
+#[derive(Default)]
+pub struct MemoryPathInfoService {
     db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>,
-
-    blob_service: BS,
-    directory_service: DS,
-}
-
-impl<BS, DS> MemoryPathInfoService<BS, DS> {
-    pub fn new(blob_service: BS, directory_service: DS) -> Self {
-        Self {
-            db: Default::default(),
-            blob_service,
-            directory_service,
-        }
-    }
 }
 
 #[async_trait]
-impl<BS, DS> PathInfoService for MemoryPathInfoService<BS, DS>
-where
-    BS: AsRef<dyn BlobService> + Send + Sync,
-    DS: AsRef<dyn DirectoryService> + Send + Sync,
-{
+impl PathInfoService for MemoryPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
-        let db = self.db.read().unwrap();
+        let db = self.db.read().await;
 
         match db.get(&digest) {
             None => Ok(None),
@@ -42,6 +26,7 @@ where
         }
     }
 
+    #[instrument(level = "trace",
skip_all, fields(path_info.root_node = ?path_info.node))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { // Call validate on the received PathInfo message. match path_info.validate() { @@ -53,7 +38,7 @@ where // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. // This overwrites existing PathInfo objects. Ok(nix_path) => { - let mut db = self.db.write().unwrap(); + let mut db = self.db.write().await; db.insert(*nix_path.digest(), path_info.clone()); Ok(path_info) @@ -61,24 +46,16 @@ where } } - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service) - .await - .map_err(|e| Error::StorageError(e.to_string())) - } - fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { - let db = self.db.read().unwrap(); + let db = self.db.clone(); - // Copy all elements into a list. - // This is a bit ugly, because we can't have db escape the lifetime - // of this function, but elements need to be returned owned anyways, and this in- - // memory impl is only for testing purposes anyways. - let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + Box::pin(try_stream! { + let db = db.read().await; + let it = db.iter(); - Box::pin(iter(items)) + for (_k, v) in it { + yield v.clone() + } + }) } } diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index c334c8dc56..574bcc0b8b 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -1,5 +1,7 @@ +mod combinators; mod from_addr; mod grpc; +mod lru; mod memory; mod nix_http; mod sled; @@ -12,17 +14,23 @@ mod tests; use futures::stream::BoxStream; use tonic::async_trait; -use tvix_castore::proto as castorepb; use tvix_castore::Error; use crate::proto::PathInfo; +pub use self::combinators::Cache as CachePathInfoService; pub use self::from_addr::from_addr; pub use self::grpc::GRPCPathInfoService; +pub use self::lru::LruPathInfoService; pub use self::memory::MemoryPathInfoService; pub use self::nix_http::NixHTTPPathInfoService; pub use self::sled::SledPathInfoService; +#[cfg(feature = "cloud")] +mod bigtable; +#[cfg(feature = "cloud")] +pub use self::bigtable::BigtablePathInfoService; + #[cfg(any(feature = "fuse", feature = "virtiofs"))] pub use self::fs::make_fs; @@ -36,14 +44,6 @@ pub trait PathInfoService: Send + Sync { /// invalid messages. async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>; - /// Return the nar size and nar sha256 digest for a given root node. - /// This can be used to calculate NAR-based output paths, - /// and implementations are encouraged to cache it. - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error>; - /// Iterate over all PathInfo objects in the store. /// Implementations can decide to disallow listing. 
/// @@ -67,13 +67,6 @@ where self.as_ref().put(path_info).await } - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - self.as_ref().calculate_nar(root_node).await - } - fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { self.as_ref().list() } diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index bdb0e2c3cb..1dd7da4831 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -1,6 +1,5 @@ -use std::io::{self, BufRead, Read, Write}; - -use data_encoding::BASE64; +use super::PathInfoService; +use crate::{nar::ingest_nar_and_hash, proto::PathInfo}; use futures::{stream::BoxStream, TryStreamExt}; use nix_compat::{ narinfo::{self, NarInfo}, @@ -8,17 +7,13 @@ use nix_compat::{ nixhash::NixHash, }; use reqwest::StatusCode; -use sha2::{digest::FixedOutput, Digest, Sha256}; +use tokio::io::{self, AsyncRead}; use tonic::async_trait; use tracing::{debug, instrument, warn}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, }; -use crate::proto::PathInfo; - -use super::PathInfoService; - /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix /// Store Model. @@ -32,8 +27,7 @@ use super::PathInfoService; /// /// The client is expected to be (indirectly) using the same [BlobService] and /// [DirectoryService], so able to fetch referred Directories and Blobs. -/// [PathInfoService::put] and [PathInfoService::calculate_nar] are not -/// implemented and return an error if called. +/// [PathInfoService::put] is not implemented and returns an error if called. /// TODO: what about reading from nix-cache-info? pub struct NixHTTPPathInfoService<BS, DS> { base_url: url::Url, @@ -71,7 +65,7 @@ where BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, { - #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))] + #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let narinfo_url = self .base_url @@ -171,85 +165,71 @@ where ))); } - // get an AsyncRead of the response body. - let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| { + // get a reader of the response body. + let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| { let e = e.without_url(); warn!(e=%e, "failed to get response body"); io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) })); - let sync_r = tokio_util::io::SyncIoBridge::new(async_r); - // handle decompression, by wrapping the reader. - let sync_r: Box<dyn BufRead + Send> = match narinfo.compression { - Some("none") => Box::new(sync_r), - Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))), - Some(comp) => { - return Err(Error::InvalidRequest( - format!("unsupported compression: {}", comp).to_string(), - )) - } - None => { - return Err(Error::InvalidRequest( - "unsupported compression: bzip2".to_string(), - )) + // handle decompression, depending on the compression field. 
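+        // An absent Compression field is treated like bzip2 below: that's
+        // the historical default for Nix binary caches.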
+ let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression { + Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>, + Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some(comp_str) => { + return Err(Error::StorageError(format!( + "unsupported compression: {comp_str}" + ))); } }; - let res = tokio::task::spawn_blocking({ - let blob_service = self.blob_service.clone(); - let directory_service = self.directory_service.clone(); - move || -> io::Result<_> { - // Wrap the reader once more, so we can calculate NarSize and NarHash - let mut sync_r = io::BufReader::new(NarReader::from(sync_r)); - let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?; - - let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner(); - - Ok((root_node, nar_hash, nar_size)) - } - }) + let (root_node, nar_hash, nar_size) = ingest_nar_and_hash( + self.blob_service.clone(), + self.directory_service.clone(), + &mut r, + ) .await - .unwrap(); - - match res { - Ok((root_node, nar_hash, nar_size)) => { - // ensure the ingested narhash and narsize do actually match. - if narinfo.nar_size != nar_size { - warn!( - narinfo.nar_size = narinfo.nar_size, - http.nar_size = nar_size, - "NARSize mismatch" - ); - Err(io::Error::new( - io::ErrorKind::InvalidData, - "NarSize mismatch".to_string(), - ))?; - } - if narinfo.nar_hash != nar_hash { - warn!( - narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash), - http.nar_hash = %NixHash::Sha256(nar_hash), - "NarHash mismatch" - ); - Err(io::Error::new( - io::ErrorKind::InvalidData, - "NarHash mismatch".to_string(), - ))?; - } - - Ok(Some(PathInfo { - node: Some(castorepb::Node { - // set the name of the root node to the digest-name of the store path. - node: Some( - root_node.rename(narinfo.store_path.to_string().to_owned().into()), - ), - }), - references: pathinfo.references, - narinfo: pathinfo.narinfo, - })) - } - Err(e) => Err(e.into()), + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + // ensure the ingested narhash and narsize do actually match. + if narinfo.nar_size != nar_size { + warn!( + narinfo.nar_size = narinfo.nar_size, + http.nar_size = nar_size, + "NarSize mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarSize mismatch".to_string(), + ))?; } + if narinfo.nar_hash != nar_hash { + warn!( + narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash), + http.nar_hash = %NixHash::Sha256(nar_hash), + "NarHash mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarHash mismatch".to_string(), + ))?; + } + + Ok(Some(PathInfo { + node: Some(castorepb::Node { + // set the name of the root node to the digest-name of the store path. 
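+                // (i.e. the basename "<digest>-<name>", without the store
+                // directory prefix)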
+ node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())), + }), + references: pathinfo.references, + narinfo: pathinfo.narinfo, + })) } #[instrument(skip_all, fields(path_info=?_path_info))] @@ -259,16 +239,6 @@ where )) } - #[instrument(skip_all, fields(root_node=?root_node))] - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - Err(Error::InvalidRequest( - "calculate_nar not supported for this backend".to_string(), - )) - } - fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { Box::pin(futures::stream::once(async { Err(Error::InvalidRequest( @@ -277,38 +247,3 @@ where })) } } - -/// Small helper reader implementing [std::io::Read]. -/// It can be used to wrap another reader, counts the number of bytes read -/// and the sha256 digest of the contents. -struct NarReader<R: Read> { - r: R, - - sha256: sha2::Sha256, - bytes_read: u64, -} - -impl<R: Read> NarReader<R> { - pub fn from(inner: R) -> Self { - Self { - r: inner, - sha256: Sha256::new(), - bytes_read: 0, - } - } - - /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read. - pub fn into_inner(self) -> (R, [u8; 32], u64) { - (self.r, self.sha256.finalize_fixed().into(), self.bytes_read) - } -} - -impl<R: Read> Read for NarReader<R> { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.r.read(buf).map(|n| { - self.bytes_read += n as u64; - self.sha256.write_all(&buf[..n]).unwrap(); - n - }) - } -} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 7b6d7fd7ab..96ade18169 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -1,140 +1,116 @@ use super::PathInfoService; -use crate::nar::calculate_size_and_sha256; use crate::proto::PathInfo; -use futures::stream::iter; +use async_stream::try_stream; use futures::stream::BoxStream; +use nix_compat::nixbase32; use prost::Message; use std::path::Path; use tonic::async_trait; -use tracing::warn; -use tvix_castore::proto as castorepb; -use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; +use tracing::{instrument, warn}; +use tvix_castore::Error; /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). /// /// The PathInfo messages are stored as encoded protos, and keyed by their output hash, /// as that's currently the only request type available. 
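+///
+/// All sled access happens on blocking threads via [tokio::task::spawn_blocking],
+/// as sled itself only exposes a synchronous API.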
-pub struct SledPathInfoService<BS, DS> { +pub struct SledPathInfoService { db: sled::Db, - - blob_service: BS, - directory_service: DS, } -impl<BS, DS> SledPathInfoService<BS, DS> { - pub fn new<P: AsRef<Path>>( - p: P, - blob_service: BS, - directory_service: DS, - ) -> Result<Self, sled::Error> { +impl SledPathInfoService { + pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> { let config = sled::Config::default() .use_compression(false) // is a required parameter .path(p); let db = config.open()?; - Ok(Self { - db, - blob_service, - directory_service, - }) + Ok(Self { db }) } - pub fn new_temporary(blob_service: BS, directory_service: DS) -> Result<Self, sled::Error> { + pub fn new_temporary() -> Result<Self, sled::Error> { let config = sled::Config::default().temporary(true); let db = config.open()?; - Ok(Self { - db, - blob_service, - directory_service, - }) + Ok(Self { db }) } } #[async_trait] -impl<BS, DS> PathInfoService for SledPathInfoService<BS, DS> -where - BS: AsRef<dyn BlobService> + Send + Sync, - DS: AsRef<dyn DirectoryService> + Send + Sync, -{ +impl PathInfoService for SledPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { - match self.db.get(digest) { - Ok(None) => Ok(None), - Ok(Some(data)) => match PathInfo::decode(&*data) { - Ok(path_info) => Ok(Some(path_info)), - Err(e) => { + let resp = tokio::task::spawn_blocking({ + let db = self.db.clone(); + move || db.get(digest.as_slice()) + }) + .await? + .map_err(|e| { + warn!("failed to retrieve PathInfo: {}", e); + Error::StorageError(format!("failed to retrieve PathInfo: {}", e)) + })?; + match resp { + None => Ok(None), + Some(data) => { + let path_info = PathInfo::decode(&*data).map_err(|e| { warn!("failed to decode stored PathInfo: {}", e); - Err(Error::StorageError(format!( - "failed to decode stored PathInfo: {}", - e - ))) - } - }, - Err(e) => { - warn!("failed to retrieve PathInfo: {}", e); - Err(Error::StorageError(format!( - "failed to retrieve PathInfo: {}", - e - ))) + Error::StorageError(format!("failed to decode stored PathInfo: {}", e)) + })?; + Ok(Some(path_info)) } } } + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { // Call validate on the received PathInfo message. - match path_info.validate() { - Err(e) => Err(Error::InvalidRequest(format!( - "failed to validate PathInfo: {}", - e - ))), - // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. - // This overwrites existing PathInfo objects. - Ok(nix_path) => match self - .db - .insert(*nix_path.digest(), path_info.encode_to_vec()) - { - Ok(_) => Ok(path_info), - Err(e) => { - warn!("failed to insert PathInfo: {}", e); - Err(Error::StorageError(format! { - "failed to insert PathInfo: {}", e - })) - } - }, - } - } + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?; + + // In case the PathInfo is valid, we were able to parse a StorePath. + // Store it in the database, keyed by its digest. + // This overwrites existing PathInfo objects. + tokio::task::spawn_blocking({ + let db = self.db.clone(); + let k = *store_path.digest(); + let data = path_info.encode_to_vec(); + move || db.insert(k, data) + }) + .await? 
+        .map_err(|e| {
+            warn!("failed to insert PathInfo: {}", e);
+            Error::StorageError(format! {
+                "failed to insert PathInfo: {}", e
+            })
+        })?;
 
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service)
-            .await
-            .map_err(|e| Error::StorageError(e.to_string()))
+        Ok(path_info)
     }
 
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
-        Box::pin(iter(self.db.iter().values().map(|v| match v {
-            Ok(data) => {
-                // we retrieved some bytes
-                match PathInfo::decode(&*data) {
-                    Ok(path_info) => Ok(path_info),
-                    Err(e) => {
-                        warn!("failed to decode stored PathInfo: {}", e);
-                        Err(Error::StorageError(format!(
-                            "failed to decode stored PathInfo: {}",
-                            e
-                        )))
-                    }
-                }
-            }
-            Err(e) => {
-                warn!("failed to retrieve PathInfo: {}", e);
-                Err(Error::StorageError(format!(
-                    "failed to retrieve PathInfo: {}",
-                    e
-                )))
+        let db = self.db.clone();
+        let mut it = db.iter().values();
+
+        Box::pin(try_stream! {
+            // Don't block the executor while waiting for .next(), so wrap that
+            // in a spawn_blocking call.
+            // We need to pass the iterator around to be able to reuse it.
+            while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || {
+                (it.next(), it)
+            }).await? {
+                it = new_it;
+                let data = elem.map_err(|e| {
+                    warn!("failed to retrieve PathInfo: {}", e);
+                    Error::StorageError(format!("failed to retrieve PathInfo: {}", e))
+                })?;
+
+                let path_info = PathInfo::decode(&*data).map_err(|e| {
+                    warn!("failed to decode stored PathInfo: {}", e);
+                    Error::StorageError(format!("failed to decode stored PathInfo: {}", e))
+                })?;
+
+                yield path_info
             }
-        }))))
+        })
     }
 }
diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs
index a3035d094d..061655e4ba 100644
--- a/tvix/store/src/pathinfoservice/tests/mod.rs
+++ b/tvix/store/src/pathinfoservice/tests/mod.rs
@@ -4,61 +4,30 @@
 use rstest::*;
 use rstest_reuse::{self, *};
-use std::sync::Arc;
-use tvix_castore::proto as castorepb;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 
 use super::PathInfoService;
+use crate::pathinfoservice::MemoryPathInfoService;
+use crate::pathinfoservice::SledPathInfoService;
 use crate::proto::PathInfo;
-use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
+use crate::tests::fixtures::DUMMY_PATH_DIGEST;
+use tvix_castore::proto as castorepb;
 
 mod utils;
-use self::utils::make_grpc_path_info_service_client;
-
-/// Convenience type alias batching all three servives together.
-#[allow(clippy::upper_case_acronyms)]
-type BSDSPS = (
-    Arc<dyn BlobService>,
-    Arc<dyn DirectoryService>,
-    Box<dyn PathInfoService>,
-);
-
-/// Creates a PathInfoService using a new Memory{Blob,Directory}Service.
-/// We return a 3-tuple containing all of them, as some tests want to interact
-/// with all three.
-pub async fn make_path_info_service(uri: &str) -> BSDSPS { - let blob_service: Arc<dyn BlobService> = tvix_castore::blobservice::from_addr("memory://") - .await - .unwrap() - .into(); - let directory_service: Arc<dyn DirectoryService> = - tvix_castore::directoryservice::from_addr("memory://") - .await - .unwrap() - .into(); +pub use self::utils::make_grpc_path_info_service_client; - ( - blob_service.clone(), - directory_service.clone(), - crate::pathinfoservice::from_addr(uri, blob_service, directory_service) - .await - .unwrap(), - ) -} +#[cfg(all(feature = "cloud", feature = "integration"))] +use self::utils::make_bigtable_path_info_service; #[template] #[rstest] -#[case::memory(make_path_info_service("memory://").await)] -#[case::grpc(make_grpc_path_info_service_client().await)] -#[case::sled(make_path_info_service("sled://").await)] -pub fn path_info_services( - #[case] services: ( - impl BlobService, - impl DirectoryService, - impl PathInfoService, - ), -) { -} +#[case::memory(MemoryPathInfoService::default())] +#[case::grpc({ + let (_, _, svc) = make_grpc_path_info_service_client().await; + svc +})] +#[case::sled(SledPathInfoService::new_temporary().unwrap())] +#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))] +pub fn path_info_services(#[case] svc: impl PathInfoService) {} // FUTUREWORK: add more tests rejecting invalid PathInfo messages. // A subset of them should also ensure references to other PathInfos, or @@ -67,10 +36,9 @@ pub fn path_info_services( /// Trying to get a non-existent PathInfo should return Ok(None). #[apply(path_info_services)] #[tokio::test] -async fn not_found(services: BSDSPS) { - let (_, _, path_info_service) = services; - assert!(path_info_service - .get(DUMMY_OUTPUT_HASH) +async fn not_found(svc: impl PathInfoService) { + assert!(svc + .get(DUMMY_PATH_DIGEST) .await .expect("must succeed") .is_none()); @@ -79,9 +47,7 @@ async fn not_found(services: BSDSPS) { /// Put a PathInfo into the store, get it back. #[apply(path_info_services)] #[tokio::test] -async fn put_get(services: BSDSPS) { - let (_, _, path_info_service) = services; - +async fn put_get(svc: impl PathInfoService) { let path_info = PathInfo { node: Some(castorepb::Node { node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { @@ -93,20 +59,14 @@ async fn put_get(services: BSDSPS) { }; // insert - let resp = path_info_service - .put(path_info.clone()) - .await - .expect("must succeed"); + let resp = svc.put(path_info.clone()).await.expect("must succeed"); // expect the returned PathInfo to be equal (for now) // in the future, some stores might add additional fields/signatures. 
assert_eq!(path_info, resp); // get it back - let resp = path_info_service - .get(DUMMY_OUTPUT_HASH) - .await - .expect("must succeed"); + let resp = svc.get(DUMMY_PATH_DIGEST).await.expect("must succeed"); assert_eq!(Some(path_info), resp); } diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs index 31ec57aade..ee170468d1 100644 --- a/tvix/store/src/pathinfoservice/tests/utils.rs +++ b/tvix/store/src/pathinfoservice/tests/utils.rs @@ -1,8 +1,10 @@ use std::sync::Arc; use tonic::transport::{Endpoint, Server, Uri}; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; use crate::{ + nar::{NarCalculationService, SimpleRenderer}, pathinfoservice::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService}, proto::{ path_info_service_client::PathInfoServiceClient, @@ -14,7 +16,8 @@ use crate::{ /// Constructs and returns a gRPC PathInfoService. /// We also return memory-based {Blob,Directory}Service, /// as the consumer of this function accepts a 3-tuple. -pub async fn make_grpc_path_info_service_client() -> super::BSDSPS { +pub async fn make_grpc_path_info_service_client( +) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) { let (left, right) = tokio::io::duplex(64); let blob_service = blob_service(); @@ -26,12 +29,15 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS { let directory_service = directory_service.clone(); async move { let path_info_service: Arc<dyn PathInfoService> = - Arc::from(MemoryPathInfoService::new(blob_service, directory_service)); + Arc::from(MemoryPathInfoService::default()); + let nar_calculation_service = + Box::new(SimpleRenderer::new(blob_service, directory_service)) + as Box<dyn NarCalculationService>; - // spin up a new DirectoryService + // spin up a new PathInfoService let mut server = Server::builder(); let router = server.add_service(PathInfoServiceServer::new( - GRPCPathInfoServiceWrapper::new(path_info_service), + GRPCPathInfoServiceWrapper::new(path_info_service, nar_calculation_service), )); router @@ -43,18 +49,27 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS { // Create a client, connecting to the right side. The URI is unused. 
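+    // The duplex stream above can only be handed out once, hence the Option
+    // wrapper and the take() in the connector closure below.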
let mut maybe_right = Some(right); - let path_info_service = Box::new(GRPCPathInfoService::from_client( - PathInfoServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(right) } - })) - .await - .unwrap(), - ), + let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(right) } + })) + .await + .unwrap(), )); (blob_service, directory_service, path_info_service) } + +#[cfg(all(feature = "cloud", feature = "integration"))] +pub(crate) async fn make_bigtable_path_info_service( +) -> crate::pathinfoservice::BigtablePathInfoService { + use crate::pathinfoservice::bigtable::BigtableParameters; + use crate::pathinfoservice::BigtablePathInfoService; + + BigtablePathInfoService::connect(BigtableParameters::default_for_tests()) + .await + .unwrap() +} diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 9f45818227..68f5575676 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -1,4 +1,4 @@ -use crate::nar::RenderError; +use crate::nar::{NarCalculationService, RenderError}; use crate::pathinfoservice::PathInfoService; use crate::proto; use futures::{stream::BoxStream, TryStreamExt}; @@ -7,23 +7,26 @@ use tonic::{async_trait, Request, Response, Result, Status}; use tracing::{instrument, warn}; use tvix_castore::proto as castorepb; -pub struct GRPCPathInfoServiceWrapper<PS> { - inner: PS, +pub struct GRPCPathInfoServiceWrapper<PS, NS> { + path_info_service: PS, // FUTUREWORK: allow exposing without allowing listing + nar_calculation_service: NS, } -impl<PS> GRPCPathInfoServiceWrapper<PS> { - pub fn new(path_info_service: PS) -> Self { +impl<PS, NS> GRPCPathInfoServiceWrapper<PS, NS> { + pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self { Self { - inner: path_info_service, + path_info_service, + nar_calculation_service, } } } #[async_trait] -impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS> +impl<PS, NS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS> where PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static, + NS: NarCalculationService + Send + Sync + 'static, { type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>; @@ -39,7 +42,7 @@ where .to_vec() .try_into() .map_err(|_e| Status::invalid_argument("invalid output digest length"))?; - match self.inner.get(digest).await { + match self.path_info_service.get(digest).await { Ok(None) => Err(Status::not_found("PathInfo not found")), Ok(Some(path_info)) => Ok(Response::new(path_info)), Err(e) => { @@ -57,7 +60,7 @@ where // Store the PathInfo in the client. Clients MUST validate the data // they receive, so we don't validate additionally here. - match self.inner.put(path_info).await { + match self.path_info_service.put(path_info).await { Ok(path_info_new) => Ok(Response::new(path_info_new)), Err(e) => { warn!(err = %e, "failed to put PathInfo"); @@ -79,7 +82,7 @@ where Err(Status::invalid_argument("invalid root node"))? 
 }
 
-        match self.inner.calculate_nar(&root_node).await {
+        match self.nar_calculation_service.calculate_nar(&root_node).await {
             Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse {
                 nar_size,
                 nar_sha256: nar_sha256.to_vec().into(),
@@ -99,7 +102,7 @@ where
         _request: Request<proto::ListPathInfoRequest>,
     ) -> Result<Response<Self::ListStream>, Status> {
         let stream = Box::pin(
-            self.inner
+            self.path_info_service
                 .list()
                 .map_err(|e| Status::internal(e.to_string())),
         );
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
index 10fde33527..a09839c8bd 100644
--- a/tvix/store/src/proto/mod.rs
+++ b/tvix/store/src/proto/mod.rs
@@ -74,23 +74,20 @@ pub enum ValidatePathInfoError {
 
 /// Parses a root node name.
 ///
-/// On success, this returns the parsed [store_path::StorePath].
+/// On success, this returns the parsed [store_path::StorePathRef].
 /// On error, it returns an error generated from the supplied constructor.
 fn parse_node_name_root<E>(
     name: &[u8],
     err: fn(Vec<u8>, store_path::Error) -> E,
-) -> Result<store_path::StorePath, E> {
-    match store_path::StorePath::from_bytes(name) {
-        Ok(np) => Ok(np),
-        Err(e) => Err(err(name.to_vec(), e)),
-    }
+) -> Result<store_path::StorePathRef<'_>, E> {
+    store_path::StorePathRef::from_bytes(name).map_err(|e| err(name.to_vec(), e))
 }
 
 impl PathInfo {
     /// validate performs some checks on the PathInfo struct,
     /// returning either a [store_path::StorePath] of the root node, or a
     /// [ValidatePathInfoError].
-    pub fn validate(&self) -> Result<store_path::StorePath, ValidatePathInfoError> {
+    pub fn validate(&self) -> Result<store_path::StorePathRef<'_>, ValidatePathInfoError> {
         // ensure the references have the right number of bytes.
         for (i, reference) in self.references.iter().enumerate() {
             if reference.len() != store_path::DIGEST_SIZE {
@@ -154,8 +151,7 @@ impl PathInfo {
             // converting to this field.
             if let Some(deriver) = &narinfo.deriver {
                 store_path::StorePathRef::from_name_and_digest(&deriver.name, &deriver.digest)
-                    .map_err(ValidatePathInfoError::InvalidDeriverField)?
-                    .to_owned();
+                    .map_err(ValidatePathInfoError::InvalidDeriverField)?;
             }
         }
 
@@ -177,12 +173,16 @@ impl PathInfo {
         Ok(root_nix_path)
     }
 
-    /// With self and a given StorePathRef, this reconstructs a
+    /// With self and its store path name, this reconstructs a
     /// [nix_compat::narinfo::NarInfo<'_>].
     /// It can be used to validate Signatures, or get back a (sparse) NarInfo
     /// struct to prepare writing it out.
     ///
-    /// This doesn't allocate any new data.
+    /// It assumes self to be validated first, and will only return None if the
+    /// `narinfo` field is unpopulated.
+    ///
+    /// It does very little allocation (a Vec each for `signatures` and
+    /// `references`); the rest points to data owned elsewhere.
     ///
     /// Keep in mind this is not able to reconstruct all data present in the
     /// NarInfo<'_>, as some of it is not stored at all:
@@ -192,7 +192,7 @@
     ///
     /// If you want to render it out to a string and be able to parse it back
     /// in, at least URL *must* be set again.
- pub fn as_narinfo<'a>( + pub fn to_narinfo<'a>( &'a self, store_path: store_path::StorePathRef<'a>, ) -> Option<nix_compat::narinfo::NarInfo<'_>> { @@ -201,7 +201,11 @@ impl PathInfo { Some(nix_compat::narinfo::NarInfo { flags: Flags::empty(), store_path, - nar_hash: narinfo.nar_sha256.to_vec().try_into().unwrap(), + nar_hash: narinfo + .nar_sha256 + .as_ref() + .try_into() + .expect("invalid narhash"), nar_size: narinfo.nar_size, references: narinfo .reference_names diff --git a/tvix/store/src/proto/tests/pathinfo.rs b/tvix/store/src/proto/tests/pathinfo.rs index dca74dc92f..4d0834878d 100644 --- a/tvix/store/src/proto/tests/pathinfo.rs +++ b/tvix/store/src/proto/tests/pathinfo.rs @@ -3,97 +3,82 @@ use crate::tests::fixtures::*; use bytes::Bytes; use data_encoding::BASE64; use nix_compat::nixbase32; -use nix_compat::store_path::{self, StorePath, StorePathRef}; -use std::str::FromStr; -use test_case::test_case; +use nix_compat::store_path::{self, StorePathRef}; +use rstest::rstest; use tvix_castore::proto as castorepb; -#[test_case( - None, - Err(ValidatePathInfoError::NoNodePresent) ; - "No node" -)] -#[test_case( - Some(castorepb::Node { node: None }), - Err(ValidatePathInfoError::NoNodePresent); - "No node 2" -)] -fn validate_no_node( - t_node: Option<castorepb::Node>, - t_result: Result<StorePath, ValidatePathInfoError>, +#[rstest] +#[case::no_node(None, Err(ValidatePathInfoError::NoNodePresent))] +#[case::no_node_2(Some(castorepb::Node { node: None}), Err(ValidatePathInfoError::NoNodePresent))] + +fn validate_pathinfo( + #[case] node: Option<castorepb::Node>, + #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, ) { // construct the PathInfo object let p = PathInfo { - node: t_node, + node, ..Default::default() }; - assert_eq!(t_result, p.validate()); + + assert_eq!(exp_result, p.validate()); + + let err = p.validate().expect_err("validation should fail"); + assert!(matches!(err, ValidatePathInfoError::NoNodePresent)); } -#[test_case( - castorepb::DirectoryNode { - name: DUMMY_NAME.into(), +#[rstest] +#[case::ok(castorepb::DirectoryNode { + name: DUMMY_PATH.into(), digest: DUMMY_DIGEST.clone().into(), size: 0, - }, - Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed")); - "ok" -)] -#[test_case( - castorepb::DirectoryNode { - name: DUMMY_NAME.into(), +}, Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()))] +#[case::invalid_digest_length(castorepb::DirectoryNode { + name: DUMMY_PATH.into(), digest: Bytes::new(), size: 0, - }, - Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))); - "invalid digest length" -)] -#[test_case( - castorepb::DirectoryNode { +}, Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))))] +#[case::invalid_node_name_no_storepath(castorepb::DirectoryNode { name: "invalid".into(), digest: DUMMY_DIGEST.clone().into(), size: 0, - }, - Err(ValidatePathInfoError::InvalidNodeName( +}, Err(ValidatePathInfoError::InvalidNodeName( "invalid".into(), store_path::Error::InvalidLength - )); - "invalid node name" -)] +)))] fn validate_directory( - t_directory_node: castorepb::DirectoryNode, - t_result: Result<StorePath, ValidatePathInfoError>, + #[case] directory_node: castorepb::DirectoryNode, + #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, ) { // construct the PathInfo object let p = PathInfo { node: Some(castorepb::Node { - node: Some(castorepb::node::Node::Directory(t_directory_node)), + node: 
Some(castorepb::node::Node::Directory(directory_node)), }), ..Default::default() }; - assert_eq!(t_result, p.validate()); + assert_eq!(exp_result, p.validate()); } -#[test_case( +#[rstest] +#[case::ok( castorepb::FileNode { - name: DUMMY_NAME.into(), + name: DUMMY_PATH.into(), digest: DUMMY_DIGEST.clone().into(), size: 0, executable: false, }, - Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed")); - "ok" + Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()) )] -#[test_case( +#[case::invalid_digest_len( castorepb::FileNode { - name: DUMMY_NAME.into(), + name: DUMMY_PATH.into(), digest: Bytes::new(), ..Default::default() }, - Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))); - "invalid digest length" + Err(ValidatePathInfoError::InvalidRootNode(castorepb::ValidateNodeError::InvalidDigestLen(0))) )] -#[test_case( +#[case::invalid_node_name( castorepb::FileNode { name: "invalid".into(), digest: DUMMY_DIGEST.clone().into(), @@ -102,32 +87,31 @@ fn validate_directory( Err(ValidatePathInfoError::InvalidNodeName( "invalid".into(), store_path::Error::InvalidLength - )); - "invalid node name" + )) )] fn validate_file( - t_file_node: castorepb::FileNode, - t_result: Result<StorePath, ValidatePathInfoError>, + #[case] file_node: castorepb::FileNode, + #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, ) { // construct the PathInfo object let p = PathInfo { node: Some(castorepb::Node { - node: Some(castorepb::node::Node::File(t_file_node)), + node: Some(castorepb::node::Node::File(file_node)), }), ..Default::default() }; - assert_eq!(t_result, p.validate()); + assert_eq!(exp_result, p.validate()); } -#[test_case( +#[rstest] +#[case::ok( castorepb::SymlinkNode { - name: DUMMY_NAME.into(), + name: DUMMY_PATH.into(), target: "foo".into(), }, - Ok(StorePath::from_str(DUMMY_NAME).expect("must succeed")); - "ok" + Ok(StorePathRef::from_bytes(DUMMY_PATH.as_bytes()).unwrap()) )] -#[test_case( +#[case::invalid_node_name( castorepb::SymlinkNode { name: "invalid".into(), target: "foo".into(), @@ -135,21 +119,20 @@ fn validate_file( Err(ValidatePathInfoError::InvalidNodeName( "invalid".into(), store_path::Error::InvalidLength - )); - "invalid node name" + )) )] fn validate_symlink( - t_symlink_node: castorepb::SymlinkNode, - t_result: Result<StorePath, ValidatePathInfoError>, + #[case] symlink_node: castorepb::SymlinkNode, + #[case] exp_result: Result<StorePathRef, ValidatePathInfoError>, ) { // construct the PathInfo object let p = PathInfo { node: Some(castorepb::Node { - node: Some(castorepb::node::Node::Symlink(t_symlink_node)), + node: Some(castorepb::node::Node::Symlink(symlink_node)), }), ..Default::default() }; - assert_eq!(t_result, p.validate()); + assert_eq!(exp_result, p.validate()); } /// Ensure parsing a correct PathInfo without narinfo populated succeeds. 
@@ -236,7 +219,7 @@ fn validate_inconsistent_narinfo_reference_name_digest() { match path_info.validate().expect_err("must fail") { ValidatePathInfoError::InconsistentNarinfoReferenceNameDigest(0, e_expected, e_actual) => { assert_eq!(path_info.references[0][..], e_expected[..]); - assert_eq!(DUMMY_OUTPUT_HASH, e_actual); + assert_eq!(DUMMY_PATH_DIGEST, e_actual); } e => panic!("unexpected error: {:?}", e), } @@ -274,7 +257,7 @@ fn validate_valid_deriver() { let narinfo = path_info.narinfo.as_mut().unwrap(); narinfo.deriver = Some(crate::proto::StorePath { name: "foo".to_string(), - digest: Bytes::from(DUMMY_OUTPUT_HASH.as_slice()), + digest: Bytes::from(DUMMY_PATH_DIGEST.as_slice()), }); path_info.validate().expect("must validate"); @@ -425,7 +408,7 @@ CA: fixed:sha256:086vqwk2wl8zfs47sq2xpjc9k066ilmb8z6dn0q6ymwjzlm196cd"# let path_info: PathInfo = (&narinfo_parsed).into(); let mut narinfo_returned = path_info - .as_narinfo( + .to_narinfo( StorePathRef::from_bytes(b"pa10z4ngm0g83kx9mssrqzz30s84vq7k-hello-2.12.1.tar.gz") .expect("invalid storepath"), ) diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs index 500ac0aa5b..1c8359a2c0 100644 --- a/tvix/store/src/tests/fixtures.rs +++ b/tvix/store/src/tests/fixtures.rs @@ -13,8 +13,8 @@ use crate::proto::{ NarInfo, PathInfo, }; -pub const DUMMY_NAME: &str = "00000000000000000000000000000000-dummy"; -pub const DUMMY_OUTPUT_HASH: [u8; 20] = [0; 20]; +pub const DUMMY_PATH: &str = "00000000000000000000000000000000-dummy"; +pub const DUMMY_PATH_DIGEST: [u8; 20] = [0; 20]; lazy_static! { /// The NAR representation of a symlink pointing to `/nix/store/somewhereelse` @@ -106,12 +106,12 @@ lazy_static! { pub static ref PATH_INFO_WITHOUT_NARINFO : PathInfo = PathInfo { node: Some(castorepb::Node { node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DUMMY_NAME.into(), + name: DUMMY_PATH.into(), digest: DUMMY_DIGEST.clone().into(), size: 0, })), }), - references: vec![DUMMY_OUTPUT_HASH.as_slice().into()], + references: vec![DUMMY_PATH_DIGEST.as_slice().into()], narinfo: None, }; @@ -123,7 +123,7 @@ lazy_static! { nar_size: 0, nar_sha256: DUMMY_DIGEST.clone().into(), signatures: vec![], - reference_names: vec![DUMMY_NAME.to_string()], + reference_names: vec![DUMMY_PATH.to_string()], deriver: None, ca: Some(Ca { r#type: ca::Hash::NarSha256.into(), digest: DUMMY_DIGEST.clone().into() }) }), diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index 041a9e683d..e6e42f6ec4 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -1,13 +1,19 @@ use std::sync::Arc; +use std::{ + pin::Pin, + task::{self, Poll}, +}; +use tokio::io::{self, AsyncWrite}; use tvix_castore::{ blobservice::{self, BlobService}, directoryservice::{self, DirectoryService}, }; +use crate::nar::{NarCalculationService, SimpleRenderer}; use crate::pathinfoservice::{self, PathInfoService}; -/// Construct the three store handles from their addrs. +/// Construct the store handles from their addrs. pub async fn construct_services( blob_service_addr: impl AsRef<str>, directory_service_addr: impl AsRef<str>, @@ -16,6 +22,7 @@ pub async fn construct_services( Arc<dyn BlobService>, Arc<dyn DirectoryService>, Box<dyn PathInfoService>, + Box<dyn NarCalculationService>, )> { let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref()) .await? 
@@ -31,5 +38,41 @@ pub async fn construct_services( ) .await?; - Ok((blob_service, directory_service, path_info_service)) + // TODO: grpc client also implements NarCalculationService + let nar_calculation_service = Box::new(SimpleRenderer::new( + blob_service.clone(), + directory_service.clone(), + )) as Box<dyn NarCalculationService>; + + Ok(( + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + )) +} + +/// The inverse of [tokio_util::io::SyncIoBridge]. +/// Don't use this with anything that actually does blocking I/O. +pub struct AsyncIoBridge<T>(pub T); + +impl<W: std::io::Write + Unpin> AsyncWrite for AsyncIoBridge<W> { + fn poll_write( + self: Pin<&mut Self>, + _cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll<io::Result<usize>> { + Poll::Ready(self.get_mut().0.write(buf)) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> { + Poll::Ready(self.get_mut().0.flush()) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + _cx: &mut task::Context<'_>, + ) -> Poll<Result<(), io::Error>> { + Poll::Ready(Ok(())) + } } |
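
The pieces introduced in this change compose end-to-end. As a minimal sketch (not part of the diff; it assumes the re-exports shown earlier, i.e. `CachePathInfoService`, `LruPathInfoService` and `MemoryPathInfoService` under `tvix_store::pathinfoservice`, plus a tokio runtime with the `macros` feature), a bounded LRU can be stacked in front of a slower backend like this:

use std::num::NonZeroUsize;

use tvix_store::pathinfoservice::{
    CachePathInfoService, LruPathInfoService, MemoryPathInfoService, PathInfoService,
};

#[tokio::main]
async fn main() -> Result<(), tvix_castore::Error> {
    // "near": a small in-memory LRU, cheap to query.
    let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1024).unwrap());
    // "far": stands in for a slower remote PathInfoService here.
    let far = MemoryPathInfoService::default();

    // get() consults near first; a miss falls through to far, and a hit
    // there populates near on the way back. put() and list() stay
    // unimplemented on the combinator.
    let svc = CachePathInfoService::new(near, far);

    // Neither side knows this digest, so the lookup yields Ok(None).
    assert!(svc.get([0; 20]).await?.is_none());
    Ok(())
}

Likewise, `AsyncIoBridge` wraps a synchronous `std::io::Write` so it can be driven where an `AsyncWrite` is expected, as long as the wrapped writer never actually blocks. A hypothetical usage sketch (the `Vec<u8>` sink is only for illustration, and it assumes the `utils` module is public, as the file path suggests):

use tokio::io::AsyncWriteExt;
use tvix_store::utils::AsyncIoBridge;

async fn render_to_vec() -> std::io::Result<Vec<u8>> {
    // Vec<u8> implements std::io::Write and never blocks, so it is safe here.
    let mut w = AsyncIoBridge(Vec::new());
    w.write_all(b"hello").await?;
    w.flush().await?;
    Ok(w.0)
}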