about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-09-03T14·09+0300
committerclbot <clbot@tvl.fyi>2023-09-05T20·46+0000
commitda9d706e0a5e4e37087e4841a8fc8edf0da35e77 (patch)
treed6de951e1458bc0a56aff5a1feb463ee6eeb9592 /tvix/store/src/pathinfoservice
parente41b5ae3f0cd0814367f1e2f4f256669c849fde7 (diff)
feat(tvix/store/pathinfosvc): provide listing r/6555
This provides an additional method in the PathInfoService trait, as
well as an RPC method on the gRPC layer to list all PathInfo objects in
a PathInfoService.

Change-Id: I7378f6bbd334bd6ac4e9be92505bd099a1c2b19a
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9216
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/pathinfoservice')
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs64
-rw-r--r--tvix/store/src/pathinfoservice/memory.rs12
-rw-r--r--tvix/store/src/pathinfoservice/mod.rs4
-rw-r--r--tvix/store/src/pathinfoservice/sled.rs25
4 files changed, 103 insertions, 2 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index c98a89c4b8..2bd766697b 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -1,8 +1,12 @@
 use super::PathInfoService;
-use crate::{blobservice::BlobService, directoryservice::DirectoryService, proto};
+use crate::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    proto::{self, ListPathInfoRequest},
+};
 use std::sync::Arc;
 use tokio::net::UnixStream;
-use tonic::{transport::Channel, Code, Status};
+use tonic::{transport::Channel, Code, Status, Streaming};
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
 #[derive(Clone)]
@@ -160,6 +164,62 @@ impl PathInfoService for GRPCPathInfoService {
 
         Ok((resp.nar_size, nar_sha256))
     }
+
+    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> {
+        // Get a new handle to the gRPC client.
+        let mut grpc_client = self.grpc_client.clone();
+
+        let task: tokio::task::JoinHandle<Result<_, Status>> =
+            self.tokio_handle.spawn(async move {
+                let s = grpc_client
+                    .list(ListPathInfoRequest::default())
+                    .await?
+                    .into_inner();
+
+                Ok(s)
+            });
+
+        let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
+
+        Box::new(StreamIterator::new(self.tokio_handle.clone(), stream))
+    }
+}
+
+pub struct StreamIterator {
+    tokio_handle: tokio::runtime::Handle,
+    stream: Streaming<proto::PathInfo>,
+}
+
+impl StreamIterator {
+    pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self {
+        Self {
+            tokio_handle,
+            stream,
+        }
+    }
+}
+
+impl Iterator for StreamIterator {
+    type Item = Result<proto::PathInfo, crate::Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.tokio_handle.block_on(self.stream.message()) {
+            Ok(o) => match o {
+                Some(pathinfo) => {
+                    // validate the pathinfo
+                    if let Err(e) = pathinfo.validate() {
+                        return Some(Err(crate::Error::StorageError(format!(
+                            "pathinfo {:?} failed validation: {}",
+                            pathinfo, e
+                        ))));
+                    }
+                    Some(Ok(pathinfo))
+                }
+                None => None,
+            },
+            Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))),
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index f7abb2180e..aba1216c6e 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -85,6 +85,18 @@ impl PathInfoService for MemoryPathInfoService {
         )
         .map_err(|e| Error::StorageError(e.to_string()))
     }
+
+    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_> {
+        let db = self.db.read().unwrap();
+
+        // Copy all elements into a list.
+        // This is a bit ugly, because we can't have db escape the lifetime
+        // of this function, but elements need to be returned owned anyways, and this in-
+        // memory impl is only for testing purposes anyways.
+        let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect();
+
+        Box::new(items.into_iter())
+    }
 }
 
 #[cfg(test)]
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index 191e8dbd60..51d3e51115 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -39,4 +39,8 @@ pub trait PathInfoService: Send + Sync {
     /// This can be used to calculate NAR-based output paths,
     /// and implementations are encouraged to cache it.
     fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>;
+
+    /// Iterate over all PathInfo objects in the store.
+    /// Implementations can decide to disallow listing.
+    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_>;
 }
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index 2448b073c6..4f327626d1 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -136,6 +136,31 @@ impl PathInfoService for SledPathInfoService {
         )
         .map_err(|e| Error::StorageError(e.to_string()))
     }
+
+    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send> {
+        Box::new(self.db.iter().values().map(|v| match v {
+            Ok(data) => {
+                // we retrieved some bytes
+                match proto::PathInfo::decode(&*data) {
+                    Ok(path_info) => Ok(path_info),
+                    Err(e) => {
+                        warn!("failed to decode stored PathInfo: {}", e);
+                        Err(Error::StorageError(format!(
+                            "failed to decode stored PathInfo: {}",
+                            e
+                        )))
+                    }
+                }
+            }
+            Err(e) => {
+                warn!("failed to retrieve PathInfo: {}", e);
+                Err(Error::StorageError(format!(
+                    "failed to retrieve PathInfo: {}",
+                    e
+                )))
+            }
+        }))
+    }
 }
 
 #[cfg(test)]