about summary refs log tree commit diff
path: root/tvix/store/src
diff options
context:
space:
mode:
author Florian Klink <flokli@flokli.de> 2023-09-03T14·09+0300
committer clbot <clbot@tvl.fyi> 2023-09-05T20·46+0000
commit da9d706e0a5e4e37087e4841a8fc8edf0da35e77 (patch)
tree d6de951e1458bc0a56aff5a1feb463ee6eeb9592 /tvix/store/src
parent e41b5ae3f0cd0814367f1e2f4f256669c849fde7 (diff)
feat(tvix/store/pathinfosvc): provide listing r/6555
This provides an additional method in the PathInfoService trait, as
well as an RPC method on the gRPC layer to list all PathInfo objects in
a PathInfoService.

Change-Id: I7378f6bbd334bd6ac4e9be92505bd099a1c2b19a
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9216
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src')
-rw-r--r-- tvix/store/src/pathinfoservice/grpc.rs 64
-rw-r--r-- tvix/store/src/pathinfoservice/memory.rs 12
-rw-r--r-- tvix/store/src/pathinfoservice/mod.rs 4
-rw-r--r-- tvix/store/src/pathinfoservice/sled.rs 25
-rw-r--r-- tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs 30
-rw-r--r-- tvix/store/src/proto/tests/grpc_pathinfoservice.rs 4
6 files changed, 135 insertions, 4 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index c98a89c4b8e7..2bd766697bd1 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -1,8 +1,12 @@
 use super::PathInfoService;
-use crate::{blobservice::BlobService, directoryservice::DirectoryService, proto};
+use crate::{
+    blobservice::BlobService,
+    directoryservice::DirectoryService,
+    proto::{self, ListPathInfoRequest},
+};
 use std::sync::Arc;
 use tokio::net::UnixStream;
-use tonic::{transport::Channel, Code, Status};
+use tonic::{transport::Channel, Code, Status, Streaming};
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
 #[derive(Clone)]
@@ -160,6 +164,62 @@ impl PathInfoService for GRPCPathInfoService {
 
         Ok((resp.nar_size, nar_sha256))
     }
+
+    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> {
+        // Get a new handle to the gRPC client.
+        let mut grpc_client = self.grpc_client.clone();
+
+        let task: tokio::task::JoinHandle<Result<_, Status>> =
+            self.tokio_handle.spawn(async move {
+                let s = grpc_client
+                    .list(ListPathInfoRequest::default())
+                    .await?
+                    .into_inner();
+
+                Ok(s)
+            });
+
+        let stream = self.tokio_handle.block_on(task).unwrap().unwrap();
+
+        Box::new(StreamIterator::new(self.tokio_handle.clone(), stream))
+    }
+}
+
+pub struct StreamIterator {
+    tokio_handle: tokio::runtime::Handle,
+    stream: Streaming<proto::PathInfo>,
+}
+
+impl StreamIterator {
+    pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self {
+        Self {
+            tokio_handle,
+            stream,
+        }
+    }
+}
+
+impl Iterator for StreamIterator {
+    type Item = Result<proto::PathInfo, crate::Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        match self.tokio_handle.block_on(self.stream.message()) {
+            Ok(o) => match o {
+                Some(pathinfo) => {
+                    // validate the pathinfo
+                    if let Err(e) = pathinfo.validate() {
+                        return Some(Err(crate::Error::StorageError(format!(
+                            "pathinfo {:?} failed validation: {}",
+                            pathinfo, e
+                        ))));
+                    }
+                    Some(Ok(pathinfo))
+                }
+                None => None,
+            },
+            Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))),
+        }
+    }
 }
 
 #[cfg(test)]
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index f7abb2180ef7..aba1216c6e96 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -85,6 +85,18 @@ impl PathInfoService for MemoryPathInfoService {
         )
         .map_err(|e| Error::StorageError(e.to_string()))
     }
