diff options
author | Florian Klink <flokli@flokli.de> | 2023-06-09T09·26+0300 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-06-12T10·24+0000 |
commit | 7725eb53ad67730e92a3839a6c10925c668e5586 (patch) | |
tree | 82b8abf8e52630039d2a0cd3ae8b251c32e863bd /tvix/store | |
parent | 6f85dbfc06c4fa96deb968cfeb7e98ba36e95043 (diff) |
refactor(tvix/store): use Box<dyn DirectoryService> r/6272
Once we support configuring services at runtime, we don't know what DirectoryService we're using at compile time. This also means, we can't explicitly use the is_closed method from GRPCPutter, without making it part of the DirectoryPutter itself. Change-Id: Icd2a1ec4fc5649a6cd15c9cc7db4c2b473630431 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8727 Autosubmit: flokli <flokli@flokli.de> Reviewed-by: tazjin <tazjin@tvl.su> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/grpc.rs | 36 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/memory.rs | 20 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/mod.rs | 16 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/sled.rs | 20 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/traverse.rs | 14 | ||||
-rw-r--r-- | tvix/store/src/directoryservice/utils.rs | 19 | ||||
-rw-r--r-- | tvix/store/src/import.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/nar/renderer.rs | 12 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 21 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 20 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_directoryservice_wrapper.rs | 15 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_directoryservice.rs | 4 | ||||
-rw-r--r-- | tvix/store/src/store_io.rs | 19 | ||||
-rw-r--r-- | tvix/store/src/tests/import.rs | 1 | ||||
-rw-r--r-- | tvix/store/src/tests/nar_renderer.rs | 15 | ||||
-rw-r--r-- | tvix/store/src/tests/utils.rs | 8 |
17 files changed, 135 insertions, 123 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 97e2447ec82a..4cedce6849d2 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -9,6 +9,7 @@ use tracing_subscriber::prelude::*; use tvix_store::blobservice::BlobService; use tvix_store::blobservice::GRPCBlobService; use tvix_store::blobservice::SledBlobService; +use tvix_store::directoryservice::DirectoryService; use tvix_store::directoryservice::GRPCDirectoryService; use tvix_store::directoryservice::SledDirectoryService; use tvix_store::pathinfoservice::GRPCPathInfoService; @@ -103,10 +104,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let boxed_blob_service: Box<dyn BlobService> = Box::new(blob_service.clone()); let boxed_blob_service2: Box<dyn BlobService> = Box::new(blob_service.clone()); let directory_service = SledDirectoryService::new("directories.sled".into())?; + let boxed_directory_service = Box::new(directory_service.clone()); + let boxed_directory_service2: Box<dyn DirectoryService> = Box::new(directory_service); let path_info_service = SledPathInfoService::new( "pathinfo.sled".into(), boxed_blob_service, - directory_service.clone(), + boxed_directory_service, )?; let listen_address = listen_address @@ -122,7 +125,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { boxed_blob_service2, ))) .add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::from(directory_service), + GRPCDirectoryServiceWrapper::from(boxed_directory_service2), )) .add_service(PathInfoServiceServer::new( GRPCPathInfoServiceWrapper::from(path_info_service), @@ -154,7 +157,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { let io = Arc::new(TvixStoreIO::new( Box::new(blob_service), - directory_service, + Box::new(directory_service), path_info_service, )); diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index 1b33572cf7de..3b1a7906f7d0 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -46,8 +46,6 @@ impl GRPCDirectoryService { } impl DirectoryService for GRPCDirectoryService { - type DirectoriesIterator = StreamIterator; - fn get(&self, digest: &B3Digest) -> Result<Option<crate::proto::Directory>, crate::Error> { // Get a new handle to the gRPC client, and copy the digest. let mut grpc_client = self.grpc_client.clone(); @@ -113,7 +111,10 @@ impl DirectoryService for GRPCDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> { let mut grpc_client = self.grpc_client.clone(); let root_directory_digest_as_vec = root_directory_digest.to_vec(); @@ -132,17 +133,15 @@ impl DirectoryService for GRPCDirectoryService { let stream = self.tokio_handle.block_on(task).unwrap().unwrap(); - StreamIterator::new( + Box::new(StreamIterator::new( self.tokio_handle.clone(), root_directory_digest.clone(), stream, - ) + )) } - type DirectoryPutter = GRPCPutter; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { @@ -160,7 +159,7 @@ impl DirectoryService for GRPCDirectoryService { Ok(s) }); - GRPCPutter::new(self.tokio_handle.clone(), tx, task) + Box::new(GRPCPutter::new(self.tokio_handle.clone(), tx, task)) } } @@ -276,15 +275,6 @@ impl GRPCPutter { rq: Some((task, directory_sender)), } } - - #[allow(dead_code)] - // allows checking if the tx part of the channel is closed. - fn is_closed(&self) -> bool { - match self.rq { - None => true, - Some((_, ref directory_sender)) => directory_sender.is_closed(), - } - } } impl DirectoryPutter for GRPCPutter { @@ -329,6 +319,14 @@ impl DirectoryPutter for GRPCPutter { } } } + + // allows checking if the tx part of the channel is closed. + fn is_closed(&self) -> bool { + match self.rq { + None => true, + Some((_, ref directory_sender)) => directory_sender.is_closed(), + } + } } #[cfg(test)] @@ -342,7 +340,7 @@ mod tests { use tonic::transport::{Endpoint, Server, Uri}; use crate::{ - directoryservice::{DirectoryPutter, DirectoryService}, + directoryservice::DirectoryService, proto, proto::{directory_service_server::DirectoryServiceServer, GRPCDirectoryServiceWrapper}, tests::{ diff --git a/tvix/store/src/directoryservice/memory.rs b/tvix/store/src/directoryservice/memory.rs index 1fd619f7c8cb..1f203834a7cf 100644 --- a/tvix/store/src/directoryservice/memory.rs +++ b/tvix/store/src/directoryservice/memory.rs @@ -4,7 +4,7 @@ use std::sync::{Arc, RwLock}; use tracing::{instrument, warn}; use super::utils::SimplePutter; -use super::{DirectoryService, DirectoryTraverser}; +use super::{DirectoryPutter, DirectoryService, DirectoryTraverser}; #[derive(Clone, Default)] pub struct MemoryDirectoryService { @@ -12,8 +12,6 @@ pub struct MemoryDirectoryService { } impl DirectoryService for MemoryDirectoryService { - type DirectoriesIterator = DirectoryTraverser<Self>; - #[instrument(skip(self, digest), fields(directory.digest = %digest))] fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { let db = self.db.read()?; @@ -68,17 +66,21 @@ impl DirectoryService for MemoryDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { - DirectoryTraverser::with(self.clone(), root_directory_digest) + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send> { + Box::new(DirectoryTraverser::with( + self.clone(), + root_directory_digest, + )) } - type DirectoryPutter = SimplePutter<Self>; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { - SimplePutter::new(self.clone()) + Box::new(SimplePutter::new(self.clone())) } } diff --git a/tvix/store/src/directoryservice/mod.rs b/tvix/store/src/directoryservice/mod.rs index f387d28948f0..6589a5b62599 100644 --- a/tvix/store/src/directoryservice/mod.rs +++ b/tvix/store/src/directoryservice/mod.rs @@ -14,10 +14,7 @@ pub use self::utils::DirectoryTraverser; /// The base trait all Directory services need to implement. /// This is a simple get and put of [crate::proto::Directory], returning their /// digest. -pub trait DirectoryService { - type DirectoriesIterator: Iterator<Item = Result<proto::Directory, Error>> + Send; - type DirectoryPutter: DirectoryPutter; - +pub trait DirectoryService: Send + Sync { /// Get looks up a single Directory message by its digest. /// In case the directory is not found, Ok(None) is returned. fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error>; @@ -29,11 +26,14 @@ pub trait DirectoryService { /// Ideally this would be a `impl Iterator<Item = Result<proto::Directory, Error>>`, /// and we'd be able to add a default implementation for it here, but /// we can't have that yet. - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator; + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<dyn Iterator<Item = Result<proto::Directory, Error>> + Send>; /// Allows persisting a closure of [proto::Directory], which is a graph of /// connected Directory messages. - fn put_multiple_start(&self) -> Self::DirectoryPutter; + fn put_multiple_start(&self) -> Box<dyn DirectoryPutter>; } /// Provides a handle to put a closure of connected [proto::Directory] elements. @@ -51,4 +51,8 @@ pub trait DirectoryPutter { /// Close the stream, and wait for any errors. fn close(&mut self) -> Result<B3Digest, Error>; + + /// Return whether the stream is closed or not. + /// Used from some [DirectoryService] implementations only. + fn is_closed(&self) -> bool; } diff --git a/tvix/store/src/directoryservice/sled.rs b/tvix/store/src/directoryservice/sled.rs index e189e8acf507..8ed2c59c2d0f 100644 --- a/tvix/store/src/directoryservice/sled.rs +++ b/tvix/store/src/directoryservice/sled.rs @@ -1,3 +1,4 @@ +use crate::directoryservice::DirectoryPutter; use crate::proto::Directory; use crate::{proto, B3Digest, Error}; use prost::Message; @@ -29,8 +30,6 @@ impl SledDirectoryService { } impl DirectoryService for SledDirectoryService { - type DirectoriesIterator = DirectoryTraverser<Self>; - #[instrument(skip(self, digest), fields(directory.digest = %digest))] fn get(&self, digest: &B3Digest) -> Result<Option<proto::Directory>, Error> { match self.db.get(digest.to_vec()) { @@ -91,17 +90,22 @@ impl DirectoryService for SledDirectoryService { } #[instrument(skip_all, fields(directory.digest = %root_directory_digest))] - fn get_recursive(&self, root_directory_digest: &B3Digest) -> Self::DirectoriesIterator { - DirectoryTraverser::with(self.clone(), root_directory_digest) + fn get_recursive( + &self, + root_directory_digest: &B3Digest, + ) -> Box<(dyn Iterator<Item = Result<proto::Directory, Error>> + std::marker::Send + 'static)> + { + Box::new(DirectoryTraverser::with( + self.clone(), + root_directory_digest, + )) } - type DirectoryPutter = SimplePutter<Self>; - #[instrument(skip_all)] - fn put_multiple_start(&self) -> Self::DirectoryPutter + fn put_multiple_start(&self) -> Box<(dyn DirectoryPutter + 'static)> where Self: Clone, { - SimplePutter::new(self.clone()) + Box::new(SimplePutter::new(self.clone())) } } diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs index 8691baa8b73f..c1c0c6f8df64 100644 --- a/tvix/store/src/directoryservice/traverse.rs +++ b/tvix/store/src/directoryservice/traverse.rs @@ -10,8 +10,8 @@ use tracing::{instrument, warn}; /// TODO: the name of this function (and mod) is a bit bad, because it doesn't /// clearly distinguish it from the BFS traversers. #[instrument(skip(directory_service))] -pub fn traverse_to<DS: DirectoryService>( - directory_service: &DS, +pub fn traverse_to( + directory_service: &Box<dyn DirectoryService>, node: crate::proto::node::Node, path: &std::path::Path, ) -> Result<Option<crate::proto::node::Node>, Error> { @@ -82,13 +82,9 @@ pub fn traverse_to<DS: DirectoryService>( mod tests { use std::path::PathBuf; - use crate::{ - directoryservice::DirectoryPutter, - directoryservice::DirectoryService, - tests::{ - fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, - utils::gen_directory_service, - }, + use crate::tests::{ + fixtures::{DIRECTORY_COMPLICATED, DIRECTORY_WITH_KEEP}, + utils::gen_directory_service, }; use super::traverse_to; diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs index 3661808734f3..d152fb78a90c 100644 --- a/tvix/store/src/directoryservice/utils.rs +++ b/tvix/store/src/directoryservice/utils.rs @@ -107,12 +107,14 @@ impl<DS: DirectoryService> Iterator for DirectoryTraverser<DS> { pub struct SimplePutter<DS: DirectoryService> { directory_service: DS, last_directory_digest: Option<B3Digest>, + closed: bool, } impl<DS: DirectoryService> SimplePutter<DS> { pub fn new(directory_service: DS) -> Self { Self { directory_service, + closed: false, last_directory_digest: None, } } @@ -120,6 +122,10 @@ impl<DS: DirectoryService> SimplePutter<DS> { impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { fn put(&mut self, directory: proto::Directory) -> Result<(), Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + let digest = self.directory_service.put(directory)?; // track the last directory digest @@ -130,11 +136,22 @@ impl<DS: DirectoryService> DirectoryPutter for SimplePutter<DS> { /// We need to be mutable here, as that's the signature of the trait. fn close(&mut self) -> Result<B3Digest, Error> { + if self.closed { + return Err(Error::StorageError("already closed".to_string())); + } + match &self.last_directory_digest { - Some(last_digest) => Ok(last_digest.clone()), + Some(last_digest) => { + self.closed = true; + Ok(last_digest.clone()) + } None => Err(Error::InvalidRequest( "no directories sent, can't show root digest".to_string(), )), } } + + fn is_closed(&self) -> bool { + self.closed + } } diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index a0bd1de5e149..d07ddfc41ea2 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -56,9 +56,9 @@ impl From<super::Error> for Error { // // It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] -fn process_entry<DP: DirectoryPutter>( +fn process_entry( blob_service: &Box<dyn BlobService>, - directory_putter: &mut DP, + directory_putter: &mut Box<dyn DirectoryPutter>, entry: &walkdir::DirEntry, maybe_directory: Option<proto::Directory>, ) -> Result<proto::node::Node, Error> { @@ -145,9 +145,9 @@ fn process_entry<DP: DirectoryPutter>( /// possibly register it somewhere (and potentially rename it based on some /// naming scheme. #[instrument(skip(blob_service, directory_service), fields(path=?p))] -pub fn ingest_path<DS: DirectoryService, P: AsRef<Path> + Debug>( +pub fn ingest_path<P: AsRef<Path> + Debug>( blob_service: &Box<dyn BlobService>, - directory_service: &DS, + directory_service: &Box<dyn DirectoryService>, p: P, ) -> Result<proto::node::Node, Error> { // Probe if the path points to a symlink. If it does, we process it manually, @@ -174,6 +174,7 @@ pub fn ingest_path<DS: DirectoryService, P: AsRef<Path> + Debug>( let mut directories: HashMap<PathBuf, proto::Directory> = HashMap::default(); + // TODO: pass this one instead? let mut directory_putter = directory_service.put_multiple_start(); for entry in WalkDir::new(p) diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index 80bf9bc6d816..97dfcfee6e32 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -13,10 +13,10 @@ use tracing::warn; /// Invoke [render_nar], and return the size and sha256 digest of the produced /// NAR output. -pub fn calculate_size_and_sha256<DS: DirectoryService + Clone>( +pub fn calculate_size_and_sha256( root_node: &proto::node::Node, blob_service: &Box<dyn BlobService>, - directory_service: DS, + directory_service: &Box<dyn DirectoryService>, ) -> Result<(u64, [u8; 32]), RenderError> { let h = Sha256::new(); let mut cw = CountWrite::from(h); @@ -30,11 +30,11 @@ pub fn calculate_size_and_sha256<DS: DirectoryService + Clone>( /// and uses the passed blob_service and directory_service to /// perform the necessary lookups as it traverses the structure. /// The contents in NAR serialization are writen to the passed [std::io::Write]. -pub fn write_nar<W: std::io::Write, DS: DirectoryService + Clone>( +pub fn write_nar<W: std::io::Write>( w: &mut W, proto_root_node: &proto::node::Node, blob_service: &Box<dyn BlobService>, - directory_service: DS, + directory_service: &Box<dyn DirectoryService>, ) -> Result<(), RenderError> { // Initialize NAR writer let nar_root_node = nar::writer::open(w).map_err(RenderError::NARWriterError)?; @@ -49,11 +49,11 @@ pub fn write_nar<W: std::io::Write, DS: DirectoryService + Clone>( /// Process an intermediate node in the structure. /// This consumes the node. -fn walk_node<DS: DirectoryService + Clone>( +fn walk_node( nar_node: nar::writer::Node, proto_node: &proto::node::Node, blob_service: &Box<dyn BlobService>, - directory_service: DS, + directory_service: &Box<dyn DirectoryService>, ) -> Result<(), RenderError> { match proto_node { proto::node::Node::Symlink(proto_symlink_node) => { diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 5b48ed9efa34..1457f3d367f6 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -8,15 +8,18 @@ use std::{ sync::{Arc, RwLock}, }; -pub struct MemoryPathInfoService<DS: DirectoryService> { +pub struct MemoryPathInfoService { db: Arc<RwLock<HashMap<[u8; 20], proto::PathInfo>>>, blob_service: Box<dyn BlobService>, - directory_service: DS, + directory_service: Box<dyn DirectoryService>, } -impl<DS: DirectoryService> MemoryPathInfoService<DS> { - pub fn new(blob_service: Box<dyn BlobService>, directory_service: DS) -> Self { +impl MemoryPathInfoService { + pub fn new( + blob_service: Box<dyn BlobService>, + directory_service: Box<dyn DirectoryService>, + ) -> Self { Self { db: Default::default(), blob_service, @@ -25,7 +28,7 @@ impl<DS: DirectoryService> MemoryPathInfoService<DS> { } } -impl<DS: DirectoryService + Clone> PathInfoService for MemoryPathInfoService<DS> { +impl PathInfoService for MemoryPathInfoService { fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { let db = self.db.read().unwrap(); @@ -55,11 +58,7 @@ impl<DS: DirectoryService + Clone> PathInfoService for MemoryPathInfoService<DS> } fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { - calculate_size_and_sha256( - root_node, - &self.blob_service, - self.directory_service.clone(), - ) - .map_err(|e| Error::StorageError(e.to_string())) + calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service) + .map_err(|e| Error::StorageError(e.to_string())) } } diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 98ea60ff4440..a5aa987020b9 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -11,18 +11,18 @@ use tracing::warn; /// /// The PathInfo messages are stored as encoded protos, and keyed by their output hash, /// as that's currently the only request type available. -pub struct SledPathInfoService<DS: DirectoryService> { +pub struct SledPathInfoService { db: sled::Db, blob_service: Box<dyn BlobService>, - directory_service: DS, + directory_service: Box<dyn DirectoryService>, } -impl<DS: DirectoryService> SledPathInfoService<DS> { +impl SledPathInfoService { pub fn new( p: PathBuf, blob_service: Box<dyn BlobService>, - directory_service: DS, + directory_service: Box<dyn DirectoryService>, ) -> Result<Self, sled::Error> { let config = sled::Config::default().use_compression(true).path(p); let db = config.open()?; @@ -36,7 +36,7 @@ impl<DS: DirectoryService> SledPathInfoService<DS> { pub fn new_temporary( blob_service: Box<dyn BlobService>, - directory_service: DS, + directory_service: Box<dyn DirectoryService>, ) -> Result<Self, sled::Error> { let config = sled::Config::default().temporary(true); let db = config.open()?; @@ -49,7 +49,7 @@ impl<DS: DirectoryService> SledPathInfoService<DS> { } } -impl<DS: DirectoryService + Clone> PathInfoService for SledPathInfoService<DS> { +impl PathInfoService for SledPathInfoService { fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { match self.db.get(digest) { Ok(None) => Ok(None), @@ -95,11 +95,7 @@ impl<DS: DirectoryService + Clone> PathInfoService for SledPathInfoService<DS> { } fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { - calculate_size_and_sha256( - root_node, - &self.blob_service, - self.directory_service.clone(), - ) - .map_err(|e| Error::StorageError(e.to_string())) + calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service) + .map_err(|e| Error::StorageError(e.to_string())) } } diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs index 6d2df310137f..f27688c4e962 100644 --- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -1,27 +1,26 @@ use crate::proto; use crate::{directoryservice::DirectoryService, B3Digest}; use std::collections::HashMap; +use std::sync::Arc; use tokio::{sync::mpsc::channel, task}; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{debug, instrument, warn}; -pub struct GRPCDirectoryServiceWrapper<C: DirectoryService> { - directory_service: C, +pub struct GRPCDirectoryServiceWrapper { + directory_service: Arc<Box<dyn DirectoryService>>, } -impl<DS: DirectoryService> From<DS> for GRPCDirectoryServiceWrapper<DS> { - fn from(value: DS) -> Self { +impl From<Box<dyn DirectoryService>> for GRPCDirectoryServiceWrapper { + fn from(value: Box<dyn DirectoryService>) -> Self { Self { - directory_service: value, + directory_service: Arc::new(value), } } } #[async_trait] -impl<DS: DirectoryService + Send + Sync + Clone + 'static> - proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper<DS> -{ +impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceWrapper { type GetStream = ReceiverStream<tonic::Result<proto::Directory, Status>>; #[instrument(skip(self))] diff --git a/tvix/store/src/proto/tests/grpc_directoryservice.rs b/tvix/store/src/proto/tests/grpc_directoryservice.rs index 069e82f6463e..a1058706d521 100644 --- a/tvix/store/src/proto/tests/grpc_directoryservice.rs +++ b/tvix/store/src/proto/tests/grpc_directoryservice.rs @@ -1,4 +1,3 @@ -use crate::directoryservice::DirectoryService; use crate::proto::directory_service_server::DirectoryService as GRPCDirectoryService; use crate::proto::get_directory_request::ByWhat; use crate::proto::{Directory, DirectoryNode, SymlinkNode}; @@ -8,8 +7,7 @@ use crate::tests::utils::gen_directory_service; use tokio_stream::StreamExt; use tonic::Status; -fn gen_grpc_service( -) -> GRPCDirectoryServiceWrapper<impl DirectoryService + Send + Sync + Clone + 'static> { +fn gen_grpc_service() -> GRPCDirectoryServiceWrapper { let directory_service = gen_directory_service(); GRPCDirectoryServiceWrapper::from(directory_service) } diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs index 3820931e281f..ee302f3b58ba 100644 --- a/tvix/store/src/store_io.rs +++ b/tvix/store/src/store_io.rs @@ -29,17 +29,17 @@ use crate::{ /// This is to both cover cases of syntactically valid store paths, that exist /// on the filesystem (still managed by Nix), as well as being able to read /// files outside store paths. -pub struct TvixStoreIO<DS: DirectoryService, PS: PathInfoService> { +pub struct TvixStoreIO<PS: PathInfoService> { blob_service: Box<dyn BlobService>, - directory_service: DS, + directory_service: Box<dyn DirectoryService>, path_info_service: PS, std_io: StdIO, } -impl<DS: DirectoryService + Clone, PS: PathInfoService> TvixStoreIO<DS, PS> { +impl<PS: PathInfoService> TvixStoreIO<PS> { pub fn new( blob_service: Box<dyn BlobService>, - directory_service: DS, + directory_service: Box<dyn DirectoryService>, path_info_service: PS, ) -> Self { Self { @@ -104,12 +104,9 @@ impl<DS: DirectoryService + Clone, PS: PathInfoService> TvixStoreIO<DS, PS> { .expect("error during import_path"); // Render the NAR - let (nar_size, nar_sha256) = calculate_size_and_sha256( - &root_node, - &self.blob_service, - self.directory_service.clone(), - ) - .expect("error during nar calculation"); // TODO: handle error + let (nar_size, nar_sha256) = + calculate_size_and_sha256(&root_node, &self.blob_service, &self.directory_service) + .expect("error during nar calculation"); // TODO: handle error // For given NAR sha256 digest and name, return the new [StorePath] this would have. let nar_hash_with_mode = @@ -175,7 +172,7 @@ fn calculate_nar_based_store_path(nar_sha256_digest: &[u8; 32], name: &str) -> S build_regular_ca_path(name, &nar_hash_with_mode, Vec::<String>::new(), false).unwrap() } -impl<DS: DirectoryService + Clone, PS: PathInfoService> EvalIO for TvixStoreIO<DS, PS> { +impl<PS: PathInfoService> EvalIO for TvixStoreIO<PS> { #[instrument(skip(self), ret, err)] fn path_exists(&self, path: &Path) -> Result<bool, io::Error> { if let Ok((store_path, sub_path)) = diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs index 725d467bd0cf..3498cf444640 100644 --- a/tvix/store/src/tests/import.rs +++ b/tvix/store/src/tests/import.rs @@ -1,5 +1,4 @@ use super::utils::{gen_blob_service, gen_directory_service}; -use crate::directoryservice::DirectoryService; use crate::import::ingest_path; use crate::proto; use crate::tests::fixtures::DIRECTORY_COMPLICATED; diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 20cca1e77848..bc9cc635eec3 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -1,4 +1,3 @@ -use crate::directoryservice::DirectoryService; use crate::nar::calculate_size_and_sha256; use crate::nar::write_nar; use crate::proto::DirectoryNode; @@ -21,7 +20,7 @@ fn single_symlink() { }), // don't put anything in the stores, as we don't actually do any requests. &gen_blob_service(), - gen_directory_service(), + &gen_directory_service(), ) .expect("must succeed"); @@ -43,7 +42,7 @@ fn single_file_missing_blob() { }), // the blobservice is empty intentionally, to provoke the error. &gen_blob_service(), - gen_directory_service(), + &gen_directory_service(), ) .expect_err("must fail"); @@ -83,7 +82,7 @@ fn single_file_wrong_blob_size() { executable: false, }), &blob_service, - gen_directory_service(), + &gen_directory_service(), ) .expect_err("must fail"); @@ -108,7 +107,7 @@ fn single_file_wrong_blob_size() { executable: false, }), &blob_service, - gen_directory_service(), + &gen_directory_service(), ) .expect_err("must fail"); @@ -145,7 +144,7 @@ fn single_file() { executable: false, }), &blob_service, - gen_directory_service(), + &gen_directory_service(), ) .expect("must succeed"); @@ -182,7 +181,7 @@ fn test_complicated() { size: DIRECTORY_COMPLICATED.size(), }), &blob_service, - directory_service.clone(), + &directory_service, ) .expect("must succeed"); @@ -196,7 +195,7 @@ fn test_complicated() { size: DIRECTORY_COMPLICATED.size(), }), &blob_service, - directory_service, + &directory_service, ) .expect("must succeed"); diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs index 6905fe56b5d7..285db449d36d 100644 --- a/tvix/store/src/tests/utils.rs +++ b/tvix/store/src/tests/utils.rs @@ -8,13 +8,13 @@ pub fn gen_blob_service() -> Box<dyn BlobService> { Box::new(MemoryBlobService::default()) } -pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + 'static { - MemoryDirectoryService::default() +pub fn gen_directory_service() -> Box<dyn DirectoryService> { + Box::new(MemoryDirectoryService::default()) } -pub fn gen_pathinfo_service<DS: DirectoryService + Clone>( +pub fn gen_pathinfo_service( blob_service: Box<dyn BlobService>, - directory_service: DS, + directory_service: Box<dyn DirectoryService>, ) -> impl PathInfoService { MemoryPathInfoService::new(blob_service, directory_service) } |