about summary refs log tree commit diff
path: root/tvix/store/src/proto
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/proto')
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs30
-rw-r--r--tvix/store/src/proto/tests/grpc_pathinfoservice.rs4
2 files changed, 32 insertions, 2 deletions
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index bbf235f8ace9..16a2fd51d0df 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -2,11 +2,14 @@ use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
 use std::sync::Arc;
+use tokio::task;
+use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Result, Status};
-use tracing::{instrument, warn};
+use tracing::{debug, instrument, warn};
 
 pub struct GRPCPathInfoServiceWrapper {
     path_info_service: Arc<dyn PathInfoService>,
+    // FUTUREWORK: allow exposing without allowing listing
 }
 
 impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper {
@@ -19,6 +22,8 @@ impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper {
 
 #[async_trait]
 impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper {
+    type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
+
     #[instrument(skip(self))]
     async fn get(
         &self,
@@ -78,6 +83,29 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
             }
         }
     }
+
+    #[instrument(skip(self))]
+    async fn list(
+        &self,
+        _request: Request<proto::ListPathInfoRequest>,
+    ) -> Result<Response<Self::ListStream>, Status> {
+        let (tx, rx) = tokio::sync::mpsc::channel(5);
+
+        let path_info_service = self.path_info_service.clone();
+
+        let _task = task::spawn(async move {
+            for e in path_info_service.list() {
+                let res = e.map_err(|e| Status::internal(e.to_string()));
+                if tx.send(res).await.is_err() {
+                    debug!("receiver dropped");
+                    break;
+                }
+            }
+        });
+
+        let receiver_stream = ReceiverStream::new(rx);
+        Ok(Response::new(receiver_stream))
+    }
 }
 
 impl From<RenderError> for tonic::Status {
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
index 95d97bb12890..114e89cacc10 100644
--- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -9,6 +9,7 @@ use crate::tests::utils::gen_blob_service;
 use crate::tests::utils::gen_directory_service;
 use crate::tests::utils::gen_pathinfo_service;
 use std::sync::Arc;
+use tokio_stream::wrappers::ReceiverStream;
 use tonic::Request;
 
 /// generates a GRPCPathInfoService out of blob, directory and pathinfo services.
@@ -16,7 +17,8 @@ use tonic::Request;
 /// We only interact with it via the PathInfo GRPC interface.
 /// It uses the NonCachingNARCalculationService NARCalculationService to
 /// calculate NARs.
-fn gen_grpc_service() -> Arc<dyn GRPCPathInfoService> {
+fn gen_grpc_service(
+) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> {
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
     Arc::new(GRPCPathInfoServiceWrapper::from(gen_pathinfo_service(