+
+    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_> {
+        let db = self.db.read().unwrap();
+
+        // Copy all elements into a list.
+        // This is a bit ugly, because we can't have db escape the lifetime
+        // of this function, but elements need to be returned owned anyways, and this in-
+        // memory impl is only for testing purposes anyways.
+        let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect();
+
+        Box::new(items.into_iter())
+    }
 }
 
 #[cfg(test)]
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index 191e8dbd602e..51d3e51115eb 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -39,4 +39,8 @@ pub trait PathInfoService: Send + Sync {
     /// This can be used to calculate NAR-based output paths,
     /// and implementations are encouraged to cache it.
     fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>;
+
+    /// Iterate over all PathInfo objects in the store.
+    /// Implementations can decide to disallow listing.
+    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_>;
 }
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index 2448b073c622..4f327626d19d 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -136,6 +136,31 @@ impl PathInfoService for SledPathInfoService {
         )
         .map_err(|e| Error::StorageError(e.to_string()))
     }
+
+    fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send> {
+        Box::new(self.db.iter().values().map(|v| match v {
+            Ok(data) => {
+                // we retrieved some bytes
+                match proto::PathInfo::decode(&*data) {
+                    Ok(path_info) => Ok(path_info),
+                    Err(e) => {
+                        warn!("failed to decode stored PathInfo: {}", e);
+                        Err(Error::StorageError(format!(
+                            "failed to decode stored PathInfo: {}",
+                            e
+                        )))
+                    }
+                }
+            }
+            Err(e) => {
+                warn!("failed to retrieve PathInfo: {}", e);
+                Err(Error::StorageError(format!(
+                    "failed to retrieve PathInfo: {}",
+                    e
+                )))
+            }
+        }))
+    }
 }
 
 #[cfg(test)]
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index bbf235f8ace9..16a2fd51d0df 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -2,11 +2,14 @@ use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
 use std::sync::Arc;
+use tokio::task;
+use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Result, Status};
-use tracing::{instrument, warn};
+use tracing::{debug, instrument, warn};
 
 pub struct GRPCPathInfoServiceWrapper {
     path_info_service: Arc<dyn PathInfoService>,
+    // FUTUREWORK: allow exposing without allowing listing
 }
 
 impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper {
@@ -19,6 +22,8 @@ impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper {
 
 #[async_trait]
 impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper {
+    type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
+
     #[instrument(skip(self))]
     async fn get(
         &self,
@@ -78,6 +83,29 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
             }
         }
     }
+
+    #[instrument(skip(self))]
+    async fn list(
+        &self,
+        _request: Request<proto::ListPathInfoRequest>,
+    ) -> Result<Response<Self::ListStream>, Status> {
+        let (tx, rx) = tokio::sync::mpsc::channel(5);
+
+        let path_info_service = self.path_info_service.clone();
+
+        let _task = task::spawn(async move {
+            for e in path_info_service.list() {
+                let res = e.map_err(|e| Status::internal(e.to_string()));
+                if tx.send(res).await.is_err() {
+                    debug!("receiver dropped");
+                    break;
+                }
+            }
+        });
+
+        let receiver_stream = ReceiverStream::new(rx);
+        Ok(Response::new(receiver_stream))
+    }
 }
 
 impl From<RenderError> for tonic::Status {
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
index 95d97bb12890..114e89cacc10 100644
--- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -9,6 +9,7 @@ use crate::tests::utils::gen_blob_service;
 use crate::tests::utils::gen_directory_service;
 use crate::tests::utils::gen_pathinfo_service;
 use std::sync::Arc;
+use tokio_stream::wrappers::ReceiverStream;
 use tonic::Request;
 
 /// generates a GRPCPathInfoService out of blob, directory and pathinfo services.
@@ -16,7 +17,8 @@ use tonic::Request;
 /// We only interact with it via the PathInfo GRPC interface.
 /// It uses the NonCachingNARCalculationService NARCalculationService to
 /// calculate NARs.
-fn gen_grpc_service() -> Arc<dyn GRPCPathInfoService> {
+fn gen_grpc_service(
+) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> {
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
     Arc::new(GRPCPathInfoServiceWrapper::from(gen_pathinfo_service(