diff options
Diffstat (limited to 'tvix/castore/src/directoryservice')
-rw-r--r-- | tvix/castore/src/directoryservice/grpc.rs | 42 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/mod.rs | 4 | ||||
-rw-r--r-- | tvix/castore/src/directoryservice/utils.rs | 7 |
3 files changed, 33 insertions, 20 deletions
diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 3f3796951f75..1d6ad2c13b86 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -228,6 +228,16 @@ impl GRPCPutter { rq: Some((task, directory_sender)), } } + + // allows checking if the tx part of the channel is closed. + // only used in the test case. + #[cfg(test)] + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } } #[async_trait] @@ -272,28 +282,23 @@ impl DirectoryPutter for GRPCPutter { } } } - - // allows checking if the tx part of the channel is closed. - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } } #[cfg(test)] mod tests { use core::time; use futures::StreamExt; - use std::{sync::Arc, time::Duration}; + use std::{any::Any, sync::Arc, time::Duration}; use tempfile::TempDir; use tokio::net::UnixListener; use tokio_retry::{strategy::ExponentialBackoff, Retry}; use tokio_stream::wrappers::UnixListenerStream; use crate::{ - directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService}, + directoryservice::{ + grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService, + MemoryDirectoryService, + }, fixtures::{self, DIRECTORY_A, DIRECTORY_B}, proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper}, utils::gen_directorysvc_grpc_client, @@ -400,6 +405,23 @@ mod tests { let mut handle = directory_service.put_multiple_start(); handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + // get a GRPCPutter, so we can peek at [is_closed]. + let handle_any = &mut handle as &mut dyn Any; + + // `unchecked_downcast_mut` is unstable for now, + // https://github.com/rust-lang/rust/issues/90850 + // We do the same thing here. + // The reason for why we cannot use the checked downcast lies + // in the fact that: + // - GRPCPutter has type ID A + // - Box<GRPCPutter> has type ID B + // - "Box<dyn GRPCPutter>" (invalid type) has type ID C + // B seems different from C in this context. + // We cannot unpack and perform upcast coercion of the traits as it's an unstable + // feature. + // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose + // of not making leak `is_closed` in the original trait. + let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box<GRPCPutter>) }; let mut is_closed = false; for _try in 1..1000 { if handle.is_closed() { diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 508c9a0be320..d6350d48c09e 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -73,8 +73,4 @@ pub trait DirectoryPutter: Send { /// If there's been any invalid Directory message uploaded, and error *must* /// be returned. async fn close(&mut self) -> Result<B3Digest, Error>; - - /// Return whether the stream is closed or not. - /// Used from some [DirectoryService] implementations only. - fn is_closed(&self) -> bool; } diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index 4c5e7cfde37c..ad9ce2535366 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -103,7 +103,7 @@ impl<DS: DirectoryService> SimplePutter<DS> { } #[async_trait] -impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { +impl<DS: DirectoryService + 'static> DirectoryPutter for SimplePutter<DS> { async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { if self.closed { return Err(Error::StorageError("already closed".to_string())); @@ -117,7 +117,6 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { Ok(()) } - /// We need to be mutable here, as that's the signature of the trait. async fn close(&mut self) -> Result<B3Digest, Error> { if self.closed { return Err(Error::StorageError("already closed".to_string())); @@ -133,8 +132,4 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { )), } } - - fn is_closed(&self) -> bool { - self.closed - } } |