From a6580748aabe7fcbea735396ac700661b6c53e87 Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Thu, 20 Jul 2023 13:37:29 +0300 Subject: feat(tvix/store/digests): use bytes::Bytes instead of Vec This will save us some copies, because a clone will simply create an additional pointer to the same data. Change-Id: I017a5d6b4c85a861b5541ebad2858ad4fbf8e8fa Reviewed-on: https://cl.tvl.fyi/c/depot/+/8978 Reviewed-by: raitobezarius Autosubmit: flokli Tested-by: BuildkiteCI --- tvix/store/src/blobservice/grpc.rs | 7 +++---- tvix/store/src/blobservice/memory.rs | 2 +- tvix/store/src/blobservice/sled.rs | 3 +-- 3 files changed, 5 insertions(+), 7 deletions(-) (limited to 'tvix/store/src/blobservice') diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs index 96e2869a4feb..a7f0e7c6e873 100644 --- a/tvix/store/src/blobservice/grpc.rs +++ b/tvix/store/src/blobservice/grpc.rs @@ -143,7 +143,7 @@ impl BlobService for GRPCBlobService { Ok(stream) => { // map the stream of proto::BlobChunk to bytes. let data_stream = stream.map(|x| { - x.map(|x| VecDeque::from(x.data)) + x.map(|x| VecDeque::from(x.data.to_vec())) .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e)) }); @@ -169,8 +169,7 @@ impl BlobService for GRPCBlobService { // bytes arriving on the RX side are wrapped inside a // [proto::BlobChunk], and a [ReceiverStream] is constructed. - let blobchunk_stream = - ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.to_vec() }); + let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.into() }); // That receiver stream is used as a stream in the gRPC BlobService.put rpc call. let task: tokio::task::JoinHandle> = self @@ -250,7 +249,7 @@ impl BlobWriter for GRPCBlobWriter { match self.tokio_handle.block_on(task)? { Ok(resp) => { // return the digest from the response, and store it in self.digest for subsequent closes. - let digest = B3Digest::from_vec(resp.digest).map_err(|_| { + let digest: B3Digest = resp.digest.try_into().map_err(|_| { crate::Error::StorageError( "invalid root digest length in response".to_string(), ) diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs index fa2826fe3112..893f27364b80 100644 --- a/tvix/store/src/blobservice/memory.rs +++ b/tvix/store/src/blobservice/memory.rs @@ -108,7 +108,7 @@ impl BlobWriter for MemoryBlobWriter { let (buf, hasher) = self.writers.take().unwrap(); // We know self.hasher is doing blake3 hashing, so this won't fail. - let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + let digest: B3Digest = hasher.finalize().as_bytes().into(); // Only insert if the blob doesn't already exist. let db = self.db.read()?; diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs index 67897cb94a24..00291ba88717 100644 --- a/tvix/store/src/blobservice/sled.rs +++ b/tvix/store/src/blobservice/sled.rs @@ -136,8 +136,7 @@ impl BlobWriter for SledBlobWriter { } else { let (buf, hasher) = self.writers.take().unwrap(); - // We know self.hasher is doing blake3 hashing, so this won't fail. - let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap(); + let digest: B3Digest = hasher.finalize().as_bytes().into(); // Only insert if the blob doesn't already exist. if !self.db.contains_key(digest.to_vec()).map_err(|e| { -- cgit 1.4.1