about summary refs log tree commit diff
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-03-23T12·49+0100
committerclbot <clbot@tvl.fyi>2023-03-27T08·48+0000
commit367a5e9922264b787667fd5e750c8eadf8a7796f (patch)
treedd34d886aa0cbda5668f3126870552e6002a857e
parentb919f297528b74d9eab95fe93aa87541d7dffc53 (diff)
feat(tvix/store/directorysvc): add gRPC client r/6044
This provides a GRPCDirectoryService struct implementing
DirectoryService, allowing a client to Directory objects from a (remote)
tvix-store.

Remote in this case is anything outside the current process, be it
another process, or an endpoint on the network.

To keep the sync interface in the `DirectoryService` trait, a handle to
some tokio runtime needs to be passed into the constructor, and the two
methods use `self.tokio_handle.spawn` to start an async function, and
`self.tokio_handle.block_on` to wait for its completion.

The client handle, called `grpc_client` itself is easy to clone, and
treats concurrent requests internally. This means, even though we keep
the `DirectoryService` trait sync, there's nothing preventing it from
being used concurrently, let's say from multiple threads.

There's still two limitations for now:

1) The trait doesn't make use of the `recursive` request, which
   currently leads to a N+1 query problem. This can be fixed
   by `GRPCDirectoryService` having a reference to another
   `DirectoryService` acting as the local side.
   I want to wait for general store composition code to pop up before
   manually coding this here.

2) It's currently only possible to put() leaf directory nodes, as the
   request normally requires uploading a whole closure. We might want
   to add another batch function to upload a whole closure, and/or do
   this batching in certain cases. This still needs some more thinking.

