about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
authorYureka <tvl@yuka.dev>2024-09-27T19·30+0200
committerclbot <clbot@tvl.fyi>2024-10-01T13·40+0000
commitd277bd9fbf17d579b9c51c12a4126f67a9e9db6c (patch)
tree6db503930e6cc9950fc7bfcc357807899deb01ae /tvix/castore
parent84f4ea5e7ce2fe8dc5c315a73c2158b578546f94 (diff)
feat(tvix/castore/refscan): share the scanner between readers r/8740
This changes the only actual state the ReferenceScanner has to use atomic bools, so it no longer requires a mutable borrow for .scan(). This allows passing an immutable borrow of a reference scanner to multiple threads which might be ingesting blobs in parallel, and using
them in the ReferenceReader or calling .scan() there.

Change-Id: Id5c30bcebb06bf15eae8c4451d70eb806cab722e
Reviewed-on: https://cl.tvl.fyi/c/depot/+/12528
Autosubmit: yuka <yuka@yuka.dev>
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/refscan.rs69
1 files changed, 30 insertions, 39 deletions
diff --git a/tvix/castore/src/refscan.rs b/tvix/castore/src/refscan.rs
index 80a126349746..0b8af296bb50 100644
--- a/tvix/castore/src/refscan.rs
+++ b/tvix/castore/src/refscan.rs
@@ -9,6 +9,7 @@
 use pin_project::pin_project;
 use std::collections::BTreeSet;
 use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::Arc;
 use std::task::{ready, Poll};
 use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};
