about summary refs log tree commit diff
path: root/tvix/store/src/blobservice
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2023-05-22T11·42+0300
committerclbot <clbot@tvl.fyi>2023-05-23T10·48+0000
commit066179651c999e9680edae11817ca4ab42acc1ea (patch)
treedcc9badcaf27e0a0bc43842e8b3520e39080f351 /tvix/store/src/blobservice
parentb8ff08b1b0d2dbd8dd546dc9cbdea2f11304d5c8 (diff)
refactor(tvix/store/blobsvc): move from Vec<u8> to B3Digest r/6178
Change-Id: I809bab75221f81b6023cfe75c2fe9e589c1e9192
Reviewed-on: https://cl.tvl.fyi/c/depot/+/8605
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: tazjin <tazjin@tvl.su>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/store/src/blobservice')
-rw-r--r--tvix/store/src/blobservice/grpc.rs25
-rw-r--r--tvix/store/src/blobservice/memory.rs24
-rw-r--r--tvix/store/src/blobservice/mod.rs8
-rw-r--r--tvix/store/src/blobservice/sled.rs28
4 files changed, 38 insertions, 47 deletions
diff --git a/tvix/store/src/blobservice/grpc.rs b/tvix/store/src/blobservice/grpc.rs
index a50c94faf5e9..9c07345839e3 100644
--- a/tvix/store/src/blobservice/grpc.rs
+++ b/tvix/store/src/blobservice/grpc.rs
@@ -1,4 +1,5 @@
-use data_encoding::BASE64;
+use super::{BlobService, BlobWriter};
+use crate::{proto, B3Digest};
 use futures::sink::{SinkExt, SinkMapErr};
 use std::{collections::VecDeque, io};
 use tokio::task::JoinHandle;
