From a6580748aabe7fcbea735396ac700661b6c53e87 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Thu, 20 Jul 2023 13:37:29 +0300 Subject: feat(tvix/store/digests): use bytes::Bytes instead of Vec This will save us some copies, because a clone will simply create an additional pointer to the same data. Change-Id: I017a5d6b4c85a861b5541ebad2858ad4fbf8e8fa Reviewed-on: https://cl.tvl.fyi/c/depot/+/8978 Reviewed-by: raitobezarius Autosubmit: flokli Tested-by: BuildkiteCI --- tvix/store/src/blobservice/grpc.rs | 7 ++-- tvix/store/src/blobservice/memory.rs | 2 +- tvix/store/src/blobservice/sled.rs | 3 +- tvix/store/src/digests.rs | 43 +++++++++++++++++----- tvix/store/src/directoryservice/grpc.rs | 17 +++++---- tvix/store/src/directoryservice/traverse.rs | 6 ++- tvix/store/src/directoryservice/utils.rs | 2 +- tvix/store/src/fuse/inodes.rs | 4 +- tvix/store/src/nar/renderer.rs | 10 +++-- tvix/store/src/proto/grpc_blobservice_wrapper.rs | 12 +++--- .../src/proto/grpc_directoryservice_wrapper.rs | 13 +++++-- tvix/store/src/proto/mod.rs | 5 +-- tvix/store/src/proto/tests/directory.rs | 10 ++--- tvix/store/src/store_io.rs | 23 ++++++------ 14 files changed, 94 insertions(+), 63 deletions(-) (limited to 'tvix') diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index 96e2869a4f..a7f0e7c6e8 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -143,7 +143,7 @@ impl BlobService for GRPCBlobService { Ok(stream) => { // map the stream of proto::BlobChunk to bytes. let data_stream = stream.map(|x| { - x.map(|x| VecDeque::from(x.data)) + x.map(|x| VecDeque::from(x.data.to_vec())) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) }); @@ -169,8 +169,7 @@ impl BlobService for GRPCBlobService { // bytes arriving on the RX side are wrapped inside a // [proto::BlobChunk], and a [ReceiverStream] is constructed. - let blobchunk_stream = - ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.to_vec() }); + let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.into() }); // That receiver stream is used as a stream in the gRPC BlobService.put rpc call. let task: tokio::task::JoinHandle> = self @@ -250,7 +249,7 @@ impl BlobWriter for GRPCBlobWriter { match self.tokio_handle.block_on(task)? { Ok(resp) => { // return the digest from the response, and store it in self.digest for subsequent closes. - let digest = B3Digest::from_vec(resp.digest).map_err(|_| { + let digest: B3Digest = resp.digest.try_into().map_err(|_| { crate::Error::StorageError( "invalid root digest length in response".to_string(), ) diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index fa2826fe31..893f27364b 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -108,7 +108,7 @@ impl BlobWriter for MemoryBlobWriter { let (buf, hasher) = self.writers.take().unwrap(); // We know self.hasher is doing blake3 hashing, so this won't fail. - let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + let digest: B3Digest = hasher.finalize().as_bytes().into(); // Only insert if the blob doesn't already exist. let db = self.db.read()?; diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 67897cb94a..00291ba887 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -136,8 +136,7 @@ impl BlobWriter for SledBlobWriter { } else { let (buf, hasher) = self.writers.take().unwrap(); - // We know self.hasher is doing blake3 hashing, so this won't fail. - let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + let digest: B3Digest = hasher.finalize().as_bytes().into(); // Only insert if the blob doesn't already exist. if !self.db.contains_key(digest.to_vec()).map_err(|e| { diff --git a/tvix/store/src/digests.rs b/tvix/store/src/digests.rs index 441a059ee0..4df11b389e 100644 --- a/tvix/store/src/digests.rs +++ b/tvix/store/src/digests.rs @@ -1,10 +1,9 @@ +use bytes::Bytes; use data_encoding::BASE64; use thiserror::Error; -// FUTUREWORK: make generic - #[derive(PartialEq, Eq, Hash, Debug)] -pub struct B3Digest(Vec); +pub struct B3Digest(Bytes); // TODO: allow converting these errors to crate::Error #[derive(Error, Debug)] @@ -14,25 +13,49 @@ pub enum Error { } impl B3Digest { + // returns a copy of the inner [Vec]. + pub fn to_vec(&self) -> Vec { + self.0.to_vec() + } +} + +impl From for bytes::Bytes { + fn from(val: B3Digest) -> Self { + val.0 + } +} + +impl TryFrom> for B3Digest { + type Error = Error; + // constructs a [B3Digest] from a [Vec]. // Returns an error if the digest has the wrong length. - pub fn from_vec(value: Vec) -> Result { + fn try_from(value: Vec) -> Result { if value.len() != 32 { Err(Error::InvalidDigestLen(value.len())) } else { - Ok(Self(value)) + Ok(Self(value.into())) } } +} - // returns a copy of the inner [Vec]. - pub fn to_vec(&self) -> Vec { - self.0.to_vec() +impl TryFrom for B3Digest { + type Error = Error; + + // constructs a [B3Digest] from a [bytes::Bytes]. + // Returns an error if the digest has the wrong length. + fn try_from(value: bytes::Bytes) -> Result { + if value.len() != 32 { + Err(Error::InvalidDigestLen(value.len())) + } else { + Ok(Self(value)) + } } } impl From<&[u8; 32]> for B3Digest { fn from(value: &[u8; 32]) -> Self { - Self(value.to_vec()) + Self(value.to_vec().into()) } } @@ -44,6 +67,6 @@ impl Clone for B3Digest { impl std::fmt::Display for B3Digest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "b3:{}", BASE64.encode(self.0.as_slice())) + write!(f, "b3:{}", BASE64.encode(&self.0)) } } diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs index b9a5036a91..e6f34b2bd8 100644 --- a/tvix/store/src/directoryservice/grpc.rs +++ b/tvix/store/src/directoryservice/grpc.rs @@ -142,12 +142,13 @@ impl DirectoryService for GRPCDirectoryService { .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await }); match self.tokio_handle.block_on(task)? { - Ok(put_directory_resp) => Ok(B3Digest::from_vec( - put_directory_resp.into_inner().root_digest, - ) - .map_err(|_| { - Error::StorageError("invalid root digest length in response".to_string()) - })?), + Ok(put_directory_resp) => Ok(put_directory_resp + .into_inner() + .root_digest + .try_into() + .map_err(|_| { + Error::StorageError("invalid root digest length in response".to_string()) + })?), Err(e) => Err(crate::Error::StorageError(e.to_string())), } } @@ -265,7 +266,7 @@ impl Iterator for StreamIterator { for child_directory in &directory.directories { // We ran validate() above, so we know these digests must be correct. let child_directory_digest = - B3Digest::from_vec(child_directory.digest.clone()).unwrap(); + child_directory.digest.clone().try_into().unwrap(); self.expected_directory_digests .insert(child_directory_digest); @@ -355,7 +356,7 @@ impl DirectoryPutter for GRPCPutter { .map_err(|e| Error::StorageError(e.to_string()))? .root_digest; - B3Digest::from_vec(root_digest).map_err(|_| { + root_digest.try_into().map_err(|_| { Error::StorageError("invalid root digest length in response".to_string()) }) } diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs index 17f709f40d..a6e61a813b 100644 --- a/tvix/store/src/directoryservice/traverse.rs +++ b/tvix/store/src/directoryservice/traverse.rs @@ -1,5 +1,5 @@ use super::DirectoryService; -use crate::{proto::NamedNode, B3Digest, Error}; +use crate::{proto::NamedNode, Error}; use std::{os::unix::ffi::OsStrExt, sync::Arc}; use tracing::{instrument, warn}; @@ -40,7 +40,9 @@ pub fn traverse_to( Ok(None) } crate::proto::node::Node::Directory(directory_node) => { - let digest = B3Digest::from_vec(directory_node.digest) + let digest = directory_node + .digest + .try_into() .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?; // fetch the linked node from the directory_service diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs index d152fb78a9..95f02f1f9c 100644 --- a/tvix/store/src/directoryservice/utils.rs +++ b/tvix/store/src/directoryservice/utils.rs @@ -35,7 +35,7 @@ impl DirectoryTraverser { fn enqueue_child_directories(&mut self, directory: &proto::Directory) { for child_directory_node in &directory.directories { // TODO: propagate error - let child_digest = B3Digest::from_vec(child_directory_node.digest.clone()).unwrap(); + let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap(); if self.worklist_directory_digests.contains(&child_digest) || self.sent_directory_digests.contains(&child_digest) diff --git a/tvix/store/src/fuse/inodes.rs b/tvix/store/src/fuse/inodes.rs index bf883cb22f..a52ba7989e 100644 --- a/tvix/store/src/fuse/inodes.rs +++ b/tvix/store/src/fuse/inodes.rs @@ -38,7 +38,7 @@ impl From<&proto::SymlinkNode> for InodeData { impl From<&proto::FileNode> for InodeData { fn from(value: &proto::FileNode) -> Self { InodeData::Regular( - B3Digest::from_vec(value.digest.clone()).unwrap(), + value.digest.clone().try_into().unwrap(), value.size, value.executable, ) @@ -49,7 +49,7 @@ impl From<&proto::FileNode> for InodeData { impl From<&proto::DirectoryNode> for InodeData { fn from(value: &proto::DirectoryNode) -> Self { InodeData::Directory(DirectoryInodeData::Sparse( - B3Digest::from_vec(value.digest.clone()).unwrap(), + value.digest.clone().try_into().unwrap(), value.size, )) } diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs index 518854c09c..e2119ae079 100644 --- a/tvix/store/src/nar/renderer.rs +++ b/tvix/store/src/nar/renderer.rs @@ -3,7 +3,6 @@ use crate::{ blobservice::BlobService, directoryservice::DirectoryService, proto::{self, NamedNode}, - B3Digest, }; use count_write::CountWrite; use nix_compat::nar; @@ -65,7 +64,7 @@ fn walk_node( .map_err(RenderError::NARWriterError)?; } proto::node::Node::File(proto_file_node) => { - let digest = B3Digest::from_vec(proto_file_node.digest.clone()).map_err(|_e| { + let digest = proto_file_node.digest.clone().try_into().map_err(|_e| { warn!( file_node = ?proto_file_node, "invalid digest length in file node", @@ -96,8 +95,11 @@ fn walk_node( .map_err(RenderError::NARWriterError)?; } proto::node::Node::Directory(proto_directory_node) => { - let digest = - B3Digest::from_vec(proto_directory_node.digest.to_vec()).map_err(|_e| { + let digest = proto_directory_node + .digest + .clone() + .try_into() + .map_err(|_e| { RenderError::StoreError(crate::Error::StorageError( "invalid digest len in directory node".to_string(), )) diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index fee97c7d2d..e60ff2ef1d 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -1,6 +1,4 @@ -use crate::{ - blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest, -}; +use crate::{blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead}; use std::{ collections::VecDeque, io, @@ -96,7 +94,9 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { request: Request, ) -> Result, Status> { let rq = request.into_inner(); - let req_digest = B3Digest::from_vec(rq.digest) + let req_digest = rq + .digest + .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; if rq.include_chunks || rq.include_bao { @@ -117,7 +117,9 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper { ) -> Result, Status> { let rq = request.into_inner(); - let req_digest = B3Digest::from_vec(rq.digest) + let req_digest = rq + .digest + .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; match self.blob_service.open_read(&req_digest) { diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs index ec9e3cb123..22fcd2fa6a 100644 --- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs @@ -38,8 +38,10 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW // look at the digest in the request and put it in the top of the queue. match &req_inner.by_what { None => return Err(Status::invalid_argument("by_what needs to be specified")), - Some(proto::get_directory_request::ByWhat::Digest(digest)) => { - let digest = B3Digest::from_vec(digest.to_vec()) + Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => { + let digest: B3Digest = digest + .clone() + .try_into() .map_err(|_e| Status::invalid_argument("invalid digest length"))?; task::spawn(async move { @@ -91,6 +93,8 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW // This keeps track of the seen directory keys, and their size. // This is used to validate the size field of a reference to a previously sent directory. // We don't need to keep the contents around, they're stored in the DB. + // https://github.com/rust-lang/rust-clippy/issues/5812 + #[allow(clippy::mutable_key_type)] let mut seen_directories_sizes: HashMap = HashMap::new(); let mut last_directory_dgst: Option = None; @@ -110,7 +114,10 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW // to ensure it has been seen already in this stream, and that the size // matches what we recorded. for child_directory in &directory.directories { - let child_directory_digest = B3Digest::from_vec(child_directory.digest.to_vec()) + let child_directory_digest: B3Digest = child_directory + .digest + .clone() + .try_into() .map_err(|_e| Status::internal("invalid child directory digest len"))?; match seen_directories_sizes.get(&child_directory_digest) { diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs index 7e81efc517..126a8b0edc 100644 --- a/tvix/store/src/proto/mod.rs +++ b/tvix/store/src/proto/mod.rs @@ -247,12 +247,11 @@ impl Directory { pub fn digest(&self) -> B3Digest { let mut hasher = blake3::Hasher::new(); - let vec = hasher + hasher .update(&self.encode_to_vec()) .finalize() .as_bytes() - .to_vec(); - B3Digest::from_vec(vec).unwrap() + .into() } /// validate checks the directory for invalid data, such as: diff --git a/tvix/store/src/proto/tests/directory.rs b/tvix/store/src/proto/tests/directory.rs index 48eeaa7b5f..22b10ca746 100644 --- a/tvix/store/src/proto/tests/directory.rs +++ b/tvix/store/src/proto/tests/directory.rs @@ -1,7 +1,4 @@ -use crate::{ - proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}, - B3Digest, -}; +use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError}; use lazy_static::lazy_static; lazy_static! { @@ -69,11 +66,12 @@ fn digest() { assert_eq!( d.digest(), - B3Digest::from_vec(vec![ + vec![ 0xaf, 0x13, 0x49, 0xb9, 0xf5, 0xf9, 0xa1, 0xa6, 0xa0, 0x40, 0x4d, 0xea, 0x36, 0xdc, 0xc9, 0x49, 0x9b, 0xcb, 0x25, 0xc9, 0xad, 0xc1, 0x12, 0xb7, 0xcc, 0x9a, 0x93, 0xca, 0xe4, 0x1f, 0x32, 0x62 - ]) + ] + .try_into() .unwrap() ) } diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs index 701b52f667..19a809b6a1 100644 --- a/tvix/store/src/store_io.rs +++ b/tvix/store/src/store_io.rs @@ -216,8 +216,8 @@ impl EvalIO for TvixStoreIO { )) } crate::proto::node::Node::File(file_node) => { - let digest = - B3Digest::from_vec(file_node.digest.clone()).map_err(|_e| { + let digest: B3Digest = + file_node.digest.clone().try_into().map_err(|_e| { error!( file_node = ?file_node, "invalid digest" @@ -272,16 +272,15 @@ impl EvalIO for TvixStoreIO { match node { crate::proto::node::Node::Directory(directory_node) => { // fetch the Directory itself. - let digest = - B3Digest::from_vec(directory_node.digest.clone()).map_err(|_e| { - io::Error::new( - io::ErrorKind::InvalidData, - format!( - "invalid digest length in directory node: {:?}", - directory_node - ), - ) - })?; + let digest = directory_node.digest.clone().try_into().map_err(|_e| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "invalid digest length in directory node: {:?}", + directory_node + ), + ) + })?; if let Some(directory) = self.directory_service.get(&digest)? { let mut children: Vec<(Vec, FileType)> = Vec::new(); -- cgit 1.4.1