diff options
author | Florian Klink <flokli@flokli.de> | 2023-06-09T16·07+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-06-14T20·44+0000 |
commit | a1acb5bcb301bd599d97909abebcda6235421dc4 (patch) | |
tree | 11df4e81bfbc1920943953706bf4b3725470a282 /tvix/store/src | |
parent | e409d9cc7e615eab366b62bc9bd0dfa3cfbf7395 (diff) |
feat(tvix/store/blobsvc): add from_addr r/6302
This allows constructing blob stores with a URL syntax at runtime, by passing the --blob-service-addr arg. We probably still want to have some builder pattern here, to allow additional schemes to be registered. Change-Id: Ie588ff7a7c6fb64c9474dfbd2e4bc5f168dfd778 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8742 Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 46 | ||||
-rw-r--r-- | tvix/store/src/blobservice/from_addr.rs | 31 | ||||
-rw-r--r-- | tvix/store/src/blobservice/grpc.rs | 199 | ||||
-rw-r--r-- | tvix/store/src/blobservice/memory.rs | 62 | ||||
-rw-r--r-- | tvix/store/src/blobservice/mod.rs | 7 | ||||
-rw-r--r-- | tvix/store/src/blobservice/sled.rs | 104 |
6 files changed, 431 insertions, 18 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index b95f408a55b6..6e2424a7e578 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -6,16 +6,13 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use tracing_subscriber::prelude::*; -use tvix_store::blobservice::BlobService; -use tvix_store::blobservice::GRPCBlobService; -use tvix_store::blobservice::SledBlobService; +use tvix_store::blobservice; use tvix_store::directoryservice::DirectoryService; use tvix_store::directoryservice::GRPCDirectoryService; use tvix_store::directoryservice::SledDirectoryService; use tvix_store::pathinfoservice::GRPCPathInfoService; use tvix_store::pathinfoservice::PathInfoService; use tvix_store::pathinfoservice::SledPathInfoService; -use tvix_store::proto::blob_service_client::BlobServiceClient; use tvix_store::proto::blob_service_server::BlobServiceServer; use tvix_store::proto::directory_service_client::DirectoryServiceClient; use tvix_store::proto::directory_service_server::DirectoryServiceServer; @@ -55,17 +52,26 @@ enum Commands { Daemon { #[arg(long, short = 'l')] listen_address: Option<String>, + + #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] + blob_service_addr: String, }, /// Imports a list of paths into the store (not using the daemon) Import { #[clap(value_name = "PATH")] paths: Vec<PathBuf>, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, }, /// Mounts a tvix-store at the given mountpoint #[cfg(feature = "fuse")] Mount { #[clap(value_name = "PATH")] dest: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, }, } @@ -99,10 +105,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber"); match cli.command { - Commands::Daemon { listen_address } => { + Commands::Daemon { + listen_address, + blob_service_addr, + } => { // initialize stores - let blob_service: Arc<dyn BlobService> = - Arc::new(SledBlobService::new("blobs.sled".into())?); + let blob_service = blobservice::from_addr(&blob_service_addr).await?; let directory_service: Arc<dyn DirectoryService> = Arc::new(SledDirectoryService::new("directories.sled".into())?); let path_info_service: Arc<dyn PathInfoService> = Arc::new(SledPathInfoService::new( @@ -142,10 +150,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { router.serve(listen_address).await?; } - Commands::Import { paths } => { - let blob_service = GRPCBlobService::from_client( - BlobServiceClient::connect("http://[::1]:8000").await?, - ); + Commands::Import { + paths, + blob_service_addr, + } => { + let blob_service = blobservice::from_addr(&blob_service_addr).await?; + let directory_service = GRPCDirectoryService::from_client( DirectoryServiceClient::connect("http://[::1]:8000").await?, ); @@ -155,7 +165,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { GRPCPathInfoService::from_client(path_info_service_client.clone()); let io = Arc::new(TvixStoreIO::new( - Arc::new(blob_service), + blob_service, Arc::new(directory_service), Arc::new(path_info_service), )); @@ -178,10 +188,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { try_join_all(tasks).await?; } #[cfg(feature = "fuse")] - Commands::Mount { dest } => { - let blob_service = GRPCBlobService::from_client( - BlobServiceClient::connect("http://[::1]:8000").await?, - ); + Commands::Mount { + dest, + blob_service_addr, + } => { + let blob_service = blobservice::from_addr(&blob_service_addr).await?; + let directory_service = GRPCDirectoryService::from_client( DirectoryServiceClient::connect("http://[::1]:8000").await?, ); @@ -192,7 +204,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { tokio::task::spawn_blocking(move || { let f = FUSE::new( - Arc::new(blob_service), + blob_service, Arc::new(directory_service), Arc::new(path_info_service), ); diff --git a/tvix/store/src/blobservice/from_addr.rs b/tvix/store/src/blobservice/from_addr.rs new file mode 100644 index 000000000000..761498041d9b --- /dev/null +++ b/tvix/store/src/blobservice/from_addr.rs @@ -0,0 +1,31 @@ +use std::sync::Arc; +use url::Url; + +use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService}; + +/// Constructs a new instance of a [BlobService] from an URI. +/// +/// The following schemes are supported by the following services: +/// - `memory://` ([MemoryBlobService]) +/// - `sled://` ([SledBlobService]) +/// - `grpc+*://` ([GRPCBlobService]) +/// +/// See their [from_url] methods for more details about their syntax. +pub async fn from_addr(uri: &str) -> Result<Arc<dyn BlobService>, crate::Error> { + let url = Url::parse(uri).map_err(|e| { + crate::Error::StorageError(format!("unable to parse url: {}", e.to_string())) + })?; + + Ok(if url.scheme() == "memory" { + Arc::new(MemoryBlobService::from_url(&url)?) + } else if url.scheme() == "sled" { + Arc::new(SledBlobService::from_url(&url)?) + } else if url.scheme().starts_with("grpc+") { + Arc::new(GRPCBlobService::from_url(&url)?) + } else { + Err(crate::Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? + }) +} diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index fbf2dfe72e90..a2c7f56f0d3f 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -2,7 +2,7 @@ use super::{BlobService, BlobWriter}; use crate::{proto, B3Digest}; use futures::sink::{SinkExt, SinkMapErr}; use std::{collections::VecDeque, io}; -use tokio::task::JoinHandle; +use tokio::{net::UnixStream, task::JoinHandle}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_util::{ io::{CopyToBytes, SinkWriter, SyncIoBridge}, @@ -36,6 +36,58 @@ impl GRPCBlobService { } impl BlobService for GRPCBlobService { + /// Constructs a [GRPCBlobService] from the passed [url::Url]: + /// - scheme has to match `grpc+*://`. + /// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + /// - In the case of unix sockets, there must be a path, but may not be a host. + /// - In the case of non-unix sockets, there must be a host, but no path. + fn from_url(url: &url::Url) -> Result<Self, crate::Error> { + // Start checking for the scheme to start with grpc+. + match url.scheme().strip_prefix("grpc+") { + None => Err(crate::Error::StorageError("invalid scheme".to_string())), + Some(rest) => { + if rest == "unix" { + if url.host_str().is_some() { + return Err(crate::Error::StorageError( + "host may not be set".to_string(), + )); + } + let path = url.path().to_string(); + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter + .unwrap() + .connect_with_connector_lazy(tower::service_fn( + move |_: tonic::transport::Uri| UnixStream::connect(path.clone()), + )); + let grpc_client = proto::blob_service_client::BlobServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } else { + // ensure path is empty, not supported with gRPC. + if !url.path().is_empty() { + return Err(crate::Error::StorageError( + "path may not be set".to_string(), + )); + } + + // clone the uri, and drop the grpc+ from the scheme. + // Recreate a new uri with the `grpc+` prefix dropped from the scheme. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let url = { + let url_str = url.to_string(); + let s_stripped = url_str.strip_prefix("grpc+").unwrap(); + url::Url::parse(s_stripped).unwrap() + }; + let channel = tonic::transport::Endpoint::try_from(url.to_string()) + .unwrap() + .connect_lazy(); + + let grpc_client = proto::blob_service_client::BlobServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } + } + } + } + #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. @@ -236,3 +288,148 @@ impl io::Write for GRPCBlobWriter { } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::thread; + + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio::task; + use tokio::time; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::blobservice::MemoryBlobService; + use crate::proto::GRPCBlobServiceWrapper; + use crate::tests::fixtures; + + use super::BlobService; + use super::GRPCBlobService; + + /// This uses the wrong scheme + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme for a unix socket. + /// The fact that /path/to/somewhere doesn't exist yet is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_unix_path() { + let url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_ok()); + } + + /// This uses the correct scheme for a unix socket, + /// but sets a host, which is unsupported. + #[tokio::test] + async fn test_invalid_unix_path_with_domain() { + let url = + url::Url::parse("grpc+unix://host.example/path/to/somewhere").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme for a HTTP server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_http() { + let url = url::Url::parse("grpc+http://localhost").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_ok()); + } + + /// This uses the correct scheme for a HTTPS server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_https() { + let url = url::Url::parse("grpc+https://localhost").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_ok()); + } + + /// This uses the correct scheme, but also specifies + /// an additional path, which is not supported for gRPC. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_invalid_http_with_path() { + let url = url::Url::parse("grpc+https://localhost/some-path").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme for a unix socket, and provides a server on the other side. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let path = tmpdir.path().join("daemon"); + + // let mut join_set = JoinSet::new(); + + // prepare a client + let client = { + let mut url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); + url.set_path(path.to_str().unwrap()); + GRPCBlobService::from_url(&url).expect("must succeed") + }; + + let path_copy = path.clone(); + + // Spin up a server, in a thread far away, which spawns its own tokio runtime, + // and blocks on the task. + thread::spawn(move || { + // Create the runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + // Get a handle from this runtime + let handle = rt.handle(); + + let task = handle.spawn(async { + let uds = UnixListener::bind(path_copy).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = + server.add_service(crate::proto::blob_service_server::BlobServiceServer::new( + GRPCBlobServiceWrapper::from( + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService> + ), + )); + router.serve_with_incoming(uds_stream).await + }); + + handle.block_on(task) + }); + + // wait for the socket to be created + { + let mut socket_created = false; + for _try in 1..20 { + if path.exists() { + socket_created = true; + break; + } + tokio::time::sleep(time::Duration::from_millis(20)).await; + } + + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); + } + + let has = task::spawn_blocking(move || { + println!("client has?"); + client + .has(&fixtures::BLOB_A_DIGEST) + .expect("must not be err") + }) + .await + .expect("must not be err"); + assert!(!has); + } +} diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 0ea8479f87e5..a53f8991498b 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -14,6 +14,22 @@ pub struct MemoryBlobService { } impl BlobService for MemoryBlobService { + /// Constructs a [MemoryBlobService] from the passed [url::Url]: + /// - scheme has to be `memory://` + /// - there may not be a host. + /// - there may not be a path. + fn from_url(url: &url::Url) -> Result<Self, Error> { + if url.scheme() != "memory" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() || !url.path().is_empty() { + return Err(crate::Error::StorageError("invalid url".to_string())); + } + + Ok(Self::default()) + } + #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, Error> { let db = self.db.read().unwrap(); @@ -113,3 +129,49 @@ impl BlobWriter for MemoryBlobWriter { } } } + +#[cfg(test)] +mod tests { + use super::BlobService; + use super::MemoryBlobService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } + + /// This correctly sets the scheme, and doesn't set a path. + #[test] + fn test_valid_scheme() { + let url = url::Url::parse("memory://").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_ok()); + } + + /// This sets the host to `foo` + #[test] + fn test_invalid_host() { + let url = url::Url::parse("memory://foo").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } + + /// This has the path "/", which is invalid. + #[test] + fn test_invalid_has_path() { + let url = url::Url::parse("memory:///").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } + + /// This has the path "/foo", which is invalid. + #[test] + fn test_invalid_path2() { + let url = url::Url::parse("memory:///foo").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } +} diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index c1bca927d7d0..622d058353a6 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -2,10 +2,12 @@ use std::io; use crate::{B3Digest, Error}; +mod from_addr; mod grpc; mod memory; mod sled; +pub use self::from_addr::from_addr; pub use self::grpc::GRPCBlobService; pub use self::memory::MemoryBlobService; pub use self::sled::SledBlobService; @@ -16,6 +18,11 @@ pub use self::sled::SledBlobService; /// Blob, which will return something implmenting io::Write, and providing a /// close funtion, to finalize a blob and get its digest. pub trait BlobService: Send + Sync { + /// Create a new instance by passing in a connection URL. + fn from_url(url: &url::Url) -> Result<Self, Error> + where + Self: Sized; + /// Check if the service has the blob, by its content hash. fn has(&self, digest: &B3Digest) -> Result<bool, Error>; diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 1ae71170e1e4..6b38a5e0ed81 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -28,6 +28,36 @@ impl SledBlobService { } impl BlobService for SledBlobService { + /// Constructs a [SledBlobService] from the passed [url::Url]: + /// - scheme has to be `sled://` + /// - there may not be a host. + /// - a path to the sled needs to be provided (which may not be `/`). + fn from_url(url: &url::Url) -> Result<Self, Error> { + if url.scheme() != "sled" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() { + return Err(crate::Error::StorageError(format!( + "invalid host: {}", + url.host().unwrap() + ))); + } + + // TODO: expose compression and other parameters as URL parameters, drop new and new_temporary? + if url.path().is_empty() { + Self::new_temporary().map_err(|e| Error::StorageError(e.to_string())) + } else { + if url.path() == "/" { + Err(crate::Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )) + } else { + Self::new(url.path().into()).map_err(|e| Error::StorageError(e.to_string())) + } + } + } + #[instrument(skip(self), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result<bool, Error> { match self.db.contains_key(digest.to_vec()) { @@ -127,3 +157,77 @@ impl BlobWriter for SledBlobWriter { } } } + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::BlobService; + use super::SledBlobService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and doesn't specify a path (temporary sled). + #[test] + fn test_valid_scheme_temporary() { + let url = url::Url::parse("sled://").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_ok()); + } + + /// This sets the path to a location that doesn't exist, which should fail (as sled doesn't mkdir -p) + #[test] + fn test_nonexistent_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().join("foo").join("bar").to_str().unwrap()); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and specifies / as path (which should fail + // for obvious reasons) + #[test] + fn test_invalid_path_root() { + let url = url::Url::parse("sled:///").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and sets a tempdir as location. + #[test] + fn test_valid_scheme_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledBlobService::from_url(&url).is_ok()); + } + + /// This sets a host, rather than a path, which should fail. + #[test] + fn test_invalid_host() { + let url = url::Url::parse("sled://foo.example").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This sets a host AND a valid path, which should fail + #[test] + fn test_invalid_host_and_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledBlobService::from_url(&url).is_err()); + } +} |