about summary refs log tree commit diff
path: root/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-09-19T16·46-0500
committerclbot <clbot@tvl.fyi>2023-09-21T17·58+0000
commit37a348b4fae16b2b1c5ec12deaa085a049833d7f (patch)
tree7a1b1a7160036777b010cd81628960c1ca07486e /tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
parent7e737fde34260daa477794d63b0b3344b4a1d81b (diff)
refactor(tvix/store): Asyncify PathInfoService and DirectoryService r/6623
We've decided to asyncify all of the services to reduce some of the
pains going back and for between sync<->async. The end goal will be for
all the tvix-store internals to be async and then expose a sync
interface for things like tvix eval io.

Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369
Autosubmit: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs')
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs17
1 files changed, 9 insertions, 8 deletions
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 33861d9ffa4e..14ceb34c3af7 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -1,6 +1,7 @@
 use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
+use futures::StreamExt;
 use std::sync::Arc;
 use tokio::task;
 use tokio_stream::wrappers::ReceiverStream;
@@ -36,7 +37,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
                     .to_vec()
                     .try_into()
                     .map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
-                match self.path_info_service.get(digest) {
+                match self.path_info_service.get(digest).await {
                     Ok(None) => Err(Status::not_found("PathInfo not found")),
                     Ok(Some(path_info)) => Ok(Response::new(path_info)),
                     Err(e) => {
@@ -54,7 +55,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
 
         // Store the PathInfo in the client. Clients MUST validate the data
         // they receive, so we don't validate additionally here.
-        match self.path_info_service.put(path_info) {
+        match self.path_info_service.put(path_info).await {
             Ok(path_info_new) => Ok(Response::new(path_info_new)),
             Err(e) => {
                 warn!("failed to insert PathInfo: {}", e);
@@ -72,11 +73,10 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
             None => Err(Status::invalid_argument("no root node sent")),
             Some(root_node) => {
                 let path_info_service = self.path_info_service.clone();
-                let (nar_size, nar_sha256) =
-                    task::spawn_blocking(move || path_info_service.calculate_nar(&root_node))
-                        .await
-                        .unwrap()
-                        .expect("error during nar calculation"); // TODO: handle error
+                let (nar_size, nar_sha256) = path_info_service
+                    .calculate_nar(&root_node)
+                    .await
+                    .expect("error during nar calculation"); // TODO: handle error
 
                 Ok(Response::new(proto::CalculateNarResponse {
                     nar_size,
@@ -96,7 +96,8 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
         let path_info_service = self.path_info_service.clone();
 
         let _task = task::spawn(async move {
-            for e in path_info_service.list() {
+            let mut stream = path_info_service.list();
+            while let Some(e) = stream.next().await {
                 let res = e.map_err(|e| Status::internal(e.to_string()));
                 if tx.send(res).await.is_err() {
                     debug!("receiver dropped");