about summary refs log tree commit diff
path: root/tvix/castore
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore')
-rw-r--r--tvix/castore/src/blobservice/memory.rs28
1 files changed, 9 insertions, 19 deletions
diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs
index 25eec334de60..873d06b461de 100644
--- a/tvix/castore/src/blobservice/memory.rs
+++ b/tvix/castore/src/blobservice/memory.rs
@@ -1,9 +1,7 @@
+use parking_lot::RwLock;
 use std::io::{self, Cursor, Write};
 use std::task::Poll;
-use std::{
-    collections::HashMap,
-    sync::{Arc, RwLock},
-};
+use std::{collections::HashMap, sync::Arc};
 use tonic::async_trait;
 use tracing::instrument;
 
@@ -19,13 +17,13 @@ pub struct MemoryBlobService {
 impl BlobService for MemoryBlobService {
     #[instrument(skip_all, ret, err, fields(blob.digest=%digest))]
     async fn has(&self, digest: &B3Digest) -> io::Result<bool> {
-        let db = self.db.read().unwrap();
+        let db = self.db.read();
         Ok(db.contains_key(digest))
     }
 
     #[instrument(skip_all, err, fields(blob.digest=%digest))]
     async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> {
-        let db = self.db.read().unwrap();
+        let db = self.db.read();
 
         match db.get(digest).map(|x| Cursor::new(x.clone())) {
             Some(result) => Ok(Some(Box::new(result))),
@@ -109,24 +107,16 @@ impl BlobWriter for MemoryBlobWriter {
         } else {
             let (buf, hasher) = self.writers.take().unwrap();
 
-            // We know self.hasher is doing blake3 hashing, so this won't fail.
             let digest: B3Digest = hasher.finalize().as_bytes().into();
 
             // Only insert if the blob doesn't already exist.
-            let db = self.db.read().map_err(|e| {
-                io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e))
-            })?;
+            let mut db = self.db.upgradable_read();
             if !db.contains_key(&digest) {
-                // drop the read lock, so we can open for writing.
-                drop(db);
-
                 // open the database for writing.
-                let mut db = self.db.write().map_err(|e| {
-                    io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e))
-                })?;
-
-                // and put buf in there. This will move buf out.
-                db.insert(digest.clone(), buf);
+                db.with_upgraded(|db| {
+                    // and put buf in there. This will move buf out.
+                    db.insert(digest.clone(), buf);
+                });
             }
 
             self.digest = Some(digest.clone());