From 4e341fb5d915ea9e4ae1b8257972ef69437f3ed0 Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Sat, 20 Jan 2024 16:25:39 -0600 Subject: chore(tvix/store): Use BoxStream type alias The BoxStream type alias is a more concise and easier to read than the full `Pin + Send + ...>>` type. Change-Id: I5b7bccfd066ded5557e01f7895f4cf5c4a33bd44 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10677 Reviewed-by: flokli Tested-by: BuildkiteCI Autosubmit: Connor Brewster --- tvix/castore/src/directoryservice/grpc.rs | 5 ++--- tvix/castore/src/directoryservice/memory.rs | 5 ++--- tvix/castore/src/directoryservice/mod.rs | 5 ++--- tvix/castore/src/directoryservice/sled.rs | 5 ++--- tvix/castore/src/directoryservice/utils.rs | 7 +++---- tvix/castore/src/fs/root_nodes.rs | 8 ++++---- tvix/castore/src/proto/grpc_blobservice_wrapper.rs | 6 ++---- 7 files changed, 17 insertions(+), 24 deletions(-) (limited to 'tvix/castore') diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index c98708608e56..ad06cb17b668 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -1,11 +1,10 @@ use std::collections::HashSet; -use std::pin::Pin; use super::{DirectoryPutter, DirectoryService}; use crate::proto::{self, get_directory_request::ByWhat}; use crate::{B3Digest, Error}; use async_stream::try_stream; -use futures::Stream; +use futures::stream::BoxStream; use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; @@ -106,7 +105,7 @@ impl DirectoryService for GRPCDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Pin> + Send>> { + ) -> BoxStream> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest = root_directory_digest.clone(); diff --git a/tvix/castore/src/directoryservice/memory.rs b/tvix/castore/src/directoryservice/memory.rs index 3ba309f82927..528ffe2f2c03 100644 --- a/tvix/castore/src/directoryservice/memory.rs +++ b/tvix/castore/src/directoryservice/memory.rs @@ -1,7 +1,6 @@ use crate::{proto, B3Digest, Error}; -use futures::Stream; +use futures::stream::BoxStream; use std::collections::HashMap; -use std::pin::Pin; use std::sync::{Arc, RwLock}; use tonic::async_trait; use tracing::{instrument, warn}; @@ -73,7 +72,7 @@ impl DirectoryService for MemoryDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Pin> + Send>> { + ) -> BoxStream> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index a82c4d785dbd..db3d5767eadd 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -1,6 +1,5 @@ use crate::{proto, B3Digest, Error}; -use futures::Stream; -use std::pin::Pin; +use futures::stream::BoxStream; use tonic::async_trait; mod from_addr; @@ -44,7 +43,7 @@ pub trait DirectoryService: Send + Sync { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Pin> + Send>>; + ) -> BoxStream>; /// Allows persisting a closure of [proto::Directory], which is a graph of /// connected Directory messages. diff --git a/tvix/castore/src/directoryservice/sled.rs b/tvix/castore/src/directoryservice/sled.rs index 9e6749a753c2..9acd3854184b 100644 --- a/tvix/castore/src/directoryservice/sled.rs +++ b/tvix/castore/src/directoryservice/sled.rs @@ -1,10 +1,9 @@ use crate::directoryservice::DirectoryPutter; use crate::proto::Directory; use crate::{proto, B3Digest, Error}; -use futures::Stream; +use futures::stream::BoxStream; use prost::Message; use std::path::Path; -use std::pin::Pin; use tonic::async_trait; use tracing::{instrument, warn}; @@ -99,7 +98,7 @@ impl DirectoryService for SledDirectoryService { fn get_recursive( &self, root_directory_digest: &B3Digest, - ) -> Pin> + Send + 'static)>> { + ) -> BoxStream> { traverse_directory(self.clone(), root_directory_digest) } diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index ad9ce2535366..341705a8db9f 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -4,19 +4,18 @@ use crate::proto; use crate::B3Digest; use crate::Error; use async_stream::stream; -use futures::Stream; +use futures::stream::BoxStream; use std::collections::{HashSet, VecDeque}; -use std::pin::Pin; use tonic::async_trait; use tracing::warn; /// Traverses a [proto::Directory] from the root to the children. /// /// This is mostly BFS, but directories are only returned once. -pub fn traverse_directory( +pub fn traverse_directory<'a, DS: DirectoryService + 'static>( directory_service: DS, root_directory_digest: &B3Digest, -) -> Pin> + Send>> { +) -> BoxStream<'a, Result> { // The list of all directories that still need to be traversed. The next // element is picked from the front, new elements are enqueued at the // back. diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs index a603fd1b37d5..6609e049a1fc 100644 --- a/tvix/castore/src/fs/root_nodes.rs +++ b/tvix/castore/src/fs/root_nodes.rs @@ -1,8 +1,8 @@ -use std::{collections::BTreeMap, pin::Pin}; +use std::collections::BTreeMap; use crate::{proto::node::Node, Error}; use bytes::Bytes; -use futures::Stream; +use futures::stream::BoxStream; use tonic::async_trait; /// Provides an interface for looking up root nodes in tvix-castore by given @@ -15,7 +15,7 @@ pub trait RootNodes: Send + Sync { /// Lists all root CA nodes in the filesystem. An error can be returned /// in case listing is not allowed - fn list(&self) -> Pin> + Send + '_>>; + fn list(&self) -> BoxStream>; } #[async_trait] @@ -29,7 +29,7 @@ where Ok(self.as_ref().get(name).cloned()) } - fn list(&self) -> Pin> + Send + '_>> { + fn list(&self) -> BoxStream> { Box::pin(tokio_stream::iter( self.as_ref().iter().map(|(_, v)| Ok(v.clone())), )) diff --git a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs index 063f0421ddee..f8c2341689c6 100644 --- a/tvix/castore/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/castore/src/proto/grpc_blobservice_wrapper.rs @@ -1,11 +1,10 @@ use crate::blobservice::BlobService; use core::pin::pin; -use futures::TryFutureExt; +use futures::{stream::BoxStream, TryFutureExt}; use std::{ collections::VecDeque, io, ops::{Deref, DerefMut}, - pin::Pin, }; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; @@ -86,8 +85,7 @@ where T: Deref + Send + Sync + 'static, { // https://github.com/tokio-rs/tokio/issues/2723#issuecomment-1534723933 - type ReadStream = - Pin> + Send + 'static>>; + type ReadStream = BoxStream<'static, Result>; #[instrument(skip(self))] async fn stat( -- cgit 1.4.1