diff options
author | Florian Klink <flokli@flokli.de> | 2023-03-23T12·49+0100 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-03-27T08·48+0000 |
commit | 367a5e9922264b787667fd5e750c8eadf8a7796f (patch) | |
tree | dd34d886aa0cbda5668f3126870552e6002a857e /tvix | |
parent | b919f297528b74d9eab95fe93aa87541d7dffc53 (diff) |
feat(tvix/store/directorysvc): add gRPC client r/6044
This provides a GRPCDirectoryService struct implementing DirectoryService, allowing a client to Directory objects from a (remote) tvix-store. Remote in this case is anything outside the current process, be it another process, or an endpoint on the network. To keep the sync interface in the `DirectoryService` trait, a handle to some tokio runtime needs to be passed into the constructor, and the two methods use `self.tokio_handle.spawn` to start an async function, and `self.tokio_handle.block_on` to wait for its completion. The client handle, called `grpc_client` itself is easy to clone, and treats concurrent requests internally. This means, even though we keep the `DirectoryService` trait sync, there's nothing preventing it from being used concurrently, let's say from multiple threads. There's still two limitations for now: 1) The trait doesn't make use of the `recursive` request, which currently leads to a N+1 query problem. This can be fixed by `GRPCDirectoryService` having a reference to another `DirectoryService` acting as the local side. I want to wait for general store composition code to pop up before manually coding this here. 2) It's currently only possible to put() leaf directory nodes, as the request normally requires uploading a whole closure. We might want to add another batch function to upload a whole closure, and/or do this batching in certain cases. This still needs some more thinking. Change-Id: I7ffec791610b72c0960cf5307cefbb12ec946dc9 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8336 Tested-by: BuildkiteCI Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su>
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.lock | 2 | ||||
-rw-r--r-- | tvix/Cargo.nix | 21 | ||||
-rw-r--r-- | tvix/store/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 180 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 2 |
5 files changed, 204 insertions, 4 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index a241f0e8f399..33c2879d8cce 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -2475,6 +2475,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2647,6 +2648,7 @@ dependencies = [ "tonic-build", "tonic-mock", "tonic-reflection", + "tower", "tracing", "tracing-subscriber", "walkdir", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index d7b7d858acb0..027eefeda34f 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -7180,7 +7180,7 @@ rec { "tracing" = [ "dep:tracing" ]; "util" = [ "__common" "futures-util" "pin-project" ]; }; - resolvedDefaultFeatures = [ "__common" "balance" "buffer" "discover" "futures-core" "futures-util" "indexmap" "limit" "load" "make" "pin-project" "pin-project-lite" "rand" "ready-cache" "slab" "timeout" "tokio" "tokio-util" "tracing" "util" ]; + resolvedDefaultFeatures = [ "__common" "balance" "buffer" "default" "discover" "futures-core" "futures-util" "indexmap" "limit" "load" "log" "make" "pin-project" "pin-project-lite" "rand" "ready-cache" "slab" "timeout" "tokio" "tokio-util" "tracing" "util" ]; }; "tower-layer" = rec { crateName = "tower-layer"; @@ -7217,6 +7217,11 @@ rec { packageId = "cfg-if"; } { + name = "log"; + packageId = "log"; + optional = true; + } + { name = "pin-project-lite"; packageId = "pin-project-lite"; } @@ -7231,6 +7236,12 @@ rec { usesDefaultFeatures = false; } ]; + devDependencies = [ + { + name = "log"; + packageId = "log"; + } + ]; features = { "attributes" = [ "tracing-attributes" ]; "default" = [ "std" "attributes" ]; @@ -7240,7 +7251,7 @@ rec { "tracing-attributes" = [ "dep:tracing-attributes" ]; "valuable" = [ "tracing-core/valuable" ]; }; - resolvedDefaultFeatures = [ "attributes" "default" "std" "tracing-attributes" ]; + resolvedDefaultFeatures = [ "attributes" "default" "log" "std" "tracing-attributes" ]; }; "tracing-attributes" = rec { crateName = "tracing-attributes"; @@ -7827,7 +7838,7 @@ rec { { name = "tokio"; packageId = "tokio"; - features = [ "rt-multi-thread" ]; + features = [ "rt-multi-thread" "net" ]; } { name = "tokio-stream"; @@ -7848,6 +7859,10 @@ rec { optional = true; } { + name = "tower"; + packageId = "tower"; + } + { name = "tracing"; packageId = "tracing"; } diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 569b78076ae1..81a3f9a9df32 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -18,12 +18,13 @@ sha2 = "0.10.6" sled = { version = "0.34.7", features = ["compression"] } thiserror = "1.0.38" tokio-stream = "0.1.11" -tokio = { version = "1.23.0", features = ["rt-multi-thread"] } +tokio = { version = "1.23.0", features = ["rt-multi-thread", "net"] } tonic = "0.8.2" tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["json"] } walkdir = "2.3.2" tokio-util = { version = "0.7.7", features = ["io", "io-util"] } +tower = "0.4.13" [dependencies.tonic-reflection] optional = true diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs new file mode 100644 index 000000000000..b036f16aca34 --- /dev/null +++ b/tvix/store/src/directoryservice/grpc.rs @@ -0,0 +1,180 @@ +use super::DirectoryService; +use crate::proto::{self, get_directory_request::ByWhat}; +use tonic::transport::Channel; +use tonic::Code; + +/// Connects to a (remote) tvix-store DirectoryService over gRPC. +#[derive(Clone)] +pub struct GRPCDirectoryService { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, +} + +impl GRPCDirectoryService { + /// Construct a new [GRPCDirectoryService], by passing a handle to the + /// tokio runtime, and a gRPC client. + pub fn new( + tokio_handle: tokio::runtime::Handle, + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + ) -> Self { + Self { + tokio_handle, + grpc_client, + } + } +} + +impl DirectoryService for GRPCDirectoryService { + fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + let digest = digest.to_owned(); + + // TODO: do requests recursively, populate a backing other + // [DirectoryService] as cache, and ask it first. + let task = self.tokio_handle.spawn(async move { + let mut s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(digest.to_vec())), + }) + .await? + .into_inner(); + + // Retrieve the first message only, then close the stream (we set recursive to false) + s.message().await + }); + + match self.tokio_handle.block_on(task)? { + Ok(resp) => Ok(resp), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> { + let mut grpc_client = self.grpc_client.clone(); + + // TODO: this currently doesn't work for directories referring to other + // directories, as we're required to upload the whole closure all the + // time. + let task = self + .tokio_handle + .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); + + match self.tokio_handle.block_on(task)? { + Ok(put_directory_resp) => Ok(put_directory_resp + .into_inner() + .root_digest + .as_slice() + .try_into() + .unwrap()), // TODO: map error + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } +} + +#[cfg(test)] +mod tests { + use core::time; + use std::thread; + + use tempfile::TempDir; + use tokio::net::{UnixListener, UnixStream}; + use tokio_stream::wrappers::UnixListenerStream; + use tonic::transport::{Endpoint, Server, Uri}; + + use crate::{ + directoryservice::DirectoryService, + proto, + proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, + tests::{fixtures::DIRECTORY_A, utils::gen_directory_service}, + }; + + #[test] + fn test() -> anyhow::Result<()> { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("socket"); + + // Spin up a server, in a thread far away, which spawns its own tokio runtime, + // and blocks on the task. + let socket_path_clone = socket_path.clone(); + thread::spawn(move || { + // Create the runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + // Get a handle from this runtime + let handle = rt.handle(); + + let task = handle.spawn(async { + let uds = UnixListener::bind(socket_path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(gen_directory_service()), + )); + router.serve_with_incoming(uds_stream).await + }); + + handle.block_on(task) + }); + + // set up the local client runtime. This is similar to what the [tokio:test] macro desugars to. + let tester_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // TODO: wait for the socket to be created + std::thread::sleep(time::Duration::from_millis(200)); + + let task = tester_runtime.spawn_blocking(move || { + // Create a channel, connecting to the uds at socket_path. + // The URI is unused. + let channel = Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { + UnixStream::connect(socket_path.clone()) + })); + + let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel); + + // create the GrpcDirectoryService, using the tester_runtime. + let directory_service = + super::GRPCDirectoryService::new(tokio::runtime::Handle::current(), grpc_client); + + // try to get DIRECTORY_A should return Ok(None) + assert_eq!( + None, + directory_service + .get(&DIRECTORY_A.digest()) + .expect("must not fail") + ); + + // Now upload it + assert_eq!( + DIRECTORY_A.digest(), + directory_service + .put(DIRECTORY_A.clone()) + .expect("must succeed") + ); + + // And retrieve it. We don't compare the two structs literally + assert_eq!( + DIRECTORY_A.clone(), + directory_service + .get(&DIRECTORY_A.digest()) + .expect("must succeed") + .expect("must be some") + ) + }); + tester_runtime.block_on(task)?; + + Ok(()) + } +} diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index a27f6745332a..1fc8fbc4c219 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -1,7 +1,9 @@ use crate::{proto, Error}; +mod grpc; mod memory; mod sled; +pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; pub use self::sled::SledDirectoryService; |