From 923a5737e61da020e6d7672c3aebc00db9f44850 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Tue, 12 Dec 2023 21:25:50 +0200 Subject: refactor(tvix/castore): drop is_closed() from impl DirectoryPutter This is only used in the gRPC version (GRPCPutter), during the test automation. So define it as a method there, behind #[cfg(test)], and remove from the trait. Change-Id: Idf170884e3a10be0e96c75d946d9c431171e5e88 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10340 Tested-by: BuildkiteCI Autosubmit: flokli Reviewed-by: raitobezarius --- tvix/castore/src/directoryservice/grpc.rs | 42 +++++++++++++++++++++++------- tvix/castore/src/directoryservice/mod.rs | 4 --- tvix/castore/src/directoryservice/utils.rs | 7 +---- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/tvix/castore/src/directoryservice/grpc.rs b/tvix/castore/src/directoryservice/grpc.rs index 3f3796951f..1d6ad2c13b 100644 --- a/tvix/castore/src/directoryservice/grpc.rs +++ b/tvix/castore/src/directoryservice/grpc.rs @@ -228,6 +228,16 @@ impl GRPCPutter { rq: Some((task, directory_sender)), } } + + // allows checking if the tx part of the channel is closed. + // only used in the test case. + #[cfg(test)] + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } } #[async_trait] @@ -272,28 +282,23 @@ impl DirectoryPutter for GRPCPutter { } } } - - // allows checking if the tx part of the channel is closed. - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } } #[cfg(test)] mod tests { use core::time; use futures::StreamExt; - use std::{sync::Arc, time::Duration}; + use std::{any::Any, sync::Arc, time::Duration}; use tempfile::TempDir; use tokio::net::UnixListener; use tokio_retry::{strategy::ExponentialBackoff, Retry}; use tokio_stream::wrappers::UnixListenerStream; use crate::{ - directoryservice::{DirectoryService, GRPCDirectoryService, MemoryDirectoryService}, + directoryservice::{ + grpc::GRPCPutter, DirectoryPutter, DirectoryService, GRPCDirectoryService, + MemoryDirectoryService, + }, fixtures::{self, DIRECTORY_A, DIRECTORY_B}, proto::{directory_service_client::DirectoryServiceClient, GRPCDirectoryServiceWrapper}, utils::gen_directorysvc_grpc_client, @@ -400,6 +405,23 @@ mod tests { let mut handle = directory_service.put_multiple_start(); handle.put(DIRECTORY_B.clone()).await.expect("must succeed"); + // get a GRPCPutter, so we can peek at [is_closed]. + let handle_any = &mut handle as &mut dyn Any; + + // `unchecked_downcast_mut` is unstable for now, + // https://github.com/rust-lang/rust/issues/90850 + // We do the same thing here. + // The reason for why we cannot use the checked downcast lies + // in the fact that: + // - GRPCPutter has type ID A + // - Box has type ID B + // - "Box" (invalid type) has type ID C + // B seems different from C in this context. + // We cannot unpack and perform upcast coercion of the traits as it's an unstable + // feature. + // We cannot add `as_any` in `DirectoryPutter` as that would defeat the whole purpose + // of not making leak `is_closed` in the original trait. + let handle = unsafe { &mut *(handle_any as *mut dyn Any as *mut Box) }; let mut is_closed = false; for _try in 1..1000 { if handle.is_closed() { diff --git a/tvix/castore/src/directoryservice/mod.rs b/tvix/castore/src/directoryservice/mod.rs index 508c9a0be3..d6350d48c0 100644 --- a/tvix/castore/src/directoryservice/mod.rs +++ b/tvix/castore/src/directoryservice/mod.rs @@ -73,8 +73,4 @@ pub trait DirectoryPutter: Send { /// If there's been any invalid Directory message uploaded, and error *must* /// be returned. async fn close(&mut self) -> Result; - - /// Return whether the stream is closed or not. - /// Used from some [DirectoryService] implementations only. - fn is_closed(&self) -> bool; } diff --git a/tvix/castore/src/directoryservice/utils.rs b/tvix/castore/src/directoryservice/utils.rs index 4c5e7cfde3..ad9ce25353 100644 --- a/tvix/castore/src/directoryservice/utils.rs +++ b/tvix/castore/src/directoryservice/utils.rs @@ -103,7 +103,7 @@ impl SimplePutter { } #[async_trait] -impl DirectoryPutter for SimplePutter { +impl DirectoryPutter for SimplePutter { async fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { if self.closed { return Err(Error::StorageError("already closed".to_string())); @@ -117,7 +117,6 @@ impl DirectoryPutter for SimplePutter { Ok(()) } - /// We need to be mutable here, as that's the signature of the trait. async fn close(&mut self) -> Result { if self.closed { return Err(Error::StorageError("already closed".to_string())); @@ -133,8 +132,4 @@ impl DirectoryPutter for SimplePutter { )), } } - - fn is_closed(&self) -> bool { - self.closed - } } -- cgit 1.4.1