From 7e737fde34260daa477794d63b0b3344b4a1d81b Mon Sep 17 00:00:00 2001 From: Connor Brewster Date: Sat, 16 Sep 2023 13:54:10 -0500 Subject: refactor(tvix/store/fs): Separate FUSE and filesystem code In prepration for adding virtiofs support, I thought it would make sense to split out the filesystem implementation from FUSE itself. The `fs` module holds the tvix-store filesystem implemetation and the `fuse` module holds the code to spawn a FUSE daemon backed by multiple threads. Change-Id: I8c58447b8c3aa016a613068f8e7ec166554e237c Reviewed-on: https://cl.tvl.fyi/c/depot/+/9343 Reviewed-by: flokli Tested-by: BuildkiteCI Autosubmit: Connor Brewster --- tvix/Cargo.nix | 5 +- tvix/store/Cargo.toml | 3 +- tvix/store/src/bin/tvix-store.rs | 9 +- tvix/store/src/fs/file_attr.rs | 46 ++ tvix/store/src/fs/fuse.rs | 113 ++++ tvix/store/src/fs/inode_tracker.rs | 455 ++++++++++++++ tvix/store/src/fs/inodes.rs | 68 +++ tvix/store/src/fs/mod.rs | 660 ++++++++++++++++++++ tvix/store/src/fs/tests.rs | 1114 ++++++++++++++++++++++++++++++++++ tvix/store/src/fuse/file_attr.rs | 46 -- tvix/store/src/fuse/inode_tracker.rs | 455 -------------- tvix/store/src/fuse/inodes.rs | 68 --- tvix/store/src/fuse/mod.rs | 771 ----------------------- tvix/store/src/fuse/tests.rs | 1113 --------------------------------- tvix/store/src/lib.rs | 8 +- 15 files changed, 2471 insertions(+), 2463 deletions(-) create mode 100644 tvix/store/src/fs/file_attr.rs create mode 100644 tvix/store/src/fs/fuse.rs create mode 100644 tvix/store/src/fs/inode_tracker.rs create mode 100644 tvix/store/src/fs/inodes.rs create mode 100644 tvix/store/src/fs/mod.rs create mode 100644 tvix/store/src/fs/tests.rs delete mode 100644 tvix/store/src/fuse/file_attr.rs delete mode 100644 tvix/store/src/fuse/inode_tracker.rs delete mode 100644 tvix/store/src/fuse/inodes.rs delete mode 100644 tvix/store/src/fuse/mod.rs delete mode 100644 tvix/store/src/fuse/tests.rs diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index cd9570c48ac5..002aaeab0989 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -8859,11 +8859,12 @@ rec { ]; features = { "default" = [ "fuse" "reflection" ]; - "fuse" = [ "dep:libc" "dep:fuse-backend-rs" ]; + "fs" = [ "dep:libc" "dep:fuse-backend-rs" ]; + "fuse" = [ "fs" ]; "reflection" = [ "tonic-reflection" ]; "tonic-reflection" = [ "dep:tonic-reflection" ]; }; - resolvedDefaultFeatures = [ "default" "fuse" "reflection" "tonic-reflection" ]; + resolvedDefaultFeatures = [ "default" "fs" "fuse" "reflection" "tonic-reflection" ]; }; "typenum" = rec { crateName = "typenum"; diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index cd71555851ff..ffa4426aef6a 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -58,5 +58,6 @@ tonic-mock = { git = "https://github.com/brainrake/tonic-mock", branch = "bump-d [features] default = ["fuse", "reflection"] -fuse = ["dep:libc", "dep:fuse-backend-rs"] +fs = ["dep:libc", "dep:fuse-backend-rs"] +fuse = ["fs"] reflection = ["tonic-reflection"] diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index f707e3850d4f..1be8b00bd9b8 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -22,7 +22,12 @@ use tvix_store::proto::GRPCPathInfoServiceWrapper; use tvix_store::proto::NamedNode; use tvix_store::proto::NarInfo; use tvix_store::proto::PathInfo; -use tvix_store::{FuseDaemon, FUSE}; + +#[cfg(feature = "fs")] +use tvix_store::fs::TvixStoreFs; + +#[cfg(feature = "fuse")] +use tvix_store::fs::fuse::FuseDaemon; #[cfg(feature = "reflection")] use tvix_store::proto::FILE_DESCRIPTOR_SET; @@ -302,7 +307,7 @@ async fn main() -> Result<(), Box> { )?; let mut fuse_daemon = tokio::task::spawn_blocking(move || { - let f = FUSE::new( + let f = TvixStoreFs::new( blob_service, directory_service, path_info_service, diff --git a/tvix/store/src/fs/file_attr.rs b/tvix/store/src/fs/file_attr.rs new file mode 100644 index 000000000000..b946aa977a0a --- /dev/null +++ b/tvix/store/src/fs/file_attr.rs @@ -0,0 +1,46 @@ +use super::inodes::{DirectoryInodeData, InodeData}; +use fuse_backend_rs::abi::fuse_abi::Attr; + +/// The [Attr] describing the root +pub const ROOT_FILE_ATTR: Attr = Attr { + ino: fuse_backend_rs::api::filesystem::ROOT_ID, + size: 0, + blksize: 1024, + blocks: 0, + mode: libc::S_IFDIR | 0o555, + atime: 0, + mtime: 0, + ctime: 0, + atimensec: 0, + mtimensec: 0, + ctimensec: 0, + nlink: 0, + uid: 0, + gid: 0, + rdev: 0, + flags: 0, +}; + +/// for given &Node and inode, construct an [Attr] +pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> Attr { + Attr { + ino: inode, + // FUTUREWORK: play with this numbers, as it affects read sizes for client applications. + blocks: 1024, + size: match inode_data { + InodeData::Regular(_, size, _) => *size as u64, + InodeData::Symlink(target) => target.len() as u64, + InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size as u64, + InodeData::Directory(DirectoryInodeData::Populated(_, ref children)) => { + children.len() as u64 + } + }, + mode: match inode_data { + InodeData::Regular(_, _, false) => libc::S_IFREG | 0o444, // no-executable files + InodeData::Regular(_, _, true) => libc::S_IFREG | 0o555, // executable files + InodeData::Symlink(_) => libc::S_IFLNK | 0o444, + InodeData::Directory(_) => libc::S_IFDIR | 0o555, + }, + ..Default::default() + } +} diff --git a/tvix/store/src/fs/fuse.rs b/tvix/store/src/fs/fuse.rs new file mode 100644 index 000000000000..8535c7858450 --- /dev/null +++ b/tvix/store/src/fs/fuse.rs @@ -0,0 +1,113 @@ +use std::{io, path::Path, sync::Arc, thread}; + +use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use tracing::error; + +struct FuseServer +where + FS: FileSystem + Sync + Send, +{ + server: Arc>>, + channel: fuse_backend_rs::transport::FuseChannel, +} + +impl FuseServer +where + FS: FileSystem + Sync + Send, +{ + fn start(&mut self) -> io::Result<()> { + loop { + if let Some((reader, writer)) = self + .channel + .get_request() + .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? + { + if let Err(e) = self + .server + .handle_message(reader, writer.into(), None, None) + { + match e { + // This indicates the session has been shut down. + fuse_backend_rs::Error::EncodeMessage(e) + if e.raw_os_error() == Some(libc::EBADFD) => + { + break; + } + error => { + error!(?error, "failed to handle fuse request"); + continue; + } + } + } + } else { + break; + } + } + Ok(()) + } +} + +pub struct FuseDaemon { + session: FuseSession, + threads: Vec>, +} + +impl FuseDaemon { + pub fn new(fs: FS, mountpoint: P, threads: usize) -> Result + where + FS: FileSystem + Sync + Send + 'static, + P: AsRef, + { + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + session.set_allow_other(false); + session + .mount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + let mut join_handles = Vec::with_capacity(threads); + for _ in 0..threads { + let mut server = FuseServer { + server: server.clone(), + channel: session + .new_channel() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + }; + let join_handle = thread::Builder::new() + .name("fuse_server".to_string()) + .spawn(move || { + let _ = server.start(); + })?; + join_handles.push(join_handle); + } + + Ok(FuseDaemon { + session, + threads: join_handles, + }) + } + + pub fn unmount(&mut self) -> Result<(), io::Error> { + self.session + .umount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + for thread in self.threads.drain(..) { + thread.join().map_err(|_| { + io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") + })?; + } + + Ok(()) + } +} + +impl Drop for FuseDaemon { + fn drop(&mut self) { + if let Err(error) = self.unmount() { + error!(?error, "failed to unmont fuse filesystem") + } + } +} diff --git a/tvix/store/src/fs/inode_tracker.rs b/tvix/store/src/fs/inode_tracker.rs new file mode 100644 index 000000000000..ad1ef859a2f3 --- /dev/null +++ b/tvix/store/src/fs/inode_tracker.rs @@ -0,0 +1,455 @@ +use std::{collections::HashMap, sync::Arc}; + +use crate::{proto, B3Digest}; + +use super::inodes::{DirectoryInodeData, InodeData}; + +/// InodeTracker keeps track of inodes, stores data being these inodes and deals +/// with inode allocation. +pub struct InodeTracker { + data: HashMap>, + + // lookup table for blobs by their B3Digest + blob_digest_to_inode: HashMap, + + // lookup table for symlinks by their target + symlink_target_to_inode: HashMap, + + // lookup table for directories by their B3Digest. + // Note the corresponding directory may not be present in data yet. + directory_digest_to_inode: HashMap, + + // the next inode to allocate + next_inode: u64, +} + +impl Default for InodeTracker { + fn default() -> Self { + Self { + data: Default::default(), + + blob_digest_to_inode: Default::default(), + symlink_target_to_inode: Default::default(), + directory_digest_to_inode: Default::default(), + + next_inode: 2, + } + } +} + +impl InodeTracker { + // Retrieves data for a given inode, if it exists. + pub fn get(&self, ino: u64) -> Option> { + self.data.get(&ino).cloned() + } + + // Stores data and returns the inode for it. + // In case an inode has already been allocated for the same data, that inode + // is returned, otherwise a new one is allocated. + // In case data is a [InodeData::Directory], inodes for all items are looked + // up + pub fn put(&mut self, data: InodeData) -> u64 { + match data { + InodeData::Regular(ref digest, _, _) => { + match self.blob_digest_to_inode.get(digest) { + Some(found_ino) => { + // We already have it, return the inode. + *found_ino + } + None => self.insert_and_increment(data), + } + } + InodeData::Symlink(ref target) => { + match self.symlink_target_to_inode.get(target) { + Some(found_ino) => { + // We already have it, return the inode. + *found_ino + } + None => self.insert_and_increment(data), + } + } + InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { + // check the lookup table if the B3Digest is known. + match self.directory_digest_to_inode.get(digest) { + Some(found_ino) => { + // We already have it, return the inode. + *found_ino + } + None => { + // insert and return the inode + self.insert_and_increment(data) + } + } + } + // Inserting [DirectoryInodeData::Populated] usually replaces an + // existing [DirectoryInodeData::Sparse] one. + InodeData::Directory(DirectoryInodeData::Populated(ref digest, ref children)) => { + let dir_ino = self.directory_digest_to_inode.get(digest); + if let Some(dir_ino) = dir_ino { + let dir_ino = *dir_ino; + + // We know the data must exist, as we found it in [directory_digest_to_inode]. + let needs_update = match **self.data.get(&dir_ino).unwrap() { + InodeData::Regular(..) | InodeData::Symlink(_) => { + panic!("unexpected type at inode {}", dir_ino); + } + // already populated, nothing to do + InodeData::Directory(DirectoryInodeData::Populated(..)) => false, + // in case the actual data is sparse, replace it with the populated one. + // this allocates inodes for new children in the process. + InodeData::Directory(DirectoryInodeData::Sparse( + ref old_digest, + ref _old_size, + )) => { + // sanity checking to ensure we update the right node + debug_assert_eq!(old_digest, digest); + + true + } + }; + + if needs_update { + // populate inode fields in children + let children = self.allocate_inodes_for_children(children.to_vec()); + + // update sparse data with populated data + self.data.insert( + dir_ino, + Arc::new(InodeData::Directory(DirectoryInodeData::Populated( + digest.clone(), + children, + ))), + ); + } + + dir_ino + } else { + // populate inode fields in children + let children = self.allocate_inodes_for_children(children.to_vec()); + // insert and return InodeData + self.insert_and_increment(InodeData::Directory(DirectoryInodeData::Populated( + digest.clone(), + children, + ))) + } + } + } + } + + // Consume a list of children with zeroed inodes, and allocate (or fetch existing) inodes. + fn allocate_inodes_for_children( + &mut self, + children: Vec<(u64, proto::node::Node)>, + ) -> Vec<(u64, proto::node::Node)> { + // allocate new inodes for all children + let mut children_new: Vec<(u64, proto::node::Node)> = Vec::new(); + + for (child_ino, ref child_node) in children { + debug_assert_eq!(0, child_ino, "expected child inode to be 0"); + let child_ino = match child_node { + proto::node::Node::Directory(directory_node) => { + // Try putting the sparse data in. If we already have a + // populated version, it'll not update it. + self.put(directory_node.into()) + } + proto::node::Node::File(file_node) => self.put(file_node.into()), + proto::node::Node::Symlink(symlink_node) => self.put(symlink_node.into()), + }; + + children_new.push((child_ino, child_node.clone())) + } + children_new + } + + // Inserts the data and returns the inode it was stored at, while + // incrementing next_inode. + fn insert_and_increment(&mut self, data: InodeData) -> u64 { + let ino = self.next_inode; + // insert into lookup tables + match data { + InodeData::Regular(ref digest, _, _) => { + self.blob_digest_to_inode.insert(digest.clone(), ino); + } + InodeData::Symlink(ref target) => { + self.symlink_target_to_inode.insert(target.clone(), ino); + } + InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { + self.directory_digest_to_inode.insert(digest.clone(), ino); + } + // This is currently not used outside test fixtures. + // Usually a [DirectoryInodeData::Sparse] is inserted and later + // "upgraded" with more data. + // However, as a future optimization, a lookup for a PathInfo could trigger a + // [DirectoryService::get_recursive()] request that "forks into + // background" and prepopulates all Directories in a closure. + InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) => { + self.directory_digest_to_inode.insert(digest.clone(), ino); + } + } + // Insert data + self.data.insert(ino, Arc::new(data)); + + // increment inode counter and return old inode. + self.next_inode += 1; + ino + } +} + +#[cfg(test)] +mod tests { + use crate::fs::inodes::DirectoryInodeData; + use crate::proto; + use crate::tests::fixtures; + + use super::InodeData; + use super::InodeTracker; + + /// Getting something non-existent should be none + #[test] + fn get_nonexistent() { + let inode_tracker = InodeTracker::default(); + assert!(inode_tracker.get(1).is_none()); + } + + /// Put of a regular file should allocate a uid, which should be the same when inserting again. + #[test] + fn put_regular() { + let mut inode_tracker = InodeTracker::default(); + let f = InodeData::Regular( + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u32, + false, + ); + + // put it in + let ino = inode_tracker.put(f.clone()); + + // a get should return the right data + let data = inode_tracker.get(ino).expect("must be some"); + match *data { + InodeData::Regular(ref digest, _, _) => { + assert_eq!(&fixtures::BLOB_A_DIGEST.clone(), digest); + } + InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"), + } + + // another put should return the same ino + assert_eq!(ino, inode_tracker.put(f)); + + // inserting another file should return a different ino + assert_ne!( + ino, + inode_tracker.put(InodeData::Regular( + fixtures::BLOB_B_DIGEST.clone(), + fixtures::BLOB_B.len() as u32, + false, + )) + ); + } + + // Put of a symlink should allocate a uid, which should be the same when inserting again + #[test] + fn put_symlink() { + let mut inode_tracker = InodeTracker::default(); + let f = InodeData::Symlink("target".into()); + + // put it in + let ino = inode_tracker.put(f.clone()); + + // a get should return the right data + let data = inode_tracker.get(ino).expect("must be some"); + match *data { + InodeData::Symlink(ref target) => { + assert_eq!(b"target".to_vec(), *target); + } + InodeData::Regular(..) | InodeData::Directory(..) => panic!("wrong type"), + } + + // another put should return the same ino + assert_eq!(ino, inode_tracker.put(f)); + + // inserting another file should return a different ino + assert_ne!(ino, inode_tracker.put(InodeData::Symlink("target2".into()))); + } + + // TODO: put sparse directory + + /// Put a directory into the inode tracker, which refers to a file not seen yet. + #[test] + fn put_directory_leaf() { + let mut inode_tracker = InodeTracker::default(); + + // this is a directory with a single item, a ".keep" file pointing to a 0 bytes blob. + let dir: InodeData = fixtures::DIRECTORY_WITH_KEEP.clone().into(); + + // put it in + let dir_ino = inode_tracker.put(dir); + + // a get should return the right data + let data = inode_tracker.get(dir_ino).expect("must be some"); + match *data { + InodeData::Directory(super::DirectoryInodeData::Sparse(..)) => { + panic!("wrong type"); + } + InodeData::Directory(super::DirectoryInodeData::Populated( + ref directory_digest, + ref children, + )) => { + // ensure the directory digest matches + assert_eq!(&fixtures::DIRECTORY_WITH_KEEP.digest(), directory_digest); + + // ensure the child is populated, with a different inode than + // the parent, and the data matches expectations. + assert_eq!(1, children.len()); + let (child_ino, child_node) = children.first().unwrap(); + assert_ne!(dir_ino, *child_ino); + assert_eq!( + &proto::node::Node::File( + fixtures::DIRECTORY_WITH_KEEP.files.first().unwrap().clone() + ), + child_node + ); + + // ensure looking up that inode directly returns the data + let child_data = inode_tracker.get(*child_ino).expect("must exist"); + match *child_data { + InodeData::Regular(ref digest, size, executable) => { + assert_eq!(&fixtures::EMPTY_BLOB_DIGEST.clone(), digest); + assert_eq!(0, size); + assert!(!executable); + } + InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"), + } + } + InodeData::Symlink(_) | InodeData::Regular(..) => panic!("wrong type"), + } + } + + /// Put a directory into the inode tracker, referring to files, directories + /// and symlinks not seen yet. + #[test] + fn put_directory_complicated() { + let mut inode_tracker = InodeTracker::default(); + + // this is a directory with a single item, a ".keep" file pointing to a 0 bytes blob. + let dir_complicated: InodeData = fixtures::DIRECTORY_COMPLICATED.clone().into(); + + // put it in + let dir_complicated_ino = inode_tracker.put(dir_complicated); + + // a get should return the right data + let dir_data = inode_tracker + .get(dir_complicated_ino) + .expect("must be some"); + + let child_dir_ino = match *dir_data { + InodeData::Directory(DirectoryInodeData::Sparse(..)) => { + panic!("wrong type"); + } + InodeData::Directory(DirectoryInodeData::Populated( + ref directory_digest, + ref children, + )) => { + // assert the directory digest matches + assert_eq!(&fixtures::DIRECTORY_COMPLICATED.digest(), directory_digest); + + // ensure there's three children, all with different inodes + assert_eq!(3, children.len()); + let mut seen_inodes = Vec::from([dir_complicated_ino]); + + // check the first child (.keep) + { + let (child_ino, child_node) = &children[0]; + assert!(!seen_inodes.contains(child_ino)); + assert_eq!( + &proto::node::Node::File(fixtures::DIRECTORY_COMPLICATED.files[0].clone()), + child_node + ); + seen_inodes.push(*child_ino); + } + + // check the second child (aa) + { + let (child_ino, child_node) = &children[1]; + assert!(!seen_inodes.contains(child_ino)); + assert_eq!( + &proto::node::Node::Symlink( + fixtures::DIRECTORY_COMPLICATED.symlinks[0].clone() + ), + child_node + ); + seen_inodes.push(*child_ino); + } + + // check the third child (keep) + { + let (child_ino, child_node) = &children[2]; + assert!(!seen_inodes.contains(child_ino)); + assert_eq!( + &proto::node::Node::Directory( + fixtures::DIRECTORY_COMPLICATED.directories[0].clone() + ), + child_node + ); + seen_inodes.push(*child_ino); + + // return the child_ino + *child_ino + } + } + InodeData::Regular(..) | InodeData::Symlink(_) => panic!("wrong type"), + }; + + // get of the inode for child_ino + let child_dir_data = inode_tracker.get(child_dir_ino).expect("must be some"); + // it should be a sparse InodeData::Directory with the right digest. + match *child_dir_data { + InodeData::Directory(DirectoryInodeData::Sparse( + ref child_dir_digest, + child_dir_size, + )) => { + assert_eq!(&fixtures::DIRECTORY_WITH_KEEP.digest(), child_dir_digest); + assert_eq!(fixtures::DIRECTORY_WITH_KEEP.size(), child_dir_size); + } + InodeData::Directory(DirectoryInodeData::Populated(..)) + | InodeData::Regular(..) + | InodeData::Symlink(_) => { + panic!("wrong type") + } + } + + // put DIRECTORY_WITH_KEEP, which should return the same ino as [child_dir_ino], + // but update the sparse object to a populated one at the same time. + let child_dir_ino2 = inode_tracker.put(fixtures::DIRECTORY_WITH_KEEP.clone().into()); + assert_eq!(child_dir_ino, child_dir_ino2); + + // get the data + match *inode_tracker.get(child_dir_ino).expect("must be some") { + // it should be a populated InodeData::Directory with the right digest! + InodeData::Directory(DirectoryInodeData::Populated( + ref directory_digest, + ref children, + )) => { + // ensure the directory digest matches + assert_eq!(&fixtures::DIRECTORY_WITH_KEEP.digest(), directory_digest); + + // ensure the child is populated, with a different inode than + // the parent, and the data matches expectations. + assert_eq!(1, children.len()); + let (child_node_inode, child_node) = children.first().unwrap(); + assert_ne!(dir_complicated_ino, *child_node_inode); + assert_eq!( + &proto::node::Node::File( + fixtures::DIRECTORY_WITH_KEEP.files.first().unwrap().clone() + ), + child_node + ); + } + InodeData::Directory(DirectoryInodeData::Sparse(..)) + | InodeData::Regular(..) + | InodeData::Symlink(_) => panic!("wrong type"), + } + } +} + +// TODO: add test inserting a populated one first, then ensure an update doesn't degrade it back to sparse. diff --git a/tvix/store/src/fs/inodes.rs b/tvix/store/src/fs/inodes.rs new file mode 100644 index 000000000000..e8959ce3629b --- /dev/null +++ b/tvix/store/src/fs/inodes.rs @@ -0,0 +1,68 @@ +//! This module contains all the data structures used to track information +//! about inodes, which present tvix-store nodes in a filesystem. +use crate::{proto, B3Digest}; + +#[derive(Clone, Debug)] +pub enum InodeData { + Regular(B3Digest, u32, bool), // digest, size, executable + Symlink(bytes::Bytes), // target + Directory(DirectoryInodeData), // either [DirectoryInodeData:Sparse] or [DirectoryInodeData:Populated] +} + +/// This encodes the two different states of [InodeData::Directory]. +/// Either the data still is sparse (we only saw a [proto::DirectoryNode], but +/// didn't fetch the [proto::Directory] struct yet, +/// or we processed a lookup and did fetch the data. +#[derive(Clone, Debug)] +pub enum DirectoryInodeData { + Sparse(B3Digest, u32), // digest, size + Populated(B3Digest, Vec<(u64, proto::node::Node)>), // [(child_inode, node)] +} + +impl From<&proto::node::Node> for InodeData { + fn from(value: &proto::node::Node) -> Self { + match value { + proto::node::Node::Directory(directory_node) => directory_node.into(), + proto::node::Node::File(file_node) => file_node.into(), + proto::node::Node::Symlink(symlink_node) => symlink_node.into(), + } + } +} + +impl From<&proto::SymlinkNode> for InodeData { + fn from(value: &proto::SymlinkNode) -> Self { + InodeData::Symlink(value.target.clone()) + } +} + +impl From<&proto::FileNode> for InodeData { + fn from(value: &proto::FileNode) -> Self { + InodeData::Regular( + value.digest.clone().try_into().unwrap(), + value.size, + value.executable, + ) + } +} + +/// Converts a DirectoryNode to a sparsely populated InodeData::Directory. +impl From<&proto::DirectoryNode> for InodeData { + fn from(value: &proto::DirectoryNode) -> Self { + InodeData::Directory(DirectoryInodeData::Sparse( + value.digest.clone().try_into().unwrap(), + value.size, + )) + } +} + +/// converts a proto::Directory to a InodeData::Directory(DirectoryInodeData::Populated(..)). +/// The inodes for each child are 0, because it's up to the InodeTracker to allocate them. +impl From for InodeData { + fn from(value: proto::Directory) -> Self { + let digest = value.digest(); + + let children: Vec<(u64, proto::node::Node)> = value.nodes().map(|node| (0, node)).collect(); + + InodeData::Directory(DirectoryInodeData::Populated(digest, children)) + } +} diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs new file mode 100644 index 000000000000..48e605406331 --- /dev/null +++ b/tvix/store/src/fs/mod.rs @@ -0,0 +1,660 @@ +mod file_attr; +mod inode_tracker; +mod inodes; + +#[cfg(feature = "fuse")] +pub mod fuse; + +#[cfg(test)] +mod tests; + +use crate::{ + blobservice::{BlobReader, BlobService}, + directoryservice::DirectoryService, + pathinfoservice::PathInfoService, + proto::{node::Node, NamedNode}, + B3Digest, Error, +}; +use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; +use nix_compat::store_path::StorePath; +use parking_lot::RwLock; +use std::{ + collections::HashMap, + io, + str::FromStr, + sync::atomic::AtomicU64, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; +use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; +use tracing::{debug, info_span, warn}; + +use self::{ + file_attr::{gen_file_attr, ROOT_FILE_ATTR}, + inode_tracker::InodeTracker, + inodes::{DirectoryInodeData, InodeData}, +}; + +/// This implements a read-only FUSE filesystem for a tvix-store +/// with the passed [BlobService], [DirectoryService] and [PathInfoService]. +/// +/// We don't allow listing on the root mountpoint (inode 0). +/// In the future, this might be made configurable once a listing method is +/// added to [self.path_info_service], and then show all store paths in that +/// store. +/// +/// Linux uses inodes in filesystems. When implementing FUSE, most calls are +/// *for* a given inode. +/// +/// This means, we need to have a stable mapping of inode numbers to the +/// corresponding store nodes. +/// +/// We internally delegate all inode allocation and state keeping to the +/// inode tracker, and store the currently "explored" store paths together with +/// root inode of the root. +/// +/// There's some places where inodes are allocated / data inserted into +/// the inode tracker, if not allocated before already: +/// - Processing a `lookup` request, either in the mount root, or somewhere +/// deeper +/// - Processing a `readdir` request +/// +/// Things pointing to the same contents get the same inodes, irrespective of +/// their own location. +/// This means: +/// - Symlinks with the same target will get the same inode. +/// - Regular/executable files with the same contents will get the same inode +/// - Directories with the same contents will get the same inode. +/// +/// Due to the above being valid across the whole store, and considering the +/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed +/// allocation", aka reserve Directory.size inodes for each PathInfo. +pub struct TvixStoreFs { + blob_service: Arc, + directory_service: Arc, + path_info_service: Arc, + + /// Whether to (try) listing elements in the root. + list_root: bool, + + /// This maps a given StorePath to the inode we allocated for the root inode. + store_paths: RwLock>, + + /// This keeps track of inodes and data alongside them. + inode_tracker: RwLock, + + /// This holds all open file handles + file_handles: RwLock>>>>, + + next_file_handle: AtomicU64, + + tokio_handle: tokio::runtime::Handle, +} + +impl TvixStoreFs { + pub fn new( + blob_service: Arc, + directory_service: Arc, + path_info_service: Arc, + list_root: bool, + ) -> Self { + Self { + blob_service, + directory_service, + path_info_service, + + list_root, + + store_paths: RwLock::new(HashMap::default()), + inode_tracker: RwLock::new(Default::default()), + + file_handles: RwLock::new(Default::default()), + next_file_handle: AtomicU64::new(1), + tokio_handle: tokio::runtime::Handle::current(), + } + } + + /// This will turn a lookup request for [std::ffi::OsStr] in the root to + /// a ino and [InodeData]. + /// It will peek in [self.store_paths], and then either look it up from + /// [self.inode_tracker], + /// or otherwise fetch from [self.path_info_service], and then insert into + /// [self.inode_tracker]. + fn name_in_root_to_ino_and_data( + &self, + name: &std::ffi::CStr, + ) -> Result)>, Error> { + // parse the name into a [StorePath]. + let store_path = if let Some(name) = name.to_str().ok() { + match StorePath::from_str(name) { + Ok(store_path) => store_path, + Err(e) => { + debug!(e=?e, "unable to parse as store path"); + // This is not an error, but a "ENOENT", as someone can stat + // a file inside the root that's no valid store path + return Ok(None); + } + } + } else { + debug!("{name:?} is not a valid utf-8 string"); + // same here. + return Ok(None); + }; + + let ino = { + // This extra scope makes sure we drop the read lock + // immediately after reading, to prevent deadlocks. + let store_paths = self.store_paths.read(); + store_paths.get(&store_path).cloned() + }; + + if let Some(ino) = ino { + // If we already have that store path, lookup the inode from + // self.store_paths and then get the data from [self.inode_tracker], + // which in the case of a [InodeData::Directory] will be fully + // populated. + Ok(Some(( + ino, + self.inode_tracker.read().get(ino).expect("must exist"), + ))) + } else { + // If we don't have it, look it up in PathInfoService. + match self.path_info_service.get(store_path.digest)? { + // the pathinfo doesn't exist, so the file doesn't exist. + None => Ok(None), + Some(path_info) => { + // The pathinfo does exist, so there must be a root node + let root_node = path_info.node.unwrap().node.unwrap(); + + // The name must match what's passed in the lookup, otherwise we return nothing. + if root_node.get_name() != store_path.to_string().as_bytes() { + return Ok(None); + } + + // Let's check if someone else beat us to updating the inode tracker and + // store_paths map. + let mut store_paths = self.store_paths.write(); + if let Some(ino) = store_paths.get(&store_path).cloned() { + return Ok(Some(( + ino, + self.inode_tracker.read().get(ino).expect("must exist"), + ))); + } + + // insert the (sparse) inode data and register in + // self.store_paths. + // FUTUREWORK: change put to return the data after + // inserting, so we don't need to lookup a second + // time? + let (ino, inode) = { + let mut inode_tracker = self.inode_tracker.write(); + let ino = inode_tracker.put((&root_node).into()); + (ino, inode_tracker.get(ino).unwrap()) + }; + store_paths.insert(store_path, ino); + + Ok(Some((ino, inode))) + } + } + } + } + + /// This will lookup a directory by digest, and will turn it into a + /// [InodeData::Directory(DirectoryInodeData::Populated(..))]. + /// This is both used to initially insert the root node of a store path, + /// as well as when looking up an intermediate DirectoryNode. + fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result { + match self.directory_service.get(directory_digest) { + Err(e) => { + warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory"); + Err(e) + } + // If the Directory can't be found, this is a hole, bail out. + Ok(None) => { + tracing::error!(directory.digest=%directory_digest, "directory not found in directory service"); + Err(Error::StorageError(format!( + "directory {} not found", + directory_digest + ))) + } + Ok(Some(directory)) => Ok(directory.into()), + } + } +} + +impl FileSystem for TvixStoreFs { + type Inode = u64; + type Handle = u64; + + fn init(&self, _capable: FsOptions) -> io::Result { + Ok(FsOptions::empty()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn getattr( + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Option, + ) -> io::Result<(libc::stat64, Duration)> { + if inode == ROOT_ID { + return Ok((ROOT_FILE_ATTR.into(), Duration::MAX)); + } + + match self.inode_tracker.read().get(inode) { + None => return Err(io::Error::from_raw_os_error(libc::ENOENT)), + Some(node) => { + debug!(node = ?node, "found node"); + Ok((gen_file_attr(&node, inode).into(), Duration::MAX)) + } + } + } + + #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))] + fn lookup( + &self, + _ctx: &Context, + parent: Self::Inode, + name: &std::ffi::CStr, + ) -> io::Result { + debug!("lookup"); + + // This goes from a parent inode to a node. + // - If the parent is [ROOT_ID], we need to check + // [self.store_paths] (fetching from PathInfoService if needed) + // - Otherwise, lookup the parent in [self.inode_tracker] (which must be + // a [InodeData::Directory]), and find the child with that name. + if parent == ROOT_ID { + return match self.name_in_root_to_ino_and_data(name) { + Err(e) => { + warn!("{}", e); + Err(io::Error::from_raw_os_error(libc::ENOENT)) + } + Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), + Ok(Some((ino, inode_data))) => { + debug!(inode_data=?&inode_data, ino=ino, "Some"); + Ok(fuse_backend_rs::api::filesystem::Entry { + inode: ino, + attr: gen_file_attr(&inode_data, ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }) + } + }; + } + + // This is the "lookup for "a" inside inode 42. + // We already know that inode 42 must be a directory. + // It might not be populated yet, so if it isn't, we do (by + // fetching from [self.directory_service]), and save the result in + // [self.inode_tracker]. + // Now it for sure is populated, so we search for that name in the + // list of children and return the FileAttrs. + + // TODO: Reduce the critical section of this write lock. + let mut inode_tracker = self.inode_tracker.write(); + let parent_data = inode_tracker.get(parent).unwrap(); + let parent_data = match *parent_data { + InodeData::Regular(..) | InodeData::Symlink(_) => { + // if the parent inode was not a directory, this doesn't make sense + return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); + } + InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { + match self.fetch_directory_inode_data(parent_digest) { + Ok(new_data) => { + // update data in [self.inode_tracker] with populated variant. + // FUTUREWORK: change put to return the data after + // inserting, so we don't need to lookup a second + // time? + let ino = inode_tracker.put(new_data); + inode_tracker.get(ino).unwrap() + } + Err(_e) => { + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + } + } + InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data, + }; + + // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))]. + let (parent_digest, children) = if let InodeData::Directory( + DirectoryInodeData::Populated(ref parent_digest, ref children), + ) = *parent_data + { + (parent_digest, children) + } else { + panic!("unexpected type") + }; + let span = info_span!("lookup", directory.digest = %parent_digest); + let _enter = span.enter(); + + // in the children, find the one with the desired name. + if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + // lookup the child [InodeData] in [self.inode_tracker]. + // We know the inodes for children have already been allocated. + let child_inode_data = inode_tracker.get(*child_ino).unwrap(); + + // Reply with the file attributes for the child. + // For child directories, we still have all data we need to reply. + Ok(fuse_backend_rs::api::filesystem::Entry { + inode: *child_ino, + attr: gen_file_attr(&child_inode_data, *child_ino).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + }) + } else { + // Child not found, return ENOENT. + Err(io::Error::from_raw_os_error(libc::ENOENT)) + } + } + + // TODO: readdirplus? + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))] + fn readdir( + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Self::Handle, + _size: u32, + offset: u64, + add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result, + ) -> io::Result<()> { + debug!("readdir"); + + if inode == ROOT_ID { + if !self.list_root { + return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo + } else { + for (i, path_info) in self + .path_info_service + .list() + .skip(offset as usize) + .enumerate() + { + let path_info = match path_info { + Err(e) => { + warn!("failed to retrieve pathinfo: {}", e); + return Err(io::Error::from_raw_os_error(libc::EPERM)); + } + Ok(path_info) => path_info, + }; + + // We know the root node exists and the store_path can be parsed because clients MUST validate. + let root_node = path_info.node.unwrap().node.unwrap(); + let store_path = StorePath::from_bytes(root_node.get_name()).unwrap(); + + let ino = { + // This extra scope makes sure we drop the read lock + // immediately after reading, to prevent deadlocks. + let store_paths = self.store_paths.read(); + store_paths.get(&store_path).cloned() + }; + let ino = match ino { + Some(ino) => ino, + None => { + // insert the (sparse) inode data and register in + // self.store_paths. + let ino = self.inode_tracker.write().put((&root_node).into()); + self.store_paths.write().insert(store_path.clone(), ino); + ino + } + }; + + let ty = match root_node { + Node::Directory(_) => libc::S_IFDIR, + Node::File(_) => libc::S_IFREG, + Node::Symlink(_) => libc::S_IFLNK, + }; + + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + i as u64 + 1, + type_: ty, + name: store_path.to_string().as_bytes(), + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } + } + return Ok(()); + } + } + + // lookup the inode data. + let mut inode_tracker = self.inode_tracker.write(); + let dir_inode_data = inode_tracker.get(inode).unwrap(); + let dir_inode_data = match *dir_inode_data { + InodeData::Regular(..) | InodeData::Symlink(..) => { + warn!("Not a directory"); + return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); + } + InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => { + match self.fetch_directory_inode_data(directory_digest) { + Ok(new_data) => { + // update data in [self.inode_tracker] with populated variant. + // FUTUREWORK: change put to return the data after + // inserting, so we don't need to lookup a second + // time? + let ino = inode_tracker.put(new_data.clone()); + inode_tracker.get(ino).unwrap() + } + Err(_e) => { + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + } + } + InodeData::Directory(DirectoryInodeData::Populated(..)) => dir_inode_data, + }; + + // now parent_data can only be InodeData::Directory(DirectoryInodeData::Populated(..)) + if let InodeData::Directory(DirectoryInodeData::Populated(ref _digest, ref children)) = + *dir_inode_data + { + for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() { + // the second parameter will become the "offset" parameter on the next call. + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino: *ino, + offset: offset + i as u64 + 1, + type_: match child_node { + Node::Directory(_) => libc::S_IFDIR, + Node::File(_) => libc::S_IFREG, + Node::Symlink(_) => libc::S_IFLNK, + }, + name: child_node.get_name(), + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } + } + } else { + panic!("unexpected type") + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn open( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + _fuse_flags: u32, + ) -> io::Result<( + Option, + fuse_backend_rs::api::filesystem::OpenOptions, + )> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // lookup the inode + match *self.inode_tracker.read().get(inode).unwrap() { + // read is invalid on non-files. + InodeData::Directory(..) | InodeData::Symlink(_) => { + warn!("is directory"); + return Err(io::Error::from_raw_os_error(libc::EISDIR)); + } + InodeData::Regular(ref blob_digest, _blob_size, _) => { + let span = info_span!("read", blob.digest = %blob_digest); + let _enter = span.enter(); + + let blob_service = self.blob_service.clone(); + let blob_digest = blob_digest.clone(); + + let task = self + .tokio_handle + .spawn(async move { blob_service.open_read(&blob_digest).await }); + + let blob_reader = self.tokio_handle.block_on(task).unwrap(); + + match blob_reader { + Ok(None) => { + warn!("blob not found"); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + Err(e) => { + warn!(e=?e, "error opening blob"); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + Ok(Some(blob_reader)) => { + // get a new file handle + // TODO: this will overflow after 2**64 operations, + // which is fine for now. + // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 + // for the discussion on alternatives. + let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst); + + debug!("add file handle {}", fh); + self.file_handles + .write() + .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); + + Ok(( + Some(fh), + fuse_backend_rs::api::filesystem::OpenOptions::empty(), + )) + } + } + } + } + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))] + fn release( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + handle: Self::Handle, + _flush: bool, + _flock_release: bool, + _lock_owner: Option, + ) -> io::Result<()> { + // remove and get ownership on the blob reader + match self.file_handles.write().remove(&handle) { + // drop it, which will close it. + Some(blob_reader) => drop(blob_reader), + None => { + // These might already be dropped if a read error occured. + debug!("file_handle {} not found", handle); + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))] + fn read( + &self, + _ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter, + size: u32, + offset: u64, + _lock_owner: Option, + _flags: u32, + ) -> io::Result { + debug!("read"); + + // We need to take out the blob reader from self.file_handles, so we can + // interact with it in the separate task. + // On success, we pass it back out of the task, so we can put it back in self.file_handles. + let blob_reader = match self.file_handles.read().get(&handle) { + Some(blob_reader) => blob_reader.clone(), + None => { + warn!("file handle {} unknown", handle); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + }; + + let task = self.tokio_handle.spawn(async move { + let mut blob_reader = blob_reader.lock().await; + + // seek to the offset specified, which is relative to the start of the file. + let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await; + + match resp { + Ok(pos) => { + debug_assert_eq!(offset as u64, pos); + } + Err(e) => { + warn!("failed to seek to offset {}: {}", offset, e); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + } + + // As written in the fuse docs, read should send exactly the number + // of bytes requested except on EOF or error. + + let mut buf: Vec = Vec::with_capacity(size as usize); + + while (buf.len() as u64) < size as u64 { + let int_buf = blob_reader.fill_buf().await?; + // copy things from the internal buffer into buf to fill it till up until size + + // an empty buffer signals we reached EOF. + if int_buf.is_empty() { + break; + } + + // calculate how many bytes we can read from int_buf. + // It's either all of int_buf, or the number of bytes missing in buf to reach size. + let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); + + // copy these bytes into our buffer + buf.extend_from_slice(&int_buf[..len_to_copy]); + // and consume them in the buffered reader. + blob_reader.consume(len_to_copy); + } + + Ok(buf) + }); + + let buf = self.tokio_handle.block_on(task).unwrap()?; + + w.write(&buf) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // lookup the inode + match *self.inode_tracker.read().get(inode).unwrap() { + InodeData::Directory(..) | InodeData::Regular(..) => { + Err(io::Error::from_raw_os_error(libc::EINVAL)) + } + InodeData::Symlink(ref target) => Ok(target.to_vec()), + } + } +} diff --git a/tvix/store/src/fs/tests.rs b/tvix/store/src/fs/tests.rs new file mode 100644 index 000000000000..30f5ca3f40aa --- /dev/null +++ b/tvix/store/src/fs/tests.rs @@ -0,0 +1,1114 @@ +use std::io::Cursor; +use std::os::unix::prelude::MetadataExt; +use std::path::Path; +use std::sync::Arc; +use std::{fs, io}; + +use tempfile::TempDir; + +use crate::blobservice::BlobService; +use crate::directoryservice::DirectoryService; +use crate::fs::{fuse::FuseDaemon, TvixStoreFs}; +use crate::pathinfoservice::PathInfoService; +use crate::proto; +use crate::proto::{DirectoryNode, FileNode, PathInfo}; +use crate::tests::fixtures; +use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; + +const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; +const BLOB_B_NAME: &str = "55555555555555555555555555555555-test"; +const HELLOWORLD_BLOB_NAME: &str = "66666666666666666666666666666666-test"; +const SYMLINK_NAME: &str = "11111111111111111111111111111111-test"; +const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test"; +const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test"; +const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test"; + +fn gen_svcs() -> ( + Arc, + Arc, + Arc, +) { + let blob_service = gen_blob_service(); + let directory_service = gen_directory_service(); + let path_info_service = gen_pathinfo_service(blob_service.clone(), directory_service.clone()); + + (blob_service, directory_service, path_info_service) +} + +fn do_mount>( + blob_service: Arc, + directory_service: Arc, + path_info_service: Arc, + mountpoint: P, + list_root: bool, +) -> io::Result { + let fs = TvixStoreFs::new( + blob_service, + directory_service, + path_info_service, + list_root, + ); + FuseDaemon::new(fs, mountpoint.as_ref(), 4) +} + +async fn populate_blob_a( + blob_service: &Arc, + _directory_service: &Arc, + path_info_service: &Arc, +) { + // Upload BLOB_A + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(proto::Node { + node: Some(proto::node::Node::File(FileNode { + name: BLOB_A_NAME.into(), + digest: fixtures::BLOB_A_DIGEST.clone().into(), + size: fixtures::BLOB_A.len() as u32, + executable: false, + })), + }), + ..Default::default() + }; + path_info_service.put(path_info).expect("must succeed"); +} + +async fn populate_blob_b( + blob_service: &Arc, + _directory_service: &Arc, + path_info_service: &Arc, +) { + // Upload BLOB_B + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(proto::Node { + node: Some(proto::node::Node::File(FileNode { + name: BLOB_B_NAME.into(), + digest: fixtures::BLOB_B_DIGEST.clone().into(), + size: fixtures::BLOB_B.len() as u32, + executable: false, + })), + }), + ..Default::default() + }; + path_info_service.put(path_info).expect("must succeed"); +} + +/// adds a blob containing helloworld and marks it as executable +async fn populate_helloworld_blob( + blob_service: &Arc, + _directory_service: &Arc, + path_info_service: &Arc, +) { + // Upload BLOB_B + let mut bw = blob_service.open_write().await; + tokio::io::copy( + &mut Cursor::new(fixtures::HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut bw, + ) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(proto::Node { + node: Some(proto::node::Node::File(FileNode { + name: HELLOWORLD_BLOB_NAME.into(), + digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), + size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u32, + executable: true, + })), + }), + ..Default::default() + }; + path_info_service.put(path_info).expect("must succeed"); +} + +fn populate_symlink( + _blob_service: &Arc, + _directory_service: &Arc, + path_info_service: &Arc, +) { + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(proto::Node { + node: Some(proto::node::Node::Symlink(proto::SymlinkNode { + name: SYMLINK_NAME.into(), + target: BLOB_A_NAME.into(), + })), + }), + ..Default::default() + }; + path_info_service.put(path_info).expect("must succeed"); +} + +/// This writes a symlink pointing to /nix/store/somewhereelse, +/// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. +fn populate_symlink2( + _blob_service: &Arc, + _directory_service: &Arc, + path_info_service: &Arc, +) { + // Create a PathInfo for it + let path_info = PathInfo { + node: Some(proto::Node { + node: Some(proto::node::Node::Symlink(proto::SymlinkNode { + name: SYMLINK_NAME2.into(), + target: "/nix/store/somewhereelse".into(), + })), + }), + ..Default::default() + }; + path_info_service.put(path_info).expect("must succeed"); +} + +async fn populate_directory_with_keep( + blob_service: &Arc, + directory_service: &Arc, + path_info_service: &Arc, +) { + // upload empty blob + let mut bw = blob_service.open_write().await; + assert_eq!( + fixtures::EMPTY_BLOB_DIGEST.to_vec(), + bw.close().await.expect("must succeed closing").to_vec(), + ); + + // upload directory + directory_service + .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .expect("must succeed uploading"); + + // upload pathinfo + let path_info = PathInfo { + node: Some(proto::Node { + node: Some(proto::node::Node::Directory(DirectoryNode { + name: DIRECTORY_WITH_KEEP_NAME.into(), + digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + size: fixtures::DIRECTORY_WITH_KEEP.size(), + })), + }), + ..Default::default() + }; + path_info_service.put(path_info).expect("must succeed"); +} + +/// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory +/// itself. +fn populate_pathinfo_without_directory( + _: &Arc, + _: &Arc, + path_info_service: &Arc, +) { + // upload pathinfo + let path_info = PathInfo { + node: Some(proto::Node { + node: Some(proto::node::Node::Directory(DirectoryNode { + name: DIRECTORY_WITH_KEEP_NAME.into(), + digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + size: fixtures::DIRECTORY_WITH_KEEP.size(), + })), + }), + ..Default::default() + }; + path_info_service.put(path_info).expect("must succeed"); +} + +/// Insert , but don't provide the blob .keep is pointing to +fn populate_blob_a_without_blob( + _: &Arc, + _: &Arc, + path_info_service: &Arc, +) { + // Create a PathInfo for blob A + let path_info = PathInfo { + node: Some(proto::Node { + node: Some(proto::node::Node::File(FileNode { + name: BLOB_A_NAME.into(), + digest: fixtures::BLOB_A_DIGEST.clone().into(), + size: fixtures::BLOB_A.len() as u32, + executable: false, + })), + }), + ..Default::default() + }; + path_info_service.put(path_info).expect("must succeed"); +} + +async fn populate_directory_complicated( + blob_service: &Arc, + directory_service: &Arc, + path_info_service: &Arc, +) { + // upload empty blob + let mut bw = blob_service.open_write().await; + assert_eq!( + fixtures::EMPTY_BLOB_DIGEST.to_vec(), + bw.close().await.expect("must succeed closing").to_vec(), + ); + + // upload inner directory + directory_service + .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .expect("must succeed uploading"); + + // uplodad parent directory + directory_service + .put(fixtures::DIRECTORY_COMPLICATED.clone()) + .expect("must succeed uploading"); + + // upload pathinfo + let path_info = PathInfo { + node: Some(proto::Node { + node: Some(proto::node::Node::Directory(DirectoryNode { + name: DIRECTORY_COMPLICATED_NAME.into(), + digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), + size: fixtures::DIRECTORY_COMPLICATED.size(), + })), + }), + ..Default::default() + }; + path_info_service.put(path_info).expect("must succeed"); +} + +/// Ensure mounting itself doesn't fail +#[tokio::test] +async fn mount() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure listing the root isn't allowed +#[tokio::test] +async fn root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + { + // read_dir succeeds, but getting the first element will fail. + let mut it = fs::read_dir(tmpdir).expect("must succeed"); + + let err = it.next().expect("must be some").expect_err("must be err"); + assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure listing the root is allowed if configured explicitly +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn root_with_listing() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + true, /* allow listing */ + ) + .expect("must succeed"); + + { + // read_dir succeeds, but getting the first element will fail. + let mut it = fs::read_dir(tmpdir).expect("must succeed"); + + let e = it.next().expect("must be some").expect("must succeed"); + + let metadata = e.metadata().expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can stat a file at the root +#[tokio::test] +async fn stat_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + // peek at the file metadata + let metadata = fs::metadata(p).expect("must succeed"); + + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can read a file at the root +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + // read the file contents + let data = fs::read(p).expect("must succeed"); + + // ensure size and contents match + assert_eq!(fixtures::BLOB_A.len(), data.len()); + assert_eq!(fixtures::BLOB_A.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can read a large file at the root +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_large_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_b(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_B_NAME); + { + // peek at the file metadata + let metadata = fs::metadata(&p).expect("must succeed"); + + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_B.len() as u64, metadata.len()); + } + + // read the file contents + let data = fs::read(p).expect("must succeed"); + + // ensure size and contents match + assert_eq!(fixtures::BLOB_B.len(), data.len()); + assert_eq!(fixtures::BLOB_B.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read the target of a symlink +#[tokio::test] +async fn symlink_readlink() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_symlink(&blob_service, &directory_service, &path_info_service); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(SYMLINK_NAME); + + let target = fs::read_link(&p).expect("must succeed"); + assert_eq!(BLOB_A_NAME, target.to_str().unwrap()); + + // peek at the file metadata, which follows symlinks. + // this must fail, as we didn't populate the target. + let e = fs::metadata(&p).expect_err("must fail"); + assert_eq!(std::io::ErrorKind::NotFound, e.kind()); + + // peeking at the file metadata without following symlinks will succeed. + let metadata = fs::symlink_metadata(&p).expect("must succeed"); + assert!(metadata.is_symlink()); + + // reading from the symlink (which follows) will fail, because the target doesn't exist. + let e = fs::read(p).expect_err("must fail"); + assert_eq!(std::io::ErrorKind::NotFound, e.kind()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read and stat a regular file through a symlink pointing to it. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_stat_through_symlink() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + populate_symlink(&blob_service, &directory_service, &path_info_service); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_symlink = tmpdir.path().join(SYMLINK_NAME); + let p_blob = tmpdir.path().join(SYMLINK_NAME); + + // peek at the file metadata, which follows symlinks. + // this must now return the same metadata as when statting at the target directly. + let metadata_symlink = fs::metadata(&p_symlink).expect("must succeed"); + let metadata_blob = fs::metadata(&p_blob).expect("must succeed"); + assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type()); + assert_eq!(metadata_blob.len(), metadata_symlink.len()); + + // reading from the symlink (which follows) will return the same data as if + // we were reading from the file directly. + assert_eq!( + std::fs::read(p_blob).expect("must succeed"), + std::fs::read(p_symlink).expect("must succeed"), + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read a directory in the root, and validate some attributes. +#[tokio::test] +async fn read_stat_directory() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + // peek at the metadata of the directory + let metadata = fs::metadata(p).expect("must succeed"); + assert!(metadata.is_dir()); + assert!(metadata.permissions().readonly()); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Read a blob inside a directory. This ensures we successfully populate directory data. +async fn read_blob_inside_dir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); + + // peek at metadata. + let metadata = fs::metadata(&p).expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + + // read from it + let data = fs::read(&p).expect("must succeed"); + assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Read a blob inside a directory inside a directory. This ensures we properly +/// populate directories as we traverse down the structure. +async fn read_blob_deep_inside_dir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir + .path() + .join(DIRECTORY_COMPLICATED_NAME) + .join("keep") + .join(".keep"); + + // peek at metadata. + let metadata = fs::metadata(&p).expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + + // read from it + let data = fs::read(&p).expect("must succeed"); + assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure readdir works. +#[tokio::test] +async fn readdir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME); + + { + // read_dir should succeed. Collect all elements + let elements: Vec<_> = fs::read_dir(p) + .expect("must succeed") + .map(|e| e.expect("must not be err")) + .collect(); + + assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and .. + + // We explicitly look at specific positions here, because we always emit + // them ordered. + + // ".keep", 0 byte file. + let e = &elements[0]; + assert_eq!(".keep", e.file_name()); + assert!(e.file_type().expect("must succeed").is_file()); + assert_eq!(0, e.metadata().expect("must succeed").len()); + + // "aa", symlink. + let e = &elements[1]; + assert_eq!("aa", e.file_name()); + assert!(e.file_type().expect("must succeed").is_symlink()); + + // "keep", directory + let e = &elements[2]; + assert_eq!("keep", e.file_name()); + assert!(e.file_type().expect("must succeed").is_dir()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory. +async fn readdir_deep() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); + + { + // read_dir should succeed. Collect all elements + let elements: Vec<_> = fs::read_dir(p) + .expect("must succeed") + .map(|e| e.expect("must not be err")) + .collect(); + + assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and .. + + // ".keep", 0 byte file. + let e = &elements[0]; + assert_eq!(".keep", e.file_name()); + assert!(e.file_type().expect("must succeed").is_file()); + assert_eq!(0, e.metadata().expect("must succeed").len()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Check attributes match how they show up in /nix/store normally. +#[tokio::test] +async fn check_attributes() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + populate_symlink(&blob_service, &directory_service, &path_info_service); + populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_file = tmpdir.path().join(BLOB_A_NAME); + let p_directory = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + let p_symlink = tmpdir.path().join(SYMLINK_NAME); + let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME); + + // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident. + let metadata_file = fs::symlink_metadata(&p_file).expect("must succeed"); + let metadata_executable_file = fs::symlink_metadata(&p_executable_file).expect("must succeed"); + let metadata_directory = fs::symlink_metadata(&p_directory).expect("must succeed"); + let metadata_symlink = fs::symlink_metadata(&p_symlink).expect("must succeed"); + + // modes should match. We & with 0o777 to remove any higher bits. + assert_eq!(0o444, metadata_file.mode() & 0o777); + assert_eq!(0o555, metadata_executable_file.mode() & 0o777); + assert_eq!(0o555, metadata_directory.mode() & 0o777); + assert_eq!(0o444, metadata_symlink.mode() & 0o777); + + // files should have the correct filesize + assert_eq!(fixtures::BLOB_A.len() as u64, metadata_file.len()); + // directories should have their "size" as filesize + assert_eq!( + fixtures::DIRECTORY_WITH_KEEP.size() as u64, + metadata_directory.size() + ); + + for metadata in &[&metadata_file, &metadata_directory, &metadata_symlink] { + // uid and gid should be 0. + assert_eq!(0, metadata.uid()); + assert_eq!(0, metadata.gid()); + + // all times should be set to the unix epoch. + assert_eq!(0, metadata.atime()); + assert_eq!(0, metadata.mtime()); + assert_eq!(0, metadata.ctime()); + // crtime seems MacOS only + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Ensure we allocate the same inodes for the same directory contents. +/// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP. +async fn compare_inodes_directories() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + let p_sibling_dir = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); + + // peek at metadata. + assert_eq!( + fs::metadata(p_dir_with_keep).expect("must succeed").ino(), + fs::metadata(p_sibling_dir).expect("must succeed").ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we allocate the same inodes for the same directory contents. +/// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep +#[tokio::test] +async fn compare_inodes_files() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep"); + let p_keep2 = tmpdir + .path() + .join(DIRECTORY_COMPLICATED_NAME) + .join("keep") + .join(".keep"); + + // peek at metadata. + assert_eq!( + fs::metadata(p_keep1).expect("must succeed").ino(), + fs::metadata(p_keep2).expect("must succeed").ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we allocate the same inode for symlinks pointing to the same targets. +/// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2. +#[tokio::test] +async fn compare_inodes_symlinks() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; + populate_symlink2(&blob_service, &directory_service, &path_info_service); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa"); + let p2 = tmpdir.path().join(SYMLINK_NAME2); + + // peek at metadata. + assert_eq!( + fs::symlink_metadata(p1).expect("must succeed").ino(), + fs::symlink_metadata(p2).expect("must succeed").ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Check we match paths exactly. +#[tokio::test] +async fn read_wrong_paths_in_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a(&blob_service, &directory_service, &path_info_service).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + // wrong name + assert!(!tmpdir + .path() + .join("00000000000000000000000000000000-tes") + .exists()); + + // invalid hash + assert!(!tmpdir + .path() + .join("0000000000000000000000000000000-test") + .exists()); + + // right name, must exist + assert!(tmpdir + .path() + .join("00000000000000000000000000000000-test") + .exists()); + + // now wrong name with right hash still may not exist + assert!(!tmpdir + .path() + .join("00000000000000000000000000000000-tes") + .exists()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Make sure writes are not allowed +#[tokio::test] +async fn disallow_writes() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + let e = std::fs::File::create(p).expect_err("must fail"); + + assert_eq!(Some(libc::EROFS), e.raw_os_error()); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Ensure we get an IO error if the directory service does not have the Directory object. +async fn missing_directory() { + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + { + // `stat` on the path should succeed, because it doesn't trigger the directory request. + fs::metadata(&p).expect("must succeed"); + + // However, calling either `readdir` or `stat` on a child should fail with an IO error. + // It fails when trying to pull the first entry, because we don't implement opendir separately + fs::read_dir(&p) + .unwrap() + .next() + .expect("must be some") + .expect_err("must be err"); + + // rust currently sets e.kind() to Uncategorized, which isn't very + // helpful, so we don't look at the error more closely than that.. + fs::metadata(p.join(".keep")).expect_err("must fail"); + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Ensure we get an IO error if the blob service does not have the blob +async fn missing_blob() { + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service, path_info_service) = gen_svcs(); + populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + path_info_service, + tmpdir.path(), + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + { + // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service. + fs::metadata(&p).expect("must succeed"); + + // However, calling read on the blob should fail. + // rust currently sets e.kind() to Uncategorized, which isn't very + // helpful, so we don't look at the error more closely than that.. + fs::read(p).expect_err("must fail"); + } + + fuse_daemon.unmount().expect("unmount"); +} diff --git a/tvix/store/src/fuse/file_attr.rs b/tvix/store/src/fuse/file_attr.rs deleted file mode 100644 index b946aa977a0a..000000000000 --- a/tvix/store/src/fuse/file_attr.rs +++ /dev/null @@ -1,46 +0,0 @@ -use super::inodes::{DirectoryInodeData, InodeData}; -use fuse_backend_rs::abi::fuse_abi::Attr; - -/// The [Attr] describing the root -pub const ROOT_FILE_ATTR: Attr = Attr { - ino: fuse_backend_rs::api::filesystem::ROOT_ID, - size: 0, - blksize: 1024, - blocks: 0, - mode: libc::S_IFDIR | 0o555, - atime: 0, - mtime: 0, - ctime: 0, - atimensec: 0, - mtimensec: 0, - ctimensec: 0, - nlink: 0, - uid: 0, - gid: 0, - rdev: 0, - flags: 0, -}; - -/// for given &Node and inode, construct an [Attr] -pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> Attr { - Attr { - ino: inode, - // FUTUREWORK: play with this numbers, as it affects read sizes for client applications. - blocks: 1024, - size: match inode_data { - InodeData::Regular(_, size, _) => *size as u64, - InodeData::Symlink(target) => target.len() as u64, - InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size as u64, - InodeData::Directory(DirectoryInodeData::Populated(_, ref children)) => { - children.len() as u64 - } - }, - mode: match inode_data { - InodeData::Regular(_, _, false) => libc::S_IFREG | 0o444, // no-executable files - InodeData::Regular(_, _, true) => libc::S_IFREG | 0o555, // executable files - InodeData::Symlink(_) => libc::S_IFLNK | 0o444, - InodeData::Directory(_) => libc::S_IFDIR | 0o555, - }, - ..Default::default() - } -} diff --git a/tvix/store/src/fuse/inode_tracker.rs b/tvix/store/src/fuse/inode_tracker.rs deleted file mode 100644 index 97a9744c31d7..000000000000 --- a/tvix/store/src/fuse/inode_tracker.rs +++ /dev/null @@ -1,455 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use crate::{proto, B3Digest}; - -use super::inodes::{DirectoryInodeData, InodeData}; - -/// InodeTracker keeps track of inodes, stores data being these inodes and deals -/// with inode allocation. -pub struct InodeTracker { - data: HashMap>, - - // lookup table for blobs by their B3Digest - blob_digest_to_inode: HashMap, - - // lookup table for symlinks by their target - symlink_target_to_inode: HashMap, - - // lookup table for directories by their B3Digest. - // Note the corresponding directory may not be present in data yet. - directory_digest_to_inode: HashMap, - - // the next inode to allocate - next_inode: u64, -} - -impl Default for InodeTracker { - fn default() -> Self { - Self { - data: Default::default(), - - blob_digest_to_inode: Default::default(), - symlink_target_to_inode: Default::default(), - directory_digest_to_inode: Default::default(), - - next_inode: 2, - } - } -} - -impl InodeTracker { - // Retrieves data for a given inode, if it exists. - pub fn get(&self, ino: u64) -> Option> { - self.data.get(&ino).cloned() - } - - // Stores data and returns the inode for it. - // In case an inode has already been allocated for the same data, that inode - // is returned, otherwise a new one is allocated. - // In case data is a [InodeData::Directory], inodes for all items are looked - // up - pub fn put(&mut self, data: InodeData) -> u64 { - match data { - InodeData::Regular(ref digest, _, _) => { - match self.blob_digest_to_inode.get(digest) { - Some(found_ino) => { - // We already have it, return the inode. - *found_ino - } - None => self.insert_and_increment(data), - } - } - InodeData::Symlink(ref target) => { - match self.symlink_target_to_inode.get(target) { - Some(found_ino) => { - // We already have it, return the inode. - *found_ino - } - None => self.insert_and_increment(data), - } - } - InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { - // check the lookup table if the B3Digest is known. - match self.directory_digest_to_inode.get(digest) { - Some(found_ino) => { - // We already have it, return the inode. - *found_ino - } - None => { - // insert and return the inode - self.insert_and_increment(data) - } - } - } - // Inserting [DirectoryInodeData::Populated] usually replaces an - // existing [DirectoryInodeData::Sparse] one. - InodeData::Directory(DirectoryInodeData::Populated(ref digest, ref children)) => { - let dir_ino = self.directory_digest_to_inode.get(digest); - if let Some(dir_ino) = dir_ino { - let dir_ino = *dir_ino; - - // We know the data must exist, as we found it in [directory_digest_to_inode]. - let needs_update = match **self.data.get(&dir_ino).unwrap() { - InodeData::Regular(..) | InodeData::Symlink(_) => { - panic!("unexpected type at inode {}", dir_ino); - } - // already populated, nothing to do - InodeData::Directory(DirectoryInodeData::Populated(..)) => false, - // in case the actual data is sparse, replace it with the populated one. - // this allocates inodes for new children in the process. - InodeData::Directory(DirectoryInodeData::Sparse( - ref old_digest, - ref _old_size, - )) => { - // sanity checking to ensure we update the right node - debug_assert_eq!(old_digest, digest); - - true - } - }; - - if needs_update { - // populate inode fields in children - let children = self.allocate_inodes_for_children(children.to_vec()); - - // update sparse data with populated data - self.data.insert( - dir_ino, - Arc::new(InodeData::Directory(DirectoryInodeData::Populated( - digest.clone(), - children, - ))), - ); - } - - dir_ino - } else { - // populate inode fields in children - let children = self.allocate_inodes_for_children(children.to_vec()); - // insert and return InodeData - self.insert_and_increment(InodeData::Directory(DirectoryInodeData::Populated( - digest.clone(), - children, - ))) - } - } - } - } - - // Consume a list of children with zeroed inodes, and allocate (or fetch existing) inodes. - fn allocate_inodes_for_children( - &mut self, - children: Vec<(u64, proto::node::Node)>, - ) -> Vec<(u64, proto::node::Node)> { - // allocate new inodes for all children - let mut children_new: Vec<(u64, proto::node::Node)> = Vec::new(); - - for (child_ino, ref child_node) in children { - debug_assert_eq!(0, child_ino, "expected child inode to be 0"); - let child_ino = match child_node { - proto::node::Node::Directory(directory_node) => { - // Try putting the sparse data in. If we already have a - // populated version, it'll not update it. - self.put(directory_node.into()) - } - proto::node::Node::File(file_node) => self.put(file_node.into()), - proto::node::Node::Symlink(symlink_node) => self.put(symlink_node.into()), - }; - - children_new.push((child_ino, child_node.clone())) - } - children_new - } - - // Inserts the data and returns the inode it was stored at, while - // incrementing next_inode. - fn insert_and_increment(&mut self, data: InodeData) -> u64 { - let ino = self.next_inode; - // insert into lookup tables - match data { - InodeData::Regular(ref digest, _, _) => { - self.blob_digest_to_inode.insert(digest.clone(), ino); - } - InodeData::Symlink(ref target) => { - self.symlink_target_to_inode.insert(target.clone(), ino); - } - InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { - self.directory_digest_to_inode.insert(digest.clone(), ino); - } - // This is currently not used outside test fixtures. - // Usually a [DirectoryInodeData::Sparse] is inserted and later - // "upgraded" with more data. - // However, as a future optimization, a lookup for a PathInfo could trigger a - // [DirectoryService::get_recursive()] request that "forks into - // background" and prepopulates all Directories in a closure. - InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) => { - self.directory_digest_to_inode.insert(digest.clone(), ino); - } - } - // Insert data - self.data.insert(ino, Arc::new(data)); - - // increment inode counter and return old inode. - self.next_inode += 1; - ino - } -} - -#[cfg(test)] -mod tests { - use crate::fuse::inodes::DirectoryInodeData; - use crate::proto; - use crate::tests::fixtures; - - use super::InodeData; - use super::InodeTracker; - - /// Getting something non-existent should be none - #[test] - fn get_nonexistent() { - let inode_tracker = InodeTracker::default(); - assert!(inode_tracker.get(1).is_none()); - } - - /// Put of a regular file should allocate a uid, which should be the same when inserting again. - #[test] - fn put_regular() { - let mut inode_tracker = InodeTracker::default(); - let f = InodeData::Regular( - fixtures::BLOB_A_DIGEST.clone(), - fixtures::BLOB_A.len() as u32, - false, - ); - - // put it in - let ino = inode_tracker.put(f.clone()); - - // a get should return the right data - let data = inode_tracker.get(ino).expect("must be some"); - match *data { - InodeData::Regular(ref digest, _, _) => { - assert_eq!(&fixtures::BLOB_A_DIGEST.clone(), digest); - } - InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"), - } - - // another put should return the same ino - assert_eq!(ino, inode_tracker.put(f)); - - // inserting another file should return a different ino - assert_ne!( - ino, - inode_tracker.put(InodeData::Regular( - fixtures::BLOB_B_DIGEST.clone(), - fixtures::BLOB_B.len() as u32, - false, - )) - ); - } - - // Put of a symlink should allocate a uid, which should be the same when inserting again - #[test] - fn put_symlink() { - let mut inode_tracker = InodeTracker::default(); - let f = InodeData::Symlink("target".into()); - - // put it in - let ino = inode_tracker.put(f.clone()); - - // a get should return the right data - let data = inode_tracker.get(ino).expect("must be some"); - match *data { - InodeData::Symlink(ref target) => { - assert_eq!(b"target".to_vec(), *target); - } - InodeData::Regular(..) | InodeData::Directory(..) => panic!("wrong type"), - } - - // another put should return the same ino - assert_eq!(ino, inode_tracker.put(f)); - - // inserting another file should return a different ino - assert_ne!(ino, inode_tracker.put(InodeData::Symlink("target2".into()))); - } - - // TODO: put sparse directory - - /// Put a directory into the inode tracker, which refers to a file not seen yet. - #[test] - fn put_directory_leaf() { - let mut inode_tracker = InodeTracker::default(); - - // this is a directory with a single item, a ".keep" file pointing to a 0 bytes blob. - let dir: InodeData = fixtures::DIRECTORY_WITH_KEEP.clone().into(); - - // put it in - let dir_ino = inode_tracker.put(dir); - - // a get should return the right data - let data = inode_tracker.get(dir_ino).expect("must be some"); - match *data { - InodeData::Directory(super::DirectoryInodeData::Sparse(..)) => { - panic!("wrong type"); - } - InodeData::Directory(super::DirectoryInodeData::Populated( - ref directory_digest, - ref children, - )) => { - // ensure the directory digest matches - assert_eq!(&fixtures::DIRECTORY_WITH_KEEP.digest(), directory_digest); - - // ensure the child is populated, with a different inode than - // the parent, and the data matches expectations. - assert_eq!(1, children.len()); - let (child_ino, child_node) = children.first().unwrap(); - assert_ne!(dir_ino, *child_ino); - assert_eq!( - &proto::node::Node::File( - fixtures::DIRECTORY_WITH_KEEP.files.first().unwrap().clone() - ), - child_node - ); - - // ensure looking up that inode directly returns the data - let child_data = inode_tracker.get(*child_ino).expect("must exist"); - match *child_data { - InodeData::Regular(ref digest, size, executable) => { - assert_eq!(&fixtures::EMPTY_BLOB_DIGEST.clone(), digest); - assert_eq!(0, size); - assert!(!executable); - } - InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"), - } - } - InodeData::Symlink(_) | InodeData::Regular(..) => panic!("wrong type"), - } - } - - /// Put a directory into the inode tracker, referring to files, directories - /// and symlinks not seen yet. - #[test] - fn put_directory_complicated() { - let mut inode_tracker = InodeTracker::default(); - - // this is a directory with a single item, a ".keep" file pointing to a 0 bytes blob. - let dir_complicated: InodeData = fixtures::DIRECTORY_COMPLICATED.clone().into(); - - // put it in - let dir_complicated_ino = inode_tracker.put(dir_complicated); - - // a get should return the right data - let dir_data = inode_tracker - .get(dir_complicated_ino) - .expect("must be some"); - - let child_dir_ino = match *dir_data { - InodeData::Directory(DirectoryInodeData::Sparse(..)) => { - panic!("wrong type"); - } - InodeData::Directory(DirectoryInodeData::Populated( - ref directory_digest, - ref children, - )) => { - // assert the directory digest matches - assert_eq!(&fixtures::DIRECTORY_COMPLICATED.digest(), directory_digest); - - // ensure there's three children, all with different inodes - assert_eq!(3, children.len()); - let mut seen_inodes = Vec::from([dir_complicated_ino]); - - // check the first child (.keep) - { - let (child_ino, child_node) = &children[0]; - assert!(!seen_inodes.contains(child_ino)); - assert_eq!( - &proto::node::Node::File(fixtures::DIRECTORY_COMPLICATED.files[0].clone()), - child_node - ); - seen_inodes.push(*child_ino); - } - - // check the second child (aa) - { - let (child_ino, child_node) = &children[1]; - assert!(!seen_inodes.contains(child_ino)); - assert_eq!( - &proto::node::Node::Symlink( - fixtures::DIRECTORY_COMPLICATED.symlinks[0].clone() - ), - child_node - ); - seen_inodes.push(*child_ino); - } - - // check the third child (keep) - { - let (child_ino, child_node) = &children[2]; - assert!(!seen_inodes.contains(child_ino)); - assert_eq!( - &proto::node::Node::Directory( - fixtures::DIRECTORY_COMPLICATED.directories[0].clone() - ), - child_node - ); - seen_inodes.push(*child_ino); - - // return the child_ino - *child_ino - } - } - InodeData::Regular(..) | InodeData::Symlink(_) => panic!("wrong type"), - }; - - // get of the inode for child_ino - let child_dir_data = inode_tracker.get(child_dir_ino).expect("must be some"); - // it should be a sparse InodeData::Directory with the right digest. - match *child_dir_data { - InodeData::Directory(DirectoryInodeData::Sparse( - ref child_dir_digest, - child_dir_size, - )) => { - assert_eq!(&fixtures::DIRECTORY_WITH_KEEP.digest(), child_dir_digest); - assert_eq!(fixtures::DIRECTORY_WITH_KEEP.size(), child_dir_size); - } - InodeData::Directory(DirectoryInodeData::Populated(..)) - | InodeData::Regular(..) - | InodeData::Symlink(_) => { - panic!("wrong type") - } - } - - // put DIRECTORY_WITH_KEEP, which should return the same ino as [child_dir_ino], - // but update the sparse object to a populated one at the same time. - let child_dir_ino2 = inode_tracker.put(fixtures::DIRECTORY_WITH_KEEP.clone().into()); - assert_eq!(child_dir_ino, child_dir_ino2); - - // get the data - match *inode_tracker.get(child_dir_ino).expect("must be some") { - // it should be a populated InodeData::Directory with the right digest! - InodeData::Directory(DirectoryInodeData::Populated( - ref directory_digest, - ref children, - )) => { - // ensure the directory digest matches - assert_eq!(&fixtures::DIRECTORY_WITH_KEEP.digest(), directory_digest); - - // ensure the child is populated, with a different inode than - // the parent, and the data matches expectations. - assert_eq!(1, children.len()); - let (child_node_inode, child_node) = children.first().unwrap(); - assert_ne!(dir_complicated_ino, *child_node_inode); - assert_eq!( - &proto::node::Node::File( - fixtures::DIRECTORY_WITH_KEEP.files.first().unwrap().clone() - ), - child_node - ); - } - InodeData::Directory(DirectoryInodeData::Sparse(..)) - | InodeData::Regular(..) - | InodeData::Symlink(_) => panic!("wrong type"), - } - } -} - -// TODO: add test inserting a populated one first, then ensure an update doesn't degrade it back to sparse. diff --git a/tvix/store/src/fuse/inodes.rs b/tvix/store/src/fuse/inodes.rs deleted file mode 100644 index e8959ce3629b..000000000000 --- a/tvix/store/src/fuse/inodes.rs +++ /dev/null @@ -1,68 +0,0 @@ -//! This module contains all the data structures used to track information -//! about inodes, which present tvix-store nodes in a filesystem. -use crate::{proto, B3Digest}; - -#[derive(Clone, Debug)] -pub enum InodeData { - Regular(B3Digest, u32, bool), // digest, size, executable - Symlink(bytes::Bytes), // target - Directory(DirectoryInodeData), // either [DirectoryInodeData:Sparse] or [DirectoryInodeData:Populated] -} - -/// This encodes the two different states of [InodeData::Directory]. -/// Either the data still is sparse (we only saw a [proto::DirectoryNode], but -/// didn't fetch the [proto::Directory] struct yet, -/// or we processed a lookup and did fetch the data. -#[derive(Clone, Debug)] -pub enum DirectoryInodeData { - Sparse(B3Digest, u32), // digest, size - Populated(B3Digest, Vec<(u64, proto::node::Node)>), // [(child_inode, node)] -} - -impl From<&proto::node::Node> for InodeData { - fn from(value: &proto::node::Node) -> Self { - match value { - proto::node::Node::Directory(directory_node) => directory_node.into(), - proto::node::Node::File(file_node) => file_node.into(), - proto::node::Node::Symlink(symlink_node) => symlink_node.into(), - } - } -} - -impl From<&proto::SymlinkNode> for InodeData { - fn from(value: &proto::SymlinkNode) -> Self { - InodeData::Symlink(value.target.clone()) - } -} - -impl From<&proto::FileNode> for InodeData { - fn from(value: &proto::FileNode) -> Self { - InodeData::Regular( - value.digest.clone().try_into().unwrap(), - value.size, - value.executable, - ) - } -} - -/// Converts a DirectoryNode to a sparsely populated InodeData::Directory. -impl From<&proto::DirectoryNode> for InodeData { - fn from(value: &proto::DirectoryNode) -> Self { - InodeData::Directory(DirectoryInodeData::Sparse( - value.digest.clone().try_into().unwrap(), - value.size, - )) - } -} - -/// converts a proto::Directory to a InodeData::Directory(DirectoryInodeData::Populated(..)). -/// The inodes for each child are 0, because it's up to the InodeTracker to allocate them. -impl From for InodeData { - fn from(value: proto::Directory) -> Self { - let digest = value.digest(); - - let children: Vec<(u64, proto::node::Node)> = value.nodes().map(|node| (0, node)).collect(); - - InodeData::Directory(DirectoryInodeData::Populated(digest, children)) - } -} diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fuse/mod.rs deleted file mode 100644 index 1a5d884ef7a9..000000000000 --- a/tvix/store/src/fuse/mod.rs +++ /dev/null @@ -1,771 +0,0 @@ -mod file_attr; -mod inode_tracker; -mod inodes; - -#[cfg(test)] -mod tests; - -use crate::{ - blobservice::{BlobReader, BlobService}, - directoryservice::DirectoryService, - fuse::inodes::{DirectoryInodeData, InodeData}, - pathinfoservice::PathInfoService, - proto::{node::Node, NamedNode}, - B3Digest, Error, -}; -use fuse_backend_rs::{ - api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}, - transport::FuseSession, -}; -use nix_compat::store_path::StorePath; -use parking_lot::RwLock; -use std::{ - collections::HashMap, - io, - path::Path, - str::FromStr, - sync::atomic::AtomicU64, - sync::{atomic::Ordering, Arc}, - thread, - time::Duration, -}; -use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; -use tracing::{debug, error, info_span, warn}; - -use self::{ - file_attr::{gen_file_attr, ROOT_FILE_ATTR}, - inode_tracker::InodeTracker, -}; - -/// This implements a read-only FUSE filesystem for a tvix-store -/// with the passed [BlobService], [DirectoryService] and [PathInfoService]. -/// -/// We don't allow listing on the root mountpoint (inode 0). -/// In the future, this might be made configurable once a listing method is -/// added to [self.path_info_service], and then show all store paths in that -/// store. -/// -/// Linux uses inodes in filesystems. When implementing FUSE, most calls are -/// *for* a given inode. -/// -/// This means, we need to have a stable mapping of inode numbers to the -/// corresponding store nodes. -/// -/// We internally delegate all inode allocation and state keeping to the -/// inode tracker, and store the currently "explored" store paths together with -/// root inode of the root. -/// -/// There's some places where inodes are allocated / data inserted into -/// the inode tracker, if not allocated before already: -/// - Processing a `lookup` request, either in the mount root, or somewhere -/// deeper -/// - Processing a `readdir` request -/// -/// Things pointing to the same contents get the same inodes, irrespective of -/// their own location. -/// This means: -/// - Symlinks with the same target will get the same inode. -/// - Regular/executable files with the same contents will get the same inode -/// - Directories with the same contents will get the same inode. -/// -/// Due to the above being valid across the whole store, and considering the -/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed -/// allocation", aka reserve Directory.size inodes for each PathInfo. -pub struct FUSE { - blob_service: Arc, - directory_service: Arc, - path_info_service: Arc, - - /// Whether to (try) listing elements in the root. - list_root: bool, - - /// This maps a given StorePath to the inode we allocated for the root inode. - store_paths: RwLock>, - - /// This keeps track of inodes and data alongside them. - inode_tracker: RwLock, - - /// This holds all open file handles - file_handles: RwLock>>>>, - - next_file_handle: AtomicU64, - - tokio_handle: tokio::runtime::Handle, -} - -impl FUSE { - pub fn new( - blob_service: Arc, - directory_service: Arc, - path_info_service: Arc, - list_root: bool, - ) -> Self { - Self { - blob_service, - directory_service, - path_info_service, - - list_root, - - store_paths: RwLock::new(HashMap::default()), - inode_tracker: RwLock::new(Default::default()), - - file_handles: RwLock::new(Default::default()), - next_file_handle: AtomicU64::new(1), - tokio_handle: tokio::runtime::Handle::current(), - } - } - - /// This will turn a lookup request for [std::ffi::OsStr] in the root to - /// a ino and [InodeData]. - /// It will peek in [self.store_paths], and then either look it up from - /// [self.inode_tracker], - /// or otherwise fetch from [self.path_info_service], and then insert into - /// [self.inode_tracker]. - fn name_in_root_to_ino_and_data( - &self, - name: &std::ffi::CStr, - ) -> Result)>, Error> { - // parse the name into a [StorePath]. - let store_path = if let Some(name) = name.to_str().ok() { - match StorePath::from_str(name) { - Ok(store_path) => store_path, - Err(e) => { - debug!(e=?e, "unable to parse as store path"); - // This is not an error, but a "ENOENT", as someone can stat - // a file inside the root that's no valid store path - return Ok(None); - } - } - } else { - debug!("{name:?} is not a valid utf-8 string"); - // same here. - return Ok(None); - }; - - let ino = { - // This extra scope makes sure we drop the read lock - // immediately after reading, to prevent deadlocks. - let store_paths = self.store_paths.read(); - store_paths.get(&store_path).cloned() - }; - - if let Some(ino) = ino { - // If we already have that store path, lookup the inode from - // self.store_paths and then get the data from [self.inode_tracker], - // which in the case of a [InodeData::Directory] will be fully - // populated. - Ok(Some(( - ino, - self.inode_tracker.read().get(ino).expect("must exist"), - ))) - } else { - // If we don't have it, look it up in PathInfoService. - match self.path_info_service.get(store_path.digest)? { - // the pathinfo doesn't exist, so the file doesn't exist. - None => Ok(None), - Some(path_info) => { - // The pathinfo does exist, so there must be a root node - let root_node = path_info.node.unwrap().node.unwrap(); - - // The name must match what's passed in the lookup, otherwise we return nothing. - if root_node.get_name() != store_path.to_string().as_bytes() { - return Ok(None); - } - - // Let's check if someone else beat us to updating the inode tracker and - // store_paths map. - let mut store_paths = self.store_paths.write(); - if let Some(ino) = store_paths.get(&store_path).cloned() { - return Ok(Some(( - ino, - self.inode_tracker.read().get(ino).expect("must exist"), - ))); - } - - // insert the (sparse) inode data and register in - // self.store_paths. - // FUTUREWORK: change put to return the data after - // inserting, so we don't need to lookup a second - // time? - let (ino, inode) = { - let mut inode_tracker = self.inode_tracker.write(); - let ino = inode_tracker.put((&root_node).into()); - (ino, inode_tracker.get(ino).unwrap()) - }; - store_paths.insert(store_path, ino); - - Ok(Some((ino, inode))) - } - } - } - } - - /// This will lookup a directory by digest, and will turn it into a - /// [InodeData::Directory(DirectoryInodeData::Populated(..))]. - /// This is both used to initially insert the root node of a store path, - /// as well as when looking up an intermediate DirectoryNode. - fn fetch_directory_inode_data(&self, directory_digest: &B3Digest) -> Result { - match self.directory_service.get(directory_digest) { - Err(e) => { - warn!(e = e.to_string(), directory.digest=%directory_digest, "failed to get directory"); - Err(e) - } - // If the Directory can't be found, this is a hole, bail out. - Ok(None) => { - tracing::error!(directory.digest=%directory_digest, "directory not found in directory service"); - Err(Error::StorageError(format!( - "directory {} not found", - directory_digest - ))) - } - Ok(Some(directory)) => Ok(directory.into()), - } - } -} - -impl FileSystem for FUSE { - type Inode = u64; - type Handle = u64; - - fn init(&self, _capable: FsOptions) -> io::Result { - Ok(FsOptions::empty()) - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode))] - fn getattr( - &self, - _ctx: &Context, - inode: Self::Inode, - _handle: Option, - ) -> io::Result<(libc::stat64, Duration)> { - if inode == ROOT_ID { - return Ok((ROOT_FILE_ATTR.into(), Duration::MAX)); - } - - match self.inode_tracker.read().get(inode) { - None => return Err(io::Error::from_raw_os_error(libc::ENOENT)), - Some(node) => { - debug!(node = ?node, "found node"); - Ok((gen_file_attr(&node, inode).into(), Duration::MAX)) - } - } - } - - #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))] - fn lookup( - &self, - _ctx: &Context, - parent: Self::Inode, - name: &std::ffi::CStr, - ) -> io::Result { - debug!("lookup"); - - // This goes from a parent inode to a node. - // - If the parent is [ROOT_ID], we need to check - // [self.store_paths] (fetching from PathInfoService if needed) - // - Otherwise, lookup the parent in [self.inode_tracker] (which must be - // a [InodeData::Directory]), and find the child with that name. - if parent == ROOT_ID { - return match self.name_in_root_to_ino_and_data(name) { - Err(e) => { - warn!("{}", e); - Err(io::Error::from_raw_os_error(libc::ENOENT)) - } - Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), - Ok(Some((ino, inode_data))) => { - debug!(inode_data=?&inode_data, ino=ino, "Some"); - Ok(fuse_backend_rs::api::filesystem::Entry { - inode: ino, - attr: gen_file_attr(&inode_data, ino).into(), - attr_timeout: Duration::MAX, - entry_timeout: Duration::MAX, - ..Default::default() - }) - } - }; - } - - // This is the "lookup for "a" inside inode 42. - // We already know that inode 42 must be a directory. - // It might not be populated yet, so if it isn't, we do (by - // fetching from [self.directory_service]), and save the result in - // [self.inode_tracker]. - // Now it for sure is populated, so we search for that name in the - // list of children and return the FileAttrs. - - // TODO: Reduce the critical section of this write lock. - let mut inode_tracker = self.inode_tracker.write(); - let parent_data = inode_tracker.get(parent).unwrap(); - let parent_data = match *parent_data { - InodeData::Regular(..) | InodeData::Symlink(_) => { - // if the parent inode was not a directory, this doesn't make sense - return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); - } - InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { - match self.fetch_directory_inode_data(parent_digest) { - Ok(new_data) => { - // update data in [self.inode_tracker] with populated variant. - // FUTUREWORK: change put to return the data after - // inserting, so we don't need to lookup a second - // time? - let ino = inode_tracker.put(new_data); - inode_tracker.get(ino).unwrap() - } - Err(_e) => { - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - } - } - InodeData::Directory(DirectoryInodeData::Populated(..)) => parent_data, - }; - - // now parent_data can only be a [InodeData::Directory(DirectoryInodeData::Populated(..))]. - let (parent_digest, children) = if let InodeData::Directory( - DirectoryInodeData::Populated(ref parent_digest, ref children), - ) = *parent_data - { - (parent_digest, children) - } else { - panic!("unexpected type") - }; - let span = info_span!("lookup", directory.digest = %parent_digest); - let _enter = span.enter(); - - // in the children, find the one with the desired name. - if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { - // lookup the child [InodeData] in [self.inode_tracker]. - // We know the inodes for children have already been allocated. - let child_inode_data = inode_tracker.get(*child_ino).unwrap(); - - // Reply with the file attributes for the child. - // For child directories, we still have all data we need to reply. - Ok(fuse_backend_rs::api::filesystem::Entry { - inode: *child_ino, - attr: gen_file_attr(&child_inode_data, *child_ino).into(), - attr_timeout: Duration::MAX, - entry_timeout: Duration::MAX, - ..Default::default() - }) - } else { - // Child not found, return ENOENT. - Err(io::Error::from_raw_os_error(libc::ENOENT)) - } - } - - // TODO: readdirplus? - - #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))] - fn readdir( - &self, - _ctx: &Context, - inode: Self::Inode, - _handle: Self::Handle, - _size: u32, - offset: u64, - add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result, - ) -> io::Result<()> { - debug!("readdir"); - - if inode == ROOT_ID { - if !self.list_root { - return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo - } else { - for (i, path_info) in self - .path_info_service - .list() - .skip(offset as usize) - .enumerate() - { - let path_info = match path_info { - Err(e) => { - warn!("failed to retrieve pathinfo: {}", e); - return Err(io::Error::from_raw_os_error(libc::EPERM)); - } - Ok(path_info) => path_info, - }; - - // We know the root node exists and the store_path can be parsed because clients MUST validate. - let root_node = path_info.node.unwrap().node.unwrap(); - let store_path = StorePath::from_bytes(root_node.get_name()).unwrap(); - - let ino = { - // This extra scope makes sure we drop the read lock - // immediately after reading, to prevent deadlocks. - let store_paths = self.store_paths.read(); - store_paths.get(&store_path).cloned() - }; - let ino = match ino { - Some(ino) => ino, - None => { - // insert the (sparse) inode data and register in - // self.store_paths. - let ino = self.inode_tracker.write().put((&root_node).into()); - self.store_paths.write().insert(store_path.clone(), ino); - ino - } - }; - - let ty = match root_node { - Node::Directory(_) => libc::S_IFDIR, - Node::File(_) => libc::S_IFREG, - Node::Symlink(_) => libc::S_IFLNK, - }; - - let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { - ino, - offset: offset + i as u64 + 1, - type_: ty, - name: store_path.to_string().as_bytes(), - })?; - // If the buffer is full, add_entry will return `Ok(0)`. - if written == 0 { - break; - } - } - return Ok(()); - } - } - - // lookup the inode data. - let mut inode_tracker = self.inode_tracker.write(); - let dir_inode_data = inode_tracker.get(inode).unwrap(); - let dir_inode_data = match *dir_inode_data { - InodeData::Regular(..) | InodeData::Symlink(..) => { - warn!("Not a directory"); - return Err(io::Error::from_raw_os_error(libc::ENOTDIR)); - } - InodeData::Directory(DirectoryInodeData::Sparse(ref directory_digest, _)) => { - match self.fetch_directory_inode_data(directory_digest) { - Ok(new_data) => { - // update data in [self.inode_tracker] with populated variant. - // FUTUREWORK: change put to return the data after - // inserting, so we don't need to lookup a second - // time? - let ino = inode_tracker.put(new_data.clone()); - inode_tracker.get(ino).unwrap() - } - Err(_e) => { - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - } - } - InodeData::Directory(DirectoryInodeData::Populated(..)) => dir_inode_data, - }; - - // now parent_data can only be InodeData::Directory(DirectoryInodeData::Populated(..)) - if let InodeData::Directory(DirectoryInodeData::Populated(ref _digest, ref children)) = - *dir_inode_data - { - for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() { - // the second parameter will become the "offset" parameter on the next call. - let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { - ino: *ino, - offset: offset + i as u64 + 1, - type_: match child_node { - Node::Directory(_) => libc::S_IFDIR, - Node::File(_) => libc::S_IFREG, - Node::Symlink(_) => libc::S_IFLNK, - }, - name: child_node.get_name(), - })?; - // If the buffer is full, add_entry will return `Ok(0)`. - if written == 0 { - break; - } - } - } else { - panic!("unexpected type") - } - - Ok(()) - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode))] - fn open( - &self, - _ctx: &Context, - inode: Self::Inode, - _flags: u32, - _fuse_flags: u32, - ) -> io::Result<( - Option, - fuse_backend_rs::api::filesystem::OpenOptions, - )> { - if inode == ROOT_ID { - return Err(io::Error::from_raw_os_error(libc::ENOSYS)); - } - - // lookup the inode - match *self.inode_tracker.read().get(inode).unwrap() { - // read is invalid on non-files. - InodeData::Directory(..) | InodeData::Symlink(_) => { - warn!("is directory"); - return Err(io::Error::from_raw_os_error(libc::EISDIR)); - } - InodeData::Regular(ref blob_digest, _blob_size, _) => { - let span = info_span!("read", blob.digest = %blob_digest); - let _enter = span.enter(); - - let blob_service = self.blob_service.clone(); - let blob_digest = blob_digest.clone(); - - let task = self - .tokio_handle - .spawn(async move { blob_service.open_read(&blob_digest).await }); - - let blob_reader = self.tokio_handle.block_on(task).unwrap(); - - match blob_reader { - Ok(None) => { - warn!("blob not found"); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - Err(e) => { - warn!(e=?e, "error opening blob"); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - Ok(Some(blob_reader)) => { - // get a new file handle - // TODO: this will overflow after 2**64 operations, - // which is fine for now. - // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 - // for the discussion on alternatives. - let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst); - - debug!("add file handle {}", fh); - self.file_handles - .write() - .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); - - Ok(( - Some(fh), - fuse_backend_rs::api::filesystem::OpenOptions::empty(), - )) - } - } - } - } - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))] - fn release( - &self, - _ctx: &Context, - inode: Self::Inode, - _flags: u32, - handle: Self::Handle, - _flush: bool, - _flock_release: bool, - _lock_owner: Option, - ) -> io::Result<()> { - // remove and get ownership on the blob reader - match self.file_handles.write().remove(&handle) { - // drop it, which will close it. - Some(blob_reader) => drop(blob_reader), - None => { - // These might already be dropped if a read error occured. - debug!("file_handle {} not found", handle); - } - } - - Ok(()) - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))] - fn read( - &self, - _ctx: &Context, - inode: Self::Inode, - handle: Self::Handle, - w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter, - size: u32, - offset: u64, - _lock_owner: Option, - _flags: u32, - ) -> io::Result { - debug!("read"); - - // We need to take out the blob reader from self.file_handles, so we can - // interact with it in the separate task. - // On success, we pass it back out of the task, so we can put it back in self.file_handles. - let blob_reader = match self.file_handles.read().get(&handle) { - Some(blob_reader) => blob_reader.clone(), - None => { - warn!("file handle {} unknown", handle); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - }; - - let task = self.tokio_handle.spawn(async move { - let mut blob_reader = blob_reader.lock().await; - - // seek to the offset specified, which is relative to the start of the file. - let resp = blob_reader.seek(io::SeekFrom::Start(offset as u64)).await; - - match resp { - Ok(pos) => { - debug_assert_eq!(offset as u64, pos); - } - Err(e) => { - warn!("failed to seek to offset {}: {}", offset, e); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - } - - // As written in the fuse docs, read should send exactly the number - // of bytes requested except on EOF or error. - - let mut buf: Vec = Vec::with_capacity(size as usize); - - while (buf.len() as u64) < size as u64 { - let int_buf = blob_reader.fill_buf().await?; - // copy things from the internal buffer into buf to fill it till up until size - - // an empty buffer signals we reached EOF. - if int_buf.is_empty() { - break; - } - - // calculate how many bytes we can read from int_buf. - // It's either all of int_buf, or the number of bytes missing in buf to reach size. - let len_to_copy = std::cmp::min(int_buf.len(), size as usize - buf.len()); - - // copy these bytes into our buffer - buf.extend_from_slice(&int_buf[..len_to_copy]); - // and consume them in the buffered reader. - blob_reader.consume(len_to_copy); - } - - Ok(buf) - }); - - let buf = self.tokio_handle.block_on(task).unwrap()?; - - w.write(&buf) - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode))] - fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result> { - if inode == ROOT_ID { - return Err(io::Error::from_raw_os_error(libc::ENOSYS)); - } - - // lookup the inode - match *self.inode_tracker.read().get(inode).unwrap() { - InodeData::Directory(..) | InodeData::Regular(..) => { - Err(io::Error::from_raw_os_error(libc::EINVAL)) - } - InodeData::Symlink(ref target) => Ok(target.to_vec()), - } - } -} - -struct FuseServer -where - FS: FileSystem + Sync + Send, -{ - server: Arc>>, - channel: fuse_backend_rs::transport::FuseChannel, -} - -impl FuseServer -where - FS: FileSystem + Sync + Send, -{ - fn start(&mut self) -> io::Result<()> { - loop { - if let Some((reader, writer)) = self - .channel - .get_request() - .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? - { - if let Err(e) = self - .server - .handle_message(reader, writer.into(), None, None) - { - match e { - // This indicates the session has been shut down. - fuse_backend_rs::Error::EncodeMessage(e) - if e.raw_os_error() == Some(libc::EBADFD) => - { - break; - } - error => { - error!(?error, "failed to handle fuse request"); - continue; - } - } - } - } else { - break; - } - } - Ok(()) - } -} - -pub struct FuseDaemon { - session: FuseSession, - threads: Vec>, -} - -impl FuseDaemon { - pub fn new(fs: FS, mountpoint: P, threads: usize) -> Result - where - FS: FileSystem + Sync + Send + 'static, - P: AsRef, - { - let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); - - let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - - session.set_allow_other(false); - session - .mount() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - let mut join_handles = Vec::with_capacity(threads); - for _ in 0..threads { - let mut server = FuseServer { - server: server.clone(), - channel: session - .new_channel() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, - }; - let join_handle = thread::Builder::new() - .name("fuse_server".to_string()) - .spawn(move || { - let _ = server.start(); - })?; - join_handles.push(join_handle); - } - - Ok(FuseDaemon { - session, - threads: join_handles, - }) - } - - pub fn unmount(&mut self) -> Result<(), io::Error> { - self.session - .umount() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - - for thread in self.threads.drain(..) { - thread.join().map_err(|_| { - io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") - })?; - } - - Ok(()) - } -} - -impl Drop for FuseDaemon { - fn drop(&mut self) { - if let Err(error) = self.unmount() { - error!(?error, "failed to unmont fuse filesystem") - } - } -} diff --git a/tvix/store/src/fuse/tests.rs b/tvix/store/src/fuse/tests.rs deleted file mode 100644 index 81de8b13de58..000000000000 --- a/tvix/store/src/fuse/tests.rs +++ /dev/null @@ -1,1113 +0,0 @@ -use std::io::Cursor; -use std::os::unix::prelude::MetadataExt; -use std::path::Path; -use std::sync::Arc; -use std::{fs, io}; - -use tempfile::TempDir; - -use crate::blobservice::BlobService; -use crate::directoryservice::DirectoryService; -use crate::pathinfoservice::PathInfoService; -use crate::proto::{DirectoryNode, FileNode, PathInfo}; -use crate::tests::fixtures; -use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; -use crate::{proto, FuseDaemon, FUSE}; - -const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; -const BLOB_B_NAME: &str = "55555555555555555555555555555555-test"; -const HELLOWORLD_BLOB_NAME: &str = "66666666666666666666666666666666-test"; -const SYMLINK_NAME: &str = "11111111111111111111111111111111-test"; -const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test"; -const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test"; -const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test"; - -fn gen_svcs() -> ( - Arc, - Arc, - Arc, -) { - let blob_service = gen_blob_service(); - let directory_service = gen_directory_service(); - let path_info_service = gen_pathinfo_service(blob_service.clone(), directory_service.clone()); - - (blob_service, directory_service, path_info_service) -} - -fn do_mount>( - blob_service: Arc, - directory_service: Arc, - path_info_service: Arc, - mountpoint: P, - list_root: bool, -) -> io::Result { - let fs = FUSE::new( - blob_service, - directory_service, - path_info_service, - list_root, - ); - FuseDaemon::new(fs, mountpoint.as_ref(), 4) -} - -async fn populate_blob_a( - blob_service: &Arc, - _directory_service: &Arc, - path_info_service: &Arc, -) { - // Upload BLOB_A - let mut bw = blob_service.open_write().await; - tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) - .await - .expect("must succeed uploading"); - bw.close().await.expect("must succeed closing"); - - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(proto::Node { - node: Some(proto::node::Node::File(FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u32, - executable: false, - })), - }), - ..Default::default() - }; - path_info_service.put(path_info).expect("must succeed"); -} - -async fn populate_blob_b( - blob_service: &Arc, - _directory_service: &Arc, - path_info_service: &Arc, -) { - // Upload BLOB_B - let mut bw = blob_service.open_write().await; - tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) - .await - .expect("must succeed uploading"); - bw.close().await.expect("must succeed closing"); - - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(proto::Node { - node: Some(proto::node::Node::File(FileNode { - name: BLOB_B_NAME.into(), - digest: fixtures::BLOB_B_DIGEST.clone().into(), - size: fixtures::BLOB_B.len() as u32, - executable: false, - })), - }), - ..Default::default() - }; - path_info_service.put(path_info).expect("must succeed"); -} - -/// adds a blob containing helloworld and marks it as executable -async fn populate_helloworld_blob( - blob_service: &Arc, - _directory_service: &Arc, - path_info_service: &Arc, -) { - // Upload BLOB_B - let mut bw = blob_service.open_write().await; - tokio::io::copy( - &mut Cursor::new(fixtures::HELLOWORLD_BLOB_CONTENTS.to_vec()), - &mut bw, - ) - .await - .expect("must succeed uploading"); - bw.close().await.expect("must succeed closing"); - - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(proto::Node { - node: Some(proto::node::Node::File(FileNode { - name: HELLOWORLD_BLOB_NAME.into(), - digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), - size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u32, - executable: true, - })), - }), - ..Default::default() - }; - path_info_service.put(path_info).expect("must succeed"); -} - -fn populate_symlink( - _blob_service: &Arc, - _directory_service: &Arc, - path_info_service: &Arc, -) { - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(proto::Node { - node: Some(proto::node::Node::Symlink(proto::SymlinkNode { - name: SYMLINK_NAME.into(), - target: BLOB_A_NAME.into(), - })), - }), - ..Default::default() - }; - path_info_service.put(path_info).expect("must succeed"); -} - -/// This writes a symlink pointing to /nix/store/somewhereelse, -/// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. -fn populate_symlink2( - _blob_service: &Arc, - _directory_service: &Arc, - path_info_service: &Arc, -) { - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(proto::Node { - node: Some(proto::node::Node::Symlink(proto::SymlinkNode { - name: SYMLINK_NAME2.into(), - target: "/nix/store/somewhereelse".into(), - })), - }), - ..Default::default() - }; - path_info_service.put(path_info).expect("must succeed"); -} - -async fn populate_directory_with_keep( - blob_service: &Arc, - directory_service: &Arc, - path_info_service: &Arc, -) { - // upload empty blob - let mut bw = blob_service.open_write().await; - assert_eq!( - fixtures::EMPTY_BLOB_DIGEST.to_vec(), - bw.close().await.expect("must succeed closing").to_vec(), - ); - - // upload directory - directory_service - .put(fixtures::DIRECTORY_WITH_KEEP.clone()) - .expect("must succeed uploading"); - - // upload pathinfo - let path_info = PathInfo { - node: Some(proto::Node { - node: Some(proto::node::Node::Directory(DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: fixtures::DIRECTORY_WITH_KEEP.size(), - })), - }), - ..Default::default() - }; - path_info_service.put(path_info).expect("must succeed"); -} - -/// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory -/// itself. -fn populate_pathinfo_without_directory( - _: &Arc, - _: &Arc, - path_info_service: &Arc, -) { - // upload pathinfo - let path_info = PathInfo { - node: Some(proto::Node { - node: Some(proto::node::Node::Directory(DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: fixtures::DIRECTORY_WITH_KEEP.size(), - })), - }), - ..Default::default() - }; - path_info_service.put(path_info).expect("must succeed"); -} - -/// Insert , but don't provide the blob .keep is pointing to -fn populate_blob_a_without_blob( - _: &Arc, - _: &Arc, - path_info_service: &Arc, -) { - // Create a PathInfo for blob A - let path_info = PathInfo { - node: Some(proto::Node { - node: Some(proto::node::Node::File(FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u32, - executable: false, - })), - }), - ..Default::default() - }; - path_info_service.put(path_info).expect("must succeed"); -} - -async fn populate_directory_complicated( - blob_service: &Arc, - directory_service: &Arc, - path_info_service: &Arc, -) { - // upload empty blob - let mut bw = blob_service.open_write().await; - assert_eq!( - fixtures::EMPTY_BLOB_DIGEST.to_vec(), - bw.close().await.expect("must succeed closing").to_vec(), - ); - - // upload inner directory - directory_service - .put(fixtures::DIRECTORY_WITH_KEEP.clone()) - .expect("must succeed uploading"); - - // uplodad parent directory - directory_service - .put(fixtures::DIRECTORY_COMPLICATED.clone()) - .expect("must succeed uploading"); - - // upload pathinfo - let path_info = PathInfo { - node: Some(proto::Node { - node: Some(proto::node::Node::Directory(DirectoryNode { - name: DIRECTORY_COMPLICATED_NAME.into(), - digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), - size: fixtures::DIRECTORY_COMPLICATED.size(), - })), - }), - ..Default::default() - }; - path_info_service.put(path_info).expect("must succeed"); -} - -/// Ensure mounting itself doesn't fail -#[tokio::test] -async fn mount() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure listing the root isn't allowed -#[tokio::test] -async fn root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - { - // read_dir succeeds, but getting the first element will fail. - let mut it = fs::read_dir(tmpdir).expect("must succeed"); - - let err = it.next().expect("must be some").expect_err("must be err"); - assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); - } - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure listing the root is allowed if configured explicitly -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn root_with_listing() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - true, /* allow listing */ - ) - .expect("must succeed"); - - { - // read_dir succeeds, but getting the first element will fail. - let mut it = fs::read_dir(tmpdir).expect("must succeed"); - - let e = it.next().expect("must be some").expect("must succeed"); - - let metadata = e.metadata().expect("must succeed"); - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); - } - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we can stat a file at the root -#[tokio::test] -async fn stat_file_at_root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_A_NAME); - - // peek at the file metadata - let metadata = fs::metadata(p).expect("must succeed"); - - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we can read a file at the root -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn read_file_at_root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_A_NAME); - - // read the file contents - let data = fs::read(p).expect("must succeed"); - - // ensure size and contents match - assert_eq!(fixtures::BLOB_A.len(), data.len()); - assert_eq!(fixtures::BLOB_A.to_vec(), data); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we can read a large file at the root -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn read_large_file_at_root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_b(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_B_NAME); - { - // peek at the file metadata - let metadata = fs::metadata(&p).expect("must succeed"); - - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - assert_eq!(fixtures::BLOB_B.len() as u64, metadata.len()); - } - - // read the file contents - let data = fs::read(p).expect("must succeed"); - - // ensure size and contents match - assert_eq!(fixtures::BLOB_B.len(), data.len()); - assert_eq!(fixtures::BLOB_B.to_vec(), data); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Read the target of a symlink -#[tokio::test] -async fn symlink_readlink() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_symlink(&blob_service, &directory_service, &path_info_service); - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(SYMLINK_NAME); - - let target = fs::read_link(&p).expect("must succeed"); - assert_eq!(BLOB_A_NAME, target.to_str().unwrap()); - - // peek at the file metadata, which follows symlinks. - // this must fail, as we didn't populate the target. - let e = fs::metadata(&p).expect_err("must fail"); - assert_eq!(std::io::ErrorKind::NotFound, e.kind()); - - // peeking at the file metadata without following symlinks will succeed. - let metadata = fs::symlink_metadata(&p).expect("must succeed"); - assert!(metadata.is_symlink()); - - // reading from the symlink (which follows) will fail, because the target doesn't exist. - let e = fs::read(p).expect_err("must fail"); - assert_eq!(std::io::ErrorKind::NotFound, e.kind()); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Read and stat a regular file through a symlink pointing to it. -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn read_stat_through_symlink() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service); - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p_symlink = tmpdir.path().join(SYMLINK_NAME); - let p_blob = tmpdir.path().join(SYMLINK_NAME); - - // peek at the file metadata, which follows symlinks. - // this must now return the same metadata as when statting at the target directly. - let metadata_symlink = fs::metadata(&p_symlink).expect("must succeed"); - let metadata_blob = fs::metadata(&p_blob).expect("must succeed"); - assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type()); - assert_eq!(metadata_blob.len(), metadata_symlink.len()); - - // reading from the symlink (which follows) will return the same data as if - // we were reading from the file directly. - assert_eq!( - std::fs::read(p_blob).expect("must succeed"), - std::fs::read(p_symlink).expect("must succeed"), - ); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Read a directory in the root, and validate some attributes. -#[tokio::test] -async fn read_stat_directory() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); - - // peek at the metadata of the directory - let metadata = fs::metadata(p).expect("must succeed"); - assert!(metadata.is_dir()); - assert!(metadata.permissions().readonly()); - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -/// Read a blob inside a directory. This ensures we successfully populate directory data. -async fn read_blob_inside_dir() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); - - // peek at metadata. - let metadata = fs::metadata(&p).expect("must succeed"); - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - - // read from it - let data = fs::read(&p).expect("must succeed"); - assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -/// Read a blob inside a directory inside a directory. This ensures we properly -/// populate directories as we traverse down the structure. -async fn read_blob_deep_inside_dir() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir - .path() - .join(DIRECTORY_COMPLICATED_NAME) - .join("keep") - .join(".keep"); - - // peek at metadata. - let metadata = fs::metadata(&p).expect("must succeed"); - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - - // read from it - let data = fs::read(&p).expect("must succeed"); - assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure readdir works. -#[tokio::test] -async fn readdir() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME); - - { - // read_dir should succeed. Collect all elements - let elements: Vec<_> = fs::read_dir(p) - .expect("must succeed") - .map(|e| e.expect("must not be err")) - .collect(); - - assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and .. - - // We explicitly look at specific positions here, because we always emit - // them ordered. - - // ".keep", 0 byte file. - let e = &elements[0]; - assert_eq!(".keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_file()); - assert_eq!(0, e.metadata().expect("must succeed").len()); - - // "aa", symlink. - let e = &elements[1]; - assert_eq!("aa", e.file_name()); - assert!(e.file_type().expect("must succeed").is_symlink()); - - // "keep", directory - let e = &elements[2]; - assert_eq!("keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_dir()); - } - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test] -/// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory. -async fn readdir_deep() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); - - { - // read_dir should succeed. Collect all elements - let elements: Vec<_> = fs::read_dir(p) - .expect("must succeed") - .map(|e| e.expect("must not be err")) - .collect(); - - assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and .. - - // ".keep", 0 byte file. - let e = &elements[0]; - assert_eq!(".keep", e.file_name()); - assert!(e.file_type().expect("must succeed").is_file()); - assert_eq!(0, e.metadata().expect("must succeed").len()); - } - - fuse_daemon.unmount().expect("unmount"); -} - -/// Check attributes match how they show up in /nix/store normally. -#[tokio::test] -async fn check_attributes() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service); - populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p_file = tmpdir.path().join(BLOB_A_NAME); - let p_directory = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); - let p_symlink = tmpdir.path().join(SYMLINK_NAME); - let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME); - - // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident. - let metadata_file = fs::symlink_metadata(&p_file).expect("must succeed"); - let metadata_executable_file = fs::symlink_metadata(&p_executable_file).expect("must succeed"); - let metadata_directory = fs::symlink_metadata(&p_directory).expect("must succeed"); - let metadata_symlink = fs::symlink_metadata(&p_symlink).expect("must succeed"); - - // modes should match. We & with 0o777 to remove any higher bits. - assert_eq!(0o444, metadata_file.mode() & 0o777); - assert_eq!(0o555, metadata_executable_file.mode() & 0o777); - assert_eq!(0o555, metadata_directory.mode() & 0o777); - assert_eq!(0o444, metadata_symlink.mode() & 0o777); - - // files should have the correct filesize - assert_eq!(fixtures::BLOB_A.len() as u64, metadata_file.len()); - // directories should have their "size" as filesize - assert_eq!( - fixtures::DIRECTORY_WITH_KEEP.size() as u64, - metadata_directory.size() - ); - - for metadata in &[&metadata_file, &metadata_directory, &metadata_symlink] { - // uid and gid should be 0. - assert_eq!(0, metadata.uid()); - assert_eq!(0, metadata.gid()); - - // all times should be set to the unix epoch. - assert_eq!(0, metadata.atime()); - assert_eq!(0, metadata.mtime()); - assert_eq!(0, metadata.ctime()); - // crtime seems MacOS only - } - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test] -/// Ensure we allocate the same inodes for the same directory contents. -/// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP. -async fn compare_inodes_directories() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); - let p_sibling_dir = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); - - // peek at metadata. - assert_eq!( - fs::metadata(p_dir_with_keep).expect("must succeed").ino(), - fs::metadata(p_sibling_dir).expect("must succeed").ino() - ); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we allocate the same inodes for the same directory contents. -/// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep -#[tokio::test] -async fn compare_inodes_files() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep"); - let p_keep2 = tmpdir - .path() - .join(DIRECTORY_COMPLICATED_NAME) - .join("keep") - .join(".keep"); - - // peek at metadata. - assert_eq!( - fs::metadata(p_keep1).expect("must succeed").ino(), - fs::metadata(p_keep2).expect("must succeed").ino() - ); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we allocate the same inode for symlinks pointing to the same targets. -/// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2. -#[tokio::test] -async fn compare_inodes_symlinks() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - populate_symlink2(&blob_service, &directory_service, &path_info_service); - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa"); - let p2 = tmpdir.path().join(SYMLINK_NAME2); - - // peek at metadata. - assert_eq!( - fs::symlink_metadata(p1).expect("must succeed").ino(), - fs::symlink_metadata(p2).expect("must succeed").ino() - ); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Check we match paths exactly. -#[tokio::test] -async fn read_wrong_paths_in_root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - // wrong name - assert!(!tmpdir - .path() - .join("00000000000000000000000000000000-tes") - .exists()); - - // invalid hash - assert!(!tmpdir - .path() - .join("0000000000000000000000000000000-test") - .exists()); - - // right name, must exist - assert!(tmpdir - .path() - .join("00000000000000000000000000000000-test") - .exists()); - - // now wrong name with right hash still may not exist - assert!(!tmpdir - .path() - .join("00000000000000000000000000000000-tes") - .exists()); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Make sure writes are not allowed -#[tokio::test] -async fn disallow_writes() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_A_NAME); - let e = std::fs::File::create(p).expect_err("must fail"); - - assert_eq!(Some(libc::EROFS), e.raw_os_error()); - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test] -/// Ensure we get an IO error if the directory service does not have the Directory object. -async fn missing_directory() { - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service); - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); - - { - // `stat` on the path should succeed, because it doesn't trigger the directory request. - fs::metadata(&p).expect("must succeed"); - - // However, calling either `readdir` or `stat` on a child should fail with an IO error. - // It fails when trying to pull the first entry, because we don't implement opendir separately - fs::read_dir(&p) - .unwrap() - .next() - .expect("must be some") - .expect_err("must be err"); - - // rust currently sets e.kind() to Uncategorized, which isn't very - // helpful, so we don't look at the error more closely than that.. - fs::metadata(p.join(".keep")).expect_err("must fail"); - } - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -/// Ensure we get an IO error if the blob service does not have the blob -async fn missing_blob() { - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service); - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_A_NAME); - - { - // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service. - fs::metadata(&p).expect("must succeed"); - - // However, calling read on the blob should fail. - // rust currently sets e.kind() to Uncategorized, which isn't very - // helpful, so we don't look at the error more closely than that.. - fs::read(p).expect_err("must fail"); - } - - fuse_daemon.unmount().expect("unmount"); -} diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index 252506de5977..6270812d47fc 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,7 +1,8 @@ mod digests; mod errors; -#[cfg(feature = "fuse")] -mod fuse; + +#[cfg(feature = "fs")] +pub mod fs; pub mod blobservice; pub mod directoryservice; @@ -13,8 +14,5 @@ pub mod proto; pub use digests::B3Digest; pub use errors::Error; -#[cfg(feature = "fuse")] -pub use fuse::{FuseDaemon, FUSE}; - #[cfg(test)] mod tests; -- cgit 1.4.1