about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/hashing_reader.rs88
-rw-r--r--tvix/castore/src/lib.rs2
2 files changed, 90 insertions, 0 deletions
diff --git a/tvix/castore/src/hashing_reader.rs b/tvix/castore/src/hashing_reader.rs
new file mode 100644
index 000000000000..535236324701
--- /dev/null
+++ b/tvix/castore/src/hashing_reader.rs
@@ -0,0 +1,88 @@
+use pin_project_lite::pin_project;
+use tokio::io::AsyncRead;
+
+pin_project! {
+    /// Wraps an existing AsyncRead, and allows querying for the digest of all
+    /// data read "through" it.
+    /// The hash function is configurable by type parameter.
+    pub struct HashingReader<R, H>
+    where
+        R: AsyncRead,
+        H: digest::Digest,
+    {
+        #[pin]
+        inner: R,
+        hasher: H,
+    }
+}
+
+pub type B3HashingReader<R> = HashingReader<R, blake3::Hasher>;
+
+impl<R, H> HashingReader<R, H>
+where
+    R: AsyncRead,
+    H: digest::Digest,
+{
+    pub fn from(r: R) -> Self {
+        Self {
+            inner: r,
+            hasher: H::new(),
+        }
+    }
+
+    /// Return the digest.
+    pub fn digest(self) -> digest::Output<H> {
+        self.hasher.finalize()
+    }
+}
+
+impl<R, H> tokio::io::AsyncRead for HashingReader<R, H>
+where
+    R: AsyncRead,
+    H: digest::Digest,
+{
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        let buf_filled_len_before = buf.filled().len();
+
+        let this = self.project();
+        let ret = this.inner.poll_read(cx, buf);
+
+        // write everything new filled into the hasher.
+        this.hasher.update(&buf.filled()[buf_filled_len_before..]);
+
+        ret
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Cursor;
+
+    use test_case::test_case;
+
+    use crate::fixtures::BLOB_A;
+    use crate::fixtures::BLOB_A_DIGEST;
+    use crate::fixtures::BLOB_B;
+    use crate::fixtures::BLOB_B_DIGEST;
+    use crate::fixtures::EMPTY_BLOB_DIGEST;
+    use crate::{B3Digest, B3HashingReader};
+
+    #[test_case(&BLOB_A, &BLOB_A_DIGEST; "blob a")]
+    #[test_case(&BLOB_B, &BLOB_B_DIGEST; "blob b")]
+    #[test_case(&[], &EMPTY_BLOB_DIGEST; "empty blob")]
+    #[tokio::test]
+    async fn test_b3_hashing_reader(data: &[u8], b3_digest: &B3Digest) {
+        let r = Cursor::new(data);
+        let mut hr = B3HashingReader::from(r);
+
+        tokio::io::copy(&mut hr, &mut tokio::io::sink())
+            .await
+            .expect("read must succeed");
+
+        assert_eq!(*b3_digest, hr.digest().into());
+    }
+}
diff --git a/tvix/castore/src/lib.rs b/tvix/castore/src/lib.rs
index 8da0edef786b..dec7684b4c57 100644
--- a/tvix/castore/src/lib.rs
+++ b/tvix/castore/src/lib.rs
@@ -1,5 +1,6 @@
 mod digests;
 mod errors;
+mod hashing_reader;
 
 pub mod blobservice;
 pub mod directoryservice;
@@ -15,6 +16,7 @@ pub mod utils;
 
 pub use digests::{B3Digest, B3_LEN};
 pub use errors::Error;
+pub use hashing_reader::{B3HashingReader, HashingReader};
 
 #[cfg(test)]
 mod tests;