diff options
Diffstat (limited to 'tvix/store/src/pathinfoservice')
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 56 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 329 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 172 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 61 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 269 |
5 files changed, 887 insertions, 0 deletions
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs new file mode 100644 index 000000000000..93cb487f29b9 --- /dev/null +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -0,0 +1,56 @@ +use super::{GRPCPathInfoService, MemoryPathInfoService, PathInfoService, SledPathInfoService}; + +use std::sync::Arc; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; +use url::Url; + +/// Constructs a new instance of a [PathInfoService] from an URI. +/// +/// The following URIs are supported: +/// - `memory:` +/// Uses a in-memory implementation. +/// - `sled:` +/// Uses a in-memory sled implementation. +/// - `sled:///absolute/path/to/somewhere` +/// Uses sled, using a path on the disk for persistency. Can be only opened +/// from one process at the same time. +/// - `grpc+unix:///absolute/path/to/somewhere` +/// Connects to a local tvix-store gRPC service via Unix socket. +/// - `grpc+http://host:port`, `grpc+https://host:port` +/// Connects to a (remote) tvix-store gRPC service. +/// +/// As the [PathInfoService] needs to talk to [BlobService] and [DirectoryService], +/// these also need to be passed in. +pub fn from_addr( + uri: &str, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +) -> Result<Arc<dyn PathInfoService>, Error> { + let url = + Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; + + Ok(if url.scheme() == "memory" { + Arc::new(MemoryPathInfoService::from_url( + &url, + blob_service, + directory_service, + )?) + } else if url.scheme() == "sled" { + Arc::new(SledPathInfoService::from_url( + &url, + blob_service, + directory_service, + )?) + } else if url.scheme().starts_with("grpc+") { + Arc::new(GRPCPathInfoService::from_url( + &url, + blob_service, + directory_service, + )?) + } else { + Err(Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? + }) +} diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs new file mode 100644 index 000000000000..a88828083940 --- /dev/null +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -0,0 +1,329 @@ +use super::PathInfoService; +use crate::proto::{self, ListPathInfoRequest, PathInfo}; +use async_stream::try_stream; +use futures::Stream; +use std::{pin::Pin, sync::Arc}; +use tokio::net::UnixStream; +use tonic::{async_trait, transport::Channel, Code}; +use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, +}; + +/// Connects to a (remote) tvix-store PathInfoService over gRPC. +#[derive(Clone)] +pub struct GRPCPathInfoService { + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, +} + +impl GRPCPathInfoService { + /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + ) -> Self { + Self { grpc_client } + } +} + +#[async_trait] +impl PathInfoService for GRPCPathInfoService { + /// Constructs a [GRPCPathInfoService] from the passed [url::Url]: + /// - scheme has to match `grpc+*://`. + /// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + /// - In the case of unix sockets, there must be a path, but may not be a host. + /// - In the case of non-unix sockets, there must be a host, but no path. + /// The blob_service and directory_service arguments are ignored, because the gRPC service already provides answers to these questions. + fn from_url( + url: &url::Url, + _blob_service: Arc<dyn BlobService>, + _directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, tvix_castore::Error> { + // Start checking for the scheme to start with grpc+. + match url.scheme().strip_prefix("grpc+") { + None => Err(Error::StorageError("invalid scheme".to_string())), + Some(rest) => { + if rest == "unix" { + if url.host_str().is_some() { + return Err(Error::StorageError("host may not be set".to_string())); + } + let path = url.path().to_string(); + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter + .unwrap() + .connect_with_connector_lazy(tower::service_fn( + move |_: tonic::transport::Uri| UnixStream::connect(path.clone()), + )); + let grpc_client = + proto::path_info_service_client::PathInfoServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } else { + // ensure path is empty, not supported with gRPC. + if !url.path().is_empty() { + return Err(tvix_castore::Error::StorageError( + "path may not be set".to_string(), + )); + } + + // clone the uri, and drop the grpc+ from the scheme. + // Recreate a new uri with the `grpc+` prefix dropped from the scheme. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let url = { + let url_str = url.to_string(); + let s_stripped = url_str.strip_prefix("grpc+").unwrap(); + url::Url::parse(s_stripped).unwrap() + }; + let channel = tonic::transport::Endpoint::try_from(url.to_string()) + .unwrap() + .connect_lazy(); + + let grpc_client = + proto::path_info_service_client::PathInfoServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } + } + } + } + + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let path_info = grpc_client + .get(proto::GetPathInfoRequest { + by_what: Some(proto::get_path_info_request::ByWhat::ByOutputHash( + digest.to_vec().into(), + )), + }) + .await; + + match path_info { + Ok(path_info) => Ok(Some(path_info.into_inner())), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(Error::StorageError(e.to_string())), + } + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + + let path_info = grpc_client + .put(path_info) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + Ok(path_info) + } + + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + let root_node = root_node.clone(); + + let path_info = grpc_client + .calculate_nar(castorepb::Node { + node: Some(root_node), + }) + .await + .map_err(|e| Error::StorageError(e.to_string()))? + .into_inner(); + + let nar_sha256: [u8; 32] = path_info + .nar_sha256 + .to_vec() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; + + Ok((path_info.nar_size, nar_sha256)) + } + + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + let mut grpc_client = self.grpc_client.clone(); + + let stream = try_stream! { + let resp = grpc_client.list(ListPathInfoRequest::default()).await; + + let mut stream = resp.map_err(|e| Error::StorageError(e.to_string()))?.into_inner(); + + loop { + match stream.message().await { + Ok(o) => match o { + Some(pathinfo) => { + // validate the pathinfo + if let Err(e) = pathinfo.validate() { + Err(Error::StorageError(format!( + "pathinfo {:?} failed validation: {}", + pathinfo, e + )))?; + } + yield pathinfo + } + None => { + return; + }, + }, + Err(e) => Err(Error::StorageError(e.to_string()))?, + } + } + }; + + Box::pin(stream) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio_retry::strategy::ExponentialBackoff; + use tokio_retry::Retry; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::pathinfoservice::MemoryPathInfoService; + use crate::proto::GRPCPathInfoServiceWrapper; + use crate::tests::fixtures; + use crate::tests::utils::gen_blob_service; + use crate::tests::utils::gen_directory_service; + + use super::GRPCPathInfoService; + use super::PathInfoService; + + /// This uses the wrong scheme + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme for a unix socket. + /// The fact that /path/to/somewhere doesn't exist yet is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_unix_path() { + let url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This uses the correct scheme for a unix socket, + /// but sets a host, which is unsupported. + #[tokio::test] + async fn test_invalid_unix_path_with_domain() { + let url = + url::Url::parse("grpc+unix://host.example/path/to/somewhere").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme for a HTTP server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_http() { + let url = url::Url::parse("grpc+http://localhost").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This uses the correct scheme for a HTTPS server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_https() { + let url = url::Url::parse("grpc+https://localhost").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This uses the correct scheme, but also specifies + /// an additional path, which is not supported for gRPC. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_invalid_http_with_path() { + let url = url::Url::parse("grpc+https://localhost/some-path").expect("must parse"); + + assert!( + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This ensures connecting via gRPC works as expected. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("daemon"); + + let path_clone = socket_path.clone(); + + // Spin up a server + tokio::spawn(async { + let uds = UnixListener::bind(path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = server.add_service( + crate::proto::path_info_service_server::PathInfoServiceServer::new( + GRPCPathInfoServiceWrapper::from(Arc::new(MemoryPathInfoService::new( + gen_blob_service(), + gen_directory_service(), + )) + as Arc<dyn PathInfoService>), + ), + ); + router.serve_with_incoming(uds_stream).await + }); + + // wait for the socket to be created + Retry::spawn( + ExponentialBackoff::from_millis(20).max_delay(Duration::from_secs(10)), + || async { + if socket_path.exists() { + Ok(()) + } else { + Err(()) + } + }, + ) + .await + .expect("failed to wait for socket"); + + // prepare a client + let grpc_client = { + let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) + .expect("must parse"); + GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .expect("must succeed") + }; + + let path_info = grpc_client + .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap()) + .await + .expect("must not be error"); + + assert!(path_info.is_none()); + } +} diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs new file mode 100644 index 000000000000..dbb4b02dd013 --- /dev/null +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -0,0 +1,172 @@ +use super::PathInfoService; +use crate::{nar::calculate_size_and_sha256, proto::PathInfo}; +use futures::{stream::iter, Stream}; +use std::{ + collections::HashMap, + pin::Pin, + sync::{Arc, RwLock}, +}; +use tonic::async_trait; +use tvix_castore::proto as castorepb; +use tvix_castore::Error; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; + +pub struct MemoryPathInfoService { + db: Arc<RwLock<HashMap<[u8; 20], PathInfo>>>, + + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +} + +impl MemoryPathInfoService { + pub fn new( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Self { + Self { + db: Default::default(), + blob_service, + directory_service, + } + } +} + +#[async_trait] +impl PathInfoService for MemoryPathInfoService { + /// Constructs a [MemoryPathInfoService] from the passed [url::Url]: + /// - scheme has to be `memory://` + /// - there may not be a host. + /// - there may not be a path. + fn from_url( + url: &url::Url, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, Error> { + if url.scheme() != "memory" { + return Err(Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() || !url.path().is_empty() { + return Err(Error::StorageError("invalid url".to_string())); + } + + Ok(Self::new(blob_service, directory_service)) + } + + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + let db = self.db.read().unwrap(); + + match db.get(&digest) { + None => Ok(None), + Some(path_info) => Ok(Some(path_info.clone())), + } + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. + Ok(nix_path) => { + let mut db = self.db.write().unwrap(); + db.insert(nix_path.digest, path_info.clone()); + + Ok(path_info) + } + } + } + + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + calculate_size_and_sha256( + root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .await + .map_err(|e| Error::StorageError(e.to_string())) + } + + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + let db = self.db.read().unwrap(); + + // Copy all elements into a list. + // This is a bit ugly, because we can't have db escape the lifetime + // of this function, but elements need to be returned owned anyways, and this in- + // memory impl is only for testing purposes anyways. + let items: Vec<_> = db.iter().map(|(_k, v)| Ok(v.clone())).collect(); + + Box::pin(iter(items)) + } +} + +#[cfg(test)] +mod tests { + use crate::tests::utils::gen_blob_service; + use crate::tests::utils::gen_directory_service; + + use super::MemoryPathInfoService; + use super::PathInfoService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This correctly sets the scheme, and doesn't set a path. + #[test] + fn test_valid_scheme() { + let url = url::Url::parse("memory://").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This sets the host to `foo` + #[test] + fn test_invalid_host() { + let url = url::Url::parse("memory://foo").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This has the path "/", which is invalid. + #[test] + fn test_invalid_has_path() { + let url = url::Url::parse("memory:///").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This has the path "/foo", which is invalid. + #[test] + fn test_invalid_path2() { + let url = url::Url::parse("memory:///foo").expect("must parse"); + + assert!( + MemoryPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } +} diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs new file mode 100644 index 000000000000..af7bbc9f88e4 --- /dev/null +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -0,0 +1,61 @@ +mod from_addr; +mod grpc; +mod memory; +mod sled; + +use std::pin::Pin; +use std::sync::Arc; + +use futures::Stream; +use tonic::async_trait; +use tvix_castore::blobservice::BlobService; +use tvix_castore::directoryservice::DirectoryService; +use tvix_castore::proto as castorepb; +use tvix_castore::Error; + +use crate::proto::PathInfo; + +pub use self::from_addr::from_addr; +pub use self::grpc::GRPCPathInfoService; +pub use self::memory::MemoryPathInfoService; +pub use self::sled::SledPathInfoService; + +/// The base trait all PathInfo services need to implement. +#[async_trait] +pub trait PathInfoService: Send + Sync { + /// Create a new instance by passing in a connection URL, as well + /// as instances of a [PathInfoService] and [DirectoryService] (as the + /// [PathInfoService] needs to talk to them). + /// TODO: check if we want to make this async, instead of lazily connecting + fn from_url( + url: &url::Url, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, Error> + where + Self: Sized; + + /// Retrieve a PathInfo message by the output digest. + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error>; + + /// Store a PathInfo message. Implementations MUST call validate and reject + /// invalid messages. + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error>; + + /// Return the nar size and nar sha256 digest for a given root node. + /// This can be used to calculate NAR-based output paths, + /// and implementations are encouraged to cache it. + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error>; + + /// Iterate over all PathInfo objects in the store. + /// Implementations can decide to disallow listing. + /// + /// This returns a pinned, boxed stream. The pinning allows for it to be polled easily, + /// and the box allows different underlying stream implementations to be returned since + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>>; +} diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs new file mode 100644 index 000000000000..bac384ea0912 --- /dev/null +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -0,0 +1,269 @@ +use super::PathInfoService; +use crate::nar::calculate_size_and_sha256; +use crate::proto::PathInfo; +use futures::{stream::iter, Stream}; +use prost::Message; +use std::{path::PathBuf, pin::Pin, sync::Arc}; +use tonic::async_trait; +use tracing::warn; +use tvix_castore::proto as castorepb; +use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; + +/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). +/// +/// The PathInfo messages are stored as encoded protos, and keyed by their output hash, +/// as that's currently the only request type available. +pub struct SledPathInfoService { + db: sled::Db, + + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, +} + +impl SledPathInfoService { + pub fn new( + p: PathBuf, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, sled::Error> { + let config = sled::Config::default().use_compression(true).path(p); + let db = config.open()?; + + Ok(Self { + db, + blob_service, + directory_service, + }) + } + + pub fn new_temporary( + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, sled::Error> { + let config = sled::Config::default().temporary(true); + let db = config.open()?; + + Ok(Self { + db, + blob_service, + directory_service, + }) + } +} + +#[async_trait] +impl PathInfoService for SledPathInfoService { + /// Constructs a [SledPathInfoService] from the passed [url::Url]: + /// - scheme has to be `sled://` + /// - there may not be a host. + /// - a path to the sled needs to be provided (which may not be `/`). + fn from_url( + url: &url::Url, + blob_service: Arc<dyn BlobService>, + directory_service: Arc<dyn DirectoryService>, + ) -> Result<Self, Error> { + if url.scheme() != "sled" { + return Err(Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() { + return Err(Error::StorageError(format!( + "invalid host: {}", + url.host().unwrap() + ))); + } + + // TODO: expose compression and other parameters as URL parameters, drop new and new_temporary? + if url.path().is_empty() { + Self::new_temporary(blob_service, directory_service) + .map_err(|e| Error::StorageError(e.to_string())) + } else if url.path() == "/" { + Err(Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )) + } else { + Self::new(url.path().into(), blob_service, directory_service) + .map_err(|e| Error::StorageError(e.to_string())) + } + } + + async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { + match self.db.get(digest) { + Ok(None) => Ok(None), + Ok(Some(data)) => match PathInfo::decode(&*data) { + Ok(path_info) => Ok(Some(path_info)), + Err(e) => { + warn!("failed to decode stored PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to decode stored PathInfo: {}", + e + ))) + } + }, + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to retrieve PathInfo: {}", + e + ))) + } + } + } + + async fn put(&self, path_info: PathInfo) -> Result<PathInfo, Error> { + // Call validate on the received PathInfo message. + match path_info.validate() { + Err(e) => Err(Error::InvalidRequest(format!( + "failed to validate PathInfo: {}", + e + ))), + // In case the PathInfo is valid, and we were able to extract a NixPath, store it in the database. + // This overwrites existing PathInfo objects. + Ok(nix_path) => match self.db.insert(nix_path.digest, path_info.encode_to_vec()) { + Ok(_) => Ok(path_info), + Err(e) => { + warn!("failed to insert PathInfo: {}", e); + Err(Error::StorageError(format! { + "failed to insert PathInfo: {}", e + })) + } + }, + } + } + + async fn calculate_nar( + &self, + root_node: &castorepb::node::Node, + ) -> Result<(u64, [u8; 32]), Error> { + calculate_size_and_sha256( + root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .await + .map_err(|e| Error::StorageError(e.to_string())) + } + + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<PathInfo, Error>> + Send>> { + Box::pin(iter(self.db.iter().values().map(|v| match v { + Ok(data) => { + // we retrieved some bytes + match PathInfo::decode(&*data) { + Ok(path_info) => Ok(path_info), + Err(e) => { + warn!("failed to decode stored PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to decode stored PathInfo: {}", + e + ))) + } + } + } + Err(e) => { + warn!("failed to retrieve PathInfo: {}", e); + Err(Error::StorageError(format!( + "failed to retrieve PathInfo: {}", + e + ))) + } + }))) + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use crate::tests::utils::gen_blob_service; + use crate::tests::utils::gen_directory_service; + + use super::PathInfoService; + use super::SledPathInfoService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme, and doesn't specify a path (temporary sled). + #[test] + fn test_valid_scheme_temporary() { + let url = url::Url::parse("sled://").expect("must parse"); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This sets the path to a location that doesn't exist, which should fail (as sled doesn't mkdir -p) + #[test] + fn test_nonexistent_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().join("foo").join("bar").to_str().unwrap()); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme, and specifies / as path (which should fail + // for obvious reasons) + #[test] + fn test_invalid_path_root() { + let url = url::Url::parse("sled:///").expect("must parse"); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This uses the correct scheme, and sets a tempdir as location. + #[test] + fn test_valid_scheme_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_ok() + ); + } + + /// This sets a host, rather than a path, which should fail. + #[test] + fn test_invalid_host() { + let url = url::Url::parse("sled://foo.example").expect("must parse"); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } + + /// This sets a host AND a valid path, which should fail + #[test] + fn test_invalid_host_and_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!( + SledPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service()) + .is_err() + ); + } +} |