about summary refs log tree commit diff
path: root/tvix/store/src/proto
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2024-01-20T22·25-0600
committerclbot <clbot@tvl.fyi>2024-01-21T19·41+0000
commit4e341fb5d915ea9e4ae1b8257972ef69437f3ed0 (patch)
tree6aeb960ced3e1c05edbc0c650ca608508cf0d920 /tvix/store/src/proto
parent56ba7a72d80bc050ef6a7d9031306ee0ccbf8e0a (diff)
chore(tvix/store): Use BoxStream type alias r/7435
The BoxStream type alias is a more concise and easier to read than
the full `Pin<Box<dyn Stream<Item = ...> + Send + ...>>` type.

Change-Id: I5b7bccfd066ded5557e01f7895f4cf5c4a33bd44
Reviewed-on: https://cl.tvl.fyi/c/depot/+/10677
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Autosubmit: Connor Brewster <cbrewster@hey.com>
Diffstat (limited to 'tvix/store/src/proto')
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs29
-rw-r--r--tvix/store/src/proto/tests/grpc_pathinfoservice.rs5
2 files changed, 12 insertions, 22 deletions
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 19430aed38..a5c5776cd4 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -1,12 +1,10 @@
 use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
-use futures::StreamExt;
+use futures::{stream::BoxStream, TryStreamExt};
 use std::ops::Deref;
-use tokio::task;
-use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Result, Status};
-use tracing::{debug, instrument, warn};
+use tracing::{instrument, warn};
 use tvix_castore::proto as castorepb;
 
 pub struct GRPCPathInfoServiceWrapper<PS> {
@@ -27,7 +25,7 @@ impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServic
 where
     PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
 {
-    type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
+    type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
 
     #[instrument(skip(self))]
     async fn get(
@@ -95,22 +93,13 @@ where
         &self,
         _request: Request<proto::ListPathInfoRequest>,
     ) -> Result<Response<Self::ListStream>, Status> {
-        let (tx, rx) = tokio::sync::mpsc::channel(5);
+        let stream = Box::pin(
+            self.inner
+                .list()
+                .map_err(|e| Status::internal(e.to_string())),
+        );
 
-        let mut stream = self.inner.list();
-
-        let _task = task::spawn(async move {
-            while let Some(e) = stream.next().await {
-                let res = e.map_err(|e| Status::internal(e.to_string()));
-                if tx.send(res).await.is_err() {
-                    debug!("receiver dropped");
-                    break;
-                }
-            }
-        });
-
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
+        Ok(Response::new(Box::pin(stream)))
     }
 }
 
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
index e8da7792cd..8016b9901d 100644
--- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -7,8 +7,8 @@ use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
 use crate::tests::utils::gen_blob_service;
 use crate::tests::utils::gen_directory_service;
 use crate::tests::utils::gen_pathinfo_service;
+use futures::stream::BoxStream;
 use std::sync::Arc;
-use tokio_stream::wrappers::ReceiverStream;
 use tonic::Request;
 use tvix_castore::proto as castorepb;
 
@@ -18,7 +18,8 @@ use tvix_castore::proto as castorepb;
 /// It uses the NonCachingNARCalculationService NARCalculationService to
 /// calculate NARs.
 fn gen_grpc_service(
-) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> {
+) -> Arc<dyn GRPCPathInfoService<ListStream = BoxStream<'static, Result<PathInfo, tonic::Status>>>>
+{
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
     Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service(