diff options
author | Florian Klink <flokli@flokli.de> | 2024-04-15T08·02+0300 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2024-04-15T09·27+0000 |
commit | 3ed7eda79b53d5e03e32d7198508a356ee9af69e (patch) | |
tree | 6738ac7cc33c9e5645b3f3f1b613112afc7ec39b /tvix/castore/src/fs | |
parent | 1bf6b9f5a0caa5420ded4fba81646da9ea41bfb2 (diff) |
refactor(tvix/castore/fs): use std::sync::Mutex r/7914
This allows us acquiring the lock in sync code still. Also, simplify some of the error handling a bit. Change-Id: I29e83b715f92808e95ecb0ae9de787339d1a371d Reviewed-on: https://cl.tvl.fyi/c/depot/+/11424 Reviewed-by: raitobezarius <tvl@lahfa.xyz> Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI
Diffstat (limited to 'tvix/castore/src/fs')
-rw-r--r-- | tvix/castore/src/fs/mod.rs | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index 53f74fd400ed..9a2df0313869 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -27,6 +27,7 @@ use fuse_backend_rs::api::filesystem::{ use futures::StreamExt; use parking_lot::RwLock; use std::ffi::CStr; +use std::sync::Mutex; use std::{ collections::HashMap, io, @@ -98,7 +99,7 @@ pub struct TvixStoreFs<BS, DS, RN> { /// This holds all open file handles #[allow(clippy::type_complexity)] - file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>, + file_handles: RwLock<HashMap<u64, Arc<Mutex<Box<dyn BlobReader>>>>>, next_file_handle: AtomicU64, @@ -528,7 +529,7 @@ where debug!("add file handle {}", fh); self.file_handles .write() - .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); + .insert(fh, Arc::new(Mutex::new(blob_reader))); Ok(( Some(fh), @@ -581,29 +582,31 @@ where // We need to take out the blob reader from self.file_handles, so we can // interact with it in the separate task. // On success, we pass it back out of the task, so we can put it back in self.file_handles. - let blob_reader = match self.file_handles.read().get(&handle) { - Some(blob_reader) => blob_reader.clone(), - None => { + let blob_reader = self + .file_handles + .read() + .get(&handle) + .ok_or_else(|| { warn!("file handle {} unknown", handle); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - }; + io::Error::from_raw_os_error(libc::EIO) + }) + .cloned()?; - let buf = self.tokio_handle.block_on(async move { - let mut blob_reader = blob_reader.lock().await; + let mut blob_reader = blob_reader + .lock() + .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; + let buf = self.tokio_handle.block_on(async move { // seek to the offset specified, which is relative to the start of the file. - let resp = blob_reader.seek(io::SeekFrom::Start(offset)).await; - - match resp { - Ok(pos) => { - debug_assert_eq!(offset, pos); - } - Err(e) => { + let pos = blob_reader + .seek(io::SeekFrom::Start(offset)) + .await + .map_err(|e| { warn!("failed to seek to offset {}: {}", offset, e); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - } + io::Error::from_raw_os_error(libc::EIO) + })?; + + debug_assert_eq!(offset, pos); // As written in the fuse docs, read should send exactly the number // of bytes requested except on EOF or error. @@ -613,7 +616,7 @@ where // copy things from the internal buffer into buf to fill it till up until size tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?; - Ok(buf) + Ok::<_, std::io::Error>(buf) })?; w.write(&buf) |