diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice')
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 176 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 15 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 19 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 18 |
4 files changed, 102 insertions, 126 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 1649655b69ea..c116ddbc8905 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -4,16 +4,15 @@ use crate::{ directoryservice::DirectoryService, proto::{self, ListPathInfoRequest}, }; -use std::sync::Arc; -use tokio::{net::UnixStream, task::JoinHandle}; -use tonic::{transport::Channel, Code, Status, Streaming}; +use async_stream::try_stream; +use futures::Stream; +use std::{pin::Pin, sync::Arc}; +use tokio::net::UnixStream; +use tonic::{async_trait, transport::Channel, Code}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] pub struct GRPCPathInfoService { - /// A handle into the active tokio runtime. Necessary to spawn tasks. - tokio_handle: tokio::runtime::Handle, - /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, @@ -25,13 +24,11 @@ impl GRPCPathInfoService { pub fn from_client( grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, ) -> Self { - Self { - tokio_handle: tokio::runtime::Handle::current(), - grpc_client, - } + Self { grpc_client } } } +#[async_trait] impl PathInfoService for GRPCPathInfoService { /// Constructs a [GRPCPathInfoService] from the passed [url::Url]: /// - scheme has to match `grpc+*://`. @@ -92,47 +89,39 @@ impl PathInfoService for GRPCPathInfoService { } } - fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> { + async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); - let task: JoinHandle<Result<proto::PathInfo, Status>> = - self.tokio_handle.spawn(async move { - let path_info = grpc_client - .get(proto::GetPathInfoRequest { - by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( - digest.to_vec().into(), - )), - }) - .await? - .into_inner(); - - Ok(path_info) - }); + let path_info = grpc_client + .get(proto::GetPathInfoRequest { + by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( + digest.to_vec().into(), + )), + }) + .await; - match self.tokio_handle.block_on(task)? { - Ok(path_info) => Ok(Some(path_info)), + match path_info { + Ok(path_info) => Ok(Some(path_info.into_inner())), Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(crate::Error::StorageError(e.to_string())), } } - fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> { + async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); - let task: JoinHandle<Result<proto::PathInfo, Status>> = - self.tokio_handle.spawn(async move { - let path_info = grpc_client.put(path_info).await?.into_inner(); - Ok(path_info) - }); + let path_info = grpc_client + .put(path_info) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); - self.tokio_handle - .block_on(task)? - .map_err(|e| crate::Error::StorageError(e.to_string())) + Ok(path_info) } - fn calculate_nar( + async fn calculate_nar( &self, root_node: &proto::node::Node, ) -> Result<(u64, [u8; 32]), crate::Error> { @@ -140,83 +129,54 @@ impl PathInfoService for GRPCPathInfoService { let mut grpc_client = self.grpc_client.clone(); let root_node = root_node.clone(); - let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move { - let path_info = grpc_client - .calculate_nar(proto::Node { - node: Some(root_node), - }) - .await? - .into_inner(); - Ok(path_info) - }); - - let resp = self - .tokio_handle - .block_on(task)? - .map_err(|e| crate::Error::StorageError(e.to_string()))?; + let path_info = grpc_client + .calculate_nar(proto::Node { + node: Some(root_node), + }) + .await + .map_err(|e| crate::Error::StorageError(e.to_string()))? + .into_inner(); - let nar_sha256: [u8; 32] = resp + let nar_sha256: [u8; 32] = path_info .nar_sha256 .to_vec() .try_into() .map_err(|_e| crate::Error::StorageError("invalid digest length".to_string()))?; - Ok((resp.nar_size, nar_sha256)) + Ok((path_info.nar_size, nar_sha256)) } - fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, crate::Error>> + Send> { - // Get a new handle to the gRPC client. + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, crate::Error>> + Send>> { let mut grpc_client = self.grpc_client.clone(); - let task: JoinHandle<Result<_, Status>> = self.tokio_handle.spawn(async move { - let s = grpc_client - .list(ListPathInfoRequest::default()) - .await? - .into_inner(); - - Ok(s) - }); - - let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); - - Box::new(StreamIterator::new(self.tokio_handle.clone(), stream)) - } -} - -pub struct StreamIterator { - tokio_handle: tokio::runtime::Handle, - stream: Streaming<proto::PathInfo>, -} - -impl StreamIterator { - pub fn new(tokio_handle: tokio::runtime::Handle, stream: Streaming<proto::PathInfo>) -> Self { - Self { - tokio_handle, - stream, - } - } -} - -impl Iterator for StreamIterator { - type Item = Result<proto::PathInfo, crate::Error>; - - fn next(&mut self) -> Option<Self::Item> { - match self.tokio_handle.block_on(self.stream.message()) { - Ok(o) => match o { - Some(pathinfo) => { - // validate the pathinfo - if let Err(e) = pathinfo.validate() { - return Some(Err(crate::Error::StorageError(format!( - "pathinfo {:?} failed validation: {}", - pathinfo, e - )))); - } - Some(Ok(pathinfo)) + let stream = try_stream! { + let resp = grpc_client.list(ListPathInfoRequest::default()).await; + + let mut stream = resp.map_err(|e| crate::Error::StorageError(e.to_string()))?.into_inner(); + + loop { + match stream.message().await { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + Err(crate::Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))?; + } + yield pathinfo + } + None => { + return; + }, + }, + Err(e) => Err(crate::Error::StorageError(e.to_string()))?, } - None => None, - }, - Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), - } + } + }; + + Box::pin(stream) } } @@ -227,7 +187,6 @@ mod tests { use tempfile::TempDir; use tokio::net::UnixListener; - use tokio::task; use tokio::time; use tokio_stream::wrappers::UnixListenerStream; @@ -377,13 +336,10 @@ mod tests { ); } - let pi = task::spawn_blocking(move || { - client - .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap()) - .expect("must not be error") - }) - .await - .expect("must not be err"); + let pi = client + .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap()) + .await + .expect("must not be error"); assert!(pi.is_none()); } diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index aba1216c6e96..4cdc411ffb28 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -3,10 +3,13 @@ use crate::{ blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256, proto, Error, }; +use futures::{stream::iter, Stream}; use std::{ collections::HashMap, + pin::Pin, sync::{Arc, RwLock}, }; +use tonic::async_trait; pub struct MemoryPathInfoService { db: Arc<RwLock<HashMap<[u8; 20], proto::PathInfo>>>, @@ -28,6 +31,7 @@ impl MemoryPathInfoService { } } +#[async_trait] impl PathInfoService for MemoryPathInfoService { /// Constructs a [MemoryPathInfoService] from the passed [url::Url]: /// - scheme has to be `memory://` @@ -49,7 +53,7 @@ impl PathInfoService for MemoryPathInfoService { Ok(Self::new(blob_service, directory_service)) } - fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { + async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { let db = self.db.read().unwrap(); match db.get(&digest) { @@ -58,7 +62,7 @@ impl PathInfoService for MemoryPathInfoService { } } - fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { + async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { // Call validate on the received PathInfo message. match path_info.validate() { Err(e) => Err(Error::InvalidRequest(format!( @@ -77,16 +81,17 @@ impl PathInfoService for MemoryPathInfoService { } } - fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { + async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { calculate_size_and_sha256( root_node, self.blob_service.clone(), self.directory_service.clone(), ) + .await .map_err(|e| Error::StorageError(e.to_string())) } - fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_> { + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>> { let db = self.db.read().unwrap(); // Copy all elements into a list. @@ -95,7 +100,7 @@ impl PathInfoService for MemoryPathInfoService { // memory impl is only for testing purposes anyways. let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); - Box::new(items.into_iter()) + Box::pin(iter(items)) } } diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index 51d3e51115eb..a460c35a0289 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -3,8 +3,12 @@ mod grpc; mod memory; mod sled; +use std::pin::Pin; use std::sync::Arc; +use futures::Stream; +use tonic::async_trait; + use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::{proto, Error}; @@ -16,10 +20,12 @@ pub use self::sled::SledPathInfoService; /// The base trait all PathInfo services need to implement. /// This is a simple get and put of [proto::Directory], returning their digest. +#[async_trait] pub trait PathInfoService: Send + Sync { /// Create a new instance by passing in a connection URL, as well /// as instances of a [PathInfoService] and [DirectoryService] (as the /// [PathInfoService] needs to talk to them). + /// TODO: check if we want to make this async, instead of lazily connecting fn from_url( url: &url::Url, blob_service: Arc<dyn BlobService>, @@ -29,18 +35,23 @@ pub trait PathInfoService: Send + Sync { Self: Sized; /// Retrieve a PathInfo message by the output digest. - fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error>; + async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error>; /// Store a PathInfo message. Implementations MUST call validate and reject /// invalid messages. - fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error>; + async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error>; /// Return the nar size and nar sha256 digest for a given root node. /// This can be used to calculate NAR-based output paths, /// and implementations are encouraged to cache it. - fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>; + async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>; /// Iterate over all PathInfo objects in the store. /// Implementations can decide to disallow listing. - fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send + '_>; + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>>; } diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 4f327626d19d..a9d0b029ee6b 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -3,8 +3,10 @@ use crate::{ blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256, proto, Error, }; +use futures::{stream::iter, Stream}; use prost::Message; -use std::{path::PathBuf, sync::Arc}; +use std::{path::PathBuf, pin::Pin, sync::Arc}; +use tonic::async_trait; use tracing::warn; /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). @@ -49,6 +51,7 @@ impl SledPathInfoService { } } +#[async_trait] impl PathInfoService for SledPathInfoService { /// Constructs a [SledPathInfoService] from the passed [url::Url]: /// - scheme has to be `sled://` @@ -84,7 +87,7 @@ impl PathInfoService for SledPathInfoService { } } - fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { + async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { match self.db.get(digest) { Ok(None) => Ok(None), Ok(Some(data)) => match proto::PathInfo::decode(&*data) { @@ -107,7 +110,7 @@ impl PathInfoService for SledPathInfoService { } } - fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { + async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { // Call validate on the received PathInfo message. match path_info.validate() { Err(e) => Err(Error::InvalidRequest(format!( @@ -128,17 +131,18 @@ impl PathInfoService for SledPathInfoService { } } - fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { + async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { calculate_size_and_sha256( root_node, self.blob_service.clone(), self.directory_service.clone(), ) + .await .map_err(|e| Error::StorageError(e.to_string())) } - fn list(&self) -> Box<dyn Iterator<Item = Result<proto::PathInfo, Error>> + Send> { - Box::new(self.db.iter().values().map(|v| match v { + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>> { + Box::pin(iter(self.db.iter().values().map(|v| match v { Ok(data) => { // we retrieved some bytes match proto::PathInfo::decode(&*data) { @@ -159,7 +163,7 @@ impl PathInfoService for SledPathInfoService { e ))) } - })) + }))) } } |