diff options
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/blobservice/grpc.rs | 4 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 19 | ||||
-rw-r--r-- | tvix/castore/src/fs/mod.rs | 22 |
3 files changed, 27 insertions, 18 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs index 5663cd3838ec..8bde6a120d0a 100644 --- a/tvix/castore/src/blobservice/grpc.rs +++ b/tvix/castore/src/blobservice/grpc.rs @@ -18,7 +18,7 @@ use tokio_util::{ sync::PollSender, }; use tonic::{async_trait, transport::Channel, Code, Status}; -use tracing::instrument; +use tracing::{instrument, Instrument as _}; /// Connects to a (remote) tvix-store BlobService over gRPC. #[derive(Clone)] @@ -133,6 +133,8 @@ impl BlobService for GRPCBlobService { let task = tokio::spawn({ let mut grpc_client = self.grpc_client.clone(); async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) } + // instrument the task with the current span, this is not done by default + .in_current_span() }); // The tx part of the channel is converted to a sink of byte chunks. diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index fe935629bfcb..24e498a997ef 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -12,7 +12,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::async_trait; use tonic::Code; use tonic::{transport::Channel, Status}; -use tracing::{instrument, warn}; +use tracing::{instrument, warn, Instrument as _}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] @@ -194,14 +194,17 @@ impl DirectoryService for GRPCDirectoryService { let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move { - let s = grpc_client - .put(UnboundedReceiverStream::new(rx)) - .await? - .into_inner(); + let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn( + async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); - Ok(s) - }); + Ok(s) + } // instrument the task with the current span, this is not done by default + .in_current_span(), + ); Box::new(GRPCPutter { rq: Some((task, tx)), diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index 176199f64aaf..b565ed60ac42 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -43,7 +43,7 @@ use tokio::{ io::{AsyncReadExt, AsyncSeekExt}, sync::mpsc, }; -use tracing::{debug, error, instrument, warn, Span}; +use tracing::{debug, error, instrument, warn, Instrument as _, Span}; /// This implements a read-only FUSE filesystem for a tvix-store /// with the passed [BlobService], [DirectoryService] and [RootNodes]. @@ -397,16 +397,20 @@ where // This task will run in the background immediately and will exit // after the stream ends or if we no longer want any more entries. - self.tokio_handle.spawn(async move { - let mut stream = root_nodes_provider.list().enumerate(); - while let Some(node) = stream.next().await { - if tx.send(node).await.is_err() { - // If we get a send error, it means the sync code - // doesn't want any more entries. - break; + self.tokio_handle.spawn( + async move { + let mut stream = root_nodes_provider.list().enumerate(); + while let Some(node) = stream.next().await { + if tx.send(node).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } } } - }); + // instrument the task with the current span, this is not done by default + .in_current_span(), + ); // Put the rx part into [self.dir_handles]. // TODO: this will overflow after 2**64 operations, |