about summary refs log tree commit diff
path: root/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs')
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs30
1 files changed, 29 insertions, 1 deletions
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index bbf235f8ace9..16a2fd51d0df 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -2,11 +2,14 @@ use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
 use std::sync::Arc;
+use tokio::task;
+use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Result, Status};
-use tracing::{instrument, warn};
+use tracing::{debug, instrument, warn};
 
 pub struct GRPCPathInfoServiceWrapper {
     path_info_service: Arc<dyn PathInfoService>,
+    // FUTUREWORK: allow exposing without allowing listing
 }
 
 impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper {
@@ -19,6 +22,8 @@ impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper {
 
 #[async_trait]
 impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper {
+    type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
+
     #[instrument(skip(self))]
     async fn get(
         &self,
@@ -78,6 +83,29 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
             }
         }
     }
+
+    #[instrument(skip(self))]
+    async fn list(
+        &self,
+        _request: Request<proto::ListPathInfoRequest>,
+    ) -> Result<Response<Self::ListStream>, Status> {
+        let (tx, rx) = tokio::sync::mpsc::channel(5);
+
+        let path_info_service = self.path_info_service.clone();
+
+        let _task = task::spawn(async move {
+            for e in path_info_service.list() {
+                let res = e.map_err(|e| Status::internal(e.to_string()));
+                if tx.send(res).await.is_err() {
+                    debug!("receiver dropped");
+                    break;
+                }
+            }
+        });
+
+        let receiver_stream = ReceiverStream::new(rx);
+        Ok(Response::new(receiver_stream))
+    }
 }
 
 impl From<RenderError> for tonic::Status {