about summary refs log tree commit diff
path: root/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-09-19T16·46-0500
committerclbot <clbot@tvl.fyi>2023-09-21T17·58+0000
commit37a348b4fae16b2b1c5ec12deaa085a049833d7f (patch)
tree7a1b1a7160036777b010cd81628960c1ca07486e /tvix/store/src/proto/grpc_directoryservice_wrapper.rs
parent7e737fde34260daa477794d63b0b3344b4a1d81b (diff)
refactor(tvix/store): Asyncify PathInfoService and DirectoryService r/6623
We've decided to asyncify all of the services to reduce some of the
pains going back and for between sync<->async. The end goal will be for
all the tvix-store internals to be async and then expose a sync
interface for things like tvix eval io.

Change-Id: I97c71f8db1d05a38bd8f625df5087d565705d52d
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9369
Autosubmit: Connor Brewster <cbrewster@hey.com>
Tested-by: BuildkiteCI
Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/proto/grpc_directoryservice_wrapper.rs')
-rw-r--r--tvix/store/src/proto/grpc_directoryservice_wrapper.rs11
1 files changed, 6 insertions, 5 deletions
diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
index ec53d7d76ce3..5e143a7bd7a8 100644
--- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
@@ -1,5 +1,6 @@
 use crate::proto;
 use crate::{directoryservice::DirectoryService, B3Digest};
+use futures::StreamExt;
 use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::{sync::mpsc::channel, task};
@@ -47,7 +48,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
                     task::spawn(async move {
                         if !req_inner.recursive {
                             let e: Result<proto::Directory, Status> =
-                                match directory_service.get(&digest) {
+                                match directory_service.get(&digest).await {
                                     Ok(Some(directory)) => Ok(directory),
                                     Ok(None) => Err(Status::not_found(format!(
                                         "directory {} not found",
@@ -61,9 +62,9 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
                             }
                         } else {
                             // If recursive was requested, traverse via get_recursive.
-                            let directories_it = directory_service.get_recursive(&digest);
+                            let mut directories_it = directory_service.get_recursive(&digest);
 
-                            for e in directories_it {
+                            while let Some(e) = directories_it.next().await {
                                 // map err in res from Error to Status
                                 let res = e.map_err(|e| Status::internal(e.to_string()));
                                 if tx.send(res).await.is_err() {
@@ -157,7 +158,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
 
             // check if the directory already exists in the database. We can skip
             // inserting if it's already there, as that'd be a no-op.
-            match self.directory_service.get(&dgst) {
+            match self.directory_service.get(&dgst).await {
                 Err(e) => {
                     warn!("error checking if directory already exists: {}", e);
                     return Err(e.into());
@@ -166,7 +167,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
                 Ok(Some(_)) => {}
                 // insert if it doesn't already exist
                 Ok(None) => {
-                    self.directory_service.put(directory)?;
+                    self.directory_service.put(directory).await?;
                 }
             }
         }