about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice/grpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/pathinfoservice/grpc.rs')
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs144
1 files changed, 41 insertions, 103 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 02e0cb590b..f6a356cf18 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -1,8 +1,11 @@
 use super::PathInfoService;
-use crate::proto::{self, ListPathInfoRequest, PathInfo};
+use crate::{
+    nar::NarCalculationService,
+    proto::{self, ListPathInfoRequest, PathInfo},
+};
 use async_stream::try_stream;
-use data_encoding::BASE64;
 use futures::stream::BoxStream;
+use nix_compat::nixbase32;
 use tonic::{async_trait, transport::Channel, Code};
 use tracing::instrument;
 use tvix_castore::{proto as castorepb, Error};
@@ -27,7 +30,7 @@ impl GRPCPathInfoService {
 
 #[async_trait]
 impl PathInfoService for GRPCPathInfoService {
-    #[instrument(level = "trace", skip_all, fields(path_info.digest = BASE64.encode(&digest)))]
+    #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))]
     async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
         let path_info = self
             .grpc_client
@@ -67,30 +70,6 @@ impl PathInfoService for GRPCPathInfoService {
         Ok(path_info)
     }
 
-    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
-        let path_info = self
-            .grpc_client
-            .clone()
-            .calculate_nar(castorepb::Node {
-                node: Some(root_node.clone()),
-            })
-            .await
-            .map_err(|e| Error::StorageError(e.to_string()))?
-            .into_inner();
-
-        let nar_sha256: [u8; 32] = path_info
-            .nar_sha256
-            .to_vec()
-            .try_into()
-            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
-
-        Ok((path_info.nar_size, nar_sha256))
-    }
-
     #[instrument(level = "trace", skip_all)]
     fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         let mut grpc_client = self.grpc_client.clone();
@@ -126,88 +105,47 @@ impl PathInfoService for GRPCPathInfoService {
     }
 }
 
+#[async_trait]
+impl NarCalculationService for GRPCPathInfoService {
+    #[instrument(level = "trace", skip_all, fields(root_node = ?root_node))]
+    async fn calculate_nar(
+        &self,
+        root_node: &castorepb::node::Node,
+    ) -> Result<(u64, [u8; 32]), Error> {
+        let path_info = self
+            .grpc_client
+            .clone()
+            .calculate_nar(castorepb::Node {
+                node: Some(root_node.clone()),
+            })
+            .await
+            .map_err(|e| Error::StorageError(e.to_string()))?
+            .into_inner();
+
+        let nar_sha256: [u8; 32] = path_info
+            .nar_sha256
+            .to_vec()
+            .try_into()
+            .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
+
+        Ok((path_info.nar_size, nar_sha256))
+    }
+}
+
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
-    use std::time::Duration;
-
-    use rstest::*;
-    use tempfile::TempDir;
-    use tokio::net::UnixListener;
-    use tokio_retry::strategy::ExponentialBackoff;
-    use tokio_retry::Retry;
-    use tokio_stream::wrappers::UnixListenerStream;
-    use tvix_castore::blobservice::BlobService;
-    use tvix_castore::directoryservice::DirectoryService;
-
-    use crate::pathinfoservice::MemoryPathInfoService;
-    use crate::proto::path_info_service_client::PathInfoServiceClient;
-    use crate::proto::GRPCPathInfoServiceWrapper;
-    use crate::tests::fixtures::{self, blob_service, directory_service};
-
-    use super::GRPCPathInfoService;
-    use super::PathInfoService;
+    use crate::pathinfoservice::tests::make_grpc_path_info_service_client;
+    use crate::pathinfoservice::PathInfoService;
+    use crate::tests::fixtures;
 
     /// This ensures connecting via gRPC works as expected.
-    #[rstest]
     #[tokio::test]
-    async fn test_valid_unix_path_ping_pong(
-        blob_service: Arc<dyn BlobService>,
-        directory_service: Arc<dyn DirectoryService>,
-    ) {
-        let tmpdir = TempDir::new().unwrap();
-        let socket_path = tmpdir.path().join("daemon");
-
-        let path_clone = socket_path.clone();
-
-        // Spin up a server
-        tokio::spawn(async {
-            let uds = UnixListener::bind(path_clone).unwrap();
-            let uds_stream = UnixListenerStream::new(uds);
-
-            // spin up a new server
-            let mut server = tonic::transport::Server::builder();
-            let router = server.add_service(
-                crate::proto::path_info_service_server::PathInfoServiceServer::new(
-                    GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new(
-                        blob_service,
-                        directory_service,
-                    ))
-                        as Box<dyn PathInfoService>),
-                ),
-            );
-            router.serve_with_incoming(uds_stream).await
-        });
-
-        // wait for the socket to be created
-        Retry::spawn(
-            ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)),
-            || async {
-                if socket_path.exists() {
-                    Ok(())
-                } else {
-                    Err(())
-                }
-            },
-        )
-        .await
-        .expect("failed to wait for socket");
-
-        // prepare a client
-        let grpc_client = {
-            let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display()))
-                .expect("must parse");
-            let client = PathInfoServiceClient::new(
-                tvix_castore::tonic::channel_from_url(&url)
-                    .await
-                    .expect("must succeed"),
-            );
-
-            GRPCPathInfoService::from_client(client)
-        };
+    async fn test_valid_unix_path_ping_pong() {
+        let (_blob_service, _directory_service, path_info_service) =
+            make_grpc_path_info_service_client().await;
 
-        let path_info = grpc_client
-            .get(fixtures::DUMMY_OUTPUT_HASH)
+        let path_info = path_info_service
+            .get(fixtures::DUMMY_PATH_DIGEST)
             .await
             .expect("must not be error");