From 3ed7eda79b53d5e03e32d7198508a356ee9af69e Mon Sep 17 00:00:00 2001 From: Florian Klink Date: Mon, 15 Apr 2024 11:02:49 +0300 Subject: refactor(tvix/castore/fs): use std::sync::Mutex This allows us acquiring the lock in sync code still. Also, simplify some of the error handling a bit. Change-Id: I29e83b715f92808e95ecb0ae9de787339d1a371d Reviewed-on: https://cl.tvl.fyi/c/depot/+/11424 Reviewed-by: raitobezarius Autosubmit: flokli Tested-by: BuildkiteCI --- tvix/castore/src/fs/mod.rs | 45 ++++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index 53f74fd400ed..9a2df0313869 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -27,6 +27,7 @@ use fuse_backend_rs::api::filesystem::{ use futures::StreamExt; use parking_lot::RwLock; use std::ffi::CStr; +use std::sync::Mutex; use std::{ collections::HashMap, io, @@ -98,7 +99,7 @@ pub struct TvixStoreFs { /// This holds all open file handles #[allow(clippy::type_complexity)] - file_handles: RwLock>>>>, + file_handles: RwLock>>>>, next_file_handle: AtomicU64, @@ -528,7 +529,7 @@ where debug!("add file handle {}", fh); self.file_handles .write() - .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); + .insert(fh, Arc::new(Mutex::new(blob_reader))); Ok(( Some(fh), @@ -581,29 +582,31 @@ where // We need to take out the blob reader from self.file_handles, so we can // interact with it in the separate task. // On success, we pass it back out of the task, so we can put it back in self.file_handles. - let blob_reader = match self.file_handles.read().get(&handle) { - Some(blob_reader) => blob_reader.clone(), - None => { + let blob_reader = self + .file_handles + .read() + .get(&handle) + .ok_or_else(|| { warn!("file handle {} unknown", handle); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - }; + io::Error::from_raw_os_error(libc::EIO) + }) + .cloned()?; - let buf = self.tokio_handle.block_on(async move { - let mut blob_reader = blob_reader.lock().await; + let mut blob_reader = blob_reader + .lock() + .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; + let buf = self.tokio_handle.block_on(async move { // seek to the offset specified, which is relative to the start of the file. - let resp = blob_reader.seek(io::SeekFrom::Start(offset)).await; - - match resp { - Ok(pos) => { - debug_assert_eq!(offset, pos); - } - Err(e) => { + let pos = blob_reader + .seek(io::SeekFrom::Start(offset)) + .await + .map_err(|e| { warn!("failed to seek to offset {}: {}", offset, e); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - } + io::Error::from_raw_os_error(libc::EIO) + })?; + + debug_assert_eq!(offset, pos); // As written in the fuse docs, read should send exactly the number // of bytes requested except on EOF or error. @@ -613,7 +616,7 @@ where // copy things from the internal buffer into buf to fill it till up until size tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?; - Ok(buf) + Ok::<_, std::io::Error>(buf) })?; w.write(&buf) -- cgit 1.4.1