about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs5
-rw-r--r--tvix/castore/src/directoryservice/memory.rs5
-rw-r--r--tvix/castore/src/directoryservice/mod.rs5
-rw-r--r--tvix/castore/src/directoryservice/sled.rs5
-rw-r--r--tvix/castore/src/directoryservice/utils.rs7
-rw-r--r--tvix/castore/src/fs/root_nodes.rs8
-rw-r--r--tvix/castore/src/proto/grpc_blobservice_wrapper.rs6
-rw-r--r--tvix/store/src/pathinfoservice/fs/mod.rs5
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs5
-rw-r--r--tvix/store/src/pathinfoservice/memory.rs5
-rw-r--r--tvix/store/src/pathinfoservice/mod.rs5
-rw-r--r--tvix/store/src/pathinfoservice/nix_http.rs9
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs7
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs29
-rw-r--r--tvix/store/src/proto/tests/grpc_pathinfoservice.rs5
15 files changed, 44 insertions, 67 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index c98708608e56..ad06cb17b668 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -1,11 +1,10 @@
 use std::collections::HashSet;
-use std::pin::Pin;
 
 use super::{DirectoryPutter, DirectoryService};
 use crate::proto::{self, get_directory_request::ByWhat};
 use crate::{B3Digest, Error};
 use async_stream::try_stream;
-use futures::Stream;
+use futures::stream::BoxStream;
 use tokio::spawn;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::task::JoinHandle;
@@ -106,7 +105,7 @@ impl DirectoryService for GRPCDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
+    ) -> BoxStream<Result<proto::Directory, Error>> {
         let mut grpc_client = self.grpc_client.clone();
         let root_directory_digest = root_directory_digest.clone();
 
diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs
index 3ba309f82927..528ffe2f2c03 100644
--- a/tvix/castore/src/directoryservice/memory.rs
+++ b/tvix/castore/src/directoryservice/memory.rs
@@ -1,7 +1,6 @@
 use crate::{proto, B3Digest, Error};
-use futures::Stream;
+use futures::stream::BoxStream;
 use std::collections::HashMap;
-use std::pin::Pin;
 use std::sync::{Arc, RwLock};
 use tonic::async_trait;
 use tracing::{instrument, warn};
@@ -73,7 +72,7 @@ impl DirectoryService for MemoryDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
+    ) -> BoxStream<Result<proto::Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index a82c4d785dbd..db3d5767eadd 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -1,6 +1,5 @@
 use crate::{proto, B3Digest, Error};
-use futures::Stream;
-use std::pin::Pin;
+use futures::stream::BoxStream;
 use tonic::async_trait;
 
 mod from_addr;
@@ -44,7 +43,7 @@ pub trait DirectoryService: Send + Sync {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>>;
+    ) -> BoxStream<Result<proto::Directory, Error>>;
 
     /// Allows persisting a closure of [proto::Directory], which is a graph of
     /// connected Directory messages.
diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs
index 9e6749a753c2..9acd3854184b 100644
--- a/tvix/castore/src/directoryservice/sled.rs
+++ b/tvix/castore/src/directoryservice/sled.rs
@@ -1,10 +1,9 @@
 use crate::directoryservice::DirectoryPutter;
 use crate::proto::Directory;
 use crate::{proto, B3Digest, Error};
-use futures::Stream;
+use futures::stream::BoxStream;
 use prost::Message;
 use std::path::Path;
-use std::pin::Pin;
 use tonic::async_trait;
 use tracing::{instrument, warn};
 
