diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice')
-rw-r--r-- | tvix/store/src/pathinfoservice/bigtable.rs | 450 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/combinators.rs | 149 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 165 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/fs/mod.rs | 90 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 206 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/lru.rs | 166 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 89 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 100 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/nix_http.rs | 322 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/redb.rs | 218 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 190 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/tests/mod.rs | 82 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/tests/utils.rs | 79 |
13 files changed, 2306 insertions, 0 deletions
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs new file mode 100644 index 000000000000..15128986ff56 --- /dev/null +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -0,0 +1,450 @@ +use super::PathInfoService; +use crate::proto; +use crate::proto::PathInfo; +use async_stream::try_stream; +use bigtable_rs::{bigtable, google::bigtable::v2 as bigtable_v2}; +use bytes::Bytes; +use data_encoding::HEXLOWER; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use prost::Message; +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, DurationSeconds}; +use std::sync::Arc; +use tonic::async_trait; +use tracing::{instrument, trace}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; +use tvix_castore::Error; + +/// There should not be more than 10 MiB in a single cell. +/// https://cloud.google.com/bigtable/docs/schema-design#cells +const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; + +/// Provides a [PathInfoService] implementation using +/// [Bigtable](https://cloud.google.com/bigtable/docs/) +/// as an underlying K/V store. +/// +/// # Data format +/// We use Bigtable as a plain K/V store. +/// The row key is the digest of the store path, in hexlower. +/// Inside the row, we currently have a single column/cell, again using the +/// hexlower store path digest. +/// Its value is the PathInfo message, serialized in canonical protobuf. +/// We currently only populate this column. +/// +/// Listing is ranging over all rows, and calculate_nar is returning a +/// "unimplemented" error. +#[derive(Clone)] +pub struct BigtablePathInfoService { + client: bigtable::BigTable, + params: BigtableParameters, + + #[cfg(test)] + #[allow(dead_code)] + /// Holds the temporary directory containing the unix socket, and the + /// spawned emulator process. + emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, +} + +impl BigtablePathInfoService { + #[cfg(not(test))] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + let connection = bigtable::BigTableConnection::new( + ¶ms.project_id, + ¶ms.instance_name, + params.is_read_only, + params.channel_size, + params.timeout, + ) + .await?; + + Ok(Self { + client: connection.client(), + params, + }) + } + + #[cfg(test)] + pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { + use std::time::Duration; + + use async_process::{Command, Stdio}; + use tempfile::TempDir; + use tokio_retry::{strategy::ExponentialBackoff, Retry}; + + let tmpdir = TempDir::new().unwrap(); + + let socket_path = tmpdir.path().join("cbtemulator.sock"); + + let emulator_process = Command::new("cbtemulator") + .arg("-address") + .arg(socket_path.clone()) + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) + .kill_on_drop(true) + .spawn() + .expect("failed to spawn emulator"); + + Retry::spawn( + ExponentialBackoff::from_millis(20) + .max_delay(Duration::from_secs(1)) + .take(3), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // populate the emulator + for cmd in &[ + vec!["createtable", ¶ms.table_name], + vec!["createfamily", ¶ms.table_name, ¶ms.family_name], + ] { + Command::new("cbt") + .args({ + let mut args = vec![ + "-instance", + ¶ms.instance_name, + "-project", + ¶ms.project_id, + ]; + args.extend_from_slice(cmd); + args + }) + .env( + "BIGTABLE_EMULATOR_HOST", + format!("unix://{}", socket_path.to_string_lossy()), + ) + .output() + .await + .expect("failed to run cbt setup command"); + } + + let connection = bigtable_rs::bigtable::BigTableConnection::new_with_emulator( + &format!("unix://{}", socket_path.to_string_lossy()), + ¶ms.project_id, + ¶ms.instance_name, + false, + None, + )?; + + Ok(Self { + client: connection.client(), + params, + emulator: (tmpdir, emulator_process).into(), + }) + } +} + +/// Derives the row/column key for a given output path. +/// We use hexlower encoding, also because it can't be misinterpreted as RE2. +fn derive_pathinfo_key(digest: &[u8; 20]) -> String { + HEXLOWER.encode(digest) +} + +#[async_trait] +impl PathInfoService for BigtablePathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let mut client = self.client.clone(); + let path_info_key = derive_pathinfo_key(&digest); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + rows_limit: 1, + rows: Some(bigtable_v2::RowSet { + row_keys: vec![path_info_key.clone().into()], + row_ranges: vec![], + }), + // Filter selected family name, and column qualifier matching the digest. + // The latter is to ensure we don't fail once we start adding more metadata. + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::Chain( + bigtable_v2::row_filter::Chain { + filters: vec![ + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + ), + ), + }, + bigtable_v2::RowFilter { + filter: Some( + bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + path_info_key.clone().into(), + ), + ), + }, + ], + }, + )), + }), + ..Default::default() + }; + + let mut response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + if response.len() != 1 { + if response.len() > 1 { + // This shouldn't happen, we limit number of rows to 1 + return Err(Error::StorageError( + "got more than one row from bigtable".into(), + )); + } + // else, this is simply a "not found". + return Ok(None); + } + + let (row_key, mut cells) = response.pop().unwrap(); + if row_key != path_info_key.as_bytes() { + // This shouldn't happen, we requested this row key. + return Err(Error::StorageError( + "got wrong row key from bigtable".into(), + )); + } + + let cell = cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !cells.is_empty() { + return Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + )); + } + + // We also require the qualifier to be correct in the filter above, + // so this shouldn't happen. + if path_info_key.as_bytes() != cell.qualifier { + return Err(Error::StorageError("unexpected cell qualifier".into())); + } + + // Try to parse the value into a PathInfo message + let path_info = proto::PathInfo::decode(Bytes::from(cell.value)) + .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?; + + let store_path = path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?; + + if store_path.digest() != &digest { + return Err(Error::StorageError("PathInfo has unexpected digest".into())); + } + + Ok(Some(path_info)) + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("pathinfo failed validation: {}", e)))?; + + let mut client = self.client.clone(); + let path_info_key = derive_pathinfo_key(store_path.digest()); + + let data = path_info.encode_to_vec(); + if data.len() as u64 > CELL_SIZE_LIMIT { + return Err(Error::StorageError( + "PathInfo exceeds cell limit on Bigtable".into(), + )); + } + + let resp = client + .check_and_mutate_row(bigtable_v2::CheckAndMutateRowRequest { + table_name: client.get_full_table_name(&self.params.table_name), + app_profile_id: self.params.app_profile_id.to_string(), + row_key: path_info_key.clone().into(), + predicate_filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::ColumnQualifierRegexFilter( + path_info_key.clone().into(), + )), + }), + // If the column was already found, do nothing. + true_mutations: vec![], + // Else, do the insert. + false_mutations: vec![ + // https://cloud.google.com/bigtable/docs/writes + bigtable_v2::Mutation { + mutation: Some(bigtable_v2::mutation::Mutation::SetCell( + bigtable_v2::mutation::SetCell { + family_name: self.params.family_name.to_string(), + column_qualifier: path_info_key.clone().into(), + timestamp_micros: -1, // use server time to fill timestamp + value: data, + }, + )), + }, + ], + }) + .await + .map_err(|e| Error::StorageError(format!("unable to mutate rows: {}", e)))?; + + if resp.predicate_matched { + trace!("already existed") + } + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let mut client = self.client.clone(); + + let request = bigtable_v2::ReadRowsRequest { + app_profile_id: self.params.app_profile_id.to_string(), + table_name: client.get_full_table_name(&self.params.table_name), + filter: Some(bigtable_v2::RowFilter { + filter: Some(bigtable_v2::row_filter::Filter::FamilyNameRegexFilter( + self.params.family_name.to_string(), + )), + }), + ..Default::default() + }; + + let stream = try_stream! { + // TODO: add pagination, we don't want to hold all of this in memory. + let response = client + .read_rows(request) + .await + .map_err(|e| Error::StorageError(format!("unable to read rows: {}", e)))?; + + for (row_key, mut cells) in response { + let cell = cells + .pop() + .ok_or_else(|| Error::StorageError("found no cells".into()))?; + + // The cell must have the same qualifier as the row key + if row_key != cell.qualifier { + Err(Error::StorageError("unexpected cell qualifier".into()))?; + } + + // Ensure there's only one cell (so no more left after the pop()) + // This shouldn't happen, We filter out other cells in our query. + if !cells.is_empty() { + + Err(Error::StorageError( + "more than one cell returned from bigtable".into(), + ))? + } + + // Try to parse the value into a PathInfo message. + let path_info = proto::PathInfo::decode(Bytes::from(cell.value)) + .map_err(|e| Error::StorageError(format!("unable to decode pathinfo proto: {}", e)))?; + + // Validate the containing PathInfo, ensure its StorePath digest + // matches row key. + let store_path = path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid PathInfo: {}", e)))?; + + let exp_path_info_key = derive_pathinfo_key(store_path.digest()); + + if exp_path_info_key.as_bytes() != row_key.as_slice() { + Err(Error::StorageError("PathInfo has unexpected digest".into()))? + } + + + yield path_info + } + }; + + Box::pin(stream) + } +} + +/// Represents configuration of [BigtablePathInfoService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +impl BigtableParameters { + #[cfg(test)] + pub fn default_for_tests() -> Self { + Self { + project_id: "project-1".into(), + instance_name: "instance-1".into(), + is_read_only: false, + channel_size: default_channel_size(), + timeout: default_timeout(), + table_name: "table-1".into(), + family_name: "cf1".into(), + app_profile_id: default_app_profile_id(), + } + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} + +#[async_trait] +impl ServiceBuilder for BigtableParameters { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> { + Ok(Arc::new( + BigtablePathInfoService::connect(self.clone()).await?, + )) + } +} + +impl TryFrom<url::Url> for BigtableParameters { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(mut url: url::Url) -> Result<Self, Self::Error> { + // parse the instance name from the hostname. + let instance_name = url + .host_str() + .ok_or_else(|| Error::StorageError("instance name missing".into()))? + .to_string(); + + // … but add it to the query string now, so we just need to parse that. + url.query_pairs_mut() + .append_pair("instance_name", &instance_name); + + let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; + + Ok(params) + } +} diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs new file mode 100644 index 000000000000..bb5595f72b10 --- /dev/null +++ b/tvix/store/src/pathinfoservice/combinators.rs @@ -0,0 +1,149 @@ +use std::sync::Arc; + +use crate::proto::PathInfo; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use tonic::async_trait; +use tracing::{debug, instrument}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; +use tvix_castore::Error; + +use super::PathInfoService; + +/// Asks near first, if not found, asks far. +/// If found in there, returns it, and *inserts* it into +/// near. +/// There is no negative cache. +/// Inserts and listings are not implemented for now. +pub struct Cache<PS1, PS2> { + near: PS1, + far: PS2, +} + +impl<PS1, PS2> Cache<PS1, PS2> { + pub fn new(near: PS1, far: PS2) -> Self { + Self { near, far } + } +} + +#[async_trait] +impl<PS1, PS2> PathInfoService for Cache<PS1, PS2> +where + PS1: PathInfoService, + PS2: PathInfoService, +{ + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + match self.near.get(digest).await? { + Some(path_info) => { + debug!("serving from cache"); + Ok(Some(path_info)) + } + None => { + debug!("not found in near, asking remote…"); + match self.far.get(digest).await? { + None => Ok(None), + Some(path_info) => { + debug!("found in remote, adding to cache"); + self.near.put(path_info.clone()).await?; + Ok(Some(path_info)) + } + } + } + } + } + + async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> { + Err(Error::StorageError("unimplemented".to_string())) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + Box::pin(tokio_stream::once(Err(Error::StorageError( + "unimplemented".to_string(), + )))) + } +} + +#[derive(serde::Deserialize)] +pub struct CacheConfig { + pub near: String, + pub far: String, +} + +impl TryFrom<url::Url> for CacheConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(_url: url::Url) -> Result<Self, Self::Error> { + Err(Error::StorageError( + "Instantiating a CombinedPathInfoService from a url is not supported".into(), + ) + .into()) + } +} + +#[async_trait] +impl ServiceBuilder for CacheConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (near, far) = futures::join!( + context.resolve(self.near.clone()), + context.resolve(self.far.clone()) + ); + Ok(Arc::new(Cache { + near: near?, + far: far?, + })) + } +} + +#[cfg(test)] +mod test { + use std::num::NonZeroUsize; + + use crate::{ + pathinfoservice::{LruPathInfoService, MemoryPathInfoService, PathInfoService}, + tests::fixtures::PATH_INFO_WITH_NARINFO, + }; + + const PATH_INFO_DIGEST: [u8; 20] = [0; 20]; + + /// Helper function setting up an instance of a "far" and "near" + /// PathInfoService. + async fn create_pathinfoservice() -> super::Cache<LruPathInfoService, MemoryPathInfoService> { + // Create an instance of a "far" PathInfoService. + let far = MemoryPathInfoService::default(); + + // … and an instance of a "near" PathInfoService. + let near = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap()); + + // create a Pathinfoservice combining the two and return it. + super::Cache::new(near, far) + } + + /// Getting from the far backend is gonna insert it into the near one. + #[tokio::test] + async fn test_populate_cache() { + let svc = create_pathinfoservice().await; + + // query the PathInfo, things should not be there. + assert!(svc.get(PATH_INFO_DIGEST).await.unwrap().is_none()); + + // insert it into the far one. + svc.far.put(PATH_INFO_WITH_NARINFO.clone()).await.unwrap(); + + // now try getting it again, it should succeed. + assert_eq!( + Some(PATH_INFO_WITH_NARINFO.clone()), + svc.get(PATH_INFO_DIGEST).await.unwrap() + ); + + // peek near, it should now be there. + assert_eq!( + Some(PATH_INFO_WITH_NARINFO.clone()), + svc.near.get(PATH_INFO_DIGEST).await.unwrap() + ); + } +} diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs new file mode 100644 index 000000000000..d4719219b996 --- /dev/null +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -0,0 +1,165 @@ +use super::PathInfoService; + +use crate::composition::{ + with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, +}; +use std::sync::Arc; +use tvix_castore::Error; +use url::Url; + +/// Constructs a new instance of a [PathInfoService] from an URI. +/// +/// The following URIs are supported: +/// - `memory:` +/// Uses a in-memory implementation. +/// - `sled:` +/// Uses a in-memory sled implementation. +/// - `sled:///absolute/path/to/somewhere` +/// Uses sled, using a path on the disk for persistency. Can be only opened +/// from one process at the same time. +/// - `redb:` +/// Uses a in-memory redb implementation. +/// - `redb:///absolute/path/to/somewhere` +/// Uses redb, using a path on the disk for persistency. Can be only opened +/// from one process at the same time. +/// - `nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=` +/// Exposes the Nix binary cache as a PathInfoService, ingesting NARs into the +/// {Blob,Directory}Service. You almost certainly want to use this with some cache. +/// The `trusted-public-keys` URL parameter can be provided, which will then +/// enable signature verification. +/// - `grpc+unix:///absolute/path/to/somewhere` +/// Connects to a local tvix-store gRPC service via Unix socket. +/// - `grpc+http://host:port`, `grpc+https://host:port` +/// Connects to a (remote) tvix-store gRPC service. +/// +/// As the [PathInfoService] needs to talk to [BlobService] and [DirectoryService], +/// these also need to be passed in. +pub async fn from_addr( + uri: &str, + context: Option<&CompositionContext<'_>>, +) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> { + #[allow(unused_mut)] + let mut url = + Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; + + let path_info_service_config = with_registry(®, || { + <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>>::try_from( + url, + ) + })? + .0; + let path_info_service = path_info_service_config + .build("anonymous", context.unwrap_or(&CompositionContext::blank())) + .await?; + + Ok(path_info_service) +} + +#[cfg(test)] +mod tests { + use super::from_addr; + use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder}; + use lazy_static::lazy_static; + use rstest::rstest; + use tempfile::TempDir; + use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig}; + use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig}; + + lazy_static! { + static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); + static ref TMPDIR_SLED_2: TempDir = TempDir::new().unwrap(); + static ref TMPDIR_REDB_1: TempDir = TempDir::new().unwrap(); + static ref TMPDIR_REDB_2: TempDir = TempDir::new().unwrap(); + } + + // the gRPC tests below don't fail, because we connect lazily. + + #[rstest] + /// This uses a unsupported scheme. + #[case::unsupported_scheme("http://foo.example/test", false)] + /// This configures sled in temporary mode. + #[case::sled_temporary("sled://", true)] + /// This configures sled with /, which should fail. + #[case::sled_invalid_root("sled:///", false)] + /// This configures sled with a host, not path, which should fail. + #[case::sled_invalid_host("sled://foo.example", false)] + /// This configures sled with a valid path path, which should succeed. + #[case::sled_valid_path(&format!("sled://{}", &TMPDIR_SLED_1.path().to_str().unwrap()), true)] + /// This configures sled with a host, and a valid path path, which should fail. + #[case::sled_invalid_host_with_valid_path(&format!("sled://foo.example{}", &TMPDIR_SLED_2.path().to_str().unwrap()), false)] + /// This correctly sets the scheme, and doesn't set a path. + #[case::memory_valid("memory://", true)] + /// This sets a memory url host to `foo` + #[case::memory_invalid_host("memory://foo", false)] + /// This sets a memory url path to "/", which is invalid. + #[case::memory_invalid_root_path("memory:///", false)] + /// This sets a memory url path to "/foo", which is invalid. + #[case::memory_invalid_root_path_foo("memory:///foo", false)] + /// redb with a host, and a valid path path, which should fail. + #[case::redb_invalid_host_with_valid_path(&format!("redb://foo.example{}", &TMPDIR_REDB_1.path().join("bar").to_str().unwrap()), false)] + /// redb with / as path, which should fail. + #[case::redb_invalid_root("redb:///", false)] + /// This configures redb with a valid path, which should succeed. + #[case::redb_valid_path(&format!("redb://{}", &TMPDIR_REDB_2.path().join("foo").to_str().unwrap()), true)] + /// redb using the in-memory backend, which should succeed. + #[case::redb_valid_in_memory("redb://", true)] + /// Correct Scheme for the cache.nixos.org binary cache. + #[case::correct_nix_https("nix+https://cache.nixos.org", true)] + /// Correct Scheme for the cache.nixos.org binary cache (HTTP URL). + #[case::correct_nix_http("nix+http://cache.nixos.org", true)] + /// Correct Scheme for Nix HTTP Binary cache, with a subpath. + #[case::correct_nix_http_with_subpath("nix+http://192.0.2.1/foo", true)] + /// Correct Scheme for Nix HTTP Binary cache, with a subpath and port. + #[case::correct_nix_http_with_subpath_and_port("nix+http://[::1]:8080/foo", true)] + /// Correct Scheme for the cache.nixos.org binary cache, and correct trusted public key set + #[case::correct_nix_https_with_trusted_public_key("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=", true)] + /// Correct Scheme for the cache.nixos.org binary cache, and two correct trusted public keys set + #[case::correct_nix_https_with_two_trusted_public_keys("nix+https://cache.nixos.org?trusted-public-keys=cache.nixos.org-1:6NCHdD59X431o0gWypbMrAURkbJ16ZPMQFGspcDShjY=%20foo:jp4fCEx9tBEId/L0ZsVJ26k0wC0fu7vJqLjjIGFkup8=", true)] + /// Correct scheme to connect to a unix socket. + #[case::grpc_valid_unix_socket("grpc+unix:///path/to/somewhere", true)] + /// Correct scheme for unix socket, but setting a host too, which is invalid. + #[case::grpc_invalid_unix_socket_and_host("grpc+unix://host.example/path/to/somewhere", false)] + /// Correct scheme to connect to localhost, with port 12345 + #[case::grpc_valid_ipv6_localhost_port_12345("grpc+http://[::1]:12345", true)] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[case::grpc_valid_http_host_without_port("grpc+http://localhost", true)] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[case::grpc_valid_https_host_without_port("grpc+https://localhost", true)] + /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. + #[case::grpc_invalid_host_and_path("grpc+http://localhost/some-path", false)] + /// A valid example for Bigtable. + #[cfg_attr( + all(feature = "cloud", feature = "integration"), + case::bigtable_valid( + "bigtable://instance-1?project_id=project-1&table_name=table-1&family_name=cf1", + true + ) + )] + /// An invalid example for Bigtable, missing fields + #[cfg_attr( + all(feature = "cloud", feature = "integration"), + case::bigtable_invalid_missing_fields("bigtable://instance-1", false) + )] + #[tokio::test] + async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { + let mut comp = Composition::default(); + comp.extend(vec![( + "default".into(), + DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {}) + as Box<dyn ServiceBuilder<Output = dyn BlobService>>), + )]); + comp.extend(vec![( + "default".into(), + DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {}) + as Box<dyn ServiceBuilder<Output = dyn DirectoryService>>), + )]); + + let resp = from_addr(uri_str, Some(&comp.context())).await; + + if exp_succeed { + resp.expect("should succeed"); + } else { + assert!(resp.is_err(), "should fail"); + } + } +} diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs new file mode 100644 index 000000000000..1f7fa8a8afce --- /dev/null +++ b/tvix/store/src/pathinfoservice/fs/mod.rs @@ -0,0 +1,90 @@ +use futures::stream::BoxStream; +use futures::StreamExt; +use nix_compat::store_path::StorePathRef; +use tonic::async_trait; +use tvix_castore::fs::{RootNodes, TvixStoreFs}; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; +use tvix_castore::{Error, Node, PathComponent}; + +use super::PathInfoService; + +/// Helper to construct a [TvixStoreFs] from a [BlobService], [DirectoryService] +/// and [PathInfoService]. +/// This avoids users to have to interact with the wrapper struct directly, as +/// it leaks into the type signature of TvixStoreFS. +pub fn make_fs<BS, DS, PS>( + blob_service: BS, + directory_service: DS, + path_info_service: PS, + list_root: bool, + show_xattr: bool, +) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>> +where + BS: AsRef<dyn BlobService> + Send + Clone + 'static, + DS: AsRef<dyn DirectoryService> + Send + Clone + 'static, + PS: AsRef<dyn PathInfoService> + Send + Sync + Clone + 'static, +{ + TvixStoreFs::new( + blob_service, + directory_service, + RootNodesWrapper(path_info_service), + list_root, + show_xattr, + ) +} + +/// Wrapper to satisfy Rust's orphan rules for trait implementations, as +/// RootNodes is coming from the [tvix-castore] crate. +#[doc(hidden)] +#[derive(Clone, Debug)] +pub struct RootNodesWrapper<T>(pub(crate) T); + +/// Implements root node lookup for any [PathInfoService]. This represents a flat +/// directory structure like /nix/store where each entry in the root filesystem +/// directory corresponds to a CA node. +#[cfg(any(feature = "fuse", feature = "virtiofs"))] +#[async_trait] +impl<T> RootNodes for RootNodesWrapper<T> +where + T: AsRef<dyn PathInfoService> + Send + Sync, +{ + async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> { + let Ok(store_path) = StorePathRef::from_bytes(name.as_ref()) else { + return Ok(None); + }; + + Ok(self + .0 + .as_ref() + .get(*store_path.digest()) + .await? + .map(|path_info| { + let node = path_info + .node + .as_ref() + .expect("missing root node") + .to_owned(); + + match node.into_name_and_node() { + Ok((_name, node)) => Ok(node), + Err(e) => Err(Error::StorageError(e.to_string())), + } + }) + .transpose()?) + } + + fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> { + Box::pin(self.0.as_ref().list().map(|result| { + result.and_then(|path_info| { + let node = path_info + .node + .as_ref() + .expect("missing root node") + .to_owned(); + + node.into_name_and_node() + .map_err(|e| Error::StorageError(e.to_string())) + }) + })) + } +} diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs new file mode 100644 index 000000000000..7510ccd911f0 --- /dev/null +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -0,0 +1,206 @@ +use super::PathInfoService; +use crate::{ + nar::NarCalculationService, + proto::{self, ListPathInfoRequest, PathInfo}, +}; +use async_stream::try_stream; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use std::sync::Arc; +use tonic::{async_trait, Code}; +use tracing::{instrument, Span}; +use tracing_indicatif::span_ext::IndicatifSpanExt; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; +use tvix_castore::Error; +use tvix_castore::Node; + +/// Connects to a (remote) tvix-store PathInfoService over gRPC. +#[derive(Clone)] +pub struct GRPCPathInfoService<T> { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>, +} + +impl<T> GRPCPathInfoService<T> { + /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl<T> PathInfoService for GRPCPathInfoService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let path_info = self + .grpc_client + .clone() + .get(proto::GetPathInfoRequest { + by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( + digest.to_vec().into(), + )), + }) + .await; + + match path_info { + Ok(path_info) => { + let path_info = path_info.into_inner(); + + path_info + .validate() + .map_err(|e| Error::StorageError(format!("invalid pathinfo: {}", e)))?; + + Ok(Some(path_info)) + } + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + let path_info = self + .grpc_client + .clone() + .put(path_info) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + Ok(path_info) + } + + #[instrument(level = "trace", skip_all)] + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let mut grpc_client = self.grpc_client.clone(); + + let stream = try_stream! { + let resp = grpc_client.list(ListPathInfoRequest::default()).await; + + let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner(); + + loop { + match stream.message().await { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + Err(Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))?; + } + yield pathinfo + } + None => { + return; + }, + }, + Err(e) => Err(Error::StorageError(e.to_string()))?, + } + } + }; + + Box::pin(stream) + } +} + +#[async_trait] +impl<T> NarCalculationService for GRPCPathInfoService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ + #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=1))] + async fn calculate_nar(&self, root_node: &Node) -> Result<(u64, [u8; 32]), Error> { + let span = Span::current(); + span.pb_set_message("Waiting for NAR calculation"); + span.pb_start(); + + let path_info = self + .grpc_client + .clone() + .calculate_nar(tvix_castore::proto::Node::from_name_and_node( + "".into(), + root_node.to_owned(), + )) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + let nar_sha256: [u8; 32] = path_info + .nar_sha256 + .to_vec() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + + Ok((path_info.nar_size, nar_sha256)) + } +} + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCPathInfoServiceConfig { + url: String, +} + +impl TryFrom<url::Url> for GRPCPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Ok(GRPCPathInfoServiceConfig { + url: url.to_string(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for GRPCPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::path_info_service_client::PathInfoServiceClient::new( + tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCPathInfoService::from_client(client))) + } +} + +#[cfg(test)] +mod tests { + use crate::pathinfoservice::tests::make_grpc_path_info_service_client; + use crate::pathinfoservice::PathInfoService; + use crate::tests::fixtures; + + /// This ensures connecting via gRPC works as expected. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let (_blob_service, _directory_service, path_info_service) = + make_grpc_path_info_service_client().await; + + let path_info = path_info_service + .get(fixtures::DUMMY_PATH_DIGEST) + .await + .expect("must not be error"); + + assert!(path_info.is_none()); + } +} diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs new file mode 100644 index 000000000000..695c04636089 --- /dev/null +++ b/tvix/store/src/pathinfoservice/lru.rs @@ -0,0 +1,166 @@ +use async_stream::try_stream; +use futures::stream::BoxStream; +use lru::LruCache; +use nix_compat::nixbase32; +use std::num::NonZeroUsize; +use std::sync::Arc; +use tokio::sync::RwLock; +use tonic::async_trait; +use tracing::instrument; + +use crate::proto::PathInfo; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; +use tvix_castore::Error; + +use super::PathInfoService; + +pub struct LruPathInfoService { + lru: Arc<RwLock<LruCache<[u8; 20], PathInfo>>>, +} + +impl LruPathInfoService { + pub fn with_capacity(capacity: NonZeroUsize) -> Self { + Self { + lru: Arc::new(RwLock::new(LruCache::new(capacity))), + } + } +} + +#[async_trait] +impl PathInfoService for LruPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + Ok(self.lru.write().await.get(&digest).cloned()) + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // call validate + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("invalid PathInfo: {}", e)))?; + + self.lru + .write() + .await + .put(*store_path.digest(), path_info.clone()); + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let lru = self.lru.clone(); + Box::pin(try_stream! { + let lru = lru.read().await; + let it = lru.iter(); + + for (_k,v) in it { + yield v.clone() + } + }) + } +} + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct LruPathInfoServiceConfig { + capacity: NonZeroUsize, +} + +impl TryFrom<url::Url> for LruPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(_url: url::Url) -> Result<Self, Self::Error> { + Err(Error::StorageError( + "Instantiating a LruPathInfoService from a url is not supported".into(), + ) + .into()) + } +} + +#[async_trait] +impl ServiceBuilder for LruPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(LruPathInfoService::with_capacity(self.capacity))) + } +} + +#[cfg(test)] +mod test { + use std::num::NonZeroUsize; + + use crate::{ + pathinfoservice::{LruPathInfoService, PathInfoService}, + proto::PathInfo, + tests::fixtures::PATH_INFO_WITH_NARINFO, + }; + use lazy_static::lazy_static; + use tvix_castore::proto as castorepb; + + lazy_static! { + static ref PATHINFO_1: PathInfo = PATH_INFO_WITH_NARINFO.clone(); + static ref PATHINFO_1_DIGEST: [u8; 20] = [0; 20]; + static ref PATHINFO_2: PathInfo = { + let mut p = PATHINFO_1.clone(); + let root_node = p.node.as_mut().unwrap(); + if let castorepb::Node { node: Some(node) } = root_node { + match node { + castorepb::node::Node::Directory(n) => { + n.name = "11111111111111111111111111111111-dummy2".into() + } + castorepb::node::Node::File(n) => { + n.name = "11111111111111111111111111111111-dummy2".into() + } + castorepb::node::Node::Symlink(n) => { + n.name = "11111111111111111111111111111111-dummy2".into() + } + } + } else { + unreachable!() + } + p + }; + static ref PATHINFO_2_DIGEST: [u8; 20] = *(PATHINFO_2.validate().unwrap()).digest(); + } + + #[tokio::test] + async fn evict() { + let svc = LruPathInfoService::with_capacity(NonZeroUsize::new(1).unwrap()); + + // pathinfo_1 should not be there + assert!(svc + .get(*PATHINFO_1_DIGEST) + .await + .expect("no error") + .is_none()); + + // insert it + svc.put(PATHINFO_1.clone()).await.expect("no error"); + + // now it should be there. + assert_eq!( + Some(PATHINFO_1.clone()), + svc.get(*PATHINFO_1_DIGEST).await.expect("no error") + ); + + // insert pathinfo_2. This will evict pathinfo 1 + svc.put(PATHINFO_2.clone()).await.expect("no error"); + + // now pathinfo 2 should be there. + assert_eq!( + Some(PATHINFO_2.clone()), + svc.get(*PATHINFO_2_DIGEST).await.expect("no error") + ); + + // … but pathinfo 1 not anymore. + assert!(svc + .get(*PATHINFO_1_DIGEST) + .await + .expect("no error") + .is_none()); + } +} diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs new file mode 100644 index 000000000000..3fabd239c7b1 --- /dev/null +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -0,0 +1,89 @@ +use super::PathInfoService; +use crate::proto::PathInfo; +use async_stream::try_stream; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use std::{collections::HashMap, sync::Arc}; +use tokio::sync::RwLock; +use tonic::async_trait; +use tracing::instrument; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; +use tvix_castore::Error; + +#[derive(Default)] +pub struct MemoryPathInfoService { + db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>, +} + +#[async_trait] +impl PathInfoService for MemoryPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let db = self.db.read().await; + + match db.get(&digest) { + None => Ok(None), + Some(path_info) => Ok(Some(path_info.clone())), + } + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. + Ok(nix_path) => { + let mut db = self.db.write().await; + db.insert(*nix_path.digest(), path_info.clone()); + + Ok(path_info) + } + } + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let db = self.db.clone(); + + Box::pin(try_stream! { + let db = db.read().await; + let it = db.iter(); + + for (_k, v) in it { + yield v.clone() + } + }) + } +} + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryPathInfoServiceConfig {} + +impl TryFrom<url::Url> for MemoryPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string()).into()); + } + Ok(MemoryPathInfoServiceConfig {}) + } +} + +#[async_trait] +impl ServiceBuilder for MemoryPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(MemoryPathInfoService::default())) + } +} diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs new file mode 100644 index 000000000000..d118a8af1e73 --- /dev/null +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -0,0 +1,100 @@ +mod combinators; +mod from_addr; +mod grpc; +mod lru; +mod memory; +mod nix_http; +mod redb; +mod sled; + +#[cfg(any(feature = "fuse", feature = "virtiofs"))] +mod fs; + +#[cfg(test)] +mod tests; + +use futures::stream::BoxStream; +use tonic::async_trait; +use tvix_castore::composition::{Registry, ServiceBuilder}; +use tvix_castore::Error; + +use crate::nar::NarCalculationService; +use crate::proto::PathInfo; + +pub use self::combinators::{ + Cache as CachePathInfoService, CacheConfig as CachePathInfoServiceConfig, +}; +pub use self::from_addr::from_addr; +pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig}; +pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig}; +pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig}; +pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig}; +pub use self::redb::{RedbPathInfoService, RedbPathInfoServiceConfig}; +pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig}; + +#[cfg(feature = "cloud")] +mod bigtable; +#[cfg(feature = "cloud")] +pub use self::bigtable::{BigtableParameters, BigtablePathInfoService}; + +#[cfg(any(feature = "fuse", feature = "virtiofs"))] +pub use self::fs::make_fs; + +/// The base trait all PathInfo services need to implement. +#[async_trait] +pub trait PathInfoService: Send + Sync { + /// Retrieve a PathInfo message by the output digest. + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>; + + /// Store a PathInfo message. Implementations MUST call validate and reject + /// invalid messages. + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>; + + /// Iterate over all PathInfo objects in the store. + /// Implementations can decide to disallow listing. + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>; + + fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> { + None + } +} + +#[async_trait] +impl<A> PathInfoService for A +where + A: AsRef<dyn PathInfoService> + Send + Sync + 'static, +{ + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + self.as_ref().get(digest).await + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + self.as_ref().put(path_info).await + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + self.as_ref().list() + } +} + +/// Registers the builtin PathInfoService implementations with the registry +pub(crate) fn register_pathinfo_services(reg: &mut Registry) { + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, CachePathInfoServiceConfig>("cache"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, GRPCPathInfoServiceConfig>("grpc"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, LruPathInfoServiceConfig>("lru"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, MemoryPathInfoServiceConfig>("memory"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, NixHTTPPathInfoServiceConfig>("nix"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, SledPathInfoServiceConfig>("sled"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, RedbPathInfoServiceConfig>("redb"); + #[cfg(feature = "cloud")] + { + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, BigtableParameters>( + "bigtable", + ); + } +} diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs new file mode 100644 index 000000000000..5f1eed1a0a9f --- /dev/null +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -0,0 +1,322 @@ +use super::PathInfoService; +use crate::{nar::ingest_nar_and_hash, proto::PathInfo}; +use futures::{stream::BoxStream, TryStreamExt}; +use nix_compat::{ + narinfo::{self, NarInfo}, + nixbase32, + nixhash::NixHash, +}; +use reqwest::StatusCode; +use std::sync::Arc; +use tokio::io::{self, AsyncRead}; +use tonic::async_trait; +use tracing::{debug, instrument, warn}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, +}; +use url::Url; + +/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache +/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix +/// Store Model. +/// It implements the [PathInfoService] trait in an interesting way: +/// Every [PathInfoService::get] fetches the .narinfo and referred NAR file, +/// inserting components into a [BlobService] and [DirectoryService], then +/// returning a [PathInfo] struct with the root. +/// +/// Due to this being quite a costly operation, clients are expected to layer +/// this service with store composition, so they're only ingested once. +/// +/// The client is expected to be (indirectly) using the same [BlobService] and +/// [DirectoryService], so able to fetch referred Directories and Blobs. +/// [PathInfoService::put] is not implemented and returns an error if called. +/// TODO: what about reading from nix-cache-info? +pub struct NixHTTPPathInfoService<BS, DS> { + base_url: url::Url, + http_client: reqwest_middleware::ClientWithMiddleware, + + blob_service: BS, + directory_service: DS, + + /// An optional list of [narinfo::PubKey]. + /// If set, the .narinfo files received need to have correct signature by at least one of these. + public_keys: Option<Vec<narinfo::VerifyingKey>>, +} + +impl<BS, DS> NixHTTPPathInfoService<BS, DS> { + pub fn new(base_url: url::Url, blob_service: BS, directory_service: DS) -> Self { + Self { + base_url, + http_client: reqwest_middleware::ClientBuilder::new(reqwest::Client::new()) + .with(tvix_tracing::propagate::reqwest::tracing_middleware()) + .build(), + blob_service, + directory_service, + + public_keys: None, + } + } + + /// Configures [Self] to validate NARInfo fingerprints with the public keys passed. + pub fn set_public_keys(&mut self, public_keys: Vec<narinfo::VerifyingKey>) { + self.public_keys = Some(public_keys); + } +} + +#[async_trait] +impl<BS, DS> PathInfoService for NixHTTPPathInfoService<BS, DS> +where + BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, + DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, +{ + #[instrument(skip_all, err, fields(path.digest=nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let narinfo_url = self + .base_url + .join(&format!("{}.narinfo", nixbase32::encode(&digest))) + .map_err(|e| { + warn!(e = %e, "unable to join URL"); + io::Error::new(io::ErrorKind::InvalidInput, "unable to join url") + })?; + + debug!(narinfo_url= %narinfo_url, "constructed NARInfo url"); + + let resp = self + .http_client + .get(narinfo_url) + .send() + .await + .map_err(|e| { + warn!(e=%e,"unable to send NARInfo request"); + io::Error::new( + io::ErrorKind::InvalidInput, + "unable to send NARInfo request", + ) + })?; + + // In the case of a 404, return a NotFound. + // We also return a NotFound in case of a 403 - this is to match the behaviour as Nix, + // when querying nix-cache.s3.amazonaws.com directly, rather than cache.nixos.org. + if resp.status() == StatusCode::NOT_FOUND || resp.status() == StatusCode::FORBIDDEN { + return Ok(None); + } + + let narinfo_str = resp.text().await.map_err(|e| { + warn!(e=%e,"unable to decode response as string"); + io::Error::new( + io::ErrorKind::InvalidData, + "unable to decode response as string", + ) + })?; + + // parse the received narinfo + let narinfo = NarInfo::parse(&narinfo_str).map_err(|e| { + warn!(e=%e,"unable to parse response as NarInfo"); + io::Error::new( + io::ErrorKind::InvalidData, + "unable to parse response as NarInfo", + ) + })?; + + // if [self.public_keys] is set, ensure there's at least one valid signature. + if let Some(public_keys) = &self.public_keys { + let fingerprint = narinfo.fingerprint(); + + if !public_keys.iter().any(|pubkey| { + narinfo + .signatures + .iter() + .any(|sig| pubkey.verify(&fingerprint, sig)) + }) { + warn!("no valid signature found"); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "no valid signature found", + ))?; + } + } + + // Convert to a (sparse) PathInfo. We still need to populate the node field, + // and for this we need to download the NAR file. + // FUTUREWORK: Keep some database around mapping from narsha256 to + // (unnamed) rootnode, so we can use that (and the name from the + // StorePath) and avoid downloading the same NAR a second time. + let pathinfo: PathInfo = (&narinfo).into(); + + // create a request for the NAR file itself. + let nar_url = self.base_url.join(narinfo.url).map_err(|e| { + warn!(e = %e, "unable to join URL"); + io::Error::new(io::ErrorKind::InvalidInput, "unable to join url") + })?; + debug!(nar_url= %nar_url, "constructed NAR url"); + + let resp = self + .http_client + .get(nar_url.clone()) + .send() + .await + .map_err(|e| { + warn!(e=%e,"unable to send NAR request"); + io::Error::new(io::ErrorKind::InvalidInput, "unable to send NAR request") + })?; + + // if the request is not successful, return an error. + if !resp.status().is_success() { + return Err(Error::StorageError(format!( + "unable to retrieve NAR at {}, status {}", + nar_url, + resp.status() + ))); + } + + // get a reader of the response body. + let r = tokio_util::io::StreamReader::new(resp.bytes_stream().map_err(|e| { + let e = e.without_url(); + warn!(e=%e, "failed to get response body"); + io::Error::new(io::ErrorKind::BrokenPipe, e.to_string()) + })); + + // handle decompression, depending on the compression field. + let mut r: Box<dyn AsyncRead + Send + Unpin> = match narinfo.compression { + None => Box::new(r) as Box<dyn AsyncRead + Send + Unpin>, + Some("bzip2") => Box::new(async_compression::tokio::bufread::BzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("gzip") => Box::new(async_compression::tokio::bufread::GzipDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("xz") => Box::new(async_compression::tokio::bufread::XzDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some("zstd") => Box::new(async_compression::tokio::bufread::ZstdDecoder::new(r)) + as Box<dyn AsyncRead + Send + Unpin>, + Some(comp_str) => { + return Err(Error::StorageError(format!( + "unsupported compression: {comp_str}" + ))); + } + }; + + let (root_node, nar_hash, nar_size) = ingest_nar_and_hash( + self.blob_service.clone(), + self.directory_service.clone(), + &mut r, + ) + .await + .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; + + // ensure the ingested narhash and narsize do actually match. + if narinfo.nar_size != nar_size { + warn!( + narinfo.nar_size = narinfo.nar_size, + http.nar_size = nar_size, + "NarSize mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarSize mismatch".to_string(), + ))?; + } + if narinfo.nar_hash != nar_hash { + warn!( + narinfo.nar_hash = %NixHash::Sha256(narinfo.nar_hash), + http.nar_hash = %NixHash::Sha256(nar_hash), + "NarHash mismatch" + ); + Err(io::Error::new( + io::ErrorKind::InvalidData, + "NarHash mismatch".to_string(), + ))?; + } + + Ok(Some(PathInfo { + node: Some(castorepb::Node::from_name_and_node( + narinfo.store_path.to_string().into(), + root_node, + )), + references: pathinfo.references, + narinfo: pathinfo.narinfo, + })) + } + + #[instrument(skip_all, fields(path_info=?_path_info))] + async fn put(&self, _path_info: PathInfo) -> Result<PathInfo, Error> { + Err(Error::InvalidRequest( + "put not supported for this backend".to_string(), + )) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + Box::pin(futures::stream::once(async { + Err(Error::InvalidRequest( + "list not supported for this backend".to_string(), + )) + })) + } +} + +#[derive(serde::Deserialize)] +pub struct NixHTTPPathInfoServiceConfig { + base_url: String, + blob_service: String, + directory_service: String, + #[serde(default)] + /// An optional list of [narinfo::PubKey]. + /// If set, the .narinfo files received need to have correct signature by at least one of these. + public_keys: Option<Vec<String>>, +} + +impl TryFrom<Url> for NixHTTPPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: Url) -> Result<Self, Self::Error> { + let mut public_keys: Option<Vec<String>> = None; + for (_, v) in url + .query_pairs() + .into_iter() + .filter(|(k, _)| k == "trusted-public-keys") + { + public_keys + .get_or_insert(Default::default()) + .extend(v.split_ascii_whitespace().map(ToString::to_string)); + } + Ok(NixHTTPPathInfoServiceConfig { + // Stringify the URL and remove the nix+ prefix. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(), + blob_service: "default".to_string(), + directory_service: "default".to_string(), + public_keys, + }) + } +} + +#[async_trait] +impl ServiceBuilder for NixHTTPPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (blob_service, directory_service) = futures::join!( + context.resolve(self.blob_service.clone()), + context.resolve(self.directory_service.clone()) + ); + let mut svc = NixHTTPPathInfoService::new( + Url::parse(&self.base_url)?, + blob_service?, + directory_service?, + ); + if let Some(public_keys) = &self.public_keys { + svc.set_public_keys( + public_keys + .iter() + .map(|pubkey_str| { + narinfo::VerifyingKey::parse(pubkey_str) + .map_err(|e| Error::StorageError(format!("invalid public key: {e}"))) + }) + .collect::<Result<Vec<_>, Error>>()?, + ); + } + Ok(Arc::new(svc)) + } +} diff --git a/tvix/store/src/pathinfoservice/redb.rs b/tvix/store/src/pathinfoservice/redb.rs new file mode 100644 index 000000000000..bd0e0fc2b686 --- /dev/null +++ b/tvix/store/src/pathinfoservice/redb.rs @@ -0,0 +1,218 @@ +use super::PathInfoService; +use crate::proto::PathInfo; +use data_encoding::BASE64; +use futures::{stream::BoxStream, StreamExt}; +use prost::Message; +use redb::{Database, ReadableTable, TableDefinition}; +use std::{path::PathBuf, sync::Arc}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::async_trait; +use tracing::{instrument, warn}; +use tvix_castore::{ + composition::{CompositionContext, ServiceBuilder}, + Error, +}; + +const PATHINFO_TABLE: TableDefinition<[u8; 20], Vec<u8>> = TableDefinition::new("pathinfo"); + +/// PathInfoService implementation using redb under the hood. +/// redb stores all of its data in a single file with a K/V pointing from a path's output hash to +/// its corresponding protobuf-encoded PathInfo. +pub struct RedbPathInfoService { + // We wrap db in an Arc to be able to move it into spawn_blocking, + // as discussed in https://github.com/cberner/redb/issues/789 + db: Arc<Database>, +} + +impl RedbPathInfoService { + /// Constructs a new instance using the specified file system path for + /// storage. + pub async fn new(path: PathBuf) -> Result<Self, Error> { + if path == PathBuf::from("/") { + return Err(Error::StorageError( + "cowardly refusing to open / with redb".to_string(), + )); + } + + let db = tokio::task::spawn_blocking(|| -> Result<_, redb::Error> { + let db = redb::Database::create(path)?; + create_schema(&db)?; + Ok(db) + }) + .await??; + + Ok(Self { db: Arc::new(db) }) + } + + /// Constructs a new instance using the in-memory backend. + pub fn new_temporary() -> Result<Self, Error> { + let db = + redb::Database::builder().create_with_backend(redb::backends::InMemoryBackend::new())?; + + create_schema(&db)?; + + Ok(Self { db: Arc::new(db) }) + } +} + +/// Ensures all tables are present. +/// Opens a write transaction and calls open_table on PATHINFO_TABLE, which will +/// create it if not present. +fn create_schema(db: &redb::Database) -> Result<(), redb::Error> { + let txn = db.begin_write()?; + txn.open_table(PATHINFO_TABLE)?; + txn.commit()?; + + Ok(()) +} + +#[async_trait] +impl PathInfoService for RedbPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let db = self.db.clone(); + + tokio::task::spawn_blocking({ + move || { + let txn = db.begin_read()?; + let table = txn.open_table(PATHINFO_TABLE)?; + match table.get(digest)? { + Some(pathinfo_bytes) => Ok(Some( + PathInfo::decode(pathinfo_bytes.value().as_slice()).map_err(|e| { + warn!(err=%e, "failed to decode stored PathInfo"); + Error::StorageError("failed to decode stored PathInfo".to_string()) + })?, + )), + None => Ok(None), + } + } + }) + .await? + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + let store_path = path_info + .validate() + .map_err(|e| { + warn!(err=%e, "failed to validate PathInfo"); + Error::StorageError("failed to validate PathInfo".to_string()) + })? + .to_owned(); + + let path_info_encoded = path_info.encode_to_vec(); + let db = self.db.clone(); + + tokio::task::spawn_blocking({ + move || -> Result<(), Error> { + let txn = db.begin_write()?; + { + let mut table = txn.open_table(PATHINFO_TABLE)?; + table + .insert(store_path.digest(), path_info_encoded) + .map_err(|e| { + warn!(err=%e, "failed to insert PathInfo"); + Error::StorageError("failed to insert PathInfo".to_string()) + })?; + } + Ok(txn.commit()?) + } + }) + .await??; + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let db = self.db.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(50); + + // Spawn a blocking task which writes all PathInfos to tx. + tokio::task::spawn_blocking({ + move || -> Result<(), Error> { + let read_txn = db.begin_read()?; + let table = read_txn.open_table(PATHINFO_TABLE)?; + + for elem in table.iter()? { + let elem = elem?; + tokio::runtime::Handle::current() + .block_on(tx.send(Ok( + PathInfo::decode(elem.1.value().as_slice()).map_err(|e| { + warn!(err=%e, "invalid PathInfo"); + Error::StorageError("invalid PathInfo".to_string()) + })?, + ))) + .map_err(|e| Error::StorageError(e.to_string()))?; + } + + Ok(()) + } + }); + + ReceiverStream::from(rx).boxed() + } +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct RedbPathInfoServiceConfig { + is_temporary: bool, + #[serde(default)] + /// required when is_temporary = false + path: Option<PathBuf>, +} + +impl TryFrom<url::Url> for RedbPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // redb doesn't support host, and a path can be provided (otherwise it'll live in memory only) + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string()).into()); + } + + Ok(if url.path().is_empty() { + RedbPathInfoServiceConfig { + is_temporary: true, + path: None, + } + } else { + RedbPathInfoServiceConfig { + is_temporary: false, + path: Some(url.path().into()), + } + }) + } +} + +#[async_trait] +impl ServiceBuilder for RedbPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + match self { + RedbPathInfoServiceConfig { + is_temporary: true, + path: None, + } => Ok(Arc::new(RedbPathInfoService::new_temporary()?)), + RedbPathInfoServiceConfig { + is_temporary: true, + path: Some(_), + } => Err( + Error::StorageError("Temporary RedbPathInfoService can not have path".into()) + .into(), + ), + RedbPathInfoServiceConfig { + is_temporary: false, + path: None, + } => Err(Error::StorageError("RedbPathInfoService is missing path".into()).into()), + RedbPathInfoServiceConfig { + is_temporary: false, + path: Some(path), + } => Ok(Arc::new(RedbPathInfoService::new(path.to_owned()).await?)), + } + } +} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs new file mode 100644 index 000000000000..837eb9d079e1 --- /dev/null +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -0,0 +1,190 @@ +use super::PathInfoService; +use crate::proto::PathInfo; +use async_stream::try_stream; +use futures::stream::BoxStream; +use nix_compat::nixbase32; +use prost::Message; +use std::path::Path; +use std::sync::Arc; +use tonic::async_trait; +use tracing::{instrument, warn}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; +use tvix_castore::Error; + +/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). +/// +/// The PathInfo messages are stored as encoded protos, and keyed by their output hash, +/// as that's currently the only request type available. +pub struct SledPathInfoService { + db: sled::Db, +} + +impl SledPathInfoService { + pub fn new<P: AsRef<Path>>(p: P) -> Result<Self, sled::Error> { + if p.as_ref() == Path::new("/") { + return Err(sled::Error::Unsupported( + "cowardly refusing to open / with sled".to_string(), + )); + } + + let config = sled::Config::default() + .use_compression(false) // is a required parameter + .path(p); + let db = config.open()?; + + Ok(Self { db }) + } + + pub fn new_temporary() -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { db }) + } +} + +#[async_trait] +impl PathInfoService for SledPathInfoService { + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let resp = tokio::task::spawn_blocking({ + let db = self.db.clone(); + move || db.get(digest.as_slice()) + }) + .await? + .map_err(|e| { + warn!("failed to retrieve PathInfo: {}", e); + Error::StorageError(format!("failed to retrieve PathInfo: {}", e)) + })?; + match resp { + None => Ok(None), + Some(data) => { + let path_info = PathInfo::decode(&*data).map_err(|e| { + warn!("failed to decode stored PathInfo: {}", e); + Error::StorageError(format!("failed to decode stored PathInfo: {}", e)) + })?; + Ok(Some(path_info)) + } + } + } + + #[instrument(level = "trace", skip_all, fields(path_info.root_node = ?path_info.node))] + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + let store_path = path_info + .validate() + .map_err(|e| Error::InvalidRequest(format!("failed to validate PathInfo: {}", e)))?; + + // In case the PathInfo is valid, we were able to parse a StorePath. + // Store it in the database, keyed by its digest. + // This overwrites existing PathInfo objects. + tokio::task::spawn_blocking({ + let db = self.db.clone(); + let k = *store_path.digest(); + let data = path_info.encode_to_vec(); + move || db.insert(k, data) + }) + .await? + .map_err(|e| { + warn!("failed to insert PathInfo: {}", e); + Error::StorageError(format! { + "failed to insert PathInfo: {}", e + }) + })?; + + Ok(path_info) + } + + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { + let db = self.db.clone(); + let mut it = db.iter().values(); + + Box::pin(try_stream! { + // Don't block the executor while waiting for .next(), so wrap that + // in a spawn_blocking call. + // We need to pass around it to be able to reuse it. + while let (Some(elem), new_it) = tokio::task::spawn_blocking(move || { + (it.next(), it) + }).await? { + it = new_it; + let data = elem.map_err(|e| { + warn!("failed to retrieve PathInfo: {}", e); + Error::StorageError(format!("failed to retrieve PathInfo: {}", e)) + })?; + + let path_info = PathInfo::decode(&*data).map_err(|e| { + warn!("failed to decode stored PathInfo: {}", e); + Error::StorageError(format!("failed to decode stored PathInfo: {}", e)) + })?; + + yield path_info + } + }) + } +} + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct SledPathInfoServiceConfig { + is_temporary: bool, + #[serde(default)] + /// required when is_temporary = false + path: Option<String>, +} + +impl TryFrom<url::Url> for SledPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // sled doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string()).into()); + } + + // TODO: expose compression and other parameters as URL parameters? + + Ok(if url.path().is_empty() { + SledPathInfoServiceConfig { + is_temporary: true, + path: None, + } + } else { + SledPathInfoServiceConfig { + is_temporary: false, + path: Some(url.path().to_string()), + } + }) + } +} + +#[async_trait] +impl ServiceBuilder for SledPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + match self { + SledPathInfoServiceConfig { + is_temporary: true, + path: None, + } => Ok(Arc::new(SledPathInfoService::new_temporary()?)), + SledPathInfoServiceConfig { + is_temporary: true, + path: Some(_), + } => Err( + Error::StorageError("Temporary SledPathInfoService can not have path".into()) + .into(), + ), + SledPathInfoServiceConfig { + is_temporary: false, + path: None, + } => Err(Error::StorageError("SledPathInfoService is missing path".into()).into()), + SledPathInfoServiceConfig { + is_temporary: false, + path: Some(path), + } => Ok(Arc::new(SledPathInfoService::new(path)?)), + } + } +} diff --git a/tvix/store/src/pathinfoservice/tests/mod.rs b/tvix/store/src/pathinfoservice/tests/mod.rs new file mode 100644 index 000000000000..777588e9beda --- /dev/null +++ b/tvix/store/src/pathinfoservice/tests/mod.rs @@ -0,0 +1,82 @@ +//! This contains test scenarios that a given [PathInfoService] needs to pass. +//! We use [rstest] and [rstest_reuse] to provide all services we want to test +//! against, and then apply this template to all test functions. + +use futures::TryStreamExt; +use rstest::*; +use rstest_reuse::{self, *}; + +use super::PathInfoService; +use crate::pathinfoservice::redb::RedbPathInfoService; +use crate::pathinfoservice::MemoryPathInfoService; +use crate::pathinfoservice::SledPathInfoService; +use crate::proto::PathInfo; +use crate::tests::fixtures::DUMMY_PATH_DIGEST; +use tvix_castore::proto as castorepb; + +mod utils; +pub use self::utils::make_grpc_path_info_service_client; + +#[cfg(all(feature = "cloud", feature = "integration"))] +use self::utils::make_bigtable_path_info_service; + +#[template] +#[rstest] +#[case::memory(MemoryPathInfoService::default())] +#[case::grpc({ + let (_, _, svc) = make_grpc_path_info_service_client().await; + svc +})] +#[case::sled(SledPathInfoService::new_temporary().unwrap())] +#[case::redb(RedbPathInfoService::new_temporary().unwrap())] +#[cfg_attr(all(feature = "cloud",feature="integration"), case::bigtable(make_bigtable_path_info_service().await))] +pub fn path_info_services(#[case] svc: impl PathInfoService) {} + +// FUTUREWORK: add more tests rejecting invalid PathInfo messages. +// A subset of them should also ensure references to other PathInfos, or +// elements in {Blob,Directory}Service do exist. + +/// Trying to get a non-existent PathInfo should return Ok(None). +#[apply(path_info_services)] +#[tokio::test] +async fn not_found(svc: impl PathInfoService) { + assert!(svc + .get(DUMMY_PATH_DIGEST) + .await + .expect("must succeed") + .is_none()); +} + +/// Put a PathInfo into the store, get it back. +#[apply(path_info_services)] +#[tokio::test] +async fn put_get(svc: impl PathInfoService) { + let path_info = PathInfo { + node: Some(castorepb::Node { + node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { + name: "00000000000000000000000000000000-foo".into(), + target: "doesntmatter".into(), + })), + }), + ..Default::default() + }; + + // insert + let resp = svc.put(path_info.clone()).await.expect("must succeed"); + + // expect the returned PathInfo to be equal (for now) + // in the future, some stores might add additional fields/signatures. + assert_eq!(path_info, resp); + + // get it back + let resp = svc.get(DUMMY_PATH_DIGEST).await.expect("must succeed"); + + assert_eq!(Some(path_info.clone()), resp); + + // Ensure the listing endpoint works, and returns the same path_info. + // FUTUREWORK: split this, some impls might (rightfully) not support listing + let pathinfos: Vec<PathInfo> = svc.list().try_collect().await.expect("must succeed"); + + // We should get a single pathinfo back, the one we inserted. + assert_eq!(vec![path_info], pathinfos); +} diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs new file mode 100644 index 000000000000..8b192e303b89 --- /dev/null +++ b/tvix/store/src/pathinfoservice/tests/utils.rs @@ -0,0 +1,79 @@ +use std::sync::Arc; + +use hyper_util::rt::TokioIo; +use tonic::transport::{Endpoint, Server, Uri}; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; + +use crate::{ + nar::{NarCalculationService, SimpleRenderer}, + pathinfoservice::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService}, + proto::{ + path_info_service_client::PathInfoServiceClient, + path_info_service_server::PathInfoServiceServer, GRPCPathInfoServiceWrapper, + }, + tests::fixtures::{blob_service, directory_service}, +}; + +/// Constructs and returns a gRPC PathInfoService. +/// We also return memory-based {Blob,Directory}Service, +/// as the consumer of this function accepts a 3-tuple. +pub async fn make_grpc_path_info_service_client() -> ( + impl BlobService, + impl DirectoryService, + GRPCPathInfoService<tonic::transport::Channel>, +) { + let (left, right) = tokio::io::duplex(64); + + let blob_service = blob_service(); + let directory_service = directory_service(); + + // spin up a server, which will only connect once, to the left side. + tokio::spawn({ + let blob_service = blob_service.clone(); + let directory_service = directory_service.clone(); + async move { + let path_info_service: Arc<dyn PathInfoService> = + Arc::from(MemoryPathInfoService::default()); + let nar_calculation_service = + Box::new(SimpleRenderer::new(blob_service, directory_service)) + as Box<dyn NarCalculationService>; + + // spin up a new PathInfoService + let mut server = Server::builder(); + let router = server.add_service(PathInfoServiceServer::new( + GRPCPathInfoServiceWrapper::new(path_info_service, nar_calculation_service), + )); + + router + .serve_with_incoming(tokio_stream::once(Ok::<_, std::io::Error>(left))) + .await + } + }); + + // Create a client, connecting to the right side. The URI is unused. + let mut maybe_right = Some(right); + + let path_info_service = GRPCPathInfoService::from_client(PathInfoServiceClient::new( + Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector(tower::service_fn(move |_: Uri| { + let right = maybe_right.take().unwrap(); + async move { Ok::<_, std::io::Error>(TokioIo::new(right)) } + })) + .await + .unwrap(), + )); + + (blob_service, directory_service, path_info_service) +} + +#[cfg(all(feature = "cloud", feature = "integration"))] +pub(crate) async fn make_bigtable_path_info_service( +) -> crate::pathinfoservice::BigtablePathInfoService { + use crate::pathinfoservice::bigtable::BigtableParameters; + use crate::pathinfoservice::BigtablePathInfoService; + + BigtablePathInfoService::connect(BigtableParameters::default_for_tests()) + .await + .unwrap() +} |