From a1acb5bcb301bd599d97909abebcda6235421dc4 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 9 Jun 2023 19:07:00 +0300 Subject: feat(tvix/store/blobsvc): add from_addr This allows constructing blob stores with a URL syntax at runtime, by passing the --blob-service-addr arg. We probably still want to have some builder pattern here, to allow additional schemes to be registered. Change-Id: Ie588ff7a7c6fb64c9474dfbd2e4bc5f168dfd778 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8742 Reviewed-by: tazjin Tested-by: BuildkiteCI --- tvix/Cargo.lock | 65 ++++++++++- tvix/Cargo.nix | 166 +++++++++++++++++++++++++- tvix/store/Cargo.toml | 1 + tvix/store/src/bin/tvix-store.rs | 46 +++++--- tvix/store/src/blobservice/from_addr.rs | 31 +++++ tvix/store/src/blobservice/grpc.rs | 199 +++++++++++++++++++++++++++++++- tvix/store/src/blobservice/memory.rs | 62 ++++++++++ tvix/store/src/blobservice/mod.rs | 7 ++ tvix/store/src/blobservice/sled.rs | 104 +++++++++++++++++ 9 files changed, 657 insertions(+), 24 deletions(-) create mode 100644 tvix/store/src/blobservice/from_addr.rs diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index dbc572b01aa7..7bcf60dceca3 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -724,6 +724,15 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fs2" version = "0.4.3" @@ -1042,6 +1051,16 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "idna" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "imbl" version = "2.0.0" @@ -1458,9 +1477,9 @@ checksum = "ecba01bf2678719532c5e3059e0b5f0811273d94b397088b82e3bd0a78c78fdd" [[package]] name = "percent-encoding" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" [[package]] name = "petgraph" @@ -2369,6 +2388,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.28.0" @@ -2761,6 +2795,7 @@ dependencies = [ "tracing", "tracing-subscriber", "tvix-eval", + "url", "walkdir", ] @@ -2776,12 +2811,27 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" +[[package]] +name = "unicode-bidi" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" + [[package]] name = "unicode-ident" version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5464a87b239f13a63a501f2701565754bae92d243d4bb7eb12f6d57d2269bf4" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-segmentation" version = "1.10.1" @@ -2800,6 +2850,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" +[[package]] +name = "url" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + [[package]] name = "users" version = "0.11.0" diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 30a845dd9c05..df2b4495ae69 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -2014,6 +2014,28 @@ rec { }; resolvedDefaultFeatures = [ "default" "std" ]; }; + "form_urlencoded" = rec { + crateName = "form_urlencoded"; + version = "1.2.0"; + edition = "2018"; + sha256 = "0ljn0kz23nr9yf3432k656k178nh4jqryfji9b0jw343dz7w2ax6"; + authors = [ + "The rust-url developers" + ]; + dependencies = [ + { + name = "percent-encoding"; + packageId = "percent-encoding"; + usesDefaultFeatures = false; + } + ]; + features = { + "alloc" = [ "percent-encoding/alloc" ]; + "default" = [ "std" ]; + "std" = [ "alloc" "percent-encoding/std" ]; + }; + resolvedDefaultFeatures = [ "alloc" "default" "std" ]; + }; "fs2" = rec { crateName = "fs2"; version = "0.4.3"; @@ -2956,6 +2978,33 @@ rec { ]; }; + "idna" = rec { + crateName = "idna"; + version = "0.4.0"; + edition = "2018"; + sha256 = "0z4i1dhqk83bbv230pp1c31dqdlnscvqxvc85n40ihgvgfqdc83x"; + authors = [ + "The rust-url developers" + ]; + dependencies = [ + { + name = "unicode-bidi"; + packageId = "unicode-bidi"; + usesDefaultFeatures = false; + features = [ "hardcoded-data" ]; + } + { + name = "unicode-normalization"; + packageId = "unicode-normalization"; + usesDefaultFeatures = false; + } + ]; + features = { + "default" = [ "std" ]; + "std" = [ "alloc" "unicode-bidi/std" "unicode-normalization/std" ]; + }; + resolvedDefaultFeatures = [ "alloc" "default" "std" ]; + }; "imbl" = rec { crateName = "imbl"; version = "2.0.0"; @@ -4144,16 +4193,17 @@ rec { }; "percent-encoding" = rec { crateName = "percent-encoding"; - version = "2.2.0"; + version = "2.3.0"; edition = "2018"; - sha256 = "13nrpp6r1f4k14viksga3094krcrxgv4b42kqbriy63k7ln5g327"; + sha256 = "152slflmparkh27hprw62sph8rv77wckzhwl2dhqk6bf563lfalv"; authors = [ "The rust-url developers" ]; features = { - "default" = [ "alloc" ]; + "default" = [ "std" ]; + "std" = [ "alloc" ]; }; - resolvedDefaultFeatures = [ "alloc" "default" ]; + resolvedDefaultFeatures = [ "alloc" "default" "std" ]; }; "petgraph" = rec { crateName = "petgraph"; @@ -6720,6 +6770,43 @@ rec { } ]; + }; + "tinyvec" = rec { + crateName = "tinyvec"; + version = "1.6.0"; + edition = "2018"; + sha256 = "0l6bl2h62a5m44jdnpn7lmj14rd44via8180i7121fvm73mmrk47"; + authors = [ + "Lokathor " + ]; + dependencies = [ + { + name = "tinyvec_macros"; + packageId = "tinyvec_macros"; + optional = true; + } + ]; + features = { + "alloc" = [ "tinyvec_macros" ]; + "arbitrary" = [ "dep:arbitrary" ]; + "real_blackbox" = [ "criterion/real_blackbox" ]; + "rustc_1_55" = [ "rustc_1_40" ]; + "rustc_1_57" = [ "rustc_1_55" ]; + "serde" = [ "dep:serde" ]; + "std" = [ "alloc" ]; + "tinyvec_macros" = [ "dep:tinyvec_macros" ]; + }; + resolvedDefaultFeatures = [ "alloc" "default" "tinyvec_macros" ]; + }; + "tinyvec_macros" = rec { + crateName = "tinyvec_macros"; + version = "0.1.1"; + edition = "2018"; + sha256 = "081gag86208sc3y6sdkshgw3vysm5d34p431dzw0bshz66ncng0z"; + authors = [ + "Soveu " + ]; + }; "tokio" = rec { crateName = "tokio"; @@ -8219,6 +8306,10 @@ rec { name = "tvix-eval"; packageId = "tvix-eval"; } + { + name = "url"; + packageId = "url"; + } { name = "walkdir"; packageId = "walkdir"; @@ -8278,6 +8369,25 @@ rec { sha256 = "154smf048k84prsdgh09nkm2n0w0336v84jd4zikyn6v6jrqbspa"; }; + "unicode-bidi" = rec { + crateName = "unicode-bidi"; + version = "0.3.13"; + edition = "2018"; + sha256 = "0q0l7rdkiq54pan7a4ama39dgynaf1mnjj1nddrq1w1zayjqp24j"; + libName = "unicode_bidi"; + authors = [ + "The Servo Project Developers" + ]; + features = { + "default" = [ "std" "hardcoded-data" ]; + "flame" = [ "dep:flame" ]; + "flame_it" = [ "flame" "flamer" ]; + "flamer" = [ "dep:flamer" ]; + "serde" = [ "dep:serde" ]; + "with_serde" = [ "serde" ]; + }; + resolvedDefaultFeatures = [ "hardcoded-data" "std" ]; + }; "unicode-ident" = rec { crateName = "unicode-ident"; version = "1.0.8"; @@ -8288,6 +8398,27 @@ rec { ]; }; + "unicode-normalization" = rec { + crateName = "unicode-normalization"; + version = "0.1.22"; + edition = "2018"; + sha256 = "08d95g7b1irc578b2iyhzv4xhsa4pfvwsqxcl9lbcpabzkq16msw"; + authors = [ + "kwantam " + "Manish Goregaokar " + ]; + dependencies = [ + { + name = "tinyvec"; + packageId = "tinyvec"; + features = [ "alloc" ]; + } + ]; + features = { + "default" = [ "std" ]; + }; + resolvedDefaultFeatures = [ "std" ]; + }; "unicode-segmentation" = rec { crateName = "unicode-segmentation"; version = "1.10.1"; @@ -8328,6 +8459,33 @@ rec { features = { }; resolvedDefaultFeatures = [ "default" ]; }; + "url" = rec { + crateName = "url"; + version = "2.4.0"; + edition = "2018"; + sha256 = "1jw89ack5ldvajpzsvhq9sy12y2xqa2x0cbin62hl80r3s1zggsh"; + authors = [ + "The rust-url developers" + ]; + dependencies = [ + { + name = "form_urlencoded"; + packageId = "form_urlencoded"; + } + { + name = "idna"; + packageId = "idna"; + } + { + name = "percent-encoding"; + packageId = "percent-encoding"; + } + ]; + features = { + "serde" = [ "dep:serde" ]; + }; + resolvedDefaultFeatures = [ "default" ]; + }; "users" = rec { crateName = "users"; version = "0.11.0"; diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 3b2765f99281..db4e2b3c27e9 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -29,6 +29,7 @@ futures = "0.3.28" bytes = "1.4.0" smol_str = "0.2.0" serde_json = "1.0" +url = "2.4.0" [dependencies.fuser] optional = true diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index b95f408a55b6..6e2424a7e578 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -6,16 +6,13 @@ use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use tracing_subscriber::prelude::*; -use tvix_store::blobservice::BlobService; -use tvix_store::blobservice::GRPCBlobService; -use tvix_store::blobservice::SledBlobService; +use tvix_store::blobservice; use tvix_store::directoryservice::DirectoryService; use tvix_store::directoryservice::GRPCDirectoryService; use tvix_store::directoryservice::SledDirectoryService; use tvix_store::pathinfoservice::GRPCPathInfoService; use tvix_store::pathinfoservice::PathInfoService; use tvix_store::pathinfoservice::SledPathInfoService; -use tvix_store::proto::blob_service_client::BlobServiceClient; use tvix_store::proto::blob_service_server::BlobServiceServer; use tvix_store::proto::directory_service_client::DirectoryServiceClient; use tvix_store::proto::directory_service_server::DirectoryServiceServer; @@ -55,17 +52,26 @@ enum Commands { Daemon { #[arg(long, short = 'l')] listen_address: Option, + + #[arg(long, env, default_value = "sled:///var/lib/tvix-store/blobs.sled")] + blob_service_addr: String, }, /// Imports a list of paths into the store (not using the daemon) Import { #[clap(value_name = "PATH")] paths: Vec, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, }, /// Mounts a tvix-store at the given mountpoint #[cfg(feature = "fuse")] Mount { #[clap(value_name = "PATH")] dest: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, }, } @@ -99,10 +105,12 @@ async fn main() -> Result<(), Box> { tracing::subscriber::set_global_default(subscriber).expect("Unable to set global subscriber"); match cli.command { - Commands::Daemon { listen_address } => { + Commands::Daemon { + listen_address, + blob_service_addr, + } => { // initialize stores - let blob_service: Arc = - Arc::new(SledBlobService::new("blobs.sled".into())?); + let blob_service = blobservice::from_addr(&blob_service_addr).await?; let directory_service: Arc = Arc::new(SledDirectoryService::new("directories.sled".into())?); let path_info_service: Arc = Arc::new(SledPathInfoService::new( @@ -142,10 +150,12 @@ async fn main() -> Result<(), Box> { router.serve(listen_address).await?; } - Commands::Import { paths } => { - let blob_service = GRPCBlobService::from_client( - BlobServiceClient::connect("http://[::1]:8000").await?, - ); + Commands::Import { + paths, + blob_service_addr, + } => { + let blob_service = blobservice::from_addr(&blob_service_addr).await?; + let directory_service = GRPCDirectoryService::from_client( DirectoryServiceClient::connect("http://[::1]:8000").await?, ); @@ -155,7 +165,7 @@ async fn main() -> Result<(), Box> { GRPCPathInfoService::from_client(path_info_service_client.clone()); let io = Arc::new(TvixStoreIO::new( - Arc::new(blob_service), + blob_service, Arc::new(directory_service), Arc::new(path_info_service), )); @@ -178,10 +188,12 @@ async fn main() -> Result<(), Box> { try_join_all(tasks).await?; } #[cfg(feature = "fuse")] - Commands::Mount { dest } => { - let blob_service = GRPCBlobService::from_client( - BlobServiceClient::connect("http://[::1]:8000").await?, - ); + Commands::Mount { + dest, + blob_service_addr, + } => { + let blob_service = blobservice::from_addr(&blob_service_addr).await?; + let directory_service = GRPCDirectoryService::from_client( DirectoryServiceClient::connect("http://[::1]:8000").await?, ); @@ -192,7 +204,7 @@ async fn main() -> Result<(), Box> { tokio::task::spawn_blocking(move || { let f = FUSE::new( - Arc::new(blob_service), + blob_service, Arc::new(directory_service), Arc::new(path_info_service), ); diff --git a/tvix/store/src/blobservice/from_addr.rs b/tvix/store/src/blobservice/from_addr.rs new file mode 100644 index 000000000000..761498041d9b --- /dev/null +++ b/tvix/store/src/blobservice/from_addr.rs @@ -0,0 +1,31 @@ +use std::sync::Arc; +use url::Url; + +use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService}; + +/// Constructs a new instance of a [BlobService] from an URI. +/// +/// The following schemes are supported by the following services: +/// - `memory://` ([MemoryBlobService]) +/// - `sled://` ([SledBlobService]) +/// - `grpc+*://` ([GRPCBlobService]) +/// +/// See their [from_url] methods for more details about their syntax. +pub async fn from_addr(uri: &str) -> Result, crate::Error> { + let url = Url::parse(uri).map_err(|e| { + crate::Error::StorageError(format!("unable to parse url: {}", e.to_string())) + })?; + + Ok(if url.scheme() == "memory" { + Arc::new(MemoryBlobService::from_url(&url)?) + } else if url.scheme() == "sled" { + Arc::new(SledBlobService::from_url(&url)?) + } else if url.scheme().starts_with("grpc+") { + Arc::new(GRPCBlobService::from_url(&url)?) + } else { + Err(crate::Error::StorageError(format!( + "unknown scheme: {}", + url.scheme() + )))? + }) +} diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index fbf2dfe72e90..a2c7f56f0d3f 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -2,7 +2,7 @@ use super::{BlobService, BlobWriter}; use crate::{proto, B3Digest}; use futures::sink::{SinkExt, SinkMapErr}; use std::{collections::VecDeque, io}; -use tokio::task::JoinHandle; +use tokio::{net::UnixStream, task::JoinHandle}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use tokio_util::{ io::{CopyToBytes, SinkWriter, SyncIoBridge}, @@ -36,6 +36,58 @@ impl GRPCBlobService { } impl BlobService for GRPCBlobService { + /// Constructs a [GRPCBlobService] from the passed [url::Url]: + /// - scheme has to match `grpc+*://`. + /// That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. + /// - In the case of unix sockets, there must be a path, but may not be a host. + /// - In the case of non-unix sockets, there must be a host, but no path. + fn from_url(url: &url::Url) -> Result { + // Start checking for the scheme to start with grpc+. + match url.scheme().strip_prefix("grpc+") { + None => Err(crate::Error::StorageError("invalid scheme".to_string())), + Some(rest) => { + if rest == "unix" { + if url.host_str().is_some() { + return Err(crate::Error::StorageError( + "host may not be set".to_string(), + )); + } + let path = url.path().to_string(); + let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter + .unwrap() + .connect_with_connector_lazy(tower::service_fn( + move |_: tonic::transport::Uri| UnixStream::connect(path.clone()), + )); + let grpc_client = proto::blob_service_client::BlobServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } else { + // ensure path is empty, not supported with gRPC. + if !url.path().is_empty() { + return Err(crate::Error::StorageError( + "path may not be set".to_string(), + )); + } + + // clone the uri, and drop the grpc+ from the scheme. + // Recreate a new uri with the `grpc+` prefix dropped from the scheme. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. + let url = { + let url_str = url.to_string(); + let s_stripped = url_str.strip_prefix("grpc+").unwrap(); + url::Url::parse(s_stripped).unwrap() + }; + let channel = tonic::transport::Endpoint::try_from(url.to_string()) + .unwrap() + .connect_lazy(); + + let grpc_client = proto::blob_service_client::BlobServiceClient::new(channel); + Ok(Self::from_client(grpc_client)) + } + } + } + } + #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result { // Get a new handle to the gRPC client, and copy the digest. @@ -236,3 +288,148 @@ impl io::Write for GRPCBlobWriter { } } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::thread; + + use tempfile::TempDir; + use tokio::net::UnixListener; + use tokio::task; + use tokio::time; + use tokio_stream::wrappers::UnixListenerStream; + + use crate::blobservice::MemoryBlobService; + use crate::proto::GRPCBlobServiceWrapper; + use crate::tests::fixtures; + + use super::BlobService; + use super::GRPCBlobService; + + /// This uses the wrong scheme + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme for a unix socket. + /// The fact that /path/to/somewhere doesn't exist yet is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_unix_path() { + let url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_ok()); + } + + /// This uses the correct scheme for a unix socket, + /// but sets a host, which is unsupported. + #[tokio::test] + async fn test_invalid_unix_path_with_domain() { + let url = + url::Url::parse("grpc+unix://host.example/path/to/somewhere").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme for a HTTP server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_http() { + let url = url::Url::parse("grpc+http://localhost").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_ok()); + } + + /// This uses the correct scheme for a HTTPS server. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_valid_https() { + let url = url::Url::parse("grpc+https://localhost").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_ok()); + } + + /// This uses the correct scheme, but also specifies + /// an additional path, which is not supported for gRPC. + /// The fact that nothing is listening there is no problem, because we connect lazily. + #[tokio::test] + async fn test_invalid_http_with_path() { + let url = url::Url::parse("grpc+https://localhost/some-path").expect("must parse"); + + assert!(GRPCBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme for a unix socket, and provides a server on the other side. + #[tokio::test] + async fn test_valid_unix_path_ping_pong() { + let tmpdir = TempDir::new().unwrap(); + let path = tmpdir.path().join("daemon"); + + // let mut join_set = JoinSet::new(); + + // prepare a client + let client = { + let mut url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); + url.set_path(path.to_str().unwrap()); + GRPCBlobService::from_url(&url).expect("must succeed") + }; + + let path_copy = path.clone(); + + // Spin up a server, in a thread far away, which spawns its own tokio runtime, + // and blocks on the task. + thread::spawn(move || { + // Create the runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + // Get a handle from this runtime + let handle = rt.handle(); + + let task = handle.spawn(async { + let uds = UnixListener::bind(path_copy).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new server + let mut server = tonic::transport::Server::builder(); + let router = + server.add_service(crate::proto::blob_service_server::BlobServiceServer::new( + GRPCBlobServiceWrapper::from( + Arc::new(MemoryBlobService::default()) as Arc + ), + )); + router.serve_with_incoming(uds_stream).await + }); + + handle.block_on(task) + }); + + // wait for the socket to be created + { + let mut socket_created = false; + for _try in 1..20 { + if path.exists() { + socket_created = true; + break; + } + tokio::time::sleep(time::Duration::from_millis(20)).await; + } + + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); + } + + let has = task::spawn_blocking(move || { + println!("client has?"); + client + .has(&fixtures::BLOB_A_DIGEST) + .expect("must not be err") + }) + .await + .expect("must not be err"); + assert!(!has); + } +} diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index 0ea8479f87e5..a53f8991498b 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -14,6 +14,22 @@ pub struct MemoryBlobService { } impl BlobService for MemoryBlobService { + /// Constructs a [MemoryBlobService] from the passed [url::Url]: + /// - scheme has to be `memory://` + /// - there may not be a host. + /// - there may not be a path. + fn from_url(url: &url::Url) -> Result { + if url.scheme() != "memory" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() || !url.path().is_empty() { + return Err(crate::Error::StorageError("invalid url".to_string())); + } + + Ok(Self::default()) + } + #[instrument(skip(self, digest), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result { let db = self.db.read().unwrap(); @@ -113,3 +129,49 @@ impl BlobWriter for MemoryBlobWriter { } } } + +#[cfg(test)] +mod tests { + use super::BlobService; + use super::MemoryBlobService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } + + /// This correctly sets the scheme, and doesn't set a path. + #[test] + fn test_valid_scheme() { + let url = url::Url::parse("memory://").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_ok()); + } + + /// This sets the host to `foo` + #[test] + fn test_invalid_host() { + let url = url::Url::parse("memory://foo").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } + + /// This has the path "/", which is invalid. + #[test] + fn test_invalid_has_path() { + let url = url::Url::parse("memory:///").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } + + /// This has the path "/foo", which is invalid. + #[test] + fn test_invalid_path2() { + let url = url::Url::parse("memory:///foo").expect("must parse"); + + assert!(MemoryBlobService::from_url(&url).is_err()); + } +} diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs index c1bca927d7d0..622d058353a6 100644 --- a/tvix/store/src/blobservice/mod.rs +++ b/tvix/store/src/blobservice/mod.rs @@ -2,10 +2,12 @@ use std::io; use crate::{B3Digest, Error}; +mod from_addr; mod grpc; mod memory; mod sled; +pub use self::from_addr::from_addr; pub use self::grpc::GRPCBlobService; pub use self::memory::MemoryBlobService; pub use self::sled::SledBlobService; @@ -16,6 +18,11 @@ pub use self::sled::SledBlobService; /// Blob, which will return something implmenting io::Write, and providing a /// close funtion, to finalize a blob and get its digest. pub trait BlobService: Send + Sync { + /// Create a new instance by passing in a connection URL. + fn from_url(url: &url::Url) -> Result + where + Self: Sized; + /// Check if the service has the blob, by its content hash. fn has(&self, digest: &B3Digest) -> Result; diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 1ae71170e1e4..6b38a5e0ed81 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -28,6 +28,36 @@ impl SledBlobService { } impl BlobService for SledBlobService { + /// Constructs a [SledBlobService] from the passed [url::Url]: + /// - scheme has to be `sled://` + /// - there may not be a host. + /// - a path to the sled needs to be provided (which may not be `/`). + fn from_url(url: &url::Url) -> Result { + if url.scheme() != "sled" { + return Err(crate::Error::StorageError("invalid scheme".to_string())); + } + + if url.has_host() { + return Err(crate::Error::StorageError(format!( + "invalid host: {}", + url.host().unwrap() + ))); + } + + // TODO: expose compression and other parameters as URL parameters, drop new and new_temporary? + if url.path().is_empty() { + Self::new_temporary().map_err(|e| Error::StorageError(e.to_string())) + } else { + if url.path() == "/" { + Err(crate::Error::StorageError( + "cowardly refusing to open / with sled".to_string(), + )) + } else { + Self::new(url.path().into()).map_err(|e| Error::StorageError(e.to_string())) + } + } + } + #[instrument(skip(self), fields(blob.digest=%digest))] fn has(&self, digest: &B3Digest) -> Result { match self.db.contains_key(digest.to_vec()) { @@ -127,3 +157,77 @@ impl BlobWriter for SledBlobWriter { } } } + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::BlobService; + use super::SledBlobService; + + /// This uses a wrong scheme. + #[test] + fn test_invalid_scheme() { + let url = url::Url::parse("http://foo.example/test").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and doesn't specify a path (temporary sled). + #[test] + fn test_valid_scheme_temporary() { + let url = url::Url::parse("sled://").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_ok()); + } + + /// This sets the path to a location that doesn't exist, which should fail (as sled doesn't mkdir -p) + #[test] + fn test_nonexistent_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().join("foo").join("bar").to_str().unwrap()); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and specifies / as path (which should fail + // for obvious reasons) + #[test] + fn test_invalid_path_root() { + let url = url::Url::parse("sled:///").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This uses the correct scheme, and sets a tempdir as location. + #[test] + fn test_valid_scheme_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledBlobService::from_url(&url).is_ok()); + } + + /// This sets a host, rather than a path, which should fail. + #[test] + fn test_invalid_host() { + let url = url::Url::parse("sled://foo.example").expect("must parse"); + + assert!(SledBlobService::from_url(&url).is_err()); + } + + /// This sets a host AND a valid path, which should fail + #[test] + fn test_invalid_host_and_path() { + let tmpdir = TempDir::new().unwrap(); + + let mut url = url::Url::parse("sled://foo.example").expect("must parse"); + url.set_path(tmpdir.path().to_str().unwrap()); + + assert!(SledBlobService::from_url(&url).is_err()); + } +} -- cgit 1.4.1