about summary refs log tree commit diff
path: root/tvix/castore/src/directoryservice
diff options
context:
space:
mode:
authorYureka <tvl@yuka.dev>2024-06-16T23·10+0200
committeryuka <yuka@yuka.dev>2024-07-18T10·42+0000
commit1a6b6e3ef310c8eea37b55f8007c85a8772ff8e9 (patch)
tree228afb6571ccb8e010f991841b32f3337f78c332 /tvix/castore/src/directoryservice
parent64fd1d3e56cb0edcbe77227cf18ca8425438a68a (diff)
feat(tvix/castore): add composition module r/8365
Change-Id: I0868f3278db85ae5fe030089ee9033837bc08748
Signed-off-by: Yureka <tvl@yuka.dev>
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11853
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r--tvix/castore/src/directoryservice/bigtable.rs87
-rw-r--r--tvix/castore/src/directoryservice/combinators.rs29
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs23
-rw-r--r--tvix/castore/src/directoryservice/memory.rs17
-rw-r--r--tvix/castore/src/directoryservice/mod.rs25
-rw-r--r--tvix/castore/src/directoryservice/object_store.rs31
6 files changed, 170 insertions, 42 deletions
diff --git a/tvix/castore/src/directoryservice/bigtable.rs b/tvix/castore/src/directoryservice/bigtable.rs
index 1194c6ddc999..69edc05d787d 100644
--- a/tvix/castore/src/directoryservice/bigtable.rs
+++ b/tvix/castore/src/directoryservice/bigtable.rs
@@ -5,10 +5,12 @@ use futures::stream::BoxStream;
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DurationSeconds};
+use std::sync::Arc;
 use tonic::async_trait;
 use tracing::{instrument, trace, warn};
 
 use super::{utils::traverse_directory, DirectoryPutter, DirectoryService, SimplePutter};
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
 
 /// There should not be more than 10 MiB in a single cell.
@@ -43,41 +45,6 @@ pub struct BigtableDirectoryService {
     emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
 }
 
-/// Represents configuration of [BigtableDirectoryService].
-/// This currently conflates both connect parameters and data model/client
-/// behaviour parameters.
-#[serde_as]
-#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
-pub struct BigtableParameters {
-    project_id: String,
-    instance_name: String,
-    #[serde(default)]
-    is_read_only: bool,
-    #[serde(default = "default_channel_size")]
-    channel_size: usize,
-
-    #[serde_as(as = "Option<DurationSeconds<String>>")]
-    #[serde(default = "default_timeout")]
-    timeout: Option<std::time::Duration>,
-    table_name: String,
-    family_name: String,
-
-    #[serde(default = "default_app_profile_id")]
-    app_profile_id: String,
-}
-
-fn default_app_profile_id() -> String {
-    "default".to_owned()
-}
-
-fn default_channel_size() -> usize {
-    4
-}
-
-fn default_timeout() -> Option<std::time::Duration> {
-    Some(std::time::Duration::from_secs(4))
-}
-
 impl BigtableDirectoryService {
     #[cfg(not(test))]
     pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
@@ -355,3 +322,53 @@ impl DirectoryService for BigtableDirectoryService {
         Box::new(SimplePutter::new(self.clone()))
     }
 }
