about summary refs log tree commit diff
path: root/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-03-26T11·51+0200
committerflokli <flokli@flokli.de>2023-03-27T09·03+0000
commit2fe53cce40af94d9c8e6971cbf32073ecc77d4a1 (patch)
treea57a2daee29058143aec64efe335bfbc9cc55555 /tvix/store/src/proto/grpc_directoryservice_wrapper.rs
parent2d305fd5b37fa7bf5a0512e8992b4557a1745296 (diff)
feat(tvix/store/directorysvc): add DirectoryService::get_recursive() r/6046
This moves the recursive BFS traversal of Directory closures from the
GRPCDirectoryServiceWrapper out into a a DirectoryTraverser struct
implementing Iterator.

It is then used from various implementors of DirectoryService in the
`get_recursive()` method.

This allows distinguishing between recursive requests and non-recursive
requests in the gRPC client trait implementation.

Change-Id: I50bfd4a0d9eb11832847329b78c587ec7c9dc7b1
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8351
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/proto/grpc_directoryservice_wrapper.rs')
-rw-r--r--tvix/store/src/proto/grpc_directoryservice_wrapper.rs130
1 files changed, 36 insertions, 94 deletions
diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
index e73dfdab1866..06df07a108fb 100644
--- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
@@ -1,11 +1,11 @@
 use crate::directoryservice::DirectoryService;
 use crate::proto;
 use data_encoding::BASE64;
-use std::collections::{HashMap, HashSet, VecDeque};
+use std::collections::HashMap;
 use tokio::{sync::mpsc::channel, task};
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Status, Streaming};
-use tracing::{debug, info_span, instrument, warn};
+use tracing::{debug, instrument, warn};
 
 pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> {
     directory_service: C,
@@ -36,109 +36,51 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static>
 
         let directory_service = self.directory_service.clone();
 
-        // kick off an async thread
-        task::spawn(async move {
-            // Keep the list of directory digests to traverse.
-            // As per rpc_directory.proto, we traverse in BFS order.
-            let mut deq: VecDeque<[u8; 32]> = VecDeque::new();
-
+        let _task = {
             // look at the digest in the request and put it in the top of the queue.
             match &req_inner.by_what {
                 None => return Err(Status::invalid_argument("by_what needs to be specified")),
                 Some(proto::get_directory_request::ByWhat::Digest(digest)) => {
-                    deq.push_back(
-                        digest
-                            .as_slice()
-                            .try_into()
-                            .map_err(|_e| Status::invalid_argument("invalid digest length"))?,
-                    );
-                }
-            }
-
-            // keep a list of all the Directory messages already sent, so we can omit sending the same.
-            let mut sent_directory_dgsts: HashSet<[u8; 32]> = HashSet::new();
-
-            // look up the directory at the top of the queue
-            while let Some(digest) = deq.pop_front() {
-                let digest_b64: String = BASE64.encode(&digest);
-
-                // add digest we're currently processing to a span, but pay attention to
-                // https://docs.rs/tracing/0.1.37/tracing/span/struct.Span.html#in-asynchronous-code
-                // There may be no await in here (we leave the span before the tx.send(…).await)
-                let span = info_span!("digest", "{}", &digest_b64);
-
-                let res: Result<proto::Directory, Status> = {
-                    let _enter = span.enter();
-
-                    // invoke client.get, and map to a Result<Directory, Status>
-                    match directory_service.get(&digest) {
-                        // The directory was not found, abort
-                        Ok(None) => {
-                            if !sent_directory_dgsts.is_empty() {
-                                // If this is not the first lookup, we have a
-                                // consistency issue, and we're missing some children, of which we have the
-                                // parents. Log this out.
-                                // Both the node we started with, and the
-                                // current digest are part of the span.
-                                warn!("consistency issue: directory not found")
+                    let digest: [u8; 32] = digest
+                        .as_slice()
+                        .try_into()
+                        .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+                    let digest_b64: String = BASE64.encode(&digest);
+
+                    task::spawn(async move {
+                        if !req_inner.recursive {
+                            let e: Result<proto::Directory, Status> =
+                                match directory_service.get(&digest) {
+                                    Ok(Some(directory)) => Ok(directory),
+                                    Ok(None) => Err(Status::not_found(format!(
+                                        "directory {} not found",
+                                        digest_b64
+                                    ))),
+                                    Err(e) => Err(e.into()),
+                                };
+
+                            if tx.send(e).await.is_err() {
+                                debug!("receiver dropped");
                             }
-                            Err(Status::not_found(format!(
-                                "directory {} not found",
-                                digest_b64
-                            )))
-                        }
-                        Ok(Some(directory)) => {
-                            // if recursion was requested, all its children need to be added to the queue.
-                            // If a Directory message with the same digest has already
-                            // been sent previously, we can skip enqueueing it.
-                            // Same applies to when it already is in the queue.
-                            if req_inner.recursive {
-                                for child_directory_node in &directory.directories {
-                                    let child_directory_node_digest: [u8; 32] =
-                                        child_directory_node.digest.clone().try_into().map_err(
-                                            |_e| {
-                                                Status::internal(
-                                                    "invalid child directory digest len",
-                                                )
-                                            },
-                                        )?;
-
-                                    if !sent_directory_dgsts.contains(&child_directory_node_digest)
-                                        && !deq.contains(&child_directory_node_digest)
-                                    {
-                                        deq.push_back(child_directory_node_digest);
-                                    }
+                        } else {
+                            // If recursive was requested, traverse via get_recursive.
+                            let directories_it = directory_service.get_recursive(&digest);
+
+                            for e in directories_it {
+                                // map err in res from Error to Status
+                                let res = e.map_err(|e| Status::internal(e.to_string()));
+                                if tx.send(res).await.is_err() {
+                                    debug!("receiver dropped");
+                                    break;
                                 }
                             }
-
-                            // add it to sent_directory_dgsts.
-                            // Strictly speaking, it wasn't sent yet, but tx.send happens right after,
-                            // and the only way we can still fail is by the remote side to hang up,
-                            // in which case we stop anyways.
-                            sent_directory_dgsts.insert(digest);
-
-                            Ok(directory)
                         }
-                        Err(e) => Err(e.into()),
-                    }
-                };
-
-                // send the result to the client
-                if (tx.send(res).await).is_err() {
-                    debug!("receiver dropped");
-                    break;
+                    });
                 }
             }
+        };
 
-            Ok(())
-        });
-
-        // NOTE: this always returns an Ok response, with the first item in the
-        // stream being a potential error, instead of directly returning the
-        // first error.
-        // There's no need to check if the directory node exists twice,
-        // and client code should consider an Err(), or the first item of the
-        // stream being an error to be equivalent.
         let receiver_stream = ReceiverStream::new(rx);
         Ok(Response::new(receiver_stream))
     }