diff options
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/src/proto/grpc_directoryservice_wrapper.rs | 229 | ||||
-rw-r--r-- | tvix/store/src/proto/mod.rs | 3 |
2 files changed, 232 insertions, 0 deletions
diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs new file mode 100644 index 000000000000..00e219c6e56d --- /dev/null +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -0,0 +1,229 @@ +use crate::directoryservice::DirectoryService; +use crate::proto; +use data_encoding::BASE64; +use std::collections::{HashMap, HashSet, VecDeque}; +use tokio::{sync::mpsc::channel, task}; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{async_trait, Request, Response, Status, Streaming}; +use tracing::{debug, info_span, instrument, warn}; + +pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> { + client: C, +} + +impl<C: DirectoryService> From<C> for GRPCDirectoryServiceWrapper<C> { + fn from(value: C) -> Self { + Self { client: value } + } +} + +#[async_trait] +impl<C: DirectoryService + Send + Sync + Clone + 'static> + proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<C> +{ + type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>; + + #[instrument(skip(self))] + async fn get( + &self, + request: Request<proto::GetDirectoryRequest>, + ) -> Result<Response<Self::GetStream>, Status> { + let (tx, rx) = channel(5); + + let req_inner = request.into_inner(); + + let client = self.client.clone(); + + // kick off an async thread + task::spawn(async move { + // Keep the list of directory digests to traverse. + // As per rpc_directory.proto, we traverse in BFS order. + let mut deq: VecDeque<Vec<u8>> = VecDeque::new(); + + // look at the digest in the request and put it in the top of the queue. + match &req_inner.by_what { + None => return Err(Status::invalid_argument("by_what needs to be specified")), + Some(proto::get_directory_request::ByWhat::Digest(digest)) => { + if digest.len() != 32 { + return Err(Status::invalid_argument("invalid digest length")); + } + deq.push_back(digest.clone()); + } + } + + // keep a list of all the Directory messages already sent, so we can omit sending the same. + let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new(); + + // look up the directory at the top of the queue + while let Some(ref digest) = deq.pop_front() { + let digest_b64: String = BASE64.encode(digest); + + // add digest we're currently processing to a span, but pay attention to + // https://docs.rs/tracing/0.1.37/tracing/span/struct.Span.html#in-asynchronous-code + // There may be no await in here (we leave the span before the tx.send(…).await) + let span = info_span!("digest", "{}", &digest_b64); + + let res: Result<proto::Directory, Status> = { + let _enter = span.enter(); + + // invoke client.get, and map to a Result<Directory, Status> + match client.get(&proto::get_directory_request::ByWhat::Digest( + digest.to_vec(), + )) { + // The directory was not found, abort + Ok(None) => { + if !sent_directory_dgsts.is_empty() { + // If this is not the first lookup, we have a + // consistency issue, and we're missing some children, of which we have the + // parents. Log this out. + // Both the node we started with, and the + // current digest are part of the span. + warn!("consistency issue: directory not found") + } + Err(Status::not_found(format!( + "directory {} not found", + digest_b64 + ))) + } + Ok(Some(directory)) => { + // if recursion was requested, all its children need to be added to the queue. + // If a Directory message with the same digest has already + // been sent previously, we can skip enqueueing it. + // Same applies to when it already is in the queue. + if req_inner.recursive { + for child_directory_node in &directory.directories { + if !sent_directory_dgsts.contains(&child_directory_node.digest) + && !deq.contains(&child_directory_node.digest) + { + deq.push_back(child_directory_node.digest.clone()); + } + } + } + + // add it to sent_directory_dgsts. + // Strictly speaking, it wasn't sent yet, but tx.send happens right after, + // and the only way we can still fail is by the remote side to hang up, + // in which case we stop anyways. + sent_directory_dgsts.insert(digest.to_vec()); + + Ok(directory) + } + Err(e) => Err(e.into()), + } + }; + + // send the result to the client + if (tx.send(res).await).is_err() { + debug!("receiver dropped"); + break; + } + } + + Ok(()) + }); + + // NOTE: this always returns an Ok response, with the first item in the + // stream being a potential error, instead of directly returning the + // first error. + // There's no need to check if the directory node exists twice, + // and client code should consider an Err(), or the first item of the + // stream being an error to be equivalent. + let receiver_stream = ReceiverStream::new(rx); + Ok(Response::new(receiver_stream)) + } + + #[instrument(skip(self, request))] + async fn put( + &self, + request: Request<Streaming<proto::Directory>>, + ) -> Result<Response<proto::PutDirectoryResponse>, Status> { + let mut req_inner = request.into_inner(); + // This keeps track of the seen directory keys, and their size. + // This is used to validate the size field of a reference to a previously sent directory. + // We don't need to keep the contents around, they're stored in the DB. + let mut seen_directories_sizes: HashMap<Vec<u8>, u32> = HashMap::new(); + let mut last_directory_dgst: Option<Vec<u8>> = None; + + // Consume directories, and insert them into the store. + // Reject directory messages that refer to Directories not sent in the same stream. + while let Some(directory) = req_inner.message().await? { + // validate the directory itself. + if let Err(e) = directory.validate() { + return Err(Status::invalid_argument(format!( + "directory {} failed validation: {}", + BASE64.encode(&directory.digest()), + e, + ))); + } + + // for each child directory this directory refers to, we need + // to ensure it has been seen already in this stream, and that the size + // matches what we recorded. + for child_directory in &directory.directories { + match seen_directories_sizes.get(&child_directory.digest) { + None => { + return Err(Status::invalid_argument(format!( + "child directory '{}' ({}) in directory '{}' not seen yet", + child_directory.name, + BASE64.encode(&child_directory.digest), + BASE64.encode(&directory.digest()), + ))); + } + Some(seen_child_directory_size) => { + if seen_child_directory_size != &child_directory.size { + return Err(Status::invalid_argument(format!( + "child directory '{}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}", + child_directory.name, + BASE64.encode(&child_directory.digest), + BASE64.encode(&directory.digest()), + seen_child_directory_size, + child_directory.size, + ))); + } + } + } + } + + // TODO: We don't validate the currently received directory refers + // to at least one child we already received. + // This means, we thoeretically allow uploading multiple disconnected graphs, + // and the digest of the last element in the stream becomes the root node. + // For example, you can upload a leaf directory A, a leaf directory + // B, and then as last element a directory C that only refers to A, + // leaving B disconnected. + // At some point, we might want to populate a datastructure that + // does a reachability check. + + let dgst = directory.digest(); + seen_directories_sizes.insert(dgst.clone(), directory.size()); + last_directory_dgst = Some(dgst.clone()); + + // check if the directory already exists in the database. We can skip + // inserting if it's already there, as that'd be a no-op. + match self + .client + .get(&proto::get_directory_request::ByWhat::Digest(dgst.to_vec())) + { + Err(e) => { + warn!("error checking if directory already exists: {}", e); + return Err(e.into()); + } + // skip if already exists + Ok(Some(_)) => {} + // insert if it doesn't already exist + Ok(None) => { + self.client.put(directory)?; + } + } + } + + // We're done receiving. peek at last_directory_digest and either return the digest, + // or an error, if we received an empty stream. + match last_directory_dgst { + None => Err(Status::invalid_argument("no directories received")), + Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse { + root_digest: last_directory_dgst, + })), + } + } +} diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index b58766d2ea7c..84eeda3c66f0 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -8,7 +8,10 @@ use prost::Message; use nix_compat::store_path::{ParseStorePathError, StorePath}; mod grpc_blobservice_wrapper; +mod grpc_directoryservice_wrapper; + pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper; +pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper; tonic::include_proto!("tvix.store.v1"); |