diff options
Diffstat (limited to 'tvix/castore')
-rw-r--r-- | tvix/castore/src/hashing_reader.rs | 88 | ||||
-rw-r--r-- | tvix/castore/src/lib.rs | 2 |
2 files changed, 90 insertions, 0 deletions
diff --git a/tvix/castore/src/hashing_reader.rs b/tvix/castore/src/hashing_reader.rs new file mode 100644 index 000000000000..535236324701 --- /dev/null +++ b/tvix/castore/src/hashing_reader.rs @@ -0,0 +1,88 @@ +use pin_project_lite::pin_project; +use tokio::io::AsyncRead; + +pin_project! { + /// Wraps an existing AsyncRead, and allows querying for the digest of all + /// data read "through" it. + /// The hash function is configurable by type parameter. + pub struct HashingReader<R, H> + where + R: AsyncRead, + H: digest::Digest, + { + #[pin] + inner: R, + hasher: H, + } +} + +pub type B3HashingReader<R> = HashingReader<R, blake3::Hasher>; + +impl<R, H> HashingReader<R, H> +where + R: AsyncRead, + H: digest::Digest, +{ + pub fn from(r: R) -> Self { + Self { + inner: r, + hasher: H::new(), + } + } + + /// Return the digest. + pub fn digest(self) -> digest::Output<H> { + self.hasher.finalize() + } +} + +impl<R, H> tokio::io::AsyncRead for HashingReader<R, H> +where + R: AsyncRead, + H: digest::Digest, +{ + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll<std::io::Result<()>> { + let buf_filled_len_before = buf.filled().len(); + + let this = self.project(); + let ret = this.inner.poll_read(cx, buf); + + // write everything new filled into the hasher. + this.hasher.update(&buf.filled()[buf_filled_len_before..]); + + ret + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use test_case::test_case; + + use crate::fixtures::BLOB_A; + use crate::fixtures::BLOB_A_DIGEST; + use crate::fixtures::BLOB_B; + use crate::fixtures::BLOB_B_DIGEST; + use crate::fixtures::EMPTY_BLOB_DIGEST; + use crate::{B3Digest, B3HashingReader}; + + #[test_case(&BLOB_A, &BLOB_A_DIGEST; "blob a")] + #[test_case(&BLOB_B, &BLOB_B_DIGEST; "blob b")] + #[test_case(&[], &EMPTY_BLOB_DIGEST; "empty blob")] + #[tokio::test] + async fn test_b3_hashing_reader(data: &[u8], b3_digest: &B3Digest) { + let r = Cursor::new(data); + let mut hr = B3HashingReader::from(r); + + tokio::io::copy(&mut hr, &mut tokio::io::sink()) + .await + .expect("read must succeed"); + + assert_eq!(*b3_digest, hr.digest().into()); + } +} diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs index 8da0edef786b..dec7684b4c57 100644 --- a/tvix/castore/src/lib.rs +++ b/tvix/castore/src/lib.rs @@ -1,5 +1,6 @@ mod digests; mod errors; +mod hashing_reader; pub mod blobservice; pub mod directoryservice; @@ -15,6 +16,7 @@ pub mod utils; pub use digests::{B3Digest, B3_LEN}; pub use errors::Error; +pub use hashing_reader::{B3HashingReader, HashingReader}; #[cfg(test)] mod tests; |