about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice/grpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/pathinfoservice/grpc.rs')
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs100
1 files changed, 62 insertions, 38 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index bcee49aac6cf..453044cba13d 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -1,15 +1,18 @@
-use super::PathInfoService;
+use super::{PathInfo, PathInfoService};
 use crate::{
     nar::NarCalculationService,
-    proto::{self, ListPathInfoRequest, PathInfo},
+    proto::{self, ListPathInfoRequest},
 };
 use async_stream::try_stream;
 use futures::stream::BoxStream;
 use nix_compat::nixbase32;
+use std::sync::Arc;
 use tonic::{async_trait, Code};
-use tracing::{instrument, Span};
+use tracing::{instrument, warn, Span};
 use tracing_indicatif::span_ext::IndicatifSpanExt;
-use tvix_castore::{proto as castorepb, Error};
+use tvix_castore::composition::{CompositionContext, ServiceBuilder};
+use tvix_castore::Error;
+use tvix_castore::Node;
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
 #[derive(Clone)]
@@ -50,15 +53,10 @@ where
             .await;
 
         match path_info {
-            Ok(path_info) => {
-                let path_info = path_info.into_inner();
-
-                path_info
-                    .validate()
-                    .map_err(|e| Error::StorageError(format!("invalid pathinfo: {}", e)))?;
-
-                Ok(Some(path_info))
-            }
+            Ok(path_info) => Ok(Some(
+                PathInfo::try_from(path_info.into_inner())
+                    .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
+            )),
             Err(e) if e.code() == Code::NotFound => Ok(None),
             Err(e) => Err(Error::StorageError(e.to_string())),
         }
@@ -69,12 +67,12 @@ where
         let path_info = self
             .grpc_client
             .clone()
-            .put(path_info)
+            .put(proto::PathInfo::from(path_info))
             .await
             .map_err(|e| Error::StorageError(e.to_string()))?
             .into_inner();
-
-        Ok(path_info)
+        Ok(PathInfo::try_from(path_info)
+            .map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?)
     }
 
     #[instrument(level = "trace", skip_all)]
@@ -88,21 +86,8 @@ where
 
             loop {
                 match stream.message().await {
-                    Ok(o) => match o {
-                        Some(pathinfo) => {
-                            // validate the pathinfo
-                            if let Err(e) = pathinfo.validate() {
-                                Err(Error::StorageError(format!(
-                                    "pathinfo {:?} failed validation: {}",
-                                    pathinfo, e
-                                )))?;
-                            }
-                            yield pathinfo
-                        }
-                        None => {
-                            return;
-                        },
-                    },
+                    Ok(Some(path_info)) => yield PathInfo::try_from(path_info).map_err(|e| Error::StorageError(format!("Invalid path info: {e}")))?,
+                    Ok(None) => return,
                     Err(e) => Err(Error::StorageError(e.to_string()))?,
                 }
             }
@@ -110,6 +95,13 @@ where
 
         Box::pin(stream)
     }
+
+    #[instrument(level = "trace", skip_all)]
+    fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> {
+        Some(Box::new(GRPCPathInfoService {
+            grpc_client: self.grpc_client.clone(),
+        }) as Box<dyn NarCalculationService>)
+    }
 }
 
 #[async_trait]
@@ -121,10 +113,7 @@ where
     T::Future: Send,
 {
     #[instrument(level = "trace", skip_all, fields(root_node = ?root_node, indicatif.pb_show=1))]
-    async fn calculate_nar(
-        &self,
-        root_node: &castorepb::node::Node,
-    ) -> Result<(u64, [u8; 32]), Error> {
+    async fn calculate_nar(&self, root_node: &Node) -> Result<(u64, [u8; 32]), Error> {
         let span = Span::current();
         span.pb_set_message("Waiting for NAR calculation");
         span.pb_start();
@@ -132,9 +121,10 @@ where
         let path_info = self
             .grpc_client
             .clone()
-            .calculate_nar(castorepb::Node {
-                node: Some(root_node.clone()),
-            })
+            .calculate_nar(tvix_castore::proto::Node::from_name_and_node(
+                "".into(),
+                root_node.to_owned(),
+            ))
             .await
             .map_err(|e| Error::StorageError(e.to_string()))?
             .into_inner();
@@ -149,6 +139,40 @@ where
     }
 }
 
+#[derive(serde::Deserialize, Debug)]
+#[serde(deny_unknown_fields)]
+pub struct GRPCPathInfoServiceConfig {
+    url: String,
+}
+
+impl TryFrom<url::Url> for GRPCPathInfoServiceConfig {
+    type Error = Box<dyn std::error::Error + Send + Sync>;
+    fn try_from(url: url::Url) -> Result<Self, Self::Error> {
+        //   normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+        // - In the case of unix sockets, there must be a path, but may not be a host.
+        // - In the case of non-unix sockets, there must be a host, but no path.
+        // Constructing the channel is handled by tvix_castore::channel::from_url.
+        Ok(GRPCPathInfoServiceConfig {
+            url: url.to_string(),
+        })
+    }
+}
+
+#[async_trait]
+impl ServiceBuilder for GRPCPathInfoServiceConfig {
+    type Output = dyn PathInfoService;
+    async fn build<'a>(
+        &'a self,
+        _instance_name: &str,
+        _context: &CompositionContext,
+    ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
+        let client = proto::path_info_service_client::PathInfoServiceClient::new(
+            tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?,
+        );
+        Ok(Arc::new(GRPCPathInfoService::from_client(client)))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use crate::pathinfoservice::tests::make_grpc_path_info_service_client;