about summary refs log tree commit diff
path: root/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs')
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs29
1 files changed, 9 insertions, 20 deletions
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 19430aed381a..a5c5776cd4ef 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -1,12 +1,10 @@
 use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
-use futures::StreamExt;
+use futures::{stream::BoxStream, TryStreamExt};
 use std::ops::Deref;
-use tokio::task;
-use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Result, Status};
-use tracing::{debug, instrument, warn};
+use tracing::{instrument, warn};
 use tvix_castore::proto as castorepb;
 
 pub struct GRPCPathInfoServiceWrapper<PS> {
@@ -27,7 +25,7 @@ impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServic
 where
     PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
 {
-    type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
+    type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
 
     #[instrument(skip(self))]
     async fn get(
@@ -95,22 +93,13 @@ where
         &self,
         _request: Request<proto::ListPathInfoRequest>,
     ) -> Result<Response<Self::ListStream>, Status> {
-        let (tx, rx) = tokio::sync::mpsc::channel(5);
+        let stream = Box::pin(
+            self.inner
+                .list()
+                .map_err(|e| Status::internal(e.to_string())),
+        );
 
-        let mut stream = self.inner.list();
-
-        let _task = task::spawn(async move {
-            while let Some(e) = stream.next().await {
-                let res = e.map_err(|e| Status::internal(e.to_string()));
-                if tx.send(res).await.is_err() {
-                    debug!("receiver dropped");
-                    break;
-                }
-            }
-        });
-
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
+        Ok(Response::new(Box::pin(stream)))
     }
 }