about summary refs log tree commit diff
path: root/tvix/castore/src
diff options
context:
space:
mode:
authorSimon Hauser <simon.hauser@helsinki-systems.de>2024-06-20T14·10+0200
committerSimon Hauser <simon.hauser@helsinki-systems.de>2024-06-20T15·48+0000
commit2b20d8d82dd424f2cb457c0cdef3ab3e98512117 (patch)
tree59b1013ff5e6f148554d7aa1fb69594fe0524f93 /tvix/castore/src
parentbd8d74a3eea461268c3ee089e001022c7d151c14 (diff)
feat(tvix/castore): instrument tokio task with current span r/8298
By default tokio::spawn does not instrument the spawned task with the
current spawn (https://github.com/tokio-rs/tokio/discussions/6008), do
this manually for all tokio::spawn functions in functions that are
instrumented.

Change-Id: I83dd8145b3a62421454aff57d34180cebbee8304
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11864
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Autosubmit: Simon Hauser <simon.hauser@helsinki-systems.de>
Diffstat (limited to 'tvix/castore/src')
-rw-r--r--tvix/castore/src/blobservice/grpc.rs4
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs19
-rw-r--r--tvix/castore/src/fs/mod.rs22
3 files changed, 27 insertions, 18 deletions
diff --git a/tvix/castore/src/blobservice/grpc.rs b/tvix/castore/src/blobservice/grpc.rs
index 5663cd3838ec..8bde6a120d0a 100644
--- a/tvix/castore/src/blobservice/grpc.rs
+++ b/tvix/castore/src/blobservice/grpc.rs
@@ -18,7 +18,7 @@ use tokio_util::{
     sync::PollSender,
 };
 use tonic::{async_trait, transport::Channel, Code, Status};
-use tracing::instrument;
+use tracing::{instrument, Instrument as _};
 
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
@@ -133,6 +133,8 @@ impl BlobService for GRPCBlobService {
         let task = tokio::spawn({
             let mut grpc_client = self.grpc_client.clone();
             async move { Ok::<_, Status>(grpc_client.put(blobchunk_stream).await?.into_inner()) }
+                // instrument the task with the current span, this is not done by default
+                .in_current_span()
         });
 
         // The tx part of the channel is converted to a sink of byte chunks.
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index fe935629bfcb..24e498a997ef 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -12,7 +12,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
 use tonic::async_trait;
 use tonic::Code;
 use tonic::{transport::Channel, Status};
-use tracing::{instrument, warn};
+use tracing::{instrument, warn, Instrument as _};
 
 /// Connects to a (remote) tvix-store DirectoryService over gRPC.
 #[derive(Clone)]
@@ -194,14 +194,17 @@ impl DirectoryService for GRPCDirectoryService {
 
         let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
 
-        let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(async move {
-            let s = grpc_client
-                .put(UnboundedReceiverStream::new(rx))
-                .await?
-                .into_inner();
+        let task: JoinHandle<Result<proto::PutDirectoryResponse, Status>> = spawn(
+            async move {
+                let s = grpc_client
+                    .put(UnboundedReceiverStream::new(rx))
+                    .await?
+                    .into_inner();
 
-            Ok(s)
-        });
+                Ok(s)
+            } // instrument the task with the current span, this is not done by default
+            .in_current_span(),
+        );
 
         Box::new(GRPCPutter {
             rq: Some((task, tx)),
diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs
index 176199f64aaf..b565ed60ac42 100644
--- a/tvix/castore/src/fs/mod.rs
+++ b/tvix/castore/src/fs/mod.rs
@@ -43,7 +43,7 @@ use tokio::{
     io::{AsyncReadExt, AsyncSeekExt},
     sync::mpsc,
 };
-use tracing::{debug, error, instrument, warn, Span};
+use tracing::{debug, error, instrument, warn, Instrument as _, Span};
 
 /// This implements a read-only FUSE filesystem for a tvix-store
 /// with the passed [BlobService], [DirectoryService] and [RootNodes].
@@ -397,16 +397,20 @@ where
 
             // This task will run in the background immediately and will exit
             // after the stream ends or if we no longer want any more entries.
-            self.tokio_handle.spawn(async move {
-                let mut stream = root_nodes_provider.list().enumerate();
-                while let Some(node) = stream.next().await {
-                    if tx.send(node).await.is_err() {
-                        // If we get a send error, it means the sync code
-                        // doesn't want any more entries.
-                        break;
+            self.tokio_handle.spawn(
+                async move {
+                    let mut stream = root_nodes_provider.list().enumerate();
+                    while let Some(node) = stream.next().await {
+                        if tx.send(node).await.is_err() {
+                            // If we get a send error, it means the sync code
+                            // doesn't want any more entries.
+                            break;
+                        }
                     }
                 }
-            });
+                // instrument the task with the current span, this is not done by default
+                .in_current_span(),
+            );
 
             // Put the rx part into [self.dir_handles].
             // TODO: this will overflow after 2**64 operations,