diff options
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 5 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/memory.rs | 5 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 5 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/sled.rs | 5 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/utils.rs | 7 | ||||
-rw-r--r-- | tvix/castore/src/fs/root_nodes.rs | 8 | ||||
-rw-r--r-- | tvix/castore/src/proto/grpc_blobservice_wrapper.rs | 6 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/fs/mod.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/nix_http.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 7 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 29 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_pathinfoservice.rs | 5 |
15 files changed, 44 insertions, 67 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index c98708608e56..ad06cb17b668 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,11 +1,10 @@ use std::collections::HashSet; -use std::pin::Pin; use super::{DirectoryPutter, DirectoryService}; use crate::proto::{self, get_directory_request::ByWhat}; use crate::{B3Digest, Error}; use async_stream::try_stream; -use futures::Stream; +use futures::stream::BoxStream; use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; @@ -106,7 +105,7 @@ impl DirectoryService for GRPCDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + ) -> BoxStream<Result<proto::Directory, Error>> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index 3ba309f82927..528ffe2f2c03 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -1,7 +1,6 @@ use crate::{proto, B3Digest, Error}; -use futures::Stream; +use futures::stream::BoxStream; use std::collections::HashMap; -use std::pin::Pin; use std::sync::{Arc, RwLock}; use tonic::async_trait; use tracing::{instrument, warn}; @@ -73,7 +72,7 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { + ) -> BoxStream<Result<proto::Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index a82c4d785dbd..db3d5767eadd 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,6 +1,5 @@ use crate::{proto, B3Digest, Error}; -use futures::Stream; -use std::pin::Pin; +use futures::stream::BoxStream; use tonic::async_trait; mod from_addr; @@ -44,7 +43,7 @@ pub trait DirectoryService: Send + Sync { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>; + ) -> BoxStream<Result<proto::Directory, Error>>; /// Allows persisting a closure of [proto::Directory], which is a graph of /// connected Directory messages. diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 9e6749a753c2..9acd3854184b 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -1,10 +1,9 @@ use crate::directoryservice::DirectoryPutter; use crate::proto::Directory; use crate::{proto, B3Digest, Error}; -use futures::Stream; +use futures::stream::BoxStream; use prost::Message; use std::path::Path; -use std::pin::Pin; use tonic::async_trait; use tracing::{instrument, warn}; @@ -99,7 +98,7 @@ impl DirectoryService for SledDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Pin<Box<(dyn Stream<Item = Result<proto::Directory, Error>> + Send + 'static)>> { + ) -> BoxStream<Result<proto::Directory, Error>> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index ad9ce2535366..341705a8db9f 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -4,19 +4,18 @@ use crate::proto; use crate::B3Digest; use crate::Error; use async_stream::stream; -use futures::Stream; +use futures::stream::BoxStream; use std::collections::{HashSet, VecDeque}; -use std::pin::Pin; use tonic::async_trait; use tracing::warn; /// Traverses a [proto::Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. -pub fn traverse_directory<DS: DirectoryService + 'static>( +pub fn traverse_directory<'a, DS: DirectoryService + 'static>( directory_service: DS, root_directory_digest: &B3Digest, -) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> { +) -> BoxStream<'a, Result<proto::Directory, Error>> { // The list of all directories that still need to be traversed. The next // element is picked from the front, new elements are enqueued at the // back. diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs index a603fd1b37d5..6609e049a1fc 100644 --- a/tvix/castore/src/fs/root_nodes.rs +++ b/tvix/castore/src/fs/root_nodes.rs @@ -1,8 +1,8 @@ -use std::{collections::BTreeMap, pin::Pin}; +use std::collections::BTreeMap; use crate::{proto::node::Node, Error}; use bytes::Bytes; -use futures::Stream; +use futures::stream::BoxStream; use tonic::async_trait; /// Provides an interface for looking up root nodes in tvix-castore by given @@ -15,7 +15,7 @@ pub trait RootNodes: Send + Sync { /// Lists all root CA nodes in the filesystem. An error can be returned /// in case listing is not allowed - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send + '_>>; + fn list(&self) -> BoxStream<Result<Node, Error>>; } #[async_trait] @@ -29,7 +29,7 @@ where Ok(self.as_ref().get(name).cloned()) } - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send + '_>> { + fn list(&self) -> BoxStream<Result<Node, Error>> { Box::pin(tokio_stream::iter( self.as_ref().iter().map(|(_, v)| Ok(v.clone())), )) diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs index 063f0421ddee..f8c2341689c6 100644 --- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -1,11 +1,10 @@ use crate::blobservice::BlobService; use core::pin::pin; -use futures::TryFutureExt; +use futures::{stream::BoxStream, TryFutureExt}; use std::{ collections::VecDeque, io, ops::{Deref, DerefMut}, - pin::Pin, }; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; @@ -86,8 +85,7 @@ where T: Deref<Target = dyn BlobService> + Send + Sync + 'static, { // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 - type ReadStream = - Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>; + type ReadStream = BoxStream<'static, Result<super::BlobChunk, Status>>; #[instrument(skip(self))] async fn stat( diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs index b7657d638402..45d59fd0bcb8 100644 --- a/tvix/store/src/pathinfoservice/fs/mod.rs +++ b/tvix/store/src/pathinfoservice/fs/mod.rs @@ -1,6 +1,5 @@ -use futures::Stream; +use futures::stream::BoxStream; use futures::StreamExt; -use std::pin::Pin; use tonic::async_trait; use tvix_castore::fs::{RootNodes, TvixStoreFs}; use tvix_castore::proto as castorepb; @@ -66,7 +65,7 @@ where })) } - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<castorepb::node::Node, Error>> + Send>> { + fn list(&self) -> BoxStream<Result<castorepb::node::Node, Error>> { Box::pin(self.0.as_ref().list().map(|result| { result.map(|path_info| { path_info diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 575491333142..4e15a5bb0b5b 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -1,8 +1,7 @@ use super::PathInfoService; use crate::proto::{self, ListPathInfoRequest, PathInfo}; use async_stream::try_stream; -use futures::Stream; -use std::pin::Pin; +use futures::stream::BoxStream; use tonic::{async_trait, transport::Channel, Code}; use tvix_castore::{proto as castorepb, Error}; @@ -87,7 +86,7 @@ impl PathInfoService for GRPCPathInfoService { Ok((path_info.nar_size, nar_sha256)) } - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { let mut grpc_client = self.grpc_client.clone(); let stream = try_stream! { diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 9f657a9c625b..f8435dbbf809 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -1,9 +1,8 @@ use super::PathInfoService; use crate::{nar::calculate_size_and_sha256, proto::PathInfo}; -use futures::{stream::iter, Stream}; +use futures::stream::{iter, BoxStream}; use std::{ collections::HashMap, - pin::Pin, sync::{Arc, RwLock}, }; use tonic::async_trait; @@ -71,7 +70,7 @@ where .map_err(|e| Error::StorageError(e.to_string())) } - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { let db = self.db.read().unwrap(); // Copy all elements into a list. diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 3bd0ef206998..55ada92b7e88 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -7,8 +7,7 @@ mod sled; #[cfg(any(feature = "fuse", feature = "virtiofs"))] mod fs; -use futures::Stream; -use std::pin::Pin; +use futures::stream::BoxStream; use tonic::async_trait; use tvix_castore::proto as castorepb; use tvix_castore::Error; @@ -49,5 +48,5 @@ pub trait PathInfoService: Send + Sync { /// and the box allows different underlying stream implementations to be returned since /// Rust doesn't support this as a generic in traits yet. This is the same thing that /// [async_trait] generates, but for streams instead of futures. - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>>; + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>; } diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs index 4929355c6dde..7b4130fcae27 100644 --- a/tvix/store/src/pathinfoservice/nix_http.rs +++ b/tvix/store/src/pathinfoservice/nix_http.rs @@ -1,10 +1,7 @@ -use std::{ - io::{self, BufRead, Read, Write}, - pin::Pin, -}; +use std::io::{self, BufRead, Read, Write}; use data_encoding::BASE64; -use futures::{Stream, TryStreamExt}; +use futures::{stream::BoxStream, TryStreamExt}; use nix_compat::{ narinfo::{self, NarInfo}, nixbase32, @@ -270,7 +267,7 @@ where )) } - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { Box::pin(futures::stream::once(async { Err(Error::InvalidRequest( "list not supported for this backend".to_string(), diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index d4d2dedd0061..7b6d7fd7abcd 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -1,9 +1,10 @@ use super::PathInfoService; use crate::nar::calculate_size_and_sha256; use crate::proto::PathInfo; -use futures::{stream::iter, Stream}; +use futures::stream::iter; +use futures::stream::BoxStream; use prost::Message; -use std::{path::Path, pin::Pin}; +use std::path::Path; use tonic::async_trait; use tracing::warn; use tvix_castore::proto as castorepb; @@ -112,7 +113,7 @@ where .map_err(|e| Error::StorageError(e.to_string())) } - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> { Box::pin(iter(self.db.iter().values().map(|v| match v { Ok(data) => { // we retrieved some bytes diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 19430aed381a..a5c5776cd4ef 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -1,12 +1,10 @@ use crate::nar::RenderError; use crate::pathinfoservice::PathInfoService; use crate::proto; -use futures::StreamExt; +use futures::{stream::BoxStream, TryStreamExt}; use std::ops::Deref; -use tokio::task; -use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Result, Status}; -use tracing::{debug, instrument, warn}; +use tracing::{instrument, warn}; use tvix_castore::proto as castorepb; pub struct GRPCPathInfoServiceWrapper<PS> { @@ -27,7 +25,7 @@ impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServic where PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static, { - type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>; + type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>; #[instrument(skip(self))] async fn get( @@ -95,22 +93,13 @@ where &self, _request: Request<proto::ListPathInfoRequest>, ) -> Result<Response<Self::ListStream>, Status> { - let (tx, rx) = tokio::sync::mpsc::channel(5); + let stream = Box::pin( + self.inner + .list() + .map_err(|e| Status::internal(e.to_string())), + ); - let mut stream = self.inner.list(); - - let _task = task::spawn(async move { - while let Some(e) = stream.next().await { - let res = e.map_err(|e| Status::internal(e.to_string())); - if tx.send(res).await.is_err() { - debug!("receiver dropped"); - break; - } - } - }); - - let receiver_stream = ReceiverStream::new(rx); - Ok(Response::new(receiver_stream)) + Ok(Response::new(Box::pin(stream))) } } diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index e8da7792cdb1..8016b9901d96 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -7,8 +7,8 @@ use crate::tests::fixtures::DUMMY_OUTPUT_HASH; use crate::tests::utils::gen_blob_service; use crate::tests::utils::gen_directory_service; use crate::tests::utils::gen_pathinfo_service; +use futures::stream::BoxStream; use std::sync::Arc; -use tokio_stream::wrappers::ReceiverStream; use tonic::Request; use tvix_castore::proto as castorepb; @@ -18,7 +18,8 @@ use tvix_castore::proto as castorepb; /// It uses the NonCachingNARCalculationService NARCalculationService to /// calculate NARs. fn gen_grpc_service( -) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> { +) -> Arc<dyn GRPCPathInfoService<ListStream = BoxStream<'static, Result<PathInfo, tonic::Status>>>> +{ let blob_service = gen_blob_service(); let directory_service = gen_directory_service(); Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service( |