about summary refs log tree commit diff
path: root/tvix/castore/src/fs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/fs')
-rw-r--r--tvix/castore/src/fs/mod.rs45
1 files changed, 24 insertions, 21 deletions
diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs
index 53f74fd400ed..9a2df0313869 100644
--- a/tvix/castore/src/fs/mod.rs
+++ b/tvix/castore/src/fs/mod.rs
@@ -27,6 +27,7 @@ use fuse_backend_rs::api::filesystem::{
 use futures::StreamExt;
 use parking_lot::RwLock;
 use std::ffi::CStr;
+use std::sync::Mutex;
 use std::{
     collections::HashMap,
     io,
@@ -98,7 +99,7 @@ pub struct TvixStoreFs<BS, DS, RN> {
 
     /// This holds all open file handles
     #[allow(clippy::type_complexity)]
-    file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>,
+    file_handles: RwLock<HashMap<u64, Arc<Mutex<Box<dyn BlobReader>>>>>,
 
     next_file_handle: AtomicU64,
 
@@ -528,7 +529,7 @@ where
                         debug!("add file handle {}", fh);
                         self.file_handles
                             .write()
-                            .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader)));
+                            .insert(fh, Arc::new(Mutex::new(blob_reader)));
 
                         Ok((
                             Some(fh),
@@ -581,29 +582,31 @@ where
         // We need to take out the blob reader from self.file_handles, so we can
         // interact with it in the separate task.
         // On success, we pass it back out of the task, so we can put it back in self.file_handles.
-        let blob_reader = match self.file_handles.read().get(&handle) {
-            Some(blob_reader) => blob_reader.clone(),
-            None => {
+        let blob_reader = self
+            .file_handles
+            .read()
+            .get(&handle)
+            .ok_or_else(|| {
                 warn!("file handle {} unknown", handle);
-                return Err(io::Error::from_raw_os_error(libc::EIO));
-            }
-        };
+                io::Error::from_raw_os_error(libc::EIO)
+            })
+            .cloned()?;
 
-        let buf = self.tokio_handle.block_on(async move {
-            let mut blob_reader = blob_reader.lock().await;
+        let mut blob_reader = blob_reader
+            .lock()
+            .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
 
+        let buf = self.tokio_handle.block_on(async move {
             // seek to the offset specified, which is relative to the start of the file.
-            let resp = blob_reader.seek(io::SeekFrom::Start(offset)).await;
-
-            match resp {
-                Ok(pos) => {
-                    debug_assert_eq!(offset, pos);
-                }
-                Err(e) => {
+            let pos = blob_reader
+                .seek(io::SeekFrom::Start(offset))
+                .await
+                .map_err(|e| {
                     warn!("failed to seek to offset {}: {}", offset, e);
-                    return Err(io::Error::from_raw_os_error(libc::EIO));
-                }
-            }
+                    io::Error::from_raw_os_error(libc::EIO)
+                })?;
+
+            debug_assert_eq!(offset, pos);
 
             // As written in the fuse docs, read should send exactly the number
             // of bytes requested except on EOF or error.
@@ -613,7 +616,7 @@ where
             // copy things from the internal buffer into buf to fill it till up until size
             tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?;
 
-            Ok(buf)
+            Ok::<_, std::io::Error>(buf)
         })?;
 
         w.write(&buf)