diff options
author | Florian Klink <flokli@flokli.de> | 2023-05-11T15·01+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-05-24T09·25+0000 |
commit | 5774117d5ea4115a33923640a39a553eb57df59a (patch) | |
tree | 2e77cdb73837f688a7bc268366d6846ef5a503f9 /tvix | |
parent | 4b00f1d7ac855e90e15633a2e2ba3a90c3921fac (diff) |
feat(tvix/store): implement TvixStoreIO r/6191
This providesEvalIO, asking given PathInfoService, DirectoryService and BlobService. Change-Id: I32f210f5a7aa8173ad9a7d53e8a5ac03619f527a Reviewed-on: https://cl.tvl.fyi/c/depot/+/8561 Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.lock | 3 | ||||
-rw-r--r-- | tvix/Cargo.nix | 12 | ||||
-rw-r--r-- | tvix/store/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 54 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/store_io.rs | 363 |
6 files changed, 398 insertions, 39 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 9c2e464a6da4..ad5bbeed5340 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -2708,8 +2708,10 @@ dependencies = [ "prost", "prost-build", "rayon", + "serde_json", "sha2 0.10.6", "sled", + "smol_str", "tempfile", "test-case", "thiserror", @@ -2723,6 +2725,7 @@ dependencies = [ "tower", "tracing", "tracing-subscriber", + "tvix-eval", "walkdir", ] diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index cf571a36b0d6..f7cc72d6f815 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -8033,6 +8033,10 @@ rec { packageId = "rayon"; } { + name = "serde_json"; + packageId = "serde_json"; + } + { name = "sha2"; packageId = "sha2 0.10.6"; } @@ -8042,6 +8046,10 @@ rec { features = [ "compression" ]; } { + name = "smol_str"; + packageId = "smol_str"; + } + { name = "thiserror"; packageId = "thiserror"; } @@ -8082,6 +8090,10 @@ rec { features = [ "json" ]; } { + name = "tvix-eval"; + packageId = "tvix-eval"; + } + { name = "walkdir"; packageId = "walkdir"; } diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index be282dddb015..a88bdefd7aac 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -15,6 +15,7 @@ prost = "0.11.2" rayon = "1.6.1" sha2 = "0.10.6" sled = { version = "0.34.7", features = ["compression"] } +tvix-eval = { path = "../eval" } thiserror = "1.0.38" tokio-stream = "0.1.14" tokio = { version = "1.28.0", features = ["rt-multi-thread", "net"] } @@ -26,6 +27,8 @@ tokio-util = { version = "0.7.8", features = ["io", "io-util"] } tower = "0.4.13" futures = "0.3.28" bytes = "1.4.0" +smol_str = "0.2.0" +serde_json = "1.0" [dependencies.tonic-reflection] optional = true diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index a6bda8c1873b..dbd9e2986e24 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -1,16 +1,9 @@ use clap::Subcommand; use data_encoding::BASE64; -use nix_compat::derivation::Derivation; -use nix_compat::derivation::Output; -use nix_compat::nixhash::HashAlgo; -use nix_compat::nixhash::NixHash; -use nix_compat::nixhash::NixHashWithMode; use std::path::PathBuf; use tracing_subscriber::prelude::*; use tvix_store::blobservice::SledBlobService; use tvix_store::directoryservice::SledDirectoryService; -use tvix_store::import::ingest_path; -use tvix_store::nar::NARCalculationService; use tvix_store::nar::NonCachingNARCalculationService; use tvix_store::pathinfoservice::SledPathInfoService; use tvix_store::proto::blob_service_server::BlobServiceServer; @@ -19,6 +12,7 @@ use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCBlobServiceWrapper; use tvix_store::proto::GRPCDirectoryServiceWrapper; use tvix_store::proto::GRPCPathInfoServiceWrapper; +use tvix_store::TvixStoreIO; #[cfg(feature = "reflection")] use tvix_store::proto::FILE_DESCRIPTOR_SET; @@ -85,8 +79,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber"); // initialize stores - let mut blob_service = SledBlobService::new("blobs.sled".into())?; - let mut directory_service = SledDirectoryService::new("directories.sled".into())?; + let blob_service = SledBlobService::new("blobs.sled".into())?; + let directory_service = SledDirectoryService::new("directories.sled".into())?; let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?; match cli.command { @@ -134,37 +128,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { directory_service.clone(), ); + let mut io = TvixStoreIO::new( + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + ); + for path in paths { - let root_node = ingest_path(&mut blob_service, &mut directory_service, &path)?; - - let nar_hash = NixHashWithMode::Recursive(NixHash::new( - HashAlgo::Sha256, - nar_calculation_service - .calculate_nar(&root_node)? - .1 - .to_vec(), - )); - - let mut drv = Derivation::default(); - drv.outputs.insert( - "out".to_string(), - Output { - path: "".to_string(), - hash_with_mode: Some(nar_hash), - }, - ); - drv.calculate_output_paths( - path.file_name() - .expect("path must not be ..") - .to_str() - .expect("path must be valid unicode"), - // Note the derivation_or_fod_hash argument is *unused* for FODs, so it doesn't matter what we pass here. - &NixHash::new(HashAlgo::Sha256, vec![]), - )?; - - println!("{}", drv.outputs.get("out").unwrap().path); - - match root_node { + let path_info = io + .import_path_with_pathinfo(&path) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; + + match path_info.node.unwrap().node.unwrap() { tvix_store::proto::node::Node::Directory(directory_node) => { info!( path = ?path, diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index da0b6400a10b..7ae8587f8b26 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,5 +1,6 @@ mod digests; mod errors; +mod store_io; pub mod blobservice; pub mod directoryservice; @@ -10,6 +11,7 @@ pub mod proto; pub use digests::B3Digest; pub use errors::Error; +pub use store_io::TvixStoreIO; #[cfg(test)] mod tests; diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs new file mode 100644 index 000000000000..e4c45f9e5952 --- /dev/null +++ b/tvix/store/src/store_io.rs @@ -0,0 +1,363 @@ +//! This module provides an implementation of EvalIO. +//! +//! It can be used by the tvix evalutator to talk to a tvix store. + +use data_encoding::BASE64; +use nix_compat::{ + nixhash::{HashAlgo, NixHash, NixHashWithMode}, + store_path::{build_regular_ca_path, StorePath}, +}; +use smol_str::SmolStr; +use std::{io, path::Path, path::PathBuf}; +use tracing::{error, instrument, warn}; +use tvix_eval::{EvalIO, FileType, StdIO}; + +use crate::{ + blobservice::BlobService, + directoryservice::{self, DirectoryService}, + import, + nar::NARCalculationService, + pathinfoservice::PathInfoService, + proto::NamedNode, + B3Digest, +}; + +/// Implements [EvalIO], asking given [PathInfoService], [DirectoryService] +/// and [BlobService]. +/// +/// In case the given path does not exist in these stores, we ask StdIO. +/// This is to both cover cases of syntactically valid store paths, that exist +/// on the filesystem (still managed by Nix), as well as being able to read +/// files outside store paths. +pub struct TvixStoreIO< + BS: BlobService, + DS: DirectoryService, + PS: PathInfoService, + NCS: NARCalculationService, +> { + blob_service: BS, + directory_service: DS, + path_info_service: PS, + nar_calculation_service: NCS, + std_io: StdIO, +} + +impl<BS: BlobService, DS: DirectoryService, PS: PathInfoService, NCS: NARCalculationService> + TvixStoreIO<BS, DS, PS, NCS> +{ + pub fn new( + blob_service: BS, + directory_service: DS, + path_info_service: PS, + nar_calculation_service: NCS, + ) -> Self { + Self { + blob_service, + directory_service, + path_info_service, + nar_calculation_service, + std_io: StdIO {}, + } + } + + /// for a given [StorePath] and additional [Path] inside the store path, + /// look up the [PathInfo], and if it exists, traverse the directory structure to + /// return the [crate::proto::node::Node] specified by `sub_path`. + #[instrument(skip(self), ret, err)] + fn store_path_to_root_node( + &mut self, + store_path: &StorePath, + sub_path: &Path, + ) -> Result<Option<crate::proto::node::Node>, crate::Error> { + let path_info = { + match self.path_info_service.get(store_path.digest)? { + // If there's no PathInfo found, early exit + None => return Ok(None), + Some(path_info) => path_info, + } + }; + + let root_node = { + match path_info.node { + None => { + warn!( + "returned PathInfo {:?} node is None, this shouldn't happen.", + &path_info + ); + return Ok(None); + } + Some(root_node) => match root_node.node { + None => { + warn!("node for {:?} is None, this shouldn't happen.", &root_node); + return Ok(None); + } + Some(root_node) => root_node, + }, + } + }; + + directoryservice::traverse_to(&mut self.directory_service, root_node, sub_path) + } + + /// Imports a given path on the filesystem into the store, and returns the + /// [crate::proto::PathInfo] describing the path, that was sent to + /// [PathInfoService]. + /// While not part of the [EvalIO], it's still useful for clients who + /// care about the [PathInfo]. + #[instrument(skip(self), ret, err)] + pub fn import_path_with_pathinfo( + &mut self, + path: &std::path::Path, + ) -> Result<crate::proto::PathInfo, io::Error> { + // Call [import::ingest_path], which will walk over the given path and return a root_node. + let root_node = + import::ingest_path(&mut self.blob_service, &mut self.directory_service, path) + .expect("error during import_path"); + + // Render the NAR + let (nar_size, nar_sha256) = self + .nar_calculation_service + .calculate_nar(&root_node) + .expect("error during nar calculation"); // TODO: handle error + + // For given NAR sha256 digest and name, return the new [StorePath] this would have. + let nar_hash_with_mode = + NixHashWithMode::Recursive(NixHash::new(HashAlgo::Sha256, nar_sha256.to_vec())); + + let name = path + .file_name() + .expect("path must not be ..") + .to_str() + .expect("path must be valid unicode"); + + let output_path = + build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap(); + + // assemble a new root_node with a name that is derived from the nar hash. + let renamed_root_node = { + let name = output_path.to_string(); + + match root_node { + crate::proto::node::Node::Directory(n) => { + crate::proto::node::Node::Directory(crate::proto::DirectoryNode { name, ..n }) + } + crate::proto::node::Node::File(n) => { + crate::proto::node::Node::File(crate::proto::FileNode { name, ..n }) + } + crate::proto::node::Node::Symlink(n) => { + crate::proto::node::Node::Symlink(crate::proto::SymlinkNode { name, ..n }) + } + } + }; + + // assemble the [crate::proto::PathInfo] object. + let path_info = crate::proto::PathInfo { + node: Some(crate::proto::Node { + node: Some(renamed_root_node), + }), + // There's no reference scanning on path contents ingested like this. + references: vec![], + narinfo: Some(crate::proto::NarInfo { + nar_size, + nar_sha256: nar_sha256.to_vec(), + signatures: vec![], + reference_names: vec![], + // TODO: narinfo for talosctl.src contains `CA: fixed:r:sha256:1x13j5hy75221bf6kz7cpgld9vgic6bqx07w5xjs4pxnksj6lxb6` + // do we need this anywhere? + }), + }; + + // put into [PathInfoService], and return the PathInfo that we get back + // from there (it might contain additional signatures). + let path_info = self.path_info_service.put(path_info)?; + + Ok(path_info) + } +} + +/// For given NAR sha256 digest and name, return the new [StorePath] this would have. +#[instrument(skip(nar_sha256_digest), ret, fields(nar_sha256_digest=BASE64.encode(nar_sha256_digest)))] +fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> StorePath { + let nar_hash_with_mode = + NixHashWithMode::Recursive(NixHash::new(HashAlgo::Sha256, nar_sha256_digest.to_vec())); + + build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap() +} + +impl< + BS: BlobService + Clone, + DS: DirectoryService + Clone, + PS: PathInfoService, + NCS: NARCalculationService, + > EvalIO for TvixStoreIO<BS, DS, PS, NCS> +{ + #[instrument(skip(self), ret, err)] + fn path_exists(&mut self, path: &Path) -> Result<bool, io::Error> { + if let Ok((store_path, sub_path)) = + StorePath::from_absolute_path_full(&path.to_string_lossy()) + { + if self + .store_path_to_root_node(&store_path, &sub_path)? + .is_some() + { + Ok(true) + } else { + // As tvix-store doesn't manage /nix/store on the filesystem, + // we still need to also ask self.std_io here. + self.std_io.path_exists(path) + } + } else { + // The store path is no store path, so do regular StdIO. + self.std_io.path_exists(path) + } + } + + #[instrument(skip(self), ret, err)] + fn read_to_string(&mut self, path: &Path) -> Result<String, io::Error> { + if let Ok((store_path, sub_path)) = + StorePath::from_absolute_path_full(&path.to_string_lossy()) + { + if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)? { + // depending on the node type, treat read_to_string differently + match node { + crate::proto::node::Node::Directory(_) => { + // This would normally be a io::ErrorKind::IsADirectory (still unstable) + Err(io::Error::new( + io::ErrorKind::Unsupported, + "tried to read directory at {path} to string", + )) + } + crate::proto::node::Node::File(file_node) => { + let digest = + B3Digest::from_vec(file_node.digest.clone()).map_err(|_e| { + error!( + file_node = ?file_node, + "invalid digest" + ); + io::Error::new( + io::ErrorKind::InvalidData, + format!("invalid digest length in file node: {:?}", file_node), + ) + })?; + + let reader = { + let resp = self.blob_service.open_read(&digest)?; + match resp { + Some(blob_reader) => blob_reader, + None => { + error!( + blob.digest = %digest, + "blob not found", + ); + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("blob {} not found", &digest), + ))? + } + } + }; + + io::read_to_string(reader) + } + crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new( + io::ErrorKind::Unsupported, + "read_to_string for symlinks is unsupported", + ))?, + } + } else { + // As tvix-store doesn't manage /nix/store on the filesystem, + // we still need to also ask self.std_io here. + self.std_io.read_to_string(path) + } + } else { + // The store path is no store path, so do regular StdIO. + self.std_io.read_to_string(path) + } + } + + #[instrument(skip(self), ret, err)] + fn read_dir(&mut self, path: &Path) -> Result<Vec<(SmolStr, FileType)>, io::Error> { + if let Ok((store_path, sub_path)) = + StorePath::from_absolute_path_full(&path.to_string_lossy()) + { + if let Some(node) = self.store_path_to_root_node(&store_path, &sub_path)? { + match node { + crate::proto::node::Node::Directory(directory_node) => { + // fetch the Directory itself. + let digest = + B3Digest::from_vec(directory_node.digest.clone()).map_err(|_e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "invalid digest length in directory node: {:?}", + directory_node + ), + ) + })?; + + if let Some(directory) = self.directory_service.get(&digest)? { + let mut children: Vec<(SmolStr, FileType)> = Vec::new(); + for node in directory.nodes() { + children.push(match node { + crate::proto::node::Node::Directory(e) => { + (e.name.into(), FileType::Directory) + } + crate::proto::node::Node::File(e) => { + (e.name.into(), FileType::Regular) + } + crate::proto::node::Node::Symlink(e) => { + (e.name.into(), FileType::Symlink) + } + }) + } + Ok(children) + } else { + // If we didn't get the directory node that's linked, that's a store inconsistency! + error!( + directory.digest = %digest, + path = ?path, + "directory not found", + ); + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("directory {digest} does not exist"), + ))? + } + } + crate::proto::node::Node::File(_file_node) => { + // This would normally be a io::ErrorKind::NotADirectory (still unstable) + Err(io::Error::new( + io::ErrorKind::Unsupported, + "tried to readdir path {:?}, which is a file", + ))? + } + crate::proto::node::Node::Symlink(_symlink_node) => Err(io::Error::new( + io::ErrorKind::Unsupported, + "read_dir for symlinks is unsupported", + ))?, + } + } else { + self.std_io.read_dir(path) + } + } else { + self.std_io.read_dir(path) + } + } + + #[instrument(skip(self), ret, err)] + fn import_path(&mut self, path: &std::path::Path) -> Result<PathBuf, std::io::Error> { + let path_info = self.import_path_with_pathinfo(path)?; + + // from the [PathInfo], extract the store path (as string). + let mut path = PathBuf::from(nix_compat::store_path::STORE_DIR_WITH_SLASH); + path.push(path_info.node.unwrap().node.unwrap().get_name()); + + // and return it + Ok(path) + } + + #[instrument(skip(self), ret)] + fn store_dir(&self) -> Option<String> { + Some("/nix/store".to_string()) + } +} |