diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.lock | 41 | ||||
-rw-r--r-- | tvix/Cargo.nix | 167 | ||||
-rw-r--r-- | tvix/castore/Cargo.toml | 2 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/from_addr.rs | 8 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 27 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/from_addr.rs | 8 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 26 | ||||
-rw-r--r-- | tvix/docs/src/TODO.md | 3 | ||||
-rw-r--r-- | tvix/store/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 14 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 32 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/tests/utils.rs | 7 | ||||
-rw-r--r-- | tvix/store/src/utils.rs | 14 | ||||
-rw-r--r-- | tvix/tracing/Cargo.toml | 11 | ||||
-rw-r--r-- | tvix/tracing/src/lib.rs | 7 | ||||
-rw-r--r-- | tvix/tracing/src/propagate/mod.rs | 9 | ||||
-rw-r--r-- | tvix/tracing/src/propagate/tonic.rs | 59 |
18 files changed, 399 insertions, 48 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index b4faaad0d298..ab7eec0b7324 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -1544,6 +1544,12 @@ dependencies = [ ] [[package]] +name = "http-range-header" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" + +[[package]] name = "httparse" version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2277,6 +2283,18 @@ dependencies = [ ] [[package]] +name = "opentelemetry-http" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7690dc77bf776713848c4faa6501157469017eaf332baccd4eb1cea928743d94" +dependencies = [ + "async-trait", + "bytes", + "http", + "opentelemetry", +] + +[[package]] name = "opentelemetry-otlp" version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4033,6 +4051,25 @@ dependencies = [ ] [[package]] +name = "tower-http" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +dependencies = [ + "bitflags 2.4.2", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] name = "tower-layer" version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4430,6 +4467,7 @@ dependencies = [ "tonic-build", "tonic-reflection", "tower", + "tower-http", "tracing", "tracing-indicatif", "tvix-castore", @@ -4442,13 +4480,16 @@ dependencies = [ name = "tvix-tracing" version = "0.1.0" dependencies = [ + "http", "indicatif", "lazy_static", "opentelemetry", + "opentelemetry-http", "opentelemetry-otlp", "opentelemetry_sdk", "thiserror", "tokio", + "tonic", "tracing", "tracing-indicatif", "tracing-opentelemetry", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index acd3e1dfb329..008080c461a0 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -4653,6 +4653,13 @@ rec { ]; }; + "http-range-header" = rec { + crateName = "http-range-header"; + version = "0.3.1"; + edition = "2018"; + sha256 = "13vm511vq3bhschkw2xi9nhxzkw53m55gn9vxg7qigfxc29spl5d"; + features = { }; + }; "httparse" = rec { crateName = "httparse"; version = "1.8.0"; @@ -6928,6 +6935,39 @@ rec { }; resolvedDefaultFeatures = [ "default" "metrics" "pin-project-lite" "trace" ]; }; + "opentelemetry-http" = rec { + crateName = "opentelemetry-http"; + version = "0.11.1"; + edition = "2021"; + sha256 = "151xfhlakkmi9v6sqarkmxz02sbl2l0nbajgij216rvppxvxr43n"; + dependencies = [ + { + name = "async-trait"; + packageId = "async-trait"; + } + { + name = "bytes"; + packageId = "bytes"; + } + { + name = "http"; + packageId = "http"; + usesDefaultFeatures = false; + } + { + name = "opentelemetry"; + packageId = "opentelemetry"; + features = [ "trace" ]; + } + ]; + features = { + "hyper" = [ "dep:hyper" ]; + "isahc" = [ "dep:isahc" ]; + "reqwest" = [ "dep:reqwest" ]; + "reqwest-rustls" = [ "reqwest" "reqwest/rustls-tls-native-roots" ]; + "tokio" = [ "dep:tokio" ]; + }; + }; "opentelemetry-otlp" = rec { crateName = "opentelemetry-otlp"; version = "0.15.0"; @@ -12569,6 +12609,106 @@ rec { }; resolvedDefaultFeatures = [ "__common" "balance" "buffer" "default" "discover" "futures-core" "futures-util" "indexmap" "limit" "load" "log" "make" "pin-project" "pin-project-lite" "rand" "ready-cache" "slab" "timeout" "tokio" "tokio-util" "tracing" "util" ]; }; + "tower-http" = rec { + crateName = "tower-http"; + version = "0.4.4"; + edition = "2018"; + sha256 = "0h0i2flrw25zwxv72sifq4v5mwcb030spksy7r2a4xl2d4fvpib1"; + authors = [ + "Tower Maintainers <team@tower-rs.com>" + ]; + dependencies = [ + { + name = "bitflags"; + packageId = "bitflags 2.4.2"; + } + { + name = "bytes"; + packageId = "bytes"; + } + { + name = "futures-core"; + packageId = "futures-core"; + } + { + name = "futures-util"; + packageId = "futures-util"; + usesDefaultFeatures = false; + } + { + name = "http"; + packageId = "http"; + } + { + name = "http-body"; + packageId = "http-body"; + } + { + name = "http-range-header"; + packageId = "http-range-header"; + } + { + name = "pin-project-lite"; + packageId = "pin-project-lite"; + } + { + name = "tower-layer"; + packageId = "tower-layer"; + } + { + name = "tower-service"; + packageId = "tower-service"; + } + { + name = "tracing"; + packageId = "tracing"; + optional = true; + usesDefaultFeatures = false; + } + ]; + devDependencies = [ + { + name = "bytes"; + packageId = "bytes"; + } + ]; + features = { + "async-compression" = [ "dep:async-compression" ]; + "auth" = [ "base64" "validate-request" ]; + "base64" = [ "dep:base64" ]; + "catch-panic" = [ "tracing" "futures-util/std" ]; + "compression-br" = [ "async-compression/brotli" "tokio-util" "tokio" ]; + "compression-deflate" = [ "async-compression/zlib" "tokio-util" "tokio" ]; + "compression-full" = [ "compression-br" "compression-deflate" "compression-gzip" "compression-zstd" ]; + "compression-gzip" = [ "async-compression/gzip" "tokio-util" "tokio" ]; + "compression-zstd" = [ "async-compression/zstd" "tokio-util" "tokio" ]; + "decompression-br" = [ "async-compression/brotli" "tokio-util" "tokio" ]; + "decompression-deflate" = [ "async-compression/zlib" "tokio-util" "tokio" ]; + "decompression-full" = [ "decompression-br" "decompression-deflate" "decompression-gzip" "decompression-zstd" ]; + "decompression-gzip" = [ "async-compression/gzip" "tokio-util" "tokio" ]; + "decompression-zstd" = [ "async-compression/zstd" "tokio-util" "tokio" ]; + "follow-redirect" = [ "iri-string" "tower/util" ]; + "fs" = [ "tokio/fs" "tokio-util/io" "tokio/io-util" "mime_guess" "mime" "percent-encoding" "httpdate" "set-status" "futures-util/alloc" "tracing" ]; + "full" = [ "add-extension" "auth" "catch-panic" "compression-full" "cors" "decompression-full" "follow-redirect" "fs" "limit" "map-request-body" "map-response-body" "metrics" "normalize-path" "propagate-header" "redirect" "request-id" "sensitive-headers" "set-header" "set-status" "timeout" "trace" "util" "validate-request" ]; + "httpdate" = [ "dep:httpdate" ]; + "iri-string" = [ "dep:iri-string" ]; + "metrics" = [ "tokio/time" ]; + "mime" = [ "dep:mime" ]; + "mime_guess" = [ "dep:mime_guess" ]; + "percent-encoding" = [ "dep:percent-encoding" ]; + "request-id" = [ "uuid" ]; + "timeout" = [ "tokio/time" ]; + "tokio" = [ "dep:tokio" ]; + "tokio-util" = [ "dep:tokio-util" ]; + "tower" = [ "dep:tower" ]; + "trace" = [ "tracing" ]; + "tracing" = [ "dep:tracing" ]; + "util" = [ "tower" ]; + "uuid" = [ "dep:uuid" ]; + "validate-request" = [ "mime" ]; + }; + resolvedDefaultFeatures = [ "default" "trace" "tracing" ]; + }; "tower-layer" = rec { crateName = "tower-layer"; version = "0.3.2"; @@ -13385,6 +13525,7 @@ rec { { name = "tvix-tracing"; packageId = "tvix-tracing"; + features = [ "tonic" ]; } { name = "url"; @@ -14143,6 +14284,11 @@ rec { packageId = "tower"; } { + name = "tower-http"; + packageId = "tower-http"; + features = [ "trace" ]; + } + { name = "tracing"; packageId = "tracing"; } @@ -14157,6 +14303,7 @@ rec { { name = "tvix-tracing"; packageId = "tvix-tracing"; + features = [ "tonic" ]; } { name = "url"; @@ -14222,6 +14369,11 @@ rec { else ./tracing; dependencies = [ { + name = "http"; + packageId = "http"; + optional = true; + } + { name = "indicatif"; packageId = "indicatif"; } @@ -14235,6 +14387,11 @@ rec { optional = true; } { + name = "opentelemetry-http"; + packageId = "opentelemetry-http"; + optional = true; + } + { name = "opentelemetry-otlp"; packageId = "opentelemetry-otlp"; optional = true; @@ -14255,6 +14412,11 @@ rec { features = [ "sync" "rt" ]; } { + name = "tonic"; + packageId = "tonic"; + optional = true; + } + { name = "tracing"; packageId = "tracing"; features = [ "max_level_trace" "release_max_level_debug" ]; @@ -14281,10 +14443,11 @@ rec { } ]; features = { - "otlp" = [ "dep:tracing-opentelemetry" "dep:opentelemetry" "dep:opentelemetry-otlp" "dep:opentelemetry_sdk" ]; + "otlp" = [ "dep:tracing-opentelemetry" "dep:opentelemetry" "dep:opentelemetry-otlp" "dep:opentelemetry_sdk" "dep:opentelemetry-http" ]; + "tonic" = [ "dep:tonic" "dep:http" ]; "tracy" = [ "dep:tracing-tracy" ]; }; - resolvedDefaultFeatures = [ "default" "otlp" "tracy" ]; + resolvedDefaultFeatures = [ "default" "otlp" "tonic" "tracy" ]; }; "typenum" = rec { crateName = "typenum"; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index ea4c598fe884..951613057064 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -29,7 +29,7 @@ tonic = "0.11.0" tower = "0.4.13" tracing = "0.1.37" tracing-indicatif = "0.3.6" -tvix-tracing = { path = "../tracing" } +tvix-tracing = { path = "../tracing", features = ["tonic"] } url = "2.4.0" walkdir = "2.4.0" zstd = "0.13.0" diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index 8898bbfb95ce..f76592e509f8 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -30,8 +30,12 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?); - Box::new(GRPCBlobService::from_client(client)) + Box::new(GRPCBlobService::from_client( + BlobServiceClient::with_interceptor( + crate::tonic::channel_from_url(&url).await?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } scheme if scheme.starts_with("objectstore+") => { // We need to convert the URL to string, strip the prefix there, and then diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 8bde6a120d0a..9b54d81de29c 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -17,29 +17,39 @@ use tokio_util::{ io::{CopyToBytes, SinkWriter}, sync::PollSender, }; -use tonic::{async_trait, transport::Channel, Code, Status}; +use tonic::{async_trait, Code, Status}; use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] -pub struct GRPCBlobService { +pub struct GRPCBlobService<T> +where + T: Clone, +{ /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, } -impl GRPCBlobService { +impl<T> GRPCBlobService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone, +{ /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. /// panics if called outside the context of a tokio runtime. - pub fn from_client( - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, - ) -> Self { + pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self { Self { grpc_client } } } #[async_trait] -impl BlobService for GRPCBlobService { +impl<T> BlobService for GRPCBlobService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(skip(self, digest), fields(blob.digest=%digest))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { let mut grpc_client = self.grpc_client.clone(); @@ -337,7 +347,6 @@ mod tests { .await .expect("must succeed"), ); - GRPCBlobService::from_client(client) }; diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index ee675ca68a9f..9aa01df171d7 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -63,8 +63,12 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Er // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?); - Box::new(GRPCDirectoryService::from_client(client)) + Box::new(GRPCDirectoryService::from_client( + DirectoryServiceClient::with_interceptor( + crate::tonic::channel_from_url(&url).await?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } scheme if scheme.starts_with("objectstore+") => { // We need to convert the URL to string, strip the prefix there, and then diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 24e498a997ef..54d7575adcc1 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -9,31 +9,41 @@ use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::async_trait; -use tonic::Code; -use tonic::{transport::Channel, Status}; +use tonic::{async_trait, Code, Status}; use tracing::{instrument, warn, Instrument as _}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] -pub struct GRPCDirectoryService { +pub struct GRPCDirectoryService<T> +where + T: Clone, +{ /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, } -impl GRPCDirectoryService { +impl<T> GRPCDirectoryService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone, +{ /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, ) -> Self { Self { grpc_client } } } #[async_trait] -impl DirectoryService for GRPCDirectoryService { +impl<T> DirectoryService for GRPCDirectoryService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] async fn get( &self, diff --git a/tvix/docs/src/TODO.md b/tvix/docs/src/TODO.md index a999e137066d..892110a92d77 100644 --- a/tvix/docs/src/TODO.md +++ b/tvix/docs/src/TODO.md @@ -233,9 +233,6 @@ logs etc, but this is something requiring a lot of designing. - Maybe drop `--log-level` entirely, and only use `RUST_LOG` env exclusively? `debug`,`trace` level across all crates is a bit useless, and `RUST_LOG` can be much more granular… - - gRPC trace propagation (cl/10532 + @simon) - We need to wire trace propagation into our gRPC clients, so if we collect - traces both for the client and server they will be connected. - Fix OTLP sending batches on shutdown. It seems for short-lived CLI invocations we don't end up receiving all spans. Ensure we flush these on ctrl-c, and regular process termination. diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 760569eb8dc6..733b7c11f2c2 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -31,13 +31,14 @@ tokio-stream = { version = "0.1.14", features = ["fs"] } tokio-util = { version = "0.7.9", features = ["io", "io-util", "compat"] } tonic = { version = "0.11.0", features = ["tls", "tls-roots"] } tower = "0.4.13" +tower-http = { version = "0.4.4", features = ["trace"] } tvix-castore = { path = "../castore" } url = "2.4.0" walkdir = "2.4.0" reqwest = { version = "0.11.22", features = ["rustls-tls-native-roots", "stream"], default-features = false } lru = "0.12.3" parking_lot = "0.12.2" -tvix-tracing = { path = "../tracing" } +tvix-tracing = { path = "../tracing", features = ["tonic"] } tracing = "0.1.40" tracing-indicatif = "0.3.6" diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 30ebca004873..657ce06720ee 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -13,6 +13,8 @@ use tokio_listener::Listener; use tokio_listener::SystemOptions; use tokio_listener::UserOptions; use tonic::transport::Server; +use tower::ServiceBuilder; +use tower_http::trace::{DefaultMakeSpan, TraceLayer}; use tracing::{info, info_span, instrument, Level, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt as _; use tvix_castore::import::fs::ingest_path; @@ -215,7 +217,17 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { .parse() .unwrap(); - let mut server = Server::builder(); + let mut server = Server::builder().layer( + ServiceBuilder::new() + .layer( + TraceLayer::new_for_grpc().make_span_with( + DefaultMakeSpan::new() + .level(Level::INFO) + .include_headers(true), + ), + ) + .map_request(tvix_tracing::propagate::tonic::accept_trace), + ); #[allow(unused_mut)] let mut router = server diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 455909e7f235..9173d25d05ca 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -105,9 +105,12 @@ pub async fn from_addr( // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = - PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?); - Box::new(GRPCPathInfoService::from_client(client)) + Box::new(GRPCPathInfoService::from_client( + PathInfoServiceClient::with_interceptor( + tvix_castore::tonic::channel_from_url(&url).await?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } #[cfg(feature = "cloud")] "bigtable" => { diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 63656609354d..152bf69a05ac 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -6,31 +6,43 @@ use crate::{ use async_stream::try_stream; use futures::stream::BoxStream; use nix_compat::nixbase32; -use tonic::{async_trait, transport::Channel, Code}; +use tonic::{async_trait, Code}; use tracing::{instrument, Span}; use tracing_indicatif::span_ext::IndicatifSpanExt; use tvix_castore::{proto as castorepb, Error}; /// Connects to a (remote) tvix-store PathInfoService over gRPC. #[derive(Clone)] -pub struct GRPCPathInfoService { +pub struct GRPCPathInfoService<T> +where + T: Clone, +{ /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>, } -impl GRPCPathInfoService { +impl<T> GRPCPathInfoService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone, +{ /// construct a [GRPCPathInfoService] from a [proto::path_info_service_client::PathInfoServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( - grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + grpc_client: proto::path_info_service_client::PathInfoServiceClient<T>, ) -> Self { Self { grpc_client } } } #[async_trait] -impl PathInfoService for GRPCPathInfoService { +impl<T> PathInfoService for GRPCPathInfoService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(level = "trace", skip_all, fields(path_info.digest = nixbase32::encode(&digest)))] async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> { let path_info = self @@ -107,7 +119,13 @@ impl PathInfoService for GRPCPathInfoService { } #[async_trait] -impl NarCalculationService for GRPCPathInfoService { +impl<T> NarCalculationService for GRPCPathInfoService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=1))] async fn calculate_nar( &self, diff --git a/tvix/store/src/pathinfoservice/tests/utils.rs b/tvix/store/src/pathinfoservice/tests/utils.rs index ee170468d1d2..3e4fe5c05a24 100644 --- a/tvix/store/src/pathinfoservice/tests/utils.rs +++ b/tvix/store/src/pathinfoservice/tests/utils.rs @@ -16,8 +16,11 @@ use crate::{ /// Constructs and returns a gRPC PathInfoService. /// We also return memory-based {Blob,Directory}Service, /// as the consumer of this function accepts a 3-tuple. -pub async fn make_grpc_path_info_service_client( -) -> (impl BlobService, impl DirectoryService, GRPCPathInfoService) { +pub async fn make_grpc_path_info_service_client() -> ( + impl BlobService, + impl DirectoryService, + GRPCPathInfoService<tonic::transport::Channel>, +) { let (left, right) = tokio::io::duplex(64); let blob_service = blob_service(); diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs index 67815fa94c4a..bd3c65a77998 100644 --- a/tvix/store/src/utils.rs +++ b/tvix/store/src/utils.rs @@ -51,12 +51,14 @@ pub async fn construct_services( .map_err(|e| io::Error::other(e.to_string()))?; if url.scheme().starts_with("grpc+") { - let client = PathInfoServiceClient::new( - tvix_castore::tonic::channel_from_url(&url) - .await - .map_err(|e| io::Error::other(e.to_string()))?, - ); - Box::new(GRPCPathInfoService::from_client(client)) + Box::new(GRPCPathInfoService::from_client( + PathInfoServiceClient::with_interceptor( + tvix_castore::tonic::channel_from_url(&url) + .await + .map_err(|e| io::Error::other(e.to_string()))?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } else { Box::new(SimpleRenderer::new( blob_service.clone(), diff --git a/tvix/tracing/Cargo.toml b/tvix/tracing/Cargo.toml index 490f086a27f3..bc9a8c3c7792 100644 --- a/tvix/tracing/Cargo.toml +++ b/tvix/tracing/Cargo.toml @@ -17,6 +17,10 @@ opentelemetry = { version = "0.22.0", optional = true } opentelemetry-otlp = { version = "0.15.0", optional = true } opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"], optional = true } tracing-tracy = { version = "0.11.0", features = ["flush-on-exit"], optional = true } +opentelemetry-http = { version = "0.11.0", optional = true } + +tonic = { version = "0.11.0", optional = true } +http = { version = "0.2.11", optional = true } [features] default = [] @@ -24,11 +28,16 @@ otlp = [ "dep:tracing-opentelemetry", "dep:opentelemetry", "dep:opentelemetry-otlp", - "dep:opentelemetry_sdk" + "dep:opentelemetry_sdk", + "dep:opentelemetry-http" ] tracy = [ "dep:tracing-tracy" ] +tonic = [ + "dep:tonic", + "dep:http", +] [lints] workspace = true diff --git a/tvix/tracing/src/lib.rs b/tvix/tracing/src/lib.rs index 68a417998a24..b965ca4a3dbc 100644 --- a/tvix/tracing/src/lib.rs +++ b/tvix/tracing/src/lib.rs @@ -9,6 +9,7 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilte use opentelemetry::{trace::Tracer, KeyValue}; #[cfg(feature = "otlp")] use opentelemetry_sdk::{ + propagation::TraceContextPropagator, resource::{ResourceDetector, SdkProvidedResourceDetector}, trace::BatchConfigBuilder, Resource, @@ -16,6 +17,9 @@ use opentelemetry_sdk::{ #[cfg(feature = "tracy")] use tracing_tracy::TracyLayer; +#[cfg(feature = "tonic")] // TODO or http +pub mod propagate; + lazy_static! { pub static ref PB_PROGRESS_STYLE: ProgressStyle = ProgressStyle::with_template( "{span_child_prefix} {wide_msg} {bar:10} ({elapsed}) {pos:>7}/{len:7}" @@ -186,6 +190,9 @@ impl TracingBuilder { #[cfg(feature = "otlp")] { if let Some(service_name) = self.service_name { + // register a text map propagator for trace propagation + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new()); + let (tracer, tx) = gen_otlp_tracer(service_name.to_string()); // Create a tracing layer with the configured tracer let layer = tracing_opentelemetry::layer().with_tracer(tracer); diff --git a/tvix/tracing/src/propagate/mod.rs b/tvix/tracing/src/propagate/mod.rs new file mode 100644 index 000000000000..42c532e9d8cf --- /dev/null +++ b/tvix/tracing/src/propagate/mod.rs @@ -0,0 +1,9 @@ +#[cfg(feature = "tonic")] +pub mod tonic; + +// TODO: Helper library for reqwest. We could use +// https://github.com/TrueLayer/reqwest-middleware/tree/main/reqwest-tracing to realise this + +// TODO: Helper library for axum or another http server, see +// https://github.com/hseeberger/hello-tracing-rs/blob/main/hello-tracing-common/src/otel/http.rs +// as an example and we can reuse tonic::accept_trace fun, at least for a tower::ServiceBuilder diff --git a/tvix/tracing/src/propagate/tonic.rs b/tvix/tracing/src/propagate/tonic.rs new file mode 100644 index 000000000000..af643eb70e36 --- /dev/null +++ b/tvix/tracing/src/propagate/tonic.rs @@ -0,0 +1,59 @@ +use tonic::{ + metadata::{MetadataKey, MetadataMap, MetadataValue}, + Status, +}; +use tracing::{warn, Span}; + +#[cfg(feature = "otlp")] +use opentelemetry::{global, propagation::Injector}; +#[cfg(feature = "otlp")] +use opentelemetry_http::HeaderExtractor; +#[cfg(feature = "otlp")] +use tracing_opentelemetry::OpenTelemetrySpanExt; + +/// Trace context propagation: associate the current span with the otlp trace of the given request, +/// if any and valid. This only sets the parent trace if the otlp feature is also enabled. +pub fn accept_trace<B>(request: http::Request<B>) -> http::Request<B> { + // we only extract and set a parent trace if otlp feature is enabled, otherwise this feature is + // an noop and we return the request as is + #[cfg(feature = "otlp")] + { + // Current context, if no or invalid data is received. + let parent_context = global::get_text_map_propagator(|propagator| { + propagator.extract(&HeaderExtractor(request.headers())) + }); + Span::current().set_parent(parent_context); + } + request +} + +#[cfg(feature = "otlp")] +struct MetadataInjector<'a>(&'a mut MetadataMap); + +#[cfg(feature = "otlp")] +impl Injector for MetadataInjector<'_> { + fn set(&mut self, key: &str, value: String) { + match MetadataKey::from_bytes(key.as_bytes()) { + Ok(key) => match MetadataValue::try_from(&value) { + Ok(value) => { + self.0.insert(key, value); + } + Err(error) => warn!(value, error = format!("{error:#}"), "parse metadata value"), + }, + Err(error) => warn!(key, error = format!("{error:#}"), "parse metadata key"), + } + } +} + +/// Trace context propagation: send the trace context by injecting it into the metadata of the given +/// request. This only injects the current span if the otlp feature is also enabled. +pub fn send_trace<T>(mut request: tonic::Request<T>) -> Result<tonic::Request<T>, Status> { + #[cfg(feature = "otlp")] + { + global::get_text_map_propagator(|propagator| { + let context = Span::current().context(); + propagator.inject_context(&context, &mut MetadataInjector(request.metadata_mut())) + }); + } + Ok(request) +} |