about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice/from_addr.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/pathinfoservice/from_addr.rs')
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs151
1 files changed, 31 insertions, 120 deletions
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 9173d25d05ca..5635c226c2de 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -1,13 +1,10 @@
-use crate::proto::path_info_service_client::PathInfoServiceClient;
+use super::PathInfoService;
 
-use super::{
-    GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService,
-    SledPathInfoService,
+use crate::composition::{
+    with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG,
 };
-
-use nix_compat::narinfo;
 use std::sync::Arc;
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
+use tvix_castore::Error;
 use url::Url;
 
 /// Constructs a new instance of a [PathInfoService] from an URI.
@@ -34,113 +31,21 @@ use url::Url;
 /// these also need to be passed in.
 pub async fn from_addr(
     uri: &str,
-    blob_service: Arc<dyn BlobService>,
-    directory_service: Arc<dyn DirectoryService>,
-) -> Result<Box<dyn PathInfoService>, Error> {
+    context: Option<&CompositionContext<'_>>,
+) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
     #[allow(unused_mut)]
     let mut url =
         Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
 
-    let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
-        "memory" => {
-            // memory doesn't support host or path in the URL.
-            if url.has_host() || !url.path().is_empty() {
-                return Err(Error::StorageError("invalid url".to_string()));
-            }
-            Box::<MemoryPathInfoService>::default()
-        }
-        "sled" => {
-            // sled doesn't support host, and a path can be provided (otherwise
-            // it'll live in memory only).
-            if url.has_host() {
-                return Err(Error::StorageError("no host allowed".to_string()));
-            }
-
-            if url.path() == "/" {
-                return Err(Error::StorageError(
-                    "cowardly refusing to open / with sled".to_string(),
-                ));
-            }
-
-            // TODO: expose other parameters as URL parameters?
-
-            Box::new(if url.path().is_empty() {
-                SledPathInfoService::new_temporary()
-                    .map_err(|e| Error::StorageError(e.to_string()))?
-            } else {
-                SledPathInfoService::new(url.path())
-                    .map_err(|e| Error::StorageError(e.to_string()))?
-            })
-        }
-        "nix+http" | "nix+https" => {
-            // Stringify the URL and remove the nix+ prefix.
-            // We can't use `url.set_scheme(rest)`, as it disallows
-            // setting something http(s) that previously wasn't.
-            let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
-
-            let mut nix_http_path_info_service =
-                NixHTTPPathInfoService::new(new_url, blob_service, directory_service);
-
-            let pairs = &url.query_pairs();
-            for (k, v) in pairs.into_iter() {
-                if k == "trusted-public-keys" {
-                    let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect();
-
-                    let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len());
-                    for pubkey_str in pubkey_strs {
-                        pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| {
-                            Error::StorageError(format!("invalid public key: {e}"))
-                        })?);
-                    }
-
-                    nix_http_path_info_service.set_public_keys(pubkeys);
-                }
-            }
-
-            Box::new(nix_http_path_info_service)
-        }
-        scheme if scheme.starts_with("grpc+") => {
-            // schemes starting with grpc+ go to the GRPCPathInfoService.
-            //   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
-            // - In the case of unix sockets, there must be a path, but may not be a host.
-            // - In the case of non-unix sockets, there must be a host, but no path.
-            // Constructing the channel is handled by tvix_castore::channel::from_url.
-            Box::new(GRPCPathInfoService::from_client(
-                PathInfoServiceClient::with_interceptor(
-                    tvix_castore::tonic::channel_from_url(&url).await?,
-                    tvix_tracing::propagate::tonic::send_trace,
-                ),
-            ))
-        }
-        #[cfg(feature = "cloud")]
-        "bigtable" => {
-            use super::bigtable::BigtableParameters;
-            use super::BigtablePathInfoService;
-
-            // parse the instance name from the hostname.
-            let instance_name = url
-                .host_str()
-                .ok_or_else(|| Error::StorageError("instance name missing".into()))?
-                .to_string();
-
-            // … but add it to the query string now, so we just need to parse that.
-            url.query_pairs_mut()
-                .append_pair("instance_name", &instance_name);
-
-            let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
-                .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
-
-            Box::new(
-                BigtablePathInfoService::connect(params)
-                    .await
-                    .map_err(|e| Error::StorageError(e.to_string()))?,
-            )
-        }
-        _ => Err(Error::StorageError(format!(
-            "unknown scheme: {}",
-            url.scheme()
-        )))?,
-    };
+    let path_info_service_config = with_registry(&REG, || {
+        <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>>::try_from(
+            url,
+        )
+    })?
+    .0;
+    let path_info_service = path_info_service_config
+        .build("anonymous", context.unwrap_or(&CompositionContext::blank()))
+        .await?;
 
     Ok(path_info_service)
 }
@@ -148,14 +53,12 @@ pub async fn from_addr(
 #[cfg(test)]
 mod tests {
     use super::from_addr;
+    use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder};
     use lazy_static::lazy_static;
     use rstest::rstest;
-    use std::sync::Arc;
     use tempfile::TempDir;
-    use tvix_castore::{
-        blobservice::{BlobService, MemoryBlobService},
-        directoryservice::{DirectoryService, MemoryDirectoryService},
-    };
+    use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig};
+    use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig};
 
     lazy_static! {
         static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
@@ -224,11 +127,19 @@ mod tests {
     )]
     #[tokio::test]
     async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
-        let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
-        let directory_service: Arc<dyn DirectoryService> =
-            Arc::from(MemoryDirectoryService::default());
-
-        let resp = from_addr(uri_str, blob_service, directory_service).await;
+        let mut comp = Composition::default();
+        comp.extend(vec![(
+            "default".into(),
+            DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {})
+                as Box<dyn ServiceBuilder<Output = dyn BlobService>>),
+        )]);
+        comp.extend(vec![(
+            "default".into(),
+            DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {})
+                as Box<dyn ServiceBuilder<Output = dyn DirectoryService>>),
+        )]);
+
+        let resp = from_addr(uri_str, Some(&comp.context())).await;
 
         if exp_succeed {
             resp.expect("should succeed");