path: root/tvix/store/src/directoryservice/grpc.rs
author    Florian Klink <flokli@flokli.de>  2023-03-27T15·08+0200
committer clbot <clbot@tvl.fyi>             2023-04-07T09·26+0000
commit    0836450006e3ef3ec4f150696c164fef7eb701db (patch)
tree      04306a0f9b43702177c89283e5d29da9e4c2f6c4 /tvix/store/src/directoryservice/grpc.rs
parent    96d7f4f0acc82bc6b12b41ac8d5fbfbd54e41599 (diff)
feat(tvix/store/directorysvc): add put_multiple_start r/6073
This provides a handle to upload multiple proto::Directory messages as
part of the same closure.
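
A minimal usage sketch, mirroring the test added in this change
(directory_service stands for any DirectoryService implementation;
DIRECTORY_A and DIRECTORY_B are the test fixtures used below, with
DIRECTORY_B referring to DIRECTORY_A):

    // Upload a small closure: leaves first, then directories referring to them.
    let mut handle = directory_service.put_multiple_start();
    handle.put(DIRECTORY_A.clone())?;
    handle.put(DIRECTORY_B.clone())?;
    // Closing the stream for sending returns the digest of the root
    // (i.e. last) directory that was uploaded.
    let root_digest: [u8; 32] = handle.close()?;
    assert_eq!(DIRECTORY_B.digest(), root_digest);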

Change-Id: I9213dde257a260c8622239918ea541064b270484
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8356
Tested-by: BuildkiteCI
Reviewed-by: tazjin <tazjin@tvl.su>
Autosubmit: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/directoryservice/grpc.rs')
-rw-r--r-- tvix/store/src/directoryservice/grpc.rs | 211
1 file changed, 199 insertions(+), 12 deletions(-)
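
For reference, the DirectoryPutter interface implemented by the new GRPCPutter
below exposes two methods (a sketch inferred from this diff; the actual trait
lives in the parent directoryservice module and may differ in details):

    pub trait DirectoryPutter {
        /// Uploads one proto::Directory as part of the ongoing request.
        /// Leaves need to be sent before the directories referring to them.
        fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error>;

        /// Closes the stream for sending and returns the digest of the
        /// root directory.
        fn close(&mut self) -> Result<[u8; 32], crate::Error>;
    }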
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
index e8ab854fd449..3c76d9e315b5 100644
--- a/tvix/store/src/directoryservice/grpc.rs
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -1,8 +1,11 @@
 use std::collections::HashSet;
 
-use super::DirectoryService;
+use super::{DirectoryPutter, DirectoryService};
 use crate::proto::{self, get_directory_request::ByWhat};
+use crate::Error;
 use data_encoding::BASE64;
+use tokio::sync::mpsc::UnboundedSender;
+use tokio_stream::wrappers::UnboundedReceiverStream;
 use tonic::{transport::Channel, Status};
 use tonic::{Code, Streaming};
 use tracing::{instrument, warn};
@@ -85,9 +88,6 @@ impl DirectoryService for GRPCDirectoryService {
     fn put(&self, directory: crate::proto::Directory) -> Result<[u8; 32], crate::Error> {
         let mut grpc_client = self.grpc_client.clone();
 
-        // TODO: this currently doesn't work for directories referring to other
-        // directories, as we're required to upload the whole closure all the
-        // time.
         let task = self
             .tokio_handle
             .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
@@ -98,7 +98,9 @@ impl DirectoryService for GRPCDirectoryService {
                 .root_digest
                 .as_slice()
                 .try_into()
-                .unwrap()), // TODO: map error
+                .map_err(|_| {
+                    Error::StorageError("invalid root digest length in response".to_string())
+                })?),
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
         }
     }
@@ -125,6 +127,30 @@ impl DirectoryService for GRPCDirectoryService {
 
         StreamIterator::new(self.tokio_handle.clone(), &root_directory_digest, stream)
     }
+
+    type DirectoryPutter = GRPCPutter;
+
+    #[instrument(skip_all)]
+    fn put_multiple_start(&self) -> Self::DirectoryPutter
+    where
+        Self: Clone,
+    {
+        let mut grpc_client = self.grpc_client.clone();
+
+        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
+
+        let task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>> =
+            self.tokio_handle.spawn(async move {
+                let s = grpc_client
+                    .put(UnboundedReceiverStream::new(rx))
+                    .await?
+                    .into_inner();
+
+                Ok(s)
+            });
+
+        GRPCPutter::new(self.tokio_handle.clone(), tx, task)
+    }
 }
 
 pub struct StreamIterator {
@@ -184,9 +210,10 @@ impl Iterator for StreamIterator {
                     self.received_directory_digests.insert(directory_digest);
 
                     // register all children in expected_directory_digests.
-                    for child_directories in &directory.directories {
+                    // We ran validate() above, so we know these digests must be correct.
+                    for child_directory in &directory.directories {
                         self.expected_directory_digests
-                            .insert(child_directories.digest.clone().try_into().unwrap());
+                            .insert(child_directory.digest.clone().try_into().unwrap());
                     }
 
                     Some(Ok(directory))
@@ -208,6 +235,88 @@ impl Iterator for StreamIterator {
     }
 }
 
+/// Allows uploading multiple Directory messages in the same gRPC stream.
+pub struct GRPCPutter {
+    /// A handle into the active tokio runtime. Necessary to spawn tasks.
+    tokio_handle: tokio::runtime::Handle,
+
+    /// Data about the current request - a handle to the task, and the tx part
+    /// of the channel.
+    /// The tx part of the pipe is used to send [proto::Directory] to the ongoing request.
+    /// The task will yield a [proto::PutDirectoryResponse] once the stream is closed.
+    #[allow(clippy::type_complexity)] // lol
+    rq: Option<(
+        tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
+        UnboundedSender<proto::Directory>,
+    )>,
+}
+
+impl GRPCPutter {
+    pub fn new(
+        tokio_handle: tokio::runtime::Handle,
+        directory_sender: UnboundedSender<proto::Directory>,
+        task: tokio::task::JoinHandle<Result<proto::PutDirectoryResponse, Status>>,
+    ) -> Self {
+        Self {
+            tokio_handle,
+            rq: Some((task, directory_sender)),
+        }
+    }
+
+    #[allow(dead_code)]
+    // allows checking if the tx part of the channel is closed.
+    fn is_closed(&self) -> bool {
+        match self.rq {
+            None => true,
+            Some((_, ref directory_sender)) => directory_sender.is_closed(),
+        }
+    }
+}
+
+impl DirectoryPutter for GRPCPutter {
+    fn put(&mut self, directory: proto::Directory) -> Result<(), crate::Error> {
+        match self.rq {
+            // If we're not already closed, send the directory to directory_sender.
+            Some((_, ref directory_sender)) => {
+                if directory_sender.send(directory).is_err() {
+                    // If the channel has been prematurely closed, invoke close (so we can peek at the error code)
+                    // That error code is much more helpful, because it
+                    // contains the error message from the server.
+                    self.close()?;
+                }
+                Ok(())
+            }
+            // If self.close() was already called, we can't put again.
+            None => Err(Error::StorageError(
+                "DirectoryPutter already closed".to_string(),
+            )),
+        }
+    }
+
+    /// Closes the stream for sending, and returns the root directory digest.
+    fn close(&mut self) -> Result<[u8; 32], crate::Error> {
+        // get self.rq, and replace it with None.
+        // This ensures we can only close it once.
+        match std::mem::take(&mut self.rq) {
+            None => Err(Error::StorageError("already closed".to_string())),
+            Some((task, directory_sender)) => {
+                // close directory_sender, so blocking on task will finish.
+                drop(directory_sender);
+
+                Ok(self
+                    .tokio_handle
+                    .block_on(task)?
+                    .map_err(|e| Error::StorageError(e.to_string()))?
+                    .root_digest
+                    .try_into()
+                    .map_err(|_| {
+                        Error::StorageError("invalid root digest length in response".to_string())
+                    })?)
+            }
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use core::time;
@@ -219,10 +328,13 @@ mod tests {
     use tonic::transport::{Endpoint, Server, Uri};
 
     use crate::{
-        directoryservice::DirectoryService,
+        directoryservice::{DirectoryPutter, DirectoryService},
         proto,
         proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper},
-        tests::{fixtures::DIRECTORY_A, utils::gen_directory_service},
+        tests::{
+            fixtures::{DIRECTORY_A, DIRECTORY_B},
+            utils::gen_directory_service,
+        },
     };
 
     #[test]
@@ -260,8 +372,22 @@ mod tests {
             .build()
             .unwrap();
 
-        // TODO: wait for the socket to be created
-        std::thread::sleep(time::Duration::from_millis(200));
+        // wait for the socket to be created
+        {
+            let mut socket_created = false;
+            for _try in 1..20 {
+                if socket_path.exists() {
+                    socket_created = true;
+                    break;
+                }
+                std::thread::sleep(time::Duration::from_millis(20))
+            }
+
+            assert!(
+                socket_created,
+                "expected socket path to eventually get created, but never happened"
+            );
+        }
 
         let task = tester_runtime.spawn_blocking(move || {
             // Create a channel, connecting to the uds at socket_path.
@@ -301,8 +427,69 @@ mod tests {
                     .get(&DIRECTORY_A.digest())
                     .expect("must succeed")
                     .expect("must be some")
-            )
+            );
+
+            // Putting DIRECTORY_B alone should fail, because it refers to DIRECTORY_A.
+            directory_service
+                .put(DIRECTORY_B.clone())
+                .expect_err("must fail");
+
+            // Uploading A and then B should succeed, and closing should return the digest of B.
+            let mut handle = directory_service.put_multiple_start();
+            handle.put(DIRECTORY_A.clone()).expect("must succeed");
+            handle.put(DIRECTORY_B.clone()).expect("must succeed");
+            let digest = handle.close().expect("must succeed");
+            assert_eq!(DIRECTORY_B.digest(), digest);
+
+            // Now try to retrieve the closure of DIRECTORY_B, which should return B and then A.
+            let mut directories_it = directory_service.get_recursive(&DIRECTORY_B.digest());
+            assert_eq!(
+                DIRECTORY_B.clone(),
+                directories_it
+                    .next()
+                    .expect("must be some")
+                    .expect("must succeed")
+            );
+            assert_eq!(
+                DIRECTORY_A.clone(),
+                directories_it
+                    .next()
+                    .expect("must be some")
+                    .expect("must succeed")
+            );
+
+            // Uploading B and then A should fail during close (if we're a
+            // fast client)
+            let mut handle = directory_service.put_multiple_start();
+            handle.put(DIRECTORY_B.clone()).expect("must succeed");
+            handle.put(DIRECTORY_A.clone()).expect("must succeed");
+            handle.close().expect_err("must fail");
+
+            // The test below is a bit timing sensitive: we send B (which refers
+            // to A, so the server should reject it), wait long enough for the
+            // server to close the stream on its side, and then assert that
+            // uploading anything else via the handle fails.
+
+            let mut handle = directory_service.put_multiple_start();
+            handle.put(DIRECTORY_B.clone()).expect("must succeed");
+
+            let mut is_closed = false;
+            for _try in 1..20 {
+                if handle.is_closed() {
+                    is_closed = true;
+                    break;
+                }
+                std::thread::sleep(time::Duration::from_millis(200))
+            }
+
+            assert!(
+                is_closed,
+                "expected channel to eventually close, but never happened"
+            );
+
+            handle.put(DIRECTORY_A.clone()).expect_err("must fail");
         });
+
         tester_runtime.block_on(task)?;
 
         Ok(())