about summary refs log tree commit diff
path: root/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/proto/grpc_directoryservice_wrapper.rs')
-rw-r--r--tvix/castore/src/proto/grpc_directoryservice_wrapper.rs103
1 files changed, 103 insertions, 0 deletions
diff --git a/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
new file mode 100644
index 0000000000..5c1428690c
--- /dev/null
+++ b/tvix/castore/src/proto/grpc_directoryservice_wrapper.rs
@@ -0,0 +1,103 @@
+use crate::directoryservice::ClosureValidator;
+use crate::proto;
+use crate::{directoryservice::DirectoryService, B3Digest};
+use futures::stream::BoxStream;
+use futures::TryStreamExt;
+use std::ops::Deref;
+use tokio_stream::once;
+use tonic::{async_trait, Request, Response, Status, Streaming};
+use tracing::{instrument, warn};
+
+pub struct GRPCDirectoryServiceWrapper<T> {
+    directory_service: T,
+}
+
+impl<T> GRPCDirectoryServiceWrapper<T> {
+    pub fn new(directory_service: T) -> Self {
+        Self { directory_service }
+    }
+}
+
+#[async_trait]
+impl<T> proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<T>
+where
+    T: Deref<Target = dyn DirectoryService> + Send + Sync + 'static,
+{
+    type GetStream = BoxStream<'static, tonic::Result<proto::Directory, Status>>;
+
+    #[instrument(skip_all)]
+    async fn get<'a>(
+        &'a self,
+        request: Request<proto::GetDirectoryRequest>,
+    ) -> Result<Response<Self::GetStream>, Status> {
+        let req_inner = request.into_inner();
+
+        let by_what = &req_inner
+            .by_what
+            .ok_or_else(|| Status::invalid_argument("invalid by_what"))?;
+
+        match by_what {
+            proto::get_directory_request::ByWhat::Digest(ref digest) => {
+                let digest: B3Digest = digest
+                    .clone()
+                    .try_into()
+                    .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
+                Ok(tonic::Response::new({
+                    if !req_inner.recursive {
+                        let directory = self
+                            .directory_service
+                            .get(&digest)
+                            .await
+                            .map_err(|e| {
+                                warn!(err = %e, directory.digest=%digest, "failed to get directory");
+                                tonic::Status::new(tonic::Code::Internal, e.to_string())
+                            })?
+                            .ok_or_else(|| {
+                                Status::not_found(format!("directory {} not found", digest))
+                            })?;
+
+                        Box::pin(once(Ok(directory)))
+                    } else {
+                        // If recursive was requested, traverse via get_recursive.
+                        Box::pin(
+                            self.directory_service.get_recursive(&digest).map_err(|e| {
+                                tonic::Status::new(tonic::Code::Internal, e.to_string())
+                            }),
+                        )
+                    }
+                }))
+            }
+        }
+    }
+
+    #[instrument(skip_all)]
+    async fn put(
+        &self,
+        request: Request<Streaming<proto::Directory>>,
+    ) -> Result<Response<proto::PutDirectoryResponse>, Status> {
+        let mut req_inner = request.into_inner();
+
+        // We put all Directory messages we receive into ClosureValidator first.
+        let mut validator = ClosureValidator::default();
+        while let Some(directory) = req_inner.message().await? {
+            validator.add(directory)?;
+        }
+
+        // drain, which validates connectivity too.
+        let directories = validator.finalize()?;
+
+        let mut directory_putter = self.directory_service.put_multiple_start();
+        for directory in directories {
+            directory_putter.put(directory).await?;
+        }
+
+        // Properly close the directory putter. Peek at last_directory_digest
+        // and return it, or propagate errors.
+        let last_directory_dgst = directory_putter.close().await?;
+
+        Ok(Response::new(proto::PutDirectoryResponse {
+            root_digest: last_directory_dgst.into(),
+        }))
+    }
+}