+
+/// Represents configuration of [BigtableDirectoryService].
+/// This currently conflates both connect parameters and data model/client
+/// behaviour parameters.
+#[serde_as]
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct BigtableParameters {
+    project_id: String,
+    instance_name: String,
+    #[serde(default)]
+    is_read_only: bool,
+    #[serde(default = "default_channel_size")]
+    channel_size: usize,
+
+    #[serde_as(as = "Option<DurationSeconds<String>>")]
+    #[serde(default = "default_timeout")]
+    timeout: Option<std::time::Duration>,
+    table_name: String,
+    family_name: String,
+
+    #[serde(default = "default_app_profile_id")]
+    app_profile_id: String,
+}
+
+#[async_trait]
+impl ServiceBuilder for BigtableParameters {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync>> {
+        Ok(Arc::new(
+            BigtableDirectoryService::connect(self.clone()).await?,
+        ))
+    }
+}
+
+fn default_app_profile_id() -> String {
+    "default".to_owned()
+}
+
+fn default_channel_size() -> usize {
+    4
+}
+
+fn default_timeout() -> Option<std::time::Duration> {
+    Some(std::time::Duration::from_secs(4))
+}
diff --git a/tvix/castore/src/directoryservice/combinators.rs b/tvix/castore/src/directoryservice/combinators.rs
index d3f351d6b689..2364a313d5f4 100644
--- a/tvix/castore/src/directoryservice/combinators.rs
+++ b/tvix/castore/src/directoryservice/combinators.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use futures::TryFutureExt;
@@ -6,6 +8,7 @@ use tonic::async_trait;
 use tracing::{instrument, trace};
 
 use super::{DirectoryGraph, DirectoryService, RootToLeavesValidator, SimplePutter};
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::directoryservice::DirectoryPutter;
 use crate::proto;
 use crate::B3Digest;
@@ -140,3 +143,29 @@ where
         Box::new(SimplePutter::new((*self).clone()))
     }
 }
+
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct CacheConfig {
+    near: String,
+    far: String,
+}
+
+#[async_trait]
+impl ServiceBuilder for CacheConfig {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let (near, far) = futures::join!(
+            context.resolve(self.near.clone()),
+            context.resolve(self.far.clone())
+        );
+        Ok(Arc::new(Cache {
+            near: near?,
+            far: far?,
+        }))
+    }
+}
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index ca9b0de07b14..e2ca3954c112 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -1,10 +1,12 @@
 use std::collections::HashSet;
 
 use super::{DirectoryPutter, DirectoryService};
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::proto::{self, get_directory_request::ByWhat};
 use crate::{B3Digest, Error};
 use async_stream::try_stream;
 use futures::stream::BoxStream;
+use std::sync::Arc;
 use tokio::spawn;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::task::JoinHandle;
