diff options
Diffstat (limited to 'tvix/store/src/proto/grpc_directoryservice_wrapper.rs')
-rw-r--r-- | tvix/store/src/proto/grpc_directoryservice_wrapper.rs | 130 |
1 files changed, 36 insertions, 94 deletions
diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs index e73dfdab1866..06df07a108fb 100644 --- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -1,11 +1,11 @@ use crate::directoryservice::DirectoryService; use crate::proto; use data_encoding::BASE64; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::HashMap; use tokio::{sync::mpsc::channel, task}; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{debug, info_span, instrument, warn}; +use tracing::{debug, instrument, warn}; pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> { directory_service: C, @@ -36,109 +36,51 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> let directory_service = self.directory_service.clone(); - // kick off an async thread - task::spawn(async move { - // Keep the list of directory digests to traverse. - // As per rpc_directory.proto, we traverse in BFS order. - let mut deq: VecDeque<[u8; 32]> = VecDeque::new(); - + let _task = { // look at the digest in the request and put it in the top of the queue. match &req_inner.by_what { None => return Err(Status::invalid_argument("by_what needs to be specified")), Some(proto::get_directory_request::ByWhat::Digest(digest)) => { - deq.push_back( - digest - .as_slice() - .try_into() - .map_err(|_e| Status::invalid_argument("invalid digest length"))?, - ); - } - } - - // keep a list of all the Directory messages already sent, so we can omit sending the same. - let mut sent_directory_dgsts: HashSet<[u8; 32]> = HashSet::new(); - - // look up the directory at the top of the queue - while let Some(digest) = deq.pop_front() { - let digest_b64: String = BASE64.encode(&digest); - - // add digest we're currently processing to a span, but pay attention to - // https://docs.rs/tracing/0.1.37/tracing/span/struct.Span.html#in-asynchronous-code - // There may be no await in here (we leave the span before the tx.send(…).await) - let span = info_span!("digest", "{}", &digest_b64); - - let res: Result<proto::Directory, Status> = { - let _enter = span.enter(); - - // invoke client.get, and map to a Result<Directory, Status> - match directory_service.get(&digest) { - // The directory was not found, abort - Ok(None) => { - if !sent_directory_dgsts.is_empty() { - // If this is not the first lookup, we have a - // consistency issue, and we're missing some children, of which we have the - // parents. Log this out. - // Both the node we started with, and the - // current digest are part of the span. - warn!("consistency issue: directory not found") + let digest: [u8; 32] = digest + .as_slice() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + let digest_b64: String = BASE64.encode(&digest); + + task::spawn(async move { + if !req_inner.recursive { + let e: Result<proto::Directory, Status> = + match directory_service.get(&digest) { + Ok(Some(directory)) => Ok(directory), + Ok(None) => Err(Status::not_found(format!( + "directory {} not found", + digest_b64 + ))), + Err(e) => Err(e.into()), + }; + + if tx.send(e).await.is_err() { + debug!("receiver dropped"); } - Err(Status::not_found(format!( - "directory {} not found", - digest_b64 - ))) - } - Ok(Some(directory)) => { - // if recursion was requested, all its children need to be added to the queue. - // If a Directory message with the same digest has already - // been sent previously, we can skip enqueueing it. - // Same applies to when it already is in the queue. - if req_inner.recursive { - for child_directory_node in &directory.directories { - let child_directory_node_digest: [u8; 32] = - child_directory_node.digest.clone().try_into().map_err( - |_e| { - Status::internal( - "invalid child directory digest len", - ) - }, - )?; - - if !sent_directory_dgsts.contains(&child_directory_node_digest) - && !deq.contains(&child_directory_node_digest) - { - deq.push_back(child_directory_node_digest); - } + } else { + // If recursive was requested, traverse via get_recursive. + let directories_it = directory_service.get_recursive(&digest); + + for e in directories_it { + // map err in res from Error to Status + let res = e.map_err(|e| Status::internal(e.to_string())); + if tx.send(res).await.is_err() { + debug!("receiver dropped"); + break; } } - - // add it to sent_directory_dgsts. - // Strictly speaking, it wasn't sent yet, but tx.send happens right after, - // and the only way we can still fail is by the remote side to hang up, - // in which case we stop anyways. - sent_directory_dgsts.insert(digest); - - Ok(directory) } - Err(e) => Err(e.into()), - } - }; - - // send the result to the client - if (tx.send(res).await).is_err() { - debug!("receiver dropped"); - break; + }); } } + }; - Ok(()) - }); - - // NOTE: this always returns an Ok response, with the first item in the - // stream being a potential error, instead of directly returning the - // first error. - // There's no need to check if the directory node exists twice, - // and client code should consider an Err(), or the first item of the - // stream being an error to be equivalent. let receiver_stream = ReceiverStream::new(rx); Ok(Response::new(receiver_stream)) } |