@@ -74,7 +75,7 @@ where
 /// of bytes patterns to scan for.
 pub struct ReferenceScanner<P> {
     pattern: ReferencePattern<P>,
-    matches: Vec<bool>,
+    matches: Vec<AtomicBool>,
 }
 
 impl<P: AsRef<[u8]>> ReferenceScanner<P> {
@@ -82,20 +83,23 @@ impl<P: AsRef<[u8]>> ReferenceScanner<P> {
     /// candidate bytes patterns.
     pub fn new<IP: Into<ReferencePattern<P>>>(pattern: IP) -> Self {
         let pattern = pattern.into();
-        let matches = vec![false; pattern.candidates().len()];
+        let mut matches = Vec::new();
+        for _ in 0..pattern.candidates().len() {
+            matches.push(AtomicBool::new(false));
+        }
         ReferenceScanner { pattern, matches }
     }
 
     /// Scan the given buffer for all non-overlapping matches and collect them
     /// in the scanner.
-    pub fn scan<S: AsRef<[u8]>>(&mut self, haystack: S) {
+    pub fn scan<S: AsRef<[u8]>>(&self, haystack: S) {
         if haystack.as_ref().len() < self.pattern.longest_candidate() {
             return;
         }
 
         if let Some(searcher) = &self.pattern.inner.searcher {
             for m in searcher.find(haystack) {
-                self.matches[m.pat_idx] = true;
+                self.matches[m.pat_idx].store(true, Ordering::Release);
             }
         }
     }
@@ -104,14 +108,17 @@ impl<P: AsRef<[u8]>> ReferenceScanner<P> {
         &self.pattern
     }
 
-    pub fn matches(&self) -> &[bool] {
-        &self.matches
+    pub fn matches(&self) -> Vec<bool> {
+        self.matches
+            .iter()
+            .map(|m| m.load(Ordering::Acquire))
+            .collect()
     }
 
     pub fn candidate_matches(&self) -> impl Iterator<Item = &P> {
         let candidates = self.pattern.candidates();
         self.matches.iter().enumerate().filter_map(|(idx, found)| {
-            if *found {
+            if found.load(Ordering::Acquire) {
                 Some(&candidates[idx])
             } else {
                 None
@@ -130,52 +137,35 @@ impl<P: Clone + Ord + AsRef<[u8]>> ReferenceScanner<P> {
 const DEFAULT_BUF_SIZE: usize = 8 * 1024;
 
 #[pin_project]
-pub struct ReferenceReader<P, R> {
-    scanner: ReferenceScanner<P>,
+pub struct ReferenceReader<'a, P, R> {
+    scanner: &'a ReferenceScanner<P>,
     buffer: Vec<u8>,
     consumed: usize,
     #[pin]
     reader: R,
 }
 
-impl<P, R> ReferenceReader<P, R>
+impl<'a, P, R> ReferenceReader<'a, P, R>
 where
     P: AsRef<[u8]>,
 {
-    pub fn new(pattern: ReferencePattern<P>, reader: R) -> ReferenceReader<P, R> {
-        Self::with_capacity(DEFAULT_BUF_SIZE, pattern, reader)
+    pub fn new(scanner: &'a ReferenceScanner<P>, reader: R) -> Self {
+        Self::with_capacity(DEFAULT_BUF_SIZE, scanner, reader)
     }
 
-    pub fn with_capacity(
-        capacity: usize,
-        pattern: ReferencePattern<P>,
-        reader: R,
-    ) -> ReferenceReader<P, R> {
+    pub fn with_capacity(capacity: usize, scanner: &'a ReferenceScanner<P>, reader: R) -> Self {
         // If capacity is not at least as long as longest_candidate we can't do a scan
-        let capacity = capacity.max(pattern.longest_candidate());
+        let capacity = capacity.max(scanner.pattern().longest_candidate());
         ReferenceReader {
-            scanner: ReferenceScanner::new(pattern),
+            scanner,
             buffer: Vec::with_capacity(capacity),
             consumed: 0,
             reader,
         }
     }
-
-    pub fn scanner(&self) -> &ReferenceScanner<P> {
-        &self.scanner
-    }
-}
-
-impl<P, R> ReferenceReader<P, R>
-where
-    P: Clone + Ord + AsRef<[u8]>,
-{
-    pub fn finalise(self) -> BTreeSet<P> {
-        self.scanner.finalise()
-    }
 }
 
-impl<P, R> AsyncRead for ReferenceReader<P, R>
+impl<'a, P, R> AsyncRead for ReferenceReader<'a, P, R>
 where
     R: AsyncRead,
     P: AsRef<[u8]>,
@@ -193,7 +183,7 @@ where
     }
 }
 
-impl<P, R> AsyncBufRead for ReferenceReader<P, R>
+impl<'a, P, R> AsyncBufRead for ReferenceReader<'a, P, R>
 where
     R: AsyncRead,
     P: AsRef<[u8]>,
@@ -257,7 +247,7 @@ mod tests {
 
     #[test]
     fn test_no_patterns() {
-        let mut scanner: ReferenceScanner<String> = ReferenceScanner::new(vec![]);
+        let scanner: ReferenceScanner<String> = ReferenceScanner::new(vec![]);
 
         scanner.scan(HELLO_DRV);
 
@@ -268,7 +258,7 @@ mod tests {
 
     #[test]
     fn test_single_match() {
-        let mut scanner = ReferenceScanner::new(vec![
+        let scanner = ReferenceScanner::new(vec![
             "/nix/store/4xw8n979xpivdc46a9ndcvyhwgif00hz-bash-5.1-p16".to_string(),
         ]);
         scanner.scan(HELLO_DRV);
@@ -290,7 +280,7 @@ mod tests {
             "/nix/store/fn7zvafq26f0c8b17brs7s95s10ibfzs-emacs-28.2.drv".to_string(),
         ];
 
-        let mut scanner = ReferenceScanner::new(candidates.clone());
+        let scanner = ReferenceScanner::new(candidates.clone());
         scanner.scan(HELLO_DRV);
 
         let result = scanner.finalise();
@@ -317,17 +307,18 @@ mod tests {
             "fn7zvafq26f0c8b17brs7s95s10ibfzs",
         ];
         let pattern = ReferencePattern::new(candidates.clone());
+        let scanner = ReferenceScanner::new(pattern);
         let mut mock = Builder::new();
         for c in HELLO_DRV.as_bytes().chunks(chunk_size) {
             mock.read(c);
         }
         let mock = mock.build();
-        let mut reader = ReferenceReader::with_capacity(capacity, pattern, mock);
+        let mut reader = ReferenceReader::with_capacity(capacity, &scanner, mock);
         let mut s = String::new();
         reader.read_to_string(&mut s).await.unwrap();
         assert_eq!(s, HELLO_DRV);
 
-        let result = reader.finalise();
+        let result = scanner.finalise();
         assert_eq!(result.len(), 3);
 
         for c in candidates[..3].iter() {