diff options
Diffstat (limited to 'tvix/store/src/utils.rs')
-rw-r--r-- | tvix/store/src/utils.rs | 121 |
1 files changed, 83 insertions, 38 deletions
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index d82f2214f050..a09786386eba 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -1,18 +1,60 @@ -use std::sync::Arc; use std::{ + collections::HashMap, pin::Pin, + sync::Arc, task::{self, Poll}, }; use tokio::io::{self, AsyncWrite}; -use tvix_castore::{ - blobservice::{self, BlobService}, - directoryservice::{self, DirectoryService}, -}; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; use url::Url; +use crate::composition::{ + with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG, +}; use crate::nar::{NarCalculationService, SimpleRenderer}; -use crate::pathinfoservice::{self, PathInfoService}; +use crate::pathinfoservice::PathInfoService; + +#[derive(serde::Deserialize, Default)] +pub struct CompositionConfigs { + pub blobservices: + HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>, + pub directoryservices: HashMap< + String, + DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>>, + >, + pub pathinfoservices: HashMap< + String, + DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>, + >, +} + +pub fn addrs_to_configs( + blob_service_addr: impl AsRef<str>, + directory_service_addr: impl AsRef<str>, + path_info_service_addr: impl AsRef<str>, +) -> Result<CompositionConfigs, Box<dyn std::error::Error + Send + Sync>> { + let mut configs: CompositionConfigs = Default::default(); + + let blob_service_url = Url::parse(blob_service_addr.as_ref())?; + let directory_service_url = Url::parse(directory_service_addr.as_ref())?; + let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?; + + configs.blobservices.insert( + "default".into(), + with_registry(®, || blob_service_url.try_into())?, + ); + configs.directoryservices.insert( + "default".into(), + with_registry(®, || directory_service_url.try_into())?, + ); + configs.pathinfoservices.insert( + "default".into(), + with_registry(®, || path_info_service_url.try_into())?, + ); + + Ok(configs) +} /// Construct the store handles from their addrs. pub async fn construct_services( @@ -23,49 +65,52 @@ pub async fn construct_services( ( Arc<dyn BlobService>, Arc<dyn DirectoryService>, - Box<dyn PathInfoService>, + Arc<dyn PathInfoService>, Box<dyn NarCalculationService>, ), Box<dyn std::error::Error + Send + Sync>, > { - let blob_service: Arc<dyn BlobService> = - blobservice::from_addr(blob_service_addr.as_ref()).await?; - let directory_service: Arc<dyn DirectoryService> = - directoryservice::from_addr(directory_service_addr.as_ref()).await?; - - let path_info_service = pathinfoservice::from_addr( - path_info_service_addr.as_ref(), - blob_service.clone(), - directory_service.clone(), - ) - .await?; + let configs = addrs_to_configs( + blob_service_addr, + directory_service_addr, + path_info_service_addr, + )?; + construct_services_from_configs(configs).await +} + +/// Construct the store handles from their addrs. +pub async fn construct_services_from_configs( + configs: CompositionConfigs, +) -> Result< + ( + Arc<dyn BlobService>, + Arc<dyn DirectoryService>, + Arc<dyn PathInfoService>, + Box<dyn NarCalculationService>, + ), + Box<dyn std::error::Error + Send + Sync>, +> { + let mut comp = Composition::default(); + + comp.extend(configs.blobservices); + comp.extend(configs.directoryservices); + comp.extend(configs.pathinfoservices); + + let blob_service: Arc<dyn BlobService> = comp.build("default").await?; + let directory_service: Arc<dyn DirectoryService> = comp.build("default").await?; + let path_info_service: Arc<dyn PathInfoService> = comp.build("default").await?; // HACK: The grpc client also implements NarCalculationService, and we // really want to use it (otherwise we'd need to fetch everything again for hashing). // Until we revamped store composition and config, detect this special case here. - let nar_calculation_service: Box<dyn NarCalculationService> = { - use crate::pathinfoservice::GRPCPathInfoService; - use crate::proto::path_info_service_client::PathInfoServiceClient; - - let url = Url::parse(path_info_service_addr.as_ref()) - .map_err(|e| io::Error::other(e.to_string()))?; - - if url.scheme().starts_with("grpc+") { - Box::new(GRPCPathInfoService::from_client( - PathInfoServiceClient::with_interceptor( - tvix_castore::tonic::channel_from_url(&url) - .await - .map_err(|e| io::Error::other(e.to_string()))?, - tvix_tracing::propagate::tonic::send_trace, - ), - )) - } else { + let nar_calculation_service: Box<dyn NarCalculationService> = path_info_service + .nar_calculation_service() + .unwrap_or_else(|| { Box::new(SimpleRenderer::new( blob_service.clone(), directory_service.clone(), - )) as Box<dyn NarCalculationService> - } - }; + )) + }); Ok(( blob_service, |