about summary refs log tree commit diff
diff options
context:
space:
mode:
author Florian Klink <flokli@flokli.de> 2023-07-20T10·37+0300
committer clbot <clbot@tvl.fyi> 2023-07-21T19·01+0000
commit a6580748aabe7fcbea735396ac700661b6c53e87 (patch)
tree fab2df50c860f6ddc6730693223aa42e0416dca0
parent 72e82ffcb11b1aaf1f1cc8db4189ced5ec0aa42e (diff)
feat(tvix/store/digests): use bytes::Bytes instead of Vec<u8> r/6437
This will save us some copies, because a clone will simply create an
additional pointer to the same data.

Change-Id: I017a5d6b4c85a861b5541ebad2858ad4fbf8e8fa
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8978
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Autosubmit: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
-rw-r--r-- tvix/store/src/blobservice/grpc.rs 7
-rw-r--r-- tvix/store/src/blobservice/memory.rs 2
-rw-r--r-- tvix/store/src/blobservice/sled.rs 3
-rw-r--r-- tvix/store/src/digests.rs 43
-rw-r--r-- tvix/store/src/directoryservice/grpc.rs 17
-rw-r--r-- tvix/store/src/directoryservice/traverse.rs 6
-rw-r--r-- tvix/store/src/directoryservice/utils.rs 2
-rw-r--r-- tvix/store/src/fuse/inodes.rs 4
-rw-r--r-- tvix/store/src/nar/renderer.rs 10
-rw-r--r-- tvix/store/src/proto/grpc_blobservice_wrapper.rs 12
-rw-r--r-- tvix/store/src/proto/grpc_directoryservice_wrapper.rs 13
-rw-r--r-- tvix/store/src/proto/mod.rs 5
-rw-r--r-- tvix/store/src/proto/tests/directory.rs 10
-rw-r--r-- tvix/store/src/store_io.rs 23
14 files changed, 94 insertions, 63 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
index 96e2869a4f..a7f0e7c6e8 100644
--- a/tvix/store/src/blobservice/grpc.rs
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -143,7 +143,7 @@ impl BlobService for GRPCBlobService {
             Ok(stream) => {
                 // map the stream of proto::BlobChunk to bytes.
                 let data_stream = stream.map(|x| {
-                    x.map(|x| VecDeque::from(x.data))
+                    x.map(|x| VecDeque::from(x.data.to_vec()))
                         .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidInput, e))
                 });
 
@@ -169,8 +169,7 @@ impl BlobService for GRPCBlobService {
 
         // bytes arriving on the RX side are wrapped inside a
         // [proto::BlobChunk], and a [ReceiverStream] is constructed.
-        let blobchunk_stream =
-            ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.to_vec() });
+        let blobchunk_stream = ReceiverStream::new(rx).map(|x| proto::BlobChunk { data: x.into() });
 
         // That receiver stream is used as a stream in the gRPC BlobService.put rpc call.
         let task: tokio::task::JoinHandle<Result<_, Status>> = self
