use std::{ collections::HashMap, pin::Pin, sync::Arc, task::{self, Poll}, }; use tokio::io::{self, AsyncWrite}; use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; use url::Url; use crate::composition::{ with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG, }; use crate::nar::{NarCalculationService, SimpleRenderer}; use crate::pathinfoservice::PathInfoService; #[derive(serde::Deserialize, Default)] pub struct CompositionConfigs { pub blobservices: HashMap>>>, pub directoryservices: HashMap< String, DeserializeWithRegistry>>, >, pub pathinfoservices: HashMap< String, DeserializeWithRegistry>>, >, } pub fn addrs_to_configs( blob_service_addr: impl AsRef, directory_service_addr: impl AsRef, path_info_service_addr: impl AsRef, ) -> Result> { let mut configs: CompositionConfigs = Default::default(); let blob_service_url = Url::parse(blob_service_addr.as_ref())?; let directory_service_url = Url::parse(directory_service_addr.as_ref())?; let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?; configs.blobservices.insert( "default".into(), with_registry(®, || blob_service_url.try_into())?, ); configs.directoryservices.insert( "default".into(), with_registry(®, || directory_service_url.try_into())?, ); configs.pathinfoservices.insert( "default".into(), with_registry(®, || path_info_service_url.try_into())?, ); Ok(configs) } /// Construct the store handles from their addrs. pub async fn construct_services( blob_service_addr: impl AsRef, directory_service_addr: impl AsRef, path_info_service_addr: impl AsRef, ) -> Result< ( Arc, Arc, Arc, Box, ), Box, > { let configs = addrs_to_configs( blob_service_addr, directory_service_addr, path_info_service_addr, )?; construct_services_from_configs(configs).await } /// Construct the store handles from their addrs. pub async fn construct_services_from_configs( configs: CompositionConfigs, ) -> Result< ( Arc, Arc, Arc, Box, ), Box, > { let mut comp = Composition::default(); comp.extend(configs.blobservices); comp.extend(configs.directoryservices); comp.extend(configs.pathinfoservices); let blob_service: Arc = comp.build("default").await?; let directory_service: Arc = comp.build("default").await?; let path_info_service: Arc = comp.build("default").await?; // HACK: The grpc client also implements NarCalculationService, and we // really want to use it (otherwise we'd need to fetch everything again for hashing). // Until we revamped store composition and config, detect this special case here. let nar_calculation_service: Box = path_info_service .nar_calculation_service() .unwrap_or_else(|| { Box::new(SimpleRenderer::new( blob_service.clone(), directory_service.clone(), )) }); Ok(( blob_service, directory_service, path_info_service, nar_calculation_service, )) } /// The inverse of [tokio_util::io::SyncIoBridge]. /// Don't use this with anything that actually does blocking I/O. pub struct AsyncIoBridge(pub T); impl AsyncWrite for AsyncIoBridge { fn poll_write( self: Pin<&mut Self>, _cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { Poll::Ready(self.get_mut().0.write(buf)) } fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { Poll::Ready(self.get_mut().0.flush()) } fn poll_shutdown( self: Pin<&mut Self>, _cx: &mut task::Context<'_>, ) -> Poll> { Poll::Ready(Ok(())) } }