diff options
Diffstat (limited to 'tvix/store/src/fuse/mod.rs')
-rw-r--r-- | tvix/store/src/fuse/mod.rs | 107 |
1 files changed, 84 insertions, 23 deletions
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs index 0015abb9d557..978fd50e2634 100644 --- a/tvix/store/src/fuse/mod.rs +++ b/tvix/store/src/fuse/mod.rs @@ -18,11 +18,12 @@ use crate::{ }; use fuser::{FileAttr, ReplyAttr, Request}; use nix_compat::store_path::StorePath; -use std::io::{self, Read, Seek}; +use std::io; use std::os::unix::ffi::OsStrExt; use std::str::FromStr; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; +use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; use tracing::{debug, info_span, warn}; use self::inode_tracker::InodeTracker; @@ -79,6 +80,8 @@ pub struct FUSE { file_handles: HashMap<u64, Box<dyn BlobReader>>, next_file_handle: u64, + + tokio_handle: tokio::runtime::Handle, } impl FUSE { @@ -100,6 +103,7 @@ impl FUSE { file_handles: Default::default(), next_file_handle: 1, + tokio_handle: tokio::runtime::Handle::current(), } } @@ -430,6 +434,7 @@ impl fuser::Filesystem for FUSE { reply.error(libc::ENOSYS); return; } + // lookup the inode match *self.inode_tracker.get(ino).unwrap() { // read is invalid on non-files. @@ -441,7 +446,16 @@ impl fuser::Filesystem for FUSE { let span = info_span!("read", blob.digest = %blob_digest); let _enter = span.enter(); - match self.blob_service.open_read(blob_digest) { + let blob_service = self.blob_service.clone(); + let blob_digest = blob_digest.clone(); + + let task = self + .tokio_handle + .spawn(async move { blob_service.open_read(&blob_digest).await }); + + let blob_reader = self.tokio_handle.block_on(task).unwrap(); + + match blob_reader { Ok(None) => { warn!("blob not found"); reply.error(libc::EIO); @@ -451,6 +465,7 @@ impl fuser::Filesystem for FUSE { reply.error(libc::EIO); } Ok(Some(blob_reader)) => { + debug!("add file handle {}", fh); self.file_handles.insert(fh, blob_reader); reply.opened(fh, 0); @@ -477,9 +492,14 @@ impl fuser::Filesystem for FUSE { reply: fuser::ReplyEmpty, ) { // remove and get ownership on the blob reader - let blob_reader = self.file_handles.remove(&fh).unwrap(); - // drop it, which will close it. - drop(blob_reader); + match self.file_handles.remove(&fh) { + // drop it, which will close it. + Some(blob_reader) => drop(blob_reader), + None => { + // These might already be dropped if a read error occured. + debug!("file_handle {} not found", fh); + } + } reply.ok(); } @@ -498,29 +518,70 @@ impl fuser::Filesystem for FUSE { ) { debug!("read"); - let blob_reader = self.file_handles.get_mut(&fh).unwrap(); - - // seek to the offset specified, which is relative to the start of the file. - let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)); - match resp { - Ok(pos) => { - debug_assert_eq!(offset as u64, pos); - } - Err(e) => { - warn!("failed to seek to offset {}: {}", offset, e); + // We need to take out the blob reader from self.file_handles, so we can + // interact with it in the separate task. + // On success, we pass it back out of the task, so we can put it back in self.file_handles. + let mut blob_reader = match self.file_handles.remove(&fh) { + Some(blob_reader) => blob_reader, + None => { + warn!("file handle {} unknown", fh); reply.error(libc::EIO); return; } - } + }; - // now with the blobreader seeked to this location, read size of data - let data: std::io::Result<Vec<u8>> = - blob_reader.bytes().take(size.try_into().unwrap()).collect(); + let task = self.tokio_handle.spawn(async move { + // seek to the offset specified, which is relative to the start of the file. + let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await; - match data { - // respond with the requested data - Ok(data) => reply.data(&data), - Err(e) => reply.error(e.raw_os_error().unwrap()), + match resp { + Ok(pos) => { + debug_assert_eq!(offset as u64, pos); + } + Err(e) => { + warn!("failed to seek to offset {}: {}", offset, e); + return Err(libc::EIO); + } + } + + // As written in the fuser docs, read should send exactly the number + // of bytes requested except on EOF or error. + + let mut buf: Vec<u8> = Vec::with_capacity(size as usize); + + while (buf.len() as u64) < size as u64 { + match blob_reader.fill_buf().await { + Ok(int_buf) => { + // copy things from the internal buffer into buf to fill it till up until size + + // an empty buffer signals we reached EOF. + if int_buf.is_empty() { + break; + } + + // calculate how many bytes we can read from int_buf. + // It's either all of int_buf, or the number of bytes missing in buf to reach size. + let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); + + // copy these bytes into our buffer + buf.extend_from_slice(&int_buf[..len_to_copy]); + // and consume them in the buffered reader. + blob_reader.consume(len_to_copy); + } + Err(e) => return Err(e.raw_os_error().unwrap()), + } + } + Ok((buf, blob_reader)) + }); + + let resp = self.tokio_handle.block_on(task).unwrap(); + + match resp { + Err(e) => reply.error(e), + Ok((buf, blob_reader)) => { + reply.data(&buf); + self.file_handles.insert(fh, blob_reader); + } } } |