diff options
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 62 | ||||
-rw-r--r-- | tvix/store/src/composition.rs | 22 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/bigtable.rs | 140 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/combinators.rs | 38 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 151 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 36 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/lru.rs | 29 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 28 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 40 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/nix_http.rs | 71 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 68 | ||||
-rw-r--r-- | tvix/store/src/utils.rs | 121 |
13 files changed, 559 insertions, 248 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 82c73f8f416c..99323d2a50a1 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -16,7 +16,6 @@ use tracing::{info, info_span, instrument, Level, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt as _; use tvix_castore::import::fs::ingest_path; use tvix_store::nar::NarCalculationService; -use tvix_store::pathinfoservice::CachePathInfoService; use tvix_store::proto::NarInfo; use tvix_store::proto::PathInfo; @@ -210,31 +209,38 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync remote_path_info_service_addr, } => { // initialize stores - let (blob_service, directory_service, path_info_service, nar_calculation_service) = - tvix_store::utils::construct_services( - blob_service_addr, - directory_service_addr, - path_info_service_addr, - ) - .await?; + let mut configs = tvix_store::utils::addrs_to_configs( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + )?; // if remote_path_info_service_addr has been specified, // update path_info_service to point to a cache combining the two. - let path_info_service = if let Some(addr) = remote_path_info_service_addr { - let remote_path_info_service = tvix_store::pathinfoservice::from_addr( - &addr, - blob_service.clone(), - directory_service.clone(), - ) - .await?; - - let path_info_service = - CachePathInfoService::new(path_info_service, remote_path_info_service); + if let Some(addr) = remote_path_info_service_addr { + use tvix_store::composition::{with_registry, DeserializeWithRegistry, REG}; + use tvix_store::pathinfoservice::CachePathInfoServiceConfig; + + let remote_url = url::Url::parse(&addr)?; + let remote_config = with_registry(®, || remote_url.try_into())?; + + let local = configs.pathinfoservices.insert( + "default".into(), + DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig { + near: "local".into(), + far: "remote".into(), + })), + ); + configs + .pathinfoservices + .insert("local".into(), local.unwrap()); + configs + .pathinfoservices + .insert("remote".into(), remote_config); + } - Box::new(path_info_service) as Box<dyn PathInfoService> - } else { - path_info_service - }; + let (blob_service, directory_service, path_info_service, nar_calculation_service) = + tvix_store::utils::construct_services_from_configs(configs).await?; let mut server = Server::builder().layer( ServiceBuilder::new() @@ -257,7 +263,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync GRPCDirectoryServiceWrapper::new(directory_service), )) .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( - Arc::from(path_info_service), + path_info_service, nar_calculation_service, ))); @@ -302,8 +308,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync ) .await?; - // Arc PathInfoService and NarCalculationService, as we clone it . - let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + // Arc NarCalculationService, as we clone it . let nar_calculation_service: Arc<dyn NarCalculationService> = nar_calculation_service.into(); @@ -365,9 +370,6 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync let reference_graph: ReferenceGraph<'_> = serde_json::from_slice(reference_graph_json.as_slice())?; - // Arc the PathInfoService, as we clone it . - let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); - let lookups_span = info_span!( "lookup pathinfos", "indicatif.pb_show" = tracing::field::Empty @@ -475,7 +477,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync let fs = make_fs( blob_service, directory_service, - Arc::from(path_info_service), + path_info_service, list_root, show_xattr, ); @@ -523,7 +525,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync let fs = make_fs( blob_service, directory_service, - Arc::from(path_info_service), + path_info_service, list_root, show_xattr, ); diff --git a/tvix/store/src/composition.rs b/tvix/store/src/composition.rs new file mode 100644 index 000000000000..a32f22cf7796 --- /dev/null +++ b/tvix/store/src/composition.rs @@ -0,0 +1,22 @@ +use lazy_static::lazy_static; + +pub use tvix_castore::composition::*; + +lazy_static! { + /// The provided registry of tvix_store, which has all the builtin + /// tvix_castore (BlobStore/DirectoryStore) and tvix_store + /// (PathInfoService) implementations. + pub static ref REG: Registry = { + let mut reg = Default::default(); + add_default_services(&mut reg); + reg + }; +} + +/// Register the builtin services of tvix_castore and tvix_store with the given +/// registry. This is useful for creating your own registry with the builtin +/// types _and_ extra third party types. +pub fn add_default_services(reg: &mut Registry) { + tvix_castore::composition::add_default_services(reg); + crate::pathinfoservice::register_pathinfo_services(reg); +} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index 8c32aaf885e8..81a77cd978a2 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,3 +1,4 @@ +pub mod composition; pub mod import; pub mod nar; pub mod pathinfoservice; diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs index 26d07689d71f..15128986ff56 100644 --- a/tvix/store/src/pathinfoservice/bigtable.rs +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -10,15 +10,17 @@ use nix_compat::nixbase32; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; +use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; /// There should not be more than 10 MiB in a single cell. /// https://cloud.google.com/bigtable/docs/schema-design#cells const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; -/// Provides a [DirectoryService] implementation using +/// Provides a [PathInfoService] implementation using /// [Bigtable](https://cloud.google.com/bigtable/docs/) /// as an underlying K/V store. /// @@ -44,57 +46,6 @@ pub struct BigtablePathInfoService { emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, } -/// Represents configuration of [BigtablePathInfoService]. -/// This currently conflates both connect parameters and data model/client -/// behaviour parameters. -#[serde_as] -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct BigtableParameters { - project_id: String, - instance_name: String, - #[serde(default)] - is_read_only: bool, - #[serde(default = "default_channel_size")] - channel_size: usize, - - #[serde_as(as = "Option<DurationSeconds<String>>")] - #[serde(default = "default_timeout")] - timeout: Option<std::time::Duration>, - table_name: String, - family_name: String, - - #[serde(default = "default_app_profile_id")] - app_profile_id: String, -} - -impl BigtableParameters { - #[cfg(test)] - pub fn default_for_tests() -> Self { - Self { - project_id: "project-1".into(), - instance_name: "instance-1".into(), - is_read_only: false, - channel_size: default_channel_size(), - timeout: default_timeout(), - table_name: "table-1".into(), - family_name: "cf1".into(), - app_profile_id: default_app_profile_id(), - } - } -} - -fn default_app_profile_id() -> String { - "default".to_owned() -} - -fn default_channel_size() -> usize { - 4 -} - -fn default_timeout() -> Option<std::time::Duration> { - Some(std::time::Duration::from_secs(4)) -} - impl BigtablePathInfoService { #[cfg(not(test))] pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { @@ -412,3 +363,88 @@ impl PathInfoService for BigtablePathInfoService { Box::pin(stream) } } + +/// Represents configuration of [BigtablePathInfoService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +impl BigtableParameters { + #[cfg(test)] + pub fn default_for_tests() -> Self { + Self { + project_id: "project-1".into(), + instance_name: "instance-1".into(), + is_read_only: false, + channel_size: default_channel_size(), + timeout: default_timeout(), + table_name: "table-1".into(), + family_name: "cf1".into(), + app_profile_id: default_app_profile_id(), + } + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} + +#[async_trait] +impl ServiceBuilder for BigtableParameters { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> { + Ok(Arc::new( + BigtablePathInfoService::connect(self.clone()).await?, + )) + } +} + +impl TryFrom<url::Url> for BigtableParameters { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(mut url: url::Url) -> Result<Self, Self::Error> { + // parse the instance name from the hostname. + let instance_name = url + .host_str() + .ok_or_else(|| Error::StorageError("instance name missing".into()))? + .to_string(); + + // … but add it to the query string now, so we just need to parse that. + url.query_pairs_mut() + .append_pair("instance_name", &instance_name); + + let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; + + Ok(params) + } +} diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs index 664144ef494b..bb5595f72b10 100644 --- a/tvix/store/src/pathinfoservice/combinators.rs +++ b/tvix/store/src/pathinfoservice/combinators.rs @@ -1,8 +1,11 @@ +use std::sync::Arc; + use crate::proto::PathInfo; use futures::stream::BoxStream; use nix_compat::nixbase32; use tonic::async_trait; use tracing::{debug, instrument}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; use super::PathInfoService; @@ -61,6 +64,41 @@ where } } +#[derive(serde::Deserialize)] +pub struct CacheConfig { + pub near: String, + pub far: String, +} + +impl TryFrom<url::Url> for CacheConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(_url: url::Url) -> Result<Self, Self::Error> { + Err(Error::StorageError( + "Instantiating a CombinedPathInfoService from a url is not supported".into(), + ) + .into()) + } +} + +#[async_trait] +impl ServiceBuilder for CacheConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (near, far) = futures::join!( + context.resolve(self.near.clone()), + context.resolve(self.far.clone()) + ); + Ok(Arc::new(Cache { + near: near?, + far: far?, + })) + } +} + #[cfg(test)] mod test { use std::num::NonZeroUsize; diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 9173d25d05ca..5635c226c2de 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -1,13 +1,10 @@ -use crate::proto::path_info_service_client::PathInfoServiceClient; +use super::PathInfoService; -use super::{ - GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService, - SledPathInfoService, +use crate::composition::{ + with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, }; - -use nix_compat::narinfo; use std::sync::Arc; -use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; +use tvix_castore::Error; use url::Url; /// Constructs a new instance of a [PathInfoService] from an URI. @@ -34,113 +31,21 @@ use url::Url; /// these also need to be passed in. pub async fn from_addr( uri: &str, - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, -) -> Result<Box<dyn PathInfoService>, Error> { + context: Option<&CompositionContext<'_>>, +) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> { #[allow(unused_mut)] let mut url = Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; - let path_info_service: Box<dyn PathInfoService> = match url.scheme() { - "memory" => { - // memory doesn't support host or path in the URL. - if url.has_host() || !url.path().is_empty() { - return Err(Error::StorageError("invalid url".to_string())); - } - Box::<MemoryPathInfoService>::default() - } - "sled" => { - // sled doesn't support host, and a path can be provided (otherwise - // it'll live in memory only). - if url.has_host() { - return Err(Error::StorageError("no host allowed".to_string())); - } - - if url.path() == "/" { - return Err(Error::StorageError( - "cowardly refusing to open / with sled".to_string(), - )); - } - - // TODO: expose other parameters as URL parameters? - - Box::new(if url.path().is_empty() { - SledPathInfoService::new_temporary() - .map_err(|e| Error::StorageError(e.to_string()))? - } else { - SledPathInfoService::new(url.path()) - .map_err(|e| Error::StorageError(e.to_string()))? - }) - } - "nix+http" | "nix+https" => { - // Stringify the URL and remove the nix+ prefix. - // We can't use `url.set_scheme(rest)`, as it disallows - // setting something http(s) that previously wasn't. - let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap(); - - let mut nix_http_path_info_service = - NixHTTPPathInfoService::new(new_url, blob_service, directory_service); - - let pairs = &url.query_pairs(); - for (k, v) in pairs.into_iter() { - if k == "trusted-public-keys" { - let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect(); - - let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len()); - for pubkey_str in pubkey_strs { - pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| { - Error::StorageError(format!("invalid public key: {e}")) - })?); - } - - nix_http_path_info_service.set_public_keys(pubkeys); - } - } - - Box::new(nix_http_path_info_service) - } - scheme if scheme.starts_with("grpc+") => { - // schemes starting with grpc+ go to the GRPCPathInfoService. - // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. - // - In the case of unix sockets, there must be a path, but may not be a host. - // - In the case of non-unix sockets, there must be a host, but no path. - // Constructing the channel is handled by tvix_castore::channel::from_url. - Box::new(GRPCPathInfoService::from_client( - PathInfoServiceClient::with_interceptor( - tvix_castore::tonic::channel_from_url(&url).await?, - tvix_tracing::propagate::tonic::send_trace, - ), - )) - } - #[cfg(feature = "cloud")] - "bigtable" => { - use super::bigtable::BigtableParameters; - use super::BigtablePathInfoService; - - // parse the instance name from the hostname. - let instance_name = url - .host_str() - .ok_or_else(|| Error::StorageError("instance name missing".into()))? - .to_string(); - - // … but add it to the query string now, so we just need to parse that. - url.query_pairs_mut() - .append_pair("instance_name", &instance_name); - - let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) - .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; - - Box::new( - BigtablePathInfoService::connect(params) - .await - .map_err(|e| Error::StorageError(e.to_string()))?, - ) - } - _ => Err(Error::StorageError(format!( - "unknown scheme: {}", - url.scheme() - )))?, - }; + let path_info_service_config = with_registry(®, || { + <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>>::try_from( + url, + ) + })? + .0; + let path_info_service = path_info_service_config + .build("anonymous", context.unwrap_or(&CompositionContext::blank())) + .await?; Ok(path_info_service) } @@ -148,14 +53,12 @@ pub async fn from_addr( #[cfg(test)] mod tests { use super::from_addr; + use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder}; use lazy_static::lazy_static; use rstest::rstest; - use std::sync::Arc; use tempfile::TempDir; - use tvix_castore::{ - blobservice::{BlobService, MemoryBlobService}, - directoryservice::{DirectoryService, MemoryDirectoryService}, - }; + use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig}; + use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig}; lazy_static! { static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); @@ -224,11 +127,19 @@ mod tests { )] #[tokio::test] async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { - let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default()); - let directory_service: Arc<dyn DirectoryService> = - Arc::from(MemoryDirectoryService::default()); - - let resp = from_addr(uri_str, blob_service, directory_service).await; + let mut comp = Composition::default(); + comp.extend(vec![( + "default".into(), + DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {}) + as Box<dyn ServiceBuilder<Output = dyn BlobService>>), + )]); + comp.extend(vec![( + "default".into(), + DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {}) + as Box<dyn ServiceBuilder<Output = dyn DirectoryService>>), + )]); + + let resp = from_addr(uri_str, Some(&comp.context())).await; if exp_succeed { resp.expect("should succeed"); diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index bcee49aac6cf..2ac0e43303cb 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -6,9 +6,11 @@ use crate::{ use async_stream::try_stream; use futures::stream::BoxStream; use nix_compat::nixbase32; +use std::sync::Arc; use tonic::{async_trait, Code}; use tracing::{instrument, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::{proto as castorepb, Error}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. @@ -149,6 +151,40 @@ where } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCPathInfoServiceConfig { + url: String, +} + +impl TryFrom<url::Url> for GRPCPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Ok(GRPCPathInfoServiceConfig { + url: url.to_string(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for GRPCPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::path_info_service_client::PathInfoServiceClient::new( + tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCPathInfoService::from_client(client))) + } +} + #[cfg(test)] mod tests { use crate::pathinfoservice::tests::make_grpc_path_info_service_client; diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs index da674f497ad6..39c592bc96fb 100644 --- a/tvix/store/src/pathinfoservice/lru.rs +++ b/tvix/store/src/pathinfoservice/lru.rs @@ -9,6 +9,7 @@ use tonic::async_trait; use tracing::instrument; use crate::proto::PathInfo; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; use super::PathInfoService; @@ -60,6 +61,34 @@ impl PathInfoService for LruPathInfoService { } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct LruPathInfoServiceConfig { + capacity: NonZeroUsize, +} + +impl TryFrom<url::Url> for LruPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(_url: url::Url) -> Result<Self, Self::Error> { + Err(Error::StorageError( + "Instantiating a LruPathInfoService from a url is not supported".into(), + ) + .into()) + } +} + +#[async_trait] +impl ServiceBuilder for LruPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(LruPathInfoService::with_capacity(self.capacity))) + } +} + #[cfg(test)] mod test { use std::num::NonZeroUsize; diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 3de3221df27e..3fabd239c7b1 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -7,6 +7,7 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use tonic::async_trait; use tracing::instrument; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; #[derive(Default)] @@ -59,3 +60,30 @@ impl PathInfoService for MemoryPathInfoService { }) } } + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryPathInfoServiceConfig {} + +impl TryFrom<url::Url> for MemoryPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string()).into()); + } + Ok(MemoryPathInfoServiceConfig {}) + } +} + +#[async_trait] +impl ServiceBuilder for MemoryPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(MemoryPathInfoService::default())) + } +} diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 574bcc0b8b88..70f752f22916 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -14,22 +14,26 @@ mod tests; use futures::stream::BoxStream; use tonic::async_trait; +use tvix_castore::composition::{Registry, ServiceBuilder}; use tvix_castore::Error; +use crate::nar::NarCalculationService; use crate::proto::PathInfo; -pub use self::combinators::Cache as CachePathInfoService; +pub use self::combinators::{ + Cache as CachePathInfoService, CacheConfig as CachePathInfoServiceConfig, +}; pub use self::from_addr::from_addr; -pub use self::grpc::GRPCPathInfoService; -pub use self::lru::LruPathInfoService; -pub use self::memory::MemoryPathInfoService; -pub use self::nix_http::NixHTTPPathInfoService; -pub use self::sled::SledPathInfoService; +pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig}; +pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig}; +pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig}; +pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig}; +pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig}; #[cfg(feature = "cloud")] mod bigtable; #[cfg(feature = "cloud")] -pub use self::bigtable::BigtablePathInfoService; +pub use self::bigtable::{BigtableParameters, BigtablePathInfoService}; #[cfg(any(feature = "fuse", feature = "virtiofs"))] pub use self::fs::make_fs; @@ -52,12 +56,16 @@ pub trait PathInfoService: Send + Sync { /// Rust doesn't support this as a generic in traits yet. This is the same thing that /// [async_trait] generates, but for streams instead of futures. fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>; + + fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> { + None + } } #[async_trait] impl<A> PathInfoService for A where - A: AsRef<dyn PathInfoService> + Send + Sync, + A: AsRef<dyn PathInfoService> + Send + Sync + 'static, { async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { self.as_ref().get(digest).await @@ -71,3 +79,19 @@ where self.as_ref().list() } } + +/// Registers the builtin PathInfoService implementations with the registry +pub(crate) fn register_pathinfo_services(reg: &mut Registry) { + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, CachePathInfoServiceConfig>("cache"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, GRPCPathInfoServiceConfig>("grpc"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, LruPathInfoServiceConfig>("lru"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, MemoryPathInfoServiceConfig>("memory"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, NixHTTPPathInfoServiceConfig>("nix"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, SledPathInfoServiceConfig>("sled"); + #[cfg(feature = "cloud")] + { + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, BigtableParameters>( + "bigtable", + ); + } +} diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index 57fe37f44e88..af9234bc0337 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -7,12 +7,15 @@ use nix_compat::{ nixhash::NixHash, }; use reqwest::StatusCode; +use std::sync::Arc; use tokio::io::{self, AsyncRead}; use tonic::async_trait; use tracing::{debug, instrument, warn}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, }; +use url::Url; /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix @@ -249,3 +252,71 @@ where })) } } + +#[derive(serde::Deserialize)] +pub struct NixHTTPPathInfoServiceConfig { + base_url: String, + blob_service: String, + directory_service: String, + #[serde(default)] + /// An optional list of [narinfo::PubKey]. + /// If set, the .narinfo files received need to have correct signature by at least one of these. + public_keys: Option<Vec<String>>, +} + +impl TryFrom<Url> for NixHTTPPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: Url) -> Result<Self, Self::Error> { + let mut public_keys: Option<Vec<String>> = None; + for (_, v) in url + .query_pairs() + .into_iter() + .filter(|(k, _)| k == "trusted-public-keys") + { + public_keys + .get_or_insert(Default::default()) + .extend(v.split_ascii_whitespace().map(ToString::to_string)); + } + Ok(NixHTTPPathInfoServiceConfig { + // Stringify the URL and remove the nix+ prefix. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(), + blob_service: "default".to_string(), + directory_service: "default".to_string(), + public_keys, + }) + } +} + +#[async_trait] +impl ServiceBuilder for NixHTTPPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (blob_service, directory_service) = futures::join!( + context.resolve(self.blob_service.clone()), + context.resolve(self.directory_service.clone()) + ); + let mut svc = NixHTTPPathInfoService::new( + Url::parse(&self.base_url)?, + blob_service?, + directory_service?, + ); + if let Some(public_keys) = &self.public_keys { + svc.set_public_keys( + public_keys + .iter() + .map(|pubkey_str| { + narinfo::PubKey::parse(pubkey_str) + .map_err(|e| Error::StorageError(format!("invalid public key: {e}"))) + }) + .collect::<Result<Vec<_>, Error>>()?, + ); + } + Ok(Arc::new(svc)) + } +} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 96ade181694c..4255bfd1d4d4 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -5,8 +5,10 @@ use futures::stream::BoxStream; use nix_compat::nixbase32; use prost::Message; use std::path::Path; +use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, warn}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). @@ -114,3 +116,69 @@ impl PathInfoService for SledPathInfoService { }) } } + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct SledPathInfoServiceConfig { + is_temporary: bool, + #[serde(default)] + /// required when is_temporary = false + path: Option<String>, +} + +impl TryFrom<url::Url> for SledPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // sled doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string()).into()); + } + + // TODO: expose compression and other parameters as URL parameters? + + Ok(if url.path().is_empty() { + SledPathInfoServiceConfig { + is_temporary: true, + path: None, + } + } else { + SledPathInfoServiceConfig { + is_temporary: false, + path: Some(url.path().to_string()), + } + }) + } +} + +#[async_trait] +impl ServiceBuilder for SledPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + match self { + SledPathInfoServiceConfig { + is_temporary: true, + path: None, + } => Ok(Arc::new(SledPathInfoService::new_temporary()?)), + SledPathInfoServiceConfig { + is_temporary: true, + path: Some(_), + } => Err( + Error::StorageError("Temporary SledPathInfoService can not have path".into()) + .into(), + ), + SledPathInfoServiceConfig { + is_temporary: false, + path: None, + } => Err(Error::StorageError("SledPathInfoService is missing path".into()).into()), + SledPathInfoServiceConfig { + is_temporary: false, + path: Some(path), + } => Ok(Arc::new(SledPathInfoService::new(path)?)), + } + } +} diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index d82f2214f050..a09786386eba 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -1,18 +1,60 @@ -use std::sync::Arc; use std::{ + collections::HashMap, pin::Pin, + sync::Arc, task::{self, Poll}, }; use tokio::io::{self, AsyncWrite}; -use tvix_castore::{ - blobservice::{self, BlobService}, - directoryservice::{self, DirectoryService}, -}; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; use url::Url; +use crate::composition::{ + with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG, +}; use crate::nar::{NarCalculationService, SimpleRenderer}; -use crate::pathinfoservice::{self, PathInfoService}; +use crate::pathinfoservice::PathInfoService; + +#[derive(serde::Deserialize, Default)] +pub struct CompositionConfigs { + pub blobservices: + HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>, + pub directoryservices: HashMap< + String, + DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>>, + >, + pub pathinfoservices: HashMap< + String, + DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>, + >, +} + +pub fn addrs_to_configs( + blob_service_addr: impl AsRef<str>, + directory_service_addr: impl AsRef<str>, + path_info_service_addr: impl AsRef<str>, +) -> Result<CompositionConfigs, Box<dyn std::error::Error + Send + Sync>> { + let mut configs: CompositionConfigs = Default::default(); + + let blob_service_url = Url::parse(blob_service_addr.as_ref())?; + let directory_service_url = Url::parse(directory_service_addr.as_ref())?; + let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?; + + configs.blobservices.insert( + "default".into(), + with_registry(®, || blob_service_url.try_into())?, + ); + configs.directoryservices.insert( + "default".into(), + with_registry(®, || directory_service_url.try_into())?, + ); + configs.pathinfoservices.insert( + "default".into(), + with_registry(®, || path_info_service_url.try_into())?, + ); + + Ok(configs) +} /// Construct the store handles from their addrs. pub async fn construct_services( @@ -23,49 +65,52 @@ pub async fn construct_services( ( Arc<dyn BlobService>, Arc<dyn DirectoryService>, - Box<dyn PathInfoService>, + Arc<dyn PathInfoService>, Box<dyn NarCalculationService>, ), Box<dyn std::error::Error + Send + Sync>, > { - let blob_service: Arc<dyn BlobService> = - blobservice::from_addr(blob_service_addr.as_ref()).await?; - let directory_service: Arc<dyn DirectoryService> = - directoryservice::from_addr(directory_service_addr.as_ref()).await?; - - let path_info_service = pathinfoservice::from_addr( - path_info_service_addr.as_ref(), - blob_service.clone(), - directory_service.clone(), - ) - .await?; + let configs = addrs_to_configs( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + )?; + construct_services_from_configs(configs).await +} + +/// Construct the store handles from their addrs. +pub async fn construct_services_from_configs( + configs: CompositionConfigs, +) -> Result< + ( + Arc<dyn BlobService>, + Arc<dyn DirectoryService>, + Arc<dyn PathInfoService>, + Box<dyn NarCalculationService>, + ), + Box<dyn std::error::Error + Send + Sync>, +> { + let mut comp = Composition::default(); + + comp.extend(configs.blobservices); + comp.extend(configs.directoryservices); + comp.extend(configs.pathinfoservices); + + let blob_service: Arc<dyn BlobService> = comp.build("default").await?; + let directory_service: Arc<dyn DirectoryService> = comp.build("default").await?; + let path_info_service: Arc<dyn PathInfoService> = comp.build("default").await?; // HACK: The grpc client also implements NarCalculationService, and we // really want to use it (otherwise we'd need to fetch everything again for hashing). // Until we revamped store composition and config, detect this special case here. - let nar_calculation_service: Box<dyn NarCalculationService> = { - use crate::pathinfoservice::GRPCPathInfoService; - use crate::proto::path_info_service_client::PathInfoServiceClient; - - let url = Url::parse(path_info_service_addr.as_ref()) - .map_err(|e| io::Error::other(e.to_string()))?; - - if url.scheme().starts_with("grpc+") { - Box::new(GRPCPathInfoService::from_client( - PathInfoServiceClient::with_interceptor( - tvix_castore::tonic::channel_from_url(&url) - .await - .map_err(|e| io::Error::other(e.to_string()))?, - tvix_tracing::propagate::tonic::send_trace, - ), - )) - } else { + let nar_calculation_service: Box<dyn NarCalculationService> = path_info_service + .nar_calculation_service() + .unwrap_or_else(|| { Box::new(SimpleRenderer::new( blob_service.clone(), directory_service.clone(), - )) as Box<dyn NarCalculationService> - } - }; + )) + }); Ok(( blob_service, |