about summary refs log tree commit diff
path: root/tvix/store/src/fuse/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/store/src/fuse/mod.rs')
-rw-r--r--tvix/store/src/fuse/mod.rs107
1 files changed, 84 insertions, 23 deletions
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs
index 0015abb9d557..978fd50e2634 100644
--- a/tvix/store/src/fuse/mod.rs
+++ b/tvix/store/src/fuse/mod.rs
@@ -18,11 +18,12 @@ use crate::{
 };
 use fuser::{FileAttr, ReplyAttr, Request};
 use nix_compat::store_path::StorePath;
-use std::io::{self, Read, Seek};
+use std::io;
 use std::os::unix::ffi::OsStrExt;
 use std::str::FromStr;
 use std::sync::Arc;
 use std::{collections::HashMap, time::Duration};
+use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
 use tracing::{debug, info_span, warn};
 
 use self::inode_tracker::InodeTracker;
@@ -79,6 +80,8 @@ pub struct FUSE {
     file_handles: HashMap<u64, Box<dyn BlobReader>>,
 
     next_file_handle: u64,
+
+    tokio_handle: tokio::runtime::Handle,
 }
 
 impl FUSE {
@@ -100,6 +103,7 @@ impl FUSE {
 
             file_handles: Default::default(),
             next_file_handle: 1,
+            tokio_handle: tokio::runtime::Handle::current(),
         }
     }
 
@@ -430,6 +434,7 @@ impl fuser::Filesystem for FUSE {
             reply.error(libc::ENOSYS);
             return;
         }
+
         // lookup the inode
         match *self.inode_tracker.get(ino).unwrap() {
             // read is invalid on non-files.
@@ -441,7 +446,16 @@ impl fuser::Filesystem for FUSE {
                 let span = info_span!("read", blob.digest = %blob_digest);
                 let _enter = span.enter();
 
-                match self.blob_service.open_read(blob_digest) {
+                let blob_service = self.blob_service.clone();
+                let blob_digest = blob_digest.clone();
+
+                let task = self
+                    .tokio_handle
+                    .spawn(async move { blob_service.open_read(&blob_digest).await });
+
+                let blob_reader = self.tokio_handle.block_on(task).unwrap();
+
+                match blob_reader {
                     Ok(None) => {
                         warn!("blob not found");
                         reply.error(libc::EIO);
@@ -451,6 +465,7 @@ impl fuser::Filesystem for FUSE {
                         reply.error(libc::EIO);
                     }
                     Ok(Some(blob_reader)) => {
+                        debug!("add file handle {}", fh);
                         self.file_handles.insert(fh, blob_reader);
                         reply.opened(fh, 0);
 
@@ -477,9 +492,14 @@ impl fuser::Filesystem for FUSE {
         reply: fuser::ReplyEmpty,
     ) {
         // remove and get ownership on the blob reader
-        let blob_reader = self.file_handles.remove(&fh).unwrap();
-        // drop it, which will close it.
-        drop(blob_reader);
+        match self.file_handles.remove(&fh) {
+            // drop it, which will close it.
+            Some(blob_reader) => drop(blob_reader),
+            None => {
+                // These might already be dropped if a read error occured.
+                debug!("file_handle {} not found", fh);
+            }
+        }
 
         reply.ok();
     }
@@ -498,29 +518,70 @@ impl fuser::Filesystem for FUSE {
     ) {
         debug!("read");
 
-        let blob_reader = self.file_handles.get_mut(&fh).unwrap();
-
-        // seek to the offset specified, which is relative to the start of the file.
-        let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64));
-        match resp {
-            Ok(pos) => {
-                debug_assert_eq!(offset as u64, pos);
-            }
-            Err(e) => {
-                warn!("failed to seek to offset {}: {}", offset, e);
+        // We need to take out the blob reader from self.file_handles, so we can
+        // interact with it in the separate task.
+        // On success, we pass it back out of the task, so we can put it back in self.file_handles.
+        let mut blob_reader = match self.file_handles.remove(&fh) {
+            Some(blob_reader) => blob_reader,
+            None => {
+                warn!("file handle {} unknown", fh);
                 reply.error(libc::EIO);
                 return;
             }
-        }
+        };
 
-        // now with the blobreader seeked to this location, read size of data
-        let data: std::io::Result<Vec<u8>> =
-            blob_reader.bytes().take(size.try_into().unwrap()).collect();
+        let task = self.tokio_handle.spawn(async move {
+            // seek to the offset specified, which is relative to the start of the file.
+            let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await;
 
-        match data {
-            // respond with the requested data
-            Ok(data) => reply.data(&data),
-            Err(e) => reply.error(e.raw_os_error().unwrap()),
+            match resp {
+                Ok(pos) => {
+                    debug_assert_eq!(offset as u64, pos);
+                }
+                Err(e) => {
+                    warn!("failed to seek to offset {}: {}", offset, e);
+                    return Err(libc::EIO);
+                }
+            }
+
+            // As written in the fuser docs, read should send exactly the number
+            // of bytes requested except on EOF or error.
+
+            let mut buf: Vec<u8> = Vec::with_capacity(size as usize);
+
+            while (buf.len() as u64) < size as u64 {
+                match blob_reader.fill_buf().await {
+                    Ok(int_buf) => {
+                        // copy things from the internal buffer into buf to fill it till up until size
+
+                        // an empty buffer signals we reached EOF.
+                        if int_buf.is_empty() {
+                            break;
+                        }
+
+                        // calculate how many bytes we can read from int_buf.
+                        // It's either all of int_buf, or the number of bytes missing in buf to reach size.
+                        let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len());
+
+                        // copy these bytes into our buffer
+                        buf.extend_from_slice(&int_buf[..len_to_copy]);
+                        // and consume them in the buffered reader.
+                        blob_reader.consume(len_to_copy);
+                    }
+                    Err(e) => return Err(e.raw_os_error().unwrap()),
+                }
+            }
+            Ok((buf, blob_reader))
+        });
+
+        let resp = self.tokio_handle.block_on(task).unwrap();
+
+        match resp {
+            Err(e) => reply.error(e),
+            Ok((buf, blob_reader)) => {
+                reply.data(&buf);
+                self.file_handles.insert(fh, blob_reader);
+            }
         }
     }