diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice')
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 15 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 45 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 25 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 19 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 29 |
5 files changed, 70 insertions, 63 deletions
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 36b30aecdcf5..93cb487f29b9 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -1,10 +1,9 @@ +use super::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService, SledPathInfoService}; + use std::sync::Arc; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; use url::Url; -use crate::{blobservice::BlobService, directoryservice::DirectoryService}; - -use super::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService, SledPathInfoService}; - /// Constructs a new instance of a [PathInfoService] from an URI. /// /// The following URIs are supported: @@ -26,9 +25,9 @@ pub fn from_addr( uri: &str, blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, -) -> Result<Arc<dyn PathInfoService>, crate::Error> { - let url = Url::parse(uri) - .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; +) -> Result<Arc<dyn PathInfoService>, Error> { + let url = + Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; Ok(if url.scheme() == "memory" { Arc::new(MemoryPathInfoService::from_url( @@ -49,7 +48,7 @@ pub fn from_addr( directory_service, )?) } else { - Err(crate::Error::StorageError(format!( + Err(Error::StorageError(format!( "unknown scheme: {}", url.scheme() )))? diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index c116ddbc8905..6883c56104a6 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -1,14 +1,13 @@ use super::PathInfoService; -use crate::{ - blobservice::BlobService, - directoryservice::DirectoryService, - proto::{self, ListPathInfoRequest}, -}; +use crate::proto::{self, ListPathInfoRequest, PathInfo}; use async_stream::try_stream; use futures::Stream; use std::{pin::Pin, sync::Arc}; use tokio::net::UnixStream; use tonic::{async_trait, transport::Channel, Code}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, +}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] @@ -40,16 +39,14 @@ impl PathInfoService for GRPCPathInfoService { url: &url::Url, _blob_service: Arc<dyn BlobService>, _directory_service: Arc<dyn DirectoryService>, - ) -> Result<Self, crate::Error> { + ) -> Result<Self, tvix_castore::Error> { // Start checking for the scheme to start with grpc+. match url.scheme().strip_prefix("grpc+") { - None => Err(crate::Error::StorageError("invalid scheme".to_string())), + None => Err(Error::StorageError("invalid scheme".to_string())), Some(rest) => { if rest == "unix" { if url.host_str().is_some() { - return Err(crate::Error::StorageError( - "host may not be set".to_string(), - )); + return Err(Error::StorageError("host may not be set".to_string())); } let path = url.path().to_string(); let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter @@ -63,7 +60,7 @@ impl PathInfoService for GRPCPathInfoService { } else { // ensure path is empty, not supported with gRPC. if !url.path().is_empty() { - return Err(crate::Error::StorageError( + return Err(tvix_castore::Error::StorageError( "path may not be set".to_string(), )); } @@ -89,7 +86,7 @@ impl PathInfoService for GRPCPathInfoService { } } - async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> { + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); @@ -104,18 +101,18 @@ impl PathInfoService for GRPCPathInfoService { match path_info { Ok(path_info) => Ok(Some(path_info.into_inner())), Err(e) if e.code() == Code::NotFound => Ok(None), - Err(e) => Err(crate::Error::StorageError(e.to_string())), + Err(e) => Err(Error::StorageError(e.to_string())), } } - async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, crate::Error> { + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); let path_info = grpc_client .put(path_info) .await - .map_err(|e| crate::Error::StorageError(e.to_string()))? + .map_err(|e| Error::StorageError(e.to_string()))? .into_inner(); Ok(path_info) @@ -123,36 +120,36 @@ impl PathInfoService for GRPCPathInfoService { async fn calculate_nar( &self, - root_node: &proto::node::Node, - ) -> Result<(u64, [u8; 32]), crate::Error> { + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { // Get a new handle to the gRPC client. let mut grpc_client = self.grpc_client.clone(); let root_node = root_node.clone(); let path_info = grpc_client - .calculate_nar(proto::Node { + .calculate_nar(castorepb::Node { node: Some(root_node), }) .await - .map_err(|e| crate::Error::StorageError(e.to_string()))? + .map_err(|e| Error::StorageError(e.to_string()))? .into_inner(); let nar_sha256: [u8; 32] = path_info .nar_sha256 .to_vec() .try_into() - .map_err(|_e| crate::Error::StorageError("invalid digest length".to_string()))?; + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; Ok((path_info.nar_size, nar_sha256)) } - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, crate::Error>> + Send>> { + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { let mut grpc_client = self.grpc_client.clone(); let stream = try_stream! { let resp = grpc_client.list(ListPathInfoRequest::default()).await; - let mut stream = resp.map_err(|e| crate::Error::StorageError(e.to_string()))?.into_inner(); + let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner(); loop { match stream.message().await { @@ -160,7 +157,7 @@ impl PathInfoService for GRPCPathInfoService { Some(pathinfo) => { // validate the pathinfo if let Err(e) = pathinfo.validate() { - Err(crate::Error::StorageError(format!( + Err(Error::StorageError(format!( "pathinfo {:?} failed validation: {}", pathinfo, e )))?; @@ -171,7 +168,7 @@ impl PathInfoService for GRPCPathInfoService { return; }, }, - Err(e) => Err(crate::Error::StorageError(e.to_string()))?, + Err(e) => Err(Error::StorageError(e.to_string()))?, } } }; diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 4cdc411ffb28..dbb4b02dd013 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -1,8 +1,5 @@ use super::PathInfoService; -use crate::{ - blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256, - proto, Error, -}; +use crate::{nar::calculate_size_and_sha256, proto::PathInfo}; use futures::{stream::iter, Stream}; use std::{ collections::HashMap, @@ -10,9 +7,12 @@ use std::{ sync::{Arc, RwLock}, }; use tonic::async_trait; +use tvix_castore::proto as castorepb; +use tvix_castore::Error; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; pub struct MemoryPathInfoService { - db: Arc<RwLock<HashMap<[u8; 20], proto::PathInfo>>>, + db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>, blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, @@ -43,17 +43,17 @@ impl PathInfoService for MemoryPathInfoService { directory_service: Arc<dyn DirectoryService>, ) -> Result<Self, Error> { if url.scheme() != "memory" { - return Err(crate::Error::StorageError("invalid scheme".to_string())); + return Err(Error::StorageError("invalid scheme".to_string())); } if url.has_host() || !url.path().is_empty() { - return Err(crate::Error::StorageError("invalid url".to_string())); + return Err(Error::StorageError("invalid url".to_string())); } Ok(Self::new(blob_service, directory_service)) } - async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let db = self.db.read().unwrap(); match db.get(&digest) { @@ -62,7 +62,7 @@ impl PathInfoService for MemoryPathInfoService { } } - async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { // Call validate on the received PathInfo message. match path_info.validate() { Err(e) => Err(Error::InvalidRequest(format!( @@ -81,7 +81,10 @@ impl PathInfoService for MemoryPathInfoService { } } - async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { calculate_size_and_sha256( root_node, self.blob_service.clone(), @@ -91,7 +94,7 @@ impl PathInfoService for MemoryPathInfoService { .map_err(|e| Error::StorageError(e.to_string())) } - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>> { + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { let db = self.db.read().unwrap(); // Copy all elements into a list. diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index b436ad0b16dc..af7bbc9f88e4 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -8,10 +8,12 @@ use std::sync::Arc; use futures::Stream; use tonic::async_trait; +use tvix_castore::blobservice::BlobService; +use tvix_castore::directoryservice::DirectoryService; +use tvix_castore::proto as castorepb; +use tvix_castore::Error; -use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryService; -use crate::{proto, Error}; +use crate::proto::PathInfo; pub use self::from_addr::from_addr; pub use self::grpc::GRPCPathInfoService; @@ -34,16 +36,19 @@ pub trait PathInfoService: Send + Sync { Self: Sized; /// Retrieve a PathInfo message by the output digest. - async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error>; + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>; /// Store a PathInfo message. Implementations MUST call validate and reject /// invalid messages. - async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error>; + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>; /// Return the nar size and nar sha256 digest for a given root node. /// This can be used to calculate NAR-based output paths, /// and implementations are encouraged to cache it. - async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>; + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error>; /// Iterate over all PathInfo objects in the store. /// Implementations can decide to disallow listing. @@ -52,5 +57,5 @@ pub trait PathInfoService: Send + Sync { /// and the box allows different underlying stream implementations to be returned since /// Rust doesn't support this as a generic in traits yet. This is the same thing that /// [async_trait] generates, but for streams instead of futures. - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>>; + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>>; } diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index a9d0b029ee6b..bac384ea0912 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -1,13 +1,13 @@ use super::PathInfoService; -use crate::{ - blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256, - proto, Error, -}; +use crate::nar::calculate_size_and_sha256; +use crate::proto::PathInfo; use futures::{stream::iter, Stream}; use prost::Message; use std::{path::PathBuf, pin::Pin, sync::Arc}; use tonic::async_trait; use tracing::warn; +use tvix_castore::proto as castorepb; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). /// @@ -63,11 +63,11 @@ impl PathInfoService for SledPathInfoService { directory_service: Arc<dyn DirectoryService>, ) -> Result<Self, Error> { if url.scheme() != "sled" { - return Err(crate::Error::StorageError("invalid scheme".to_string())); + return Err(Error::StorageError("invalid scheme".to_string())); } if url.has_host() { - return Err(crate::Error::StorageError(format!( + return Err(Error::StorageError(format!( "invalid host: {}", url.host().unwrap() ))); @@ -78,7 +78,7 @@ impl PathInfoService for SledPathInfoService { Self::new_temporary(blob_service, directory_service) .map_err(|e| Error::StorageError(e.to_string())) } else if url.path() == "/" { - Err(crate::Error::StorageError( + Err(Error::StorageError( "cowardly refusing to open / with sled".to_string(), )) } else { @@ -87,10 +87,10 @@ impl PathInfoService for SledPathInfoService { } } - async fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { match self.db.get(digest) { Ok(None) => Ok(None), - Ok(Some(data)) => match proto::PathInfo::decode(&*data) { + Ok(Some(data)) => match PathInfo::decode(&*data) { Ok(path_info) => Ok(Some(path_info)), Err(e) => { warn!("failed to decode stored PathInfo: {}", e); @@ -110,7 +110,7 @@ impl PathInfoService for SledPathInfoService { } } - async fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error> { + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { // Call validate on the received PathInfo message. match path_info.validate() { Err(e) => Err(Error::InvalidRequest(format!( @@ -131,7 +131,10 @@ impl PathInfoService for SledPathInfoService { } } - async fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { calculate_size_and_sha256( root_node, self.blob_service.clone(), @@ -141,11 +144,11 @@ impl PathInfoService for SledPathInfoService { .map_err(|e| Error::StorageError(e.to_string())) } - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<proto::PathInfo, Error>> + Send>> { + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { Box::pin(iter(self.db.iter().values().map(|v| match v { Ok(data) => { // we retrieved some bytes - match proto::PathInfo::decode(&*data) { + match PathInfo::decode(&*data) { Ok(path_info) => Ok(path_info), Err(e) => { warn!("failed to decode stored PathInfo: {}", e); |