author    Florian Klink <flokli@flokli.de>  2023-07-20 10:37 +0300
committer clbot <clbot@tvl.fyi>             2023-07-21 19:01 +0000
commit    a6580748aabe7fcbea735396ac700661b6c53e87
tree      fab2df50c860f6ddc6730693223aa42e0416dca0 /tvix/store/src/blobservice
parent    72e82ffcb11b1aaf1f1cc8db4189ced5ec0aa42e
feat(tvix/store/digests): use bytes::Bytes instead of Vec<u8> r/6437
This will save us some copies, because cloning a Bytes simply creates an
additional reference-counted pointer to the same underlying data instead of
copying it.
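
As an illustration of the difference (a standalone sketch, not part of this
change): cloning a Bytes only copies a handle to the shared buffer, while
cloning a Vec<u8> allocates and copies the payload.

    use bytes::Bytes;

    fn main() {
        let original = Bytes::from(vec![1u8, 2, 3, 4]);
        // Cloning a Bytes bumps a refcount; both handles point at the
        // same underlying buffer.
        let cloned = original.clone();
        assert_eq!(original.as_ptr(), cloned.as_ptr());

        // Cloning a Vec<u8> allocates a new buffer and copies the data.
        let vec_original = vec![1u8, 2, 3, 4];
        let vec_cloned = vec_original.clone();
        assert_ne!(vec_original.as_ptr(), vec_cloned.as_ptr());
    }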

Change-Id: I017a5d6b4c85a861b5541ebad2858ad4fbf8e8fa
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8978
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/blobservice')
-rw-r--r--  tvix/store/src/blobservice/grpc.rs   | 7
-rw-r--r--  tvix/store/src/blobservice/memory.rs | 2
-rw-r--r--  tvix/store/src/blobservice/sled.rs   | 3
3 files changed, 5 insertions(+), 7 deletions(-)
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
index 96e2869a4feb..a7f0e7c6e873 100644
--- a/tvix/store/src/blobservice/grpc.rs
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -143,7 +143,7 @@ impl BlobService for GRPCBlobService {
             Ok(stream) => {
                 // map the stream of proto::BlobChunk to bytes.
                 let data_stream = stream.map(|x| {
-                    x.map(|x| VecDeque::from(x.data))
+                    x.map(|x| VecDeque::from(x.data.to_vec()))
                         .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
                 });
 
@@ -169,8 +169,7 @@ impl BlobService for GRPCBlobService {
 
         // bytes arriving on the RX side are wrapped inside a
         // [proto::BlobChunk], and a [ReceiverStream] is constructed.
-        let blobchunk_stream =
-            ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.to_vec() });
+        let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.into() });
 
         // That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
         let task: tokio::task::JoinHandle<Result<_, Status>> = self
@@ -250,7 +249,7 @@ impl BlobWriter for GRPCBlobWriter {
             match self.tokio_handle.block_on(task)? {
                 Ok(resp) => {
                     // return the digest from the response, and store it in self.digest for subsequent closes.
-                    let digest = B3Digest::from_vec(resp.digest).map_err(|_| {
+                    let digest: B3Digest = resp.digest.try_into().map_err(|_| {
                         crate::Error::StorageError(
                             "invalid root digest length in response".to_string(),
                         )
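
The grpc.rs hunk above swaps B3Digest::from_vec for a try_into conversion
that fails when the response digest has the wrong length. A rough sketch of
what such a fallible conversion can look like (Digest32 is a hypothetical
stand-in; B3Digest's actual definition lives elsewhere in the crate and may
differ):

    use bytes::Bytes;

    /// Hypothetical 32-byte digest newtype standing in for B3Digest.
    pub struct Digest32([u8; 32]);

    impl TryFrom<Bytes> for Digest32 {
        type Error = String;

        fn try_from(value: Bytes) -> Result<Self, Self::Error> {
            // Reject anything that is not exactly 32 bytes; this is the
            // length error mapped to StorageError in the hunk above.
            let arr: [u8; 32] = value
                .as_ref()
                .try_into()
                .map_err(|_| format!("invalid digest length: {}", value.len()))?;
            Ok(Digest32(arr))
        }
    }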
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index fa2826fe3112..893f27364b80 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -108,7 +108,7 @@ impl BlobWriter for MemoryBlobWriter {
             let (buf, hasher) = self.writers.take().unwrap();
 
             // We know self.hasher is doing blake3 hashing, so this won't fail.
-            let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap();
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
 
             // Only insert if the blob doesn't already exist.
             let db = self.db.read()?;
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
index 67897cb94a24..00291ba88717 100644
--- a/tvix/store/src/blobservice/sled.rs
+++ b/tvix/store/src/blobservice/sled.rs
@@ -136,8 +136,7 @@ impl BlobWriter for SledBlobWriter {
         } else {
             let (buf, hasher) = self.writers.take().unwrap();
 
-            // We know self.hasher is doing blake3 hashing, so this won't fail.
-            let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap();
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
 
             // Only insert if the blob doesn't already exist.
             if !self.db.contains_key(digest.to_vec()).map_err(|e| {
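
Both memory.rs and sled.rs now build the digest from the finalized blake3
hash via an infallible From conversion, which is why the unwrap() (and the
comment justifying it) can go away. A sketch of that pattern, again with a
hypothetical Digest32 in place of B3Digest:

    /// Hypothetical 32-byte digest newtype standing in for B3Digest.
    pub struct Digest32([u8; 32]);

    impl From<&[u8; 32]> for Digest32 {
        fn from(value: &[u8; 32]) -> Self {
            Digest32(*value)
        }
    }

    fn hash_blob(data: &[u8]) -> Digest32 {
        let mut hasher = blake3::Hasher::new();
        hasher.update(data);
        // blake3::Hash::as_bytes() returns a &[u8; 32], so the conversion
        // into the digest type cannot fail.
        hasher.finalize().as_bytes().into()
    }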