From 52cad8619511b97c4bcd5768ce9b3579ff665505 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Sun, 17 Dec 2023 01:32:38 +0200 Subject: refactor(tvix/store): remove Arc<> from PathInfoService::from_addr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This makes PathInfoService::from_addr return a Box, rather than an Arc, and leaves it up to the consumers to rewrap it into an Arc where needed. This allows us to drop the Arc for the tvix-store daemon subcommand. Change-Id: Ic83aa2ade6c51912281bd17c7eef7252e152b2d1 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10409 Autosubmit: flokli Tested-by: BuildkiteCI Reviewed-by: sterni --- tvix/store/src/bin/tvix-store.rs | 17 ++++++++++++--- tvix/store/src/pathinfoservice/from_addr.rs | 12 +++++------ tvix/store/src/pathinfoservice/grpc.rs | 5 ++--- .../src/proto/grpc_pathinfoservice_wrapper.rs | 25 ++++++++++++---------- tvix/store/src/proto/tests/grpc_pathinfoservice.rs | 2 +- 5 files changed, 37 insertions(+), 24 deletions(-) (limited to 'tvix/store/src') diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 03ced18699ec..e4f2e0801b81 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -6,6 +6,7 @@ use nix_compat::store_path::StorePath; use std::io; use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use tokio::task::JoinHandle; use tokio_listener::Listener; use tokio_listener::SystemOptions; @@ -21,6 +22,7 @@ use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_castore::proto::NamedNode; use tvix_store::pathinfoservice; +use tvix_store::pathinfoservice::PathInfoService; use tvix_store::proto::nar_info; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; @@ -217,9 +219,9 @@ async fn main() -> Result<(), Box> { .add_service(DirectoryServiceServer::new( GRPCDirectoryServiceWrapper::from(directory_service), )) - .add_service(PathInfoServiceServer::new( - GRPCPathInfoServiceWrapper::from(path_info_service), - )); + .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( + path_info_service, + ))); #[cfg(feature = "tonic-reflection")] { @@ -257,6 +259,9 @@ async fn main() -> Result<(), Box> { ) .await?; + // Arc the PathInfoService, as we clone it . + let path_info_service: Arc = path_info_service.into(); + let tasks = paths .into_iter() .map(|path| { @@ -356,6 +361,9 @@ async fn main() -> Result<(), Box> { ) .await?; + // Arc the PathInfoService, as TvixStoreFS requires Clone + let path_info_service: Arc = path_info_service.into(); + let mut fuse_daemon = tokio::task::spawn_blocking(move || { let f = TvixStoreFs::new( blob_service, @@ -397,6 +405,9 @@ async fn main() -> Result<(), Box> { ) .await?; + // Arc the PathInfoService, as TvixStoreFS requires Clone + let path_info_service: Arc = path_info_service.into(); + tokio::task::spawn_blocking(move || { let fs = TvixStoreFs::new( blob_service, diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 35f2bd3730e3..922cd3351548 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -31,7 +31,7 @@ pub async fn from_addr( uri: &str, blob_service: Arc, directory_service: Arc, -) -> Result, Error> { +) -> Result, Error> { let url = Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; @@ -40,7 +40,7 @@ pub async fn from_addr( if url.has_host() || !url.path().is_empty() { return Err(Error::StorageError("invalid url".to_string())); } - Arc::new(MemoryPathInfoService::new(blob_service, directory_service)) + Box::new(MemoryPathInfoService::new(blob_service, directory_service)) } else if url.scheme() == "sled" { // sled doesn't support host, and a path can be provided (otherwise // it'll live in memory only). @@ -57,12 +57,12 @@ pub async fn from_addr( // TODO: expose other parameters as URL parameters? if url.path().is_empty() { - return Ok(Arc::new( + return Ok(Box::new( SledPathInfoService::new_temporary(blob_service, directory_service) .map_err(|e| Error::StorageError(e.to_string()))?, )); } - return Ok(Arc::new( + return Ok(Box::new( SledPathInfoService::new(url.path(), blob_service, directory_service) .map_err(|e| Error::StorageError(e.to_string()))?, )); @@ -92,7 +92,7 @@ pub async fn from_addr( } } - Arc::new(nix_http_path_info_service) + Box::new(nix_http_path_info_service) } else if url.scheme().starts_with("grpc+") { // schemes starting with grpc+ go to the GRPCPathInfoService. // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. @@ -100,7 +100,7 @@ pub async fn from_addr( // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. let client = PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?); - Arc::new(GRPCPathInfoService::from_client(client)) + Box::new(GRPCPathInfoService::from_client(client)) } else { Err(Error::StorageError(format!( "unknown scheme: {}", diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index ef3b0b77ec54..9a8599bce26c 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -115,7 +115,6 @@ impl PathInfoService for GRPCPathInfoService { #[cfg(test)] mod tests { - use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; @@ -151,11 +150,11 @@ mod tests { let mut server = tonic::transport::Server::builder(); let router = server.add_service( crate::proto::path_info_service_server::PathInfoServiceServer::new( - GRPCPathInfoServiceWrapper::from(Arc::new(MemoryPathInfoService::new( + GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new( gen_blob_service(), gen_directory_service(), )) - as Arc), + as Box), ), ); router.serve_with_incoming(uds_stream).await diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 06c4b2f1fd0a..19430aed381a 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -2,28 +2,31 @@ use crate::nar::RenderError; use crate::pathinfoservice::PathInfoService; use crate::proto; use futures::StreamExt; -use std::sync::Arc; +use std::ops::Deref; use tokio::task; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Result, Status}; use tracing::{debug, instrument, warn}; use tvix_castore::proto as castorepb; -pub struct GRPCPathInfoServiceWrapper { - path_info_service: Arc, +pub struct GRPCPathInfoServiceWrapper { + inner: PS, // FUTUREWORK: allow exposing without allowing listing } -impl From> for GRPCPathInfoServiceWrapper { - fn from(value: Arc) -> Self { +impl GRPCPathInfoServiceWrapper { + pub fn new(path_info_service: PS) -> Self { Self { - path_info_service: value, + inner: path_info_service, } } } #[async_trait] -impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper { +impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper +where + PS: Deref + Send + Sync + 'static, +{ type ListStream = ReceiverStream>; #[instrument(skip(self))] @@ -38,7 +41,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra .to_vec() .try_into() .map_err(|_e| Status::invalid_argument("invalid output digest length"))?; - match self.path_info_service.get(digest).await { + match self.inner.get(digest).await { Ok(None) => Err(Status::not_found("PathInfo not found")), Ok(Some(path_info)) => Ok(Response::new(path_info)), Err(e) => { @@ -56,7 +59,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra // Store the PathInfo in the client. Clients MUST validate the data // they receive, so we don't validate additionally here. - match self.path_info_service.put(path_info).await { + match self.inner.put(path_info).await { Ok(path_info_new) => Ok(Response::new(path_info_new)), Err(e) => { warn!("failed to insert PathInfo: {}", e); @@ -74,7 +77,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra None => Err(Status::invalid_argument("no root node sent")), Some(root_node) => { let (nar_size, nar_sha256) = self - .path_info_service + .inner .calculate_nar(&root_node) .await .expect("error during nar calculation"); // TODO: handle error @@ -94,7 +97,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra ) -> Result, Status> { let (tx, rx) = tokio::sync::mpsc::channel(5); - let mut stream = self.path_info_service.list(); + let mut stream = self.inner.list(); let _task = task::spawn(async move { while let Some(e) = stream.next().await { diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index c0b953d0f2e9..e8da7792cdb1 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -21,7 +21,7 @@ fn gen_grpc_service( ) -> Arc>>> { let blob_service = gen_blob_service(); let directory_service = gen_directory_service(); - Arc::new(GRPCPathInfoServiceWrapper::from(gen_pathinfo_service( + Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service( blob_service, directory_service, ))) -- cgit 1.4.1