Change-Id: I7ffec791610b72c0960cf5307cefbb12ec946dc9
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8336
Tested-by: BuildkiteCI
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
-rw-r--r--tvix/Cargo.lock2
-rw-r--r--tvix/Cargo.nix21
-rw-r--r--tvix/store/Cargo.toml3
-rw-r--r--tvix/store/src/directoryservice/grpc.rs180
-rw-r--r--tvix/store/src/directoryservice/mod.rs2
5 files changed, 204 insertions, 4 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock
index a241f0e8f3..33c2879d8c 100644
--- a/tvix/Cargo.lock
+++ b/tvix/Cargo.lock
@@ -2475,6 +2475,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
 dependencies = [
  "cfg-if",
+ "log",
  "pin-project-lite",
  "tracing-attributes",
  "tracing-core",
@@ -2647,6 +2648,7 @@ dependencies = [
  "tonic-build",
  "tonic-mock",
  "tonic-reflection",
+ "tower",
  "tracing",
  "tracing-subscriber",
  "walkdir",
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index d7b7d858ac..027eefeda3 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -7180,7 +7180,7 @@ rec {
           "tracing" = [ "dep:tracing" ];
           "util" = [ "__common" "futures-util" "pin-project" ];
         };
-        resolvedDefaultFeatures = [ "__common" "balance" "buffer" "discover" "futures-core" "futures-util" "indexmap" "limit" "load" "make" "pin-project" "pin-project-lite" "rand" "ready-cache" "slab" "timeout" "tokio" "tokio-util" "tracing" "util" ];
+        resolvedDefaultFeatures = [ "__common" "balance" "buffer" "default" "discover" "futures-core" "futures-util" "indexmap" "limit" "load" "log" "make" "pin-project" "pin-project-lite" "rand" "ready-cache" "slab" "timeout" "tokio" "tokio-util" "tracing" "util" ];
       };
       "tower-layer" = rec {
         crateName = "tower-layer";
@@ -7217,6 +7217,11 @@ rec {
             packageId = "cfg-if";
           }
           {
+            name = "log";
+            packageId = "log";
+            optional = true;
+          }
+          {
             name = "pin-project-lite";
             packageId = "pin-project-lite";
           }
@@ -7231,6 +7236,12 @@ rec {
             usesDefaultFeatures = false;
           }
         ];
+        devDependencies = [
+          {
+            name = "log";
+            packageId = "log";
+          }
+        ];
         features = {
           "attributes" = [ "tracing-attributes" ];
           "default" = [ "std" "attributes" ];
@@ -7240,7 +7251,7 @@ rec {
           "tracing-attributes" = [ "dep:tracing-attributes" ];
           "valuable" = [ "tracing-core/valuable" ];
         };
-        resolvedDefaultFeatures = [ "attributes" "default" "std" "tracing-attributes" ];
+        resolvedDefaultFeatures = [ "attributes" "default" "log" "std" "tracing-attributes" ];
       };
       "tracing-attributes" = rec {
         crateName = "tracing-attributes";
@@ -7827,7 +7838,7 @@ rec {
           {
             name = "tokio";
             packageId = "tokio";
-            features = [ "rt-multi-thread" ];
+            features = [ "rt-multi-thread" "net" ];
           }
           {
             name = "tokio-stream";
@@ -7848,6 +7859,10 @@ rec {
             optional = true;
           }
           {
+            name = "tower";
+            packageId = "tower";
+          }
+          {
             name = "tracing";
             packageId = "tracing";
           }
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index 569b78076a..81a3f9a9df 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -18,12 +18,13 @@ sha2 = "0.10.6"
 sled = { version = "0.34.7", features = ["compression"] }
 thiserror = "1.0.38"
 tokio-stream = "0.1.11"
-tokio = { version = "1.23.0", features = ["rt-multi-thread"] }
+tokio = { version = "1.23.0", features = ["rt-multi-thread", "net"] }
 tonic = "0.8.2"
 tracing = "0.1.37"
 tracing-subscriber = { version = "0.3.16", features = ["json"] }
 walkdir = "2.3.2"
 tokio-util = { version = "0.7.7", features = ["io", "io-util"] }
+tower = "0.4.13"
 
 [dependencies.tonic-reflection]
 optional = true
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
new file mode 100644
index 0000000000..b036f16aca
--- /dev/null
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -0,0 +1,180 @@
+use super::DirectoryService;
+use crate::proto::{self, get_directory_request::ByWhat};
+use tonic::transport::Channel;
+use tonic::Code;
+
+/// Connects to a (remote) tvix-store DirectoryService over gRPC.
+#[derive(Clone)]
+pub struct GRPCDirectoryService {
+    /// A handle into the active tokio runtime. Necessary to spawn tasks.
+    tokio_handle: tokio::runtime::Handle,
+
+    /// The internal reference to a gRPC client.
+    /// Cloning it is cheap, and it internally handles concurrent requests.
+    grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
+}
+
+impl GRPCDirectoryService {
+    /// Construct a new [GRPCDirectoryService], by passing a handle to the
+    /// tokio runtime, and a gRPC client.
+    pub fn new(
+        tokio_handle: tokio::runtime::Handle,
+        grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>,
+    ) -> Self {
+        Self {
+            tokio_handle,
+            grpc_client,
+        }
+    }
+}
+
+impl DirectoryService for GRPCDirectoryService {
+    fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> {
+        // Get a new handle to the gRPC client, and copy the digest.
+        let mut grpc_client = self.grpc_client.clone();
+        let digest = digest.to_owned();
+
+        // TODO: do requests recursively, populate a backing other
+        // [DirectoryService] as cache, and ask it first.
+        let task = self.tokio_handle.spawn(async move {
+            let mut s = grpc_client
+                .get(proto::GetDirectoryRequest {
+                    recursive: false,
+                    by_what: Some(ByWhat::Digest(digest.to_vec())),
+                })
+                .await?
+                .into_inner();
+
+            // Retrieve the first message only, then close the stream (we set recursive to false)
+            s.message().await
+        });
+
+        match self.tokio_handle.block_on(task)? {
+            Ok(resp) => Ok(resp),
+            Err(e) if e.code() == Code::NotFound => Ok(None),
+            Err(e) => Err(crate::Error::StorageError(e.to_string())),
+        }
+    }
+
+    fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> {
+        let mut grpc_client = self.grpc_client.clone();
+
+        // TODO: this currently doesn't work for directories referring to other
+        // directories, as we're required to upload the whole closure all the
+        // time.
+        let task = self
+            .tokio_handle
+            .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
+
+        match self.tokio_handle.block_on(task)? {
+            Ok(put_directory_resp) => Ok(put_directory_resp
+                .into_inner()
+                .root_digest
+                .as_slice()
+                .try_into()
+                .unwrap()), // TODO: map error
+            Err(e) => Err(crate::Error::StorageError(e.to_string())),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use core::time;
+    use std::thread;
+
+    use tempfile::TempDir;
+    use tokio::net::{UnixListener, UnixStream};
+    use tokio_stream::wrappers::UnixListenerStream;
+    use tonic::transport::{Endpoint, Server, Uri};
+
+    use crate::{
+        directoryservice::DirectoryService,
+        proto,
+        proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper},
+        tests::{fixtures::DIRECTORY_A, utils::gen_directory_service},
+    };
+
+    #[test]
+    fn test() -> anyhow::Result<()> {
+        let tmpdir = TempDir::new().unwrap();
+        let socket_path = tmpdir.path().join("socket");
+
+        // Spin up a server, in a thread far away, which spawns its own tokio runtime,
+        // and blocks on the task.
+        let socket_path_clone = socket_path.clone();
+        thread::spawn(move || {
+            // Create the runtime
+            let rt = tokio::runtime::Runtime::new().unwrap();
+            // Get a handle from this runtime
+            let handle = rt.handle();
+
+            let task = handle.spawn(async {
+                let uds = UnixListener::bind(socket_path_clone).unwrap();
+                let uds_stream = UnixListenerStream::new(uds);
+
+                // spin up a new DirectoryService
+                let mut server = Server::builder();
+                let router = server.add_service(DirectoryServiceServer::new(
+                    GRPCDirectoryServiceWrapper::from(gen_directory_service()),
+                ));
+                router.serve_with_incoming(uds_stream).await
+            });
+
+            handle.block_on(task)
+        });
+
+        // set up the local client runtime. This is similar to what the [tokio:test] macro desugars to.
+        let tester_runtime = tokio::runtime::Builder::new_current_thread()
+            .enable_all()
+            .build()
+            .unwrap();
+
+        // TODO: wait for the socket to be created
+        std::thread::sleep(time::Duration::from_millis(200));
+
+        let task = tester_runtime.spawn_blocking(move || {
+            // Create a channel, connecting to the uds at socket_path.
+            // The URI is unused.
+            let channel = Endpoint::try_from("http://[::]:50051")
+                .unwrap()
+                .connect_with_connector_lazy(tower::service_fn(move |_: Uri| {
+                    UnixStream::connect(socket_path.clone())
+                }));
+
+            let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel);
+
+            // create the GrpcDirectoryService, using the tester_runtime.
+            let directory_service =
+                super::GRPCDirectoryService::new(tokio::runtime::Handle::current(), grpc_client);
+
+            // try to get DIRECTORY_A should return Ok(None)
+            assert_eq!(
+                None,
+                directory_service
+                    .get(&DIRECTORY_A.digest())
+                    .expect("must not fail")
+            );
+
+            // Now upload it
+            assert_eq!(
+                DIRECTORY_A.digest(),
+                directory_service
+                    .put(DIRECTORY_A.clone())
+                    .expect("must succeed")
+            );
+
+            // And retrieve it. We don't compare the two structs literally
+            assert_eq!(
+                DIRECTORY_A.clone(),
+                directory_service
+                    .get(&DIRECTORY_A.digest())
+                    .expect("must succeed")
+                    .expect("must be some")
+            )
+        });
+        tester_runtime.block_on(task)?;
+
+        Ok(())
+    }
+}
diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs
index a27f674533..1fc8fbc4c2 100644
--- a/tvix/store/src/directoryservice/mod.rs
+++ b/tvix/store/src/directoryservice/mod.rs
@@ -1,7 +1,9 @@
 use crate::{proto, Error};
+mod grpc;
 mod memory;
 mod sled;
 
+pub use self::grpc::GRPCDirectoryService;
 pub use self::memory::MemoryDirectoryService;
 pub use self::sled::SledDirectoryService;