diff options
Diffstat (limited to 'tvix/store/src')
-rw-r--r-- | tvix/store/src/blobreader.rs | 59 | ||||
-rw-r--r-- | tvix/store/src/chunkservice/memory.rs | 14 | ||||
-rw-r--r-- | tvix/store/src/chunkservice/mod.rs | 6 | ||||
-rw-r--r-- | tvix/store/src/chunkservice/sled.rs | 12 | ||||
-rw-r--r-- | tvix/store/src/chunkservice/util.rs | 8 | ||||
-rw-r--r-- | tvix/store/src/proto/grpc_blobservice_wrapper.rs | 25 |
6 files changed, 70 insertions, 54 deletions
diff --git a/tvix/store/src/blobreader.rs b/tvix/store/src/blobreader.rs index 4da0f9593a58..ea9239300237 100644 --- a/tvix/store/src/blobreader.rs +++ b/tvix/store/src/blobreader.rs @@ -97,33 +97,42 @@ impl<CS: ChunkService> std::io::Read for BlobReader<'_, CS> { return Ok(bytes_read); } // There's another chunk to visit, fetch its contents - Some(chunk_meta) => match self.chunk_service.get(&chunk_meta.digest) { - // Fetch successful, put it into `self.current_chunk` and restart the loop. - Ok(Some(chunk_data)) => { - // make sure the size matches what chunk_meta says as well. - if chunk_data.len() as u32 != chunk_meta.size { - break Err(std::io::Error::new( + Some(chunk_meta) => { + let chunk_meta_digest: [u8; 32] = + chunk_meta.digest.clone().try_into().map_err(|_e| { + std::io::Error::new( + io::ErrorKind::InvalidData, + format!("chunk in chunkmeta has wrong digest size"), + ) + })?; + match self.chunk_service.get(&chunk_meta_digest) { + // Fetch successful, put it into `self.current_chunk` and restart the loop. + Ok(Some(chunk_data)) => { + // make sure the size matches what chunk_meta says as well. + if chunk_data.len() as u32 != chunk_meta.size { + break Err(std::io::Error::new( io::ErrorKind::InvalidData, format!( "chunk_service returned chunk with wrong size for {}, expected {}, got {}", BASE64.encode(&chunk_meta.digest), chunk_meta.size, chunk_data.len() ) )); + } + self.current_chunk = Some(Cursor::new(chunk_data)); + } + // Chunk requested does not exist + Ok(None) => { + break Err(std::io::Error::new( + io::ErrorKind::NotFound, + format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)), + )) + } + // Error occured while fetching the next chunk, propagate the error from the chunk service + Err(e) => { + break Err(std::io::Error::new(io::ErrorKind::InvalidData, e)); } - self.current_chunk = Some(Cursor::new(chunk_data)); - } - // Chunk requested does not exist - Ok(None) => { - break Err(std::io::Error::new( - io::ErrorKind::NotFound, - format!("chunk {} not found", BASE64.encode(&chunk_meta.digest)), - )) - } - // Error occured while fetching the next chunk, propagate the error from the chunk service - Err(e) => { - break Err(std::io::Error::new(io::ErrorKind::InvalidData, e)); } - }, + } } } } @@ -196,7 +205,7 @@ mod tests { // assemble a blobmeta let blobmeta = proto::BlobMeta { chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst, + digest: dgst.to_vec(), size: 0, }], inline_bao: vec![], @@ -228,7 +237,7 @@ mod tests { // assemble a blobmeta let blobmeta = proto::BlobMeta { chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst, + digest: dgst.to_vec(), size: 3, }], inline_bao: vec![], @@ -260,7 +269,7 @@ mod tests { // assemble a blobmeta let blobmeta = proto::BlobMeta { chunks: vec![proto::blob_meta::ChunkMeta { - digest: dgst_1, + digest: dgst_1.to_vec(), size: 42, }], inline_bao: vec![], @@ -294,15 +303,15 @@ mod tests { let blobmeta = proto::BlobMeta { chunks: vec![ proto::blob_meta::ChunkMeta { - digest: dgst_1.clone(), + digest: dgst_1.to_vec(), size: 3, }, proto::blob_meta::ChunkMeta { - digest: dgst_2, + digest: dgst_2.to_vec(), size: 2, }, proto::blob_meta::ChunkMeta { - digest: dgst_1, + digest: dgst_1.to_vec(), size: 3, }, ], diff --git a/tvix/store/src/chunkservice/memory.rs b/tvix/store/src/chunkservice/memory.rs index d9a5ead4de4a..55e646ea7714 100644 --- a/tvix/store/src/chunkservice/memory.rs +++ b/tvix/store/src/chunkservice/memory.rs @@ -11,18 +11,18 @@ use super::ChunkService; #[derive(Clone, Default)] pub struct MemoryChunkService { - db: Arc<RwLock<HashMap<Vec<u8>, Vec<u8>>>>, + db: Arc<RwLock<HashMap<[u8; 32], Vec<u8>>>>, } impl ChunkService for MemoryChunkService { #[instrument(skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))] - fn has(&self, digest: &[u8]) -> Result<bool, Error> { + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { let db = self.db.read().unwrap(); Ok(db.get(digest).is_some()) } #[instrument(skip(self), fields(chunk.digest=BASE64.encode(digest)))] - fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> { + fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> { let db = self.db.read().unwrap(); match db.get(digest) { None => Ok(None), @@ -42,12 +42,12 @@ impl ChunkService for MemoryChunkService { } #[instrument(skip(self, data))] - fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> { - let digest = blake3::hash(&data).as_bytes().to_vec(); + fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> { + let digest = blake3::hash(&data); let mut db = self.db.write().unwrap(); - db.insert(digest.clone(), data); + db.insert(digest.as_bytes().clone(), data); - Ok(digest) + Ok(digest.as_bytes().clone()) } } diff --git a/tvix/store/src/chunkservice/mod.rs b/tvix/store/src/chunkservice/mod.rs index 50365caccf4a..faf0a88f151a 100644 --- a/tvix/store/src/chunkservice/mod.rs +++ b/tvix/store/src/chunkservice/mod.rs @@ -17,12 +17,12 @@ pub use self::util::upload_chunk; /// chunking information. pub trait ChunkService { /// check if the service has a chunk, given by its digest. - fn has(&self, digest: &[u8]) -> Result<bool, Error>; + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error>; /// retrieve a chunk by its digest. Implementations MUST validate the digest /// matches. - fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error>; + fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error>; /// insert a chunk. returns the digest of the chunk, or an error. - fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error>; + fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error>; } diff --git a/tvix/store/src/chunkservice/sled.rs b/tvix/store/src/chunkservice/sled.rs index 5cb469b2663d..43c98813a240 100644 --- a/tvix/store/src/chunkservice/sled.rs +++ b/tvix/store/src/chunkservice/sled.rs @@ -30,7 +30,7 @@ impl SledChunkService { impl ChunkService for SledChunkService { #[instrument(name = "SledChunkService::has", skip(self, digest), fields(chunk.digest=BASE64.encode(digest)))] - fn has(&self, digest: &[u8]) -> Result<bool, Error> { + fn has(&self, digest: &[u8; 32]) -> Result<bool, Error> { match self.db.get(digest) { Ok(None) => Ok(false), Ok(Some(_)) => Ok(true), @@ -39,7 +39,7 @@ impl ChunkService for SledChunkService { } #[instrument(name = "SledChunkService::get", skip(self), fields(chunk.digest=BASE64.encode(digest)))] - fn get(&self, digest: &[u8]) -> Result<Option<Vec<u8>>, Error> { + fn get(&self, digest: &[u8; 32]) -> Result<Option<Vec<u8>>, Error> { match self.db.get(digest) { Ok(None) => Ok(None), Ok(Some(data)) => { @@ -59,12 +59,12 @@ impl ChunkService for SledChunkService { } #[instrument(name = "SledChunkService::put", skip(self, data))] - fn put(&self, data: Vec<u8>) -> Result<Vec<u8>, Error> { - let digest = blake3::hash(&data).as_bytes().to_vec(); - let result = self.db.insert(&digest, data); + fn put(&self, data: Vec<u8>) -> Result<[u8; 32], Error> { + let digest = blake3::hash(&data); + let result = self.db.insert(&digest.as_bytes(), data); if let Err(e) = result { return Err(Error::StorageError(e.to_string())); } - Ok(digest) + Ok(digest.as_bytes().clone()) } } diff --git a/tvix/store/src/chunkservice/util.rs b/tvix/store/src/chunkservice/util.rs index 761a58a646a3..1121a791fc94 100644 --- a/tvix/store/src/chunkservice/util.rs +++ b/tvix/store/src/chunkservice/util.rs @@ -9,7 +9,7 @@ use super::ChunkService; pub fn upload_chunk<CS: ChunkService>( chunk_service: &CS, chunk_data: Vec<u8>, -) -> Result<Vec<u8>, Error> { +) -> Result<[u8; 32], Error> { let mut hasher = blake3::Hasher::new(); update_hasher(&mut hasher, &chunk_data); let digest = hasher.finalize(); @@ -19,9 +19,9 @@ pub fn upload_chunk<CS: ChunkService>( } let digest_resp = chunk_service.put(chunk_data)?; - assert_eq!(digest_resp, digest.as_bytes()); + assert_eq!(&digest_resp, digest.as_bytes()); - Ok(digest.as_bytes().to_vec()) + Ok(digest.as_bytes().clone()) } /// reads through a reader, writes chunks to a [ChunkService] and returns a @@ -56,7 +56,7 @@ pub fn read_all_and_chunk<CS: ChunkService, R: Read>( let chunk_digest = upload_chunk(chunk_service, chunk.data)?; blob_meta.chunks.push(proto::blob_meta::ChunkMeta { - digest: chunk_digest, + digest: chunk_digest.to_vec(), size: chunk_len, }); } diff --git a/tvix/store/src/proto/grpc_blobservice_wrapper.rs b/tvix/store/src/proto/grpc_blobservice_wrapper.rs index f4d42503e598..e4a64750b98f 100644 --- a/tvix/store/src/proto/grpc_blobservice_wrapper.rs +++ b/tvix/store/src/proto/grpc_blobservice_wrapper.rs @@ -36,7 +36,7 @@ impl<BS: BlobService, CS: ChunkService> GRPCBlobServiceWrapper<BS, CS> { } let digest_resp = chunk_service.put(chunk_data)?; - assert_eq!(digest_resp, digest.as_bytes()); + assert_eq!(&digest_resp, digest.as_bytes()); Ok(digest.as_bytes().to_vec()) } @@ -74,9 +74,14 @@ impl< let req = request.into_inner(); let (tx, rx) = channel(5); + let req_digest: [u8; 32] = req + .digest + .try_into() + .map_err(|_e| Status::invalid_argument("invalid digest length"))?; + // query the chunk service for more detailed blob info let stat_resp = self.blob_service.stat(&super::StatBlobRequest { - digest: req.digest.to_vec(), + digest: req_digest.to_vec(), include_chunks: true, ..Default::default() })?; @@ -86,7 +91,7 @@ impl< // If the stat didn't return any blobmeta, the client might // still have asked for a single chunk to be read. // Check the chunkstore. - if let Some(data) = self.chunk_service.get(&req.digest)? { + if let Some(data) = self.chunk_service.get(&req_digest)? { // We already know the hash matches, and contrary to // iterating over a blobmeta, we can't know the size, // so send the contents of that chunk over, @@ -101,7 +106,7 @@ impl< } else { return Err(Status::not_found(format!( "blob {} not found", - BASE64.encode(&req.digest), + BASE64.encode(&req_digest), ))); } } @@ -118,13 +123,15 @@ impl< // request chunk. // We don't need to validate the digest again, as // that's required for all implementations of ChunkService. - let res = match chunk_client.get(&chunkmeta.digest) { + // TODO: handle error + let chunkmeta_digest = &chunkmeta.digest.try_into().unwrap(); + let res = match chunk_client.get(&chunkmeta_digest) { Err(e) => Err(e.into()), // TODO: make this a separate error type Ok(None) => Err(Error::StorageError(format!( "consistency error: chunk {} for blob {} not found", - BASE64.encode(&chunkmeta.digest), - BASE64.encode(&req.digest), + BASE64.encode(chunkmeta_digest), + BASE64.encode(&req_digest), )) .into()), Ok(Some(data)) => { @@ -133,8 +140,8 @@ impl< if data.len() as u32 != chunkmeta.size { Err(Error::StorageError(format!( "consistency error: chunk {} for blob {} has wrong size, expected {}, got {}", - BASE64.encode(&chunkmeta.digest), - BASE64.encode(&req.digest), + BASE64.encode(chunkmeta_digest), + BASE64.encode(&req_digest), chunkmeta.size, data.len(), )).into()) |