diff options
Diffstat (limited to 'tvix/store/src/fuse/mod.rs')
-rw-r--r-- | tvix/store/src/fuse/mod.rs | 640 |
1 files changed, 402 insertions, 238 deletions
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs index 978fd50e2634..1a5d884ef7a9 100644 --- a/tvix/store/src/fuse/mod.rs +++ b/tvix/store/src/fuse/mod.rs @@ -8,25 +8,34 @@ mod tests; use crate::{ blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, - fuse::{ - file_attr::gen_file_attr, - inodes::{DirectoryInodeData, InodeData}, - }, + fuse::inodes::{DirectoryInodeData, InodeData}, pathinfoservice::PathInfoService, proto::{node::Node, NamedNode}, B3Digest, Error, }; -use fuser::{FileAttr, ReplyAttr, Request}; +use fuse_backend_rs::{ + api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}, + transport::FuseSession, +}; use nix_compat::store_path::StorePath; -use std::io; -use std::os::unix::ffi::OsStrExt; -use std::str::FromStr; -use std::sync::Arc; -use std::{collections::HashMap, time::Duration}; +use parking_lot::RwLock; +use std::{ + collections::HashMap, + io, + path::Path, + str::FromStr, + sync::atomic::AtomicU64, + sync::{atomic::Ordering, Arc}, + thread, + time::Duration, +}; use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; -use tracing::{debug, info_span, warn}; +use tracing::{debug, error, info_span, warn}; -use self::inode_tracker::InodeTracker; +use self::{ + file_attr::{gen_file_attr, ROOT_FILE_ATTR}, + inode_tracker::InodeTracker, +}; /// This implements a read-only FUSE filesystem for a tvix-store /// with the passed [BlobService], [DirectoryService] and [PathInfoService]. @@ -71,15 +80,15 @@ pub struct FUSE { list_root: bool, /// This maps a given StorePath to the inode we allocated for the root inode. - store_paths: HashMap<StorePath, u64>, + store_paths: RwLock<HashMap<StorePath, u64>>, /// This keeps track of inodes and data alongside them. - inode_tracker: InodeTracker, + inode_tracker: RwLock<InodeTracker>, /// This holds all open file handles - file_handles: HashMap<u64, Box<dyn BlobReader>>, + file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>, - next_file_handle: u64, + next_file_handle: AtomicU64, tokio_handle: tokio::runtime::Handle, } @@ -98,11 +107,11 @@ impl FUSE { list_root, - store_paths: HashMap::default(), - inode_tracker: Default::default(), + store_paths: RwLock::new(HashMap::default()), + inode_tracker: RwLock::new(Default::default()), - file_handles: Default::default(), - next_file_handle: 1, + file_handles: RwLock::new(Default::default()), + next_file_handle: AtomicU64::new(1), tokio_handle: tokio::runtime::Handle::current(), } } @@ -114,11 +123,11 @@ impl FUSE { /// or otherwise fetch from [self.path_info_service], and then insert into /// [self.inode_tracker]. fn name_in_root_to_ino_and_data( - &mut self, - name: &std::ffi::OsStr, + &self, + name: &std::ffi::CStr, ) -> Result<Option<(u64, Arc<InodeData>)>, Error> { // parse the name into a [StorePath]. - let store_path = if let Some(name) = name.to_str() { + let store_path = if let Some(name) = name.to_str().ok() { match StorePath::from_str(name) { Ok(store_path) => store_path, Err(e) => { @@ -129,19 +138,26 @@ impl FUSE { } } } else { - debug!("{name:?} is no string"); + debug!("{name:?} is not a valid utf-8 string"); // same here. return Ok(None); }; - if let Some(ino) = self.store_paths.get(&store_path) { + let ino = { + // This extra scope makes sure we drop the read lock + // immediately after reading, to prevent deadlocks. + let store_paths = self.store_paths.read(); + store_paths.get(&store_path).cloned() + }; + + if let Some(ino) = ino { // If we already have that store path, lookup the inode from // self.store_paths and then get the data from [self.inode_tracker], // which in the case of a [InodeData::Directory] will be fully // populated. Ok(Some(( - *ino, - self.inode_tracker.get(*ino).expect("must exist"), + ino, + self.inode_tracker.read().get(ino).expect("must exist"), ))) } else { // If we don't have it, look it up in PathInfoService. @@ -157,15 +173,29 @@ impl FUSE { return Ok(None); } + // Let's check if someone else beat us to updating the inode tracker and + // store_paths map. + let mut store_paths = self.store_paths.write(); + if let Some(ino) = store_paths.get(&store_path).cloned() { + return Ok(Some(( + ino, + self.inode_tracker.read().get(ino).expect("must exist"), + ))); + } + // insert the (sparse) inode data and register in // self.store_paths. // FUTUREWORK: change put to return the data after // inserting, so we don't need to lookup a second // time? - let ino = self.inode_tracker.put((&root_node).into()); - self.store_paths.insert(store_path, ino); + let (ino, inode) = { + let mut inode_tracker = self.inode_tracker.write(); + let ino = inode_tracker.put((&root_node).into()); + (ino, inode_tracker.get(ino).unwrap()) + }; + store_paths.insert(store_path, ino); - Ok(Some((ino, self.inode_tracker.get(ino).unwrap()))) + Ok(Some((ino, inode))) } } } @@ -194,136 +224,152 @@ impl FUSE { } } -impl fuser::Filesystem for FUSE { - #[tracing::instrument(skip_all, fields(rq.inode = ino))] - fn getattr(&mut self, _req: &Request, ino: u64, reply: ReplyAttr) { - debug!("getattr"); +impl FileSystem for FUSE { + type Inode = u64; + type Handle = u64; + + fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> { + Ok(FsOptions::empty()) + } - if ino == fuser::FUSE_ROOT_ID { - reply.attr(&Duration::MAX, &file_attr::ROOT_FILE_ATTR); - return; + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn getattr( + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Option<Self::Handle>, + ) -> io::Result<(libc::stat64, Duration)> { + if inode == ROOT_ID { + return Ok((ROOT_FILE_ATTR.into(), Duration::MAX)); } - match self.inode_tracker.get(ino) { - None => reply.error(libc::ENOENT), + match self.inode_tracker.read().get(inode) { + None => return Err(io::Error::from_raw_os_error(libc::ENOENT)), Some(node) => { debug!(node = ?node, "found node"); - reply.attr(&Duration::MAX, &file_attr::gen_file_attr(&node, ino)); + Ok((gen_file_attr(&node, inode).into(), Duration::MAX)) } } } - #[tracing::instrument(skip_all, fields(rq.parent_inode = parent_ino, rq.name = ?name))] + #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))] fn lookup( - &mut self, - _req: &Request, - parent_ino: u64, - name: &std::ffi::OsStr, - reply: fuser::ReplyEntry, - ) { + &self, + _ctx: &Context, + parent: Self::Inode, + name: &std::ffi::CStr, + ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> { debug!("lookup"); // This goes from a parent inode to a node. - // - If the parent is [fuser::FUSE_ROOT_ID], we need to check + // - If the parent is [ROOT_ID], we need to check // [self.store_paths] (fetching from PathInfoService if needed) // - Otherwise, lookup the parent in [self.inode_tracker] (which must be // a [InodeData::Directory]), and find the child with that name. - if parent_ino == fuser::FUSE_ROOT_ID { - match self.name_in_root_to_ino_and_data(name) { + if parent == ROOT_ID { + return match self.name_in_root_to_ino_and_data(name) { Err(e) => { warn!("{}", e); - reply.error(libc::EIO); - } - Ok(None) => { - reply.error(libc::ENOENT); + Err(io::Error::from_raw_os_error(libc::ENOENT)) } + Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), Ok(Some((ino, inode_data))) => { debug!(inode_data=?&inode_data, ino=ino, "Some"); - reply_with_entry(reply, &gen_file_attr(&inode_data, ino)); + Ok(fuse_backend_rs::api::filesystem::Entry { + inode: ino, + attr: gen_file_attr(&inode_data, ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }) } + }; + } + + // This is the "lookup for "a" inside inode 42. + // We already know that inode 42 must be a directory. + // It might not be populated yet, so if it isn't, we do (by + // fetching from [self.directory_service]), and save the result in + // [self.inode_tracker]. + // Now it for sure is populated, so we search for that name in the + // list of children and return the FileAttrs. + + // TODO: Reduce the critical section of this write lock. + let mut inode_tracker = self.inode_tracker.write(); + let parent_data = inode_tracker.get(parent).unwrap(); + let parent_data = match *parent_data { + InodeData::Regular(..) | InodeData::Symlink(_) => { + // if the parent inode was not a directory, this doesn't make sense + return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); } - } else { - // This is the "lookup for "a" inside inode 42. - // We already know that inode 42 must be a directory. - // It might not be populated yet, so if it isn't, we do (by - // fetching from [self.directory_service]), and save the result in - // [self.inode_tracker]. - // Now it for sure is populated, so we search for that name in the - // list of children and return the FileAttrs. - - let parent_data = self.inode_tracker.get(parent_ino).unwrap(); - let parent_data = match *parent_data { - InodeData::Regular(..) | InodeData::Symlink(_) => { - // if the parent inode was not a directory, this doesn't make sense - reply.error(libc::ENOTDIR); - return; - } - InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { - match self.fetch_directory_inode_data(parent_digest) { - Ok(new_data) => { - // update data in [self.inode_tracker] with populated variant. - // FUTUREWORK: change put to return the data after - // inserting, so we don't need to lookup a second - // time? - let ino = self.inode_tracker.put(new_data); - self.inode_tracker.get(ino).unwrap() - } - Err(_e) => { - reply.error(libc::EIO); - return; - } + InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { + match self.fetch_directory_inode_data(parent_digest) { + Ok(new_data) => { + // update data in [self.inode_tracker] with populated variant. + // FUTUREWORK: change put to return the data after + // inserting, so we don't need to lookup a second + // time? + let ino = inode_tracker.put(new_data); + inode_tracker.get(ino).unwrap() + } + Err(_e) => { + return Err(io::Error::from_raw_os_error(libc::EIO)); } } - InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data, - }; - - // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))]. - let (parent_digest, children) = if let InodeData::Directory( - DirectoryInodeData::Populated(ref parent_digest, ref children), - ) = *parent_data - { - (parent_digest, children) - } else { - panic!("unexpected type") - }; - let span = info_span!("lookup", directory.digest = %parent_digest); - let _enter = span.enter(); - - // in the children, find the one with the desired name. - if let Some((child_ino, _)) = - children.iter().find(|e| e.1.get_name() == name.as_bytes()) - { - // lookup the child [InodeData] in [self.inode_tracker]. - // We know the inodes for children have already been allocated. - let child_inode_data = self.inode_tracker.get(*child_ino).unwrap(); - - // Reply with the file attributes for the child. - // For child directories, we still have all data we need to reply. - reply_with_entry(reply, &gen_file_attr(&child_inode_data, *child_ino)); - } else { - // Child not found, return ENOENT. - reply.error(libc::ENOENT); } + InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data, + }; + + // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))]. + let (parent_digest, children) = if let InodeData::Directory( + DirectoryInodeData::Populated(ref parent_digest, ref children), + ) = *parent_data + { + (parent_digest, children) + } else { + panic!("unexpected type") + }; + let span = info_span!("lookup", directory.digest = %parent_digest); + let _enter = span.enter(); + + // in the children, find the one with the desired name. + if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + // lookup the child [InodeData] in [self.inode_tracker]. + // We know the inodes for children have already been allocated. + let child_inode_data = inode_tracker.get(*child_ino).unwrap(); + + // Reply with the file attributes for the child. + // For child directories, we still have all data we need to reply. + Ok(fuse_backend_rs::api::filesystem::Entry { + inode: *child_ino, + attr: gen_file_attr(&child_inode_data, *child_ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }) + } else { + // Child not found, return ENOENT. + Err(io::Error::from_raw_os_error(libc::ENOENT)) } } // TODO: readdirplus? - #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset))] + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))] fn readdir( - &mut self, - _req: &Request<'_>, - ino: u64, - _fh: u64, - offset: i64, - mut reply: fuser::ReplyDirectory, - ) { + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Self::Handle, + _size: u32, + offset: u64, + add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>, + ) -> io::Result<()> { debug!("readdir"); - if ino == fuser::FUSE_ROOT_ID { + if inode == ROOT_ID { if !self.list_root { - reply.error(libc::EPERM); // same error code as ipfs/kubo - return; + return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } else { for (i, path_info) in self .path_info_service @@ -334,8 +380,7 @@ impl fuser::Filesystem for FUSE { let path_info = match path_info { Err(e) => { warn!("failed to retrieve pathinfo: {}", e); - reply.error(libc::EPERM); - return; + return Err(io::Error::from_raw_os_error(libc::EPERM)); } Ok(path_info) => path_info, }; @@ -344,41 +389,51 @@ impl fuser::Filesystem for FUSE { let root_node = path_info.node.unwrap().node.unwrap(); let store_path = StorePath::from_bytes(root_node.get_name()).unwrap(); - let ino = match self.store_paths.get(&store_path) { - Some(ino) => *ino, + let ino = { + // This extra scope makes sure we drop the read lock + // immediately after reading, to prevent deadlocks. + let store_paths = self.store_paths.read(); + store_paths.get(&store_path).cloned() + }; + let ino = match ino { + Some(ino) => ino, None => { // insert the (sparse) inode data and register in // self.store_paths. - let ino = self.inode_tracker.put((&root_node).into()); - self.store_paths.insert(store_path.clone(), ino); + let ino = self.inode_tracker.write().put((&root_node).into()); + self.store_paths.write().insert(store_path.clone(), ino); ino } }; let ty = match root_node { - Node::Directory(_) => fuser::FileType::Directory, - Node::File(_) => fuser::FileType::RegularFile, - Node::Symlink(_) => fuser::FileType::Symlink, + Node::Directory(_) => libc::S_IFDIR, + Node::File(_) => libc::S_IFREG, + Node::Symlink(_) => libc::S_IFLNK, }; - let full = - reply.add(ino, offset + i as i64 + 1_i64, ty, store_path.to_string()); - if full { + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + i as u64 + 1, + type_: ty, + name: store_path.to_string().as_bytes(), + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { break; } } - reply.ok(); - return; + return Ok(()); } } // lookup the inode data. - let dir_inode_data = self.inode_tracker.get(ino).unwrap(); + let mut inode_tracker = self.inode_tracker.write(); + let dir_inode_data = inode_tracker.get(inode).unwrap(); let dir_inode_data = match *dir_inode_data { InodeData::Regular(..) | InodeData::Symlink(..) => { warn!("Not a directory"); - reply.error(libc::ENOTDIR); - return; + return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); } InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => { match self.fetch_directory_inode_data(directory_digest) { @@ -387,12 +442,11 @@ impl fuser::Filesystem for FUSE { // FUTUREWORK: change put to return the data after // inserting, so we don't need to lookup a second // time? - let ino = self.inode_tracker.put(new_data); - self.inode_tracker.get(ino).unwrap() + let ino = inode_tracker.put(new_data.clone()); + inode_tracker.get(ino).unwrap() } Err(_e) => { - reply.error(libc::EIO); - return; + return Err(io::Error::from_raw_os_error(libc::EIO)); } } } @@ -405,42 +459,49 @@ impl fuser::Filesystem for FUSE { { for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() { // the second parameter will become the "offset" parameter on the next call. - let full = reply.add( - *ino, - offset + i as i64 + 1_i64, - match child_node { - Node::Directory(_) => fuser::FileType::Directory, - Node::File(_) => fuser::FileType::RegularFile, - Node::Symlink(_) => fuser::FileType::Symlink, + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino: *ino, + offset: offset + i as u64 + 1, + type_: match child_node { + Node::Directory(_) => libc::S_IFDIR, + Node::File(_) => libc::S_IFREG, + Node::Symlink(_) => libc::S_IFLNK, }, - std::ffi::OsStr::from_bytes(child_node.get_name()), - ); - if full { + name: child_node.get_name(), + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { break; } } - reply.ok(); } else { panic!("unexpected type") } - } - #[tracing::instrument(skip_all, fields(rq.inode = ino))] - fn open(&mut self, _req: &Request<'_>, ino: u64, _flags: i32, reply: fuser::ReplyOpen) { - // get a new file handle - let fh = self.next_file_handle; + Ok(()) + } - if ino == fuser::FUSE_ROOT_ID { - reply.error(libc::ENOSYS); - return; + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn open( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + _fuse_flags: u32, + ) -> io::Result<( + Option<Self::Handle>, + fuse_backend_rs::api::filesystem::OpenOptions, + )> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); } // lookup the inode - match *self.inode_tracker.get(ino).unwrap() { + match *self.inode_tracker.read().get(inode).unwrap() { // read is invalid on non-files. InodeData::Directory(..) | InodeData::Symlink(_) => { warn!("is directory"); - reply.error(libc::EISDIR); + return Err(io::Error::from_raw_os_error(libc::EISDIR)); } InodeData::Regular(ref blob_digest, _blob_size, _) => { let span = info_span!("read", blob.digest = %blob_digest); @@ -458,79 +519,87 @@ impl fuser::Filesystem for FUSE { match blob_reader { Ok(None) => { warn!("blob not found"); - reply.error(libc::EIO); + return Err(io::Error::from_raw_os_error(libc::EIO)); } Err(e) => { warn!(e=?e, "error opening blob"); - reply.error(libc::EIO); + return Err(io::Error::from_raw_os_error(libc::EIO)); } Ok(Some(blob_reader)) => { - debug!("add file handle {}", fh); - self.file_handles.insert(fh, blob_reader); - reply.opened(fh, 0); - + // get a new file handle // TODO: this will overflow after 2**64 operations, // which is fine for now. // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 // for the discussion on alternatives. - self.next_file_handle += 1; + let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst); + + debug!("add file handle {}", fh); + self.file_handles + .write() + .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); + + Ok(( + Some(fh), + fuse_backend_rs::api::filesystem::OpenOptions::empty(), + )) } } } } } - #[tracing::instrument(skip_all, fields(rq.inode = ino, fh = fh))] + #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))] fn release( - &mut self, - _req: &Request<'_>, - ino: u64, - fh: u64, - _flags: i32, - _lock_owner: Option<u64>, + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + handle: Self::Handle, _flush: bool, - reply: fuser::ReplyEmpty, - ) { + _flock_release: bool, + _lock_owner: Option<u64>, + ) -> io::Result<()> { // remove and get ownership on the blob reader - match self.file_handles.remove(&fh) { + match self.file_handles.write().remove(&handle) { // drop it, which will close it. Some(blob_reader) => drop(blob_reader), None => { // These might already be dropped if a read error occured. - debug!("file_handle {} not found", fh); + debug!("file_handle {} not found", handle); } } - reply.ok(); + Ok(()) } - #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset, rq.size = size))] + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))] fn read( - &mut self, - _req: &Request<'_>, - ino: u64, - fh: u64, - offset: i64, + &self, + _ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter, size: u32, - _flags: i32, + offset: u64, _lock_owner: Option<u64>, - reply: fuser::ReplyData, - ) { + _flags: u32, + ) -> io::Result<usize> { debug!("read"); // We need to take out the blob reader from self.file_handles, so we can // interact with it in the separate task. // On success, we pass it back out of the task, so we can put it back in self.file_handles. - let mut blob_reader = match self.file_handles.remove(&fh) { - Some(blob_reader) => blob_reader, + let blob_reader = match self.file_handles.read().get(&handle) { + Some(blob_reader) => blob_reader.clone(), None => { - warn!("file handle {} unknown", fh); - reply.error(libc::EIO); - return; + warn!("file handle {} unknown", handle); + return Err(io::Error::from_raw_os_error(libc::EIO)); } }; let task = self.tokio_handle.spawn(async move { + let mut blob_reader = blob_reader.lock().await; + // seek to the offset specified, which is relative to the start of the file. let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await; @@ -540,68 +609,163 @@ impl fuser::Filesystem for FUSE { } Err(e) => { warn!("failed to seek to offset {}: {}", offset, e); - return Err(libc::EIO); + return Err(io::Error::from_raw_os_error(libc::EIO)); } } - // As written in the fuser docs, read should send exactly the number + // As written in the fuse docs, read should send exactly the number // of bytes requested except on EOF or error. let mut buf: Vec<u8> = Vec::with_capacity(size as usize); while (buf.len() as u64) < size as u64 { - match blob_reader.fill_buf().await { - Ok(int_buf) => { - // copy things from the internal buffer into buf to fill it till up until size + let int_buf = blob_reader.fill_buf().await?; + // copy things from the internal buffer into buf to fill it till up until size - // an empty buffer signals we reached EOF. - if int_buf.is_empty() { - break; - } + // an empty buffer signals we reached EOF. + if int_buf.is_empty() { + break; + } - // calculate how many bytes we can read from int_buf. - // It's either all of int_buf, or the number of bytes missing in buf to reach size. - let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); + // calculate how many bytes we can read from int_buf. + // It's either all of int_buf, or the number of bytes missing in buf to reach size. + let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); - // copy these bytes into our buffer - buf.extend_from_slice(&int_buf[..len_to_copy]); - // and consume them in the buffered reader. - blob_reader.consume(len_to_copy); - } - Err(e) => return Err(e.raw_os_error().unwrap()), - } + // copy these bytes into our buffer + buf.extend_from_slice(&int_buf[..len_to_copy]); + // and consume them in the buffered reader. + blob_reader.consume(len_to_copy); } - Ok((buf, blob_reader)) + + Ok(buf) }); - let resp = self.tokio_handle.block_on(task).unwrap(); + let buf = self.tokio_handle.block_on(task).unwrap()?; - match resp { - Err(e) => reply.error(e), - Ok((buf, blob_reader)) => { - reply.data(&buf); - self.file_handles.insert(fh, blob_reader); - } - } + w.write(&buf) } - #[tracing::instrument(skip_all, fields(rq.inode = ino))] - fn readlink(&mut self, _req: &Request<'_>, ino: u64, reply: fuser::ReplyData) { - if ino == fuser::FUSE_ROOT_ID { - reply.error(libc::ENOSYS); - return; + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); } // lookup the inode - match *self.inode_tracker.get(ino).unwrap() { + match *self.inode_tracker.read().get(inode).unwrap() { InodeData::Directory(..) | InodeData::Regular(..) => { - reply.error(libc::EINVAL); + Err(io::Error::from_raw_os_error(libc::EINVAL)) } - InodeData::Symlink(ref target) => reply.data(target), + InodeData::Symlink(ref target) => Ok(target.to_vec()), } } } -fn reply_with_entry(reply: fuser::ReplyEntry, file_attr: &FileAttr) { - reply.entry(&Duration::MAX, file_attr, 1 /* TODO: generation */); +struct FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, + channel: fuse_backend_rs::transport::FuseChannel, +} + +impl<FS> FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + fn start(&mut self) -> io::Result<()> { + loop { + if let Some((reader, writer)) = self + .channel + .get_request() + .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? + { + if let Err(e) = self + .server + .handle_message(reader, writer.into(), None, None) + { + match e { + // This indicates the session has been shut down. + fuse_backend_rs::Error::EncodeMessage(e) + if e.raw_os_error() == Some(libc::EBADFD) => + { + break; + } + error => { + error!(?error, "failed to handle fuse request"); + continue; + } + } + } + } else { + break; + } + } + Ok(()) + } +} + +pub struct FuseDaemon { + session: FuseSession, + threads: Vec<thread::JoinHandle<()>>, +} + +impl FuseDaemon { + pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error> + where + FS: FileSystem + Sync + Send + 'static, + P: AsRef<Path>, + { + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + session.set_allow_other(false); + session + .mount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + let mut join_handles = Vec::with_capacity(threads); + for _ in 0..threads { + let mut server = FuseServer { + server: server.clone(), + channel: session + .new_channel() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + }; + let join_handle = thread::Builder::new() + .name("fuse_server".to_string()) + .spawn(move || { + let _ = server.start(); + })?; + join_handles.push(join_handle); + } + + Ok(FuseDaemon { + session, + threads: join_handles, + }) + } + + pub fn unmount(&mut self) -> Result<(), io::Error> { + self.session + .umount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + for thread in self.threads.drain(..) { + thread.join().map_err(|_| { + io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") + })?; + } + + Ok(()) + } +} + +impl Drop for FuseDaemon { + fn drop(&mut self) { + if let Err(error) = self.unmount() { + error!(?error, "failed to unmont fuse filesystem") + } + } } |