diff options
author | Florian Klink <flokli@flokli.de> | 2023-03-26T11·51+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-03-27T09·03+0000 |
commit | 2fe53cce40af94d9c8e6971cbf32073ecc77d4a1 (patch) | |
tree | a57a2daee29058143aec64efe335bfbc9cc55555 /tvix/store/src/directoryservice/grpc.rs | |
parent | 2d305fd5b37fa7bf5a0512e8992b4557a1745296 (diff) |
feat(tvix/store/directorysvc): add DirectoryService::get_recursive() r/6046
This moves the recursive BFS traversal of Directory closures from the GRPCDirectoryServiceWrapper out into a a DirectoryTraverser struct implementing Iterator. It is then used from various implementors of DirectoryService in the `get_recursive()` method. This allows distinguishing between recursive requests and non-recursive requests in the gRPC client trait implementation. Change-Id: I50bfd4a0d9eb11832847329b78c587ec7c9dc7b1 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8351 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/directoryservice/grpc.rs')
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 140 |
1 files changed, 135 insertions, 5 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index b036f16aca34..e44106b3291f 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -1,7 +1,11 @@ +use std::collections::HashSet; + use super::DirectoryService; use crate::proto::{self, get_directory_request::ByWhat}; -use tonic::transport::Channel; -use tonic::Code; +use data_encoding::BASE64; +use tonic::{transport::Channel, Status}; +use tonic::{Code, Streaming}; +use tracing::{instrument, warn}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] @@ -29,13 +33,13 @@ impl GRPCDirectoryService { } impl DirectoryService for GRPCDirectoryService { + type DirectoriesIterator = StreamIterator; + fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest = digest.to_owned(); - // TODO: do requests recursively, populate a backing other - // [DirectoryService] as cache, and ask it first. let task = self.tokio_handle.spawn(async move { let mut s = grpc_client .get(proto::GetDirectoryRequest { @@ -50,7 +54,29 @@ impl DirectoryService for GRPCDirectoryService { }); match self.tokio_handle.block_on(task)? { - Ok(resp) => Ok(resp), + Ok(Some(directory)) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != digest { + Err(crate::Error::StorageError(format!( + "requested directory with digest {}, but got {}", + BASE64.encode(&digest), + BASE64.encode(&actual_digest) + ))) + } else if let Err(e) = directory.validate() { + // Validate the Directory itself is valid. + warn!("directory failed validation: {}", e.to_string()); + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + BASE64.encode(&digest), + e, + ))) + } else { + Ok(Some(directory)) + } + } + Ok(None) => Ok(None), Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(crate::Error::StorageError(e.to_string())), } @@ -76,6 +102,110 @@ impl DirectoryService for GRPCDirectoryService { Err(e) => Err(crate::Error::StorageError(e.to_string())), } } + + #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))] + fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { + let mut grpc_client = self.grpc_client.clone(); + let root_directory_digest = root_directory_digest.to_owned(); + + let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(root_directory_digest.to_vec())), + }) + .await? + .into_inner(); + + Ok(s) + }); + + let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); + + StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream) + } +} + +pub struct StreamIterator { + /// A handle into the active tokio runtime. Necessary to run futures to completion. + tokio_handle: tokio::runtime::Handle, + // A stream of [proto::Directory] + stream: Streaming<proto::Directory>, + // The Directory digests we received so far + received_directory_digests: HashSet<[u8; 32]>, + // The Directory digests we're still expecting to get sent. + expected_directory_digests: HashSet<[u8; 32]>, +} + +impl StreamIterator { + pub fn new( + tokio_handle: tokio::runtime::Handle, + root_digest: &[u8; 32], + stream: Streaming<proto::Directory>, + ) -> Self { + Self { + tokio_handle, + stream, + received_directory_digests: HashSet::new(), + expected_directory_digests: HashSet::from([*root_digest]), + } + } +} + +impl Iterator for StreamIterator { + type Item = Result<proto::Directory, crate::Error>; + + fn next(&mut self) -> Option<Self::Item> { + match self.tokio_handle.block_on(self.stream.message()) { + Ok(ok) => match ok { + Some(directory) => { + // validate the directory itself. + if let Err(e) = directory.validate() { + return Some(Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + BASE64.encode(&directory.digest()), + e, + )))); + } + // validate we actually expected that directory, and move it from expected to received. + let directory_digest = directory.digest(); + let was_expected = self.expected_directory_digests.remove(&directory_digest); + if !was_expected { + // FUTUREWORK: dumb clients might send the same stuff twice. + // as a fallback, we might want to tolerate receiving + // it if it's in received_directory_digests (as that + // means it once was in expected_directory_digests) + return Some(Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + BASE64.encode(&directory_digest) + )))); + } + self.received_directory_digests.insert(directory_digest); + + // register all children in expected_directory_digests. + for child_directories in &directory.directories { + self.expected_directory_digests + .insert(child_directories.digest.clone().try_into().unwrap()); + } + + Some(Ok(directory)) + } + None => { + // If we were still expecting something, that's an error. + if !self.expected_directory_digests.is_empty() { + Some(Err(crate::Error::StorageError(format!( + "still expected {} directories, but got premature end of stream", + self.expected_directory_digests.len(), + )))) + } else { + None + } + } + }, + Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), + } + } } #[cfg(test)] |