about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/store/src/bin/tvix-store.rs17
-rw-r--r--tvix/store/src/pathinfoservice/from_addr.rs12
-rw-r--r--tvix/store/src/pathinfoservice/grpc.rs5
-rw-r--r--tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs25
-rw-r--r--tvix/store/src/proto/tests/grpc_pathinfoservice.rs2
5 files changed, 37 insertions, 24 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 03ced18699ec..e4f2e0801b81 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -6,6 +6,7 @@ use nix_compat::store_path::StorePath;
 use std::io;
 use std::path::Path;
 use std::path::PathBuf;
+use std::sync::Arc;
 use tokio::task::JoinHandle;
 use tokio_listener::Listener;
 use tokio_listener::SystemOptions;
@@ -21,6 +22,7 @@ use tvix_castore::proto::GRPCBlobServiceWrapper;
 use tvix_castore::proto::GRPCDirectoryServiceWrapper;
 use tvix_castore::proto::NamedNode;
 use tvix_store::pathinfoservice;
+use tvix_store::pathinfoservice::PathInfoService;
 use tvix_store::proto::nar_info;
 use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
 use tvix_store::proto::GRPCPathInfoServiceWrapper;
@@ -217,9 +219,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
                 .add_service(DirectoryServiceServer::new(
                     GRPCDirectoryServiceWrapper::from(directory_service),
                 ))
-                .add_service(PathInfoServiceServer::new(
-                    GRPCPathInfoServiceWrapper::from(path_info_service),
-                ));
+                .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
+                    path_info_service,
+                )));
 
             #[cfg(feature = "tonic-reflection")]
             {
@@ -257,6 +259,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             )
             .await?;
 