@@ -10,9 +11,6 @@ use tokio_util::{
 use tonic::{transport::Channel, Code, Status, Streaming};
 use tracing::instrument;
 
-use super::{BlobService, BlobWriter};
-use crate::proto;
-
 /// Connects to a (remote) tvix-store BlobService over gRPC.
 #[derive(Clone)]
 pub struct GRPCBlobService {
@@ -30,11 +28,11 @@ impl BlobService for GRPCBlobService {
     type BlobReader = Box<dyn io::Read + Send>;
     type BlobWriter = GRPCBlobWriter;
 
-    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, crate::Error> {
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.to_owned();
+        let digest = digest.clone();
 
         let task: tokio::task::JoinHandle<Result<_, Status>> =
             self.tokio_handle.spawn(async move {
@@ -56,10 +54,10 @@ impl BlobService for GRPCBlobService {
 
     // On success, this returns a Ok(Some(io::Read)), which can be used to read
     // the contents of the Blob, identified by the digest.
-    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, crate::Error> {
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, crate::Error> {
         // Get a new handle to the gRPC client, and copy the digest.
         let mut grpc_client = self.grpc_client.clone();
-        let digest = digest.to_owned();
+        let digest = digest.clone();
 
         // Construct the task that'll send out the request and return the stream
         // the gRPC client should use to send [proto::BlobChunk], or an error if
@@ -68,7 +66,7 @@ impl BlobService for GRPCBlobService {
             self.tokio_handle.spawn(async move {
                 let stream = grpc_client
                     .read(proto::ReadBlobRequest {
-                        digest: digest.into(),
+                        digest: digest.to_vec(),
                     })
                     .await?
                     .into_inner();
@@ -164,7 +162,7 @@ pub struct GRPCBlobWriter {
 }
 
 impl BlobWriter for GRPCBlobWriter {
-    fn close(mut self) -> Result<[u8; 32], crate::Error> {
+    fn close(mut self) -> Result<B3Digest, crate::Error> {
         // invoke shutdown, so the inner writer closes its internal tx side of
         // the channel.
         self.inner_writer
@@ -177,8 +175,9 @@ impl BlobWriter for GRPCBlobWriter {
         match self.tokio_handle.block_on(self.task)? {
             Ok(resp) => {
                 // return the digest from the response.
-                let digest: Vec<u8> = resp.digest;
-                Ok(digest.try_into().unwrap())
+                B3Digest::from_vec(resp.digest).map_err(|_| {
+                    crate::Error::StorageError("invalid root digest length in response".to_string())
+                })
             }
             Err(e) => Err(crate::Error::StorageError(e.to_string())),
         }
diff --git a/tvix/store/src/blobservice/memory.rs b/tvix/store/src/blobservice/memory.rs
index 9a796ca2c0c8..1ee59d108743 100644
--- a/tvix/store/src/blobservice/memory.rs
+++ b/tvix/store/src/blobservice/memory.rs
@@ -1,4 +1,3 @@
-use data_encoding::BASE64;
 use std::io::Cursor;
 use std::{
     collections::HashMap,
@@ -7,27 +6,24 @@ use std::{
 use tracing::{instrument, warn};
 
 use super::{BlobService, BlobWriter};
-use crate::Error;
-
-// type B3Digest = [u8; 32];
-// struct B3Digest ([u8; 32]);
+use crate::{B3Digest, Error};
 
 #[derive(Clone, Default)]
 pub struct MemoryBlobService {
-    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
 }
 
 impl BlobService for MemoryBlobService {
     type BlobReader = Cursor<Vec<u8>>;
     type BlobWriter = MemoryBlobWriter;
 
-    #[instrument(skip(self, digest), fields(blob.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
+    #[instrument(skip(self, digest), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
         let db = self.db.read().unwrap();
         Ok(db.contains_key(digest))
     }
 
-    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> {
         let db = self.db.read().unwrap();
 
         Ok(db.get(digest).map(|x| Cursor::new(x.clone())))
@@ -40,13 +36,13 @@ impl BlobService for MemoryBlobService {
 }
 
 pub struct MemoryBlobWriter {
-    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
+    db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>,
 
     buf: Vec<u8>,
 }
 
 impl MemoryBlobWriter {
-    fn new(db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>) -> Self {
+    fn new(db: Arc<RwLock<HashMap<B3Digest, Vec<u8>>>>) -> Self {
         Self {
             buf: Vec::new(),
             db,
@@ -64,16 +60,16 @@ impl std::io::Write for MemoryBlobWriter {
 }
 
 impl BlobWriter for MemoryBlobWriter {
-    fn close(self) -> Result<[u8; 32], Error> {
+    fn close(self) -> Result<B3Digest, Error> {
         // in this memory implementation, we don't actually bother hashing
         // incrementally while writing, but do it at the end.
         let mut hasher = blake3::Hasher::new();
         hasher.update(&self.buf);
-        let digest: [u8; 32] = hasher.finalize().into();
+        let digest = B3Digest::from_vec(hasher.finalize().as_bytes().to_vec()).unwrap();
 
         // open the database for writing.
         let mut db = self.db.write()?;
-        db.insert(digest, self.buf);
+        db.insert(digest.clone(), self.buf);
 
         Ok(digest)
     }
diff --git a/tvix/store/src/blobservice/mod.rs b/tvix/store/src/blobservice/mod.rs
index 0e7433b0e786..c5a2de124656 100644
--- a/tvix/store/src/blobservice/mod.rs
+++ b/tvix/store/src/blobservice/mod.rs
@@ -1,6 +1,6 @@
 use std::io;
 
-use crate::Error;
+use crate::{B3Digest, Error};
 
 mod grpc;
 mod memory;
@@ -19,10 +19,10 @@ pub trait BlobService {
     type BlobWriter: BlobWriter + Send;
 
     /// Check if the service has the blob, by its content hash.
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error>;
 
     /// Request a blob from the store, by its content hash. Returns a Option<BlobReader>.
-    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error>;
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error>;
 
     /// Insert a new blob into the store. Returns a [BlobWriter], which
     /// implements [io::Write] and a [BlobWriter::close].
@@ -37,5 +37,5 @@ pub trait BlobWriter: io::Write {
     /// contents written.
     ///
     /// This consumes self, so it's not possible to close twice.
-    fn close(self) -> Result<[u8; 32], Error>;
+    fn close(self) -> Result<B3Digest, Error>;
 }
diff --git a/tvix/store/src/blobservice/sled.rs b/tvix/store/src/blobservice/sled.rs
index 0f9d8671a82b..2b090335344d 100644
--- a/tvix/store/src/blobservice/sled.rs
+++ b/tvix/store/src/blobservice/sled.rs
@@ -1,11 +1,9 @@
+use super::{BlobService, BlobWriter};
+use crate::{B3Digest, Error};
 use std::{
     io::{self, Cursor},
     path::PathBuf,
 };
-
-use super::{BlobService, BlobWriter};
-use crate::Error;
-use data_encoding::BASE64;
 use tracing::instrument;
 
 #[derive(Clone)]
@@ -33,24 +31,24 @@ impl BlobService for SledBlobService {
     type BlobReader = Cursor<Vec<u8>>;
     type BlobWriter = SledBlobWriter;
 
-    #[instrument(name = "SledBlobService::has", skip(self), fields(blob.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
-        match self.db.contains_key(digest) {
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    fn has(&self, digest: &B3Digest) -> Result<bool, Error> {
+        match self.db.contains_key(digest.to_vec()) {
             Ok(has) => Ok(has),
             Err(e) => Err(Error::StorageError(e.to_string())),
         }
     }
 
-    #[instrument(name = "SledBlobService::open_read", skip(self), fields(blob.digest=BASE64.encode(digest)))]
-    fn open_read(&self, digest: &[u8; 32]) -> Result<Option<Self::BlobReader>, Error> {
-        match self.db.get(digest) {
+    #[instrument(skip(self), fields(blob.digest=%digest))]
+    fn open_read(&self, digest: &B3Digest) -> Result<Option<Self::BlobReader>, Error> {
+        match self.db.get(digest.to_vec()) {
             Ok(None) => Ok(None),
             Ok(Some(data)) => Ok(Some(Cursor::new(data[..].to_vec()))),
             Err(e) => Err(Error::StorageError(e.to_string())),
         }
     }
 
-    #[instrument(name = "SledBlobService::open_write", skip(self))]
+    #[instrument(skip(self))]
     fn open_write(&self) -> Result<Self::BlobWriter, Error> {
         Ok(SledBlobWriter::new(self.db.clone()))
     }
@@ -84,15 +82,13 @@ impl io::Write for SledBlobWriter {
 }
 
 impl BlobWriter for SledBlobWriter {
-    fn close(self) -> Result<[u8; 32], Error> {
+    fn close(self) -> Result<B3Digest, Error> {
         let digest = self.hasher.finalize();
         self.db
             .insert(digest.as_bytes(), self.buf)
             .map_err(|e| Error::StorageError(format!("unable to insert blob: {}", e)))?;
 
-        digest
-            .to_owned()
-            .try_into()
-            .map_err(|_| Error::StorageError("invalid digest length in response".to_string()))
+        // We know self.hasher is doing blake3 hashing, so this won't fail.
+        Ok(B3Digest::from_vec(digest.as_bytes().to_vec()).unwrap())
     }
 }