Diffstat (limited to 'tvix/store')
-rw-r--r--  tvix/store/src/proto/grpc_directoryservice_wrapper.rs | 229
-rw-r--r--  tvix/store/src/proto/mod.rs                           |   3
2 files changed, 232 insertions(+), 0 deletions(-)
diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
new file mode 100644
index 000000000000..00e219c6e56d
--- /dev/null
+++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
@@ -0,0 +1,229 @@
+use crate::directoryservice::DirectoryService;
+use crate::proto;
+use data_encoding::BASE64;
+use std::collections::{HashMap, HashSet, VecDeque};
+use tokio::{sync::mpsc::channel, task};
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::{async_trait, Request, Response, Status, Streaming};
+use tracing::{debug, info_span, instrument, warn};
+
+pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> {
+    client: C,
+}
+
+impl<C: DirectoryService> From<C> for GRPCDirectoryServiceWrapper<C> {
+    fn from(value: C) -> Self {
+        Self { client: value }
+    }
+}
+
+#[async_trait]
+impl<C: DirectoryService + Send + Sync + Clone + 'static>
+    proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<C>
+{
+    type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>;
+
+    #[instrument(skip(self))]
+    async fn get(
+        &self,
+        request: Request<proto::GetDirectoryRequest>,
+    ) -> Result<Response<Self::GetStream>, Status> {
+        let (tx, rx) = channel(5);
+
+        let req_inner = request.into_inner();
+
+        let client = self.client.clone();
+
+        // kick off an async task
+        task::spawn(async move {
+            // Keep the list of directory digests to traverse.
+            // As per rpc_directory.proto, we traverse in BFS order.
+            let mut deq: VecDeque<Vec<u8>> = VecDeque::new();
+
+            // Look at the digest in the request and put it at the front of the queue.
+            match &req_inner.by_what {
+                None => return Err(Status::invalid_argument("by_what needs to be specified")),
+                Some(proto::get_directory_request::ByWhat::Digest(digest)) => {
+                    if digest.len() != 32 {
+                        return Err(Status::invalid_argument("invalid digest length"));
+                    }
+                    deq.push_back(digest.clone());
+                }
+            }
+
+            // Keep a set of all the Directory digests already sent, so we don't send the same message twice.
+            let mut sent_directory_dgsts: HashSet<Vec<u8>> = HashSet::new();
+
+            // Look up the directory at the front of the queue.
+            while let Some(ref digest) = deq.pop_front() {
+                let digest_b64: String = BASE64.encode(digest);
+
+                // Add the digest we're currently processing to a span, but pay attention to
+                // https://docs.rs/tracing/0.1.37/tracing/span/struct.Span.html#in-asynchronous-code
+                // There must be no await inside it (we leave the span before the tx.send(…).await).
+                let span = info_span!("digest", "{}", &digest_b64);
+
+                let res: Result<proto::Directory, Status> = {
+                    let _enter = span.enter();
+
+                    // invoke client.get, and map to a Result<Directory, Status>
+                    match client.get(&proto::get_directory_request::ByWhat::Digest(
+                        digest.to_vec(),
+                    )) {
+                        // The directory was not found, abort
+                        Ok(None) => {
+                            if !sent_directory_dgsts.is_empty() {
+                                // If this is not the first lookup, we have a
+                                // consistency issue: we're missing some children
+                                // whose parents we already have. Log this.
+                                // Both the node we started with and the
+                                // current digest are part of the span.
+                                warn!("consistency issue: directory not found")
+                            }
+                            Err(Status::not_found(format!(
+                                "directory {} not found",
+                                digest_b64
+                            )))
+                        }
+                        Ok(Some(directory)) => {
+                            // If recursion was requested, all its children need to be added to the queue.
+                            // If a Directory message with the same digest has already
+                            // been sent previously, we can skip enqueueing it.
+                            // The same applies if it is already in the queue.
+                            if req_inner.recursive {
+                                for child_directory_node in &directory.directories {
+                                    if !sent_directory_dgsts.contains(&child_directory_node.digest)
+                                        && !deq.contains(&child_directory_node.digest)
+                                    {
+                                        deq.push_back(child_directory_node.digest.clone());
+                                    }
+                                }
+                            }
+
+                            // Add it to sent_directory_dgsts.
+                            // Strictly speaking, it wasn't sent yet, but tx.send happens right after,
+                            // and the only way that can still fail is if the remote side hangs up,
+                            // in which case we stop anyway.
+                            sent_directory_dgsts.insert(digest.to_vec());
+
+                            Ok(directory)
+                        }
+                        Err(e) => Err(e.into()),
+                    }
+                };
+
+                // send the result to the client
+                if (tx.send(res).await).is_err() {
+                    debug!("receiver dropped");
+                    break;
+                }
+            }
+
+            Ok(())
+        });
+
+        // NOTE: this always returns an Ok response; a potential error is
+        // surfaced as the first item in the stream, instead of being
+        // returned directly.
+        // This avoids checking whether the directory node exists twice;
+        // client code should treat an Err() response and an error as the
+        // first item of the stream as equivalent.
+        let receiver_stream = ReceiverStream::new(rx);
+        Ok(Response::new(receiver_stream))
+    }
+
+    #[instrument(skip(self, request))]
+    async fn put(
+        &self,
+        request: Request<Streaming<proto::Directory>>,
+    ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
+        let mut req_inner = request.into_inner();
+        // This keeps track of the seen directory keys and their sizes.
+        // It is used to validate the size field of references to previously sent directories.
+        // We don't need to keep the contents around; they're stored in the DB.
+        let mut seen_directories_sizes: HashMap<Vec<u8>, u32> = HashMap::new();
+        let mut last_directory_dgst: Option<Vec<u8>> = None;
+
+        // Consume directories, and insert them into the store.
+        // Reject directory messages that refer to Directories not sent in the same stream.
+        while let Some(directory) = req_inner.message().await? {
+            // validate the directory itself.
+            if let Err(e) = directory.validate() {
+                return Err(Status::invalid_argument(format!(
+                    "directory {} failed validation: {}",
+                    BASE64.encode(&directory.digest()),
+                    e,
+                )));
+            }
+
+            // for each child directory this directory refers to, we need
+            // to ensure it has been seen already in this stream, and that the size
+            // matches what we recorded.
+            for child_directory in &directory.directories {
+                match seen_directories_sizes.get(&child_directory.digest) {
+                    None => {
+                        return Err(Status::invalid_argument(format!(
+                            "child directory '{}' ({}) in directory '{}' not seen yet",
+                            child_directory.name,
+                            BASE64.encode(&child_directory.digest),
+                            BASE64.encode(&directory.digest()),
+                        )));
+                    }
+                    Some(seen_child_directory_size) => {
+                        if seen_child_directory_size != &child_directory.size {
+                            return Err(Status::invalid_argument(format!(
+                                    "child directory '{}' ({}) in directory '{}' referred with wrong size, expected {}, actual {}",
+                                    child_directory.name,
+                                    BASE64.encode(&child_directory.digest),
+                                    BASE64.encode(&directory.digest()),
+                                    seen_child_directory_size,
+                                    child_directory.size,
+                                )));
+                        }
+                    }
+                }
+            }
+
+            // TODO: We don't validate that the currently received directory
+            // refers to at least one child we already received.
+            // This means we theoretically allow uploading multiple disconnected graphs,
+            // and the digest of the last element in the stream becomes the root node.
+            // For example, you can upload a leaf directory A, a leaf directory
+            // B, and then as the last element a directory C that only refers to A,
+            // leaving B disconnected.
+            // At some point, we might want to populate a data structure that
+            // does a reachability check.
+
+            let dgst = directory.digest();
+            seen_directories_sizes.insert(dgst.clone(), directory.size());
+            last_directory_dgst = Some(dgst.clone());
+
+            // check if the directory already exists in the database. We can skip
+            // inserting if it's already there, as that'd be a no-op.
+            match self
+                .client
+                .get(&proto::get_directory_request::ByWhat::Digest(dgst.to_vec()))
+            {
+                Err(e) => {
+                    warn!("error checking if directory already exists: {}", e);
+                    return Err(e.into());
+                }
+                // skip if already exists
+                Ok(Some(_)) => {}
+                // insert if it doesn't already exist
+                Ok(None) => {
+                    self.client.put(directory)?;
+                }
+            }
+        }
+
+        // We're done receiving. Peek at last_directory_dgst and either return
+        // the digest, or an error if we received an empty stream.
+        match last_directory_dgst {
+            None => Err(Status::invalid_argument("no directories received")),
+            Some(last_directory_dgst) => Ok(Response::new(proto::PutDirectoryResponse {
+                root_digest: last_directory_dgst,
+            })),
+        }
+    }
+}
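The wrapper above only adapts a DirectoryService implementation to the tonic-generated service trait; it still needs to be mounted on a gRPC server to be reachable. Below is a minimal sketch of that wiring. The SledDirectoryService constructor and the listen address are assumptions for illustration; DirectoryServiceServer is the server type tonic generates from rpc_directory.proto.

    use tonic::transport::Server;
    use tvix_store::directoryservice::SledDirectoryService; // assumed constructor below
    use tvix_store::proto::directory_service_server::DirectoryServiceServer;
    use tvix_store::proto::GRPCDirectoryServiceWrapper;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // Assumption: a sled-backed DirectoryService that opens a database at
        // the given path. Any other DirectoryService implementation works too.
        let directory_service = SledDirectoryService::new("directories.sled".into())?;

        // Adapt it to the tonic service trait via the From impl from the diff,
        // then serve it on a (made-up) listen address.
        Server::builder()
            .add_service(DirectoryServiceServer::new(
                GRPCDirectoryServiceWrapper::from(directory_service),
            ))
            .serve("[::1]:8000".parse()?)
            .await?;

        Ok(())
    }
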
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
index b58766d2ea7c..84eeda3c66f0 100644
--- a/tvix/store/src/proto/mod.rs
+++ b/tvix/store/src/proto/mod.rs
@@ -8,7 +8,10 @@ use prost::Message;
 use nix_compat::store_path::{ParseStorePathError, StorePath};
 
 mod grpc_blobservice_wrapper;
+mod grpc_directoryservice_wrapper;
+
 pub use grpc_blobservice_wrapper::GRPCBlobServiceWrapper;
+pub use grpc_directoryservice_wrapper::GRPCDirectoryServiceWrapper;
 
 tonic::include_proto!("tvix.store.v1");
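
For completeness, here is a sketch of the client side of the put contract described above: leaf Directory messages must be sent before any Directory that refers to them, the referenced digests and sizes must match, and the server replies with the digest of the last message as root_digest. The endpoint address is made up, and the field types follow this commit's proto (name as String, digest as bytes).

    use tvix_store::proto::directory_service_client::DirectoryServiceClient;
    use tvix_store::proto::{Directory, DirectoryNode};

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut client = DirectoryServiceClient::connect("http://[::1]:8000").await?;

        // A leaf directory with no children; it has to go over the wire first.
        let leaf = Directory::default();

        // A root directory referring to the leaf. digest and size must match
        // what was sent for the leaf, or put() rejects the stream.
        let root = Directory {
            directories: vec![DirectoryNode {
                name: "subdir".into(),
                digest: leaf.digest(),
                size: leaf.size(),
            }],
            ..Default::default()
        };

        // The digest of the last message in the stream becomes the root digest.
        let resp = client
            .put(tokio_stream::iter(vec![leaf, root]))
            .await?
            .into_inner();

        println!("root_digest: {}", data_encoding::BASE64.encode(&resp.root_digest));
        Ok(())
    }

Note that get() streams results in BFS order, so a client mirroring a closure from one store to another can replay a received stream in reverse to satisfy put()'s leaves-first ordering requirement.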