diff options
Diffstat (limited to 'tvix/store')
21 files changed, 818 insertions, 786 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index b549eeb7f5..4727f43f78 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [dependencies] anyhow = "1.0.68" +async-compression = { version = "0.4.9", features = ["tokio", "bzip2", "gzip", "xz", "zstd"]} async-stream = "0.3.5" blake3 = { version = "1.3.1", features = ["rayon", "std"] } bstr = "1.6.0" @@ -17,9 +18,9 @@ lazy_static = "1.4.0" nix-compat = { path = "../nix-compat", features = ["async"] } pin-project-lite = "0.2.13" prost = "0.12.1" -opentelemetry = { version = "0.21.0", optional = true} -opentelemetry-otlp = { version = "0.14.0", optional = true } -opentelemetry_sdk = { version = "0.21.0", features = ["rt-tokio"], optional = true} +opentelemetry = { version = "0.22.0", optional = true} +opentelemetry-otlp = { version = "0.15.0", optional = true } +opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"], optional = true} serde = { version = "1.0.197", features = [ "derive" ] } serde_json = "1.0" serde_with = "3.7.0" @@ -28,20 +29,20 @@ sha2 = "0.10.6" sled = { version = "0.34.7" } thiserror = "1.0.38" tokio = { version = "1.32.0", features = ["fs", "macros", "net", "rt", "rt-multi-thread", "signal"] } -tokio-listener = { version = "0.3.2", features = [ "tonic011" ] } +tokio-listener = { version = "0.4.1", features = [ "tonic011" ] } tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } tonic = { version = "0.11.0", features = ["tls", "tls-roots"] } tower = "0.4.13" tracing = "0.1.37" -tracing-opentelemetry = "0.22.0" -tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] } +tracing-opentelemetry = "0.23.0" +tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tvix-castore = { path = "../castore" } url = "2.4.0" walkdir = "2.4.0" -async-recursion = "1.0.5" reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false } -xz2 = "0.1.7" +lru = "0.12.3" +parking_lot = "0.12.2" [dependencies.tonic-reflection] optional = true @@ -74,3 +75,10 @@ fuse = ["tvix-castore/fuse"] otlp = ["dep:opentelemetry", "dep:opentelemetry-otlp", "dep:opentelemetry_sdk"] tonic-reflection = ["dep:tonic-reflection", "tvix-castore/tonic-reflection"] virtiofs = ["tvix-castore/virtiofs"] +# Whether to run the integration tests. +# Requires the following packages in $PATH: +# cbtemulator, google-cloud-bigtable-tool +integration = [] + +[lints] +workspace = true diff --git a/tvix/store/default.nix b/tvix/store/default.nix index f30923ac27..ad47994f24 100644 --- a/tvix/store/default.nix +++ b/tvix/store/default.nix @@ -26,7 +26,6 @@ in runTests = true; testPreRun = '' export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt - export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}" ''; # enable some optional features. @@ -34,7 +33,20 @@ in # virtiofs feature currently fails to build on Darwin. ++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs"; }).overrideAttrs (_: { + meta.ci.targets = [ "integration-tests" ]; meta.ci.extraSteps = { import-docs = (mkImportCheck "tvix/store/docs" ./docs); }; + passthru.integration-tests = depot.tvix.crates.workspaceMembers.tvix-store.build.override { + runTests = true; + testPreRun = '' + export SSL_CERT_FILE=${pkgs.cacert}/etc/ssl/certs/ca-bundle.crt + export PATH="$PATH:${pkgs.lib.makeBinPath [pkgs.cbtemulator pkgs.google-cloud-bigtable-tool]}" + ''; + + # enable some optional features. + features = [ "default" "cloud" "integration" ] + # virtiofs feature currently fails to build on Darwin. + ++ pkgs.lib.optional pkgs.stdenv.isLinux "virtiofs"; + }; }) diff --git a/tvix/store/docs/api.md b/tvix/store/docs/api.md index c1dacc89a5..01e72671a7 100644 --- a/tvix/store/docs/api.md +++ b/tvix/store/docs/api.md @@ -218,7 +218,7 @@ This is useful for people running a Tvix-only system, or running builds on a In a system with Nix installed, we can't simply manually "extract" things to `/nix/store`, as Nix assumes to own all writes to this location. In these use cases, we're probably better off exposing a tvix-store as a local -binary cache (that's what `//tvix/nar-bridge` does). +binary cache (that's what `//tvix/nar-bridge-go` does). Assuming we are in an environment where we control `/nix/store` exclusively, a "realize to disk" would either "extract" things from the `tvix-store` to a diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 8262a9e98c..906d0ab520 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -19,6 +19,7 @@ use tracing_subscriber::EnvFilter; use tracing_subscriber::Layer; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tvix_castore::import::fs::ingest_path; +use tvix_store::nar::NarCalculationService; use tvix_store::proto::NarInfo; use tvix_store::proto::PathInfo; @@ -56,10 +57,6 @@ use tvix_store::proto::FILE_DESCRIPTOR_SET; #[derive(Parser)] #[command(author, version, about, long_about = None)] struct Cli { - /// Whether to log in JSON - #[arg(long)] - json: bool, - /// Whether to configure OTLP. Set --otlp=false to disable. #[arg(long, default_missing_value = "true", default_value = "true", num_args(0..=1), require_equals(true), action(clap::ArgAction::Set))] otlp: bool, @@ -82,7 +79,11 @@ enum Commands { #[arg(long, short = 'l')] listen_address: Option<String>, - #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] + #[arg( + long, + env, + default_value = "objectstore+file:///var/lib/tvix-store/blobs.object_store" + )] blob_service_addr: String, #[arg( @@ -218,33 +219,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let level = cli.log_level.unwrap_or(Level::INFO); // Set up the tracing subscriber. - let subscriber = tracing_subscriber::registry() - .with( - cli.json.then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .json() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), + let subscriber = tracing_subscriber::registry().with( + tracing_subscriber::fmt::Layer::new() + .with_writer(std::io::stderr) + .compact() + .with_filter( + EnvFilter::builder() + .with_default_directive(level.into()) + .from_env() + .expect("invalid RUST_LOG"), ), - ) - .with( - (!cli.json).then_some( - tracing_subscriber::fmt::Layer::new() - .with_writer(std::io::stderr) - .pretty() - .with_filter( - EnvFilter::builder() - .with_default_directive(level.into()) - .from_env() - .expect("invalid RUST_LOG"), - ), - ), - ); + ); // Add the otlp layer (when otlp is enabled, and it's not disabled in the CLI) // then init the registry. @@ -302,7 +287,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { path_info_service_addr, } => { // initialize stores - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -327,6 +312,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { )) .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( Arc::from(path_info_service), + nar_calculation_service, ))); #[cfg(feature = "tonic-reflection")] @@ -356,7 +342,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { path_info_service_addr, } => { // FUTUREWORK: allow flat for single files? - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -364,8 +350,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ) .await?; - // Arc the PathInfoService, as we clone it . + // Arc PathInfoService and NarCalculationService, as we clone it . let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + let nar_calculation_service: Arc<dyn NarCalculationService> = + nar_calculation_service.into(); let tasks = paths .into_iter() @@ -374,6 +362,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let blob_service = blob_service.clone(); let directory_service = directory_service.clone(); let path_info_service = path_info_service.clone(); + let nar_calculation_service = nar_calculation_service.clone(); async move { if let Ok(name) = tvix_store::import::path_to_name(&path) { @@ -383,6 +372,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { blob_service, directory_service, path_info_service, + nar_calculation_service, ) .await; if let Ok(output_path) = resp { @@ -403,7 +393,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { path_info_service_addr, reference_graph_path, } => { - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -510,7 +500,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { allow_other, show_xattr, } => { - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, @@ -552,7 +542,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { list_root, show_xattr, } => { - let (blob_service, directory_service, path_info_service) = + let (blob_service, directory_service, path_info_service, _nar_calculation_service) = tvix_store::utils::construct_services( blob_service_addr, directory_service_addr, diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index 7b6aeb824e..888380bca9 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -11,6 +11,7 @@ use nix_compat::{ }; use crate::{ + nar::NarCalculationService, pathinfoservice::PathInfoService, proto::{nar_info, NarInfo, PathInfo}, }; @@ -104,23 +105,27 @@ pub fn derive_nar_ca_path_info( /// Ingest the given path `path` and register the resulting output path in the /// [`PathInfoService`] as a recursive fixed output NAR. #[instrument(skip_all, fields(store_name=name, path=?path), err)] -pub async fn import_path_as_nar_ca<BS, DS, PS, P>( +pub async fn import_path_as_nar_ca<BS, DS, PS, NS, P>( path: P, name: &str, blob_service: BS, directory_service: DS, path_info_service: PS, + nar_calculation_service: NS, ) -> Result<StorePath, std::io::Error> where P: AsRef<Path> + std::fmt::Debug, BS: BlobService + Clone, - DS: AsRef<dyn DirectoryService>, + DS: DirectoryService, PS: AsRef<dyn PathInfoService>, + NS: NarCalculationService, { - let root_node = ingest_path(blob_service, directory_service, path.as_ref()).await?; + let root_node = ingest_path(blob_service, directory_service, path.as_ref()) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?; - // Ask the PathInfoService for the NAR size and sha256 - let (nar_size, nar_sha256) = path_info_service.as_ref().calculate_nar(&root_node).await?; + // Ask for the NAR size and sha256 + let (nar_size, nar_sha256) = nar_calculation_service.calculate_nar(&root_node).await?; // Calculate the output path. This might still fail, as some names are illegal. // FUTUREWORK: express the `name` at the type level to be valid and move the conversion diff --git a/tvix/store/src/nar/import.rs b/tvix/store/src/nar/import.rs index 6f4dcdea5d..36122d419d 100644 --- a/tvix/store/src/nar/import.rs +++ b/tvix/store/src/nar/import.rs @@ -1,225 +1,132 @@ -use bytes::Bytes; -use nix_compat::nar; -use std::io::{self, BufRead}; -use tokio_util::io::SyncIoBridge; -use tracing::warn; +use nix_compat::nar::reader::r#async as nar_reader; +use tokio::{io::AsyncBufRead, sync::mpsc, try_join}; use tvix_castore::{ blobservice::BlobService, - directoryservice::{DirectoryPutter, DirectoryService}, - proto::{self as castorepb}, - B3Digest, + directoryservice::DirectoryService, + import::{ + blobs::{self, ConcurrentBlobUploader}, + ingest_entries, IngestionEntry, IngestionError, + }, + proto::{node::Node, NamedNode}, + PathBuf, }; -/// Accepts a reader providing a NAR. -/// Will traverse it, uploading blobs to the given [BlobService], and -/// directories to the given [DirectoryService]. -/// On success, the root node is returned. -/// This function is not async (because the NAR reader is not) -/// and calls [tokio::task::block_in_place] when interacting with backing -/// services, so make sure to only call this with spawn_blocking. -pub fn read_nar<R, BS, DS>( - r: &mut R, +/// Ingests the contents from a [AsyncRead] providing NAR into the tvix store, +/// interacting with a [BlobService] and [DirectoryService]. +/// It returns the castore root node or an error. +pub async fn ingest_nar<R, BS, DS>( blob_service: BS, directory_service: DS, -) -> io::Result<castorepb::node::Node> + r: &mut R, +) -> Result<Node, IngestionError<Error>> where - R: BufRead + Send, - BS: AsRef<dyn BlobService>, - DS: AsRef<dyn DirectoryService>, + R: AsyncBufRead + Unpin + Send, + BS: BlobService + Clone + 'static, + DS: DirectoryService, { - let handle = tokio::runtime::Handle::current(); - - let directory_putter = directory_service.as_ref().put_multiple_start(); - - let node = nix_compat::nar::reader::open(r)?; - let (root_node, mut directory_putter, _) = process_node( - handle.clone(), - "".into(), // this is the root node, it has an empty name - node, - &blob_service, - directory_putter, - )?; - - // In case the root node points to a directory, we need to close - // [directory_putter], and ensure the digest we got back from there matches - // what the root node is pointing to. - if let castorepb::node::Node::Directory(ref directory_node) = root_node { - // Close directory_putter to make sure all directories have been inserted. - let directory_putter_digest = - handle.block_on(handle.spawn(async move { directory_putter.close().await }))??; - let root_directory_node_digest: B3Digest = - directory_node.digest.clone().try_into().unwrap(); - - if directory_putter_digest != root_directory_node_digest { - warn!( - root_directory_node_digest = %root_directory_node_digest, - directory_putter_digest =%directory_putter_digest, - "directory digest mismatch", - ); - return Err(io::Error::new( - io::ErrorKind::Other, - "directory digest mismatch", - )); - } - } - // In case it's not a Directory, [directory_putter] doesn't need to be - // closed (as we didn't end up uploading anything). - // It can just be dropped, as documented in its trait. - - Ok(root_node) -} + // open the NAR for reading. + // The NAR reader emits nodes in DFS preorder. + let root_node = nar_reader::open(r).await.map_err(Error::IO)?; -/// This is called on a [nar::reader::Node] and returns a [castorepb::node::Node]. -/// It does so by handling all three kinds, and recursing for directories. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. -fn process_node<BS>( - handle: tokio::runtime::Handle, - name: bytes::Bytes, - node: nar::reader::Node, - blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::node::Node, Box<dyn DirectoryPutter>, BS)> -where - BS: AsRef<dyn BlobService>, -{ - Ok(match node { - nar::reader::Node::Symlink { target } => ( - castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name, - target: target.into(), - }), - directory_putter, - blob_service, - ), - nar::reader::Node::File { executable, reader } => ( - castorepb::node::Node::File(process_file_reader( - handle, - name, - reader, - executable, - &blob_service, - )?), - directory_putter, - blob_service, - ), - nar::reader::Node::Directory(dir_reader) => { - let (directory_node, directory_putter, blob_service_back) = - process_dir_reader(handle, name, dir_reader, blob_service, directory_putter)?; - - ( - castorepb::node::Node::Directory(directory_node), - directory_putter, - blob_service_back, - ) - } - }) -} - -/// Given a name and [nar::reader::FileReader], this ingests the file into the -/// passed [BlobService] and returns a [castorepb::FileNode]. -fn process_file_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut file_reader: nar::reader::FileReader, - executable: bool, - blob_service: BS, -) -> io::Result<castorepb::FileNode> -where - BS: AsRef<dyn BlobService>, -{ - // store the length. If we read any other length, reading will fail. - let expected_len = file_reader.len(); + let (tx, rx) = mpsc::channel(1); + let rx = tokio_stream::wrappers::ReceiverStream::new(rx); - // prepare writing a new blob. - let blob_writer = handle.block_on(async { blob_service.as_ref().open_write().await }); + let produce = async move { + let mut blob_uploader = ConcurrentBlobUploader::new(blob_service); - // write the blob. - let mut blob_writer = { - let mut dst = SyncIoBridge::new(blob_writer); + let res = produce_nar_inner( + &mut blob_uploader, + root_node, + "root".parse().unwrap(), // HACK: the root node sent to ingest_entries may not be ROOT. + tx.clone(), + ) + .await; + + if let Err(err) = blob_uploader.join().await { + tx.send(Err(err.into())) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; + } - file_reader.copy(&mut dst)?; - dst.shutdown()?; + tx.send(res) + .await + .map_err(|e| Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)))?; - // return back the blob_writer - dst.into_inner() + Ok(()) }; - // close the blob_writer, retrieve the digest. - let blob_digest = handle.block_on(async { blob_writer.close().await })?; + let consume = ingest_entries(directory_service, rx); - Ok(castorepb::FileNode { - name, - digest: blob_digest.into(), - size: expected_len, - executable, - }) + let (_, node) = try_join!(produce, consume)?; + + // remove the fake "root" name again + debug_assert_eq!(&node.get_name(), b"root"); + Ok(node.rename("".into())) } -/// Given a name and [nar::reader::DirReader], this returns a [castorepb::DirectoryNode]. -/// It uses [process_node] to iterate over all children. -/// -/// [DirectoryPutter] is passed around, so a single instance of it can be used, -/// which is sufficient, as this reads through the whole NAR linerarly. -fn process_dir_reader<BS>( - handle: tokio::runtime::Handle, - name: Bytes, - mut dir_reader: nar::reader::DirReader, - blob_service: BS, - directory_putter: Box<dyn DirectoryPutter>, -) -> io::Result<(castorepb::DirectoryNode, Box<dyn DirectoryPutter>, BS)> +async fn produce_nar_inner<BS>( + blob_uploader: &mut ConcurrentBlobUploader<BS>, + node: nar_reader::Node<'_, '_>, + path: PathBuf, + tx: mpsc::Sender<Result<IngestionEntry, Error>>, +) -> Result<IngestionEntry, Error> where - BS: AsRef<dyn BlobService>, + BS: BlobService + Clone + 'static, { - let mut directory = castorepb::Directory::default(); - - let mut directory_putter = directory_putter; - let mut blob_service = blob_service; - while let Some(entry) = dir_reader.next()? { - let (node, directory_putter_back, blob_service_back) = process_node( - handle.clone(), - entry.name.into(), - entry.node, - blob_service, - directory_putter, - )?; - - blob_service = blob_service_back; - directory_putter = directory_putter_back; - - match node { - castorepb::node::Node::Directory(node) => directory.directories.push(node), - castorepb::node::Node::File(node) => directory.files.push(node), - castorepb::node::Node::Symlink(node) => directory.symlinks.push(node), + Ok(match node { + nar_reader::Node::Symlink { target } => IngestionEntry::Symlink { path, target }, + nar_reader::Node::File { + executable, + mut reader, + } => { + let size = reader.len(); + let digest = blob_uploader.upload(&path, size, &mut reader).await?; + + IngestionEntry::Regular { + path, + size, + executable, + digest, + } } - } + nar_reader::Node::Directory(mut dir_reader) => { + while let Some(entry) = dir_reader.next().await? { + let mut path = path.clone(); + + // valid NAR names are valid castore names + path.try_push(entry.name) + .expect("Tvix bug: failed to join name"); + + let entry = Box::pin(produce_nar_inner( + blob_uploader, + entry.node, + path, + tx.clone(), + )) + .await?; + + tx.send(Ok(entry)).await.map_err(|e| { + Error::IO(std::io::Error::new(std::io::ErrorKind::BrokenPipe, e)) + })?; + } + + IngestionEntry::Dir { path } + } + }) +} - // calculate digest and size. - let directory_digest = directory.digest(); - let directory_size = directory.size(); - - // upload the directory. This is a bit more verbose, as we want to get back - // directory_putter for later reuse. - let directory_putter = handle.block_on(handle.spawn(async move { - directory_putter.put(directory).await?; - Ok::<_, io::Error>(directory_putter) - }))??; - - Ok(( - castorepb::DirectoryNode { - name, - digest: directory_digest.into(), - size: directory_size, - }, - directory_putter, - blob_service, - )) +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error(transparent)] + IO(#[from] std::io::Error), + + #[error(transparent)] + BlobUpload(#[from] blobs::Error), } #[cfg(test)] mod test { - use crate::nar::read_nar; + use crate::nar::ingest_nar; use std::io::Cursor; use std::sync::Arc; @@ -244,19 +151,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking(|| { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), - blob_service, - directory_service, - ) - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service, + directory_service, + &mut Cursor::new(&NAR_CONTENTS_SYMLINK.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Symlink(castorepb::SymlinkNode { @@ -273,22 +174,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - move || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service, + &mut Cursor::new(&NAR_CONTENTS_HELLOWORLD.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::File(castorepb::FileNode { @@ -310,23 +202,13 @@ mod test { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, ) { - let handle = tokio::runtime::Handle::current(); - - let root_node = handle - .spawn_blocking({ - let blob_service = blob_service.clone(); - let directory_service = directory_service.clone(); - || { - read_nar( - &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), - blob_service, - directory_service, - ) - } - }) - .await - .unwrap() - .expect("must parse"); + let root_node = ingest_nar( + blob_service.clone(), + directory_service.clone(), + &mut Cursor::new(&NAR_CONTENTS_COMPLICATED.clone()), + ) + .await + .expect("must parse"); assert_eq!( castorepb::node::Node::Directory(castorepb::DirectoryNode { diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs index 49bb92fb0f..164748a655 100644 --- a/tvix/store/src/nar/mod.rs +++ b/tvix/store/src/nar/mod.rs @@ -1,10 +1,36 @@ +use tonic::async_trait; use tvix_castore::B3Digest; mod import; mod renderer; -pub use import::read_nar; +pub use import::ingest_nar; pub use renderer::calculate_size_and_sha256; pub use renderer::write_nar; +pub use renderer::SimpleRenderer; +use tvix_castore::proto as castorepb; + +#[async_trait] +pub trait NarCalculationService: Send + Sync { + /// Return the nar size and nar sha256 digest for a given root node. + /// This can be used to calculate NAR-based output paths. + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), tvix_castore::Error>; +} + +#[async_trait] +impl<A> NarCalculationService for A +where + A: AsRef<dyn NarCalculationService> + Send + Sync, +{ + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), tvix_castore::Error> { + self.as_ref().calculate_nar(root_node).await + } +} /// Errors that can encounter while rendering NARs. #[derive(Debug, thiserror::Error)] diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index 0816b8e973..efd67671db 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -1,17 +1,51 @@ use crate::utils::AsyncIoBridge; -use super::RenderError; -use async_recursion::async_recursion; +use super::{NarCalculationService, RenderError}; use count_write::CountWrite; use nix_compat::nar::writer::r#async as nar_writer; use sha2::{Digest, Sha256}; use tokio::io::{self, AsyncWrite, BufReader}; +use tonic::async_trait; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, proto::{self as castorepb, NamedNode}, }; +pub struct SimpleRenderer<BS, DS> { + blob_service: BS, + directory_service: DS, +} + +impl<BS, DS> SimpleRenderer<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { + Self { + blob_service, + directory_service, + } + } +} + +#[async_trait] +impl<BS, DS> NarCalculationService for SimpleRenderer<BS, DS> +where + BS: BlobService + Clone, + DS: DirectoryService + Clone, +{ + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), tvix_castore::Error> { + calculate_size_and_sha256( + root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .await + .map_err(|e| tvix_castore::Error::StorageError(format!("failed rendering nar: {}", e))) + } +} + /// Invoke [write_nar], and return the size and sha256 digest of the produced /// NAR output. pub async fn calculate_size_and_sha256<BS, DS>( @@ -72,9 +106,8 @@ where /// Process an intermediate node in the structure. /// This consumes the node. -#[async_recursion] async fn walk_node<BS, DS>( - nar_node: nar_writer::Node<'async_recursion, '_>, + nar_node: nar_writer::Node<'_, '_>, proto_node: &castorepb::node::Node, blob_service: BS, directory_service: DS, @@ -164,9 +197,13 @@ where .await .map_err(RenderError::NARWriterError)?; - (blob_service, directory_service) = - walk_node(child_node, &proto_node, blob_service, directory_service) - .await?; + (blob_service, directory_service) = Box::pin(walk_node( + child_node, + &proto_node, + blob_service, + directory_service, + )) + .await?; } // close the directory diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs index f49ef475eb..707a686c0a 100644 --- a/tvix/store/src/pathinfoservice/bigtable.rs +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -6,12 +6,12 @@ use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2}; use bytes::Bytes; use data_encoding::HEXLOWER; use futures::stream::BoxStream; +use nix_compat::nixbase32; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; use tonic::async_trait; -use tracing::trace; -use tvix_castore::proto as castorepb; +use tracing::{instrument, trace}; use tvix_castore::Error; /// There should not be more than 10 MiB in a single cell. @@ -67,6 +67,22 @@ pub struct BigtableParameters { app_profile_id: String, } +impl BigtableParameters { + #[cfg(test)] + pub fn default_for_tests() -> Self { + Self { + project_id: "project-1".into(), + instance_name: "instance-1".into(), + is_read_only: false, + channel_size: default_channel_size(), + timeout: default_timeout(), + table_name: "table-1".into(), + family_name: "cf1".into(), + app_profile_id: default_app_profile_id(), + } + } +} + fn default_app_profile_id() -> String { "default".to_owned() } @@ -116,7 +132,7 @@ impl BigtablePathInfoService { .stdout(Stdio::piped()) .kill_on_drop(true) .spawn() - .expect("failed to spwan emulator"); + .expect("failed to spawn emulator"); Retry::spawn( ExponentialBackoff::from_millis(20) @@ -182,6 +198,7 @@ fn derive_pathinfo_key(digest: &[u8; 20]) -> String { #[async_trait] impl PathInfoService for BigtablePathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let mut client = self.client.clone(); let path_info_key = derive_pathinfo_key(&digest); @@ -278,6 +295,7 @@ impl PathInfoService for BigtablePathInfoService { Ok(Some(path_info)) } + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { let store_path = path_info .validate() @@ -330,13 +348,6 @@ impl PathInfoService for BigtablePathInfoService { Ok(path_info) } - async fn calculate_nar( - &self, - _root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - return Err(Error::StorageError("unimplemented".into())); - } - fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { let mut client = self.client.clone(); diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs new file mode 100644 index 0000000000..664144ef49 --- /dev/null +++ b/tvix/store/src/pathinfoservice/combinators.rs @@ -0,0 +1,111 @@ +use crate::proto::PathInfo; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use tonic::async_trait; +use tracing::{debug, instrument}; +use tvix_castore::Error; + +use super::PathInfoService; + +/// Asks near first, if not found, asks far. +/// If found in there, returns it, and *inserts* it into +/// near. +/// There is no negative cache. +/// Inserts and listings are not implemented for now. +pub struct Cache<PS1, PS2> { + near: PS1, + far: PS2, +} + +impl<PS1, PS2> Cache<PS1, PS2> { + pub fn new(near: PS1, far: PS2) -> Self { + Self { near, far } + } +} + +#[async_trait] +impl<PS1, PS2> PathInfoService for Cache<PS1, PS2> +where + PS1: PathInfoService, + PS2: PathInfoService, +{ + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + match self.near.get(digest).await? { + Some(path_info) => { + debug!("serving from cache"); + Ok(Some(path_info)) + } + None => { + debug!("not found in near, asking remote…"); + match self.far.get(digest).await? { + None => Ok(None), + Some(path_info) => { + debug!("found in remote, adding to cache"); + self.near.put(path_info.clone()).await?; + Ok(Some(path_info)) + } + } + } + } + } + + async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> { + Err(Error::StorageError("unimplemented".to_string())) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + Box::pin(tokio_stream::once(Err(Error::StorageError( + "unimplemented".to_string(), + )))) + } +} + +#[cfg(test)] +mod test { + use std::num::NonZeroUsize; + + use crate::{ + pathinfoservice::{LruPathInfoService, MemoryPathInfoService, PathInfoService}, + tests::fixtures::PATH_INFO_WITH_NARINFO, + }; + + const PATH_INFO_DIGEST: [u8; 20] = [0; 20]; + + /// Helper function setting up an instance of a "far" and "near" + /// PathInfoService. + async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, MemoryPathInfoService> { + // Create an instance of a "far" PathInfoService. + let far = MemoryPathInfoService::default(); + + // … and an instance of a "near" PathInfoService. + let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap()); + + // create a Pathinfoservice combining the two and return it. + super::Cache::new(near, far) + } + + /// Getting from the far backend is gonna insert it into the near one. + #[tokio::test] + async fn test_populate_cache() { + let svc = create_pathinfoservice().await; + + // query the PathInfo, things should not be there. + assert!(svc.get(PATH_INFO_DIGEST).await.unwrap().is_none()); + + // insert it into the far one. + svc.far.put(PATH_INFO_WITH_NARINFO.clone()).await.unwrap(); + + // now try getting it again, it should succeed. + assert_eq!( + Some(PATH_INFO_WITH_NARINFO.clone()), + svc.get(PATH_INFO_DIGEST).await.unwrap() + ); + + // peek near, it should now be there. + assert_eq!( + Some(PATH_INFO_WITH_NARINFO.clone()), + svc.near.get(PATH_INFO_DIGEST).await.unwrap() + ); + } +} diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 1ff822ad35..455909e7f2 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -47,7 +47,7 @@ pub async fn from_addr( if url.has_host() || !url.path().is_empty() { return Err(Error::StorageError("invalid url".to_string())); } - Box::new(MemoryPathInfoService::new(blob_service, directory_service)) + Box::<MemoryPathInfoService>::default() } "sled" => { // sled doesn't support host, and a path can be provided (otherwise @@ -65,10 +65,10 @@ pub async fn from_addr( // TODO: expose other parameters as URL parameters? Box::new(if url.path().is_empty() { - SledPathInfoService::new_temporary(blob_service, directory_service) + SledPathInfoService::new_temporary() .map_err(|e| Error::StorageError(e.to_string()))? } else { - SledPathInfoService::new(url.path(), blob_service, directory_service) + SledPathInfoService::new(url.path()) .map_err(|e| Error::StorageError(e.to_string()))? }) } @@ -208,7 +208,7 @@ mod tests { #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] /// A valid example for Bigtable. #[cfg_attr( - feature = "cloud", + all(feature = "cloud", feature = "integration"), case::bigtable_valid( "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1", true @@ -216,7 +216,7 @@ mod tests { )] /// An invalid example for Bigtable, missing fields #[cfg_attr( - feature = "cloud", + all(feature = "cloud", feature = "integration"), case::bigtable_invalid_missing_fields("bigtable://instance-1", false) )] #[tokio::test] diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 1138ebdc19..f6a356cf18 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -1,8 +1,11 @@ use super::PathInfoService; -use crate::proto::{self, ListPathInfoRequest, PathInfo}; +use crate::{ + nar::NarCalculationService, + proto::{self, ListPathInfoRequest, PathInfo}, +}; use async_stream::try_stream; -use data_encoding::BASE64; use futures::stream::BoxStream; +use nix_compat::nixbase32; use tonic::{async_trait, transport::Channel, Code}; use tracing::instrument; use tvix_castore::{proto as castorepb, Error}; @@ -27,7 +30,7 @@ impl GRPCPathInfoService { #[async_trait] impl PathInfoService for GRPCPathInfoService { - #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))] + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let path_info = self .grpc_client @@ -67,30 +70,6 @@ impl PathInfoService for GRPCPathInfoService { Ok(path_info) } - #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))] - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - let path_info = self - .grpc_client - .clone() - .calculate_nar(castorepb::Node { - node: Some(root_node.clone()), - }) - .await - .map_err(|e| Error::StorageError(e.to_string()))? - .into_inner(); - - let nar_sha256: [u8; 32] = path_info - .nar_sha256 - .to_vec() - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - - Ok((path_info.nar_size, nar_sha256)) - } - #[instrument(level = "trace", skip_all)] fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { let mut grpc_client = self.grpc_client.clone(); @@ -126,87 +105,46 @@ impl PathInfoService for GRPCPathInfoService { } } +#[async_trait] +impl NarCalculationService for GRPCPathInfoService { + #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))] + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + let path_info = self + .grpc_client + .clone() + .calculate_nar(castorepb::Node { + node: Some(root_node.clone()), + }) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + let nar_sha256: [u8; 32] = path_info + .nar_sha256 + .to_vec() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + + Ok((path_info.nar_size, nar_sha256)) + } +} + #[cfg(test)] mod tests { - use std::sync::Arc; - use std::time::Duration; - - use rstest::*; - use tempfile::TempDir; - use tokio::net::UnixListener; - use tokio_retry::strategy::ExponentialBackoff; - use tokio_retry::Retry; - use tokio_stream::wrappers::UnixListenerStream; - use tvix_castore::blobservice::BlobService; - use tvix_castore::directoryservice::DirectoryService; - - use crate::pathinfoservice::MemoryPathInfoService; - use crate::proto::path_info_service_client::PathInfoServiceClient; - use crate::proto::GRPCPathInfoServiceWrapper; - use crate::tests::fixtures::{self, blob_service, directory_service}; - - use super::GRPCPathInfoService; - use super::PathInfoService; + use crate::pathinfoservice::tests::make_grpc_path_info_service_client; + use crate::pathinfoservice::PathInfoService; + use crate::tests::fixtures; /// This ensures connecting via gRPC works as expected. - #[rstest] #[tokio::test] - async fn test_valid_unix_path_ping_pong( - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, - ) { - let tmpdir = TempDir::new().unwrap(); - let socket_path = tmpdir.path().join("daemon"); - - let path_clone = socket_path.clone(); - - // Spin up a server - tokio::spawn(async { - let uds = UnixListener::bind(path_clone).unwrap(); - let uds_stream = UnixListenerStream::new(uds); - - // spin up a new server - let mut server = tonic::transport::Server::builder(); - let router = server.add_service( - crate::proto::path_info_service_server::PathInfoServiceServer::new( - GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new( - blob_service, - directory_service, - )) - as Box<dyn PathInfoService>), - ), - ); - router.serve_with_incoming(uds_stream).await - }); - - // wait for the socket to be created - Retry::spawn( - ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), - || async { - if socket_path.exists() { - Ok(()) - } else { - Err(()) - } - }, - ) - .await - .expect("failed to wait for socket"); - - // prepare a client - let grpc_client = { - let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) - .expect("must parse"); - let client = PathInfoServiceClient::new( - tvix_castore::tonic::channel_from_url(&url) - .await - .expect("must succeed"), - ); - - GRPCPathInfoService::from_client(client) - }; + async fn test_valid_unix_path_ping_pong() { + let (_blob_service, _directory_service, path_info_service) = + make_grpc_path_info_service_client().await; - let path_info = grpc_client + let path_info = path_info_service .get(fixtures::DUMMY_PATH_DIGEST) .await .expect("must not be error"); diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs new file mode 100644 index 0000000000..da674f497a --- /dev/null +++ b/tvix/store/src/pathinfoservice/lru.rs @@ -0,0 +1,128 @@ +use async_stream::try_stream; +use futures::stream::BoxStream; +use lru::LruCache; +use nix_compat::nixbase32; +use std::num::NonZeroUsize; +use std::sync::Arc; +use tokio::sync::RwLock; +use tonic::async_trait; +use tracing::instrument; + +use crate::proto::PathInfo; +use tvix_castore::Error; + +use super::PathInfoService; + +pub struct LruPathInfoService { + lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>, +} + +impl LruPathInfoService { + pub fn with_capacity(capacity: NonZeroUsize) -> Self { + Self { + lru: Arc::new(RwLock::new(LruCache::new(capacity))), + } + } +} + +#[async_trait] +impl PathInfoService for LruPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + Ok(self.lru.write().await.get(&digest).cloned()) + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // call validate + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("invalid PathInfo: {}", e)))?; + + self.lru + .write() + .await + .put(*store_path.digest(), path_info.clone()); + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let lru = self.lru.clone(); + Box::pin(try_stream! { + let lru = lru.read().await; + let it = lru.iter(); + + for (_k,v) in it { + yield v.clone() + } + }) + } +} + +#[cfg(test)] +mod test { + use std::num::NonZeroUsize; + + use crate::{ + pathinfoservice::{LruPathInfoService, PathInfoService}, + proto::PathInfo, + tests::fixtures::PATH_INFO_WITH_NARINFO, + }; + use lazy_static::lazy_static; + use tvix_castore::proto as castorepb; + + lazy_static! { + static ref PATHINFO_1: PathInfo = PATH_INFO_WITH_NARINFO.clone(); + static ref PATHINFO_1_DIGEST: [u8; 20] = [0; 20]; + static ref PATHINFO_2: PathInfo = { + let mut p = PATHINFO_1.clone(); + let root_node = p.node.as_mut().unwrap(); + if let castorepb::Node { node: Some(node) } = root_node { + let n = node.to_owned(); + *node = n.rename("11111111111111111111111111111111-dummy2".into()); + } else { + unreachable!() + } + p + }; + static ref PATHINFO_2_DIGEST: [u8; 20] = *(PATHINFO_2.validate().unwrap()).digest(); + } + + #[tokio::test] + async fn evict() { + let svc = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap()); + + // pathinfo_1 should not be there + assert!(svc + .get(*PATHINFO_1_DIGEST) + .await + .expect("no error") + .is_none()); + + // insert it + svc.put(PATHINFO_1.clone()).await.expect("no error"); + + // now it should be there. + assert_eq!( + Some(PATHINFO_1.clone()), + svc.get(*PATHINFO_1_DIGEST).await.expect("no error") + ); + + // insert pathinfo_2. This will evict pathinfo 1 + svc.put(PATHINFO_2.clone()).await.expect("no error"); + + // now pathinfo 2 should be there. + assert_eq!( + Some(PATHINFO_2.clone()), + svc.get(*PATHINFO_2_DIGEST).await.expect("no error") + ); + + // … but pathinfo 1 not anymore. + assert!(svc + .get(*PATHINFO_1_DIGEST) + .await + .expect("no error") + .is_none()); + } +} diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index f8435dbbf8..3de3221df2 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -1,40 +1,24 @@ use super::PathInfoService; -use crate::{nar::calculate_size_and_sha256, proto::PathInfo}; -use futures::stream::{iter, BoxStream}; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use crate::proto::PathInfo; +use async_stream::try_stream; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; use tonic::async_trait; -use tvix_castore::proto as castorepb; +use tracing::instrument; use tvix_castore::Error; -use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; -pub struct MemoryPathInfoService<BS, DS> { +#[derive(Default)] +pub struct MemoryPathInfoService { db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>, - - blob_service: BS, - directory_service: DS, -} - -impl<BS, DS> MemoryPathInfoService<BS, DS> { - pub fn new(blob_service: BS, directory_service: DS) -> Self { - Self { - db: Default::default(), - blob_service, - directory_service, - } - } } #[async_trait] -impl<BS, DS> PathInfoService for MemoryPathInfoService<BS, DS> -where - BS: AsRef<dyn BlobService> + Send + Sync, - DS: AsRef<dyn DirectoryService> + Send + Sync, -{ +impl PathInfoService for MemoryPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { - let db = self.db.read().unwrap(); + let db = self.db.read().await; match db.get(&digest) { None => Ok(None), @@ -42,6 +26,7 @@ where } } + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { // Call validate on the received PathInfo message. match path_info.validate() { @@ -53,7 +38,7 @@ where // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. // This overwrites existing PathInfo objects. Ok(nix_path) => { - let mut db = self.db.write().unwrap(); + let mut db = self.db.write().await; db.insert(*nix_path.digest(), path_info.clone()); Ok(path_info) @@ -61,24 +46,16 @@ where } } - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service) - .await - .map_err(|e| Error::StorageError(e.to_string())) - } - fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { - let db = self.db.read().unwrap(); + let db = self.db.clone(); - // Copy all elements into a list. - // This is a bit ugly, because we can't have db escape the lifetime - // of this function, but elements need to be returned owned anyways, and this in- - // memory impl is only for testing purposes anyways. - let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + Box::pin(try_stream! { + let db = db.read().await; + let it = db.iter(); - Box::pin(iter(items)) + for (_k, v) in it { + yield v.clone() + } + }) } } diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index c1a482bbb5..574bcc0b8b 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -1,5 +1,7 @@ +mod combinators; mod from_addr; mod grpc; +mod lru; mod memory; mod nix_http; mod sled; @@ -12,13 +14,14 @@ mod tests; use futures::stream::BoxStream; use tonic::async_trait; -use tvix_castore::proto as castorepb; use tvix_castore::Error; use crate::proto::PathInfo; +pub use self::combinators::Cache as CachePathInfoService; pub use self::from_addr::from_addr; pub use self::grpc::GRPCPathInfoService; +pub use self::lru::LruPathInfoService; pub use self::memory::MemoryPathInfoService; pub use self::nix_http::NixHTTPPathInfoService; pub use self::sled::SledPathInfoService; @@ -41,14 +44,6 @@ pub trait PathInfoService: Send + Sync { /// invalid messages. async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>; - /// Return the nar size and nar sha256 digest for a given root node. - /// This can be used to calculate NAR-based output paths, - /// and implementations are encouraged to cache it. - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error>; - /// Iterate over all PathInfo objects in the store. /// Implementations can decide to disallow listing. /// @@ -72,13 +67,6 @@ where self.as_ref().put(path_info).await } - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - self.as_ref().calculate_nar(root_node).await - } - fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { self.as_ref().list() } diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index bdb0e2c3cb..cccd4805c6 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -1,6 +1,3 @@ -use std::io::{self, BufRead, Read, Write}; - -use data_encoding::BASE64; use futures::{stream::BoxStream, TryStreamExt}; use nix_compat::{ narinfo::{self, NarInfo}, @@ -8,7 +5,10 @@ use nix_compat::{ nixhash::NixHash, }; use reqwest::StatusCode; -use sha2::{digest::FixedOutput, Digest, Sha256}; +use sha2::Digest; +use std::io::{self, Write}; +use tokio::io::{AsyncRead, BufReader}; +use tokio_util::io::InspectReader; use tonic::async_trait; use tracing::{debug, instrument, warn}; use tvix_castore::{ @@ -32,8 +32,7 @@ use super::PathInfoService; /// /// The client is expected to be (indirectly) using the same [BlobService] and /// [DirectoryService], so able to fetch referred Directories and Blobs. -/// [PathInfoService::put] and [PathInfoService::calculate_nar] are not -/// implemented and return an error if called. +/// [PathInfoService::put] is not implemented and returns an error if called. /// TODO: what about reading from nix-cache-info? pub struct NixHTTPPathInfoService<BS, DS> { base_url: url::Url, @@ -71,7 +70,7 @@ where BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, { - #[instrument(skip_all, err, fields(path.digest=BASE64.encode(&digest)))] + #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let narinfo_url = self .base_url @@ -171,85 +170,83 @@ where ))); } - // get an AsyncRead of the response body. - let async_r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| { + // get a reader of the response body. + let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| { let e = e.without_url(); warn!(e=%e, "failed to get response body"); io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) })); - let sync_r = tokio_util::io::SyncIoBridge::new(async_r); - // handle decompression, by wrapping the reader. - let sync_r: Box<dyn BufRead + Send> = match narinfo.compression { - Some("none") => Box::new(sync_r), - Some("xz") => Box::new(io::BufReader::new(xz2::read::XzDecoder::new(sync_r))), - Some(comp) => { - return Err(Error::InvalidRequest( - format!("unsupported compression: {}", comp).to_string(), - )) - } - None => { - return Err(Error::InvalidRequest( - "unsupported compression: bzip2".to_string(), - )) + // handle decompression, depending on the compression field. + let r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression { + Some("none") => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>, + Some("bzip2") | None => Box::new(async_compression::tokio::bufread::BzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some(comp_str) => { + return Err(Error::StorageError(format!( + "unsupported compression: {comp_str}" + ))); } }; - - let res = tokio::task::spawn_blocking({ - let blob_service = self.blob_service.clone(); - let directory_service = self.directory_service.clone(); - move || -> io::Result<_> { - // Wrap the reader once more, so we can calculate NarSize and NarHash - let mut sync_r = io::BufReader::new(NarReader::from(sync_r)); - let root_node = crate::nar::read_nar(&mut sync_r, blob_service, directory_service)?; - - let (_, nar_hash, nar_size) = sync_r.into_inner().into_inner(); - - Ok((root_node, nar_hash, nar_size)) - } - }) + let mut nar_hash = sha2::Sha256::new(); + let mut nar_size = 0; + + // Assemble NarHash and NarSize as we read bytes. + let r = InspectReader::new(r, |b| { + nar_size += b.len() as u64; + nar_hash.write_all(b).unwrap(); + }); + + // HACK: InspectReader doesn't implement AsyncBufRead, but neither do our decompressors. + let mut r = BufReader::new(r); + + let root_node = crate::nar::ingest_nar( + self.blob_service.clone(), + self.directory_service.clone(), + &mut r, + ) .await - .unwrap(); - - match res { - Ok((root_node, nar_hash, nar_size)) => { - // ensure the ingested narhash and narsize do actually match. - if narinfo.nar_size != nar_size { - warn!( - narinfo.nar_size = narinfo.nar_size, - http.nar_size = nar_size, - "NARSize mismatch" - ); - Err(io::Error::new( - io::ErrorKind::InvalidData, - "NarSize mismatch".to_string(), - ))?; - } - if narinfo.nar_hash != nar_hash { - warn!( - narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash), - http.nar_hash = %NixHash::Sha256(nar_hash), - "NarHash mismatch" - ); - Err(io::Error::new( - io::ErrorKind::InvalidData, - "NarHash mismatch".to_string(), - ))?; - } - - Ok(Some(PathInfo { - node: Some(castorepb::Node { - // set the name of the root node to the digest-name of the store path. - node: Some( - root_node.rename(narinfo.store_path.to_string().to_owned().into()), - ), - }), - references: pathinfo.references, - narinfo: pathinfo.narinfo, - })) - } - Err(e) => Err(e.into()), + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + // ensure the ingested narhash and narsize do actually match. + if narinfo.nar_size != nar_size { + warn!( + narinfo.nar_size = narinfo.nar_size, + http.nar_size = nar_size, + "NarSize mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarSize mismatch".to_string(), + ))?; + } + let nar_hash: [u8; 32] = nar_hash.finalize().into(); + if narinfo.nar_hash != nar_hash { + warn!( + narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash), + http.nar_hash = %NixHash::Sha256(nar_hash), + "NarHash mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarHash mismatch".to_string(), + ))?; } + + Ok(Some(PathInfo { + node: Some(castorepb::Node { + // set the name of the root node to the digest-name of the store path. + node: Some(root_node.rename(narinfo.store_path.to_string().to_owned().into())), + }), + references: pathinfo.references, + narinfo: pathinfo.narinfo, + })) } #[instrument(skip_all, fields(path_info=?_path_info))] @@ -259,16 +256,6 @@ where )) } - #[instrument(skip_all, fields(root_node=?root_node))] - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - Err(Error::InvalidRequest( - "calculate_nar not supported for this backend".to_string(), - )) - } - fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { Box::pin(futures::stream::once(async { Err(Error::InvalidRequest( @@ -277,38 +264,3 @@ where })) } } - -/// Small helper reader implementing [std::io::Read]. -/// It can be used to wrap another reader, counts the number of bytes read -/// and the sha256 digest of the contents. -struct NarReader<R: Read> { - r: R, - - sha256: sha2::Sha256, - bytes_read: u64, -} - -impl<R: Read> NarReader<R> { - pub fn from(inner: R) -> Self { - Self { - r: inner, - sha256: Sha256::new(), - bytes_read: 0, - } - } - - /// Returns the (remaining) inner reader, the sha256 digest and the number of bytes read. - pub fn into_inner(self) -> (R, [u8; 32], u64) { - (self.r, self.sha256.finalize_fixed().into(), self.bytes_read) - } -} - -impl<R: Read> Read for NarReader<R> { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - self.r.read(buf).map(|n| { - self.bytes_read += n as u64; - self.sha256.write_all(&buf[..n]).unwrap(); - n - }) - } -} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 7b6d7fd7ab..eb3cf2ff1b 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -1,140 +1,117 @@ use super::PathInfoService; -use crate::nar::calculate_size_and_sha256; use crate::proto::PathInfo; -use futures::stream::iter; +use async_stream::try_stream; use futures::stream::BoxStream; +use nix_compat::nixbase32; use prost::Message; use std::path::Path; use tonic::async_trait; +use tracing::instrument; use tracing::warn; -use tvix_castore::proto as castorepb; -use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; +use tvix_castore::Error; /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). /// /// The PathInfo messages are stored as encoded protos, and keyed by their output hash, /// as that's currently the only request type available. -pub struct SledPathInfoService<BS, DS> { +pub struct SledPathInfoService { db: sled::Db, - - blob_service: BS, - directory_service: DS, } -impl<BS, DS> SledPathInfoService<BS, DS> { - pub fn new<P: AsRef<Path>>( - p: P, - blob_service: BS, - directory_service: DS, - ) -> Result<Self, sled::Error> { +impl SledPathInfoService { + pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> { let config = sled::Config::default() .use_compression(false) // is a required parameter .path(p); let db = config.open()?; - Ok(Self { - db, - blob_service, - directory_service, - }) + Ok(Self { db }) } - pub fn new_temporary(blob_service: BS, directory_service: DS) -> Result<Self, sled::Error> { + pub fn new_temporary() -> Result<Self, sled::Error> { let config = sled::Config::default().temporary(true); let db = config.open()?; - Ok(Self { - db, - blob_service, - directory_service, - }) + Ok(Self { db }) } } #[async_trait] -impl<BS, DS> PathInfoService for SledPathInfoService<BS, DS> -where - BS: AsRef<dyn BlobService> + Send + Sync, - DS: AsRef<dyn DirectoryService> + Send + Sync, -{ +impl PathInfoService for SledPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { - match self.db.get(digest) { - Ok(None) => Ok(None), - Ok(Some(data)) => match PathInfo::decode(&*data) { - Ok(path_info) => Ok(Some(path_info)), - Err(e) => { + let resp = tokio::task::spawn_blocking({ + let db = self.db.clone(); + move || db.get(digest.as_slice()) + }) + .await? + .map_err(|e| { + warn!("failed to retrieve PathInfo: {}", e); + Error::StorageError(format!("failed to retrieve PathInfo: {}", e)) + })?; + match resp { + None => Ok(None), + Some(data) => { + let path_info = PathInfo::decode(&*data).map_err(|e| { warn!("failed to decode stored PathInfo: {}", e); - Err(Error::StorageError(format!( - "failed to decode stored PathInfo: {}", - e - ))) - } - }, - Err(e) => { - warn!("failed to retrieve PathInfo: {}", e); - Err(Error::StorageError(format!( - "failed to retrieve PathInfo: {}", - e - ))) + Error::StorageError(format!("failed to decode stored PathInfo: {}", e)) + })?; + Ok(Some(path_info)) } } } + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { // Call validate on the received PathInfo message. - match path_info.validate() { - Err(e) => Err(Error::InvalidRequest(format!( - "failed to validate PathInfo: {}", - e - ))), - // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. - // This overwrites existing PathInfo objects. - Ok(nix_path) => match self - .db - .insert(*nix_path.digest(), path_info.encode_to_vec()) - { - Ok(_) => Ok(path_info), - Err(e) => { - warn!("failed to insert PathInfo: {}", e); - Err(Error::StorageError(format! { - "failed to insert PathInfo: {}", e - })) - } - }, - } - } + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?; + + // In case the PathInfo is valid, we were able to parse a StorePath. + // Store it in the database, keyed by its digest. + // This overwrites existing PathInfo objects. + tokio::task::spawn_blocking({ + let db = self.db.clone(); + let k = *store_path.digest(); + let data = path_info.encode_to_vec(); + move || db.insert(k, data) + }) + .await? + .map_err(|e| { + warn!("failed to insert PathInfo: {}", e); + Error::StorageError(format! { + "failed to insert PathInfo: {}", e + }) + })?; - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service) - .await - .map_err(|e| Error::StorageError(e.to_string())) + Ok(path_info) } fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { - Box::pin(iter(self.db.iter().values().map(|v| match v { - Ok(data) => { - // we retrieved some bytes - match PathInfo::decode(&*data) { - Ok(path_info) => Ok(path_info), - Err(e) => { - warn!("failed to decode stored PathInfo: {}", e); - Err(Error::StorageError(format!( - "failed to decode stored PathInfo: {}", - e - ))) - } - } - } - Err(e) => { - warn!("failed to retrieve PathInfo: {}", e); - Err(Error::StorageError(format!( - "failed to retrieve PathInfo: {}", - e - ))) + let db = self.db.clone(); + let mut it = db.iter().values(); + + Box::pin(try_stream! { + // Don't block the executor while waiting for .next(), so wrap that + // in a spawn_blocking call. + // We need to pass around it to be able to reuse it. + while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || { + (it.next(), it) + }).await? { + it = new_it; + let data = elem.map_err(|e| { + warn!("failed to retrieve PathInfo: {}", e); + Error::StorageError(format!("failed to retrieve PathInfo: {}", e)) + })?; + + let path_info = PathInfo::decode(&*data).map_err(|e| { + warn!("failed to decode stored PathInfo: {}", e); + Error::StorageError(format!("failed to decode stored PathInfo: {}", e)) + })?; + + yield path_info } - }))) + }) } } diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs index c9b9a06377..061655e4ba 100644 --- a/tvix/store/src/pathinfoservice/tests/mod.rs +++ b/tvix/store/src/pathinfoservice/tests/mod.rs @@ -4,62 +4,30 @@ use rstest::*; use rstest_reuse::{self, *}; -use std::sync::Arc; -use tvix_castore::proto as castorepb; -use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; use super::PathInfoService; +use crate::pathinfoservice::MemoryPathInfoService; +use crate::pathinfoservice::SledPathInfoService; use crate::proto::PathInfo; use crate::tests::fixtures::DUMMY_PATH_DIGEST; +use tvix_castore::proto as castorepb; mod utils; -use self::utils::make_grpc_path_info_service_client; - -/// Convenience type alias batching all three servives together. -#[allow(clippy::upper_case_acronyms)] -type BSDSPS = ( - Arc<dyn BlobService>, - Arc<dyn DirectoryService>, - Box<dyn PathInfoService>, -); +pub use self::utils::make_grpc_path_info_service_client; -/// Creates a PathInfoService using a new Memory{Blob,Directory}Service. -/// We return a 3-tuple containing all of them, as some tests want to interact -/// with all three. -pub async fn make_path_info_service(uri: &str) -> BSDSPS { - let blob_service: Arc<dyn BlobService> = tvix_castore::blobservice::from_addr("memory://") - .await - .unwrap() - .into(); - let directory_service: Arc<dyn DirectoryService> = - tvix_castore::directoryservice::from_addr("memory://") - .await - .unwrap() - .into(); - - ( - blob_service.clone(), - directory_service.clone(), - crate::pathinfoservice::from_addr(uri, blob_service, directory_service) - .await - .unwrap(), - ) -} +#[cfg(all(feature = "cloud", feature = "integration"))] +use self::utils::make_bigtable_path_info_service; #[template] #[rstest] -#[case::memory(make_path_info_service("memory://").await)] -#[case::grpc(make_grpc_path_info_service_client().await)] -#[case::sled(make_path_info_service("sled://").await)] -#[cfg_attr(feature = "cloud", case::bigtable(make_path_info_service("bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1").await))] -pub fn path_info_services( - #[case] services: ( - impl BlobService, - impl DirectoryService, - impl PathInfoService, - ), -) { -} +#[case::memory(MemoryPathInfoService::default())] +#[case::grpc({ + let (_, _, svc) = make_grpc_path_info_service_client().await; + svc +})] +#[case::sled(SledPathInfoService::new_temporary().unwrap())] +#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))] +pub fn path_info_services(#[case] svc: impl PathInfoService) {} // FUTUREWORK: add more tests rejecting invalid PathInfo messages. // A subset of them should also ensure references to other PathInfos, or @@ -68,9 +36,8 @@ pub fn path_info_services( /// Trying to get a non-existent PathInfo should return Ok(None). #[apply(path_info_services)] #[tokio::test] -async fn not_found(services: BSDSPS) { - let (_, _, path_info_service) = services; - assert!(path_info_service +async fn not_found(svc: impl PathInfoService) { + assert!(svc .get(DUMMY_PATH_DIGEST) .await .expect("must succeed") @@ -80,9 +47,7 @@ async fn not_found(services: BSDSPS) { /// Put a PathInfo into the store, get it back. #[apply(path_info_services)] #[tokio::test] -async fn put_get(services: BSDSPS) { - let (_, _, path_info_service) = services; - +async fn put_get(svc: impl PathInfoService) { let path_info = PathInfo { node: Some(castorepb::Node { node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { @@ -94,20 +59,14 @@ async fn put_get(services: BSDSPS) { }; // insert - let resp = path_info_service - .put(path_info.clone()) - .await - .expect("must succeed"); + let resp = svc.put(path_info.clone()).await.expect("must succeed"); // expect the returned PathInfo to be equal (for now) // in the future, some stores might add additional fields/signatures. assert_eq!(path_info, resp); // get it back - let resp = path_info_service - .get(DUMMY_PATH_DIGEST) - .await - .expect("must succeed"); + let resp = svc.get(DUMMY_PATH_DIGEST).await.expect("must succeed"); assert_eq!(Some(path_info), resp); } diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs index 31ec57aade..ee170468d1 100644 --- a/tvix/store/src/pathinfoservice/tests/utils.rs +++ b/tvix/store/src/pathinfoservice/tests/utils.rs @@ -1,8 +1,10 @@ use std::sync::Arc; use tonic::transport::{Endpoint, Server, Uri}; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; use crate::{ + nar::{NarCalculationService, SimpleRenderer}, pathinfoservice::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService}, proto::{ path_info_service_client::PathInfoServiceClient, @@ -14,7 +16,8 @@ use crate::{ /// Constructs and returns a gRPC PathInfoService. /// We also return memory-based {Blob,Directory}Service, /// as the consumer of this function accepts a 3-tuple. -pub async fn make_grpc_path_info_service_client() -> super::BSDSPS { +pub async fn make_grpc_path_info_service_client( +) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) { let (left, right) = tokio::io::duplex(64); let blob_service = blob_service(); @@ -26,12 +29,15 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS { let directory_service = directory_service.clone(); async move { let path_info_service: Arc<dyn PathInfoService> = - Arc::from(MemoryPathInfoService::new(blob_service, directory_service)); + Arc::from(MemoryPathInfoService::default()); + let nar_calculation_service = + Box::new(SimpleRenderer::new(blob_service, directory_service)) + as Box<dyn NarCalculationService>; - // spin up a new DirectoryService + // spin up a new PathInfoService let mut server = Server::builder(); let router = server.add_service(PathInfoServiceServer::new( - GRPCPathInfoServiceWrapper::new(path_info_service), + GRPCPathInfoServiceWrapper::new(path_info_service, nar_calculation_service), )); router @@ -43,18 +49,27 @@ pub async fn make_grpc_path_info_service_client() -> super::BSDSPS { // Create a client, connecting to the right side. The URI is unused. let mut maybe_right = Some(right); - let path_info_service = Box::new(GRPCPathInfoService::from_client( - PathInfoServiceClient::new( - Endpoint::try_from("http://[::]:50051") - .unwrap() - .connect_with_connector(tower::service_fn(move |_: Uri| { - let right = maybe_right.take().unwrap(); - async move { Ok::<_, std::io::Error>(right) } - })) - .await - .unwrap(), - ), + let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(right) } + })) + .await + .unwrap(), )); (blob_service, directory_service, path_info_service) } + +#[cfg(all(feature = "cloud", feature = "integration"))] +pub(crate) async fn make_bigtable_path_info_service( +) -> crate::pathinfoservice::BigtablePathInfoService { + use crate::pathinfoservice::bigtable::BigtableParameters; + use crate::pathinfoservice::BigtablePathInfoService; + + BigtablePathInfoService::connect(BigtableParameters::default_for_tests()) + .await + .unwrap() +} diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 9f45818227..68f5575676 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -1,4 +1,4 @@ -use crate::nar::RenderError; +use crate::nar::{NarCalculationService, RenderError}; use crate::pathinfoservice::PathInfoService; use crate::proto; use futures::{stream::BoxStream, TryStreamExt}; @@ -7,23 +7,26 @@ use tonic::{async_trait, Request, Response, Result, Status}; use tracing::{instrument, warn}; use tvix_castore::proto as castorepb; -pub struct GRPCPathInfoServiceWrapper<PS> { - inner: PS, +pub struct GRPCPathInfoServiceWrapper<PS, NS> { + path_info_service: PS, // FUTUREWORK: allow exposing without allowing listing + nar_calculation_service: NS, } -impl<PS> GRPCPathInfoServiceWrapper<PS> { - pub fn new(path_info_service: PS) -> Self { +impl<PS, NS> GRPCPathInfoServiceWrapper<PS, NS> { + pub fn new(path_info_service: PS, nar_calculation_service: NS) -> Self { Self { - inner: path_info_service, + path_info_service, + nar_calculation_service, } } } #[async_trait] -impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS> +impl<PS, NS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, NS> where PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static, + NS: NarCalculationService + Send + Sync + 'static, { type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>; @@ -39,7 +42,7 @@ where .to_vec() .try_into() .map_err(|_e| Status::invalid_argument("invalid output digest length"))?; - match self.inner.get(digest).await { + match self.path_info_service.get(digest).await { Ok(None) => Err(Status::not_found("PathInfo not found")), Ok(Some(path_info)) => Ok(Response::new(path_info)), Err(e) => { @@ -57,7 +60,7 @@ where // Store the PathInfo in the client. Clients MUST validate the data // they receive, so we don't validate additionally here. - match self.inner.put(path_info).await { + match self.path_info_service.put(path_info).await { Ok(path_info_new) => Ok(Response::new(path_info_new)), Err(e) => { warn!(err = %e, "failed to put PathInfo"); @@ -79,7 +82,7 @@ where Err(Status::invalid_argument("invalid root node"))? } - match self.inner.calculate_nar(&root_node).await { + match self.nar_calculation_service.calculate_nar(&root_node).await { Ok((nar_size, nar_sha256)) => Ok(Response::new(proto::CalculateNarResponse { nar_size, nar_sha256: nar_sha256.to_vec().into(), @@ -99,7 +102,7 @@ where _request: Request<proto::ListPathInfoRequest>, ) -> Result<Response<Self::ListStream>, Status> { let stream = Box::pin( - self.inner + self.path_info_service .list() .map_err(|e| Status::internal(e.to_string())), ); diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index 0b171377bd..e6e42f6ec4 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -10,9 +10,10 @@ use tvix_castore::{ directoryservice::{self, DirectoryService}, }; +use crate::nar::{NarCalculationService, SimpleRenderer}; use crate::pathinfoservice::{self, PathInfoService}; -/// Construct the three store handles from their addrs. +/// Construct the store handles from their addrs. pub async fn construct_services( blob_service_addr: impl AsRef<str>, directory_service_addr: impl AsRef<str>, @@ -21,6 +22,7 @@ pub async fn construct_services( Arc<dyn BlobService>, Arc<dyn DirectoryService>, Box<dyn PathInfoService>, + Box<dyn NarCalculationService>, )> { let blob_service: Arc<dyn BlobService> = blobservice::from_addr(blob_service_addr.as_ref()) .await? @@ -36,7 +38,18 @@ pub async fn construct_services( ) .await?; - Ok((blob_service, directory_service, path_info_service)) + // TODO: grpc client also implements NarCalculationService + let nar_calculation_service = Box::new(SimpleRenderer::new( + blob_service.clone(), + directory_service.clone(), + )) as Box<dyn NarCalculationService>; + + Ok(( + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + )) } /// The inverse of [tokio_util::io::SyncIoBridge]. |