diff options
Diffstat (limited to 'tvix/castore/src')
-rw-r--r-- | tvix/castore/src/blobservice/memory.rs | 28 |
1 files changed, 9 insertions, 19 deletions
diff --git a/tvix/castore/src/blobservice/memory.rs b/tvix/castore/src/blobservice/memory.rs index 25eec334de60..873d06b461de 100644 --- a/tvix/castore/src/blobservice/memory.rs +++ b/tvix/castore/src/blobservice/memory.rs @@ -1,9 +1,7 @@ +use parking_lot::RwLock; use std::io::{self, Cursor, Write}; use std::task::Poll; -use std::{ - collections::HashMap, - sync::{Arc, RwLock}, -}; +use std::{collections::HashMap, sync::Arc}; use tonic::async_trait; use tracing::instrument; @@ -19,13 +17,13 @@ pub struct MemoryBlobService { impl BlobService for MemoryBlobService { #[instrument(skip_all, ret, err, fields(blob.digest=%digest))] async fn has(&self, digest: &B3Digest) -> io::Result<bool> { - let db = self.db.read().unwrap(); + let db = self.db.read(); Ok(db.contains_key(digest)) } #[instrument(skip_all, err, fields(blob.digest=%digest))] async fn open_read(&self, digest: &B3Digest) -> io::Result<Option<Box<dyn BlobReader>>> { - let db = self.db.read().unwrap(); + let db = self.db.read(); match db.get(digest).map(|x| Cursor::new(x.clone())) { Some(result) => Ok(Some(Box::new(result))), @@ -109,24 +107,16 @@ impl BlobWriter for MemoryBlobWriter { } else { let (buf, hasher) = self.writers.take().unwrap(); - // We know self.hasher is doing blake3 hashing, so this won't fail. let digest: B3Digest = hasher.finalize().as_bytes().into(); // Only insert if the blob doesn't already exist. - let db = self.db.read().map_err(|e| { - io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e)) - })?; + let mut db = self.db.upgradable_read(); if !db.contains_key(&digest) { - // drop the read lock, so we can open for writing. - drop(db); - // open the database for writing. - let mut db = self.db.write().map_err(|e| { - io::Error::new(io::ErrorKind::BrokenPipe, format!("RwLock poisoned: {}", e)) - })?; - - // and put buf in there. This will move buf out. - db.insert(digest.clone(), buf); + db.with_upgraded(|db| { + // and put buf in there. This will move buf out. + db.insert(digest.clone(), buf); + }); } self.digest = Some(digest.clone()); |