about summary refs log tree commit diff
path: root/tvix/store/src/utils.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/utils.rs')
-rw-r--r--tvix/store/src/utils.rs121
1 files changed, 83 insertions, 38 deletions
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
index d82f2214f050..a09786386eba 100644
--- a/tvix/store/src/utils.rs
+++ b/tvix/store/src/utils.rs
@@ -1,18 +1,60 @@
-use std::sync::Arc;
 use std::{
+    collections::HashMap,
     pin::Pin,
+    sync::Arc,
     task::{self, Poll},
 };
 use tokio::io::{self, AsyncWrite};
 
-use tvix_castore::{
-    blobservice::{self, BlobService},
-    directoryservice::{self, DirectoryService},
-};
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
 use url::Url;
 
+use crate::composition::{
+    with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG,
+};
 use crate::nar::{NarCalculationService, SimpleRenderer};
-use crate::pathinfoservice::{self, PathInfoService};
+use crate::pathinfoservice::PathInfoService;
+
+#[derive(serde::Deserialize, Default)]
+pub struct CompositionConfigs {
+    pub blobservices:
+        HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>,
+    pub directoryservices: HashMap<
+        String,
+        DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>>,
+    >,
+    pub pathinfoservices: HashMap<
+        String,
+        DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>,
+    >,
+}
+
+pub fn addrs_to_configs(
+    blob_service_addr: impl AsRef<str>,
+    directory_service_addr: impl AsRef<str>,
+    path_info_service_addr: impl AsRef<str>,
+) -> Result<CompositionConfigs, Box<dyn std::error::Error + Send + Sync>> {
+    let mut configs: CompositionConfigs = Default::default();
+
+    let blob_service_url = Url::parse(blob_service_addr.as_ref())?;
+    let directory_service_url = Url::parse(directory_service_addr.as_ref())?;
+    let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?;
+
+    configs.blobservices.insert(
+        "default".into(),
+        with_registry(&REG, || blob_service_url.try_into())?,
+    );
+    configs.directoryservices.insert(
+        "default".into(),
+        with_registry(&REG, || directory_service_url.try_into())?,
+    );
+    configs.pathinfoservices.insert(
+        "default".into(),
+        with_registry(&REG, || path_info_service_url.try_into())?,
+    );
+
+    Ok(configs)
+}
 
 /// Construct the store handles from their addrs.
 pub async fn construct_services(
@@ -23,49 +65,52 @@ pub async fn construct_services(
     (
         Arc<dyn BlobService>,
         Arc<dyn DirectoryService>,
-        Box<dyn PathInfoService>,
+        Arc<dyn PathInfoService>,
         Box<dyn NarCalculationService>,
     ),
     Box<dyn std::error::Error + Send + Sync>,
 > {
-    let blob_service: Arc<dyn BlobService> =
-        blobservice::from_addr(blob_service_addr.as_ref()).await?;
-    let directory_service: Arc<dyn DirectoryService> =
-        directoryservice::from_addr(directory_service_addr.as_ref()).await?;
-
-    let path_info_service = pathinfoservice::from_addr(
-        path_info_service_addr.as_ref(),
-        blob_service.clone(),
-        directory_service.clone(),
-    )
-    .await?;
+    let configs = addrs_to_configs(
+        blob_service_addr,
+        directory_service_addr,
+        path_info_service_addr,
+    )?;
+    construct_services_from_configs(configs).await
+}
+
+/// Construct the store handles from their addrs.
+pub async fn construct_services_from_configs(
+    configs: CompositionConfigs,
+) -> Result<
+    (
+        Arc<dyn BlobService>,
+        Arc<dyn DirectoryService>,
+        Arc<dyn PathInfoService>,
+        Box<dyn NarCalculationService>,
+    ),
+    Box<dyn std::error::Error + Send + Sync>,
+> {
+    let mut comp = Composition::default();
+
+    comp.extend(configs.blobservices);
+    comp.extend(configs.directoryservices);
+    comp.extend(configs.pathinfoservices);
+
+    let blob_service: Arc<dyn BlobService> = comp.build("default").await?;
+    let directory_service: Arc<dyn DirectoryService> = comp.build("default").await?;
+    let path_info_service: Arc<dyn PathInfoService> = comp.build("default").await?;
 
     // HACK: The grpc client also implements NarCalculationService, and we
     // really want to use it (otherwise we'd need to fetch everything again for hashing).
     // Until we revamped store composition and config, detect this special case here.
-    let nar_calculation_service: Box<dyn NarCalculationService> = {
-        use crate::pathinfoservice::GRPCPathInfoService;
-        use crate::proto::path_info_service_client::PathInfoServiceClient;
-
-        let url = Url::parse(path_info_service_addr.as_ref())
-            .map_err(|e| io::Error::other(e.to_string()))?;
-
-        if url.scheme().starts_with("grpc+") {
-            Box::new(GRPCPathInfoService::from_client(
-                PathInfoServiceClient::with_interceptor(
-                    tvix_castore::tonic::channel_from_url(&url)
-                        .await
-                        .map_err(|e| io::Error::other(e.to_string()))?,
-                    tvix_tracing::propagate::tonic::send_trace,
-                ),
-            ))
-        } else {
+    let nar_calculation_service: Box<dyn NarCalculationService> = path_info_service
+        .nar_calculation_service()
+        .unwrap_or_else(|| {
             Box::new(SimpleRenderer::new(
                 blob_service.clone(),
                 directory_service.clone(),
-            )) as Box<dyn NarCalculationService>
-        }
-    };
+            ))
+        });
 
     Ok((
         blob_service,