about summary refs log tree commit diff
path: root/tvix/store/src/pathinfoservice/grpc.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-06-11T21·04+0300
committerclbot <clbot@tvl.fyi>2023-06-14T23·15+0000
commit35bff2bda69d5189d9a439cd2032b86ebb4e6e41 (patch)
treea5b22053e83463e8d87ff6edca7e29a31c2606d3 /tvix/store/src/pathinfoservice/grpc.rs
parentbb7c76739a30d2f312693799d8237eb0eb2da28d (diff)
refactor(tvix/store/pathinfosvc): add from_addr r/6304
Change-Id: I24e822351a837fce2aed568a647d009099ef32ec
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8747
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/pathinfoservice/grpc.rs')
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs232
1 files changed, 231 insertions, 1 deletions
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 230d630cf476..a14f51c9f890 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -1,5 +1,7 @@
 use super::PathInfoService;
-use crate::proto;
+use crate::{blobservice::BlobService, directoryservice::DirectoryService, proto};
+use std::sync::Arc;
+use tokio::net::UnixStream;
 use tonic::{transport::Channel, Code, Status};
 
 /// Connects to a (remote) tvix-store PathInfoService over gRPC.
@@ -27,6 +29,65 @@ impl GRPCPathInfoService {
 }
 
 impl PathInfoService for GRPCPathInfoService {
+    /// Constructs a [GRPCPathInfoService] from the passed [url::Url]:
+    /// - scheme has to match `grpc+*://`.
+    ///   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+    /// - In the case of unix sockets, there must be a path, but may not be a host.
+    /// - In the case of non-unix sockets, there must be a host, but no path.
+    /// The blob_service and directory_service arguments are ignored, because the gRPC service already provides answers to these questions.
+    fn from_url(
+        url: &url::Url,
+        _blob_service: Arc<dyn BlobService>,
+        _directory_service: Arc<dyn DirectoryService>,
+    ) -> Result<Self, crate::Error> {
+        // Start checking for the scheme to start with grpc+.
+        match url.scheme().strip_prefix("grpc+") {
+            None => Err(crate::Error::StorageError("invalid scheme".to_string())),
+            Some(rest) => {
+                if rest == "unix" {
+                    if url.host_str().is_some() {
+                        return Err(crate::Error::StorageError(
+                            "host may not be set".to_string(),
+                        ));
+                    }
+                    let path = url.path().to_string();
+                    let channel = tonic::transport::Endpoint::try_from("http://[::]:50051") // doesn't matter
+                        .unwrap()
+                        .connect_with_connector_lazy(tower::service_fn(
+                            move |_: tonic::transport::Uri| UnixStream::connect(path.clone()),
+                        ));
+                    let grpc_client =
+                        proto::path_info_service_client::PathInfoServiceClient::new(channel);
+                    Ok(Self::from_client(grpc_client))
+                } else {
+                    // ensure path is empty, not supported with gRPC.
+                    if !url.path().is_empty() {
+                        return Err(crate::Error::StorageError(
+                            "path may not be set".to_string(),
+                        ));
+                    }
+
+                    // clone the uri, and drop the grpc+ from the scheme.
+                    // Recreate a new uri with the `grpc+` prefix dropped from the scheme.
+                    // We can't use `url.set_scheme(rest)`, as it disallows
+                    // setting something http(s) that previously wasn't.
+                    let url = {
+                        let url_str = url.to_string();
+                        let s_stripped = url_str.strip_prefix("grpc+").unwrap();
+                        url::Url::parse(s_stripped).unwrap()
+                    };
+                    let channel = tonic::transport::Endpoint::try_from(url.to_string())
+                        .unwrap()
+                        .connect_lazy();
+
+                    let grpc_client =
+                        proto::path_info_service_client::PathInfoServiceClient::new(channel);
+                    Ok(Self::from_client(grpc_client))
+                }
+            }
+        }
+    }
+
     fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, crate::Error> {
         // Get a new handle to the gRPC client.
         let mut grpc_client = self.grpc_client.clone();
@@ -99,3 +160,172 @@ impl PathInfoService for GRPCPathInfoService {
         Ok((resp.nar_size, nar_sha256))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+    use std::thread;
+
+    use tempfile::TempDir;
+    use tokio::net::UnixListener;
+    use tokio::task;
+    use tokio::time;
+    use tokio_stream::wrappers::UnixListenerStream;
+
+    use crate::pathinfoservice::MemoryPathInfoService;
+    use crate::proto::GRPCPathInfoServiceWrapper;
+    use crate::tests::fixtures;
+    use crate::tests::utils::gen_blob_service;
+    use crate::tests::utils::gen_directory_service;
+
+    use super::GRPCPathInfoService;
+    use super::PathInfoService;
+
+    /// This uses the wrong scheme
+    #[test]
+    fn test_invalid_scheme() {
+        let url = url::Url::parse("http://foo.example/test").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This uses the correct scheme for a unix socket.
+    /// The fact that /path/to/somewhere doesn't exist yet is no problem, because we connect lazily.
+    #[tokio::test]
+    async fn test_valid_unix_path() {
+        let url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_ok()
+        );
+    }
+
+    /// This uses the correct scheme for a unix socket,
+    /// but sets a host, which is unsupported.
+    #[tokio::test]
+    async fn test_invalid_unix_path_with_domain() {
+        let url =
+            url::Url::parse("grpc+unix://host.example/path/to/somewhere").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This uses the correct scheme for a HTTP server.
+    /// The fact that nothing is listening there is no problem, because we connect lazily.
+    #[tokio::test]
+    async fn test_valid_http() {
+        let url = url::Url::parse("grpc+http://localhost").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_ok()
+        );
+    }
+
+    /// This uses the correct scheme for a HTTPS server.
+    /// The fact that nothing is listening there is no problem, because we connect lazily.
+    #[tokio::test]
+    async fn test_valid_https() {
+        let url = url::Url::parse("grpc+https://localhost").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_ok()
+        );
+    }
+
+    /// This uses the correct scheme, but also specifies
+    /// an additional path, which is not supported for gRPC.
+    /// The fact that nothing is listening there is no problem, because we connect lazily.
+    #[tokio::test]
+    async fn test_invalid_http_with_path() {
+        let url = url::Url::parse("grpc+https://localhost/some-path").expect("must parse");
+
+        assert!(
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .is_err()
+        );
+    }
+
+    /// This uses the correct scheme for a unix socket, and provides a server on the other side.
+    #[tokio::test]
+    async fn test_valid_unix_path_ping_pong() {
+        let tmpdir = TempDir::new().unwrap();
+        let path = tmpdir.path().join("daemon");
+
+        // let mut join_set = JoinSet::new();
+
+        // prepare a client
+        let client = {
+            let mut url = url::Url::parse("grpc+unix:///path/to/somewhere").expect("must parse");
+            url.set_path(path.to_str().unwrap());
+            GRPCPathInfoService::from_url(&url, gen_blob_service(), gen_directory_service())
+                .expect("must succeed")
+        };
+
+        let path_copy = path.clone();
+
+        // Spin up a server, in a thread far away, which spawns its own tokio runtime,
+        // and blocks on the task.
+        thread::spawn(move || {
+            // Create the runtime
+            let rt = tokio::runtime::Runtime::new().unwrap();
+            // Get a handle from this runtime
+            let handle = rt.handle();
+
+            let task = handle.spawn(async {
+                let uds = UnixListener::bind(path_copy).unwrap();
+                let uds_stream = UnixListenerStream::new(uds);
+
+                // spin up a new server
+                let mut server = tonic::transport::Server::builder();
+                let router = server.add_service(
+                    crate::proto::path_info_service_server::PathInfoServiceServer::new(
+                        GRPCPathInfoServiceWrapper::from(Arc::new(MemoryPathInfoService::new(
+                            gen_blob_service(),
+                            gen_directory_service(),
+                        ))
+                            as Arc<dyn PathInfoService>),
+                    ),
+                );
+                router.serve_with_incoming(uds_stream).await
+            });
+
+            handle.block_on(task)
+        });
+
+        // wait for the socket to be created
+        {
+            let mut socket_created = false;
+            for _try in 1..20 {
+                if path.exists() {
+                    socket_created = true;
+                    break;
+                }
+                tokio::time::sleep(time::Duration::from_millis(20)).await;
+            }
+
+            assert!(
+                socket_created,
+                "expected socket path to eventually get created, but never happened"
+            );
+        }
+
+        let pi = task::spawn_blocking(move || {
+            client
+                .get(fixtures::DUMMY_OUTPUT_HASH.to_vec().try_into().unwrap())
+                .expect("must not be error")
+        })
+        .await
+        .expect("must not be err");
+
+        assert!(pi.is_none());
+    }
+}