about summary refs log tree commit diff
diff options
context:
space:
mode:
author: Florian Klink <flokli@flokli.de> 2023-05-22T11·42+0300
committer: clbot <clbot@tvl.fyi> 2023-05-23T10·48+0000
commit: 066179651c999e9680edae11817ca4ab42acc1ea (patch)
tree: dcc9badcaf27e0a0bc43842e8b3520e39080f351
parent: b8ff08b1b0d2dbd8dd546dc9cbdea2f11304d5c8 (diff)
refactor(tvix/store/blobsvc): move from Vec<u8> to B3Digest r/6178
Change-Id: I809bab75221f81b6023cfe75c2fe9e589c1e9192
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8605
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
-rw-r--r-- tvix/store/src/blobservice/grpc.rs 25
-rw-r--r-- tvix/store/src/blobservice/memory.rs 24
-rw-r--r-- tvix/store/src/blobservice/mod.rs 8
-rw-r--r-- tvix/store/src/blobservice/sled.rs 28
-rw-r--r-- tvix/store/src/nar/renderer.rs 33
-rw-r--r-- tvix/store/src/proto/grpc_blobservice_wrapper.rs 24
-rw-r--r-- tvix/store/src/tests/fixtures.rs 18
-rw-r--r-- tvix/store/src/tests/import.rs 8
-rw-r--r-- tvix/store/src/tests/nar_renderer.rs 6
9 files changed, 79 insertions, 95 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
index a50c94faf5..9c07345839 100644
--- a/tvix/store/src/blobservice/grpc.rs
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -1,4 +1,5 @@
-use data_encoding::BASE64;
+use super::{BlobService, BlobWriter};
+use crate::{proto, B3Digest};
 use futures::sink::{SinkExt, SinkMapErr};
 use std::{collections::VecDeque, io};
 use tokio::task::JoinHandle;
