diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice/grpc.rs')
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 144 |
1 files changed, 41 insertions, 103 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 02e0cb590b..f6a356cf18 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -1,8 +1,11 @@ use super::PathInfoService; -use crate::proto::{self, ListPathInfoRequest, PathInfo}; +use crate::{ + nar::NarCalculationService, + proto::{self, ListPathInfoRequest, PathInfo}, +}; use async_stream::try_stream; -use data_encoding::BASE64; use futures::stream::BoxStream; +use nix_compat::nixbase32; use tonic::{async_trait, transport::Channel, Code}; use tracing::instrument; use tvix_castore::{proto as castorepb, Error}; @@ -27,7 +30,7 @@ impl GRPCPathInfoService { #[async_trait] impl PathInfoService for GRPCPathInfoService { - #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))] + #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let path_info = self .grpc_client @@ -67,30 +70,6 @@ impl PathInfoService for GRPCPathInfoService { Ok(path_info) } - #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))] - async fn calculate_nar( - &self, - root_node: &castorepb::node::Node, - ) -> Result<(u64, [u8; 32]), Error> { - let path_info = self - .grpc_client - .clone() - .calculate_nar(castorepb::Node { - node: Some(root_node.clone()), - }) - .await - .map_err(|e| Error::StorageError(e.to_string()))? - .into_inner(); - - let nar_sha256: [u8; 32] = path_info - .nar_sha256 - .to_vec() - .try_into() - .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; - - Ok((path_info.nar_size, nar_sha256)) - } - #[instrument(level = "trace", skip_all)] fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { let mut grpc_client = self.grpc_client.clone(); @@ -126,88 +105,47 @@ impl PathInfoService for GRPCPathInfoService { } } +#[async_trait] +impl NarCalculationService for GRPCPathInfoService { + #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))] + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + let path_info = self + .grpc_client + .clone() + .calculate_nar(castorepb::Node { + node: Some(root_node.clone()), + }) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + let nar_sha256: [u8; 32] = path_info + .nar_sha256 + .to_vec() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + + Ok((path_info.nar_size, nar_sha256)) + } +} + #[cfg(test)] mod tests { - use std::sync::Arc; - use std::time::Duration; - - use rstest::*; - use tempfile::TempDir; - use tokio::net::UnixListener; - use tokio_retry::strategy::ExponentialBackoff; - use tokio_retry::Retry; - use tokio_stream::wrappers::UnixListenerStream; - use tvix_castore::blobservice::BlobService; - use tvix_castore::directoryservice::DirectoryService; - - use crate::pathinfoservice::MemoryPathInfoService; - use crate::proto::path_info_service_client::PathInfoServiceClient; - use crate::proto::GRPCPathInfoServiceWrapper; - use crate::tests::fixtures::{self, blob_service, directory_service}; - - use super::GRPCPathInfoService; - use super::PathInfoService; + use crate::pathinfoservice::tests::make_grpc_path_info_service_client; + use crate::pathinfoservice::PathInfoService; + use crate::tests::fixtures; /// This ensures connecting via gRPC works as expected. - #[rstest] #[tokio::test] - async fn test_valid_unix_path_ping_pong( - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, - ) { - let tmpdir = TempDir::new().unwrap(); - let socket_path = tmpdir.path().join("daemon"); - - let path_clone = socket_path.clone(); - - // Spin up a server - tokio::spawn(async { - let uds = UnixListener::bind(path_clone).unwrap(); - let uds_stream = UnixListenerStream::new(uds); - - // spin up a new server - let mut server = tonic::transport::Server::builder(); - let router = server.add_service( - crate::proto::path_info_service_server::PathInfoServiceServer::new( - GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new( - blob_service, - directory_service, - )) - as Box<dyn PathInfoService>), - ), - ); - router.serve_with_incoming(uds_stream).await - }); - - // wait for the socket to be created - Retry::spawn( - ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), - || async { - if socket_path.exists() { - Ok(()) - } else { - Err(()) - } - }, - ) - .await - .expect("failed to wait for socket"); - - // prepare a client - let grpc_client = { - let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) - .expect("must parse"); - let client = PathInfoServiceClient::new( - tvix_castore::tonic::channel_from_url(&url) - .await - .expect("must succeed"), - ); - - GRPCPathInfoService::from_client(client) - }; + async fn test_valid_unix_path_ping_pong() { + let (_blob_service, _directory_service, path_info_service) = + make_grpc_path_info_service_client().await; - let path_info = grpc_client - .get(fixtures::DUMMY_OUTPUT_HASH) + let path_info = path_info_service + .get(fixtures::DUMMY_PATH_DIGEST) .await .expect("must not be error"); |