diff options
-rw-r--r-- | tvix/cli/src/main.rs | 3 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 20 | ||||
-rw-r--r-- | tvix/store/src/nar/mod.rs | 2 | ||||
-rw-r--r-- | tvix/store/src/nar/renderer.rs | 4 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/grpc.rs | 32 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/memory.rs | 32 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/mod.rs | 5 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/sled.rs | 47 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs | 36 | ||||
-rw-r--r-- | tvix/store/src/proto/tests/grpc_pathinfoservice.rs | 12 | ||||
-rw-r--r-- | tvix/store/src/tests/nar_renderer.rs | 14 | ||||
-rw-r--r-- | tvix/store/src/tests/utils.rs | 7 |
12 files changed, 147 insertions, 67 deletions
diff --git a/tvix/cli/src/main.rs b/tvix/cli/src/main.rs index 0a1794006de2..76e6d66298f5 100644 --- a/tvix/cli/src/main.rs +++ b/tvix/cli/src/main.rs @@ -73,7 +73,8 @@ fn interpret(code: &str, path: Option<PathBuf>, args: &Args, explain: bool) -> b let blob_service = MemoryBlobService::default(); let directory_service = MemoryDirectoryService::default(); - let path_info_service = MemoryPathInfoService::default(); // TODO: update to pass in blob and directory svc + let path_info_service = + MemoryPathInfoService::new(Box::new(blob_service.clone()), directory_service.clone()); eval.io_handle = Box::new(tvix_io::TvixIO::new( known_paths.clone(), diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 49c8c9ec34eb..97e2447ec82a 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -101,9 +101,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { // initialize stores let blob_service = SledBlobService::new("blobs.sled".into())?; let boxed_blob_service: Box<dyn BlobService> = Box::new(blob_service.clone()); - let boxed_blob_service2: Box<dyn BlobService> = Box::new(blob_service); + let boxed_blob_service2: Box<dyn BlobService> = Box::new(blob_service.clone()); let directory_service = SledDirectoryService::new("directories.sled".into())?; - let path_info_service = SledPathInfoService::new("pathinfo.sled".into())?; + let path_info_service = SledPathInfoService::new( + "pathinfo.sled".into(), + boxed_blob_service, + directory_service.clone(), + )?; let listen_address = listen_address .unwrap_or_else(|| "[::]:8000".to_string()) @@ -115,16 +119,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { #[allow(unused_mut)] let mut router = server .add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::from( - boxed_blob_service, + boxed_blob_service2, ))) .add_service(DirectoryServiceServer::new( - GRPCDirectoryServiceWrapper::from(directory_service.clone()), + GRPCDirectoryServiceWrapper::from(directory_service), )) - .add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new( - path_info_service, - boxed_blob_service2, - directory_service, - ))); + .add_service(PathInfoServiceServer::new( + GRPCPathInfoServiceWrapper::from(path_info_service), + )); #[cfg(feature = "reflection")] { diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs index c73e610f4ecb..13e4f7bd93f6 100644 --- a/tvix/store/src/nar/mod.rs +++ b/tvix/store/src/nar/mod.rs @@ -4,7 +4,7 @@ use thiserror::Error; mod renderer; pub use renderer::calculate_size_and_sha256; -pub use renderer::writer_nar; +pub use renderer::write_nar; /// Errors that can encounter while rendering NARs. #[derive(Debug, Error)] diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index 6ea76e1429be..80bf9bc6d816 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -21,7 +21,7 @@ pub fn calculate_size_and_sha256<DS: DirectoryService + Clone>( let h = Sha256::new(); let mut cw = CountWrite::from(h); - writer_nar(&mut cw, root_node, blob_service, directory_service)?; + write_nar(&mut cw, root_node, blob_service, directory_service)?; Ok((cw.count(), cw.into_inner().finalize().into())) } @@ -30,7 +30,7 @@ pub fn calculate_size_and_sha256<DS: DirectoryService + Clone>( /// and uses the passed blob_service and directory_service to /// perform the necessary lookups as it traverses the structure. /// The contents in NAR serialization are writen to the passed [std::io::Write]. -pub fn writer_nar<W: std::io::Write, DS: DirectoryService + Clone>( +pub fn write_nar<W: std::io::Write, DS: DirectoryService + Clone>( w: &mut W, proto_root_node: &proto::node::Node, blob_service: &Box<dyn BlobService>, diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs index 6bb774c668a3..871c3b592256 100644 --- a/tvix/store/src/pathinfoservice/grpc.rs +++ b/tvix/store/src/pathinfoservice/grpc.rs @@ -78,4 +78,36 @@ impl PathInfoService for GRPCPathInfoService { .block_on(task)? .map_err(|e| crate::Error::StorageError(e.to_string())) } + + fn calculate_nar( + &self, + root_node: &proto::node::Node, + ) -> Result<(u64, [u8; 32]), crate::Error> { + // Get a new handle to the gRPC client. + let mut grpc_client = self.grpc_client.clone(); + let root_node = root_node.clone(); + + let task: tokio::task::JoinHandle<Result<_, Status>> = + self.tokio_handle.spawn(async move { + let path_info = grpc_client + .calculate_nar(proto::Node { + node: Some(root_node), + }) + .await? + .into_inner(); + Ok(path_info) + }); + + let resp = self + .tokio_handle + .block_on(task)? + .map_err(|e| crate::Error::StorageError(e.to_string()))?; + + let nar_sha256: [u8; 32] = resp + .nar_sha256 + .try_into() + .map_err(|_e| crate::Error::StorageError("invalid digest length".to_string()))?; + + Ok((resp.nar_size, nar_sha256)) + } } diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs index d0ff1976efab..5b48ed9efa34 100644 --- a/tvix/store/src/pathinfoservice/memory.rs +++ b/tvix/store/src/pathinfoservice/memory.rs @@ -1,16 +1,31 @@ use super::PathInfoService; -use crate::{proto, Error}; +use crate::{ + blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256, + proto, Error, +}; use std::{ collections::HashMap, sync::{Arc, RwLock}, }; -#[derive(Default)] -pub struct MemoryPathInfoService { +pub struct MemoryPathInfoService<DS: DirectoryService> { db: Arc<RwLock<HashMap<[u8; 20], proto::PathInfo>>>, + + blob_service: Box<dyn BlobService>, + directory_service: DS, +} + +impl<DS: DirectoryService> MemoryPathInfoService<DS> { + pub fn new(blob_service: Box<dyn BlobService>, directory_service: DS) -> Self { + Self { + db: Default::default(), + blob_service, + directory_service, + } + } } -impl PathInfoService for MemoryPathInfoService { +impl<DS: DirectoryService + Clone> PathInfoService for MemoryPathInfoService<DS> { fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { let db = self.db.read().unwrap(); @@ -38,4 +53,13 @@ impl PathInfoService for MemoryPathInfoService { } } } + + fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { + calculate_size_and_sha256( + root_node, + &self.blob_service, + self.directory_service.clone(), + ) + .map_err(|e| Error::StorageError(e.to_string())) + } } diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs index ddede5851575..2483909a1190 100644 --- a/tvix/store/src/pathinfoservice/mod.rs +++ b/tvix/store/src/pathinfoservice/mod.rs @@ -18,5 +18,8 @@ pub trait PathInfoService { /// invalid messages. fn put(&self, path_info: proto::PathInfo) -> Result<proto::PathInfo, Error>; - // TODO: add default impl for nar calculation here, and override from GRPC client! + /// Return the nar size and nar sha256 digest for a given root node. + /// This can be used to calculate NAR-based output paths, + /// and implementations are encouraged to cache it. + fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error>; } diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs index 8776ebcbc106..98ea60ff4440 100644 --- a/tvix/store/src/pathinfoservice/sled.rs +++ b/tvix/store/src/pathinfoservice/sled.rs @@ -1,5 +1,8 @@ use super::PathInfoService; -use crate::{proto, Error}; +use crate::{ + blobservice::BlobService, directoryservice::DirectoryService, nar::calculate_size_and_sha256, + proto, Error, +}; use prost::Message; use std::path::PathBuf; use tracing::warn; @@ -8,28 +11,45 @@ use tracing::warn; /// /// The PathInfo messages are stored as encoded protos, and keyed by their output hash, /// as that's currently the only request type available. -#[derive(Clone)] -pub struct SledPathInfoService { +pub struct SledPathInfoService<DS: DirectoryService> { db: sled::Db, + + blob_service: Box<dyn BlobService>, + directory_service: DS, } -impl SledPathInfoService { - pub fn new(p: PathBuf) -> Result<Self, sled::Error> { +impl<DS: DirectoryService> SledPathInfoService<DS> { + pub fn new( + p: PathBuf, + blob_service: Box<dyn BlobService>, + directory_service: DS, + ) -> Result<Self, sled::Error> { let config = sled::Config::default().use_compression(true).path(p); let db = config.open()?; - Ok(Self { db }) + Ok(Self { + db, + blob_service, + directory_service, + }) } - pub fn new_temporary() -> Result<Self, sled::Error> { + pub fn new_temporary( + blob_service: Box<dyn BlobService>, + directory_service: DS, + ) -> Result<Self, sled::Error> { let config = sled::Config::default().temporary(true); let db = config.open()?; - Ok(Self { db }) + Ok(Self { + db, + blob_service, + directory_service, + }) } } -impl PathInfoService for SledPathInfoService { +impl<DS: DirectoryService + Clone> PathInfoService for SledPathInfoService<DS> { fn get(&self, digest: [u8; 20]) -> Result<Option<proto::PathInfo>, Error> { match self.db.get(digest) { Ok(None) => Ok(None), @@ -73,4 +93,13 @@ impl PathInfoService for SledPathInfoService { }, } } + + fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), Error> { + calculate_size_and_sha256( + root_node, + &self.blob_service, + self.directory_service.clone(), + ) + .map_err(|e| Error::StorageError(e.to_string())) + } } diff --git a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs index c070b883fa16..645f4aa60556 100644 --- a/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_pathinfoservice_wrapper.rs @@ -1,36 +1,24 @@ -use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryService; -use crate::nar::{calculate_size_and_sha256, RenderError}; +use crate::nar::RenderError; use crate::pathinfoservice::PathInfoService; use crate::proto; use tonic::{async_trait, Request, Response, Result, Status}; use tracing::{instrument, warn}; -pub struct GRPCPathInfoServiceWrapper<PS: PathInfoService, DS: DirectoryService> { +pub struct GRPCPathInfoServiceWrapper<PS: PathInfoService> { path_info_service: PS, - blob_service: Box<dyn BlobService>, - directory_service: DS, } -impl<PS: PathInfoService, DS: DirectoryService> GRPCPathInfoServiceWrapper<PS, DS> { - pub fn new( - path_info_service: PS, - blob_service: Box<dyn BlobService>, - directory_service: DS, - ) -> Self { +impl<PS: PathInfoService> From<PS> for GRPCPathInfoServiceWrapper<PS> { + fn from(value: PS) -> Self { Self { - path_info_service, - blob_service, - directory_service, + path_info_service: value, } } } #[async_trait] -impl< - PS: PathInfoService + Send + Sync + 'static, - DS: DirectoryService + Send + Sync + Clone + 'static, - > proto::path_info_service_server::PathInfoService for GRPCPathInfoServiceWrapper<PS, DS> +impl<PS: PathInfoService + Send + Sync + 'static> proto::path_info_service_server::PathInfoService + for GRPCPathInfoServiceWrapper<PS> { #[instrument(skip(self))] async fn get( @@ -78,12 +66,10 @@ impl< match request.into_inner().node { None => Err(Status::invalid_argument("no root node sent")), Some(root_node) => { - let (nar_size, nar_sha256) = calculate_size_and_sha256( - &root_node, - &self.blob_service, - self.directory_service.clone(), - ) - .expect("error during nar calculation"); // TODO: handle error + let (nar_size, nar_sha256) = self + .path_info_service + .calculate_nar(&root_node) + .expect("error during nar calculation"); // TODO: handle error Ok(Response::new(proto::CalculateNarResponse { nar_size, diff --git a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs index dbcdc5ced056..8b7038ccbc35 100644 --- a/tvix/store/src/proto/tests/grpc_pathinfoservice.rs +++ b/tvix/store/src/proto/tests/grpc_pathinfoservice.rs @@ -5,7 +5,9 @@ use crate::proto::GRPCPathInfoServiceWrapper; use crate::proto::PathInfo; use crate::proto::{GetPathInfoRequest, Node, SymlinkNode}; use crate::tests::fixtures::DUMMY_OUTPUT_HASH; -use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; +use crate::tests::utils::gen_blob_service; +use crate::tests::utils::gen_directory_service; +use crate::tests::utils::gen_pathinfo_service; use tonic::Request; /// generates a GRPCPathInfoService out of blob, directory and pathinfo services. @@ -14,11 +16,9 @@ use tonic::Request; /// It uses the NonCachingNARCalculationService NARCalculationService to /// calculate NARs. fn gen_grpc_service() -> impl GRPCPathInfoService { - GRPCPathInfoServiceWrapper::new( - gen_pathinfo_service(), - gen_blob_service(), - gen_directory_service(), - ) + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); + GRPCPathInfoServiceWrapper::from(gen_pathinfo_service(blob_service, directory_service)) } /// Trying to get a non-existent PathInfo should return a not found error. diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs index 48c07e53c93e..20cca1e77848 100644 --- a/tvix/store/src/tests/nar_renderer.rs +++ b/tvix/store/src/tests/nar_renderer.rs @@ -1,6 +1,6 @@ use crate::directoryservice::DirectoryService; use crate::nar::calculate_size_and_sha256; -use crate::nar::writer_nar; +use crate::nar::write_nar; use crate::proto::DirectoryNode; use crate::proto::FileNode; use crate::proto::SymlinkNode; @@ -13,7 +13,7 @@ use std::io; fn single_symlink() { let mut buf: Vec<u8> = vec![]; - writer_nar( + write_nar( &mut buf, &crate::proto::node::Node::Symlink(SymlinkNode { name: "doesntmatter".to_string(), @@ -33,7 +33,7 @@ fn single_symlink() { fn single_file_missing_blob() { let mut buf: Vec<u8> = vec![]; - let e = writer_nar( + let e = write_nar( &mut buf, &crate::proto::node::Node::File(FileNode { name: "doesntmatter".to_string(), @@ -74,7 +74,7 @@ fn single_file_wrong_blob_size() { { let mut buf: Vec<u8> = vec![]; - let e = writer_nar( + let e = write_nar( &mut buf, &crate::proto::node::Node::File(FileNode { name: "doesntmatter".to_string(), @@ -99,7 +99,7 @@ fn single_file_wrong_blob_size() { { let mut buf: Vec<u8> = vec![]; - let e = writer_nar( + let e = write_nar( &mut buf, &crate::proto::node::Node::File(FileNode { name: "doesntmatter".to_string(), @@ -136,7 +136,7 @@ fn single_file() { let mut buf: Vec<u8> = vec![]; - writer_nar( + write_nar( &mut buf, &crate::proto::node::Node::File(FileNode { name: "doesntmatter".to_string(), @@ -174,7 +174,7 @@ fn test_complicated() { let mut buf: Vec<u8> = vec![]; - writer_nar( + write_nar( &mut buf, &crate::proto::node::Node::Directory(DirectoryNode { name: "doesntmatter".to_string(), diff --git a/tvix/store/src/tests/utils.rs b/tvix/store/src/tests/utils.rs index ec379bddcf77..6905fe56b5d7 100644 --- a/tvix/store/src/tests/utils.rs +++ b/tvix/store/src/tests/utils.rs @@ -12,6 +12,9 @@ pub fn gen_directory_service() -> impl DirectoryService + Send + Sync + Clone + MemoryDirectoryService::default() } -pub fn gen_pathinfo_service() -> impl PathInfoService { - MemoryPathInfoService::default() +pub fn gen_pathinfo_service<DS: DirectoryService + Clone>( + blob_service: Box<dyn BlobService>, + directory_service: DS, +) -> impl PathInfoService { + MemoryPathInfoService::new(blob_service, directory_service) } |