@@ -216,6 +218,27 @@ where
     }
 }
 
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct GRPCDirectoryServiceConfig {
+    url: String,
+}
+
+#[async_trait]
+impl ServiceBuilder for GRPCDirectoryServiceConfig {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let client = proto::directory_service_client::DirectoryServiceClient::new(
+            crate::tonic::channel_from_url(&self.url.parse()?).await?,
+        );
+        Ok(Arc::new(GRPCDirectoryService::from_client(client)))
+    }
+}
+
 /// Allows uploading multiple Directory messages in the same gRPC stream.
 pub struct GRPCPutter {
     /// Data about the current request - a handle to the task, and the tx part
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
index 3b2795c3968c..f12ef6e977d0 100644
--- a/tvix/castore/src/directoryservice/memory.rs
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -8,6 +8,7 @@ use tracing::{instrument, warn};
 
 use super::utils::traverse_directory;
 use super::{DirectoryPutter, DirectoryService, SimplePutter};
+use crate::composition::{CompositionContext, ServiceBuilder};
 
 #[derive(Clone, Default)]
 pub struct MemoryDirectoryService {
@@ -85,3 +86,19 @@ impl DirectoryService for MemoryDirectoryService {
         Box::new(SimplePutter::new(self.clone()))
     }
 }
+
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct MemoryDirectoryServiceConfig {}
+
+#[async_trait]
+impl ServiceBuilder for MemoryDirectoryServiceConfig {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        Ok(Arc::new(MemoryDirectoryService::default()))
+    }
+}
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index eff4a685fa6d..815502c81455 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -1,7 +1,8 @@
+use crate::composition::{Registry, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
+
 use futures::stream::BoxStream;
 use tonic::async_trait;
-
 mod combinators;
 mod directory_graph;
 mod from_addr;
@@ -16,12 +17,12 @@ pub mod tests;
 mod traverse;
 mod utils;
 
-pub use self::combinators::Cache;
+pub use self::combinators::{Cache, CacheConfig};
 pub use self::directory_graph::DirectoryGraph;
 pub use self::from_addr::from_addr;
-pub use self::grpc::GRPCDirectoryService;
-pub use self::memory::MemoryDirectoryService;
-pub use self::object_store::ObjectStoreDirectoryService;
+pub use self::grpc::{GRPCDirectoryService, GRPCDirectoryServiceConfig};
+pub use self::memory::{MemoryDirectoryService, MemoryDirectoryServiceConfig};
+pub use self::object_store::{ObjectStoreDirectoryService, ObjectStoreDirectoryServiceConfig};
 pub use self::order_validator::{LeavesToRootValidator, OrderValidator, RootToLeavesValidator};
 pub use self::simple_putter::SimplePutter;
 pub use self::sled::SledDirectoryService;
@@ -32,7 +33,7 @@ pub use self::utils::traverse_directory;
 mod bigtable;
 
 #[cfg(feature = "cloud")]
-pub use self::bigtable::BigtableDirectoryService;
+pub use self::bigtable::{BigtableDirectoryService, BigtableParameters};
 
 /// The base trait all Directory services need to implement.
 /// This is a simple get and put of [crate::proto::Directory], returning their
@@ -126,3 +127,15 @@ pub trait DirectoryPutter: Send {
     /// be returned.
     async fn close(&mut self) -> Result<B3Digest, Error>;
 }
+
+/// Registers the builtin DirectoryService implementations with the registry
+pub(crate) fn register_directory_services(reg: &mut Registry) {
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::ObjectStoreDirectoryServiceConfig>("objectstore");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::MemoryDirectoryServiceConfig>("memory");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::CacheConfig>("cache");
+    reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::GRPCDirectoryServiceConfig>("grpc");
+    #[cfg(feature = "cloud")]
+    {
+        reg.register::<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>, super::directoryservice::BigtableParameters>("bigtable");
+    }
+}
diff --git a/tvix/castore/src/directoryservice/object_store.rs b/tvix/castore/src/directoryservice/object_store.rs
index feaaaa39cd50..1977de18fbec 100644
--- a/tvix/castore/src/directoryservice/object_store.rs
+++ b/tvix/castore/src/directoryservice/object_store.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use data_encoding::HEXLOWER;
@@ -18,6 +19,7 @@ use url::Url;
 use super::{
     DirectoryGraph, DirectoryPutter, DirectoryService, LeavesToRootValidator, RootToLeavesValidator,
 };
+use crate::composition::{CompositionContext, ServiceBuilder};
 use crate::{proto, B3Digest, Error};
 
 /// Stores directory closures in an object store.
@@ -46,7 +48,7 @@ fn derive_dirs_path(base_path: &Path, digest: &B3Digest) -> Path {
 const MAX_FRAME_LENGTH: usize = 1 * 1024 * 1024 * 1000; // 1 MiB
                                                         //
 impl ObjectStoreDirectoryService {
-    /// Constructs a new [ObjectStoreBlobService] from a [Url] supported by
+    /// Constructs a new [ObjectStoreDirectoryService] from a [Url] supported by
     /// [object_store].
     /// Any path suffix becomes the base path of the object store.
     /// additional options, the same as in [object_store::parse_url_opts] can
@@ -169,6 +171,33 @@ impl DirectoryService for ObjectStoreDirectoryService {
     }
 }
 
+#[derive(serde::Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct ObjectStoreDirectoryServiceConfig {
+    object_store_url: String,
+    #[serde(default)]
+    object_store_options: HashMap<String, String>,
+}
+
+#[async_trait]
+impl ServiceBuilder for ObjectStoreDirectoryServiceConfig {
+    type Output = dyn DirectoryService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext<dyn DirectoryService>,
+    ) -> Result<Arc<dyn DirectoryService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let (object_store, path) = object_store::parse_url_opts(
+            &self.object_store_url.parse()?,
+            &self.object_store_options,
+        )?;
+        Ok(Arc::new(ObjectStoreDirectoryService {
+            object_store: Arc::new(object_store),
+            base_path: path,
+        }))
+    }
+}
+
 struct ObjectStoreDirectoryPutter {
     object_store: Arc<dyn ObjectStore>,
     base_path: Path,