diff options
Diffstat (limited to 'tvix/store/src/directoryservice/grpc.rs')
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 36 |
1 files changed, 17 insertions, 19 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index 1b33572cf7de..3b1a7906f7d0 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -46,8 +46,6 @@ impl GRPCDirectoryService { } impl DirectoryService for GRPCDirectoryService { - type DirectoriesIterator = StreamIterator; - fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); @@ -113,7 +111,10 @@ impl DirectoryService for GRPCDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest_as_vec = root_directory_digest.to_vec(); @@ -132,17 +133,15 @@ impl DirectoryService for GRPCDirectoryService { let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); - StreamIterator::new( + Box::new(StreamIterator::new( self.tokio_handle.clone(), root_directory_digest.clone(), stream, - ) + )) } - type DirectoryPutter = GRPCPutter; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { @@ -160,7 +159,7 @@ impl DirectoryService for GRPCDirectoryService { Ok(s) }); - GRPCPutter::new(self.tokio_handle.clone(), tx, task) + Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task)) } } @@ -276,15 +275,6 @@ impl GRPCPutter { rq: Some((task, directory_sender)), } } - - #[allow(dead_code)] - // allows checking if the tx part of the channel is closed. - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } } impl DirectoryPutter for GRPCPutter { @@ -329,6 +319,14 @@ impl DirectoryPutter for GRPCPutter { } } } + + // allows checking if the tx part of the channel is closed. + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } } #[cfg(test)] @@ -342,7 +340,7 @@ mod tests { use tonic::transport::{Endpoint, Server, Uri}; use crate::{ - directoryservice::{DirectoryPutter, DirectoryService}, + directoryservice::DirectoryService, proto, proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, tests::{ |