about summary refs log tree commit diff
path: root/tvix/store/src/proto
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/proto')
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs29
-rw-r--r--tvix/store/src/proto/tests/grpc_pathinfoservice.rs5
2 files changed, 12 insertions, 22 deletions
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 19430aed381a..a5c5776cd4ef 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -1,12 +1,10 @@
 use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
-use futures::StreamExt;
+use futures::{stream::BoxStream, TryStreamExt};
 use std::ops::Deref;
-use tokio::task;
-use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Result, Status};
-use tracing::{debug, instrument, warn};
+use tracing::{instrument, warn};
 use tvix_castore::proto as castorepb;
 
 pub struct GRPCPathInfoServiceWrapper<PS> {
@@ -27,7 +25,7 @@ impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServic
 where
     PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
 {
-    type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
+    type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
 
     #[instrument(skip(self))]
     async fn get(
@@ -95,22 +93,13 @@ where
         &self,
         _request: Request<proto::ListPathInfoRequest>,
     ) -> Result<Response<Self::ListStream>, Status> {
-        let (tx, rx) = tokio::sync::mpsc::channel(5);
+        let stream = Box::pin(
+            self.inner
+                .list()
+                .map_err(|e| Status::internal(e.to_string())),
+        );
 
-        let mut stream = self.inner.list();
-
-        let _task = task::spawn(async move {
-            while let Some(e) = stream.next().await {
-                let res = e.map_err(|e| Status::internal(e.to_string()));
-                if tx.send(res).await.is_err() {
-                    debug!("receiver dropped");
-                    break;
-                }
-            }
-        });
-
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
+        Ok(Response::new(Box::pin(stream)))
     }
 }
 
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
index e8da7792cdb1..8016b9901d96 100644
--- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -7,8 +7,8 @@ use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
 use crate::tests::utils::gen_blob_service;
 use crate::tests::utils::gen_directory_service;
 use crate::tests::utils::gen_pathinfo_service;
+use futures::stream::BoxStream;
 use std::sync::Arc;
-use tokio_stream::wrappers::ReceiverStream;
 use tonic::Request;
 use tvix_castore::proto as castorepb;
 
@@ -18,7 +18,8 @@ use tvix_castore::proto as castorepb;
 /// It uses the NonCachingNARCalculationService NARCalculationService to
 /// calculate NARs.
 fn gen_grpc_service(
-) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> {
+) -> Arc<dyn GRPCPathInfoService<ListStream = BoxStream<'static, Result<PathInfo, tonic::Status>>>>
+{
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
     Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service(