From 4e341fb5d915ea9e4ae1b8257972ef69437f3ed0 Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Sat, 20 Jan 2024 16:25:39 -0600 Subject: chore(tvix/store): Use BoxStream type alias The BoxStream type alias is a more concise and easier to read than the full `Pin + Send + ...>>` type. Change-Id: I5b7bccfd066ded5557e01f7895f4cf5c4a33bd44 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10677 Reviewed-by: flokli Tested-by: BuildkiteCI Autosubmit: Connor Brewster --- .../src/proto/grpc_pathinfoservice_wrapper.rs | 29 +++++++--------------- tvix/store/src/proto/tests/grpc_pathinfoservice.rs | 5 ++-- 2 files changed, 12 insertions(+), 22 deletions(-) (limited to 'tvix/store/src/proto') diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 19430aed381a..a5c5776cd4ef 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -1,12 +1,10 @@ use crate::nar::RenderError; use crate::pathinfoservice::PathInfoService; use crate::proto; -use futures::StreamExt; +use futures::{stream::BoxStream, TryStreamExt}; use std::ops::Deref; -use tokio::task; -use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Result, Status}; -use tracing::{debug, instrument, warn}; +use tracing::{instrument, warn}; use tvix_castore::proto as castorepb; pub struct GRPCPathInfoServiceWrapper { @@ -27,7 +25,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServic where PS: Deref + Send + Sync + 'static, { - type ListStream = ReceiverStream>; + type ListStream = BoxStream<'static, tonic::Result>; #[instrument(skip(self))] async fn get( @@ -95,22 +93,13 @@ where &self, _request: Request, ) -> Result, Status> { - let (tx, rx) = tokio::sync::mpsc::channel(5); + let stream = Box::pin( + self.inner + .list() + .map_err(|e| Status::internal(e.to_string())), + ); - let mut stream = self.inner.list(); - - let _task = task::spawn(async move { - while let Some(e) = stream.next().await { - let res = e.map_err(|e| Status::internal(e.to_string())); - if tx.send(res).await.is_err() { - debug!("receiver dropped"); - break; - } - } - }); - - let receiver_stream = ReceiverStream::new(rx); - Ok(Response::new(receiver_stream)) + Ok(Response::new(Box::pin(stream))) } } diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index e8da7792cdb1..8016b9901d96 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -7,8 +7,8 @@ use crate::tests::fixtures::DUMMY_OUTPUT_HASH; use crate::tests::utils::gen_blob_service; use crate::tests::utils::gen_directory_service; use crate::tests::utils::gen_pathinfo_service; +use futures::stream::BoxStream; use std::sync::Arc; -use tokio_stream::wrappers::ReceiverStream; use tonic::Request; use tvix_castore::proto as castorepb; @@ -18,7 +18,8 @@ use tvix_castore::proto as castorepb; /// It uses the NonCachingNARCalculationService NARCalculationService to /// calculate NARs. fn gen_grpc_service( -) -> Arc>>> { +) -> Arc>>> +{ let blob_service = gen_blob_service(); let directory_service = gen_directory_service(); Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service( -- cgit 1.4.1