use crate::proto; use crate::{directoryservice::DirectoryService, B3Digest}; use futures::StreamExt; use std::collections::HashMap; use std::ops::Deref; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{debug, instrument, warn}; pub struct GRPCDirectoryServiceWrapper<T> { directory_service: T, } impl<T> GRPCDirectoryServiceWrapper<T> { pub fn new(directory_service: T) -> Self { Self { directory_service } } } #[async_trait] impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T> where T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static, { type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>; #[instrument(skip_all)] async fn get( &self, request: Request<proto::GetDirectoryRequest>, ) -> Result<Response<Self::GetStream>, Status> { let (tx, rx) = channel(5); let req_inner = request.into_inner(); // look at the digest in the request and put it in the top of the queue. match &req_inner.by_what { None => return Err(Status::invalid_argument("by_what needs to be specified")), Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => { let digest: B3Digest = digest .clone() .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; if !req_inner.recursive { let e: Result<proto::Directory, Status> = match self .directory_service .get(&digest) .await { Ok(Some(directory)) => Ok(directory), Ok(None) => { Err(Status::not_found(format!("directory {} not found", digest))) } Err(e) => { warn!(err = %e, directory.digest=%digest, "failed to get directory"); Err(e.into()) } }; if tx.send(e).await.is_err() { debug!("receiver dropped"); } } else { // If recursive was requested, traverse via get_recursive. let mut directories_it = self.directory_service.get_recursive(&digest); while let Some(e) = directories_it.next().await { // map err in res from Error to Status let res = e.map_err(|e| Status::internal(e.to_string())); if tx.send(res).await.is_err() { debug!("receiver dropped"); break; } } } } } let receiver_stream = ReceiverStream::new(rx); Ok(Response::new(receiver_stream)) } #[instrument(skip_all)] async fn put( &self, request: Request<Streaming<proto::Directory>>, ) -> Result<Response<proto::PutDirectoryResponse>, Status> { let mut req_inner = request.into_inner(); // TODO: let this use DirectoryPutter to the store it's connected to, // and move the validation logic into [SimplePutter]. // This keeps track of the seen directory keys, and their size. // This is used to validate the size field of a reference to a previously sent directory. // We don't need to keep the contents around, they're stored in the DB. // https://github.com/rust-lang/rust-clippy/issues/5812 #[allow(clippy::mutable_key_type)] let mut seen_directories_sizes: HashMap<B3Digest, u64> = HashMap::new(); let mut last_directory_dgst: Option<B3Digest> = None; // Consume directories, and insert them into the store. // Reject directory messages that refer to Directories not sent in the same stream. while let Some(directory) = req_inner.message().await? { // validate the directory itself. if let Err(e) = directory.validate() { warn!(err = %e, "invalid directory"); Err(Status::invalid_argument(format!( "directory failed validation: {}", e, )))?; } // for each child directory this directory refers to, we need // to ensure it has been seen already in this stream, and that the size // matches what we recorded. for child_directory in &directory.directories { let child_directory_digest: B3Digest = child_directory .digest .clone() .try_into() .map_err(|_e| Status::internal("invalid child directory digest len"))?; match seen_directories_sizes.get(&child_directory_digest) { None => { return Err(Status::invalid_argument(format!( "child directory '{:?}' ({}) in directory '{}' not seen yet", child_directory.name, &child_directory_digest, &directory.digest(), ))); } Some(seen_child_directory_size) => { if seen_child_directory_size != &child_directory.size { return Err(Status::invalid_argument(format!( "child directory '{:?}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}", child_directory.name, &child_directory_digest, &directory.digest(), seen_child_directory_size, child_directory.size, ))); } } } } // NOTE: We can't know if a directory we're receiving actually is // part of the closure, because we receive directories from the leaf nodes up to // the root. // The only thing we could to would be doing a final check when the // last Directory was received, that all Directories received so far are // reachable from that (root) node. let dgst = directory.digest(); seen_directories_sizes.insert(dgst.clone(), directory.size()); last_directory_dgst = Some(dgst.clone()); // check if the directory already exists in the database. We can skip // inserting if it's already there, as that'd be a no-op. match self.directory_service.get(&dgst).await { Err(e) => { warn!(err = %e, "failed to check if directory already exists"); return Err(e.into()); } // skip if already exists Ok(Some(_)) => {} // insert if it doesn't already exist Ok(None) => { self.directory_service.put(directory).await?; } } } // We're done receiving. peek at last_directory_digest and either return the digest, // or an error, if we received an empty stream. match last_directory_dgst { None => Err(Status::invalid_argument("no directories received")), Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse { root_digest: last_directory_dgst.into(), })), } } }