use std::sync::Arc; use std::{ pin::Pin, task::{self, Poll}, }; use tokio::io::{self, AsyncWrite}; use tvix_castore::{ blobservice::{self, BlobService}, directoryservice::{self, DirectoryService}, }; use url::Url; use crate::nar::{NarCalculationService, SimpleRenderer}; use crate::pathinfoservice::{self, PathInfoService}; /// Construct the store handles from their addrs. pub async fn construct_services( blob_service_addr: impl AsRef, directory_service_addr: impl AsRef, path_info_service_addr: impl AsRef, ) -> Result< ( Arc, Arc, Box, Box, ), Box, > { let blob_service: Arc = blobservice::from_addr(blob_service_addr.as_ref()).await?; let directory_service: Arc = directoryservice::from_addr(directory_service_addr.as_ref()).await?; let path_info_service = pathinfoservice::from_addr( path_info_service_addr.as_ref(), blob_service.clone(), directory_service.clone(), ) .await?; // HACK: The grpc client also implements NarCalculationService, and we // really want to use it (otherwise we'd need to fetch everything again for hashing). // Until we revamped store composition and config, detect this special case here. let nar_calculation_service: Box = { use crate::pathinfoservice::GRPCPathInfoService; use crate::proto::path_info_service_client::PathInfoServiceClient; let url = Url::parse(path_info_service_addr.as_ref()) .map_err(|e| io::Error::other(e.to_string()))?; if url.scheme().starts_with("grpc+") { Box::new(GRPCPathInfoService::from_client( PathInfoServiceClient::with_interceptor( tvix_castore::tonic::channel_from_url(&url) .await .map_err(|e| io::Error::other(e.to_string()))?, tvix_tracing::propagate::tonic::send_trace, ), )) } else { Box::new(SimpleRenderer::new( blob_service.clone(), directory_service.clone(), )) as Box } }; Ok(( blob_service, directory_service, path_info_service, nar_calculation_service, )) } /// The inverse of [tokio_util::io::SyncIoBridge]. /// Don't use this with anything that actually does blocking I/O. pub struct AsyncIoBridge(pub T); impl AsyncWrite for AsyncIoBridge { fn poll_write( self: Pin<&mut Self>, _cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { Poll::Ready(self.get_mut().0.write(buf)) } fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { Poll::Ready(self.get_mut().0.flush()) } fn poll_shutdown( self: Pin<&mut Self>, _cx: &mut task::Context<'_>, ) -> Poll> { Poll::Ready(Ok(())) } }