diff options
Diffstat (limited to 'tvix/store/src/nar')
-rw-r--r-- | tvix/store/src/nar/grpc_nar_calculation_service.rs | 69 | ||||
-rw-r--r-- | tvix/store/src/nar/mod.rs | 35 | ||||
-rw-r--r-- | tvix/store/src/nar/non_caching_calculation_service.rs | 37 | ||||
-rw-r--r-- | tvix/store/src/nar/renderer.rs | 136 |
4 files changed, 277 insertions, 0 deletions
diff --git a/tvix/store/src/nar/grpc_nar_calculation_service.rs b/tvix/store/src/nar/grpc_nar_calculation_service.rs new file mode 100644 index 000000000000..429593743914 --- /dev/null +++ b/tvix/store/src/nar/grpc_nar_calculation_service.rs @@ -0,0 +1,69 @@ +use super::NARCalculationService; +use crate::proto; +use tonic::transport::Channel; +use tonic::Status; + +/// A NAR calculation service which asks a remote tvix-store for NAR calculation +/// (via the gRPC PathInfoService). +#[derive(Clone)] +pub struct GRPCNARCalculationService { + /// A handle into the active tokio runtime. Necessary to spawn tasks. + tokio_handle: tokio::runtime::Handle, + + /// The internal reference to a gRPC client. + /// Cloning it is cheap, and it internally handles concurrent requests. + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, +} + +impl GRPCNARCalculationService { + /// construct a new [GRPCNARCalculationService], by passing a handle to the + /// tokio runtime, and a gRPC client. + pub fn new( + tokio_handle: tokio::runtime::Handle, + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + ) -> Self { + Self { + tokio_handle, + grpc_client, + } + } + + /// construct a [GRPCNARCalculationService], from a [proto::path_info_service_client::PathInfoServiceClient<Channel>]. + /// panics if called outside the context of a tokio runtime. + pub fn from_client( + grpc_client: proto::path_info_service_client::PathInfoServiceClient<Channel>, + ) -> Self { + Self { + tokio_handle: tokio::runtime::Handle::current(), + grpc_client, + } + } +} + +impl NARCalculationService for GRPCNARCalculationService { + fn calculate_nar( + &self, + root_node: &proto::node::Node, + ) -> Result<(u64, [u8; 32]), super::RenderError> { + // Get a new handle to the gRPC client, and copy the root node. + let mut grpc_client = self.grpc_client.clone(); + let root_node = root_node.clone(); + + let task: tokio::task::JoinHandle<Result<_, Status>> = + self.tokio_handle.spawn(async move { + Ok(grpc_client + .calculate_nar(proto::Node { + node: Some(root_node), + }) + .await? + .into_inner()) + }); + + match self.tokio_handle.block_on(task).unwrap() { + Ok(resp) => Ok((resp.nar_size, resp.nar_sha256.to_vec().try_into().unwrap())), + Err(e) => Err(super::RenderError::StoreError(crate::Error::StorageError( + e.to_string(), + ))), + } + } +} diff --git a/tvix/store/src/nar/mod.rs b/tvix/store/src/nar/mod.rs new file mode 100644 index 000000000000..a29cc5451bae --- /dev/null +++ b/tvix/store/src/nar/mod.rs @@ -0,0 +1,35 @@ +use crate::{proto, B3Digest}; +use data_encoding::BASE64; +use thiserror::Error; + +mod grpc_nar_calculation_service; +mod non_caching_calculation_service; +mod renderer; + +pub use grpc_nar_calculation_service::GRPCNARCalculationService; +pub use non_caching_calculation_service::NonCachingNARCalculationService; +pub use renderer::NARRenderer; + +/// Errors that can encounter while rendering NARs. +#[derive(Debug, Error)] +pub enum RenderError { + #[error("failure talking to a backing store client: {0}")] + StoreError(crate::Error), + + #[error("unable to find directory {}, referred from {}", .0, .1)] + DirectoryNotFound(B3Digest, String), + + #[error("unable to find blob {}, referred from {}", BASE64.encode(.0), .1)] + BlobNotFound([u8; 32], String), + + #[error("unexpected size in metadata for blob {}, referred from {} returned, expected {}, got {}", BASE64.encode(.0), .1, .2, .3)] + UnexpectedBlobMeta([u8; 32], String, u32, u32), + + #[error("failure using the NAR writer: {0}")] + NARWriterError(std::io::Error), +} + +/// The base trait for something calculating NARs, and returning their size and sha256. +pub trait NARCalculationService { + fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), RenderError>; +} diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs new file mode 100644 index 000000000000..8a080cb4df5e --- /dev/null +++ b/tvix/store/src/nar/non_caching_calculation_service.rs @@ -0,0 +1,37 @@ +use count_write::CountWrite; +use sha2::{Digest, Sha256}; + +use crate::blobservice::BlobService; +use crate::directoryservice::DirectoryService; +use crate::proto; + +use super::renderer::NARRenderer; +use super::{NARCalculationService, RenderError}; + +/// A NAR calculation service which simply renders the whole NAR whenever +/// we ask for the calculation. +#[derive(Clone)] +pub struct NonCachingNARCalculationService<BS: BlobService, DS: DirectoryService> { + nar_renderer: NARRenderer<BS, DS>, +} + +impl<BS: BlobService, DS: DirectoryService> NonCachingNARCalculationService<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { + Self { + nar_renderer: NARRenderer::new(blob_service, directory_service), + } + } +} + +impl<BS: BlobService, DS: DirectoryService> NARCalculationService + for NonCachingNARCalculationService<BS, DS> +{ + fn calculate_nar(&self, root_node: &proto::node::Node) -> Result<(u64, [u8; 32]), RenderError> { + let h = Sha256::new(); + let mut cw = CountWrite::from(h); + + self.nar_renderer.write_nar(&mut cw, root_node)?; + + Ok((cw.count(), cw.into_inner().finalize().into())) + } +} diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs new file mode 100644 index 000000000000..c10f2ddf52fa --- /dev/null +++ b/tvix/store/src/nar/renderer.rs @@ -0,0 +1,136 @@ +use super::RenderError; +use crate::{ + blobservice::BlobService, + directoryservice::DirectoryService, + proto::{self, NamedNode}, + B3Digest, +}; +use nix_compat::nar; +use std::io::{self, BufReader}; +use tracing::warn; + +/// A NAR renderer, using a blob_service, chunk_service and directory_service +/// to render a NAR to a writer. +#[derive(Clone)] +pub struct NARRenderer<BS: BlobService, DS: DirectoryService> { + blob_service: BS, + directory_service: DS, +} + +impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> { + pub fn new(blob_service: BS, directory_service: DS) -> Self { + Self { + blob_service, + directory_service, + } + } + + /// Consumes a [proto::node::Node] pointing to the root of a (store) path, + /// and writes the contents in NAR serialization to the passed + /// [std::io::Write]. + /// + /// It uses the different clients in the struct to perform the necessary + /// lookups as it traverses the structure. + pub fn write_nar<W: std::io::Write>( + &self, + w: &mut W, + proto_root_node: &proto::node::Node, + ) -> Result<(), RenderError> { + // Initialize NAR writer + let nar_root_node = nar::writer::open(w).map_err(RenderError::NARWriterError)?; + + self.walk_node(nar_root_node, proto_root_node) + } + + /// Process an intermediate node in the structure. + /// This consumes the node. + fn walk_node( + &self, + nar_node: nar::writer::Node, + proto_node: &proto::node::Node, + ) -> Result<(), RenderError> { + match proto_node { + proto::node::Node::Symlink(proto_symlink_node) => { + nar_node + .symlink(&proto_symlink_node.target) + .map_err(RenderError::NARWriterError)?; + } + proto::node::Node::File(proto_file_node) => { + let digest = B3Digest::from_vec(proto_file_node.digest.clone()).map_err(|_e| { + warn!( + file_node = ?proto_file_node, + "invalid digest length in file node", + ); + + RenderError::StoreError(crate::Error::StorageError( + "invalid digest len in file node".to_string(), + )) + })?; + + let mut blob_reader = match self + .blob_service + .open_read(&digest) + .map_err(RenderError::StoreError)? + { + Some(blob_reader) => Ok(BufReader::new(blob_reader)), + None => Err(RenderError::NARWriterError(io::Error::new( + io::ErrorKind::NotFound, + format!("blob with digest {} not found", &digest), + ))), + }?; + + nar_node + .file( + proto_file_node.executable, + proto_file_node.size.into(), + &mut blob_reader, + ) + .map_err(RenderError::NARWriterError)?; + } + proto::node::Node::Directory(proto_directory_node) => { + let digest = + B3Digest::from_vec(proto_directory_node.digest.to_vec()).map_err(|_e| { + RenderError::StoreError(crate::Error::StorageError( + "invalid digest len in directory node".to_string(), + )) + })?; + + // look it up with the directory service + let resp = self + .directory_service + .get(&digest) + .map_err(RenderError::StoreError)?; + + match resp { + // if it's None, that's an error! + None => { + return Err(RenderError::DirectoryNotFound( + digest, + proto_directory_node.name.to_owned(), + )) + } + Some(proto_directory) => { + // start a directory node + let mut nar_node_directory = + nar_node.directory().map_err(RenderError::NARWriterError)?; + + // for each node in the directory, create a new entry with its name, + // and then invoke walk_node on that entry. + for proto_node in proto_directory.nodes() { + let child_node = nar_node_directory + .entry(proto_node.get_name()) + .map_err(RenderError::NARWriterError)?; + self.walk_node(child_node, &proto_node)?; + } + + // close the directory + nar_node_directory + .close() + .map_err(RenderError::NARWriterError)?; + } + } + } + } + Ok(()) + } +} |