From 7725eb53ad67730e92a3839a6c10925c668e5586 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 9 Jun 2023 12:26:34 +0300 Subject: refactor(tvix/store): use Box Once we support configuring services at runtime, we don't know what DirectoryService we're using at compile time. This also means, we can't explicitly use the is_closed method from GRPCPutter, without making it part of the DirectoryPutter itself. Change-Id: Icd2a1ec4fc5649a6cd15c9cc7db4c2b473630431 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8727 Autosubmit: flokli Reviewed-by: tazjin Tested-by: BuildkiteCI --- tvix/store/src/directoryservice/grpc.rs | 36 ++++++++++++++--------------- tvix/store/src/directoryservice/memory.rs | 20 ++++++++-------- tvix/store/src/directoryservice/mod.rs | 16 ++++++++----- tvix/store/src/directoryservice/sled.rs | 20 +++++++++------- tvix/store/src/directoryservice/traverse.rs | 14 ++++------- tvix/store/src/directoryservice/utils.rs | 19 ++++++++++++++- 6 files changed, 73 insertions(+), 52 deletions(-) (limited to 'tvix/store/src/directoryservice') diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index 1b33572cf7de..3b1a7906f7d0 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -46,8 +46,6 @@ impl GRPCDirectoryService { } impl DirectoryService for GRPCDirectoryService { - type DirectoriesIterator = StreamIterator; - fn get(&self, digest: &B3Digest) -> Result, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); @@ -113,7 +111,10 @@ impl DirectoryService for GRPCDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box> + Send> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest_as_vec = root_directory_digest.to_vec(); @@ -132,17 +133,15 @@ impl DirectoryService for GRPCDirectoryService { let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); - StreamIterator::new( + Box::new(StreamIterator::new( self.tokio_handle.clone(), root_directory_digest.clone(), stream, - ) + )) } - type DirectoryPutter = GRPCPutter; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { @@ -160,7 +159,7 @@ impl DirectoryService for GRPCDirectoryService { Ok(s) }); - GRPCPutter::new(self.tokio_handle.clone(), tx, task) + Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task)) } } @@ -276,15 +275,6 @@ impl GRPCPutter { rq: Some((task, directory_sender)), } } - - #[allow(dead_code)] - // allows checking if the tx part of the channel is closed. - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } } impl DirectoryPutter for GRPCPutter { @@ -329,6 +319,14 @@ impl DirectoryPutter for GRPCPutter { } } } + + // allows checking if the tx part of the channel is closed. + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } } #[cfg(test)] @@ -342,7 +340,7 @@ mod tests { use tonic::transport::{Endpoint, Server, Uri}; use crate::{ - directoryservice::{DirectoryPutter, DirectoryService}, + directoryservice::DirectoryService, proto, proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, tests::{ diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 1fd619f7c8cb..1f203834a7cf 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, RwLock}; use tracing::{instrument, warn}; use super::utils::SimplePutter; -use super::{DirectoryService, DirectoryTraverser}; +use super::{DirectoryPutter, DirectoryService, DirectoryTraverser}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -12,8 +12,6 @@ pub struct MemoryDirectoryService { } impl DirectoryService for MemoryDirectoryService { - type DirectoriesIterator = DirectoryTraverser; - #[instrument(skip(self, digest), fields(directory.digest = %digest))] fn get(&self, digest: &B3Digest) -> Result, Error> { let db = self.db.read()?; @@ -68,17 +66,21 @@ impl DirectoryService for MemoryDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { - DirectoryTraverser::with(self.clone(), root_directory_digest) + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box> + Send> { + Box::new(DirectoryTraverser::with( + self.clone(), + root_directory_digest, + )) } - type DirectoryPutter = SimplePutter; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { - SimplePutter::new(self.clone()) + Box::new(SimplePutter::new(self.clone())) } } diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index f387d28948f0..6589a5b62599 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -14,10 +14,7 @@ pub use self::utils::DirectoryTraverser; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their /// digest. -pub trait DirectoryService { - type DirectoriesIterator: Iterator> + Send; - type DirectoryPutter: DirectoryPutter; - +pub trait DirectoryService: Send + Sync { /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. fn get(&self, digest: &B3Digest) -> Result, Error>; @@ -29,11 +26,14 @@ pub trait DirectoryService { /// Ideally this would be a `impl Iterator>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator; + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box> + Send>; /// Allows persisting a closure of [proto::Directory], which is a graph of /// connected Directory messages. - fn put_multiple_start(&self) -> Self::DirectoryPutter; + fn put_multiple_start(&self) -> Box; } /// Provides a handle to put a closure of connected [proto::Directory] elements. @@ -51,4 +51,8 @@ pub trait DirectoryPutter { /// Close the stream, and wait for any errors. fn close(&mut self) -> Result; + + /// Return whether the stream is closed or not. + /// Used from some [DirectoryService] implementations only. + fn is_closed(&self) -> bool; } diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs index e189e8acf507..8ed2c59c2d0f 100644 --- a/tvix/store/src/directoryservice/sled.rs +++ b/tvix/store/src/directoryservice/sled.rs @@ -1,3 +1,4 @@ +use crate::directoryservice::DirectoryPutter; use crate::proto::Directory; use crate::{proto, B3Digest, Error}; use prost::Message; @@ -29,8 +30,6 @@ impl SledDirectoryService { } impl DirectoryService for SledDirectoryService { - type DirectoriesIterator = DirectoryTraverser; - #[instrument(skip(self, digest), fields(directory.digest = %digest))] fn get(&self, digest: &B3Digest) -> Result, Error> { match self.db.get(digest.to_vec()) { @@ -91,17 +90,22 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { - DirectoryTraverser::with(self.clone(), root_directory_digest) + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<(dyn Iterator> + std::marker::Send + 'static)> + { + Box::new(DirectoryTraverser::with( + self.clone(), + root_directory_digest, + )) } - type DirectoryPutter = SimplePutter; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { - SimplePutter::new(self.clone()) + Box::new(SimplePutter::new(self.clone())) } } diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs index 8691baa8b73f..c1c0c6f8df64 100644 --- a/tvix/store/src/directoryservice/traverse.rs +++ b/tvix/store/src/directoryservice/traverse.rs @@ -10,8 +10,8 @@ use tracing::{instrument, warn}; /// TODO: the name of this function (and mod) is a bit bad, because it doesn't /// clearly distinguish it from the BFS traversers. #[instrument(skip(directory_service))] -pub fn traverse_to( - directory_service: &DS, +pub fn traverse_to( + directory_service: &Box, node: crate::proto::node::Node, path: &std::path::Path, ) -> Result, Error> { @@ -82,13 +82,9 @@ pub fn traverse_to( mod tests { use std::path::PathBuf; - use crate::{ - directoryservice::DirectoryPutter, - directoryservice::DirectoryService, - tests::{ - fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, - utils::gen_directory_service, - }, + use crate::tests::{ + fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, + utils::gen_directory_service, }; use super::traverse_to; diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs index 3661808734f3..d152fb78a90c 100644 --- a/tvix/store/src/directoryservice/utils.rs +++ b/tvix/store/src/directoryservice/utils.rs @@ -107,12 +107,14 @@ impl Iterator for DirectoryTraverser { pub struct SimplePutter { directory_service: DS, last_directory_digest: Option, + closed: bool, } impl SimplePutter { pub fn new(directory_service: DS) -> Self { Self { directory_service, + closed: false, last_directory_digest: None, } } @@ -120,6 +122,10 @@ impl SimplePutter { impl DirectoryPutter for SimplePutter { fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + let digest = self.directory_service.put(directory)?; // track the last directory digest @@ -130,11 +136,22 @@ impl DirectoryPutter for SimplePutter { /// We need to be mutable here, as that's the signature of the trait. fn close(&mut self) -> Result { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + match &self.last_directory_digest { - Some(last_digest) => Ok(last_digest.clone()), + Some(last_digest) => { + self.closed = true; + Ok(last_digest.clone()) + } None => Err(Error::InvalidRequest( "no directories sent, can't show root digest".to_string(), )), } } + + fn is_closed(&self) -> bool { + self.closed + } } -- cgit 1.4.1