about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice/grpc.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-09-03T14·09+0300
committerclbot <clbot@tvl.fyi>2023-09-05T20·46+0000
commitda9d706e0a5e4e37087e4841a8fc8edf0da35e77 (patch)
treed6de951e1458bc0a56aff5a1feb463ee6eeb9592 /tvix/store/src/pathinfoservice/grpc.rs
parente41b5ae3f0cd0814367f1e2f4f256669c849fde7 (diff)
feat(tvix/store/pathinfosvc): provide listing r/6555
This provides an additional method in the PathInfoService trait, as
well as an RPC method on the gRPC layer to list all PathInfo objects in
a PathInfoService.

Change-Id: I7378f6bbd334bd6ac4e9be92505bd099a1c2b19a
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9216
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/pathinfoservice/grpc.rs')
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs64
1 files changed, 62 insertions, 2 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index c98a89c4b8e7..2bd766697bd1 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -1,8 +1,12 @@
 use super::PathInfoService;
-use crate::{blobservice::BlobService, directoryservice::DirectoryService, proto};
+use crate::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    proto::{self, ListPathInfoRequest},
+};
 use std::sync::Arc;
 use tokio::net::UnixStream;
-use tonic::{transport::Channel, Code, Status};
+use tonic::{transport::Channel, Code, Status, Streaming};
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
 #[derive(Clone)]
@@ -160,6 +164,62 @@ impl PathInfoService for GRPCPathInfoService {
 
         Ok((resp.nar_size, nar_sha256))
     }
+
+    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> {
+        // Get a new handle to the gRPC client.
+        let mut grpc_client = self.grpc_client.clone();
+
+        let task: tokio::task::JoinHandle<Result<_, Status>> =
+            self.tokio_handle.spawn(async move {
+                let s = grpc_client
+                    .list(ListPathInfoRequest::default())
+                    .await?
+                    .into_inner();
+
+                Ok(s)
+            });
+
+        let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
+
+        Box::new(StreamIterator::new(self.tokio_handle.clone(), stream))
+    }
+}
+
+pub struct StreamIterator {
+    tokio_handle: tokio::runtime::Handle,
+    stream: Streaming<proto::PathInfo>,
+}
+
+impl StreamIterator {
+    pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self {
+        Self {
+            tokio_handle,
+            stream,
+        }
+    }
+}
+
+impl Iterator for StreamIterator {
+    type Item = Result<proto::PathInfo, crate::Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.tokio_handle.block_on(self.stream.message()) {
+            Ok(o) => match o {
+                Some(pathinfo) => {
+                    // validate the pathinfo
+                    if let Err(e) = pathinfo.validate() {
+                        return Some(Err(crate::Error::StorageError(format!(
+                            "pathinfo {:?} failed validation: {}",
+                            pathinfo, e
+                        ))));
+                    }
+                    Some(Ok(pathinfo))
+                }
+                None => None,
+            },
+            Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))),
+        }
+    }
 }
 
 #[cfg(test)]