diff options
Diffstat (limited to 'tvix/store/src/proto/grpc_directoryservice_wrapper.rs')
-rw-r--r-- | tvix/store/src/proto/grpc_directoryservice_wrapper.rs | 11 |
1 files changed, 6 insertions, 5 deletions
diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs index ec53d7d76ce3..5e143a7bd7a8 100644 --- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -1,5 +1,6 @@ use crate::proto; use crate::{directoryservice::DirectoryService, B3Digest}; +use futures::StreamExt; use std::collections::HashMap; use std::sync::Arc; use tokio::{sync::mpsc::channel, task}; @@ -47,7 +48,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW task::spawn(async move { if !req_inner.recursive { let e: Result<proto::Directory, Status> = - match directory_service.get(&digest) { + match directory_service.get(&digest).await { Ok(Some(directory)) => Ok(directory), Ok(None) => Err(Status::not_found(format!( "directory {} not found", @@ -61,9 +62,9 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW } } else { // If recursive was requested, traverse via get_recursive. - let directories_it = directory_service.get_recursive(&digest); + let mut directories_it = directory_service.get_recursive(&digest); - for e in directories_it { + while let Some(e) = directories_it.next().await { // map err in res from Error to Status let res = e.map_err(|e| Status::internal(e.to_string())); if tx.send(res).await.is_err() { @@ -157,7 +158,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW // check if the directory already exists in the database. We can skip // inserting if it's already there, as that'd be a no-op. - match self.directory_service.get(&dgst) { + match self.directory_service.get(&dgst).await { Err(e) => { warn!("error checking if directory already exists: {}", e); return Err(e.into()); @@ -166,7 +167,7 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW Ok(Some(_)) => {} // insert if it doesn't already exist Ok(None) => { - self.directory_service.put(directory)?; + self.directory_service.put(directory).await?; } } } |