@@ -99,7 +98,7 @@ impl DirectoryService for SledDirectoryService {
     fn get_recursive(
         &self,
         root_directory_digest: &B3Digest,
-    ) -> Pin<Box<(dyn Stream<Item = Result<proto::Directory, Error>> + Send + 'static)>> {
+    ) -> BoxStream<Result<proto::Directory, Error>> {
         traverse_directory(self.clone(), root_directory_digest)
     }
 
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
index ad9ce2535366..341705a8db9f 100644
--- a/tvix/castore/src/directoryservice/utils.rs
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -4,19 +4,18 @@ use crate::proto;
 use crate::B3Digest;
 use crate::Error;
 use async_stream::stream;
-use futures::Stream;
+use futures::stream::BoxStream;
 use std::collections::{HashSet, VecDeque};
-use std::pin::Pin;
 use tonic::async_trait;
 use tracing::warn;
 
 /// Traverses a [proto::Directory] from the root to the children.
 ///
 /// This is mostly BFS, but directories are only returned once.
-pub fn traverse_directory<DS: DirectoryService + 'static>(
+pub fn traverse_directory<'a, DS: DirectoryService + 'static>(
     directory_service: DS,
     root_directory_digest: &B3Digest,
-) -> Pin<Box<dyn Stream<Item = Result<proto::Directory, Error>> + Send>> {
+) -> BoxStream<'a, Result<proto::Directory, Error>> {
     // The list of all directories that still need to be traversed. The next
     // element is picked from the front, new elements are enqueued at the
     // back.
diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs
index a603fd1b37d5..6609e049a1fc 100644
--- a/tvix/castore/src/fs/root_nodes.rs
+++ b/tvix/castore/src/fs/root_nodes.rs
@@ -1,8 +1,8 @@
-use std::{collections::BTreeMap, pin::Pin};
+use std::collections::BTreeMap;
 
 use crate::{proto::node::Node, Error};
 use bytes::Bytes;
-use futures::Stream;
+use futures::stream::BoxStream;
 use tonic::async_trait;
 
 /// Provides an interface for looking up root nodes  in tvix-castore by given
@@ -15,7 +15,7 @@ pub trait RootNodes: Send + Sync {
 
     /// Lists all root CA nodes in the filesystem. An error can be returned
     /// in case listing is not allowed
-    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send + '_>>;
+    fn list(&self) -> BoxStream<Result<Node, Error>>;
 }
 
 #[async_trait]
@@ -29,7 +29,7 @@ where
         Ok(self.as_ref().get(name).cloned())
     }
 
-    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send + '_>> {
+    fn list(&self) -> BoxStream<Result<Node, Error>> {
         Box::pin(tokio_stream::iter(
             self.as_ref().iter().map(|(_, v)| Ok(v.clone())),
         ))
diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
index 063f0421ddee..f8c2341689c6 100644
--- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs
@@ -1,11 +1,10 @@
 use crate::blobservice::BlobService;
 use core::pin::pin;
-use futures::TryFutureExt;
+use futures::{stream::BoxStream, TryFutureExt};
 use std::{
     collections::VecDeque,
     io,
     ops::{Deref, DerefMut},
-    pin::Pin,
 };
 use tokio_stream::StreamExt;
 use tokio_util::io::ReaderStream;
@@ -86,8 +85,7 @@ where
     T: Deref<Target = dyn BlobService> + Send + Sync + 'static,
 {
     // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933
-    type ReadStream =
-        Pin<Box<dyn futures::Stream<Item = Result<super::BlobChunk, Status>> + Send + 'static>>;
+    type ReadStream = BoxStream<'static, Result<super::BlobChunk, Status>>;
 
     #[instrument(skip(self))]
     async fn stat(
diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs
index b7657d638402..45d59fd0bcb8 100644
--- a/tvix/store/src/pathinfoservice/fs/mod.rs
+++ b/tvix/store/src/pathinfoservice/fs/mod.rs
@@ -1,6 +1,5 @@
-use futures::Stream;
+use futures::stream::BoxStream;
 use futures::StreamExt;
-use std::pin::Pin;
 use tonic::async_trait;
 use tvix_castore::fs::{RootNodes, TvixStoreFs};
 use tvix_castore::proto as castorepb;
@@ -66,7 +65,7 @@ where
             }))
     }
 
