diff options
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 17 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/from_addr.rs | 12 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 25 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_pathinfoservice.rs | 2 |
5 files changed, 37 insertions, 24 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 03ced18699ec..e4f2e0801b81 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -6,6 +6,7 @@ use nix_compat::store_path::StorePath; use std::io; use std::path::Path; use std::path::PathBuf; +use std::sync::Arc; use tokio::task::JoinHandle; use tokio_listener::Listener; use tokio_listener::SystemOptions; @@ -21,6 +22,7 @@ use tvix_castore::proto::GRPCBlobServiceWrapper; use tvix_castore::proto::GRPCDirectoryServiceWrapper; use tvix_castore::proto::NamedNode; use tvix_store::pathinfoservice; +use tvix_store::pathinfoservice::PathInfoService; use tvix_store::proto::nar_info; use tvix_store::proto::path_info_service_server::PathInfoServiceServer; use tvix_store::proto::GRPCPathInfoServiceWrapper; @@ -217,9 +219,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { .add_service(DirectoryServiceServer::new( GRPCDirectoryServiceWrapper::from(directory_service), )) - .add_service(PathInfoServiceServer::new( - GRPCPathInfoServiceWrapper::from(path_info_service), - )); + .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( + path_info_service, + ))); #[cfg(feature = "tonic-reflection")] { @@ -257,6 +259,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ) .await?; + // Arc the PathInfoService, as we clone it . + let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + let tasks = paths .into_iter() .map(|path| { @@ -356,6 +361,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ) .await?; + // Arc the PathInfoService, as TvixStoreFS requires Clone + let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + let mut fuse_daemon = tokio::task::spawn_blocking(move || { let f = TvixStoreFs::new( blob_service, @@ -397,6 +405,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { ) .await?; + // Arc the PathInfoService, as TvixStoreFS requires Clone + let path_info_service: Arc<dyn PathInfoService> = path_info_service.into(); + tokio::task::spawn_blocking(move || { let fs = TvixStoreFs::new( blob_service, diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs index 35f2bd3730e3..922cd3351548 100644 --- a/tvix/store/src/pathinfoservice/from_addr.rs +++ b/tvix/store/src/pathinfoservice/from_addr.rs @@ -31,7 +31,7 @@ pub async fn from_addr( uri: &str, blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, -) -> Result<Arc<dyn PathInfoService>, Error> { +) -> Result<Box<dyn PathInfoService>, Error> { let url = Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; @@ -40,7 +40,7 @@ pub async fn from_addr( if url.has_host() || !url.path().is_empty() { return Err(Error::StorageError("invalid url".to_string())); } - Arc::new(MemoryPathInfoService::new(blob_service, directory_service)) + Box::new(MemoryPathInfoService::new(blob_service, directory_service)) } else if url.scheme() == "sled" { // sled doesn't support host, and a path can be provided (otherwise // it'll live in memory only). @@ -57,12 +57,12 @@ pub async fn from_addr( // TODO: expose other parameters as URL parameters? if url.path().is_empty() { - return Ok(Arc::new( + return Ok(Box::new( SledPathInfoService::new_temporary(blob_service, directory_service) .map_err(|e| Error::StorageError(e.to_string()))?, )); } - return Ok(Arc::new( + return Ok(Box::new( SledPathInfoService::new(url.path(), blob_service, directory_service) .map_err(|e| Error::StorageError(e.to_string()))?, )); @@ -92,7 +92,7 @@ pub async fn from_addr( } } - Arc::new(nix_http_path_info_service) + Box::new(nix_http_path_info_service) } else if url.scheme().starts_with("grpc+") { // schemes starting with grpc+ go to the GRPCPathInfoService. // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. @@ -100,7 +100,7 @@ pub async fn from_addr( // - In the case of non-unix sockets, there must be a host, but no path. // Constructing the channel is handled by tvix_castore::channel::from_url. let client = PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?); - Arc::new(GRPCPathInfoService::from_client(client)) + Box::new(GRPCPathInfoService::from_client(client)) } else { Err(Error::StorageError(format!( "unknown scheme: {}", diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index ef3b0b77ec54..9a8599bce26c 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -115,7 +115,6 @@ impl PathInfoService for GRPCPathInfoService { #[cfg(test)] mod tests { - use std::sync::Arc; use std::time::Duration; use tempfile::TempDir; @@ -151,11 +150,11 @@ mod tests { let mut server = tonic::transport::Server::builder(); let router = server.add_service( crate::proto::path_info_service_server::PathInfoServiceServer::new( - GRPCPathInfoServiceWrapper::from(Arc::new(MemoryPathInfoService::new( + GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new( gen_blob_service(), gen_directory_service(), )) - as Arc<dyn PathInfoService>), + as Box<dyn PathInfoService>), ), ); router.serve_with_incoming(uds_stream).await diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index 06c4b2f1fd0a..19430aed381a 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -2,28 +2,31 @@ use crate::nar::RenderError; use crate::pathinfoservice::PathInfoService; use crate::proto; use futures::StreamExt; -use std::sync::Arc; +use std::ops::Deref; use tokio::task; use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Result, Status}; use tracing::{debug, instrument, warn}; use tvix_castore::proto as castorepb; -pub struct GRPCPathInfoServiceWrapper { - path_info_service: Arc<dyn PathInfoService>, +pub struct GRPCPathInfoServiceWrapper<PS> { + inner: PS, // FUTUREWORK: allow exposing without allowing listing } -impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper { - fn from(value: Arc<dyn PathInfoService>) -> Self { +impl<PS> GRPCPathInfoServiceWrapper<PS> { + pub fn new(path_info_service: PS) -> Self { Self { - path_info_service: value, + inner: path_info_service, } } } #[async_trait] -impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper { +impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS> +where + PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static, +{ type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>; #[instrument(skip(self))] @@ -38,7 +41,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra .to_vec() .try_into() .map_err(|_e| Status::invalid_argument("invalid output digest length"))?; - match self.path_info_service.get(digest).await { + match self.inner.get(digest).await { Ok(None) => Err(Status::not_found("PathInfo not found")), Ok(Some(path_info)) => Ok(Response::new(path_info)), Err(e) => { @@ -56,7 +59,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra // Store the PathInfo in the client. Clients MUST validate the data // they receive, so we don't validate additionally here. - match self.path_info_service.put(path_info).await { + match self.inner.put(path_info).await { Ok(path_info_new) => Ok(Response::new(path_info_new)), Err(e) => { warn!("failed to insert PathInfo: {}", e); @@ -74,7 +77,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra None => Err(Status::invalid_argument("no root node sent")), Some(root_node) => { let (nar_size, nar_sha256) = self - .path_info_service + .inner .calculate_nar(&root_node) .await .expect("error during nar calculation"); // TODO: handle error @@ -94,7 +97,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra ) -> Result<Response<Self::ListStream>, Status> { let (tx, rx) = tokio::sync::mpsc::channel(5); - let mut stream = self.path_info_service.list(); + let mut stream = self.inner.list(); let _task = task::spawn(async move { while let Some(e) = stream.next().await { diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index c0b953d0f2e9..e8da7792cdb1 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -21,7 +21,7 @@ fn gen_grpc_service( ) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> { let blob_service = gen_blob_service(); let directory_service = gen_directory_service(); - Arc::new(GRPCPathInfoServiceWrapper::from(gen_pathinfo_service( + Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service( blob_service, directory_service, ))) |