diff options
author | Florian Klink <flokli@flokli.de> | 2023-09-03T14·09+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-09-05T20·46+0000 |
commit | da9d706e0a5e4e37087e4841a8fc8edf0da35e77 (patch) | |
tree | d6de951e1458bc0a56aff5a1feb463ee6eeb9592 /tvix/store/src/pathinfoservice/grpc.rs | |
parent | e41b5ae3f0cd0814367f1e2f4f256669c849fde7 (diff) |
feat(tvix/store/pathinfosvc): provide listing r/6555
This provides an additional method in the PathInfoService trait, as well as an RPC method on the gRPC layer to list all PathInfo objects in a PathInfoService. Change-Id: I7378f6bbd334bd6ac4e9be92505bd099a1c2b19a Reviewed-on: https://cl.tvl.fyi/c/depot/+/9216 Reviewed-by: tazjin <tazjin@tvl.su> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/pathinfoservice/grpc.rs')
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 64 |
1 files changed, 62 insertions, 2 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index c98a89c4b8e7..2bd766697bd1 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -1,8 +1,12 @@ use super::PathInfoService; -use crate::{blobservice::BlobService, directoryservice::DirectoryService, proto}; +use crate::{ + blobservice::BlobService, + directoryservice::DirectoryService, + proto::{self, ListPathInfoRequest}, +}; use std::sync::Arc; use tokio::net::UnixStream; -use tonic::{transport::Channel, Code, Status}; +use tonic::{transport::Channel, Code, Status, Streaming}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] @@ -160,6 +164,62 @@ impl PathInfoService for GRPCPathInfoService { Ok((resp.nar_size, nar_sha256)) } + + fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let task: tokio::task::JoinHandle<Result<_, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .list(ListPathInfoRequest::default()) + .await? + .into_inner(); + + Ok(s) + }); + + let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); + + Box::new(StreamIterator::new(self.tokio_handle.clone(), stream)) + } +} + +pub struct StreamIterator { + tokio_handle: tokio::runtime::Handle, + stream: Streaming<proto::PathInfo>, +} + +impl StreamIterator { + pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self { + Self { + tokio_handle, + stream, + } + } +} + +impl Iterator for StreamIterator { + type Item = Result<proto::PathInfo, crate::Error>; + + fn next(&mut self) -> Option<Self::Item> { + match self.tokio_handle.block_on(self.stream.message()) { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + return Some(Err(crate::Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))); + } + Some(Ok(pathinfo)) + } + None => None, + }, + Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), + } + } } #[cfg(test)] |