diff options
author | Florian Klink <flokli@flokli.de> | 2023-03-27T15·08+0200 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-04-07T09·26+0000 |
commit | 0836450006e3ef3ec4f150696c164fef7eb701db (patch) | |
tree | 04306a0f9b43702177c89283e5d29da9e4c2f6c4 /tvix/store/src/directoryservice/grpc.rs | |
parent | 96d7f4f0acc82bc6b12b41ac8d5fbfbd54e41599 (diff) |
feat(tvix/store/directorysvc): add put_multiple_start r/6073
This provides a handle to upload multiple proto::Directory as part of the same closure. Change-Id: I9213dde257a260c8622239918ea541064b270484 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8356 Tested-by: BuildkiteCI Reviewed-by: tazjin <tazjin@tvl.su> Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/directoryservice/grpc.rs')
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 211 |
1 files changed, 199 insertions, 12 deletions
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index e8ab854fd449..3c76d9e315b5 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -1,8 +1,11 @@ use std::collections::HashSet; -use super::DirectoryService; +use super::{DirectoryPutter, DirectoryService}; use crate::proto::{self, get_directory_request::ByWhat}; +use crate::Error; use data_encoding::BASE64; +use tokio::sync::mpsc::UnboundedSender; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::{transport::Channel, Status}; use tonic::{Code, Streaming}; use tracing::{instrument, warn}; @@ -85,9 +88,6 @@ impl DirectoryService for GRPCDirectoryService { fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> { let mut grpc_client = self.grpc_client.clone(); - // TODO: this currently doesn't work for directories referring to other - // directories, as we're required to upload the whole closure all the - // time. let task = self .tokio_handle .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); @@ -98,7 +98,9 @@ impl DirectoryService for GRPCDirectoryService { .root_digest .as_slice() .try_into() - .unwrap()), // TODO: map error + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), Err(e) => Err(crate::Error::StorageError(e.to_string())), } } @@ -125,6 +127,30 @@ impl DirectoryService for GRPCDirectoryService { StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream) } + + type DirectoryPutter = GRPCPutter; + + #[instrument(skip_all)] + fn put_multiple_start(&self) -> Self::DirectoryPutter + where + Self: Clone, + { + let mut grpc_client = self.grpc_client.clone(); + + let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); + + let task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>> = + self.tokio_handle.spawn(async move { + let s = grpc_client + .put(UnboundedReceiverStream::new(rx)) + .await? + .into_inner(); + + Ok(s) + }); + + GRPCPutter::new(self.tokio_handle.clone(), tx, task) + } } pub struct StreamIterator { @@ -184,9 +210,10 @@ impl Iterator for StreamIterator { self.received_directory_digests.insert(directory_digest); // register all children in expected_directory_digests. - for child_directories in &directory.directories { + // We ran validate() above, so we know these digests must be correct. + for child_directory in &directory.directories { self.expected_directory_digests - .insert(child_directories.digest.clone().try_into().unwrap()); + .insert(child_directory.digest.clone().try_into().unwrap()); } Some(Ok(directory)) @@ -208,6 +235,88 @@ impl Iterator for StreamIterator { } } +/// Allows uploading multiple Directory messages in the same gRPC stream. +pub struct GRPCPutter { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// Data about the current request - a handle to the task, and the tx part + /// of the channel. + /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request. + /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed. + #[allow(clippy::type_complexity)] // lol + rq: Option<( + tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + UnboundedSender<proto::Directory>, + )>, +} + +impl GRPCPutter { + pub fn new( + tokio_handle: tokio::runtime::Handle, + directory_sender: UnboundedSender<proto::Directory>, + task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>, + ) -> Self { + Self { + tokio_handle, + rq: Some((task, directory_sender)), + } + } + + #[allow(dead_code)] + // allows checking if the tx part of the channel is closed. + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } +} + +impl DirectoryPutter for GRPCPutter { + fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> { + match self.rq { + // If we're not already closed, send the directory to directory_sender. + Some((_, ref directory_sender)) => { + if directory_sender.send(directory).is_err() { + // If the channel has been prematurely closed, invoke close (so we can peek at the error code) + // That error code is much more helpful, because it + // contains the error message from the server. + self.close()?; + } + Ok(()) + } + // If self.close() was already called, we can't put again. + None => Err(Error::StorageError( + "DirectoryPutter already closed".to_string(), + )), + } + } + + /// Closes the stream for sending, and returns the value + fn close(&mut self) -> Result<[u8; 32], crate::Error> { + // get self.rq, and replace it with None. + // This ensures we can only close it once. + match std::mem::take(&mut self.rq) { + None => Err(Error::StorageError("already closed".to_string())), + Some((task, directory_sender)) => { + // close directory_sender, so blocking on task will finish. + drop(directory_sender); + + Ok(self + .tokio_handle + .block_on(task)? + .map_err(|e| Error::StorageError(e.to_string()))? + .root_digest + .try_into() + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?) + } + } + } +} + #[cfg(test)] mod tests { use core::time; @@ -219,10 +328,13 @@ mod tests { use tonic::transport::{Endpoint, Server, Uri}; use crate::{ - directoryservice::DirectoryService, + directoryservice::{DirectoryPutter, DirectoryService}, proto, proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, - tests::{fixtures::DIRECTORY_A, utils::gen_directory_service}, + tests::{ + fixtures::{DIRECTORY_A, DIRECTORY_B}, + utils::gen_directory_service, + }, }; #[test] @@ -260,8 +372,22 @@ mod tests { .build() .unwrap(); - // TODO: wait for the socket to be created - std::thread::sleep(time::Duration::from_millis(200)); + // wait for the socket to be created + { + let mut socket_created = false; + for _try in 1..20 { + if socket_path.exists() { + socket_created = true; + break; + } + std::thread::sleep(time::Duration::from_millis(20)) + } + + assert!( + socket_created, + "expected socket path to eventually get created, but never happened" + ); + } let task = tester_runtime.spawn_blocking(move || { // Create a channel, connecting to the uds at socket_path. @@ -301,8 +427,69 @@ mod tests { .get(&DIRECTORY_A.digest()) .expect("must succeed") .expect("must be some") - ) + ); + + // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A. + directory_service + .put(DIRECTORY_B.clone()) + .expect_err("must fail"); + + // Uploading A and then B should succeed, and closing should return the digest of B. + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_A.clone()).expect("must succeed"); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + let digest = handle.close().expect("must succeed"); + assert_eq!(DIRECTORY_B.digest(), digest); + + // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A. + let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest()); + assert_eq!( + DIRECTORY_B.clone(), + directories_it + .next() + .expect("must be some") + .expect("must succeed") + ); + assert_eq!( + DIRECTORY_A.clone(), + directories_it + .next() + .expect("must be some") + .expect("must succeed") + ); + + // Uploading B and then A should fail during close (if we're a + // fast client) + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + handle.put(DIRECTORY_A.clone()).expect("must succeed"); + handle.close().expect_err("must fail"); + + // Below test is a bit timing sensitive. We send B (which refers to + // A, so should fail), and wait sufficiently enough for the server + // to close us the stream, + // and then assert that uploading anything else via the handle will fail. + + let mut handle = directory_service.put_multiple_start(); + handle.put(DIRECTORY_B.clone()).expect("must succeed"); + + let mut is_closed = false; + for _try in 1..20 { + if handle.is_closed() { + is_closed = true; + break; + } + std::thread::sleep(time::Duration::from_millis(200)) + } + + assert!( + is_closed, + "expected channel to eventually close, but never happened" + ); + + handle.put(DIRECTORY_A.clone()).expect_err("must fail"); }); + tester_runtime.block_on(task)?; Ok(()) |