diff options
author | Simon Hauser <simon.hauser@helsinki-systems.de> | 2024-06-20T09·39+0200 |
---|---|---|
committer | Simon Hauser <simon.hauser@helsinki-systems.de> | 2024-06-20T19·21+0000 |
commit | 639a00e2ab1f2a6530477d2d7f91cccbc9f70746 (patch) | |
tree | 4f953d503a6ee0c3d04c623c0eed13f7381bd699 /tvix/castore/src | |
parent | 2b20d8d82dd424f2cb457c0cdef3ab3e98512117 (diff) |
feat(tvix/tracing): gRPC trace context propagation r/8299
This introduces optional helper function in tvix/tracing for trace propagation and uses these helper in the `tvix-store`. The GRPCBlobService, GRPCDirectoryService and GRPCPathInfoService now accept a generic client, meaning the client can be generated with either `::new` or `::with_interceptor`. This was tested and validated by starting a `tvix-store daemon` and `tvix-store import`. Change-Id: I4b194483bf09266820104b4b56e4a135dca2b77a Reviewed-on: https://cl.tvl.fyi/c/depot/+/11863 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/blobservice/from_addr.rs | 8 | ||||
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 27 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/from_addr.rs | 8 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 26 |
4 files changed, 48 insertions, 21 deletions
diff --git a/tvix/castore/src/blobservice/from_addr.rs b/tvix/castore/src/blobservice/from_addr.rs index 8898bbfb95ce..f76592e509f8 100644 --- a/tvix/castore/src/blobservice/from_addr.rs +++ b/tvix/castore/src/blobservice/from_addr.rs @@ -30,8 +30,12 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn BlobService>, crate::Error> // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = BlobServiceClient::new(crate::tonic::channel_from_url(&url).await?); - Box::new(GRPCBlobService::from_client(client)) + Box::new(GRPCBlobService::from_client( + BlobServiceClient::with_interceptor( + crate::tonic::channel_from_url(&url).await?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } scheme if scheme.starts_with("objectstore+") => { // We need to convert the URL to string, strip the prefix there, and then diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 8bde6a120d0a..9b54d81de29c 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -17,29 +17,39 @@ use tokio_util::{ io::{CopyToBytes, SinkWriter}, sync::PollSender, }; -use tonic::{async_trait, transport::Channel, Code, Status}; +use tonic::{async_trait, Code, Status}; use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] -pub struct GRPCBlobService { +pub struct GRPCBlobService<T> +where + T: Clone, +{ /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, + grpc_client: proto::blob_service_client::BlobServiceClient<T>, } -impl GRPCBlobService { +impl<T> GRPCBlobService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone, +{ /// construct a [GRPCBlobService] from a [proto::blob_service_client::BlobServiceClient]. /// panics if called outside the context of a tokio runtime. - pub fn from_client( - grpc_client: proto::blob_service_client::BlobServiceClient<Channel>, - ) -> Self { + pub fn from_client(grpc_client: proto::blob_service_client::BlobServiceClient<T>) -> Self { Self { grpc_client } } } #[async_trait] -impl BlobService for GRPCBlobService { +impl<T> BlobService for GRPCBlobService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(skip(self, digest), fields(blob.digest=%digest))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { let mut grpc_client = self.grpc_client.clone(); @@ -337,7 +347,6 @@ mod tests { .await .expect("must succeed"), ); - GRPCBlobService::from_client(client) }; diff --git a/tvix/castore/src/directoryservice/from_addr.rs b/tvix/castore/src/directoryservice/from_addr.rs index ee675ca68a9f..9aa01df171d7 100644 --- a/tvix/castore/src/directoryservice/from_addr.rs +++ b/tvix/castore/src/directoryservice/from_addr.rs @@ -63,8 +63,12 @@ pub async fn from_addr(uri: &str) -> Result<Box<dyn DirectoryService>, crate::Er // - In the case of unix sockets, there must be a path, but may not be a host. // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. - let client = DirectoryServiceClient::new(crate::tonic::channel_from_url(&url).await?); - Box::new(GRPCDirectoryService::from_client(client)) + Box::new(GRPCDirectoryService::from_client( + DirectoryServiceClient::with_interceptor( + crate::tonic::channel_from_url(&url).await?, + tvix_tracing::propagate::tonic::send_trace, + ), + )) } scheme if scheme.starts_with("objectstore+") => { // We need to convert the URL to string, strip the prefix there, and then diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 24e498a997ef..54d7575adcc1 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -9,31 +9,41 @@ use tokio::spawn; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tokio_stream::wrappers::UnboundedReceiverStream; -use tonic::async_trait; -use tonic::Code; -use tonic::{transport::Channel, Status}; +use tonic::{async_trait, Code, Status}; use tracing::{instrument, warn, Instrument as _}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] -pub struct GRPCDirectoryService { +pub struct GRPCDirectoryService<T> +where + T: Clone, +{ /// The internal reference to a gRPC client. /// Cloning it is cheap, and it internally handles concurrent requests. - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, } -impl GRPCDirectoryService { +impl<T> GRPCDirectoryService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Clone, +{ /// construct a [GRPCDirectoryService] from a [proto::directory_service_client::DirectoryServiceClient]. /// panics if called outside the context of a tokio runtime. pub fn from_client( - grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + grpc_client: proto::directory_service_client::DirectoryServiceClient<T>, ) -> Self { Self { grpc_client } } } #[async_trait] -impl DirectoryService for GRPCDirectoryService { +impl<T> DirectoryService for GRPCDirectoryService<T> +where + T: tonic::client::GrpcService<tonic::body::BoxBody> + Send + Sync + Clone + 'static, + T::ResponseBody: tonic::codegen::Body<Data = tonic::codegen::Bytes> + Send + 'static, + <T::ResponseBody as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send, + T::Future: Send, +{ #[instrument(level = "trace", skip_all, fields(directory.digest = %digest))] async fn get( &self, |