about summary refs log tree commit diff
path: root/tvix/store/src/directoryservice/grpc.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-06-09T09·26+0300
committerclbot <clbot@tvl.fyi>2023-06-12T10·24+0000
commit7725eb53ad67730e92a3839a6c10925c668e5586 (patch)
tree82b8abf8e52630039d2a0cd3ae8b251c32e863bd /tvix/store/src/directoryservice/grpc.rs
parent6f85dbfc06c4fa96deb968cfeb7e98ba36e95043 (diff)
refactor(tvix/store): use Box<dyn DirectoryService> r/6272
Once we support configuring services at runtime, we don't know what
DirectoryService we're using at compile time.

This also means, we can't explicitly use the is_closed method from
GRPCPutter, without making it part of the DirectoryPutter itself.

Change-Id: Icd2a1ec4fc5649a6cd15c9cc7db4c2b473630431
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8727
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/directoryservice/grpc.rs')
-rw-r--r--tvix/store/src/directoryservice/grpc.rs36
1 files changed, 17 insertions, 19 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
index 1b33572cf7de..3b1a7906f7d0 100644
--- a/tvix/store/src/directoryservice/grpc.rs
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -46,8 +46,6 @@ impl GRPCDirectoryService {
 }
 
 impl DirectoryService for GRPCDirectoryService {
-    type DirectoriesIterator = StreamIterator;
-
     fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
@@ -113,7 +111,10 @@ impl DirectoryService for GRPCDirectoryService {
     }
 
     #[instrument(skip_all, fields(directory.digest = %root_directory_digest))]
-    fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator {
+    fn get_recursive(
+        &self,
+        root_directory_digest: &B3Digest,
+    ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> {
         let mut grpc_client = self.grpc_client.clone();
 
         let root_directory_digest_as_vec = root_directory_digest.to_vec();
@@ -132,17 +133,15 @@ impl DirectoryService for GRPCDirectoryService {
 
         let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
 
-        StreamIterator::new(
+        Box::new(StreamIterator::new(
             self.tokio_handle.clone(),
             root_directory_digest.clone(),
             stream,
-        )
+        ))
     }
 
-    type DirectoryPutter = GRPCPutter;
-
     #[instrument(skip_all)]
-    fn put_multiple_start(&self) -> Self::DirectoryPutter
+    fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)>
     where
         Self: Clone,
     {
@@ -160,7 +159,7 @@ impl DirectoryService for GRPCDirectoryService {
                 Ok(s)
             });
 
-        GRPCPutter::new(self.tokio_handle.clone(), tx, task)
+        Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task))
     }
 }
 
@@ -276,15 +275,6 @@ impl GRPCPutter {
             rq: Some((task, directory_sender)),
         }
     }
-
-    #[allow(dead_code)]
-    // allows checking if the tx part of the channel is closed.
-    fn is_closed(&self) -> bool {
-        match self.rq {
-            None => true,
-            Some((_, ref directory_sender)) => directory_sender.is_closed(),
-        }
-    }
 }
 
 impl DirectoryPutter for GRPCPutter {
@@ -329,6 +319,14 @@ impl DirectoryPutter for GRPCPutter {
             }
         }
     }
+
+    // allows checking if the tx part of the channel is closed.
+    fn is_closed(&self) -> bool {
+        match self.rq {
+            None => true,
+            Some((_, ref directory_sender)) => directory_sender.is_closed(),
+        }
+    }
 }
 
 #[cfg(test)]
@@ -342,7 +340,7 @@ mod tests {
     use tonic::transport::{Endpoint, Server, Uri};
 
     use crate::{
-        directoryservice::{DirectoryPutter, DirectoryService},
+        directoryservice::DirectoryService,
         proto,
         proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper},
         tests::{