diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice')
-rw-r--r-- | tvix/store/src/pathinfoservice/bigtable.rs | 140 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/combinators.rs | 38 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 151 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 36 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/lru.rs | 29 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 28 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 40 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/nix_http.rs | 71 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 68 |
9 files changed, 421 insertions, 180 deletions
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs index 26d07689d71f..15128986ff56 100644 --- a/tvix/store/src/pathinfoservice/bigtable.rs +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -10,15 +10,17 @@ use nix_compat::nixbase32; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; +use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; /// There should not be more than 10 MiB in a single cell. /// https://cloud.google.com/bigtable/docs/schema-design#cells const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; -/// Provides a [DirectoryService] implementation using +/// Provides a [PathInfoService] implementation using /// [Bigtable](https://cloud.google.com/bigtable/docs/) /// as an underlying K/V store. /// @@ -44,57 +46,6 @@ pub struct BigtablePathInfoService { emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, } -/// Represents configuration of [BigtablePathInfoService]. -/// This currently conflates both connect parameters and data model/client -/// behaviour parameters. -#[serde_as] -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct BigtableParameters { - project_id: String, - instance_name: String, - #[serde(default)] - is_read_only: bool, - #[serde(default = "default_channel_size")] - channel_size: usize, - - #[serde_as(as = "Option<DurationSeconds<String>>")] - #[serde(default = "default_timeout")] - timeout: Option<std::time::Duration>, - table_name: String, - family_name: String, - - #[serde(default = "default_app_profile_id")] - app_profile_id: String, -} - -impl BigtableParameters { - #[cfg(test)] - pub fn default_for_tests() -> Self { - Self { - project_id: "project-1".into(), - instance_name: "instance-1".into(), - is_read_only: false, - channel_size: default_channel_size(), - timeout: default_timeout(), - table_name: "table-1".into(), - family_name: "cf1".into(), - app_profile_id: default_app_profile_id(), - } - } -} - -fn default_app_profile_id() -> String { - "default".to_owned() -} - -fn default_channel_size() -> usize { - 4 -} - -fn default_timeout() -> Option<std::time::Duration> { - Some(std::time::Duration::from_secs(4)) -} - impl BigtablePathInfoService { #[cfg(not(test))] pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { @@ -412,3 +363,88 @@ impl PathInfoService for BigtablePathInfoService { Box::pin(stream) } } + +/// Represents configuration of [BigtablePathInfoService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +impl BigtableParameters { + #[cfg(test)] + pub fn default_for_tests() -> Self { + Self { + project_id: "project-1".into(), + instance_name: "instance-1".into(), + is_read_only: false, + channel_size: default_channel_size(), + timeout: default_timeout(), + table_name: "table-1".into(), + family_name: "cf1".into(), + app_profile_id: default_app_profile_id(), + } + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} + +#[async_trait] +impl ServiceBuilder for BigtableParameters { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> { + Ok(Arc::new( + BigtablePathInfoService::connect(self.clone()).await?, + )) + } +} + +impl TryFrom<url::Url> for BigtableParameters { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(mut url: url::Url) -> Result<Self, Self::Error> { + // parse the instance name from the hostname. + let instance_name = url + .host_str() + .ok_or_else(|| Error::StorageError("instance name missing".into()))? + .to_string(); + + // … but add it to the query string now, so we just need to parse that. + url.query_pairs_mut() + .append_pair("instance_name", &instance_name); + + let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; + + Ok(params) + } +} diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs index 664144ef494b..bb5595f72b10 100644 --- a/tvix/store/src/pathinfoservice/combinators.rs +++ b/tvix/store/src/pathinfoservice/combinators.rs @@ -1,8 +1,11 @@ +use std::sync::Arc; + use crate::proto::PathInfo; use futures::stream::BoxStream; use nix_compat::nixbase32; use tonic::async_trait; use tracing::{debug, instrument}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; use super::PathInfoService; @@ -61,6 +64,41 @@ where } } +#[derive(serde::Deserialize)] +pub struct CacheConfig { + pub near: String, + pub far: String, +} + +impl TryFrom<url::Url> for CacheConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(_url: url::Url) -> Result<Self, Self::Error> { + Err(Error::StorageError( + "Instantiating a CombinedPathInfoService from a url is not supported".into(), + ) + .into()) + } +} + +#[async_trait] +impl ServiceBuilder for CacheConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (near, far) = futures::join!( + context.resolve(self.near.clone()), + context.resolve(self.far.clone()) + ); + Ok(Arc::new(Cache { + near: near?, + far: far?, + })) + } +} + #[cfg(test)] mod test { use std::num::NonZeroUsize; diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 9173d25d05ca..5635c226c2de 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -1,13 +1,10 @@ -use crate::proto::path_info_service_client::PathInfoServiceClient; +use super::PathInfoService; -use super::{ - GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService, - SledPathInfoService, +use crate::composition::{ + with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, }; - -use nix_compat::narinfo; use std::sync::Arc; -use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; +use tvix_castore::Error; use url::Url; /// Constructs a new instance of a [PathInfoService] from an URI. @@ -34,113 +31,21 @@ use url::Url; /// these also need to be passed in. pub async fn from_addr( uri: &str, - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, -) -> Result<Box<dyn PathInfoService>, Error> { + context: Option<&CompositionContext<'_>>, +) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> { #[allow(unused_mut)] let mut url = Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; - let path_info_service: Box<dyn PathInfoService> = match url.scheme() { - "memory" => { - // memory doesn't support host or path in the URL. - if url.has_host() || !url.path().is_empty() { - return Err(Error::StorageError("invalid url".to_string())); - } - Box::<MemoryPathInfoService>::default() - } - "sled" => { - // sled doesn't support host, and a path can be provided (otherwise - // it'll live in memory only). - if url.has_host() { - return Err(Error::StorageError("no host allowed".to_string())); - } - - if url.path() == "/" { - return Err(Error::StorageError( - "cowardly refusing to open / with sled".to_string(), - )); - } - - // TODO: expose other parameters as URL parameters? - - Box::new(if url.path().is_empty() { - SledPathInfoService::new_temporary() - .map_err(|e| Error::StorageError(e.to_string()))? - } else { - SledPathInfoService::new(url.path()) - .map_err(|e| Error::StorageError(e.to_string()))? - }) - } - "nix+http" | "nix+https" => { - // Stringify the URL and remove the nix+ prefix. - // We can't use `url.set_scheme(rest)`, as it disallows - // setting something http(s) that previously wasn't. - let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap(); - - let mut nix_http_path_info_service = - NixHTTPPathInfoService::new(new_url, blob_service, directory_service); - - let pairs = &url.query_pairs(); - for (k, v) in pairs.into_iter() { - if k == "trusted-public-keys" { - let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect(); - - let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len()); - for pubkey_str in pubkey_strs { - pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| { - Error::StorageError(format!("invalid public key: {e}")) - })?); - } - - nix_http_path_info_service.set_public_keys(pubkeys); - } - } - - Box::new(nix_http_path_info_service) - } - scheme if scheme.starts_with("grpc+") => { - // schemes starting with grpc+ go to the GRPCPathInfoService. - // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. - // - In the case of unix sockets, there must be a path, but may not be a host. - // - In the case of non-unix sockets, there must be a host, but no path. - // Constructing the channel is handled by tvix_castore::channel::from_url. - Box::new(GRPCPathInfoService::from_client( - PathInfoServiceClient::with_interceptor( - tvix_castore::tonic::channel_from_url(&url).await?, - tvix_tracing::propagate::tonic::send_trace, - ), - )) - } - #[cfg(feature = "cloud")] - "bigtable" => { - use super::bigtable::BigtableParameters; - use super::BigtablePathInfoService; - - // parse the instance name from the hostname. - let instance_name = url - .host_str() - .ok_or_else(|| Error::StorageError("instance name missing".into()))? - .to_string(); - - // … but add it to the query string now, so we just need to parse that. - url.query_pairs_mut() - .append_pair("instance_name", &instance_name); - - let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) - .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; - - Box::new( - BigtablePathInfoService::connect(params) - .await - .map_err(|e| Error::StorageError(e.to_string()))?, - ) - } - _ => Err(Error::StorageError(format!( - "unknown scheme: {}", - url.scheme() - )))?, - }; + let path_info_service_config = with_registry(®, || { + <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>>::try_from( + url, + ) + })? + .0; + let path_info_service = path_info_service_config + .build("anonymous", context.unwrap_or(&CompositionContext::blank())) + .await?; Ok(path_info_service) } @@ -148,14 +53,12 @@ pub async fn from_addr( #[cfg(test)] mod tests { use super::from_addr; + use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder}; use lazy_static::lazy_static; use rstest::rstest; - use std::sync::Arc; use tempfile::TempDir; - use tvix_castore::{ - blobservice::{BlobService, MemoryBlobService}, - directoryservice::{DirectoryService, MemoryDirectoryService}, - }; + use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig}; + use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig}; lazy_static! { static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); @@ -224,11 +127,19 @@ mod tests { )] #[tokio::test] async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { - let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default()); - let directory_service: Arc<dyn DirectoryService> = - Arc::from(MemoryDirectoryService::default()); - - let resp = from_addr(uri_str, blob_service, directory_service).await; + let mut comp = Composition::default(); + comp.extend(vec![( + "default".into(), + DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {}) + as Box<dyn ServiceBuilder<Output = dyn BlobService>>), + )]); + comp.extend(vec![( + "default".into(), + DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {}) + as Box<dyn ServiceBuilder<Output = dyn DirectoryService>>), + )]); + + let resp = from_addr(uri_str, Some(&comp.context())).await; if exp_succeed { resp.expect("should succeed"); diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index bcee49aac6cf..2ac0e43303cb 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -6,9 +6,11 @@ use crate::{ use async_stream::try_stream; use futures::stream::BoxStream; use nix_compat::nixbase32; +use std::sync::Arc; use tonic::{async_trait, Code}; use tracing::{instrument, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::{proto as castorepb, Error}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. @@ -149,6 +151,40 @@ where } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCPathInfoServiceConfig { + url: String, +} + +impl TryFrom<url::Url> for GRPCPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + // - In the case of unix sockets, there must be a path, but may not be a host. + // - In the case of non-unix sockets, there must be a host, but no path. + // Constructing the channel is handled by tvix_castore::channel::from_url. + Ok(GRPCPathInfoServiceConfig { + url: url.to_string(), + }) + } +} + +#[async_trait] +impl ServiceBuilder for GRPCPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::path_info_service_client::PathInfoServiceClient::new( + tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCPathInfoService::from_client(client))) + } +} + #[cfg(test)] mod tests { use crate::pathinfoservice::tests::make_grpc_path_info_service_client; diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs index da674f497ad6..39c592bc96fb 100644 --- a/tvix/store/src/pathinfoservice/lru.rs +++ b/tvix/store/src/pathinfoservice/lru.rs @@ -9,6 +9,7 @@ use tonic::async_trait; use tracing::instrument; use crate::proto::PathInfo; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; use super::PathInfoService; @@ -60,6 +61,34 @@ impl PathInfoService for LruPathInfoService { } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct LruPathInfoServiceConfig { + capacity: NonZeroUsize, +} + +impl TryFrom<url::Url> for LruPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(_url: url::Url) -> Result<Self, Self::Error> { + Err(Error::StorageError( + "Instantiating a LruPathInfoService from a url is not supported".into(), + ) + .into()) + } +} + +#[async_trait] +impl ServiceBuilder for LruPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(LruPathInfoService::with_capacity(self.capacity))) + } +} + #[cfg(test)] mod test { use std::num::NonZeroUsize; diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 3de3221df27e..3fabd239c7b1 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -7,6 +7,7 @@ use std::{collections::HashMap, sync::Arc}; use tokio::sync::RwLock; use tonic::async_trait; use tracing::instrument; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; #[derive(Default)] @@ -59,3 +60,30 @@ impl PathInfoService for MemoryPathInfoService { }) } } + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryPathInfoServiceConfig {} + +impl TryFrom<url::Url> for MemoryPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // memory doesn't support host or path in the URL. + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string()).into()); + } + Ok(MemoryPathInfoServiceConfig {}) + } +} + +#[async_trait] +impl ServiceBuilder for MemoryPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(MemoryPathInfoService::default())) + } +} diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 574bcc0b8b88..70f752f22916 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -14,22 +14,26 @@ mod tests; use futures::stream::BoxStream; use tonic::async_trait; +use tvix_castore::composition::{Registry, ServiceBuilder}; use tvix_castore::Error; +use crate::nar::NarCalculationService; use crate::proto::PathInfo; -pub use self::combinators::Cache as CachePathInfoService; +pub use self::combinators::{ + Cache as CachePathInfoService, CacheConfig as CachePathInfoServiceConfig, +}; pub use self::from_addr::from_addr; -pub use self::grpc::GRPCPathInfoService; -pub use self::lru::LruPathInfoService; -pub use self::memory::MemoryPathInfoService; -pub use self::nix_http::NixHTTPPathInfoService; -pub use self::sled::SledPathInfoService; +pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig}; +pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig}; +pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig}; +pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig}; +pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig}; #[cfg(feature = "cloud")] mod bigtable; #[cfg(feature = "cloud")] -pub use self::bigtable::BigtablePathInfoService; +pub use self::bigtable::{BigtableParameters, BigtablePathInfoService}; #[cfg(any(feature = "fuse", feature = "virtiofs"))] pub use self::fs::make_fs; @@ -52,12 +56,16 @@ pub trait PathInfoService: Send + Sync { /// Rust doesn't support this as a generic in traits yet. This is the same thing that /// [async_trait] generates, but for streams instead of futures. fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>; + + fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> { + None + } } #[async_trait] impl<A> PathInfoService for A where - A: AsRef<dyn PathInfoService> + Send + Sync, + A: AsRef<dyn PathInfoService> + Send + Sync + 'static, { async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { self.as_ref().get(digest).await @@ -71,3 +79,19 @@ where self.as_ref().list() } } + +/// Registers the builtin PathInfoService implementations with the registry +pub(crate) fn register_pathinfo_services(reg: &mut Registry) { + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, CachePathInfoServiceConfig>("cache"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, GRPCPathInfoServiceConfig>("grpc"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, LruPathInfoServiceConfig>("lru"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, MemoryPathInfoServiceConfig>("memory"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, NixHTTPPathInfoServiceConfig>("nix"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, SledPathInfoServiceConfig>("sled"); + #[cfg(feature = "cloud")] + { + reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, BigtableParameters>( + "bigtable", + ); + } +} diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index 57fe37f44e88..af9234bc0337 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -7,12 +7,15 @@ use nix_compat::{ nixhash::NixHash, }; use reqwest::StatusCode; +use std::sync::Arc; use tokio::io::{self, AsyncRead}; use tonic::async_trait; use tracing::{debug, instrument, warn}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::{ blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, }; +use url::Url; /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix @@ -249,3 +252,71 @@ where })) } } + +#[derive(serde::Deserialize)] +pub struct NixHTTPPathInfoServiceConfig { + base_url: String, + blob_service: String, + directory_service: String, + #[serde(default)] + /// An optional list of [narinfo::PubKey]. + /// If set, the .narinfo files received need to have correct signature by at least one of these. + public_keys: Option<Vec<String>>, +} + +impl TryFrom<Url> for NixHTTPPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: Url) -> Result<Self, Self::Error> { + let mut public_keys: Option<Vec<String>> = None; + for (_, v) in url + .query_pairs() + .into_iter() + .filter(|(k, _)| k == "trusted-public-keys") + { + public_keys + .get_or_insert(Default::default()) + .extend(v.split_ascii_whitespace().map(ToString::to_string)); + } + Ok(NixHTTPPathInfoServiceConfig { + // Stringify the URL and remove the nix+ prefix. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(), + blob_service: "default".to_string(), + directory_service: "default".to_string(), + public_keys, + }) + } +} + +#[async_trait] +impl ServiceBuilder for NixHTTPPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (blob_service, directory_service) = futures::join!( + context.resolve(self.blob_service.clone()), + context.resolve(self.directory_service.clone()) + ); + let mut svc = NixHTTPPathInfoService::new( + Url::parse(&self.base_url)?, + blob_service?, + directory_service?, + ); + if let Some(public_keys) = &self.public_keys { + svc.set_public_keys( + public_keys + .iter() + .map(|pubkey_str| { + narinfo::PubKey::parse(pubkey_str) + .map_err(|e| Error::StorageError(format!("invalid public key: {e}"))) + }) + .collect::<Result<Vec<_>, Error>>()?, + ); + } + Ok(Arc::new(svc)) + } +} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 96ade181694c..4255bfd1d4d4 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -5,8 +5,10 @@ use futures::stream::BoxStream; use nix_compat::nixbase32; use prost::Message; use std::path::Path; +use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, warn}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). @@ -114,3 +116,69 @@ impl PathInfoService for SledPathInfoService { }) } } + +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct SledPathInfoServiceConfig { + is_temporary: bool, + #[serde(default)] + /// required when is_temporary = false + path: Option<String>, +} + +impl TryFrom<url::Url> for SledPathInfoServiceConfig { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(url: url::Url) -> Result<Self, Self::Error> { + // sled doesn't support host, and a path can be provided (otherwise + // it'll live in memory only). + if url.has_host() { + return Err(Error::StorageError("no host allowed".to_string()).into()); + } + + // TODO: expose compression and other parameters as URL parameters? + + Ok(if url.path().is_empty() { + SledPathInfoServiceConfig { + is_temporary: true, + path: None, + } + } else { + SledPathInfoServiceConfig { + is_temporary: false, + path: Some(url.path().to_string()), + } + }) + } +} + +#[async_trait] +impl ServiceBuilder for SledPathInfoServiceConfig { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + match self { + SledPathInfoServiceConfig { + is_temporary: true, + path: None, + } => Ok(Arc::new(SledPathInfoService::new_temporary()?)), + SledPathInfoServiceConfig { + is_temporary: true, + path: Some(_), + } => Err( + Error::StorageError("Temporary SledPathInfoService can not have path".into()) + .into(), + ), + SledPathInfoServiceConfig { + is_temporary: false, + path: None, + } => Err(Error::StorageError("SledPathInfoService is missing path".into()).into()), + SledPathInfoServiceConfig { + is_temporary: false, + path: Some(path), + } => Ok(Arc::new(SledPathInfoService::new(path)?)), + } + } +} |