diff options
Diffstat (limited to 'tvix/store/src/fuse')
-rw-r--r-- | tvix/store/src/fuse/file_attr.rs | 55 | ||||
-rw-r--r-- | tvix/store/src/fuse/inodes.rs | 10 | ||||
-rw-r--r-- | tvix/store/src/fuse/mod.rs | 640 | ||||
-rw-r--r-- | tvix/store/src/fuse/tests.rs | 92 |
4 files changed, 470 insertions, 327 deletions
diff --git a/tvix/store/src/fuse/file_attr.rs b/tvix/store/src/fuse/file_attr.rs index 25cfd28dd1f9..b946aa977a0a 100644 --- a/tvix/store/src/fuse/file_attr.rs +++ b/tvix/store/src/fuse/file_attr.rs @@ -1,20 +1,19 @@ -use std::time::SystemTime; - use super::inodes::{DirectoryInodeData, InodeData}; -use fuser::FileAttr; +use fuse_backend_rs::abi::fuse_abi::Attr; -/// The [FileAttr] describing the root -pub const ROOT_FILE_ATTR: FileAttr = FileAttr { - ino: fuser::FUSE_ROOT_ID, +/// The [Attr] describing the root +pub const ROOT_FILE_ATTR: Attr = Attr { + ino: fuse_backend_rs::api::filesystem::ROOT_ID, size: 0, blksize: 1024, blocks: 0, - atime: SystemTime::UNIX_EPOCH, - mtime: SystemTime::UNIX_EPOCH, - ctime: SystemTime::UNIX_EPOCH, - crtime: SystemTime::UNIX_EPOCH, - kind: fuser::FileType::Directory, - perm: 0o555, + mode: libc::S_IFDIR | 0o555, + atime: 0, + mtime: 0, + ctime: 0, + atimensec: 0, + mtimensec: 0, + ctimensec: 0, nlink: 0, uid: 0, gid: 0, @@ -22,10 +21,12 @@ pub const ROOT_FILE_ATTR: FileAttr = FileAttr { flags: 0, }; -/// for given &Node and inode, construct a [FileAttr] -pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> FileAttr { - FileAttr { +/// for given &Node and inode, construct an [Attr] +pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> Attr { + Attr { ino: inode, + // FUTUREWORK: play with this numbers, as it affects read sizes for client applications. + blocks: 1024, size: match inode_data { InodeData::Regular(_, size, _) => *size as u64, InodeData::Symlink(target) => target.len() as u64, @@ -34,24 +35,12 @@ pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> FileAttr { children.len() as u64 } }, - // FUTUREWORK: play with this numbers, as it affects read sizes for client applications. - blksize: 1024, - blocks: 0, - atime: SystemTime::UNIX_EPOCH, - mtime: SystemTime::UNIX_EPOCH, - ctime: SystemTime::UNIX_EPOCH, - crtime: SystemTime::UNIX_EPOCH, - kind: inode_data.into(), - perm: match inode_data { - InodeData::Regular(_, _, false) => 0o444, // no-executable files - InodeData::Regular(_, _, true) => 0o555, // executable files - InodeData::Symlink(_) => 0o444, - InodeData::Directory(..) => 0o555, + mode: match inode_data { + InodeData::Regular(_, _, false) => libc::S_IFREG | 0o444, // no-executable files + InodeData::Regular(_, _, true) => libc::S_IFREG | 0o555, // executable files + InodeData::Symlink(_) => libc::S_IFLNK | 0o444, + InodeData::Directory(_) => libc::S_IFDIR | 0o555, }, - nlink: 0, - uid: 0, - gid: 0, - rdev: 0, - flags: 0, + ..Default::default() } } diff --git a/tvix/store/src/fuse/inodes.rs b/tvix/store/src/fuse/inodes.rs index f44dde7b804f..e8959ce3629b 100644 --- a/tvix/store/src/fuse/inodes.rs +++ b/tvix/store/src/fuse/inodes.rs @@ -66,13 +66,3 @@ impl From<proto::Directory> for InodeData { InodeData::Directory(DirectoryInodeData::Populated(digest, children)) } } - -impl From<&InodeData> for fuser::FileType { - fn from(val: &InodeData) -> Self { - match val { - InodeData::Regular(..) => fuser::FileType::RegularFile, - InodeData::Symlink(_) => fuser::FileType::Symlink, - InodeData::Directory(..) => fuser::FileType::Directory, - } - } -} diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs index 978fd50e2634..1a5d884ef7a9 100644 --- a/tvix/store/src/fuse/mod.rs +++ b/tvix/store/src/fuse/mod.rs @@ -8,25 +8,34 @@ mod tests; use crate::{ blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, - fuse::{ - file_attr::gen_file_attr, - inodes::{DirectoryInodeData, InodeData}, - }, + fuse::inodes::{DirectoryInodeData, InodeData}, pathinfoservice::PathInfoService, proto::{node::Node, NamedNode}, B3Digest, Error, }; -use fuser::{FileAttr, ReplyAttr, Request}; +use fuse_backend_rs::{ + api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}, + transport::FuseSession, +}; use nix_compat::store_path::StorePath; -use std::io; -use std::os::unix::ffi::OsStrExt; -use std::str::FromStr; -use std::sync::Arc; -use std::{collections::HashMap, time::Duration}; +use parking_lot::RwLock; +use std::{ + collections::HashMap, + io, + path::Path, + str::FromStr, + sync::atomic::AtomicU64, + sync::{atomic::Ordering, Arc}, + thread, + time::Duration, +}; use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; -use tracing::{debug, info_span, warn}; +use tracing::{debug, error, info_span, warn}; -use self::inode_tracker::InodeTracker; +use self::{ + file_attr::{gen_file_attr, ROOT_FILE_ATTR}, + inode_tracker::InodeTracker, +}; /// This implements a read-only FUSE filesystem for a tvix-store /// with the passed [BlobService], [DirectoryService] and [PathInfoService]. @@ -71,15 +80,15 @@ pub struct FUSE { list_root: bool, /// This maps a given StorePath to the inode we allocated for the root inode. - store_paths: HashMap<StorePath, u64>, + store_paths: RwLock<HashMap<StorePath, u64>>, /// This keeps track of inodes and data alongside them. - inode_tracker: InodeTracker, + inode_tracker: RwLock<InodeTracker>, /// This holds all open file handles - file_handles: HashMap<u64, Box<dyn BlobReader>>, + file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>, - next_file_handle: u64, + next_file_handle: AtomicU64, tokio_handle: tokio::runtime::Handle, } @@ -98,11 +107,11 @@ impl FUSE { list_root, - store_paths: HashMap::default(), - inode_tracker: Default::default(), + store_paths: RwLock::new(HashMap::default()), + inode_tracker: RwLock::new(Default::default()), - file_handles: Default::default(), - next_file_handle: 1, + file_handles: RwLock::new(Default::default()), + next_file_handle: AtomicU64::new(1), tokio_handle: tokio::runtime::Handle::current(), } } @@ -114,11 +123,11 @@ impl FUSE { /// or otherwise fetch from [self.path_info_service], and then insert into /// [self.inode_tracker]. fn name_in_root_to_ino_and_data( - &mut self, - name: &std::ffi::OsStr, + &self, + name: &std::ffi::CStr, ) -> Result<Option<(u64, Arc<InodeData>)>, Error> { // parse the name into a [StorePath]. - let store_path = if let Some(name) = name.to_str() { + let store_path = if let Some(name) = name.to_str().ok() { match StorePath::from_str(name) { Ok(store_path) => store_path, Err(e) => { @@ -129,19 +138,26 @@ impl FUSE { } } } else { - debug!("{name:?} is no string"); + debug!("{name:?} is not a valid utf-8 string"); // same here. return Ok(None); }; - if let Some(ino) = self.store_paths.get(&store_path) { + let ino = { + // This extra scope makes sure we drop the read lock + // immediately after reading, to prevent deadlocks. + let store_paths = self.store_paths.read(); + store_paths.get(&store_path).cloned() + }; + + if let Some(ino) = ino { // If we already have that store path, lookup the inode from // self.store_paths and then get the data from [self.inode_tracker], // which in the case of a [InodeData::Directory] will be fully // populated. Ok(Some(( - *ino, - self.inode_tracker.get(*ino).expect("must exist"), + ino, + self.inode_tracker.read().get(ino).expect("must exist"), ))) } else { // If we don't have it, look it up in PathInfoService. @@ -157,15 +173,29 @@ impl FUSE { return Ok(None); } + // Let's check if someone else beat us to updating the inode tracker and + // store_paths map. + let mut store_paths = self.store_paths.write(); + if let Some(ino) = store_paths.get(&store_path).cloned() { + return Ok(Some(( + ino, + self.inode_tracker.read().get(ino).expect("must exist"), + ))); + } + // insert the (sparse) inode data and register in // self.store_paths. // FUTUREWORK: change put to return the data after // inserting, so we don't need to lookup a second // time? - let ino = self.inode_tracker.put((&root_node).into()); - self.store_paths.insert(store_path, ino); + let (ino, inode) = { + let mut inode_tracker = self.inode_tracker.write(); + let ino = inode_tracker.put((&root_node).into()); + (ino, inode_tracker.get(ino).unwrap()) + }; + store_paths.insert(store_path, ino); - Ok(Some((ino, self.inode_tracker.get(ino).unwrap()))) + Ok(Some((ino, inode))) } } } @@ -194,136 +224,152 @@ impl FUSE { } } -impl fuser::Filesystem for FUSE { - #[tracing::instrument(skip_all, fields(rq.inode = ino))] - fn getattr(&mut self, _req: &Request, ino: u64, reply: ReplyAttr) { - debug!("getattr"); +impl FileSystem for FUSE { + type Inode = u64; + type Handle = u64; + + fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> { + Ok(FsOptions::empty()) + } - if ino == fuser::FUSE_ROOT_ID { - reply.attr(&Duration::MAX, &file_attr::ROOT_FILE_ATTR); - return; + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn getattr( + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Option<Self::Handle>, + ) -> io::Result<(libc::stat64, Duration)> { + if inode == ROOT_ID { + return Ok((ROOT_FILE_ATTR.into(), Duration::MAX)); } - match self.inode_tracker.get(ino) { - None => reply.error(libc::ENOENT), + match self.inode_tracker.read().get(inode) { + None => return Err(io::Error::from_raw_os_error(libc::ENOENT)), Some(node) => { debug!(node = ?node, "found node"); - reply.attr(&Duration::MAX, &file_attr::gen_file_attr(&node, ino)); + Ok((gen_file_attr(&node, inode).into(), Duration::MAX)) } } } - #[tracing::instrument(skip_all, fields(rq.parent_inode = parent_ino, rq.name = ?name))] + #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))] fn lookup( - &mut self, - _req: &Request, - parent_ino: u64, - name: &std::ffi::OsStr, - reply: fuser::ReplyEntry, - ) { + &self, + _ctx: &Context, + parent: Self::Inode, + name: &std::ffi::CStr, + ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> { debug!("lookup"); // This goes from a parent inode to a node. - // - If the parent is [fuser::FUSE_ROOT_ID], we need to check + // - If the parent is [ROOT_ID], we need to check // [self.store_paths] (fetching from PathInfoService if needed) // - Otherwise, lookup the parent in [self.inode_tracker] (which must be // a [InodeData::Directory]), and find the child with that name. - if parent_ino == fuser::FUSE_ROOT_ID { - match self.name_in_root_to_ino_and_data(name) { + if parent == ROOT_ID { + return match self.name_in_root_to_ino_and_data(name) { Err(e) => { warn!("{}", e); - reply.error(libc::EIO); - } - Ok(None) => { - reply.error(libc::ENOENT); + Err(io::Error::from_raw_os_error(libc::ENOENT)) } + Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), Ok(Some((ino, inode_data))) => { debug!(inode_data=?&inode_data, ino=ino, "Some"); - reply_with_entry(reply, &gen_file_attr(&inode_data, ino)); + Ok(fuse_backend_rs::api::filesystem::Entry { + inode: ino, + attr: gen_file_attr(&inode_data, ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }) } + }; + } + + // This is the "lookup for "a" inside inode 42. + // We already know that inode 42 must be a directory. + // It might not be populated yet, so if it isn't, we do (by + // fetching from [self.directory_service]), and save the result in + // [self.inode_tracker]. + // Now it for sure is populated, so we search for that name in the + // list of children and return the FileAttrs. + + // TODO: Reduce the critical section of this write lock. + let mut inode_tracker = self.inode_tracker.write(); + let parent_data = inode_tracker.get(parent).unwrap(); + let parent_data = match *parent_data { + InodeData::Regular(..) | InodeData::Symlink(_) => { + // if the parent inode was not a directory, this doesn't make sense + return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); } - } else { - // This is the "lookup for "a" inside inode 42. - // We already know that inode 42 must be a directory. - // It might not be populated yet, so if it isn't, we do (by - // fetching from [self.directory_service]), and save the result in - // [self.inode_tracker]. - // Now it for sure is populated, so we search for that name in the - // list of children and return the FileAttrs. - - let parent_data = self.inode_tracker.get(parent_ino).unwrap(); - let parent_data = match *parent_data { - InodeData::Regular(..) | InodeData::Symlink(_) => { - // if the parent inode was not a directory, this doesn't make sense - reply.error(libc::ENOTDIR); - return; - } - InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { - match self.fetch_directory_inode_data(parent_digest) { - Ok(new_data) => { - // update data in [self.inode_tracker] with populated variant. - // FUTUREWORK: change put to return the data after - // inserting, so we don't need to lookup a second - // time? - let ino = self.inode_tracker.put(new_data); - self.inode_tracker.get(ino).unwrap() - } - Err(_e) => { - reply.error(libc::EIO); - return; - } + InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { + match self.fetch_directory_inode_data(parent_digest) { + Ok(new_data) => { + // update data in [self.inode_tracker] with populated variant. + // FUTUREWORK: change put to return the data after + // inserting, so we don't need to lookup a second + // time? + let ino = inode_tracker.put(new_data); + inode_tracker.get(ino).unwrap() + } + Err(_e) => { + return Err(io::Error::from_raw_os_error(libc::EIO)); } } - InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data, - }; - - // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))]. - let (parent_digest, children) = if let InodeData::Directory( - DirectoryInodeData::Populated(ref parent_digest, ref children), - ) = *parent_data - { - (parent_digest, children) - } else { - panic!("unexpected type") - }; - let span = info_span!("lookup", directory.digest = %parent_digest); - let _enter = span.enter(); - - // in the children, find the one with the desired name. - if let Some((child_ino, _)) = - children.iter().find(|e| e.1.get_name() == name.as_bytes()) - { - // lookup the child [InodeData] in [self.inode_tracker]. - // We know the inodes for children have already been allocated. - let child_inode_data = self.inode_tracker.get(*child_ino).unwrap(); - - // Reply with the file attributes for the child. - // For child directories, we still have all data we need to reply. - reply_with_entry(reply, &gen_file_attr(&child_inode_data, *child_ino)); - } else { - // Child not found, return ENOENT. - reply.error(libc::ENOENT); } + InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data, + }; + + // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))]. + let (parent_digest, children) = if let InodeData::Directory( + DirectoryInodeData::Populated(ref parent_digest, ref children), + ) = *parent_data + { + (parent_digest, children) + } else { + panic!("unexpected type") + }; + let span = info_span!("lookup", directory.digest = %parent_digest); + let _enter = span.enter(); + + // in the children, find the one with the desired name. + if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + // lookup the child [InodeData] in [self.inode_tracker]. + // We know the inodes for children have already been allocated. + let child_inode_data = inode_tracker.get(*child_ino).unwrap(); + + // Reply with the file attributes for the child. + // For child directories, we still have all data we need to reply. + Ok(fuse_backend_rs::api::filesystem::Entry { + inode: *child_ino, + attr: gen_file_attr(&child_inode_data, *child_ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }) + } else { + // Child not found, return ENOENT. + Err(io::Error::from_raw_os_error(libc::ENOENT)) } } // TODO: readdirplus? - #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset))] + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))] fn readdir( - &mut self, - _req: &Request<'_>, - ino: u64, - _fh: u64, - offset: i64, - mut reply: fuser::ReplyDirectory, - ) { + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Self::Handle, + _size: u32, + offset: u64, + add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>, + ) -> io::Result<()> { debug!("readdir"); - if ino == fuser::FUSE_ROOT_ID { + if inode == ROOT_ID { if !self.list_root { - reply.error(libc::EPERM); // same error code as ipfs/kubo - return; + return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } else { for (i, path_info) in self .path_info_service @@ -334,8 +380,7 @@ impl fuser::Filesystem for FUSE { let path_info = match path_info { Err(e) => { warn!("failed to retrieve pathinfo: {}", e); - reply.error(libc::EPERM); - return; + return Err(io::Error::from_raw_os_error(libc::EPERM)); } Ok(path_info) => path_info, }; @@ -344,41 +389,51 @@ impl fuser::Filesystem for FUSE { let root_node = path_info.node.unwrap().node.unwrap(); let store_path = StorePath::from_bytes(root_node.get_name()).unwrap(); - let ino = match self.store_paths.get(&store_path) { - Some(ino) => *ino, + let ino = { + // This extra scope makes sure we drop the read lock + // immediately after reading, to prevent deadlocks. + let store_paths = self.store_paths.read(); + store_paths.get(&store_path).cloned() + }; + let ino = match ino { + Some(ino) => ino, None => { // insert the (sparse) inode data and register in // self.store_paths. - let ino = self.inode_tracker.put((&root_node).into()); - self.store_paths.insert(store_path.clone(), ino); + let ino = self.inode_tracker.write().put((&root_node).into()); + self.store_paths.write().insert(store_path.clone(), ino); ino } }; let ty = match root_node { - Node::Directory(_) => fuser::FileType::Directory, - Node::File(_) => fuser::FileType::RegularFile, - Node::Symlink(_) => fuser::FileType::Symlink, + Node::Directory(_) => libc::S_IFDIR, + Node::File(_) => libc::S_IFREG, + Node::Symlink(_) => libc::S_IFLNK, }; - let full = - reply.add(ino, offset + i as i64 + 1_i64, ty, store_path.to_string()); - if full { + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + i as u64 + 1, + type_: ty, + name: store_path.to_string().as_bytes(), + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { break; } } - reply.ok(); - return; + return Ok(()); } } // lookup the inode data. - let dir_inode_data = self.inode_tracker.get(ino).unwrap(); + let mut inode_tracker = self.inode_tracker.write(); + let dir_inode_data = inode_tracker.get(inode).unwrap(); let dir_inode_data = match *dir_inode_data { InodeData::Regular(..) | InodeData::Symlink(..) => { warn!("Not a directory"); - reply.error(libc::ENOTDIR); - return; + return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); } InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => { match self.fetch_directory_inode_data(directory_digest) { @@ -387,12 +442,11 @@ impl fuser::Filesystem for FUSE { // FUTUREWORK: change put to return the data after // inserting, so we don't need to lookup a second // time? - let ino = self.inode_tracker.put(new_data); - self.inode_tracker.get(ino).unwrap() + let ino = inode_tracker.put(new_data.clone()); + inode_tracker.get(ino).unwrap() } Err(_e) => { - reply.error(libc::EIO); - return; + return Err(io::Error::from_raw_os_error(libc::EIO)); } } } @@ -405,42 +459,49 @@ impl fuser::Filesystem for FUSE { { for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() { // the second parameter will become the "offset" parameter on the next call. - let full = reply.add( - *ino, - offset + i as i64 + 1_i64, - match child_node { - Node::Directory(_) => fuser::FileType::Directory, - Node::File(_) => fuser::FileType::RegularFile, - Node::Symlink(_) => fuser::FileType::Symlink, + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino: *ino, + offset: offset + i as u64 + 1, + type_: match child_node { + Node::Directory(_) => libc::S_IFDIR, + Node::File(_) => libc::S_IFREG, + Node::Symlink(_) => libc::S_IFLNK, }, - std::ffi::OsStr::from_bytes(child_node.get_name()), - ); - if full { + name: child_node.get_name(), + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { break; } } - reply.ok(); } else { panic!("unexpected type") } - } - #[tracing::instrument(skip_all, fields(rq.inode = ino))] - fn open(&mut self, _req: &Request<'_>, ino: u64, _flags: i32, reply: fuser::ReplyOpen) { - // get a new file handle - let fh = self.next_file_handle; + Ok(()) + } - if ino == fuser::FUSE_ROOT_ID { - reply.error(libc::ENOSYS); - return; + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn open( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + _fuse_flags: u32, + ) -> io::Result<( + Option<Self::Handle>, + fuse_backend_rs::api::filesystem::OpenOptions, + )> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); } // lookup the inode - match *self.inode_tracker.get(ino).unwrap() { + match *self.inode_tracker.read().get(inode).unwrap() { // read is invalid on non-files. InodeData::Directory(..) | InodeData::Symlink(_) => { warn!("is directory"); - reply.error(libc::EISDIR); + return Err(io::Error::from_raw_os_error(libc::EISDIR)); } InodeData::Regular(ref blob_digest, _blob_size, _) => { let span = info_span!("read", blob.digest = %blob_digest); @@ -458,79 +519,87 @@ impl fuser::Filesystem for FUSE { match blob_reader { Ok(None) => { warn!("blob not found"); - reply.error(libc::EIO); + return Err(io::Error::from_raw_os_error(libc::EIO)); } Err(e) => { warn!(e=?e, "error opening blob"); - reply.error(libc::EIO); + return Err(io::Error::from_raw_os_error(libc::EIO)); } Ok(Some(blob_reader)) => { - debug!("add file handle {}", fh); - self.file_handles.insert(fh, blob_reader); - reply.opened(fh, 0); - + // get a new file handle // TODO: this will overflow after 2**64 operations, // which is fine for now. // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 // for the discussion on alternatives. - self.next_file_handle += 1; + let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst); + + debug!("add file handle {}", fh); + self.file_handles + .write() + .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); + + Ok(( + Some(fh), + fuse_backend_rs::api::filesystem::OpenOptions::empty(), + )) } } } } } - #[tracing::instrument(skip_all, fields(rq.inode = ino, fh = fh))] + #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))] fn release( - &mut self, - _req: &Request<'_>, - ino: u64, - fh: u64, - _flags: i32, - _lock_owner: Option<u64>, + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + handle: Self::Handle, _flush: bool, - reply: fuser::ReplyEmpty, - ) { + _flock_release: bool, + _lock_owner: Option<u64>, + ) -> io::Result<()> { // remove and get ownership on the blob reader - match self.file_handles.remove(&fh) { + match self.file_handles.write().remove(&handle) { // drop it, which will close it. Some(blob_reader) => drop(blob_reader), None => { // These might already be dropped if a read error occured. - debug!("file_handle {} not found", fh); + debug!("file_handle {} not found", handle); } } - reply.ok(); + Ok(()) } - #[tracing::instrument(skip_all, fields(rq.inode = ino, rq.offset = offset, rq.size = size))] + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))] fn read( - &mut self, - _req: &Request<'_>, - ino: u64, - fh: u64, - offset: i64, + &self, + _ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter, size: u32, - _flags: i32, + offset: u64, _lock_owner: Option<u64>, - reply: fuser::ReplyData, - ) { + _flags: u32, + ) -> io::Result<usize> { debug!("read"); // We need to take out the blob reader from self.file_handles, so we can // interact with it in the separate task. // On success, we pass it back out of the task, so we can put it back in self.file_handles. - let mut blob_reader = match self.file_handles.remove(&fh) { - Some(blob_reader) => blob_reader, + let blob_reader = match self.file_handles.read().get(&handle) { + Some(blob_reader) => blob_reader.clone(), None => { - warn!("file handle {} unknown", fh); - reply.error(libc::EIO); - return; + warn!("file handle {} unknown", handle); + return Err(io::Error::from_raw_os_error(libc::EIO)); } }; let task = self.tokio_handle.spawn(async move { + let mut blob_reader = blob_reader.lock().await; + // seek to the offset specified, which is relative to the start of the file. let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await; @@ -540,68 +609,163 @@ impl fuser::Filesystem for FUSE { } Err(e) => { warn!("failed to seek to offset {}: {}", offset, e); - return Err(libc::EIO); + return Err(io::Error::from_raw_os_error(libc::EIO)); } } - // As written in the fuser docs, read should send exactly the number + // As written in the fuse docs, read should send exactly the number // of bytes requested except on EOF or error. let mut buf: Vec<u8> = Vec::with_capacity(size as usize); while (buf.len() as u64) < size as u64 { - match blob_reader.fill_buf().await { - Ok(int_buf) => { - // copy things from the internal buffer into buf to fill it till up until size + let int_buf = blob_reader.fill_buf().await?; + // copy things from the internal buffer into buf to fill it till up until size - // an empty buffer signals we reached EOF. - if int_buf.is_empty() { - break; - } + // an empty buffer signals we reached EOF. + if int_buf.is_empty() { + break; + } - // calculate how many bytes we can read from int_buf. - // It's either all of int_buf, or the number of bytes missing in buf to reach size. - let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); + // calculate how many bytes we can read from int_buf. + // It's either all of int_buf, or the number of bytes missing in buf to reach size. + let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); - // copy these bytes into our buffer - buf.extend_from_slice(&int_buf[..len_to_copy]); - // and consume them in the buffered reader. - blob_reader.consume(len_to_copy); - } - Err(e) => return Err(e.raw_os_error().unwrap()), - } + // copy these bytes into our buffer + buf.extend_from_slice(&int_buf[..len_to_copy]); + // and consume them in the buffered reader. + blob_reader.consume(len_to_copy); } - Ok((buf, blob_reader)) + + Ok(buf) }); - let resp = self.tokio_handle.block_on(task).unwrap(); + let buf = self.tokio_handle.block_on(task).unwrap()?; - match resp { - Err(e) => reply.error(e), - Ok((buf, blob_reader)) => { - reply.data(&buf); - self.file_handles.insert(fh, blob_reader); - } - } + w.write(&buf) } - #[tracing::instrument(skip_all, fields(rq.inode = ino))] - fn readlink(&mut self, _req: &Request<'_>, ino: u64, reply: fuser::ReplyData) { - if ino == fuser::FUSE_ROOT_ID { - reply.error(libc::ENOSYS); - return; + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); } // lookup the inode - match *self.inode_tracker.get(ino).unwrap() { + match *self.inode_tracker.read().get(inode).unwrap() { InodeData::Directory(..) | InodeData::Regular(..) => { - reply.error(libc::EINVAL); + Err(io::Error::from_raw_os_error(libc::EINVAL)) } - InodeData::Symlink(ref target) => reply.data(target), + InodeData::Symlink(ref target) => Ok(target.to_vec()), } } } -fn reply_with_entry(reply: fuser::ReplyEntry, file_attr: &FileAttr) { - reply.entry(&Duration::MAX, file_attr, 1 /* TODO: generation */); +struct FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, + channel: fuse_backend_rs::transport::FuseChannel, +} + +impl<FS> FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + fn start(&mut self) -> io::Result<()> { + loop { + if let Some((reader, writer)) = self + .channel + .get_request() + .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? + { + if let Err(e) = self + .server + .handle_message(reader, writer.into(), None, None) + { + match e { + // This indicates the session has been shut down. + fuse_backend_rs::Error::EncodeMessage(e) + if e.raw_os_error() == Some(libc::EBADFD) => + { + break; + } + error => { + error!(?error, "failed to handle fuse request"); + continue; + } + } + } + } else { + break; + } + } + Ok(()) + } +} + +pub struct FuseDaemon { + session: FuseSession, + threads: Vec<thread::JoinHandle<()>>, +} + +impl FuseDaemon { + pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error> + where + FS: FileSystem + Sync + Send + 'static, + P: AsRef<Path>, + { + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + session.set_allow_other(false); + session + .mount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + let mut join_handles = Vec::with_capacity(threads); + for _ in 0..threads { + let mut server = FuseServer { + server: server.clone(), + channel: session + .new_channel() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + }; + let join_handle = thread::Builder::new() + .name("fuse_server".to_string()) + .spawn(move || { + let _ = server.start(); + })?; + join_handles.push(join_handle); + } + + Ok(FuseDaemon { + session, + threads: join_handles, + }) + } + + pub fn unmount(&mut self) -> Result<(), io::Error> { + self.session + .umount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + for thread in self.threads.drain(..) { + thread.join().map_err(|_| { + io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") + })?; + } + + Ok(()) + } +} + +impl Drop for FuseDaemon { + fn drop(&mut self) { + if let Err(error) = self.unmount() { + error!(?error, "failed to unmont fuse filesystem") + } + } } diff --git a/tvix/store/src/fuse/tests.rs b/tvix/store/src/fuse/tests.rs index 015e27ee9988..81de8b13de58 100644 --- a/tvix/store/src/fuse/tests.rs +++ b/tvix/store/src/fuse/tests.rs @@ -12,7 +12,7 @@ use crate::pathinfoservice::PathInfoService; use crate::proto::{DirectoryNode, FileNode, PathInfo}; use crate::tests::fixtures; use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; -use crate::{proto, FUSE}; +use crate::{proto, FuseDaemon, FUSE}; const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; const BLOB_B_NAME: &str = "55555555555555555555555555555555-test"; @@ -40,14 +40,14 @@ fn do_mount<P: AsRef<Path>>( path_info_service: Arc<dyn PathInfoService>, mountpoint: P, list_root: bool, -) -> io::Result<fuser::BackgroundSession> { +) -> io::Result<FuseDaemon> { let fs = FUSE::new( blob_service, directory_service, path_info_service, list_root, ); - fuser::spawn_mount2(fs, mountpoint, &[]) + FuseDaemon::new(fs, mountpoint.as_ref(), 4) } async fn populate_blob_a( @@ -294,7 +294,7 @@ async fn mount() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -303,7 +303,7 @@ async fn mount() { ) .expect("must succeed"); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Ensure listing the root isn't allowed @@ -317,7 +317,7 @@ async fn root() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service, path_info_service) = gen_svcs(); - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -334,7 +334,7 @@ async fn root() { assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); } - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Ensure listing the root is allowed if configured explicitly @@ -350,7 +350,7 @@ async fn root_with_listing() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -371,7 +371,7 @@ async fn root_with_listing() { assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); } - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Ensure we can stat a file at the root @@ -387,7 +387,7 @@ async fn stat_file_at_root() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -405,7 +405,7 @@ async fn stat_file_at_root() { assert!(metadata.permissions().readonly()); assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Ensure we can read a file at the root @@ -421,7 +421,7 @@ async fn read_file_at_root() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -439,7 +439,7 @@ async fn read_file_at_root() { assert_eq!(fixtures::BLOB_A.len(), data.len()); assert_eq!(fixtures::BLOB_A.to_vec(), data); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Ensure we can read a large file at the root @@ -455,7 +455,7 @@ async fn read_large_file_at_root() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_b(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -481,7 +481,7 @@ async fn read_large_file_at_root() { assert_eq!(fixtures::BLOB_B.len(), data.len()); assert_eq!(fixtures::BLOB_B.to_vec(), data); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Read the target of a symlink @@ -497,7 +497,7 @@ async fn symlink_readlink() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_symlink(&blob_service, &directory_service, &path_info_service); - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -524,7 +524,7 @@ async fn symlink_readlink() { let e = fs::read(p).expect_err("must fail"); assert_eq!(std::io::ErrorKind::NotFound, e.kind()); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Read and stat a regular file through a symlink pointing to it. @@ -541,7 +541,7 @@ async fn read_stat_through_symlink() { populate_blob_a(&blob_service, &directory_service, &path_info_service).await; populate_symlink(&blob_service, &directory_service, &path_info_service); - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -567,7 +567,7 @@ async fn read_stat_through_symlink() { std::fs::read(p_symlink).expect("must succeed"), ); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Read a directory in the root, and validate some attributes. @@ -583,7 +583,7 @@ async fn read_stat_directory() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -599,7 +599,7 @@ async fn read_stat_directory() { assert!(metadata.is_dir()); assert!(metadata.permissions().readonly()); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -615,7 +615,7 @@ async fn read_blob_inside_dir() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -635,7 +635,7 @@ async fn read_blob_inside_dir() { let data = fs::read(&p).expect("must succeed"); assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -652,7 +652,7 @@ async fn read_blob_deep_inside_dir() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -676,7 +676,7 @@ async fn read_blob_deep_inside_dir() { let data = fs::read(&p).expect("must succeed"); assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Ensure readdir works. @@ -692,7 +692,7 @@ async fn readdir() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -732,7 +732,7 @@ async fn readdir() { assert!(e.file_type().expect("must succeed").is_dir()); } - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } #[tokio::test] @@ -748,7 +748,7 @@ async fn readdir_deep() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -775,7 +775,7 @@ async fn readdir_deep() { assert_eq!(0, e.metadata().expect("must succeed").len()); } - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Check attributes match how they show up in /nix/store normally. @@ -794,7 +794,7 @@ async fn check_attributes() { populate_symlink(&blob_service, &directory_service, &path_info_service); populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -840,7 +840,7 @@ async fn check_attributes() { // crtime seems MacOS only } - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } #[tokio::test] @@ -858,7 +858,7 @@ async fn compare_inodes_directories() { populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -876,7 +876,7 @@ async fn compare_inodes_directories() { fs::metadata(p_sibling_dir).expect("must succeed").ino() ); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Ensure we allocate the same inodes for the same directory contents. @@ -893,7 +893,7 @@ async fn compare_inodes_files() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -915,7 +915,7 @@ async fn compare_inodes_files() { fs::metadata(p_keep2).expect("must succeed").ino() ); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Ensure we allocate the same inode for symlinks pointing to the same targets. @@ -933,7 +933,7 @@ async fn compare_inodes_symlinks() { populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; populate_symlink2(&blob_service, &directory_service, &path_info_service); - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -951,7 +951,7 @@ async fn compare_inodes_symlinks() { fs::symlink_metadata(p2).expect("must succeed").ino() ); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Check we match paths exactly. @@ -967,7 +967,7 @@ async fn read_wrong_paths_in_root() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -1000,7 +1000,7 @@ async fn read_wrong_paths_in_root() { .join("00000000000000000000000000000000-tes") .exists()); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } /// Make sure writes are not allowed @@ -1016,7 +1016,7 @@ async fn disallow_writes() { let (blob_service, directory_service, path_info_service) = gen_svcs(); - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -1028,9 +1028,9 @@ async fn disallow_writes() { let p = tmpdir.path().join(BLOB_A_NAME); let e = std::fs::File::create(p).expect_err("must fail"); - assert_eq!(std::io::ErrorKind::Unsupported, e.kind()); + assert_eq!(Some(libc::EROFS), e.raw_os_error()); - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } #[tokio::test] @@ -1045,7 +1045,7 @@ async fn missing_directory() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service); - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -1073,7 +1073,7 @@ async fn missing_directory() { fs::metadata(p.join(".keep")).expect_err("must fail"); } - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -1088,7 +1088,7 @@ async fn missing_blob() { let (blob_service, directory_service, path_info_service) = gen_svcs(); populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service); - let fuser_session = do_mount( + let mut fuse_daemon = do_mount( blob_service, directory_service, path_info_service, @@ -1109,5 +1109,5 @@ async fn missing_blob() { fs::read(p).expect_err("must fail"); } - fuser_session.join() + fuse_daemon.unmount().expect("unmount"); } |