From aa7bdc1199bfbb69091dda942a82812257e30bc4 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Fri, 9 Jun 2023 18:22:25 +0300 Subject: refactor(tvix/store): use Arc instead of Box This allows us to blob services without closing them before putting them in a box. We currently need to use Arc<_>, not Rc<_>, because the GRPC wrappers require Sync. Change-Id: I679c5f06b62304f5b0456cfefe25a0a881de7c84 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8738 Reviewed-by: tazjin Tested-by: BuildkiteCI Autosubmit: flokli --- tvix/store/src/bin/tvix-store.rs | 22 ++++++++-------- tvix/store/src/directoryservice/traverse.rs | 21 ++++++++-------- tvix/store/src/import.rs | 14 ++++++++--- tvix/store/src/nar/renderer.rs | 19 ++++++++------ tvix/store/src/pathinfoservice/memory.rs | 16 +++++++----- tvix/store/src/pathinfoservice/sled.rs | 22 +++++++++------- tvix/store/src/proto/grpc_blobservice_wrapper.rs | 8 +++--- .../src/proto/grpc_directoryservice_wrapper.rs | 8 +++--- tvix/store/src/store_io.rs | 29 ++++++++++++++-------- tvix/store/src/tests/import.rs | 22 +++++++++------- tvix/store/src/tests/nar_renderer.rs | 28 ++++++++++----------- tvix/store/src/tests/utils.rs | 14 ++++++----- 12 files changed, 126 insertions(+), 97 deletions(-) (limited to 'tvix/store') diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 4cedce6849..8cd87abe9c 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -100,16 +100,14 @@ async fn main() -> Result<(), Box> { match cli.command { Commands::Daemon { listen_address } => { // initialize stores - let blob_service = SledBlobService::new("blobs.sled".into())?; - let boxed_blob_service: Box = Box::new(blob_service.clone()); - let boxed_blob_service2: Box = Box::new(blob_service.clone()); - let directory_service = SledDirectoryService::new("directories.sled".into())?; - let boxed_directory_service = Box::new(directory_service.clone()); - let boxed_directory_service2: Box = Box::new(directory_service); + let blob_service: Arc = + Arc::new(SledBlobService::new("blobs.sled".into())?); + let directory_service: Arc = + Arc::new(SledDirectoryService::new("directories.sled".into())?); let path_info_service = SledPathInfoService::new( "pathinfo.sled".into(), - boxed_blob_service, - boxed_directory_service, + blob_service.clone(), + directory_service.clone(), )?; let listen_address = listen_address @@ -122,10 +120,10 @@ async fn main() -> Result<(), Box> { #[allow(unused_mut)] let mut router = server .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( - boxed_blob_service2, + blob_service, ))) .add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::from(boxed_directory_service2), + GRPCDirectoryServiceWrapper::from(directory_service), )) .add_service(PathInfoServiceServer::new( GRPCPathInfoServiceWrapper::from(path_info_service), @@ -156,8 +154,8 @@ async fn main() -> Result<(), Box> { GRPCPathInfoService::from_client(path_info_service_client.clone()); let io = Arc::new(TvixStoreIO::new( - Box::new(blob_service), - Box::new(directory_service), + Arc::new(blob_service), + Arc::new(directory_service), path_info_service, )); diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs index c1c0c6f8df..8dfccd4ffb 100644 --- a/tvix/store/src/directoryservice/traverse.rs +++ b/tvix/store/src/directoryservice/traverse.rs @@ -1,5 +1,6 @@ use super::DirectoryService; use crate::{proto::NamedNode, B3Digest, Error}; +use std::sync::Arc; use tracing::{instrument, warn}; /// This traverses from a (root) node to the given (sub)path, returning the Node @@ -11,7 +12,7 @@ use tracing::{instrument, warn}; /// clearly distinguish it from the BFS traversers. #[instrument(skip(directory_service))] pub fn traverse_to( - directory_service: &Box, + directory_service: Arc, node: crate::proto::node::Node, path: &std::path::Path, ) -> Result, Error> { @@ -91,7 +92,7 @@ mod tests { #[test] fn test_traverse_to() { - let mut directory_service = gen_directory_service(); + let directory_service = gen_directory_service(); let mut handle = directory_service.put_multiple_start(); handle @@ -121,7 +122,7 @@ mod tests { // traversal to an empty subpath should return the root node. { let resp = traverse_to( - &mut directory_service, + directory_service.clone(), node_directory_complicated.clone(), &PathBuf::from(""), ) @@ -133,7 +134,7 @@ mod tests { // traversal to `keep` should return the node for DIRECTORY_WITH_KEEP { let resp = traverse_to( - &mut directory_service, + directory_service.clone(), node_directory_complicated.clone(), &PathBuf::from("keep"), ) @@ -145,7 +146,7 @@ mod tests { // traversal to `keep/.keep` should return the node for the .keep file { let resp = traverse_to( - &mut directory_service, + directory_service.clone(), node_directory_complicated.clone(), &PathBuf::from("keep/.keep"), ) @@ -157,7 +158,7 @@ mod tests { // traversal to `keep/.keep` should return the node for the .keep file { let resp = traverse_to( - &mut directory_service, + directory_service.clone(), node_directory_complicated.clone(), &PathBuf::from("/keep/.keep"), ) @@ -169,7 +170,7 @@ mod tests { // traversal to `void` should return None (doesn't exist) { let resp = traverse_to( - &mut directory_service, + directory_service.clone(), node_directory_complicated.clone(), &PathBuf::from("void"), ) @@ -181,7 +182,7 @@ mod tests { // traversal to `void` should return None (doesn't exist) { let resp = traverse_to( - &mut directory_service, + directory_service.clone(), node_directory_complicated.clone(), &PathBuf::from("//v/oid"), ) @@ -194,7 +195,7 @@ mod tests { // reached, as keep/.keep already is a file) { let resp = traverse_to( - &mut directory_service, + directory_service.clone(), node_directory_complicated.clone(), &PathBuf::from("keep/.keep/foo"), ) @@ -206,7 +207,7 @@ mod tests { // traversal to a subpath of '/' should return the root node. { let resp = traverse_to( - &mut directory_service, + directory_service.clone(), node_directory_complicated.clone(), &PathBuf::from("/"), ) diff --git a/tvix/store/src/import.rs b/tvix/store/src/import.rs index d07ddfc41e..1e639e1d7c 100644 --- a/tvix/store/src/import.rs +++ b/tvix/store/src/import.rs @@ -1,6 +1,7 @@ use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; use crate::{directoryservice::DirectoryPutter, proto}; +use std::sync::Arc; use std::{ collections::HashMap, fmt::Debug, @@ -57,7 +58,7 @@ impl From for Error { // It assumes the caller adds returned nodes to the directories it assembles. #[instrument(skip_all, fields(entry.file_type=?&entry.file_type(),entry.path=?entry.path()))] fn process_entry( - blob_service: &Box, + blob_service: Arc, directory_putter: &mut Box, entry: &walkdir::DirEntry, maybe_directory: Option, @@ -146,8 +147,8 @@ fn process_entry( /// naming scheme. #[instrument(skip(blob_service, directory_service), fields(path=?p))] pub fn ingest_path + Debug>( - blob_service: &Box, - directory_service: &Box, + blob_service: Arc, + directory_service: Arc, p: P, ) -> Result { // Probe if the path points to a symlink. If it does, we process it manually, @@ -199,7 +200,12 @@ pub fn ingest_path + Debug>( } }; - let node = process_entry(blob_service, &mut directory_putter, &entry, maybe_directory)?; + let node = process_entry( + blob_service.clone(), + &mut directory_putter, + &entry, + maybe_directory, + )?; if entry.depth() == 0 { return Ok(node); diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index 97dfcfee6e..4bd31e1513 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -8,15 +8,18 @@ use crate::{ use count_write::CountWrite; use nix_compat::nar; use sha2::{Digest, Sha256}; -use std::io::{self, BufReader}; +use std::{ + io::{self, BufReader}, + sync::Arc, +}; use tracing::warn; /// Invoke [render_nar], and return the size and sha256 digest of the produced /// NAR output. pub fn calculate_size_and_sha256( root_node: &proto::node::Node, - blob_service: &Box, - directory_service: &Box, + blob_service: Arc, + directory_service: Arc, ) -> Result<(u64, [u8; 32]), RenderError> { let h = Sha256::new(); let mut cw = CountWrite::from(h); @@ -33,8 +36,8 @@ pub fn calculate_size_and_sha256( pub fn write_nar( w: &mut W, proto_root_node: &proto::node::Node, - blob_service: &Box, - directory_service: &Box, + blob_service: Arc, + directory_service: Arc, ) -> Result<(), RenderError> { // Initialize NAR writer let nar_root_node = nar::writer::open(w).map_err(RenderError::NARWriterError)?; @@ -52,8 +55,8 @@ pub fn write_nar( fn walk_node( nar_node: nar::writer::Node, proto_node: &proto::node::Node, - blob_service: &Box, - directory_service: &Box, + blob_service: Arc, + directory_service: Arc, ) -> Result<(), RenderError> { match proto_node { proto::node::Node::Symlink(proto_symlink_node) => { @@ -127,7 +130,7 @@ fn walk_node( walk_node( child_node, &proto_node, - blob_service, + blob_service.clone(), directory_service.clone(), )?; } diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index 1457f3d367..35455313cb 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -11,14 +11,14 @@ use std::{ pub struct MemoryPathInfoService { db: Arc>>, - blob_service: Box, - directory_service: Box, + blob_service: Arc, + directory_service: Arc, } impl MemoryPathInfoService { pub fn new( - blob_service: Box, - directory_service: Box, + blob_service: Arc, + directory_service: Arc, ) -> Self { Self { db: Default::default(), @@ -58,7 +58,11 @@ impl PathInfoService for MemoryPathInfoService { } fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { - calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service) - .map_err(|e| Error::StorageError(e.to_string())) + calculate_size_and_sha256( + root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .map_err(|e| Error::StorageError(e.to_string())) } } diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index a5aa987020..f06a905cef 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -4,7 +4,7 @@ use crate::{ proto, Error, }; use prost::Message; -use std::path::PathBuf; +use std::{path::PathBuf, sync::Arc}; use tracing::warn; /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). @@ -14,15 +14,15 @@ use tracing::warn; pub struct SledPathInfoService { db: sled::Db, - blob_service: Box, - directory_service: Box, + blob_service: Arc, + directory_service: Arc, } impl SledPathInfoService { pub fn new( p: PathBuf, - blob_service: Box, - directory_service: Box, + blob_service: Arc, + directory_service: Arc, ) -> Result { let config = sled::Config::default().use_compression(true).path(p); let db = config.open()?; @@ -35,8 +35,8 @@ impl SledPathInfoService { } pub fn new_temporary( - blob_service: Box, - directory_service: Box, + blob_service: Arc, + directory_service: Arc, ) -> Result { let config = sled::Config::default().temporary(true); let db = config.open()?; @@ -95,7 +95,11 @@ impl PathInfoService for SledPathInfoService { } fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { - calculate_size_and_sha256(root_node, &self.blob_service, &self.directory_service) - .map_err(|e| Error::StorageError(e.to_string())) + calculate_size_and_sha256( + root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .map_err(|e| Error::StorageError(e.to_string())) } } diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index 066790daf3..f1ab3a87e4 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,7 +1,7 @@ use crate::{ blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest, }; -use std::{collections::VecDeque, io, pin::Pin}; +use std::{collections::VecDeque, io, pin::Pin, sync::Arc}; use tokio::task; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; @@ -9,11 +9,11 @@ use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{instrument, warn}; pub struct GRPCBlobServiceWrapper { - blob_service: Box, + blob_service: Arc, } -impl From> for GRPCBlobServiceWrapper { - fn from(value: Box) -> Self { +impl From> for GRPCBlobServiceWrapper { + fn from(value: Arc) -> Self { Self { blob_service: value, } diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs index f27688c4e9..434d660f3d 100644 --- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -8,13 +8,13 @@ use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{debug, instrument, warn}; pub struct GRPCDirectoryServiceWrapper { - directory_service: Arc>, + directory_service: Arc, } -impl From> for GRPCDirectoryServiceWrapper { - fn from(value: Box) -> Self { +impl From> for GRPCDirectoryServiceWrapper { + fn from(value: Arc) -> Self { Self { - directory_service: Arc::new(value), + directory_service: value, } } } diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs index ee302f3b58..c35da49da9 100644 --- a/tvix/store/src/store_io.rs +++ b/tvix/store/src/store_io.rs @@ -8,7 +8,7 @@ use nix_compat::{ store_path::{build_regular_ca_path, StorePath}, }; use smol_str::SmolStr; -use std::{io, path::Path, path::PathBuf}; +use std::{io, path::Path, path::PathBuf, sync::Arc}; use tracing::{error, instrument, warn}; use tvix_eval::{EvalIO, FileType, StdIO}; @@ -30,16 +30,16 @@ use crate::{ /// on the filesystem (still managed by Nix), as well as being able to read /// files outside store paths. pub struct TvixStoreIO { - blob_service: Box, - directory_service: Box, + blob_service: Arc, + directory_service: Arc, path_info_service: PS, std_io: StdIO, } impl TvixStoreIO { pub fn new( - blob_service: Box, - directory_service: Box, + blob_service: Arc, + directory_service: Arc, path_info_service: PS, ) -> Self { Self { @@ -86,7 +86,7 @@ impl TvixStoreIO { } }; - directoryservice::traverse_to(&self.directory_service, root_node, sub_path) + directoryservice::traverse_to(self.directory_service.clone(), root_node, sub_path) } /// Imports a given path on the filesystem into the store, and returns the @@ -100,13 +100,20 @@ impl TvixStoreIO { path: &std::path::Path, ) -> Result { // Call [import::ingest_path], which will walk over the given path and return a root_node. - let root_node = import::ingest_path(&self.blob_service, &self.directory_service, path) - .expect("error during import_path"); + let root_node = import::ingest_path( + self.blob_service.clone(), + self.directory_service.clone(), + path, + ) + .expect("error during import_path"); // Render the NAR - let (nar_size, nar_sha256) = - calculate_size_and_sha256(&root_node, &self.blob_service, &self.directory_service) - .expect("error during nar calculation"); // TODO: handle error + let (nar_size, nar_sha256) = calculate_size_and_sha256( + &root_node, + self.blob_service.clone(), + self.directory_service.clone(), + ) + .expect("error during nar calculation"); // TODO: handle error // For given NAR sha256 digest and name, return the new [StorePath] this would have. let nar_hash_with_mode = diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs index 3498cf4446..ab65574219 100644 --- a/tvix/store/src/tests/import.rs +++ b/tvix/store/src/tests/import.rs @@ -18,8 +18,8 @@ fn symlink() { .unwrap(); let root_node = ingest_path( - &mut gen_blob_service(), - &mut gen_directory_service(), + gen_blob_service(), + gen_directory_service(), tmpdir.path().join("doesntmatter"), ) .expect("must succeed"); @@ -39,11 +39,11 @@ fn single_file() { std::fs::write(tmpdir.path().join("root"), HELLOWORLD_BLOB_CONTENTS).unwrap(); - let mut blob_service = gen_blob_service(); + let blob_service = gen_blob_service(); let root_node = ingest_path( - &mut blob_service, - &mut gen_directory_service(), + blob_service.clone(), + gen_directory_service(), tmpdir.path().join("root"), ) .expect("must succeed"); @@ -75,11 +75,15 @@ fn complicated() { // File ``keep/.keep` std::fs::write(tmpdir.path().join("keep").join(".keep"), vec![]).unwrap(); - let mut blob_service = gen_blob_service(); - let mut directory_service = gen_directory_service(); + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); - let root_node = ingest_path(&mut blob_service, &mut directory_service, tmpdir.path()) - .expect("must succeed"); + let root_node = ingest_path( + blob_service.clone(), + directory_service.clone(), + tmpdir.path(), + ) + .expect("must succeed"); // ensure root_node matched expectations assert_eq!( diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index bc9cc635ee..37a01c2e18 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -19,8 +19,8 @@ fn single_symlink() { target: "/nix/store/somewhereelse".to_string(), }), // don't put anything in the stores, as we don't actually do any requests. - &gen_blob_service(), - &gen_directory_service(), + gen_blob_service(), + gen_directory_service(), ) .expect("must succeed"); @@ -41,8 +41,8 @@ fn single_file_missing_blob() { executable: false, }), // the blobservice is empty intentionally, to provoke the error. - &gen_blob_service(), - &gen_directory_service(), + gen_blob_service(), + gen_directory_service(), ) .expect_err("must fail"); @@ -81,8 +81,8 @@ fn single_file_wrong_blob_size() { size: 42, // <- note the wrong size here! executable: false, }), - &blob_service, - &gen_directory_service(), + blob_service.clone(), + gen_directory_service(), ) .expect_err("must fail"); @@ -106,8 +106,8 @@ fn single_file_wrong_blob_size() { size: 2, // <- note the wrong size here! executable: false, }), - &blob_service, - &gen_directory_service(), + blob_service, + gen_directory_service(), ) .expect_err("must fail"); @@ -143,8 +143,8 @@ fn single_file() { size: HELLOWORLD_BLOB_CONTENTS.len() as u32, executable: false, }), - &blob_service, - &gen_directory_service(), + blob_service, + gen_directory_service(), ) .expect("must succeed"); @@ -180,8 +180,8 @@ fn test_complicated() { digest: DIRECTORY_COMPLICATED.digest().to_vec(), size: DIRECTORY_COMPLICATED.size(), }), - &blob_service, - &directory_service, + blob_service.clone(), + directory_service.clone(), ) .expect("must succeed"); @@ -194,8 +194,8 @@ fn test_complicated() { digest: DIRECTORY_COMPLICATED.digest().to_vec(), size: DIRECTORY_COMPLICATED.size(), }), - &blob_service, - &directory_service, + blob_service, + directory_service, ) .expect("must succeed"); diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs index 285db449d3..171919e4e7 100644 --- a/tvix/store/src/tests/utils.rs +++ b/tvix/store/src/tests/utils.rs @@ -1,20 +1,22 @@ +use std::sync::Arc; + use crate::{ blobservice::{BlobService, MemoryBlobService}, directoryservice::{DirectoryService, MemoryDirectoryService}, pathinfoservice::{MemoryPathInfoService, PathInfoService}, }; -pub fn gen_blob_service() -> Box { - Box::new(MemoryBlobService::default()) +pub fn gen_blob_service() -> Arc { + Arc::new(MemoryBlobService::default()) } -pub fn gen_directory_service() -> Box { - Box::new(MemoryDirectoryService::default()) +pub fn gen_directory_service() -> Arc { + Arc::new(MemoryDirectoryService::default()) } pub fn gen_pathinfo_service( - blob_service: Box, - directory_service: Box, + blob_service: Arc, + directory_service: Arc, ) -> impl PathInfoService { MemoryPathInfoService::new(blob_service, directory_service) } -- cgit 1.4.1