diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice/bigtable.rs')
-rw-r--r-- | tvix/store/src/pathinfoservice/bigtable.rs | 140 |
1 files changed, 88 insertions, 52 deletions
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs index 26d07689d71f..15128986ff56 100644 --- a/tvix/store/src/pathinfoservice/bigtable.rs +++ b/tvix/store/src/pathinfoservice/bigtable.rs @@ -10,15 +10,17 @@ use nix_compat::nixbase32; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; +use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace}; +use tvix_castore::composition::{CompositionContext, ServiceBuilder}; use tvix_castore::Error; /// There should not be more than 10 MiB in a single cell. /// https://cloud.google.com/bigtable/docs/schema-design#cells const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024; -/// Provides a [DirectoryService] implementation using +/// Provides a [PathInfoService] implementation using /// [Bigtable](https://cloud.google.com/bigtable/docs/) /// as an underlying K/V store. /// @@ -44,57 +46,6 @@ pub struct BigtablePathInfoService { emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, } -/// Represents configuration of [BigtablePathInfoService]. -/// This currently conflates both connect parameters and data model/client -/// behaviour parameters. -#[serde_as] -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct BigtableParameters { - project_id: String, - instance_name: String, - #[serde(default)] - is_read_only: bool, - #[serde(default = "default_channel_size")] - channel_size: usize, - - #[serde_as(as = "Option<DurationSeconds<String>>")] - #[serde(default = "default_timeout")] - timeout: Option<std::time::Duration>, - table_name: String, - family_name: String, - - #[serde(default = "default_app_profile_id")] - app_profile_id: String, -} - -impl BigtableParameters { - #[cfg(test)] - pub fn default_for_tests() -> Self { - Self { - project_id: "project-1".into(), - instance_name: "instance-1".into(), - is_read_only: false, - channel_size: default_channel_size(), - timeout: default_timeout(), - table_name: "table-1".into(), - family_name: "cf1".into(), - app_profile_id: default_app_profile_id(), - } - } -} - -fn default_app_profile_id() -> String { - "default".to_owned() -} - -fn default_channel_size() -> usize { - 4 -} - -fn default_timeout() -> Option<std::time::Duration> { - Some(std::time::Duration::from_secs(4)) -} - impl BigtablePathInfoService { #[cfg(not(test))] pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { @@ -412,3 +363,88 @@ impl PathInfoService for BigtablePathInfoService { Box::pin(stream) } } + +/// Represents configuration of [BigtablePathInfoService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +impl BigtableParameters { + #[cfg(test)] + pub fn default_for_tests() -> Self { + Self { + project_id: "project-1".into(), + instance_name: "instance-1".into(), + is_read_only: false, + channel_size: default_channel_size(), + timeout: default_timeout(), + table_name: "table-1".into(), + family_name: "cf1".into(), + app_profile_id: default_app_profile_id(), + } + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} + +#[async_trait] +impl ServiceBuilder for BigtableParameters { + type Output = dyn PathInfoService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext, + ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> { + Ok(Arc::new( + BigtablePathInfoService::connect(self.clone()).await?, + )) + } +} + +impl TryFrom<url::Url> for BigtableParameters { + type Error = Box<dyn std::error::Error + Send + Sync>; + fn try_from(mut url: url::Url) -> Result<Self, Self::Error> { + // parse the instance name from the hostname. + let instance_name = url + .host_str() + .ok_or_else(|| Error::StorageError("instance name missing".into()))? + .to_string(); + + // … but add it to the query string now, so we just need to parse that. + url.query_pairs_mut() + .append_pair("instance_name", &instance_name); + + let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) + .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; + + Ok(params) + } +} |