diff options
author | Florian Klink <flokli@flokli.de> | 2023-03-26T11·51+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-03-27T09·03+0000 |
commit | 2fe53cce40af94d9c8e6971cbf32073ecc77d4a1 (patch) | |
tree | a57a2daee29058143aec64efe335bfbc9cc55555 /tvix/store | |
parent | 2d305fd5b37fa7bf5a0512e8992b4557a1745296 (diff) |
feat(tvix/store/directorysvc): add DirectoryService::get_recursive() r/6046
This moves the recursive BFS traversal of Directory closures from the GRPCDirectoryServiceWrapper out into a a DirectoryTraverser struct implementing Iterator. It is then used from various implementors of DirectoryService in the `get_recursive()` method. This allows distinguishing between recursive requests and non-recursive requests in the gRPC client trait implementation. Change-Id: I50bfd4a0d9eb11832847329b78c587ec7c9dc7b1 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8351 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 140 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/memory.rs | 19 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 113 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/sled.rs | 19 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_directoryservice_wrapper.rs | 130 |
5 files changed, 320 insertions, 101 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index b036f16aca34..e44106b3291f 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -1,7 +1,11 @@ +use std::collections::HashSet; + use super::DirectoryService; use crate::proto::{self, get_directory_request::ByWhat}; -use tonic::transport::Channel; -use tonic::Code; +use data_encoding::BASE64; +use tonic::{transport::Channel, Status}; +use tonic::{Code, Streaming}; +use tracing::{instrument, warn}; /// Connects to a (remote) tvix-store DirectoryService over gRPC. #[derive(Clone)] @@ -29,13 +33,13 @@ impl GRPCDirectoryService { } impl DirectoryService for GRPCDirectoryService { + type DirectoriesIterator = StreamIterator; + fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); let digest = digest.to_owned(); - // TODO: do requests recursively, populate a backing other - // [DirectoryService] as cache, and ask it first. let task = self.tokio_handle.spawn(async move { let mut s = grpc_client .get(proto::GetDirectoryRequest { @@ -50,7 +54,29 @@ impl DirectoryService for GRPCDirectoryService { }); match self.tokio_handle.block_on(task)? { - Ok(resp) => Ok(resp), + Ok(Some(directory)) => { + // Validate the retrieved Directory indeed has the + // digest we expect it to have, to detect corruptions. + let actual_digest = directory.digest(); + if actual_digest != digest { + Err(crate::Error::StorageError(format!( + "requested directory with digest {}, but got {}", + BASE64.encode(&digest), + BASE64.encode(&actual_digest) + ))) + } else if let Err(e) = directory.validate() { + // Validate the Directory itself is valid. + warn!("directory failed validation: {}", e.to_string()); + Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + BASE64.encode(&digest), + e, + ))) + } else { + Ok(Some(directory)) + } + } + Ok(None) => Ok(None), Err(e) if e.code() == Code::NotFound => Ok(None), Err(e) => Err(crate::Error::StorageError(e.to_string())), } @@ -76,6 +102,110 @@ impl DirectoryService for GRPCDirectoryService { Err(e) => Err(crate::Error::StorageError(e.to_string())), } } + + #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))] + fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { + let mut grpc_client = self.grpc_client.clone(); + let root_directory_digest = root_directory_digest.to_owned(); + + let task: tokio::task::JoinHandle<Result<Streaming<proto::Directory>, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: true, + by_what: Some(ByWhat::Digest(root_directory_digest.to_vec())), + }) + .await? + .into_inner(); + + Ok(s) + }); + + let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); + + StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream) + } +} + +pub struct StreamIterator { + /// A handle into the active tokio runtime. Necessary to run futures to completion. + tokio_handle: tokio::runtime::Handle, + // A stream of [proto::Directory] + stream: Streaming<proto::Directory>, + // The Directory digests we received so far + received_directory_digests: HashSet<[u8; 32]>, + // The Directory digests we're still expecting to get sent. + expected_directory_digests: HashSet<[u8; 32]>, +} + +impl StreamIterator { + pub fn new( + tokio_handle: tokio::runtime::Handle, + root_digest: &[u8; 32], + stream: Streaming<proto::Directory>, + ) -> Self { + Self { + tokio_handle, + stream, + received_directory_digests: HashSet::new(), + expected_directory_digests: HashSet::from([*root_digest]), + } + } +} + +impl Iterator for StreamIterator { + type Item = Result<proto::Directory, crate::Error>; + + fn next(&mut self) -> Option<Self::Item> { + match self.tokio_handle.block_on(self.stream.message()) { + Ok(ok) => match ok { + Some(directory) => { + // validate the directory itself. + if let Err(e) = directory.validate() { + return Some(Err(crate::Error::StorageError(format!( + "directory {} failed validation: {}", + BASE64.encode(&directory.digest()), + e, + )))); + } + // validate we actually expected that directory, and move it from expected to received. + let directory_digest = directory.digest(); + let was_expected = self.expected_directory_digests.remove(&directory_digest); + if !was_expected { + // FUTUREWORK: dumb clients might send the same stuff twice. + // as a fallback, we might want to tolerate receiving + // it if it's in received_directory_digests (as that + // means it once was in expected_directory_digests) + return Some(Err(crate::Error::StorageError(format!( + "received unexpected directory {}", + BASE64.encode(&directory_digest) + )))); + } + self.received_directory_digests.insert(directory_digest); + + // register all children in expected_directory_digests. + for child_directories in &directory.directories { + self.expected_directory_digests + .insert(child_directories.digest.clone().try_into().unwrap()); + } + + Some(Ok(directory)) + } + None => { + // If we were still expecting something, that's an error. + if !self.expected_directory_digests.is_empty() { + Some(Err(crate::Error::StorageError(format!( + "still expected {} directories, but got premature end of stream", + self.expected_directory_digests.len(), + )))) + } else { + None + } + } + }, + Err(e) => Some(Err(crate::Error::StorageError(e.to_string()))), + } + } } #[cfg(test)] diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 7440be112cfd..3d7351033c37 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; use tracing::{instrument, warn}; -use super::DirectoryService; +use super::{DirectoryService, DirectoryTraverser}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -12,6 +12,8 @@ pub struct MemoryDirectoryService { } impl DirectoryService for MemoryDirectoryService { + type DirectoriesIterator = DirectoryTraverser<Self>; + #[instrument(skip(self, digest), fields(directory.digest = BASE64.encode(digest)))] fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error> { let db = self.db.read()?; @@ -33,6 +35,16 @@ impl DirectoryService for MemoryDirectoryService { ))); } + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + BASE64.encode(&actual_digest), + e, + ))); + } + Ok(Some(directory.clone())) } } @@ -57,4 +69,9 @@ impl DirectoryService for MemoryDirectoryService { Ok(digest) } + + #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))] + fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { + DirectoryTraverser::with(self.clone(), root_directory_digest) + } } diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index 1fc8fbc4c219..d7b143df3efe 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -1,3 +1,7 @@ +use std::collections::{HashSet, VecDeque}; + +use tracing::{debug_span, instrument, warn}; + use crate::{proto, Error}; mod grpc; mod memory; @@ -11,10 +15,119 @@ pub use self::sled::SledDirectoryService; /// This is a simple get and put of [crate::proto::Directory], returning their /// digest. pub trait DirectoryService { + type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send; + /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error>; /// Get uploads a single Directory message, and returns the calculated /// digest, or an error. fn put(&self, directory: proto::Directory) -> Result<[u8; 32], Error>; + + /// Looks up a closure of [proto::Directory]. + /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`, + /// and we'd be able to add a default implementation for it here, but + /// we can't have that yet. + fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator; +} + +/// Traverses a [proto::Directory] from the root to the children. +/// +/// This is mostly BFS, but directories are only returned once. +pub struct DirectoryTraverser<DS: DirectoryService> { + directory_service: DS, + /// The list of all directories that still need to be traversed. The next + /// element is picked from the front, new elements are enqueued at the + /// back. + worklist_directory_digests: VecDeque<[u8; 32]>, + /// The list of directory digests already sent to the consumer. + /// We omit sending the same directories multiple times. + sent_directory_digests: HashSet<[u8; 32]>, +} + +impl<DS: DirectoryService> DirectoryTraverser<DS> { + pub fn with(directory_service: DS, root_directory_digest: &[u8; 32]) -> Self { + Self { + directory_service, + worklist_directory_digests: VecDeque::from([*root_directory_digest]), + sent_directory_digests: HashSet::new(), + } + } + + // enqueue all child directory digests to the work queue, as + // long as they're not part of the worklist or already sent. + // This panics if the digest looks invalid, it's supposed to be checked first. + fn enqueue_child_directories(&mut self, directory: &proto::Directory) { + for child_directory_node in &directory.directories { + let child_digest: [u8; 32] = child_directory_node + .digest + .as_slice() + .try_into() + .map_err(|_e| Error::StorageError("invalid digest length".to_string())) + .unwrap(); + + if self.worklist_directory_digests.contains(&child_digest) + || self.sent_directory_digests.contains(&child_digest) + { + continue; + } + self.worklist_directory_digests.push_back(child_digest); + } + } +} + +impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { + type Item = Result<proto::Directory, Error>; + + #[instrument(skip_all)] + fn next(&mut self) -> Option<Self::Item> { + // fetch the next directory digest from the top of the work queue. + match self.worklist_directory_digests.pop_front() { + None => None, + Some(current_directory_digest) => { + let current_directory_b64 = data_encoding::BASE64.encode(¤t_directory_digest); + let span = debug_span!("directory.digest", current_directory_b64); + let _ = span.enter(); + + // look up the directory itself. + let current_directory = match self.directory_service.get(¤t_directory_digest) + { + // if we got it + Ok(Some(current_directory)) => { + // validate, we don't want to send invalid directories. + if let Err(e) = current_directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Some(Err(Error::StorageError(format!( + "invalid directory: {}", + current_directory_b64 + )))); + } + current_directory + } + // if it's not there, we have an inconsistent store! + Ok(None) => { + warn!("directory {} does not exist", current_directory_b64); + return Some(Err(Error::StorageError(format!( + "directory {} does not exist", + current_directory_b64 + )))); + } + Err(e) => { + warn!("failed to look up directory"); + return Some(Err(Error::StorageError(format!( + "unable to look up directory {}: {}", + current_directory_b64, e + )))); + } + }; + + // All DirectoryServices MUST validate directory nodes, before returning them out, so we + // can be sure [enqueue_child_directories] doesn't panic. + + // enqueue child directories + self.enqueue_child_directories(¤t_directory); + Some(Ok(current_directory)) + } + } + } } diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs index 1f729a594c19..44d61ae42ae5 100644 --- a/tvix/store/src/directoryservice/sled.rs +++ b/tvix/store/src/directoryservice/sled.rs @@ -5,7 +5,7 @@ use prost::Message; use std::path::PathBuf; use tracing::{instrument, warn}; -use super::DirectoryService; +use super::{DirectoryService, DirectoryTraverser}; #[derive(Clone)] pub struct SledDirectoryService { @@ -29,6 +29,8 @@ impl SledDirectoryService { } impl DirectoryService for SledDirectoryService { + type DirectoriesIterator = DirectoryTraverser<Self>; + #[instrument(name = "SledDirectoryService::get", skip(self, digest), fields(directory.digest = BASE64.encode(digest)))] fn get(&self, digest: &[u8; 32]) -> Result<Option<proto::Directory>, Error> { match self.db.get(digest) { @@ -49,6 +51,16 @@ impl DirectoryService for SledDirectoryService { ))); } + // Validate the Directory itself is valid. + if let Err(e) = directory.validate() { + warn!("directory failed validation: {}", e.to_string()); + return Err(Error::StorageError(format!( + "directory {} failed validation: {}", + BASE64.encode(&actual_digest), + e, + ))); + } + Ok(Some(directory)) } Err(e) => { @@ -80,4 +92,9 @@ impl DirectoryService for SledDirectoryService { } Ok(digest) } + + #[instrument(skip_all, fields(directory.digest = BASE64.encode(root_directory_digest)))] + fn get_recursive(&self, root_directory_digest: &[u8; 32]) -> Self::DirectoriesIterator { + DirectoryTraverser::with(self.clone(), root_directory_digest) + } } diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs index e73dfdab1866..06df07a108fb 100644 --- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -1,11 +1,11 @@ use crate::directoryservice::DirectoryService; use crate::proto; use data_encoding::BASE64; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::HashMap; use tokio::{sync::mpsc::channel, task}; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status, Streaming}; -use tracing::{debug, info_span, instrument, warn}; +use tracing::{debug, instrument, warn}; pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> { directory_service: C, @@ -36,109 +36,51 @@ impl<DS: DirectoryService + Send + Sync + Clone + 'static> let directory_service = self.directory_service.clone(); - // kick off an async thread - task::spawn(async move { - // Keep the list of directory digests to traverse. - // As per rpc_directory.proto, we traverse in BFS order. - let mut deq: VecDeque<[u8; 32]> = VecDeque::new(); - + let _task = { // look at the digest in the request and put it in the top of the queue. match &req_inner.by_what { None => return Err(Status::invalid_argument("by_what needs to be specified")), Some(proto::get_directory_request::ByWhat::Digest(digest)) => { - deq.push_back( - digest - .as_slice() - .try_into() - .map_err(|_e| Status::invalid_argument("invalid digest length"))?, - ); - } - } - - // keep a list of all the Directory messages already sent, so we can omit sending the same. - let mut sent_directory_dgsts: HashSet<[u8; 32]> = HashSet::new(); - - // look up the directory at the top of the queue - while let Some(digest) = deq.pop_front() { - let digest_b64: String = BASE64.encode(&digest); - - // add digest we're currently processing to a span, but pay attention to - // https://docs.rs/tracing/0.1.37/tracing/span/struct.Span.html#in-asynchronous-code - // There may be no await in here (we leave the span before the tx.send(…).await) - let span = info_span!("digest", "{}", &digest_b64); - - let res: Result<proto::Directory, Status> = { - let _enter = span.enter(); - - // invoke client.get, and map to a Result<Directory, Status> - match directory_service.get(&digest) { - // The directory was not found, abort - Ok(None) => { - if !sent_directory_dgsts.is_empty() { - // If this is not the first lookup, we have a - // consistency issue, and we're missing some children, of which we have the - // parents. Log this out. - // Both the node we started with, and the - // current digest are part of the span. - warn!("consistency issue: directory not found") + let digest: [u8; 32] = digest + .as_slice() + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + + let digest_b64: String = BASE64.encode(&digest); + + task::spawn(async move { + if !req_inner.recursive { + let e: Result<proto::Directory, Status> = + match directory_service.get(&digest) { + Ok(Some(directory)) => Ok(directory), + Ok(None) => Err(Status::not_found(format!( + "directory {} not found", + digest_b64 + ))), + Err(e) => Err(e.into()), + }; + + if tx.send(e).await.is_err() { + debug!("receiver dropped"); } - Err(Status::not_found(format!( - "directory {} not found", - digest_b64 - ))) - } - Ok(Some(directory)) => { - // if recursion was requested, all its children need to be added to the queue. - // If a Directory message with the same digest has already - // been sent previously, we can skip enqueueing it. - // Same applies to when it already is in the queue. - if req_inner.recursive { - for child_directory_node in &directory.directories { - let child_directory_node_digest: [u8; 32] = - child_directory_node.digest.clone().try_into().map_err( - |_e| { - Status::internal( - "invalid child directory digest len", - ) - }, - )?; - - if !sent_directory_dgsts.contains(&child_directory_node_digest) - && !deq.contains(&child_directory_node_digest) - { - deq.push_back(child_directory_node_digest); - } + } else { + // If recursive was requested, traverse via get_recursive. + let directories_it = directory_service.get_recursive(&digest); + + for e in directories_it { + // map err in res from Error to Status + let res = e.map_err(|e| Status::internal(e.to_string())); + if tx.send(res).await.is_err() { + debug!("receiver dropped"); + break; } } - - // add it to sent_directory_dgsts. - // Strictly speaking, it wasn't sent yet, but tx.send happens right after, - // and the only way we can still fail is by the remote side to hang up, - // in which case we stop anyways. - sent_directory_dgsts.insert(digest); - - Ok(directory) } - Err(e) => Err(e.into()), - } - }; - - // send the result to the client - if (tx.send(res).await).is_err() { - debug!("receiver dropped"); - break; + }); } } + }; - Ok(()) - }); - - // NOTE: this always returns an Ok response, with the first item in the - // stream being a potential error, instead of directly returning the - // first error. - // There's no need to check if the directory node exists twice, - // and client code should consider an Err(), or the first item of the - // stream being an error to be equivalent. let receiver_stream = ReceiverStream::new(rx); Ok(Response::new(receiver_stream)) } |