about summary refs log tree commit diff
path: root/tvix/castore/src/hashing_reader.rs
diff options
context:
space:
mode:
authorFlorian Klink <flokli@flokli.de>2024-03-03T12·46+0200
committerclbot <clbot@tvl.fyi>2024-03-03T15·31+0000
commit4b4443240e45c5200d3135acccd4e52ffa8d706c (patch)
treea483c9b14d8d862189956e418d5664a5e034087f /tvix/castore/src/hashing_reader.rs
parent8383e9e02e1f762013a652e9a842493a1be5bb60 (diff)
feat(tvix/castore): add HashingReader, B3HashingReader r/7641
HashingReader wraps an existing AsyncRead, and allows querying for the
digest of all data read "through" it.
The hash function is configurable by type parameter, and we define
B3HashingReader.

Change-Id: Ic08142077566fc08836662218f5ec8c3aff80be5
Reviewed-on: https://cl.tvl.fyi/c/depot/+/11087
Autosubmit: flokli <flokli@flokli.de>
Reviewed-by: raitobezarius <tvl@lahfa.xyz>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/hashing_reader.rs')
-rw-r--r--tvix/castore/src/hashing_reader.rs88
1 files changed, 88 insertions, 0 deletions
diff --git a/tvix/castore/src/hashing_reader.rs b/tvix/castore/src/hashing_reader.rs
new file mode 100644
index 0000000000..5352363247
--- /dev/null
+++ b/tvix/castore/src/hashing_reader.rs
@@ -0,0 +1,88 @@
+use pin_project_lite::pin_project;
+use tokio::io::AsyncRead;
+
+pin_project! {
+    /// Wraps an existing AsyncRead, and allows querying for the digest of all
+    /// data read "through" it.
+    /// The hash function is configurable by type parameter.
+    pub struct HashingReader<R, H>
+    where
+        R: AsyncRead,
+        H: digest::Digest,
+    {
+        #[pin]
+        inner: R,
+        hasher: H,
+    }
+}
+
+pub type B3HashingReader<R> = HashingReader<R, blake3::Hasher>;
+
+impl<R, H> HashingReader<R, H>
+where
+    R: AsyncRead,
+    H: digest::Digest,
+{
+    pub fn from(r: R) -> Self {
+        Self {
+            inner: r,
+            hasher: H::new(),
+        }
+    }
+
+    /// Return the digest.
+    pub fn digest(self) -> digest::Output<H> {
+        self.hasher.finalize()
+    }
+}
+
+impl<R, H> tokio::io::AsyncRead for HashingReader<R, H>
+where
+    R: AsyncRead,
+    H: digest::Digest,
+{
+    fn poll_read(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut tokio::io::ReadBuf<'_>,
+    ) -> std::task::Poll<std::io::Result<()>> {
+        let buf_filled_len_before = buf.filled().len();
+
+        let this = self.project();
+        let ret = this.inner.poll_read(cx, buf);
+
+        // write everything new filled into the hasher.
+        this.hasher.update(&buf.filled()[buf_filled_len_before..]);
+
+        ret
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::io::Cursor;
+
+    use test_case::test_case;
+
+    use crate::fixtures::BLOB_A;
+    use crate::fixtures::BLOB_A_DIGEST;
+    use crate::fixtures::BLOB_B;
+    use crate::fixtures::BLOB_B_DIGEST;
+    use crate::fixtures::EMPTY_BLOB_DIGEST;
+    use crate::{B3Digest, B3HashingReader};
+
+    #[test_case(&BLOB_A, &BLOB_A_DIGEST; "blob a")]
+    #[test_case(&BLOB_B, &BLOB_B_DIGEST; "blob b")]
+    #[test_case(&[], &EMPTY_BLOB_DIGEST; "empty blob")]
+    #[tokio::test]
+    async fn test_b3_hashing_reader(data: &[u8], b3_digest: &B3Digest) {
+        let r = Cursor::new(data);
+        let mut hr = B3HashingReader::from(r);
+
+        tokio::io::copy(&mut hr, &mut tokio::io::sink())
+            .await
+            .expect("read must succeed");
+
+        assert_eq!(*b3_digest, hr.digest().into());
+    }
+}