@@ -10,9 +11,6 @@ use tokio_util::{
 use tonic::{transport::Channel, Code, Status, Streaming};
 use tracing::instrument;
 
-use super::{BlobService, BlobWriter};
-use crate::proto;
-
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
 pub struct GRPCBlobService {
@@ -30,11 +28,11 @@ impl BlobService for GRPCBlobService {
     type BlobReader = Box<dyn io::Read + Send>;
     type BlobWriter = GRPCBlobWriter;
 
-    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, crate::Error> {
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.to_owned();
+        let digest = digest.clone();
 
         let task: tokio::task::JoinHandle<Result<_, Status>> =
             self.tokio_handle.spawn(async move {
@@ -56,10 +54,10 @@ impl BlobService for GRPCBlobService {
 
     // On success, this returns a Ok(Some(io::Read)), which can be used to read
     // the contents of the Blob, identified by the digest.
-    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, crate::Error> {
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.to_owned();
+        let digest = digest.clone();
 
         // Construct the task that'll send out the request and return the stream
         // the gRPC client should use to send [proto::BlobChunk], or an error if
@@ -68,7 +66,7 @@ impl BlobService for GRPCBlobService {
             self.tokio_handle.spawn(async move {
                 let stream = grpc_client
                     .read(proto::ReadBlobRequest {
-                        digest: digest.into(),
+                        digest: digest.to_vec(),
                     })
                     .await?
                     .into_inner();
@@ -164,7 +162,7 @@ pub struct GRPCBlobWriter {
 }
 
 impl BlobWriter for GRPCBlobWriter {
-    fn close(mut self) -> Result<[u8; 32], crate::Error> {
+    fn close(mut self) -> Result<B3Digest, crate::Error> {
         // invoke shutdown, so the inner writer closes its internal tx side of
         // the channel.
         self.inner_writer
@@ -177,8 +175,9 @@ impl BlobWriter for GRPCBlobWriter {
         match self.tokio_handle.block_on(self.task)? {
             Ok(resp) => {
                 // return the digest from the response.
-                let digest: Vec<u8> = resp.digest;
-                Ok(digest.try_into().unwrap())
+                B3Digest::from_vec(resp.digest).map_err(|_| {
+                    crate::Error::StorageError("invalid root digest length in response".to_string())
+                })
             }
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
         }
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index 9a796ca2c0..1ee59d1087 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -1,4 +1,3 @@
-use data_encoding::BASE64;
 use std::io::Cursor;
 use std::{
     collections::HashMap,
@@ -7,27 +6,24 @@ use std::{
 use tracing::{instrument, warn};
 
 use super::{BlobService, BlobWriter};
-use crate::Error;
-
-// type B3Digest = [u8; 32];
-// struct B3Digest ([u8; 32]);
+use crate::{B3Digest, Error};
 
 #[derive(Clone, Default)]
 pub struct MemoryBlobService {
-    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
 }
 
 impl BlobService for MemoryBlobService {
     type BlobReader = Cursor<Vec<u8>>;
     type BlobWriter = MemoryBlobWriter;
 
-    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
         let db = self.db.read().unwrap();
         Ok(db.contains_key(digest))
     }
 
-    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> {
         let db = self.db.read().unwrap();
 
         Ok(db.get(digest).map(|x| Cursor::new(x.clone())))
@@ -40,13 +36,13 @@ impl BlobService for MemoryBlobService {
 }
 
 pub struct MemoryBlobWriter {
-    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
 
     buf: Vec<u8>,
 }
 
 impl MemoryBlobWriter {
-    fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self {
+    fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self {
         Self {
             buf: Vec::new(),
             db,
@@ -64,16 +60,16 @@ impl std::io::Write for MemoryBlobWriter {
 }
 
 impl BlobWriter for MemoryBlobWriter {
-    fn close(self) -> Result<[u8; 32], Error> {
+    fn close(self) -> Result<B3Digest, Error> {
         // in this memory implementation, we don't actually bother hashing
         // incrementally while writing, but do it at the end.
         let mut hasher = blake3::Hasher::new();
         hasher.update(&self.buf);
-        let digest: [u8; 32] = hasher.finalize().into();
+        let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap();
 
         // open the database for writing.
         let mut db = self.db.write()?;
-        db.insert(digest, self.buf);
+        db.insert(digest.clone(), self.buf);
 
         Ok(digest)
     }
diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs
index 0e7433b0e7..c5a2de1246 100644
--- a/tvix/store/src/blobservice/mod.rs
+++ b/tvix/store/src/blobservice/mod.rs
@@ -1,6 +1,6 @@
 use std::io;
 
-use crate::Error;
+use crate::{B3Digest, Error};
 
 mod grpc;
 mod memory;
@@ -19,10 +19,10 @@ pub trait BlobService {
     type BlobWriter: BlobWriter + Send;
 
     /// Check if the service has the blob, by its content hash.
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error>;
 
     /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>.
-    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error>;
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error>;
 
     /// Insert a new blob into the store. Returns a [BlobWriter], which
     /// implements [io::Write] and a [BlobWriter::close].
@@ -37,5 +37,5 @@ pub trait BlobWriter: io::Write {
     /// contents written.
     ///
     /// This consumes self, so it's not possible to close twice.
-    fn close(self) -> Result<[u8; 32], Error>;
+    fn close(self) -> Result<B3Digest, Error>;
 }
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
index 0f9d8671a8..2b09033534 100644
--- a/tvix/store/src/blobservice/sled.rs
+++ b/tvix/store/src/blobservice/sled.rs
@@ -1,11 +1,9 @@
+use super::{BlobService, BlobWriter};
+use crate::{B3Digest, Error};
 use std::{
     io::{self, Cursor},
     path::PathBuf,
 };
-
-use super::{BlobService, BlobWriter};
-use crate::Error;
-use data_encoding::BASE64;
 use tracing::instrument;
 
 #[derive(Clone)]
@@ -33,24 +31,24 @@ impl BlobService for SledBlobService {
     type BlobReader = Cursor<Vec<u8>>;
     type BlobWriter = SledBlobWriter;
 
-    #[instrument(name = "SledBlobService::has", skip(self), fields(blob.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        match self.db.contains_key(digest) {
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
+        match self.db.contains_key(digest.to_vec()) {
             Ok(has) => Ok(has),
             Err(e) => Err(Error::StorageError(e.to_string())),
         }
     }
 
-    #[instrument(name = "SledBlobService::open_read", skip(self), fields(blob.digest=BASE64.encode(digest)))]
-    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
-        match self.db.get(digest) {
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> {
+        match self.db.get(digest.to_vec()) {
             Ok(None) => Ok(None),
             Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))),
             Err(e) => Err(Error::StorageError(e.to_string())),
         }
     }
 
-    #[instrument(name = "SledBlobService::open_write", skip(self))]
+    #[instrument(skip(self))]
     fn open_write(&self) -> Result<Self::BlobWriter, Error> {
         Ok(SledBlobWriter::new(self.db.clone()))
     }
@@ -84,15 +82,13 @@ impl io::Write for SledBlobWriter {
 }
 
 impl BlobWriter for SledBlobWriter {
-    fn close(self) -> Result<[u8; 32], Error> {
+    fn close(self) -> Result<B3Digest, Error> {
         let digest = self.hasher.finalize();
         self.db
             .insert(digest.as_bytes(), self.buf)
             .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?;
 
-        digest
-            .to_owned()
-            .try_into()
-            .map_err(|_| Error::StorageError("invalid digest length in response".to_string()))
+        // We know self.hasher is doing blake3 hashing, so this won't fail.
+        Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap())
     }
 }
diff --git a/tvix/store/src/nar/renderer.rs b/tvix/store/src/nar/renderer.rs
index 9b71d24ac2..1804674fc4 100644
--- a/tvix/store/src/nar/renderer.rs
+++ b/tvix/store/src/nar/renderer.rs
@@ -1,15 +1,13 @@
-use std::io::{self, BufReader};
-
+use super::RenderError;
 use crate::{
     blobservice::BlobService,
     directoryservice::DirectoryService,
     proto::{self, NamedNode},
     B3Digest,
 };
-use data_encoding::BASE64;
 use nix_compat::nar;
-
-use super::RenderError;
+use std::io::{self, BufReader};
+use tracing::warn;
 
 /// A NAR renderer, using a blob_service, chunk_service and directory_service
 /// to render a NAR to a writer.
@@ -58,19 +56,26 @@ impl<BS: BlobService, DS: DirectoryService> NARRenderer<BS, DS> {
                     .map_err(RenderError::NARWriterError)?;
             }
             proto::node::Node::File(proto_file_node) => {
-                let digest: [u8; 32] =
-                    proto_file_node.digest.to_owned().try_into().map_err(|_e| {
-                        RenderError::StoreError(crate::Error::StorageError(
-                            "invalid digest len in file node".to_string(),
-                        ))
-                    })?;
+                let digest = B3Digest::from_vec(proto_file_node.digest.clone()).map_err(|_e| {
+                    warn!(
+                        file_node = ?proto_file_node,
+                        "invalid digest length in file node",
+                    );
+
+                    RenderError::StoreError(crate::Error::StorageError(
+                        "invalid digest len in file node".to_string(),
+                    ))
+                })?;
 
-                // TODO: handle error
-                let mut blob_reader = match self.blob_service.open_read(&digest).unwrap() {
+                let mut blob_reader = match self
+                    .blob_service
+                    .open_read(&digest)
+                    .map_err(|e| RenderError::StoreError(e))?
+                {
                     Some(blob_reader) => Ok(BufReader::new(blob_reader)),
                     None => Err(RenderError::NARWriterError(io::Error::new(
                         io::ErrorKind::NotFound,
-                        format!("blob with digest {} not found", BASE64.encode(&digest)),
+                        format!("blob with digest {} not found", &digest),
                     ))),
                 }?;
 
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index d1e98a5b79..3ec1d68872 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -1,8 +1,8 @@
 use crate::{
     blobservice::{BlobService, BlobWriter},
     proto::sync_read_into_async_read::SyncReadIntoAsyncRead,
+    B3Digest,
 };
-use data_encoding::BASE64;
 use std::{collections::VecDeque, io, pin::Pin};
 use tokio::task;
 use tokio_stream::StreamExt;
@@ -36,10 +36,7 @@ impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server
         request: Request<super::StatBlobRequest>,
     ) -> Result<Response<super::BlobMeta>, Status> {
         let rq = request.into_inner();
-        let req_digest: [u8; 32] = rq
-            .digest
-            .clone()
-            .try_into()
+        let req_digest = B3Digest::from_vec(rq.digest)
             .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
 
         if rq.include_chunks || rq.include_bao {
@@ -48,10 +45,7 @@ impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server
 
         match self.blob_service.has(&req_digest) {
             Ok(true) => Ok(Response::new(super::BlobMeta::default())),
-            Ok(false) => Err(Status::not_found(format!(
-                "blob {} not found",
-                BASE64.encode(&req_digest)
-            ))),
+            Ok(false) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
             Err(e) => Err(e.into()),
         }
     }
@@ -63,11 +57,8 @@ impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server
     ) -> Result<Response<Self::ReadStream>, Status> {
         let rq = request.into_inner();
 
-        let req_digest: [u8; 32] = rq
-            .digest
-            .clone()
-            .try_into()
-            .map_err(|_| Status::invalid_argument("invalid digest length"))?;
+        let req_digest = B3Digest::from_vec(rq.digest)
+            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
 
         match self.blob_service.open_read(&req_digest) {
             Ok(Some(reader)) => {
@@ -87,10 +78,7 @@ impl<BS: BlobService + Send + Sync + Clone + 'static> super::blob_service_server
                 let chunks_stream = ReaderStream::new(async_reader).map(stream_mapper);
                 Ok(Response::new(Box::pin(chunks_stream)))
             }
-            Ok(None) => Err(Status::not_found(format!(
-                "blob {} not found",
-                BASE64.encode(&rq.digest)
-            ))),
+            Ok(None) => Err(Status::not_found(format!("blob {} not found", &req_digest))),
             Err(e) => Err(e.into()),
         }
     }
diff --git a/tvix/store/src/tests/fixtures.rs b/tvix/store/src/tests/fixtures.rs
index 6d38dd4572..934d9e4c53 100644
--- a/tvix/store/src/tests/fixtures.rs
+++ b/tvix/store/src/tests/fixtures.rs
@@ -1,4 +1,7 @@
-use crate::proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode};
+use crate::{
+    proto::{self, Directory, DirectoryNode, FileNode, SymlinkNode},
+    B3Digest,
+};
 use lazy_static::lazy_static;
 
 pub const HELLOWORLD_BLOB_CONTENTS: &[u8] = b"Hello World!";
@@ -12,18 +15,19 @@ lazy_static! {
     ];
     pub static ref DUMMY_DATA_1: Vec<u8> = vec![0x01, 0x02, 0x03];
     pub static ref DUMMY_DATA_2: Vec<u8> = vec![0x04, 0x05];
-    pub static ref HELLOWORLD_BLOB_DIGEST: Vec<u8> =
-        blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().to_vec();
-    pub static ref EMPTY_BLOB_DIGEST: Vec<u8> =
-        blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().to_vec();
+
+    pub static ref HELLOWORLD_BLOB_DIGEST: B3Digest =
+        blake3::hash(HELLOWORLD_BLOB_CONTENTS).as_bytes().into();
+    pub static ref EMPTY_BLOB_DIGEST: B3Digest =
+        blake3::hash(EMPTY_BLOB_CONTENTS).as_bytes().into();
 
     // 2 bytes
     pub static ref BLOB_A: Vec<u8> = vec![0x00, 0x01];
-    pub static ref BLOB_A_DIGEST: Vec<u8> = blake3::hash(&BLOB_A).as_bytes().to_vec();
+    pub static ref BLOB_A_DIGEST: B3Digest = blake3::hash(&BLOB_A).as_bytes().into();
 
     // 1MB
     pub static ref BLOB_B: Vec<u8> = (0..255).collect::<Vec<u8>>().repeat(4 * 1024);
-    pub static ref BLOB_B_DIGEST: Vec<u8> = blake3::hash(&BLOB_B).as_bytes().to_vec();
+    pub static ref BLOB_B_DIGEST: B3Digest = blake3::hash(&BLOB_B).as_bytes().into();
 
     // Directories
     pub static ref DIRECTORY_WITH_KEEP: proto::Directory = proto::Directory {
diff --git a/tvix/store/src/tests/import.rs b/tvix/store/src/tests/import.rs
index ef4ef74d09..8b66cb024b 100644
--- a/tvix/store/src/tests/import.rs
+++ b/tvix/store/src/tests/import.rs
@@ -61,9 +61,7 @@ fn single_file() {
     );
 
     // ensure the blob has been uploaded
-    assert!(blob_service
-        .has(&HELLOWORLD_BLOB_DIGEST.to_vec().try_into().unwrap())
-        .unwrap());
+    assert!(blob_service.has(&HELLOWORLD_BLOB_DIGEST).unwrap());
 }
 
 #[test]
@@ -111,7 +109,5 @@ fn complicated() {
         .is_some());
 
     // ensure EMPTY_BLOB_CONTENTS has been uploaded
-    assert!(blob_service
-        .has(&EMPTY_BLOB_DIGEST.to_vec().try_into().unwrap())
-        .unwrap());
+    assert!(blob_service.has(&EMPTY_BLOB_DIGEST).unwrap());
 }
diff --git a/tvix/store/src/tests/nar_renderer.rs b/tvix/store/src/tests/nar_renderer.rs
index b5dbf59596..f13107b1e4 100644
--- a/tvix/store/src/tests/nar_renderer.rs
+++ b/tvix/store/src/tests/nar_renderer.rs
@@ -69,7 +69,7 @@ fn single_file_wrong_blob_size() {
         &mut writer,
     )
     .unwrap();
-    assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap());
+    assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap());
 
     let renderer = NARRenderer::new(blob_service, gen_directory_service());
 
@@ -131,7 +131,7 @@ fn single_file() {
         &mut writer,
     )
     .unwrap();
-    assert_eq!(HELLOWORLD_BLOB_DIGEST.to_vec(), writer.close().unwrap());
+    assert_eq!(HELLOWORLD_BLOB_DIGEST.clone(), writer.close().unwrap());
 
     let renderer = NARRenderer::new(blob_service, gen_directory_service());
     let mut buf: Vec<u8> = vec![];
@@ -164,7 +164,7 @@ fn test_complicated() {
         &mut writer,
     )
     .unwrap();
-    assert_eq!(EMPTY_BLOB_DIGEST.to_vec(), writer.close().unwrap());
+    assert_eq!(EMPTY_BLOB_DIGEST.clone(), writer.close().unwrap());
 
     directory_service.put(DIRECTORY_WITH_KEEP.clone()).unwrap();
     directory_service