diff options
Diffstat (limited to 'tvix/store/src')
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 64 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 12 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 4 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 25 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 30 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_pathinfoservice.rs | 4 |
6 files changed, 135 insertions, 4 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index c98a89c4b8e7..2bd766697bd1 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -1,8 +1,12 @@ use super::PathInfoService; -use crate::{blobservice::BlobService, directoryservice::DirectoryService, proto}; +use crate::{ + blobservice::BlobService, + directoryservice::DirectoryService, + proto::{self, ListPathInfoRequest}, +}; use std::sync::Arc; use tokio::net::UnixStream; -use tonic::{transport::Channel, Code, Status}; +use tonic::{transport::Channel, Code, Status, Streaming}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] @@ -160,6 +164,62 @@ impl PathInfoService for GRPCPathInfoService { Ok((resp.nar_size, nar_sha256)) } + + fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let task: tokio::task::JoinHandle<Result<_, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .list(ListPathInfoRequest::default()) + .await? + .into_inner(); + + Ok(s) + }); + + let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); + + Box::new(StreamIterator::new(self.tokio_handle.clone(), stream)) + } +} + +pub struct StreamIterator { + tokio_handle: tokio::runtime::Handle, + stream: Streaming<proto::PathInfo>, +} + +impl StreamIterator { + pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self { + Self { + tokio_handle, + stream, + } + } +} + +impl Iterator for StreamIterator { + type Item = Result<proto::PathInfo, crate::Error>; + + fn next(&mut self) -> Option<Self::Item> { + match self.tokio_handle.block_on(self.stream.message()) { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + return Some(Err(crate::Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))); + } + Some(Ok(pathinfo)) + } + None => None, + }, + Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), + } + } } #[cfg(test)] diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index f7abb2180ef7..aba1216c6e96 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -85,6 +85,18 @@ impl PathInfoService for MemoryPathInfoService { ) .map_err(|e| Error::StorageError(e.to_string())) } + + fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_> { + let db = self.db.read().unwrap(); + + // Copy all elements into a list. + // This is a bit ugly, because we can't have db escape the lifetime + // of this function, but elements need to be returned owned anyways, and this in- + // memory impl is only for testing purposes anyways. + let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + + Box::new(items.into_iter()) + } } #[cfg(test)] diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 191e8dbd602e..51d3e51115eb 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -39,4 +39,8 @@ pub trait PathInfoService: Send + Sync { /// This can be used to calculate NAR-based output paths, /// and implementations are encouraged to cache it. fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>; + + /// Iterate over all PathInfo objects in the store. + /// Implementations can decide to disallow listing. + fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_>; } diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 2448b073c622..4f327626d19d 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -136,6 +136,31 @@ impl PathInfoService for SledPathInfoService { ) .map_err(|e| Error::StorageError(e.to_string())) } + + fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send> { + Box::new(self.db.iter().values().map(|v| match v { + Ok(data) => { + // we retrieved some bytes + match proto::PathInfo::decode(&*data) { + Ok(path_info) => Ok(path_info), + Err(e) => { + warn!("failed to decode stored PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to decode stored PathInfo: {}", + e + ))) + } + } + } + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to retrieve PathInfo: {}", + e + ))) + } + })) + } } #[cfg(test)] diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index bbf235f8ace9..16a2fd51d0df 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -2,11 +2,14 @@ use crate::nar::RenderError; use crate::pathinfoservice::PathInfoService; use crate::proto; use std::sync::Arc; +use tokio::task; +use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Result, Status}; -use tracing::{instrument, warn}; +use tracing::{debug, instrument, warn}; pub struct GRPCPathInfoServiceWrapper { path_info_service: Arc<dyn PathInfoService>, + // FUTUREWORK: allow exposing without allowing listing } impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper { @@ -19,6 +22,8 @@ impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper { #[async_trait] impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper { + type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>; + #[instrument(skip(self))] async fn get( &self, @@ -78,6 +83,29 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra } } } + + #[instrument(skip(self))] + async fn list( + &self, + _request: Request<proto::ListPathInfoRequest>, + ) -> Result<Response<Self::ListStream>, Status> { + let (tx, rx) = tokio::sync::mpsc::channel(5); + + let path_info_service = self.path_info_service.clone(); + + let _task = task::spawn(async move { + for e in path_info_service.list() { + let res = e.map_err(|e| Status::internal(e.to_string())); + if tx.send(res).await.is_err() { + debug!("receiver dropped"); + break; + } + } + }); + + let receiver_stream = ReceiverStream::new(rx); + Ok(Response::new(receiver_stream)) + } } impl From<RenderError> for tonic::Status { diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index 95d97bb12890..114e89cacc10 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -9,6 +9,7 @@ use crate::tests::utils::gen_blob_service; use crate::tests::utils::gen_directory_service; use crate::tests::utils::gen_pathinfo_service; use std::sync::Arc; +use tokio_stream::wrappers::ReceiverStream; use tonic::Request; /// generates a GRPCPathInfoService out of blob, directory and pathinfo services. @@ -16,7 +17,8 @@ use tonic::Request; /// We only interact with it via the PathInfo GRPC interface. /// It uses the NonCachingNARCalculationService NARCalculationService to /// calculate NARs. -fn gen_grpc_service() -> Arc<dyn GRPCPathInfoService> { +fn gen_grpc_service( +) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> { let blob_service = gen_blob_service(); let directory_service = gen_directory_service(); Arc::new(GRPCPathInfoServiceWrapper::from(gen_pathinfo_service( |