diff options
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 180 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 2 |
3 files changed, 184 insertions, 1 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index 569b78076ae1..81a3f9a9df32 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -18,12 +18,13 @@ sha2 = "0.10.6" sled = { version = "0.34.7", features = ["compression"] } thiserror = "1.0.38" tokio-stream = "0.1.11" -tokio = { version = "1.23.0", features = ["rt-multi-thread"] } +tokio = { version = "1.23.0", features = ["rt-multi-thread", "net"] } tonic = "0.8.2" tracing = "0.1.37" tracing-subscriber = { version = "0.3.16", features = ["json"] } walkdir = "2.3.2" tokio-util = { version = "0.7.7", features = ["io", "io-util"] } +tower = "0.4.13" [dependencies.tonic-reflection] optional = true diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs new file mode 100644 index 000000000000..b036f16aca34 --- /dev/null +++ b/tvix/store/src/directoryservice/grpc.rs @@ -0,0 +1,180 @@ +use super::DirectoryService; +use crate::proto::{self, get_directory_request::ByWhat}; +use tonic::transport::Channel; +use tonic::Code; + +/// Connects to a (remote) tvix-store DirectoryService over gRPC. +#[derive(Clone)] +pub struct GRPCDirectoryService { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, +} + +impl GRPCDirectoryService { + /// Construct a new [GRPCDirectoryService], by passing a handle to the + /// tokio runtime, and a gRPC client. + pub fn new( + tokio_handle: tokio::runtime::Handle, + grpc_client: proto::directory_service_client::DirectoryServiceClient<Channel>, + ) -> Self { + Self { + tokio_handle, + grpc_client, + } + } +} + +impl DirectoryService for GRPCDirectoryService { + fn get(&self, digest: &[u8; 32]) -> Result<Option<crate::proto::Directory>, crate::Error> { + // Get a new handle to the gRPC client, and copy the digest. + let mut grpc_client = self.grpc_client.clone(); + let digest = digest.to_owned(); + + // TODO: do requests recursively, populate a backing other + // [DirectoryService] as cache, and ask it first. + let task = self.tokio_handle.spawn(async move { + let mut s = grpc_client + .get(proto::GetDirectoryRequest { + recursive: false, + by_what: Some(ByWhat::Digest(digest.to_vec())), + }) + .await? + .into_inner(); + + // Retrieve the first message only, then close the stream (we set recursive to false) + s.message().await + }); + + match self.tokio_handle.block_on(task)? { + Ok(resp) => Ok(resp), + Err(e) if e.code() == Code::NotFound => Ok(None), + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } + + fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> { + let mut grpc_client = self.grpc_client.clone(); + + // TODO: this currently doesn't work for directories referring to other + // directories, as we're required to upload the whole closure all the + // time. + let task = self + .tokio_handle + .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); + + match self.tokio_handle.block_on(task)? { + Ok(put_directory_resp) => Ok(put_directory_resp + .into_inner() + .root_digest + .as_slice() + .try_into() + .unwrap()), // TODO: map error + Err(e) => Err(crate::Error::StorageError(e.to_string())), + } + } +} + +#[cfg(test)] +mod tests { + use core::time; + use std::thread; + + use tempfile::TempDir; + use tokio::net::{UnixListener, UnixStream}; + use tokio_stream::wrappers::UnixListenerStream; + use tonic::transport::{Endpoint, Server, Uri}; + + use crate::{ + directoryservice::DirectoryService, + proto, + proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, + tests::{fixtures::DIRECTORY_A, utils::gen_directory_service}, + }; + + #[test] + fn test() -> anyhow::Result<()> { + let tmpdir = TempDir::new().unwrap(); + let socket_path = tmpdir.path().join("socket"); + + // Spin up a server, in a thread far away, which spawns its own tokio runtime, + // and blocks on the task. + let socket_path_clone = socket_path.clone(); + thread::spawn(move || { + // Create the runtime + let rt = tokio::runtime::Runtime::new().unwrap(); + // Get a handle from this runtime + let handle = rt.handle(); + + let task = handle.spawn(async { + let uds = UnixListener::bind(socket_path_clone).unwrap(); + let uds_stream = UnixListenerStream::new(uds); + + // spin up a new DirectoryService + let mut server = Server::builder(); + let router = server.add_service(DirectoryServiceServer::new( + GRPCDirectoryServiceWrapper::from(gen_directory_service()), + )); + router.serve_with_incoming(uds_stream).await + }); + + handle.block_on(task) + }); + + // set up the local client runtime. This is similar to what the [tokio:test] macro desugars to. + let tester_runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + // TODO: wait for the socket to be created + std::thread::sleep(time::Duration::from_millis(200)); + + let task = tester_runtime.spawn_blocking(move || { + // Create a channel, connecting to the uds at socket_path. + // The URI is unused. + let channel = Endpoint::try_from("http://[::]:50051") + .unwrap() + .connect_with_connector_lazy(tower::service_fn(move |_: Uri| { + UnixStream::connect(socket_path.clone()) + })); + + let grpc_client = proto::directory_service_client::DirectoryServiceClient::new(channel); + + // create the GrpcDirectoryService, using the tester_runtime. + let directory_service = + super::GRPCDirectoryService::new(tokio::runtime::Handle::current(), grpc_client); + + // try to get DIRECTORY_A should return Ok(None) + assert_eq!( + None, + directory_service + .get(&DIRECTORY_A.digest()) + .expect("must not fail") + ); + + // Now upload it + assert_eq!( + DIRECTORY_A.digest(), + directory_service + .put(DIRECTORY_A.clone()) + .expect("must succeed") + ); + + // And retrieve it. We don't compare the two structs literally + assert_eq!( + DIRECTORY_A.clone(), + directory_service + .get(&DIRECTORY_A.digest()) + .expect("must succeed") + .expect("must be some") + ) + }); + tester_runtime.block_on(task)?; + + Ok(()) + } +} diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index a27f6745332a..1fc8fbc4c219 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -1,7 +1,9 @@ use crate::{proto, Error}; +mod grpc; mod memory; mod sled; +pub use self::grpc::GRPCDirectoryService; pub use self::memory::MemoryDirectoryService; pub use self::sled::SledDirectoryService; |