From 30e0c320666f8ecaf37f6d966e45c40f988cce78 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Mon, 13 Nov 2023 14:32:24 +0200 Subject: refactor(tvix/castore/tonic): make async, support wait-connect=? This moves the sync `channel::from_url` to an async `tonic::channel_from_url`. It now allows connecting non-lazily if `wait-connect=1` is set in the URL params. Also, make the pingpong tests for blobsvc and directorysvc use the wait-connect=1 codepath. Change-Id: Ibeea33117c8121814627e7f6aba0e943ae2e92ca Reviewed-on: https://cl.tvl.fyi/c/depot/+/10030 Tested-by: BuildkiteCI Reviewed-by: Connor Brewster --- tvix/castore/src/blobservice/from_addr.rs | 12 +-- tvix/castore/src/blobservice/grpc.rs | 14 ++- tvix/castore/src/channel.rs | 128 ------------------------- tvix/castore/src/directoryservice/from_addr.rs | 12 +-- tvix/castore/src/directoryservice/grpc.rs | 14 ++- tvix/castore/src/errors.rs | 4 +- tvix/castore/src/lib.rs | 2 +- tvix/castore/src/tonic.rs | 115 ++++++++++++++++++++++ 8 files changed, 144 insertions(+), 157 deletions(-) delete mode 100644 tvix/castore/src/channel.rs create mode 100644 tvix/castore/src/tonic.rs (limited to 'tvix/castore') diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index 916c00442c99..2834d25a1706 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -13,7 +13,7 @@ use super::{BlobService, GRPCBlobService, MemoryBlobService, SledBlobService}; /// - `grpc+*://` ([GRPCBlobService]) /// /// See their `from_url` methods for more details about their syntax. -pub fn from_addr(uri: &str) -> Result, crate::Error> { +pub async fn from_addr(uri: &str) -> Result, crate::Error> { let url = Url::parse(uri) .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; @@ -53,7 +53,7 @@ pub fn from_addr(uri: &str) -> Result, crate::Error> { // - In the case of unix sockets, there must be a path, but may not be a host. 
// - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = BlobServiceClient::new(crate::channel::from_url(&url)?); + let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?); Arc::new(GRPCBlobService::from_client(client)) } else { Err(crate::Error::StorageError(format!( @@ -95,12 +95,6 @@ mod tests { #[test_case("memory:///", false; "memory invalid root path")] /// This sets a memory url path to "/foo", which is invalid. #[test_case("memory:///foo", false; "memory invalid root path foo")] - fn test_from_addr(uri_str: &str, is_ok: bool) { - assert_eq!(from_addr(uri_str).is_ok(), is_ok) - } - - // the gRPC tests below don't fail, because we connect lazily. - /// Correct scheme to connect to a unix socket. #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")] /// Correct scheme for unix socket, but setting a host too, which is invalid. @@ -115,6 +109,6 @@ mod tests { #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")] #[tokio::test] async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) { - assert_eq!(from_addr(uri_str).is_ok(), is_ok) + assert_eq!(from_addr(uri_str).await.is_ok(), is_ok) } } diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 85863c29197f..9cc4f340dbf7 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -285,10 +285,16 @@ mod tests { // prepare a client let grpc_client = { - let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) - .expect("must parse"); - let client = - BlobServiceClient::new(crate::channel::from_url(&url).expect("must succeed")); + let url = url::Url::parse(&format!( + "grpc+unix://{}?wait-connect=1", + socket_path.display() + )) + .expect("must parse"); + let client = BlobServiceClient::new( + crate::tonic::channel_from_url(&url) + 
.await + .expect("must succeed"), + ); GRPCBlobService::from_client(client) }; diff --git a/tvix/castore/src/channel.rs b/tvix/castore/src/channel.rs deleted file mode 100644 index 2fe97247679a..000000000000 --- a/tvix/castore/src/channel.rs +++ /dev/null @@ -1,128 +0,0 @@ -use tokio::net::UnixStream; -use tonic::transport::Channel; - -/// Turn a [url::Url] to a [Channel] if it can be parsed successfully. -/// It supports `grpc+unix:/path/to/socket`, -/// as well as the regular schemes supported by tonic, prefixed with grpc+, -/// for example `grpc+http://[::1]:8000`. -pub fn from_url(url: &url::Url) -> Result { - // Start checking for the scheme to start with grpc+. - // If it doesn't start with that, bail out. - match url.scheme().strip_prefix("grpc+") { - None => Err(Error::MissingGRPCPrefix()), - Some(rest) => { - if rest == "unix" { - if url.host_str().is_some() { - return Err(Error::HostSetForUnixSocket()); - } - - let url = url.clone(); - Ok( - tonic::transport::Endpoint::from_static("http://[::]:50051") // doesn't matter - .connect_with_connector_lazy(tower::service_fn( - move |_: tonic::transport::Uri| { - UnixStream::connect(url.path().to_string().clone()) - }, - )), - ) - } else { - // ensure path is empty, not supported with gRPC. - if !url.path().is_empty() { - return Err(Error::PathMayNotBeSet()); - } - - // Stringify the URL and remove the grpc+ prefix. - // We can't use `url.set_scheme(rest)`, as it disallows - // setting something http(s) that previously wasn't. 
- let url = url.to_string().strip_prefix("grpc+").unwrap().to_owned(); - - // Use the regular tonic transport::Endpoint logic to - Ok(tonic::transport::Endpoint::try_from(url) - .unwrap() - .connect_lazy()) - } - } - } -} - -/// Errors occuring when trying to connect to a backend -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("grpc+ prefix is missing from Url")] - MissingGRPCPrefix(), - - #[error("host may not be set for unix domain sockets")] - HostSetForUnixSocket(), - - #[error("path may not be set")] - PathMayNotBeSet(), - - #[error("transport error: {0}")] - TransportError(tonic::transport::Error), -} - -impl From for Error { - fn from(value: tonic::transport::Error) -> Self { - Self::TransportError(value) - } -} - -#[cfg(test)] -mod tests { - use super::from_url; - - /// This uses the correct scheme for a unix socket. - /// The fact that /path/to/somewhere doesn't exist yet is no problem, because we connect lazily. - #[tokio::test] - async fn test_valid_unix_path() { - let url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse"); - - assert!(from_url(&url).is_ok()) - } - - /// This uses the correct scheme for a unix socket, - /// but sets a host, which is unsupported. - #[tokio::test] - async fn test_invalid_unix_path_with_domain() { - let url = - url::Url::parse("grpc+unix://host.example/path/to/somewhere").expect("must parse"); - - assert!(from_url(&url).is_err()) - } - - /// This uses the wrong scheme - #[test] - fn test_invalid_scheme() { - let url = url::Url::parse("http://foo.example/test").expect("must parse"); - - assert!(from_url(&url).is_err()); - } - - /// This uses the correct scheme for a HTTP server. - /// The fact that nothing is listening there is no problem, because we connect lazily. - #[tokio::test] - async fn test_valid_http() { - let url = url::Url::parse("grpc+http://localhost").expect("must parse"); - - assert!(from_url(&url).is_ok()); - } - - /// This uses the correct scheme for a HTTPS server. 
- /// The fact that nothing is listening there is no problem, because we connect lazily. - #[tokio::test] - async fn test_valid_https() { - let url = url::Url::parse("grpc+https://localhost").expect("must parse"); - - assert!(from_url(&url).is_ok()); - } - - /// This uses the correct scheme, but also specifies - /// an additional path, which is not supported for gRPC. - /// The fact that nothing is listening there is no problem, because we connect lazily. - #[tokio::test] - async fn test_invalid_http_with_path() { - let url = url::Url::parse("grpc+https://localhost/some-path").expect("must parse"); - - assert!(from_url(&url).is_err()); - } -} diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index 6082a8b49fb7..8f79fa6158cc 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -19,7 +19,7 @@ use super::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService, Sled /// Connects to a local tvix-store gRPC service via Unix socket. /// - `grpc+http://host:port`, `grpc+https://host:port` /// Connects to a (remote) tvix-store gRPC service. -pub fn from_addr(uri: &str) -> Result, crate::Error> { +pub async fn from_addr(uri: &str) -> Result, crate::Error> { let url = Url::parse(uri) .map_err(|e| crate::Error::StorageError(format!("unable to parse url: {}", e)))?; @@ -60,7 +60,7 @@ pub fn from_addr(uri: &str) -> Result, crate::Error> { // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. 
- let client = DirectoryServiceClient::new(crate::channel::from_url(&url)?); + let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?); Arc::new(GRPCDirectoryService::from_client(client)) } else { Err(crate::Error::StorageError(format!( @@ -102,12 +102,6 @@ mod tests { #[test_case("memory:///", false; "memory invalid root path")] /// This sets a memory url path to "/foo", which is invalid. #[test_case("memory:///foo", false; "memory invalid root path foo")] - fn test_from_addr(uri_str: &str, is_ok: bool) { - assert_eq!(from_addr(uri_str).is_ok(), is_ok) - } - - // the gRPC tests below don't fail, because we connect lazily. - /// Correct scheme to connect to a unix socket. #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")] /// Correct scheme for unix socket, but setting a host too, which is invalid. @@ -122,6 +116,6 @@ mod tests { #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")] #[tokio::test] async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) { - assert_eq!(from_addr(uri_str).is_ok(), is_ok) + assert_eq!(from_addr(uri_str).await.is_ok(), is_ok) } } diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index f0569ac87800..eceaf5ed1f3c 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -462,10 +462,16 @@ mod tests { // prepare a client let grpc_client = { - let url = url::Url::parse(&format!("grpc+unix://{}", socket_path.display())) - .expect("must parse"); - let client = - DirectoryServiceClient::new(crate::channel::from_url(&url).expect("must succeed")); + let url = url::Url::parse(&format!( + "grpc+unix://{}?wait-connect=1", + socket_path.display() + )) + .expect("must parse"); + let client = DirectoryServiceClient::new( + crate::tonic::channel_from_url(&url) + .await + .expect("must succeed"), + ); GRPCDirectoryService::from_client(client) }; diff --git 
a/tvix/castore/src/errors.rs b/tvix/castore/src/errors.rs index 9f596afd4e26..4c164029aeec 100644 --- a/tvix/castore/src/errors.rs +++ b/tvix/castore/src/errors.rs @@ -34,8 +34,8 @@ impl From for Status { } } -impl From for Error { - fn from(value: crate::channel::Error) -> Self { +impl From for Error { + fn from(value: crate::tonic::Error) -> Self { Self::StorageError(value.to_string()) } } diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 5a031aec7686..8d3dc7b4c4a5 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -2,11 +2,11 @@ mod digests; mod errors; pub mod blobservice; -pub mod channel; pub mod directoryservice; pub mod fixtures; pub mod import; pub mod proto; +pub mod tonic; pub mod utils; pub use digests::{B3Digest, B3_LEN}; diff --git a/tvix/castore/src/tonic.rs b/tvix/castore/src/tonic.rs new file mode 100644 index 000000000000..96f7c7174141 --- /dev/null +++ b/tvix/castore/src/tonic.rs @@ -0,0 +1,115 @@ +use tokio::net::UnixStream; +use tonic::transport::{Channel, Endpoint}; + +fn url_wants_wait_connect(url: &url::Url) -> bool { + url.query_pairs() + .filter(|(k, v)| k == "wait-connect" && v == "1") + .count() + > 0 +} + +/// Turn a [url::Url] to a [Channel] if it can be parsed successfully. +/// It supports `grpc+unix:/path/to/socket`, as well as the regular schemes supported +/// by tonic, for example `grpc+http://[::1]:8000`. +/// It supports wait-connect=1 as a URL parameter, in which case we don't connect lazily. +pub async fn channel_from_url(url: &url::Url) -> Result { + // Stringify the URL and remove the grpc+ prefix. + // We can't use `url.set_scheme(rest)`, as it disallows + // setting something http(s) that previously wasn't. 
+ let unprefixed_url_str = match url.to_string().strip_prefix("grpc+") { + None => return Err(Error::MissingGRPCPrefix()), + Some(url_str) => url_str.to_owned(), + }; + + if url.scheme() == "grpc+unix" { + if url.host_str().is_some() { + return Err(Error::HostSetForUnixSocket()); + } + + let connector = tower::service_fn({ + let url = url.clone(); + move |_: tonic::transport::Uri| UnixStream::connect(url.path().to_string().clone()) + }); + + let channel = if url_wants_wait_connect(url) { + Endpoint::from_static("http://[::]:50051") + .connect_with_connector(connector) + .await? + } else { + Endpoint::from_static("http://[::]:50051").connect_with_connector_lazy(connector) + }; + + return Ok(channel); + } + + // ensure path is empty, not supported with gRPC. + if !url.path().is_empty() { + return Err(Error::PathMayNotBeSet()); + } + + // Use the regular tonic transport::Endpoint logic, but unprefixed_url_str, + // as tonic doesn't know about grpc+http[s]. + let endpoint = Endpoint::try_from(unprefixed_url_str)?; + let channel = if url_wants_wait_connect(url) { + endpoint.connect().await? + } else { + endpoint.connect_lazy() + }; + + Ok(channel) +} + +/// Errors occuring when trying to connect to a backend +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("grpc+ prefix is missing from URL")] + MissingGRPCPrefix(), + + #[error("host may not be set for unix domain sockets")] + HostSetForUnixSocket(), + + #[error("path may not be set")] + PathMayNotBeSet(), + + #[error("transport error: {0}")] + TransportError(tonic::transport::Error), +} + +impl From for Error { + fn from(value: tonic::transport::Error) -> Self { + Self::TransportError(value) + } +} + +#[cfg(test)] +mod tests { + use super::channel_from_url; + use test_case::test_case; + use url::Url; + + /// Correct scheme to connect to a unix socket. 
+ #[test_case("grpc+unix:///path/to/somewhere", true; "grpc valid unix socket")] + /// Connecting with wait-connect set to 0 succeeds, as that's the default. + #[test_case("grpc+unix:///path/to/somewhere?wait-connect=0", true; "grpc valid unix wait-connect=0")] + /// Connecting with wait-connect set to 1 fails, as the path doesn't exist. + #[test_case("grpc+unix:///path/to/somewhere?wait-connect=1", false; "grpc valid unix wait-connect=1")] + /// Correct scheme for unix socket, but setting a host too, which is invalid. + #[test_case("grpc+unix://host.example/path/to/somewhere", false; "grpc invalid unix socket and host")] + /// Correct scheme to connect to localhost, with port 12345 + #[test_case("grpc+http://[::1]:12345", true; "grpc valid IPv6 localhost port 12345")] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[test_case("grpc+http://localhost", true; "grpc valid http host without port")] + /// Correct scheme to connect to localhost over http, without specifying a port. + #[test_case("grpc+https://localhost", true; "grpc valid https host without port")] + /// Correct scheme to connect to localhost over http, but with additional path, which is invalid. + #[test_case("grpc+http://localhost/some-path", false; "grpc valid invalid host and path")] + /// Connecting with wait-connect set to 0 succeeds, as that's the default. + #[test_case("grpc+http://localhost?wait-connect=0", true; "grpc valid host wait-connect=0")] + /// Connecting with wait-connect set to 1 fails, as the host doesn't exist. + #[test_case("grpc+http://nonexist.invalid?wait-connect=1", false; "grpc valid host wait-connect=1")] + #[tokio::test] + async fn test_from_addr_tokio(uri_str: &str, is_ok: bool) { + let url = Url::parse(uri_str).expect("must parse"); + assert_eq!(channel_from_url(&url).await.is_ok(), is_ok) + } +} -- cgit 1.4.1