about summary refs log tree commit diff
path: root/tvix
diff options
context:
space:
mode:
Diffstat (limited to 'tvix')
-rw-r--r--tvix/castore/src/directoryservice/grpc.rs42
-rw-r--r--tvix/castore/src/directoryservice/mod.rs4
-rw-r--r--tvix/castore/src/directoryservice/utils.rs7
3 files changed, 33 insertions, 20 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs
index 3f3796951f75..1d6ad2c13b86 100644
--- a/tvix/castore/src/directoryservice/grpc.rs
+++ b/tvix/castore/src/directoryservice/grpc.rs
@@ -228,6 +228,16 @@ impl GRPCPutter {
             rq: Some((task, directory_sender)),
         }
     }
+
+    // allows checking if the tx part of the channel is closed.
+    // only used in the test case.
+    #[cfg(test)]
+    fn is_closed(&self) -> bool {
+        match self.rq {
+            None => true,
+            Some((_, ref directory_sender)) => directory_sender.is_closed(),
+        }
+    }
 }
 
 #[async_trait]
@@ -272,28 +282,23 @@ impl DirectoryPutter for GRPCPutter {
             }
         }
     }
-
-    // allows checking if the tx part of the channel is closed.
-    fn is_closed(&self) -> bool {
-        match self.rq {
-            None => true,
-            Some((_, ref directory_sender)) => directory_sender.is_closed(),
-        }
-    }
 }
 
 #[cfg(test)]
 mod tests {
     use core::time;
     use futures::StreamExt;
-    use std::{sync::Arc, time::Duration};
+    use std::{any::Any, sync::Arc, time::Duration};
     use tempfile::TempDir;
     use tokio::net::UnixListener;
     use tokio_retry::{strategy::ExponentialBackoff, Retry};
     use tokio_stream::wrappers::UnixListenerStream;
 
     use crate::{
-        directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService},
+        directoryservice::{
+            grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService,
+            MemoryDirectoryService,
+        },
         fixtures::{self, DIRECTORY_A, DIRECTORY_B},
         proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper},
         utils::gen_directorysvc_grpc_client,
@@ -400,6 +405,23 @@ mod tests {
             let mut handle = directory_service.put_multiple_start();
             handle.put(DIRECTORY_B.clone()).await.expect("must succeed");
 
+            // get a GRPCPutter, so we can peek at [is_closed].
+            let handle_any = &mut handle as &mut dyn Any;
+
+            // `unchecked_downcast_mut` is unstable for now,
+            // https://github.com/rust-lang/rust/issues/90850
+            // We do the same thing here.
+            // The reason for why we cannot use the checked downcast lies
+            // in the fact that:
+            // - GRPCPutter has type ID A
+            // - Box<GRPCPutter> has type ID B
+            // - "Box<dyn GRPCPutter>" (invalid type) has type ID C
+            // B seems different from C in this context.
+            // We cannot unpack and perform upcast coercion of the traits as it's an unstable
+            // feature.
+            // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose
+            // of not making leak `is_closed` in the original trait.
+            let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) };
             let mut is_closed = false;
             for _try in 1..1000 {
                 if handle.is_closed() {
diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs
index 508c9a0be320..d6350d48c09e 100644
--- a/tvix/castore/src/directoryservice/mod.rs
+++ b/tvix/castore/src/directoryservice/mod.rs
@@ -73,8 +73,4 @@ pub trait DirectoryPutter: Send {
     /// If there's been any invalid Directory message uploaded, and error *must*
     /// be returned.
     async fn close(&mut self) -> Result<B3Digest, Error>;
-
-    /// Return whether the stream is closed or not.
-    /// Used from some [DirectoryService] implementations only.
-    fn is_closed(&self) -> bool;
 }
diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs
index 4c5e7cfde37c..ad9ce2535366 100644
--- a/tvix/castore/src/directoryservice/utils.rs
+++ b/tvix/castore/src/directoryservice/utils.rs
@@ -103,7 +103,7 @@ impl<DS: DirectoryService> SimplePutter<DS> {
 }
 
 #[async_trait]
-impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
+impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> {
     async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> {
         if self.closed {
             return Err(Error::StorageError("already closed".to_string()));
@@ -117,7 +117,6 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
         Ok(())
     }
 
-    /// We need to be mutable here, as that's the signature of the trait.
     async fn close(&mut self) -> Result<B3Digest, Error> {
         if self.closed {
             return Err(Error::StorageError("already closed".to_string()));
@@ -133,8 +132,4 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> {
             )),
         }
     }
-
-    fn is_closed(&self) -> bool {
-        self.closed
-    }
 }