diff options
author | Yureka <tvl@yuka.dev> | 2024-06-16T23·10+0200 |
---|---|---|
committer | yuka <yuka@yuka.dev> | 2024-07-18T10·42+0000 |
commit | 1a6b6e3ef310c8eea37b55f8007c85a8772ff8e9 (patch) | |
tree | 228afb6571ccb8e010f991841b32f3337f78c332 /tvix/castore/src/directoryservice | |
parent | 64fd1d3e56cb0edcbe77227cf18ca8425438a68a (diff) |
feat(tvix/castore): add composition module r/8365
Change-Id: I0868f3278db85ae5fe030089ee9033837bc08748 Signed-off-by: Yureka <tvl@yuka.dev> Reviewed-on: https://cl.tvl.fyi/c/depot/+/11853 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r-- | tvix/castore/src/directoryservice/bigtable.rs | 87 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/combinators.rs | 29 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 23 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/memory.rs | 17 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 25 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/object_store.rs | 31 |
6 files changed, 170 insertions, 42 deletions
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs index 1194c6ddc999..69edc05d787d 100644 --- a/tvix/castore/src/directoryservice/bigtable.rs +++ b/tvix/castore/src/directoryservice/bigtable.rs @@ -5,10 +5,12 @@ use futures::stream::BoxStream; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DurationSeconds}; +use std::sync::Arc; use tonic::async_trait; use tracing::{instrument, trace, warn}; use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; /// There should not be more than 10 MiB in a single cell. @@ -43,41 +45,6 @@ pub struct BigtableDirectoryService { emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>, } -/// Represents configuration of [BigtableDirectoryService]. -/// This currently conflates both connect parameters and data model/client -/// behaviour parameters. -#[serde_as] -#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -pub struct BigtableParameters { - project_id: String, - instance_name: String, - #[serde(default)] - is_read_only: bool, - #[serde(default = "default_channel_size")] - channel_size: usize, - - #[serde_as(as = "Option<DurationSeconds<String>>")] - #[serde(default = "default_timeout")] - timeout: Option<std::time::Duration>, - table_name: String, - family_name: String, - - #[serde(default = "default_app_profile_id")] - app_profile_id: String, -} - -fn default_app_profile_id() -> String { - "default".to_owned() -} - -fn default_channel_size() -> usize { - 4 -} - -fn default_timeout() -> Option<std::time::Duration> { - Some(std::time::Duration::from_secs(4)) -} - impl BigtableDirectoryService { #[cfg(not(test))] pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> { @@ -355,3 +322,53 @@ impl DirectoryService for BigtableDirectoryService { Box::new(SimplePutter::new(self.clone())) } } + +/// Represents configuration of [BigtableDirectoryService]. +/// This currently conflates both connect parameters and data model/client +/// behaviour parameters. +#[serde_as] +#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct BigtableParameters { + project_id: String, + instance_name: String, + #[serde(default)] + is_read_only: bool, + #[serde(default = "default_channel_size")] + channel_size: usize, + + #[serde_as(as = "Option<DurationSeconds<String>>")] + #[serde(default = "default_timeout")] + timeout: Option<std::time::Duration>, + table_name: String, + family_name: String, + + #[serde(default = "default_app_profile_id")] + app_profile_id: String, +} + +#[async_trait] +impl ServiceBuilder for BigtableParameters { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext<dyn DirectoryService>, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> { + Ok(Arc::new( + BigtableDirectoryService::connect(self.clone()).await?, + )) + } +} + +fn default_app_profile_id() -> String { + "default".to_owned() +} + +fn default_channel_size() -> usize { + 4 +} + +fn default_timeout() -> Option<std::time::Duration> { + Some(std::time::Duration::from_secs(4)) +} diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs index d3f351d6b689..2364a313d5f4 100644 --- a/tvix/castore/src/directoryservice/combinators.rs +++ b/tvix/castore/src/directoryservice/combinators.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use futures::stream::BoxStream; use futures::StreamExt; use futures::TryFutureExt; @@ -6,6 +8,7 @@ use tonic::async_trait; use tracing::{instrument, trace}; use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::directoryservice::DirectoryPutter; use crate::proto; use crate::B3Digest; @@ -140,3 +143,29 @@ where Box::new(SimplePutter::new((*self).clone())) } } + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct CacheConfig { + near: String, + far: String, +} + +#[async_trait] +impl ServiceBuilder for CacheConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + context: &CompositionContext<dyn DirectoryService>, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (near, far) = futures::join!( + context.resolve(self.near.clone()), + context.resolve(self.far.clone()) + ); + Ok(Arc::new(Cache { + near: near?, + far: far?, + })) + } +} diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index ca9b0de07b14..e2ca3954c112 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,10 +1,12 @@ use std::collections::HashSet; use super::{DirectoryPutter, DirectoryService}; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::proto::{self, get_directory_request::ByWhat}; use crate::{B3Digest, Error}; use async_stream::try_stream; use futures::stream::BoxStream; +use std::sync::Arc; use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; @@ -216,6 +218,27 @@ where } } +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct GRPCDirectoryServiceConfig { + url: String, +} + +#[async_trait] +impl ServiceBuilder for GRPCDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext<dyn DirectoryService>, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let client = proto::directory_service_client::DirectoryServiceClient::new( + crate::tonic::channel_from_url(&self.url.parse()?).await?, + ); + Ok(Arc::new(GRPCDirectoryService::from_client(client))) + } +} + /// Allows uploading multiple Directory messages in the same gRPC stream. pub struct GRPCPutter { /// Data about the current request - a handle to the task, and the tx part diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index 3b2795c3968c..f12ef6e977d0 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -8,6 +8,7 @@ use tracing::{instrument, warn}; use super::utils::traverse_directory; use super::{DirectoryPutter, DirectoryService, SimplePutter}; +use crate::composition::{CompositionContext, ServiceBuilder}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -85,3 +86,19 @@ impl DirectoryService for MemoryDirectoryService { Box::new(SimplePutter::new(self.clone())) } } + +#[derive(serde::Deserialize, Debug)] +#[serde(deny_unknown_fields)] +pub struct MemoryDirectoryServiceConfig {} + +#[async_trait] +impl ServiceBuilder for MemoryDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext<dyn DirectoryService>, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + Ok(Arc::new(MemoryDirectoryService::default())) + } +} diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index eff4a685fa6d..815502c81455 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,7 +1,8 @@ +use crate::composition::{Registry, ServiceBuilder}; use crate::{proto, B3Digest, Error}; + use futures::stream::BoxStream; use tonic::async_trait; - mod combinators; mod directory_graph; mod from_addr; @@ -16,12 +17,12 @@ pub mod tests; mod traverse; mod utils; -pub use self::combinators::Cache; +pub use self::combinators::{Cache, CacheConfig}; pub use self::directory_graph::DirectoryGraph; pub use self::from_addr::from_addr; -pub use self::grpc::GRPCDirectoryService; -pub use self::memory::MemoryDirectoryService; -pub use self::object_store::ObjectStoreDirectoryService; +pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig}; +pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig}; +pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectoryServiceConfig}; pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator}; pub use self::simple_putter::SimplePutter; pub use self::sled::SledDirectoryService; @@ -32,7 +33,7 @@ pub use self::utils::traverse_directory; mod bigtable; #[cfg(feature = "cloud")] -pub use self::bigtable::BigtableDirectoryService; +pub use self::bigtable::{BigtableDirectoryService, BigtableParameters}; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their @@ -126,3 +127,15 @@ pub trait DirectoryPutter: Send { /// be returned. async fn close(&mut self) -> Result<B3Digest, Error>; } + +/// Registers the builtin DirectoryService implementations with the registry +pub(crate) fn register_directory_services(reg: &mut Registry) { + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::ObjectStoreDirectoryServiceConfig>("objectstore"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::MemoryDirectoryServiceConfig>("memory"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::CacheConfig>("cache"); + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc"); + #[cfg(feature = "cloud")] + { + reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable"); + } +} diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs index feaaaa39cd50..1977de18fbec 100644 --- a/tvix/castore/src/directoryservice/object_store.rs +++ b/tvix/castore/src/directoryservice/object_store.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::sync::Arc; use data_encoding::HEXLOWER; @@ -18,6 +19,7 @@ use url::Url; use super::{ DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator, }; +use crate::composition::{CompositionContext, ServiceBuilder}; use crate::{proto, B3Digest, Error}; /// Stores directory closures in an object store. @@ -46,7 +48,7 @@ fn derive_dirs_path(base_path: &Path, digest: &B3Digest) -> Path { const MAX_FRAME_LENGTH: usize = 1 * 1024 * 1024 * 1000; // 1 MiB // impl ObjectStoreDirectoryService { - /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by + /// Constructs a new [ObjectStoreDirectoryService] from a [Url] supported by /// [object_store]. /// Any path suffix becomes the base path of the object store. /// additional options, the same as in [object_store::parse_url_opts] can @@ -169,6 +171,33 @@ impl DirectoryService for ObjectStoreDirectoryService { } } +#[derive(serde::Deserialize)] +#[serde(deny_unknown_fields)] +pub struct ObjectStoreDirectoryServiceConfig { + object_store_url: String, + #[serde(default)] + object_store_options: HashMap<String, String>, +} + +#[async_trait] +impl ServiceBuilder for ObjectStoreDirectoryServiceConfig { + type Output = dyn DirectoryService; + async fn build<'a>( + &'a self, + _instance_name: &str, + _context: &CompositionContext<dyn DirectoryService>, + ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> { + let (object_store, path) = object_store::parse_url_opts( + &self.object_store_url.parse()?, + &self.object_store_options, + )?; + Ok(Arc::new(ObjectStoreDirectoryService { + object_store: Arc::new(object_store), + base_path: path, + })) + } +} + struct ObjectStoreDirectoryPutter { object_store: Arc<dyn ObjectStore>, base_path: Path, |