From 37a348b4fae16b2b1c5ec12deaa085a049833d7f Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Tue, 19 Sep 2023 11:46:41 -0500 Subject: refactor(tvix/store): Asyncify PathInfoService and DirectoryService We've decided to asyncify all of the services to reduce some of the pains going back and for between sync<->async. The end goal will be for all the tvix-store internals to be async and then expose a sync interface for things like tvix eval io. Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369 Autosubmit: Connor Brewster Tested-by: BuildkiteCI Reviewed-by: flokli --- tvix/store/src/proto/grpc_directoryservice_wrapper.rs | 11 ++++++----- tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 17 +++++++++-------- 2 files changed, 15 insertions(+), 13 deletions(-) (limited to 'tvix/store/src/proto') diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs index ec53d7d76ce3..5e143a7bd7a8 100644 --- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -1,5 +1,6 @@ use crate::proto; use crate::{directoryservice::DirectoryService, B3Digest}; +use futures::StreamExt; use std::collections::HashMap; use std::sync::Arc; use tokio::{sync::mpsc::channel, task}; @@ -47,7 +48,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW task::spawn(async move { if !req_inner.recursive { let e: Result = - match directory_service.get(&digest) { + match directory_service.get(&digest).await { Ok(Some(directory)) => Ok(directory), Ok(None) => Err(Status::not_found(format!( "directory {} not found", @@ -61,9 +62,9 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW } } else { // If recursive was requested, traverse via get_recursive. - let directories_it = directory_service.get_recursive(&digest); + let mut directories_it = directory_service.get_recursive(&digest); - for e in directories_it { + while let Some(e) = directories_it.next().await { // map err in res from Error to Status let res = e.map_err(|e| Status::internal(e.to_string())); if tx.send(res).await.is_err() { @@ -157,7 +158,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW // check if the directory already exists in the database. We can skip // inserting if it's already there, as that'd be a no-op. - match self.directory_service.get(&dgst) { + match self.directory_service.get(&dgst).await { Err(e) => { warn!("error checking if directory already exists: {}", e); return Err(e.into()); @@ -166,7 +167,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW Ok(Some(_)) => {} // insert if it doesn't already exist Ok(None) => { - self.directory_service.put(directory)?; + self.directory_service.put(directory).await?; } } } diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 33861d9ffa4e..14ceb34c3af7 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -1,6 +1,7 @@ use crate::nar::RenderError; use crate::pathinfoservice::PathInfoService; use crate::proto; +use futures::StreamExt; use std::sync::Arc; use tokio::task; use tokio_stream::wrappers::ReceiverStream; @@ -36,7 +37,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra .to_vec() .try_into() .map_err(|_e| Status::invalid_argument("invalid output digest length"))?; - match self.path_info_service.get(digest) { + match self.path_info_service.get(digest).await { Ok(None) => Err(Status::not_found("PathInfo not found")), Ok(Some(path_info)) => Ok(Response::new(path_info)), Err(e) => { @@ -54,7 +55,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra // Store the PathInfo in the client. Clients MUST validate the data // they receive, so we don't validate additionally here. - match self.path_info_service.put(path_info) { + match self.path_info_service.put(path_info).await { Ok(path_info_new) => Ok(Response::new(path_info_new)), Err(e) => { warn!("failed to insert PathInfo: {}", e); @@ -72,11 +73,10 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra None => Err(Status::invalid_argument("no root node sent")), Some(root_node) => { let path_info_service = self.path_info_service.clone(); - let (nar_size, nar_sha256) = - task::spawn_blocking(move || path_info_service.calculate_nar(&root_node)) - .await - .unwrap() - .expect("error during nar calculation"); // TODO: handle error + let (nar_size, nar_sha256) = path_info_service + .calculate_nar(&root_node) + .await + .expect("error during nar calculation"); // TODO: handle error Ok(Response::new(proto::CalculateNarResponse { nar_size, @@ -96,7 +96,8 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra let path_info_service = self.path_info_service.clone(); let _task = task::spawn(async move { - for e in path_info_service.list() { + let mut stream = path_info_service.list(); + while let Some(e) = stream.next().await { let res = e.map_err(|e| Status::internal(e.to_string())); if tx.send(res).await.is_err() { debug!("receiver dropped"); -- cgit 1.4.1