+            // Arc the PathInfoService, as we clone it .
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+
             let tasks = paths
                 .into_iter()
                 .map(|path| {
@@ -356,6 +361,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             )
             .await?;
 
+            // Arc the PathInfoService, as TvixStoreFS requires Clone
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+
             let mut fuse_daemon = tokio::task::spawn_blocking(move || {
                 let f = TvixStoreFs::new(
                     blob_service,
@@ -397,6 +405,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             )
             .await?;
 
+            // Arc the PathInfoService, as TvixStoreFS requires Clone
+            let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+
             tokio::task::spawn_blocking(move || {
                 let fs = TvixStoreFs::new(
                     blob_service,
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 35f2bd3730e3..922cd3351548 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -31,7 +31,7 @@ pub async fn from_addr(
     uri: &str,
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
-) -> Result<Arc<dyn PathInfoService>, Error> {
+) -> Result<Box<dyn PathInfoService>, Error> {
     let url =
         Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
 
@@ -40,7 +40,7 @@ pub async fn from_addr(
         if url.has_host() || !url.path().is_empty() {
             return Err(Error::StorageError("invalid url".to_string()));
         }
-        Arc::new(MemoryPathInfoService::new(blob_service, directory_service))
+        Box::new(MemoryPathInfoService::new(blob_service, directory_service))
     } else if url.scheme() == "sled" {
         // sled doesn't support host, and a path can be provided (otherwise
         // it'll live in memory only).
@@ -57,12 +57,12 @@ pub async fn from_addr(
         // TODO: expose other parameters as URL parameters?
 
         if url.path().is_empty() {
-            return Ok(Arc::new(
+            return Ok(Box::new(
                 SledPathInfoService::new_temporary(blob_service, directory_service)
                     .map_err(|e| Error::StorageError(e.to_string()))?,
             ));
         }
-        return Ok(Arc::new(
+        return Ok(Box::new(
             SledPathInfoService::new(url.path(), blob_service, directory_service)
                 .map_err(|e| Error::StorageError(e.to_string()))?,
         ));
@@ -92,7 +92,7 @@ pub async fn from_addr(
             }
         }
 
-        Arc::new(nix_http_path_info_service)
+        Box::new(nix_http_path_info_service)
     } else if url.scheme().starts_with("grpc+") {
         // schemes starting with grpc+ go to the GRPCPathInfoService.
         //   That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
@@ -100,7 +100,7 @@ pub async fn from_addr(
         // - In the case of non-unix sockets, there must be a host, but no path.
         // Constructing the channel is handled by tvix_castore::channel::from_url.
         let client = PathInfoServiceClient::new(tvix_castore::tonic::channel_from_url(&url).await?);
-        Arc::new(GRPCPathInfoService::from_client(client))
+        Box::new(GRPCPathInfoService::from_client(client))
     } else {
         Err(Error::StorageError(format!(
             "unknown scheme: {}",
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index ef3b0b77ec54..9a8599bce26c 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -115,7 +115,6 @@ impl PathInfoService for GRPCPathInfoService {
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Arc;
     use std::time::Duration;
 
     use tempfile::TempDir;
@@ -151,11 +150,11 @@ mod tests {
             let mut server = tonic::transport::Server::builder();
             let router = server.add_service(
                 crate::proto::path_info_service_server::PathInfoServiceServer::new(
-                    GRPCPathInfoServiceWrapper::from(Arc::new(MemoryPathInfoService::new(
+                    GRPCPathInfoServiceWrapper::new(Box::new(MemoryPathInfoService::new(
                         gen_blob_service(),
                         gen_directory_service(),
                     ))
-                        as Arc<dyn PathInfoService>),
+                        as Box<dyn PathInfoService>),
                 ),
             );
             router.serve_with_incoming(uds_stream).await
diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
index 06c4b2f1fd0a..19430aed381a 100644
--- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs
@@ -2,28 +2,31 @@ use crate::nar::RenderError;
 use crate::pathinfoservice::PathInfoService;
 use crate::proto;
 use futures::StreamExt;
-use std::sync::Arc;
+use std::ops::Deref;
 use tokio::task;
 use tokio_stream::wrappers::ReceiverStream;
 use tonic::{async_trait, Request, Response, Result, Status};
 use tracing::{debug, instrument, warn};
 use tvix_castore::proto as castorepb;
 
-pub struct GRPCPathInfoServiceWrapper {
-    path_info_service: Arc<dyn PathInfoService>,
+pub struct GRPCPathInfoServiceWrapper<PS> {
+    inner: PS,
     // FUTUREWORK: allow exposing without allowing listing
 }
 
-impl From<Arc<dyn PathInfoService>> for GRPCPathInfoServiceWrapper {
-    fn from(value: Arc<dyn PathInfoService>) -> Self {
+impl<PS> GRPCPathInfoServiceWrapper<PS> {
+    pub fn new(path_info_service: PS) -> Self {
         Self {
-            path_info_service: value,
+            inner: path_info_service,
         }
     }
 }
 
 #[async_trait]
-impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper {
+impl<PS> proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS>
+where
+    PS: Deref<Target = dyn PathInfoService> + Send + Sync + 'static,
+{
     type ListStream = ReceiverStream<tonic::Result<proto::PathInfo, Status>>;
 
     #[instrument(skip(self))]
@@ -38,7 +41,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
                     .to_vec()
                     .try_into()
                     .map_err(|_e| Status::invalid_argument("invalid output digest length"))?;
-                match self.path_info_service.get(digest).await {
+                match self.inner.get(digest).await {
                     Ok(None) => Err(Status::not_found("PathInfo not found")),
                     Ok(Some(path_info)) => Ok(Response::new(path_info)),
                     Err(e) => {
@@ -56,7 +59,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
 
         // Store the PathInfo in the client. Clients MUST validate the data
         // they receive, so we don't validate additionally here.
-        match self.path_info_service.put(path_info).await {
+        match self.inner.put(path_info).await {
             Ok(path_info_new) => Ok(Response::new(path_info_new)),
             Err(e) => {
                 warn!("failed to insert PathInfo: {}", e);
@@ -74,7 +77,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
             None => Err(Status::invalid_argument("no root node sent")),
             Some(root_node) => {
                 let (nar_size, nar_sha256) = self
-                    .path_info_service
+                    .inner
                     .calculate_nar(&root_node)
                     .await
                     .expect("error during nar calculation"); // TODO: handle error
@@ -94,7 +97,7 @@ impl proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWra
     ) -> Result<Response<Self::ListStream>, Status> {
         let (tx, rx) = tokio::sync::mpsc::channel(5);
 
-        let mut stream = self.path_info_service.list();
+        let mut stream = self.inner.list();
 
         let _task = task::spawn(async move {
             while let Some(e) = stream.next().await {
diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
index c0b953d0f2e9..e8da7792cdb1 100644
--- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
+++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs
@@ -21,7 +21,7 @@ fn gen_grpc_service(
 ) -> Arc<dyn GRPCPathInfoService<ListStream = ReceiverStream<Result<PathInfo, tonic::Status>>>> {
     let blob_service = gen_blob_service();
     let directory_service = gen_directory_service();
-    Arc::new(GRPCPathInfoServiceWrapper::from(gen_pathinfo_service(
+    Arc::new(GRPCPathInfoServiceWrapper::new(gen_pathinfo_service(
         blob_service,
         directory_service,
     )))