-    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<castorepb::node::Node, Error>> + Send>> {
+    fn list(&self) -> BoxStream<Result<castorepb::node::Node, Error>> {
         Box::pin(self.0.as_ref().list().map(|result| {
             result.map(|path_info| {
                 path_info
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 575491333142..4e15a5bb0b5b 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -1,8 +1,7 @@
 use super::PathInfoService;
 use crate::proto::{self, ListPathInfoRequest, PathInfo};
 use async_stream::try_stream;
-use futures::Stream;
-use std::pin::Pin;
+use futures::stream::BoxStream;
 use tonic::{async_trait, transport::Channel, Code};
 use tvix_castore::{proto as castorepb, Error};
 
@@ -87,7 +86,7 @@ impl PathInfoService for GRPCPathInfoService {
         Ok((path_info.nar_size, nar_sha256))
     }
 
-    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         let mut grpc_client = self.grpc_client.clone();
 
         let stream = try_stream! {
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index 9f657a9c625b..f8435dbbf809 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -1,9 +1,8 @@
 use super::PathInfoService;
 use crate::{nar::calculate_size_and_sha256, proto::PathInfo};
-use futures::{stream::iter, Stream};
+use futures::stream::{iter, BoxStream};
 use std::{
     collections::HashMap,
-    pin::Pin,
     sync::{Arc, RwLock},
 };
 use tonic::async_trait;
@@ -71,7 +70,7 @@ where
             .map_err(|e| Error::StorageError(e.to_string()))
     }
 
-    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         let db = self.db.read().unwrap();
 
         // Copy all elements into a list.
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index 3bd0ef206998..55ada92b7e88 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -7,8 +7,7 @@ mod sled;
 #[cfg(any(feature = "fuse", feature = "virtiofs"))]
 mod fs;
 
-use futures::Stream;
-use std::pin::Pin;
+use futures::stream::BoxStream;
 use tonic::async_trait;
 use tvix_castore::proto as castorepb;
 use tvix_castore::Error;
@@ -49,5 +48,5 @@ pub trait PathInfoService: Send + Sync {
     /// and the box allows different underlying stream implementations to be returned since
     /// Rust doesn't support this as a generic in traits yet. This is the same thing that
     /// [async_trait] generates, but for streams instead of futures.
-    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>>;
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>;
 }
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index 4929355c6dde..7b4130fcae27 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -1,10 +1,7 @@
-use std::{
-    io::{self, BufRead, Read, Write},
-    pin::Pin,
-};
+use std::io::{self, BufRead, Read, Write};
 
 use data_encoding::BASE64;
-use futures::{Stream, TryStreamExt};
+use futures::{stream::BoxStream, TryStreamExt};
 use nix_compat::{
     narinfo::{self, NarInfo},
     nixbase32,
@@ -270,7 +267,7 @@ where
         ))
     }
 
-    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         Box::pin(futures::stream::once(async {
             Err(Error::InvalidRequest(
                 "list not supported for this backend".to_string(),
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index d4d2dedd0061..7b6d7fd7abcd 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -1,9 +1,10 @@
 use super::PathInfoService;
 use crate::nar::calculate_size_and_sha256;
 use crate::proto::PathInfo;
-use futures::{stream::iter, Stream};
+use futures::stream::iter;
+use futures::stream::BoxStream;
 use prost::Message;
-use std::{path::Path, pin::Pin};
+use std::path::Path;
 use tonic::async_trait;
 use tracing::warn;
 use tvix_castore::proto as castorepb;
@@ -112,7 +113,7 @@ where
             .map_err(|e| Error::StorageError(e.to_string()))
     }
 
-    fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> {
+    fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>> {
         Box::pin(iter(self.db.iter().values().map(|v| match v {
             Ok(data) => {
                 // we retrieved some bytes
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 19430aed381a..a5c5776cd4ef 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -1,12 +1,10 @@
 use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
-use futures::StreamExt;
+use futures::{stream::BoxStream, TryStreamExt};
 use std::ops::Deref;
-use tokio::task;
-use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Result, Status};
-use tracing::{debug, instrument, warn};
+use tracing::{instrument, warn};
 use tvix_castore::proto as castorepb;
 
 pub struct GRPCPathInfoServiceWrapper<PS> {
@@ -27,7 +25,7 @@ impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServic
 where
     PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
 {
-    type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
+    type ListStream = BoxStream<'static, tonic::Result<proto::PathInfo, Status>>;
 
     #[instrument(skip(self))]
     async fn get(
@@ -95,22 +93,13 @@ where
         &self,
         _request: Request<proto::ListPathInfoRequest>,
     ) -> Result<Response<Self::ListStream>, Status> {
-        let (tx, rx) = tokio::sync::mpsc::channel(5);
+        let stream = Box::pin(
+            self.inner
+                .list()
+                .map_err(|e| Status::internal(e.to_string())),
+        );
 
-        let mut stream = self.inner.list();
-
-        let _task = task::spawn(async move {
-            while let Some(e) = stream.next().await {
-                let res = e.map_err(|e| Status::internal(e.to_string()));
-                if tx.send(res).await.is_err() {
-                    debug!("receiver dropped");
-                    break;
-                }
-            }
-        });
-
-        let receiver_stream = ReceiverStream::new(rx);
-        Ok(Response::new(receiver_stream))
+        Ok(Response::new(Box::pin(stream)))
     }
 }
 
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
index e8da7792cdb1..8016b9901d96 100644
--- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -7,8 +7,8 @@ use crate::tests::fixtures::DUMMY_OUTPUT_HASH;
 use crate::tests::utils::gen_blob_service;
 use crate::tests::utils::gen_directory_service;
 use crate::tests::utils::gen_pathinfo_service;
+use futures::stream::BoxStream;
 use std::sync::Arc;
-use tokio_stream::wrappers::ReceiverStream;
 use tonic::Request;
 use tvix_castore::proto as castorepb;
 
@@ -18,7 +18,8 @@ use tvix_castore::proto as castorepb;
 /// It uses the NonCachingNARCalculationService NARCalculationService to
 /// calculate NARs.
 fn gen_grpc_service(
-) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> {
+) -> Arc<dyn GRPCPathInfoService<ListStream = BoxStream<'static, Result<PathInfo, tonic::Status>>>>
+{
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
     Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service(