Diffstat (limited to 'tvix/store/src')
-rw-r--r--  tvix/store/src/blobreader.rs                       | 59
-rw-r--r--  tvix/store/src/chunkservice/memory.rs              | 14
-rw-r--r--  tvix/store/src/chunkservice/mod.rs                 |  6
-rw-r--r--  tvix/store/src/chunkservice/sled.rs                | 12
-rw-r--r--  tvix/store/src/chunkservice/util.rs                |  8
-rw-r--r--  tvix/store/src/proto/grpc_blobservice_wrapper.rs   | 25
6 files changed, 70 insertions, 54 deletions
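
The change below moves chunk digests from Vec<u8> to fixed-size [u8; 32] arrays across the ChunkService trait, its in-memory and sled implementations, and their callers. A minimal sketch of why the array type is the natural fit; the helper is illustrative and not part of the commit:

// blake3::hash() returns a Hash whose as_bytes() is a &[u8; 32], so keeping
// digests as [u8; 32] avoids the .to_vec() allocation the old Vec<u8>-based
// signatures forced on every put().
fn digest_of(data: &[u8]) -> [u8; 32] {
    *blake3::hash(data).as_bytes()
}
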
diff --git a/tvix/store/src/blobreader.rs b/tvix/store/src/blobreader.rs
index 4da0f9593a58..ea9239300237 100644
--- a/tvix/store/src/blobreader.rs
+++ b/tvix/store/src/blobreader.rs
@@ -97,33 +97,42 @@ impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> {
                     return Ok(bytes_read);
                 }
                 // There's another chunk to visit, fetch its contents
-                Some(chunk_meta) => match self.chunk_service.get(&chunk_meta.digest) {
-                    // Fetch successful, put it into `self.current_chunk` and restart the loop.
-                    Ok(Some(chunk_data)) => {
-                        // make sure the size matches what chunk_meta says as well.
-                        if chunk_data.len() as u32 != chunk_meta.size {
-                            break Err(std::io::Error::new(
+                Some(chunk_meta) => {
+                    let chunk_meta_digest: [u8; 32] =
+                        chunk_meta.digest.clone().try_into().map_err(|_e| {
+                            std::io::Error::new(
+                                io::ErrorKind::InvalidData,
+                                format!("chunk in chunkmeta has wrong digest size"),
+                            )
+                        })?;
+                    match self.chunk_service.get(&chunk_meta_digest) {
+                        // Fetch successful, put it into `self.current_chunk` and restart the loop.
+                        Ok(Some(chunk_data)) => {
+                            // make sure the size matches what chunk_meta says as well.
+                            if chunk_data.len() as u32 != chunk_meta.size {
+                                break Err(std::io::Error::new(
                                 io::ErrorKind::InvalidData,
                                 format!(
                                     "chunk_service returned chunk with wrong size for {}, expected {}, got {}",
                                     BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len()
                                 )
                             ));
+                            }
+                            self.current_chunk = Some(Cursor::new(chunk_data));
+                        }
+                        // Chunk requested does not exist
+                        Ok(None) => {
+                            break Err(std::io::Error::new(
+                                io::ErrorKind::NotFound,
+                                format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)),
+                            ))
+                        }
+                        // Error occurred while fetching the next chunk, propagate the error from the chunk service
+                        Err(e) => {
+                            break Err(std::io::Error::new(io::ErrorKind::InvalidData, e));
                         }
-                        self.current_chunk = Some(Cursor::new(chunk_data));
-                    }
-                    // Chunk requested does not exist
-                    Ok(None) => {
-                        break Err(std::io::Error::new(
-                            io::ErrorKind::NotFound,
-                            format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)),
-                        ))
-                    }
-                    // Error occurred while fetching the next chunk, propagate the error from the chunk service
-                    Err(e) => {
-                        break Err(std::io::Error::new(io::ErrorKind::InvalidData, e));
                     }
-                },
+                }
             }
         }
     }
@@ -196,7 +205,7 @@ mod tests {
         // assemble a blobmeta
         let blobmeta = proto::BlobMeta {
             chunks: vec![proto::blob_meta::ChunkMeta {
-                digest: dgst,
+                digest: dgst.to_vec(),
                 size: 0,
             }],
             inline_bao: vec![],
@@ -228,7 +237,7 @@ mod tests {
         // assemble a blobmeta
         let blobmeta = proto::BlobMeta {
             chunks: vec![proto::blob_meta::ChunkMeta {
-                digest: dgst,
+                digest: dgst.to_vec(),
                 size: 3,
             }],
             inline_bao: vec![],
@@ -260,7 +269,7 @@ mod tests {
         // assemble a blobmeta
         let blobmeta = proto::BlobMeta {
             chunks: vec![proto::blob_meta::ChunkMeta {
-                digest: dgst_1,
+                digest: dgst_1.to_vec(),
                 size: 42,
             }],
             inline_bao: vec![],
@@ -294,15 +303,15 @@ mod tests {
         let blobmeta = proto::BlobMeta {
             chunks: vec![
                 proto::blob_meta::ChunkMeta {
-                    digest: dgst_1.clone(),
+                    digest: dgst_1.to_vec(),
                     size: 3,
                 },
                 proto::blob_meta::ChunkMeta {
-                    digest: dgst_2,
+                    digest: dgst_2.to_vec(),
                     size: 2,
                 },
                 proto::blob_meta::ChunkMeta {
-                    digest: dgst_1,
+                    digest: dgst_1.to_vec(),
                     size: 3,
                 },
             ],
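
The hunk above converts the protobuf-supplied Vec<u8> digest into [u8; 32] inside the read loop, mapping a length mismatch to an io::Error so it can be propagated with ?. The same pattern in isolation, as a hypothetical helper not taken from the commit:

use std::io;

// Vec<u8> -> [u8; 32]: TryFrom hands the Vec back on failure, which is mapped
// to an InvalidData error here so the caller's io::Result machinery applies.
fn digest_from_proto(raw: Vec<u8>) -> io::Result<[u8; 32]> {
    raw.try_into().map_err(|_| {
        io::Error::new(
            io::ErrorKind::InvalidData,
            "chunk digest has wrong length, expected 32 bytes",
        )
    })
}
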
diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs
index d9a5ead4de4a..55e646ea7714 100644
--- a/tvix/store/src/chunkservice/memory.rs
+++ b/tvix/store/src/chunkservice/memory.rs
@@ -11,18 +11,18 @@ use super::ChunkService;
 
 #[derive(Clone, Default)]
 pub struct MemoryChunkService {
-    db: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>,
+    db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>,
 }
 
 impl ChunkService for MemoryChunkService {
     #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8]) -> Result<bool, Error> {
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
         let db = self.db.read().unwrap();
         Ok(db.get(digest).is_some())
     }
 
     #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> {
+    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
         let db = self.db.read().unwrap();
         match db.get(digest) {
             None => Ok(None),
@@ -42,12 +42,12 @@ impl ChunkService for MemoryChunkService {
     }
 
     #[instrument(skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
-        let digest = blake3::hash(&data).as_bytes().to_vec();
+    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
+        let digest = blake3::hash(&data);
 
         let mut db = self.db.write().unwrap();
-        db.insert(digest.clone(), data);
+        db.insert(digest.as_bytes().clone(), data);
 
-        Ok(digest)
+        Ok(digest.as_bytes().clone())
     }
 }
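
With [u8; 32] as the HashMap key the digest is Copy, so it can be inserted and handed back without the clone-to-Vec step the old code needed. A small round-trip sketch over the same layout; it is illustrative and not a test from the commit:

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

fn roundtrip() {
    // Same shape as MemoryChunkService's db field above.
    let db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>> = Default::default();

    let data = b"some chunk".to_vec();
    let digest: [u8; 32] = *blake3::hash(&data).as_bytes();
    db.write().unwrap().insert(digest, data.clone());

    let guard = db.read().unwrap();
    assert_eq!(guard.get(&digest), Some(&data));
}
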
diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs
index 50365caccf4a..faf0a88f151a 100644
--- a/tvix/store/src/chunkservice/mod.rs
+++ b/tvix/store/src/chunkservice/mod.rs
@@ -17,12 +17,12 @@ pub use self::util::upload_chunk;
 /// chunking information.
 pub trait ChunkService {
     /// check if the service has a chunk, given by its digest.
-    fn has(&self, digest: &[u8]) -> Result<bool, Error>;
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>;
 
     /// retrieve a chunk by its digest. Implementations MUST validate the digest
     /// matches.
-    fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error>;
+    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>;
 
     /// insert a chunk. returns the digest of the chunk, or an error.
-    fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error>;
+    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>;
 }
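
Seen from generic caller code the updated trait looks like this; store_and_check is a hypothetical helper and the Error import path is assumed rather than taken from the commit:

use super::ChunkService;
use crate::Error; // assumed location of the crate-local error type

// With put() returning [u8; 32] and has()/get() taking &[u8; 32], no length
// validation is needed once a digest has crossed the API boundary.
fn store_and_check<CS: ChunkService>(cs: &CS, data: Vec<u8>) -> Result<bool, Error> {
    let digest: [u8; 32] = cs.put(data)?;
    cs.has(&digest)
}
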
diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs
index 5cb469b2663d..43c98813a240 100644
--- a/tvix/store/src/chunkservice/sled.rs
+++ b/tvix/store/src/chunkservice/sled.rs
@@ -30,7 +30,7 @@ impl SledChunkService {
 
 impl ChunkService for SledChunkService {
     #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))]
-    fn has(&self, digest: &[u8]) -> Result<bool, Error> {
+    fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> {
         match self.db.get(digest) {
             Ok(None) => Ok(false),
             Ok(Some(_)) => Ok(true),
@@ -39,7 +39,7 @@ impl ChunkService for SledChunkService {
     }
 
     #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))]
-    fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> {
+    fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> {
         match self.db.get(digest) {
             Ok(None) => Ok(None),
             Ok(Some(data)) => {
@@ -59,12 +59,12 @@ impl ChunkService for SledChunkService {
     }
 
     #[instrument(name = "SledChunkService::put", skip(self, data))]
-    fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> {
-        let digest = blake3::hash(&data).as_bytes().to_vec();
-        let result = self.db.insert(&digest, data);
+    fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> {
+        let digest = blake3::hash(&data);
+        let result = self.db.insert(&digest.as_bytes(), data);
         if let Err(e) = result {
             return Err(Error::StorageError(e.to_string()));
         }
-        Ok(digest)
+        Ok(digest.as_bytes().clone())
     }
 }
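
The sled backend keys chunks by their raw 32-byte BLAKE3 digest; sled's insert accepts any AsRef<[u8]> key, so the fixed-size array needs no conversion. A standalone sketch of the same keying scheme (hypothetical function, not from the commit):

fn store_chunk(db: &sled::Db, data: Vec<u8>) -> Result<[u8; 32], sled::Error> {
    let digest: [u8; 32] = *blake3::hash(&data).as_bytes();
    db.insert(&digest, data)?;
    Ok(digest)
}
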
diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs
index 761a58a646a3..1121a791fc94 100644
--- a/tvix/store/src/chunkservice/util.rs
+++ b/tvix/store/src/chunkservice/util.rs
@@ -9,7 +9,7 @@ use super::ChunkService;
 pub fn upload_chunk<CS: ChunkService>(
     chunk_service: &CS,
     chunk_data: Vec<u8>,
-) -> Result<Vec<u8>, Error> {
+) -> Result<[u8; 32], Error> {
     let mut hasher = blake3::Hasher::new();
     update_hasher(&mut hasher, &chunk_data);
     let digest = hasher.finalize();
@@ -19,9 +19,9 @@ pub fn upload_chunk<CS: ChunkService>(
     }
     let digest_resp = chunk_service.put(chunk_data)?;
 
-    assert_eq!(digest_resp, digest.as_bytes());
+    assert_eq!(&digest_resp, digest.as_bytes());
 
-    Ok(digest.as_bytes().to_vec())
+    Ok(digest.as_bytes().clone())
 }
 
 /// reads through a reader, writes chunks to a [ChunkService] and returns a
@@ -56,7 +56,7 @@ pub fn read_all_and_chunk<CS: ChunkService, R: Read>(
         let chunk_digest = upload_chunk(chunk_service, chunk.data)?;
 
         blob_meta.chunks.push(proto::blob_meta::ChunkMeta {
-            digest: chunk_digest,
+            digest: chunk_digest.to_vec(),
             size: chunk_len,
         });
     }
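
At the protobuf boundary the ChunkMeta digest field is still a Vec<u8>, so the fixed-size digest is widened with to_vec() right before it goes into the message, as the hunk above does. The same step as a free-standing sketch; the helper is hypothetical and the proto module path is assumed:

use crate::proto;

fn to_chunk_meta(digest: [u8; 32], size: u32) -> proto::blob_meta::ChunkMeta {
    proto::blob_meta::ChunkMeta {
        digest: digest.to_vec(),
        size,
    }
}
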
diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
index f4d42503e598..e4a64750b98f 100644
--- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs
+++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs
@@ -36,7 +36,7 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> {
         }
         let digest_resp = chunk_service.put(chunk_data)?;
 
-        assert_eq!(digest_resp, digest.as_bytes());
+        assert_eq!(&digest_resp, digest.as_bytes());
 
         Ok(digest.as_bytes().to_vec())
     }
@@ -74,9 +74,14 @@ impl<
         let req = request.into_inner();
         let (tx, rx) = channel(5);
 
+        let req_digest: [u8; 32] = req
+            .digest
+            .try_into()
+            .map_err(|_e| Status::invalid_argument("invalid digest length"))?;
+
         // query the chunk service for more detailed blob info
         let stat_resp = self.blob_service.stat(&super::StatBlobRequest {
-            digest: req.digest.to_vec(),
+            digest: req_digest.to_vec(),
             include_chunks: true,
             ..Default::default()
         })?;
@@ -86,7 +91,7 @@ impl<
                 // If the stat didn't return any blobmeta, the client might
                 // still have asked for a single chunk to be read.
                 // Check the chunkstore.
-                if let Some(data) = self.chunk_service.get(&req.digest)? {
+                if let Some(data) = self.chunk_service.get(&req_digest)? {
                     // We already know the hash matches, and contrary to
                     // iterating over a blobmeta, we can't know the size,
                     // so send the contents of that chunk over,
@@ -101,7 +106,7 @@ impl<
                 } else {
                     return Err(Status::not_found(format!(
                         "blob {} not found",
-                        BASE64.encode(&req.digest),
+                        BASE64.encode(&req_digest),
                     )));
                 }
             }
@@ -118,13 +123,15 @@ impl<
                         // request chunk.
                         // We don't need to validate the digest again, as
                         // that's required for all implementations of ChunkService.
-                        let res = match chunk_client.get(&chunkmeta.digest) {
+                        // TODO: handle error
+                        let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap();
+                        let res = match chunk_client.get(&chunkmeta_digest) {
                             Err(e) => Err(e.into()),
                             // TODO: make this a separate error type
                             Ok(None) => Err(Error::StorageError(format!(
                                 "consistency error: chunk {} for blob {} not found",
-                                BASE64.encode(&chunkmeta.digest),
-                                BASE64.encode(&req.digest),
+                                BASE64.encode(chunkmeta_digest),
+                                BASE64.encode(&req_digest),
                             ))
                             .into()),
                             Ok(Some(data)) => {
@@ -133,8 +140,8 @@ impl<
                                 if data.len() as u32 != chunkmeta.size {
                                     Err(Error::StorageError(format!(
                                         "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}",
-                                        BASE64.encode(&chunkmeta.digest),
-                                        BASE64.encode(&req.digest),
+                                        BASE64.encode(chunkmeta_digest),
+                                        BASE64.encode(&req_digest),
                                         chunkmeta.size,
                                         data.len(),
                                     )).into())
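
On the gRPC side the request digest arrives as a Vec<u8>, and a failed conversion is reported to the client as InvalidArgument, as in the hunk above. The same check as a free-standing helper with a hypothetical name, not part of the commit:

use tonic::Status;

fn parse_request_digest(raw: Vec<u8>) -> Result<[u8; 32], Status> {
    raw.try_into()
        .map_err(|_| Status::invalid_argument("invalid digest length"))
}
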