From 616fa4476f93e1782e68dc713e9e8cb77a426c7d Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Thu, 11 May 2023 15:49:01 +0300 Subject: refactor(tvix/store): remove ChunkService Whether chunking is involved or not is an implementation detail of each Blobstore. Consumers of a whole blob shouldn't need to worry about that. It currently is not visible in the gRPC interface either. It shouldn't bleed into everything. Let the BlobService trait provide `open_read` and `open_write` methods, which return handles providing io::Read or io::Write, and leave the details up to the implementation. This means our custom BlobReader module can go away, and all the chunking bits in there, too. In the future, we might still want to add more chunking-aware syncing, but as a syncing strategy some stores can expose, not as a fundamental protocol component. This currently needs "SyncReadIntoAsyncRead", taken and vendored in from https://github.com/tokio-rs/tokio/pull/5669. It provides an AsyncRead for a sync Read, which is necessary to connect our (sync) BlobReader interface to a GRPC server implementation. As an alternative, we could also make the BlobReader itself async, and let consumers of the trait (EvalIO) deal with the async-ness, but this is less of a change for now. In terms of vendoring, I initially tried to move our tokio crate to these commits, but ended up in version incompatibilities, so let's vendor it in for now. 
Change-Id: I5969ebbc4c0e1ceece47981be3b9e7cfb3f59ad0 Reviewed-on: https://cl.tvl.fyi/c/depot/+/8551 Tested-by: BuildkiteCI Reviewed-by: tazjin --- .../src/nar/non_caching_calculation_service.rs | 21 +++---- tvix/store/src/nar/renderer.rs | 70 +++++++--------------- 2 files changed, 28 insertions(+), 63 deletions(-) (limited to 'tvix/store/src/nar') diff --git a/tvix/store/src/nar/non_caching_calculation_service.rs b/tvix/store/src/nar/non_caching_calculation_service.rs index f77f0b30d61f..94dd51bc6a7f 100644 --- a/tvix/store/src/nar/non_caching_calculation_service.rs +++ b/tvix/store/src/nar/non_caching_calculation_service.rs @@ -2,7 +2,6 @@ use count_write::CountWrite; use sha2::{Digest, Sha256}; use crate::blobservice::BlobService; -use crate::chunkservice::ChunkService; use crate::directoryservice::DirectoryService; use crate::proto; @@ -12,26 +11,20 @@ use super::{NARCalculationService, RenderError}; /// A NAR calculation service which simply renders the whole NAR whenever /// we ask for the calculation. 
#[derive(Clone)] -pub struct NonCachingNARCalculationService< - BS: BlobService, - CS: ChunkService + Clone, - DS: DirectoryService, -> { - nar_renderer: NARRenderer, +pub struct NonCachingNARCalculationService { + nar_renderer: NARRenderer, } -impl - NonCachingNARCalculationService -{ - pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self { +impl NonCachingNARCalculationService { + pub fn new(blob_service: BS, directory_service: DS) -> Self { Self { - nar_renderer: NARRenderer::new(blob_service, chunk_service, directory_service), + nar_renderer: NARRenderer::new(blob_service, directory_service), } } } -impl NARCalculationService - for NonCachingNARCalculationService +impl NARCalculationService + for NonCachingNARCalculationService { fn calculate_nar( &self, diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index a061dad9bb35..b080a713ec0a 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -1,10 +1,11 @@ +use std::io::{self, BufReader}; + use crate::{ blobservice::BlobService, - chunkservice::ChunkService, directoryservice::DirectoryService, proto::{self, NamedNode}, - BlobReader, }; +use data_encoding::BASE64; use nix_compat::nar; use super::RenderError; @@ -12,17 +13,15 @@ use super::RenderError; /// A NAR renderer, using a blob_service, chunk_service and directory_service /// to render a NAR to a writer. 
#[derive(Clone)] -pub struct NARRenderer { +pub struct NARRenderer { blob_service: BS, - chunk_service: CS, directory_service: DS, } -impl NARRenderer { - pub fn new(blob_service: BS, chunk_service: CS, directory_service: DS) -> Self { +impl NARRenderer { + pub fn new(blob_service: BS, directory_service: DS) -> Self { Self { blob_service, - chunk_service, directory_service, } } @@ -65,49 +64,22 @@ impl NARRendere )) })?; - // query blob_service for blob_meta - let resp = self - .blob_service - .stat(&proto::StatBlobRequest { - digest: digest.to_vec(), - include_chunks: true, - ..Default::default() - }) - .map_err(RenderError::StoreError)?; + // TODO: handle error + let mut blob_reader = match self.blob_service.open_read(&digest).unwrap() { + Some(blob_reader) => Ok(BufReader::new(blob_reader)), + None => Err(RenderError::NARWriterError(io::Error::new( + io::ErrorKind::NotFound, + format!("blob with digest {} not found", BASE64.encode(&digest)), + ))), + }?; - match resp { - // if it's None, that's an error! 
- None => { - return Err(RenderError::BlobNotFound( - digest.to_vec(), - proto_file_node.name.to_owned(), - )); - } - Some(blob_meta) => { - // make sure the blob_meta size matches what we expect from proto_file_node - let blob_meta_size = blob_meta.chunks.iter().fold(0, |acc, e| acc + e.size); - if blob_meta_size != proto_file_node.size { - return Err(RenderError::UnexpectedBlobMeta( - digest.to_vec(), - proto_file_node.name.to_owned(), - proto_file_node.size, - blob_meta_size, - )); - } - - let mut blob_reader = std::io::BufReader::new(BlobReader::open( - &self.chunk_service, - blob_meta, - )); - nar_node - .file( - proto_file_node.executable, - proto_file_node.size.into(), - &mut blob_reader, - ) - .map_err(RenderError::NARWriterError)?; - } - } + nar_node + .file( + proto_file_node.executable, + proto_file_node.size.into(), + &mut blob_reader, + ) + .map_err(RenderError::NARWriterError)?; } proto::node::Node::Directory(proto_directory_node) => { let digest: [u8; 32] = -- cgit 1.4.1