@@ -250,7 +249,7 @@ impl BlobWriter for GRPCBlobWriter {
             match self.tokio_handle.block_on(task)? {
                 Ok(resp) => {
                     // return the digest from the response, and store it in self.digest for subsequent closes.
-                    let digest = B3Digest::from_vec(resp.digest).map_err(|_| {
+                    let digest: B3Digest = resp.digest.try_into().map_err(|_| {
                         crate::Error::StorageError(
                             "invalid root digest length in response".to_string(),
                         )
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index fa2826fe31..893f27364b 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -108,7 +108,7 @@ impl BlobWriter for MemoryBlobWriter {
             let (buf, hasher) = self.writers.take().unwrap();
 
             // We know self.hasher is doing blake3 hashing, so this won't fail.
-            let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap();
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
 
             // Only insert if the blob doesn't already exist.
             let db = self.db.read()?;
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
index 67897cb94a..00291ba887 100644
--- a/tvix/store/src/blobservice/sled.rs
+++ b/tvix/store/src/blobservice/sled.rs
@@ -136,8 +136,7 @@ impl BlobWriter for SledBlobWriter {
         } else {
             let (buf, hasher) = self.writers.take().unwrap();
 
-            // We know self.hasher is doing blake3 hashing, so this won't fail.
-            let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap();
+            let digest: B3Digest = hasher.finalize().as_bytes().into();
 
             // Only insert if the blob doesn't already exist.
             if !self.db.contains_key(digest.to_vec()).map_err(|e| {
diff --git a/tvix/store/src/digests.rs b/tvix/store/src/digests.rs
index 441a059ee0..4df11b389e 100644
--- a/tvix/store/src/digests.rs
+++ b/tvix/store/src/digests.rs
@@ -1,10 +1,9 @@
+use bytes::Bytes;
 use data_encoding::BASE64;
 use thiserror::Error;
 
-// FUTUREWORK: make generic
-
 #[derive(PartialEq, Eq, Hash, Debug)]
-pub struct B3Digest(Vec<u8>);
+pub struct B3Digest(Bytes);
 
 // TODO: allow converting these errors to crate::Error
 #[derive(Error, Debug)]
@@ -14,25 +13,49 @@ pub enum Error {
 }
 
 impl B3Digest {
+    // returns a copy of the inner [Vec<u8>].
+    pub fn to_vec(&self) -> Vec<u8> {
+        self.0.to_vec()
+    }
+}
+
+impl From<B3Digest> for bytes::Bytes {
+    fn from(val: B3Digest) -> Self {
+        val.0
+    }
+}
+
+impl TryFrom<Vec<u8>> for B3Digest {
+    type Error = Error;
+
     // constructs a [B3Digest] from a [Vec<u8>].
     // Returns an error if the digest has the wrong length.
-    pub fn from_vec(value: Vec<u8>) -> Result<Self, Error> {
+    fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
         if value.len() != 32 {
             Err(Error::InvalidDigestLen(value.len()))
         } else {
-            Ok(Self(value))
+            Ok(Self(value.into()))
         }
     }
+}
 
-    // returns a copy of the inner [Vec<u8>].
-    pub fn to_vec(&self) -> Vec<u8> {
-        self.0.to_vec()
+impl TryFrom<bytes::Bytes> for B3Digest {
+    type Error = Error;
+
+    // constructs a [B3Digest] from a [bytes::Bytes].
+    // Returns an error if the digest has the wrong length.
+    fn try_from(value: bytes::Bytes) -> Result<Self, Self::Error> {
+        if value.len() != 32 {
+            Err(Error::InvalidDigestLen(value.len()))
+        } else {
+            Ok(Self(value))
+        }
     }
 }
 
 impl From<&[u8; 32]> for B3Digest {
     fn from(value: &[u8; 32]) -> Self {
-        Self(value.to_vec())
+        Self(value.to_vec().into())
     }
 }
 
@@ -44,6 +67,6 @@ impl Clone for B3Digest {
 
 impl std::fmt::Display for B3Digest {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "b3:{}", BASE64.encode(self.0.as_slice()))
+        write!(f, "b3:{}", BASE64.encode(&self.0))
     }
 }
diff --git a/tvix/store/src/directoryservice/grpc.rs b/tvix/store/src/directoryservice/grpc.rs
index b9a5036a91..e6f34b2bd8 100644
--- a/tvix/store/src/directoryservice/grpc.rs
+++ b/tvix/store/src/directoryservice/grpc.rs
@@ -142,12 +142,13 @@ impl DirectoryService for GRPCDirectoryService {
             .spawn(async move { grpc_client.put(tokio_stream::iter(vec![directory])).await });
 
         match self.tokio_handle.block_on(task)? {
-            Ok(put_directory_resp) => Ok(B3Digest::from_vec(
-                put_directory_resp.into_inner().root_digest,
-            )
-            .map_err(|_| {
-                Error::StorageError("invalid root digest length in response".to_string())
-            })?),
+            Ok(put_directory_resp) => Ok(put_directory_resp
+                .into_inner()
+                .root_digest
+                .try_into()
+                .map_err(|_| {
+                    Error::StorageError("invalid root digest length in response".to_string())
+                })?),
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
         }
     }
@@ -265,7 +266,7 @@ impl Iterator for StreamIterator {
                     for child_directory in &directory.directories {
                         // We ran validate() above, so we know these digests must be correct.
                         let child_directory_digest =
-                            B3Digest::from_vec(child_directory.digest.clone()).unwrap();
+                            child_directory.digest.clone().try_into().unwrap();
 
                         self.expected_directory_digests
                             .insert(child_directory_digest);
@@ -355,7 +356,7 @@ impl DirectoryPutter for GRPCPutter {
                     .map_err(|e| Error::StorageError(e.to_string()))?
                     .root_digest;
 
-                B3Digest::from_vec(root_digest).map_err(|_| {
+                root_digest.try_into().map_err(|_| {
                     Error::StorageError("invalid root digest length in response".to_string())
                 })
             }
diff --git a/tvix/store/src/directoryservice/traverse.rs b/tvix/store/src/directoryservice/traverse.rs
index 17f709f40d..a6e61a813b 100644
--- a/tvix/store/src/directoryservice/traverse.rs
+++ b/tvix/store/src/directoryservice/traverse.rs
@@ -1,5 +1,5 @@
 use super::DirectoryService;
-use crate::{proto::NamedNode, B3Digest, Error};
+use crate::{proto::NamedNode, Error};
 use std::{os::unix::ffi::OsStrExt, sync::Arc};
 use tracing::{instrument, warn};
 
@@ -40,7 +40,9 @@ pub fn traverse_to(
                     Ok(None)
                 }
                 crate::proto::node::Node::Directory(directory_node) => {
-                    let digest = B3Digest::from_vec(directory_node.digest)
+                    let digest = directory_node
+                        .digest
+                        .try_into()
                         .map_err(|_e| Error::StorageError("invalid digest length".to_string()))?;
 
                     // fetch the linked node from the directory_service
diff --git a/tvix/store/src/directoryservice/utils.rs b/tvix/store/src/directoryservice/utils.rs
index d152fb78a9..95f02f1f9c 100644
--- a/tvix/store/src/directoryservice/utils.rs
+++ b/tvix/store/src/directoryservice/utils.rs
@@ -35,7 +35,7 @@ impl<DS: DirectoryService> DirectoryTraverser<DS> {
     fn enqueue_child_directories(&mut self, directory: &proto::Directory) {
         for child_directory_node in &directory.directories {
             // TODO: propagate error
-            let child_digest = B3Digest::from_vec(child_directory_node.digest.clone()).unwrap();
+            let child_digest: B3Digest = child_directory_node.digest.clone().try_into().unwrap();
 
             if self.worklist_directory_digests.contains(&child_digest)
                 || self.sent_directory_digests.contains(&child_digest)
diff --git a/tvix/store/src/fuse/inodes.rs b/tvix/store/src/fuse/inodes.rs
index bf883cb22f..a52ba7989e 100644
--- a/tvix/store/src/fuse/inodes.rs
+++ b/tvix/store/src/fuse/inodes.rs
@@ -38,7 +38,7 @@ impl From<&proto::SymlinkNode> for InodeData {
 impl From<&proto::FileNode> for InodeData {
     fn from(value: &proto::FileNode) -> Self {
         InodeData::Regular(
-            B3Digest::from_vec(value.digest.clone()).unwrap(),
+            value.digest.clone().try_into().unwrap(),
             value.size,
             value.executable,
         )
@@ -49,7 +49,7 @@ impl From<&proto::FileNode> for InodeData {
 impl From<&proto::DirectoryNode> for InodeData {
     fn from(value: &proto::DirectoryNode) -> Self {
         InodeData::Directory(DirectoryInodeData::Sparse(
-            B3Digest::from_vec(value.digest.clone()).unwrap(),
+            value.digest.clone().try_into().unwrap(),
             value.size,
         ))
     }
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index 518854c09c..e2119ae079 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -3,7 +3,6 @@ use crate::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
     proto::{self, NamedNode},
-    B3Digest,
 };
 use count_write::CountWrite;
 use nix_compat::nar;
@@ -65,7 +64,7 @@ fn walk_node(
                 .map_err(RenderError::NARWriterError)?;
         }
         proto::node::Node::File(proto_file_node) => {
-            let digest = B3Digest::from_vec(proto_file_node.digest.clone()).map_err(|_e| {
+            let digest = proto_file_node.digest.clone().try_into().map_err(|_e| {
                 warn!(
                     file_node = ?proto_file_node,
                     "invalid digest length in file node",
@@ -96,8 +95,11 @@ fn walk_node(
                 .map_err(RenderError::NARWriterError)?;
         }
         proto::node::Node::Directory(proto_directory_node) => {
-            let digest =
-                B3Digest::from_vec(proto_directory_node.digest.to_vec()).map_err(|_e| {
+            let digest = proto_directory_node
+                .digest
+                .clone()
+                .try_into()
+                .map_err(|_e| {
                     RenderError::StoreError(crate::Error::StorageError(
                         "invalid digest len in directory node".to_string(),
                     ))
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index fee97c7d2d..e60ff2ef1d 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,6 +1,4 @@
-use crate::{
-    blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead, B3Digest,
-};
+use crate::{blobservice::BlobService, proto::sync_read_into_async_read::SyncReadIntoAsyncRead};
 use std::{
     collections::VecDeque,
     io,
@@ -96,7 +94,9 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
         request: Request<super::StatBlobRequest>,
     ) -> Result<Response<super::BlobMeta>, Status> {
         let rq = request.into_inner();
-        let req_digest = B3Digest::from_vec(rq.digest)
+        let req_digest = rq
+            .digest
+            .try_into()
             .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
 
         if rq.include_chunks || rq.include_bao {
@@ -117,7 +117,9 @@ impl super::blob_service_server::BlobService for GRPCBlobServiceWrapper {
     ) -> Result<Response<Self::ReadStream>, Status> {
         let rq = request.into_inner();
 
-        let req_digest = B3Digest::from_vec(rq.digest)
+        let req_digest = rq
+            .digest
+            .try_into()
             .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
 
         match self.blob_service.open_read(&req_digest) {
diff --git a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
index ec9e3cb123..22fcd2fa6a 100644
--- a/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_directoryservice_wrapper.rs
@@ -38,8 +38,10 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
             // look at the digest in the request and put it in the top of the queue.
             match &req_inner.by_what {
                 None => return Err(Status::invalid_argument("by_what needs to be specified")),
-                Some(proto::get_directory_request::ByWhat::Digest(digest)) => {
-                    let digest = B3Digest::from_vec(digest.to_vec())
+                Some(proto::get_directory_request::ByWhat::Digest(ref digest)) => {
+                    let digest: B3Digest = digest
+                        .clone()
+                        .try_into()
                         .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
 
                     task::spawn(async move {
@@ -91,6 +93,8 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
         // This keeps track of the seen directory keys, and their size.
         // This is used to validate the size field of a reference to a previously sent directory.
         // We don't need to keep the contents around, they're stored in the DB.
+        // https://github.com/rust-lang/rust-clippy/issues/5812
+        #[allow(clippy::mutable_key_type)]
         let mut seen_directories_sizes: HashMap<B3Digest, u32> = HashMap::new();
         let mut last_directory_dgst: Option<B3Digest> = None;
 
@@ -110,7 +114,10 @@ impl proto::directory_service_server::DirectoryService for GRPCDirectoryServiceW
             // to ensure it has been seen already in this stream, and that the size
             // matches what we recorded.
             for child_directory in &directory.directories {
-                let child_directory_digest = B3Digest::from_vec(child_directory.digest.to_vec())
+                let child_directory_digest: B3Digest = child_directory
+                    .digest
+                    .clone()
+                    .try_into()
                     .map_err(|_e| Status::internal("invalid child directory digest len"))?;
 
                 match seen_directories_sizes.get(&child_directory_digest) {
diff --git a/tvix/store/src/proto/mod.rs b/tvix/store/src/proto/mod.rs
index 7e81efc517..126a8b0edc 100644
--- a/tvix/store/src/proto/mod.rs
+++ b/tvix/store/src/proto/mod.rs
@@ -247,12 +247,11 @@ impl Directory {
     pub fn digest(&self) -> B3Digest {
         let mut hasher = blake3::Hasher::new();
 
-        let vec = hasher
+        hasher
             .update(&self.encode_to_vec())
             .finalize()
             .as_bytes()
-            .to_vec();
-        B3Digest::from_vec(vec).unwrap()
+            .into()
     }
 
     /// validate checks the directory for invalid data, such as:
diff --git a/tvix/store/src/proto/tests/directory.rs b/tvix/store/src/proto/tests/directory.rs
index 48eeaa7b5f..22b10ca746 100644
--- a/tvix/store/src/proto/tests/directory.rs
+++ b/tvix/store/src/proto/tests/directory.rs
@@ -1,7 +1,4 @@
-use crate::{
-    proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError},
-    B3Digest,
-};
+use crate::proto::{Directory, DirectoryNode, FileNode, SymlinkNode, ValidateDirectoryError};
 use lazy_static::lazy_static;
 
 lazy_static! {
@@ -69,11 +66,12 @@ fn digest() {
 
     assert_eq!(
         d.digest(),
-        B3Digest::from_vec(vec![
+        vec![
             0xaf, 0x13, 0x49, 0xb9, 0xf5, 0xf9, 0xa1, 0xa6, 0xa0, 0x40, 0x4d, 0xea, 0x36, 0xdc,
             0xc9, 0x49, 0x9b, 0xcb, 0x25, 0xc9, 0xad, 0xc1, 0x12, 0xb7, 0xcc, 0x9a, 0x93, 0xca,
             0xe4, 0x1f, 0x32, 0x62
-        ])
+        ]
+        .try_into()
         .unwrap()
     )
 }
diff --git a/tvix/store/src/store_io.rs b/tvix/store/src/store_io.rs
index 701b52f667..19a809b6a1 100644
--- a/tvix/store/src/store_io.rs
+++ b/tvix/store/src/store_io.rs
@@ -216,8 +216,8 @@ impl EvalIO for TvixStoreIO {
                         ))
                     }
                     crate::proto::node::Node::File(file_node) => {
-                        let digest =
-                            B3Digest::from_vec(file_node.digest.clone()).map_err(|_e| {
+                        let digest: B3Digest =
+                            file_node.digest.clone().try_into().map_err(|_e| {
                                 error!(
                                     file_node = ?file_node,
                                     "invalid digest"
@@ -272,16 +272,15 @@ impl EvalIO for TvixStoreIO {
                 match node {
                     crate::proto::node::Node::Directory(directory_node) => {
                         // fetch the Directory itself.
-                        let digest =
-                            B3Digest::from_vec(directory_node.digest.clone()).map_err(|_e| {
-                                io::Error::new(
-                                    io::ErrorKind::InvalidData,
-                                    format!(
-                                        "invalid digest length in directory node: {:?}",
-                                        directory_node
-                                    ),
-                                )
-                            })?;
+                        let digest = directory_node.digest.clone().try_into().map_err(|_e| {
+                            io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                format!(
+                                    "invalid digest length in directory node: {:?}",
+                                    directory_node
+                                ),
+                            )
+                        })?;
 
                         if let Some(directory) = self.directory_service.get(&digest)? {
                             let mut children: Vec<(Vec<u8>, FileType)> = Vec::new();