diff options
author | Florian Klink <flokli@flokli.de> | 2023-12-16T22·16+0200 |
---|---|---|
committer | flokli <flokli@flokli.de> | 2023-12-22T16·55+0000 |
commit | a5167c508cf2ed92f8a39696a6b4376cf25ee872 (patch) | |
tree | 5ffb2f8d0d331b6fea1aeb4f6391e0408df6d234 /tvix/store/src/fs | |
parent | 52cad8619511b97c4bcd5768ce9b3579ff665505 (diff) |
chore(tvix): move store/fs to castore/fs r/7256
With the recent introduction of the RootNodes trait, there's nothing in the fs module pulling in tvix-store dependencies, so it can live in tvix-castore. This allows other crates to make use of TvixStoreFS, without having to pull in tvix-store. For example, a tvix-build using a fuse mountpoint at /nix/store doesn't need a PathInfoService to hold the root nodes that should be present, but just a list. tvix-store now has a pathinfoservice/fs module, which contains the necessary glue logic to implement the RootNodes trait for a PathInfoService. To satisfy Rust orphan rules for trait implementations, we had to add a small wrapper struct. It's mostly hidden away by the make_fs helper function returning a TvixStoreFs. It can't be entirely private, as its still leaking into the concrete type of TvixStoreFS. tvix-store still has `fuse` and `virtiofs` features, but they now simply enable these features in the `tvix-castore` crate they depend on. The tests for the fuse functionality stay in tvix-store for now, as they populate the root nodes through a PathInfoService. Once above mentioned "list of root nodes" implementation exists, we might want to shuffle this around one more time. Fixes b/341. Change-Id: I989f664827a5a361b23b34368d242d10c157c756 Reviewed-on: https://cl.tvl.fyi/c/depot/+/10378 Autosubmit: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Reviewed-by: sterni <sternenseemann@systemli.org>
Diffstat (limited to 'tvix/store/src/fs')
-rw-r--r-- | tvix/store/src/fs/file_attr.rs | 53 | ||||
-rw-r--r-- | tvix/store/src/fs/fuse.rs | 113 | ||||
-rw-r--r-- | tvix/store/src/fs/inode_tracker.rs | 207 | ||||
-rw-r--r-- | tvix/store/src/fs/inodes.rs | 57 | ||||
-rw-r--r-- | tvix/store/src/fs/mod.rs | 629 | ||||
-rw-r--r-- | tvix/store/src/fs/root_nodes.rs | 61 | ||||
-rw-r--r-- | tvix/store/src/fs/tests.rs | 1171 | ||||
-rw-r--r-- | tvix/store/src/fs/virtiofs.rs | 237 |
8 files changed, 0 insertions, 2528 deletions
diff --git a/tvix/store/src/fs/file_attr.rs b/tvix/store/src/fs/file_attr.rs deleted file mode 100644 index ad41f036a253..000000000000 --- a/tvix/store/src/fs/file_attr.rs +++ /dev/null @@ -1,53 +0,0 @@ -#![allow(clippy::unnecessary_cast)] // libc::S_IFDIR is u32 on Linux and u16 on MacOS -use super::inodes::{DirectoryInodeData, InodeData}; -use fuse_backend_rs::abi::fuse_abi::Attr; - -/// The [Attr] describing the root -pub const ROOT_FILE_ATTR: Attr = Attr { - ino: fuse_backend_rs::api::filesystem::ROOT_ID, - size: 0, - blksize: 1024, - blocks: 0, - mode: libc::S_IFDIR as u32 | 0o555, - atime: 0, - mtime: 0, - ctime: 0, - atimensec: 0, - mtimensec: 0, - ctimensec: 0, - nlink: 0, - uid: 0, - gid: 0, - rdev: 0, - flags: 0, - #[cfg(target_os = "macos")] - crtime: 0, - #[cfg(target_os = "macos")] - crtimensec: 0, - #[cfg(target_os = "macos")] - padding: 0, -}; - -/// for given &Node and inode, construct an [Attr] -pub fn gen_file_attr(inode_data: &InodeData, inode: u64) -> Attr { - Attr { - ino: inode, - // FUTUREWORK: play with this numbers, as it affects read sizes for client applications. - blocks: 1024, - size: match inode_data { - InodeData::Regular(_, size, _) => *size as u64, - InodeData::Symlink(target) => target.len() as u64, - InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size as u64, - InodeData::Directory(DirectoryInodeData::Populated(_, ref children)) => { - children.len() as u64 - } - }, - mode: match inode_data { - InodeData::Regular(_, _, false) => libc::S_IFREG as u32 | 0o444, // no-executable files - InodeData::Regular(_, _, true) => libc::S_IFREG as u32 | 0o555, // executable files - InodeData::Symlink(_) => libc::S_IFLNK as u32 | 0o444, - InodeData::Directory(_) => libc::S_IFDIR as u32 | 0o555, - }, - ..Default::default() - } -} diff --git a/tvix/store/src/fs/fuse.rs b/tvix/store/src/fs/fuse.rs deleted file mode 100644 index 98793bf47d58..000000000000 --- a/tvix/store/src/fs/fuse.rs +++ /dev/null @@ -1,113 +0,0 @@ -use std::{io, path::Path, sync::Arc, thread}; - -use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; -use tracing::error; - -struct FuseServer<FS> -where - FS: FileSystem + Sync + Send, -{ - server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, - channel: fuse_backend_rs::transport::FuseChannel, -} - -#[cfg(target_os = "macos")] -const BADFD: libc::c_int = libc::EBADF; -#[cfg(target_os = "linux")] -const BADFD: libc::c_int = libc::EBADFD; - -impl<FS> FuseServer<FS> -where - FS: FileSystem + Sync + Send, -{ - fn start(&mut self) -> io::Result<()> { - while let Some((reader, writer)) = self - .channel - .get_request() - .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? - { - if let Err(e) = self - .server - .handle_message(reader, writer.into(), None, None) - { - match e { - // This indicates the session has been shut down. - fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => { - break; - } - error => { - error!(?error, "failed to handle fuse request"); - continue; - } - } - } - } - Ok(()) - } -} - -pub struct FuseDaemon { - session: FuseSession, - threads: Vec<thread::JoinHandle<()>>, -} - -impl FuseDaemon { - pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error> - where - FS: FileSystem + Sync + Send + 'static, - P: AsRef<Path>, - { - let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); - - let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - - #[cfg(target_os = "linux")] - session.set_allow_other(false); - session - .mount() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - let mut join_handles = Vec::with_capacity(threads); - for _ in 0..threads { - let mut server = FuseServer { - server: server.clone(), - channel: session - .new_channel() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, - }; - let join_handle = thread::Builder::new() - .name("fuse_server".to_string()) - .spawn(move || { - let _ = server.start(); - })?; - join_handles.push(join_handle); - } - - Ok(FuseDaemon { - session, - threads: join_handles, - }) - } - - pub fn unmount(&mut self) -> Result<(), io::Error> { - self.session - .umount() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - - for thread in self.threads.drain(..) { - thread.join().map_err(|_| { - io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") - })?; - } - - Ok(()) - } -} - -impl Drop for FuseDaemon { - fn drop(&mut self) { - if let Err(error) = self.unmount() { - error!(?error, "failed to unmont fuse filesystem") - } - } -} diff --git a/tvix/store/src/fs/inode_tracker.rs b/tvix/store/src/fs/inode_tracker.rs deleted file mode 100644 index 3cabbbd247b5..000000000000 --- a/tvix/store/src/fs/inode_tracker.rs +++ /dev/null @@ -1,207 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use super::inodes::{DirectoryInodeData, InodeData}; -use tvix_castore::B3Digest; - -/// InodeTracker keeps track of inodes, stores data being these inodes and deals -/// with inode allocation. -pub struct InodeTracker { - data: HashMap<u64, Arc<InodeData>>, - - // lookup table for blobs by their B3Digest - blob_digest_to_inode: HashMap<B3Digest, u64>, - - // lookup table for symlinks by their target - symlink_target_to_inode: HashMap<bytes::Bytes, u64>, - - // lookup table for directories by their B3Digest. - // Note the corresponding directory may not be present in data yet. - directory_digest_to_inode: HashMap<B3Digest, u64>, - - // the next inode to allocate - next_inode: u64, -} - -impl Default for InodeTracker { - fn default() -> Self { - Self { - data: Default::default(), - - blob_digest_to_inode: Default::default(), - symlink_target_to_inode: Default::default(), - directory_digest_to_inode: Default::default(), - - next_inode: 2, - } - } -} - -impl InodeTracker { - // Retrieves data for a given inode, if it exists. - pub fn get(&self, ino: u64) -> Option<Arc<InodeData>> { - self.data.get(&ino).cloned() - } - - // Replaces data for a given inode. - // Panics if the inode doesn't already exist. - pub fn replace(&mut self, ino: u64, data: Arc<InodeData>) { - if self.data.insert(ino, data).is_none() { - panic!("replace called on unknown inode"); - } - } - - // Stores data and returns the inode for it. - // In case an inode has already been allocated for the same data, that inode - // is returned, otherwise a new one is allocated. - // In case data is a [InodeData::Directory], inodes for all items are looked - // up - pub fn put(&mut self, data: InodeData) -> u64 { - match data { - InodeData::Regular(ref digest, _, _) => { - match self.blob_digest_to_inode.get(digest) { - Some(found_ino) => { - // We already have it, return the inode. - *found_ino - } - None => self.insert_and_increment(data), - } - } - InodeData::Symlink(ref target) => { - match self.symlink_target_to_inode.get(target) { - Some(found_ino) => { - // We already have it, return the inode. - *found_ino - } - None => self.insert_and_increment(data), - } - } - InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { - // check the lookup table if the B3Digest is known. - match self.directory_digest_to_inode.get(digest) { - Some(found_ino) => { - // We already have it, return the inode. - *found_ino - } - None => { - // insert and return the inode - self.insert_and_increment(data) - } - } - } - // Inserting [DirectoryInodeData::Populated] doesn't normally happen, - // only via [replace]. - InodeData::Directory(DirectoryInodeData::Populated(..)) => { - unreachable!("should never be called with DirectoryInodeData::Populated") - } - } - } - - // Inserts the data and returns the inode it was stored at, while - // incrementing next_inode. - fn insert_and_increment(&mut self, data: InodeData) -> u64 { - let ino = self.next_inode; - // insert into lookup tables - match data { - InodeData::Regular(ref digest, _, _) => { - self.blob_digest_to_inode.insert(digest.clone(), ino); - } - InodeData::Symlink(ref target) => { - self.symlink_target_to_inode.insert(target.clone(), ino); - } - InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { - self.directory_digest_to_inode.insert(digest.clone(), ino); - } - // This is currently not used outside test fixtures. - // Usually a [DirectoryInodeData::Sparse] is inserted and later - // "upgraded" with more data. - // However, as a future optimization, a lookup for a PathInfo could trigger a - // [DirectoryService::get_recursive()] request that "forks into - // background" and prepopulates all Directories in a closure. - InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) => { - self.directory_digest_to_inode.insert(digest.clone(), ino); - } - } - // Insert data - self.data.insert(ino, Arc::new(data)); - - // increment inode counter and return old inode. - self.next_inode += 1; - ino - } -} - -#[cfg(test)] -mod tests { - use crate::tests::fixtures; - - use super::InodeData; - use super::InodeTracker; - - /// Getting something non-existent should be none - #[test] - fn get_nonexistent() { - let inode_tracker = InodeTracker::default(); - assert!(inode_tracker.get(1).is_none()); - } - - /// Put of a regular file should allocate a uid, which should be the same when inserting again. - #[test] - fn put_regular() { - let mut inode_tracker = InodeTracker::default(); - let f = InodeData::Regular( - fixtures::BLOB_A_DIGEST.clone(), - fixtures::BLOB_A.len() as u64, - false, - ); - - // put it in - let ino = inode_tracker.put(f.clone()); - - // a get should return the right data - let data = inode_tracker.get(ino).expect("must be some"); - match *data { - InodeData::Regular(ref digest, _, _) => { - assert_eq!(&fixtures::BLOB_A_DIGEST.clone(), digest); - } - InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"), - } - - // another put should return the same ino - assert_eq!(ino, inode_tracker.put(f)); - - // inserting another file should return a different ino - assert_ne!( - ino, - inode_tracker.put(InodeData::Regular( - fixtures::BLOB_B_DIGEST.clone(), - fixtures::BLOB_B.len() as u64, - false, - )) - ); - } - - // Put of a symlink should allocate a uid, which should be the same when inserting again - #[test] - fn put_symlink() { - let mut inode_tracker = InodeTracker::default(); - let f = InodeData::Symlink("target".into()); - - // put it in - let ino = inode_tracker.put(f.clone()); - - // a get should return the right data - let data = inode_tracker.get(ino).expect("must be some"); - match *data { - InodeData::Symlink(ref target) => { - assert_eq!(b"target".to_vec(), *target); - } - InodeData::Regular(..) | InodeData::Directory(..) => panic!("wrong type"), - } - - // another put should return the same ino - assert_eq!(ino, inode_tracker.put(f)); - - // inserting another file should return a different ino - assert_ne!(ino, inode_tracker.put(InodeData::Symlink("target2".into()))); - } -} diff --git a/tvix/store/src/fs/inodes.rs b/tvix/store/src/fs/inodes.rs deleted file mode 100644 index 4047199982b2..000000000000 --- a/tvix/store/src/fs/inodes.rs +++ /dev/null @@ -1,57 +0,0 @@ -//! This module contains all the data structures used to track information -//! about inodes, which present tvix-store nodes in a filesystem. -use tvix_castore::proto as castorepb; -use tvix_castore::B3Digest; - -#[derive(Clone, Debug)] -pub enum InodeData { - Regular(B3Digest, u64, bool), // digest, size, executable - Symlink(bytes::Bytes), // target - Directory(DirectoryInodeData), // either [DirectoryInodeData:Sparse] or [DirectoryInodeData:Populated] -} - -/// This encodes the two different states of [InodeData::Directory]. -/// Either the data still is sparse (we only saw a [castorepb::DirectoryNode], -/// but didn't fetch the [castorepb::Directory] struct yet, or we processed a -/// lookup and did fetch the data. -#[derive(Clone, Debug)] -pub enum DirectoryInodeData { - Sparse(B3Digest, u64), // digest, size - Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] -} - -impl From<&castorepb::node::Node> for InodeData { - fn from(value: &castorepb::node::Node) -> Self { - match value { - castorepb::node::Node::Directory(directory_node) => directory_node.into(), - castorepb::node::Node::File(file_node) => file_node.into(), - castorepb::node::Node::Symlink(symlink_node) => symlink_node.into(), - } - } -} - -impl From<&castorepb::SymlinkNode> for InodeData { - fn from(value: &castorepb::SymlinkNode) -> Self { - InodeData::Symlink(value.target.clone()) - } -} - -impl From<&castorepb::FileNode> for InodeData { - fn from(value: &castorepb::FileNode) -> Self { - InodeData::Regular( - value.digest.clone().try_into().unwrap(), - value.size, - value.executable, - ) - } -} - -/// Converts a DirectoryNode to a sparsely populated InodeData::Directory. -impl From<&castorepb::DirectoryNode> for InodeData { - fn from(value: &castorepb::DirectoryNode) -> Self { - InodeData::Directory(DirectoryInodeData::Sparse( - value.digest.clone().try_into().unwrap(), - value.size, - )) - } -} diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs deleted file mode 100644 index c11bd0a44c7e..000000000000 --- a/tvix/store/src/fs/mod.rs +++ /dev/null @@ -1,629 +0,0 @@ -mod file_attr; -mod inode_tracker; -mod inodes; -mod root_nodes; - -#[cfg(feature = "fuse")] -pub mod fuse; - -#[cfg(feature = "virtiofs")] -pub mod virtiofs; - -#[cfg(test)] -mod tests; - -use fuse_backend_rs::abi::fuse_abi::stat64; -use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; -use futures::StreamExt; -use parking_lot::RwLock; -use std::ops::Deref; -use std::{ - collections::HashMap, - io, - sync::atomic::AtomicU64, - sync::{atomic::Ordering, Arc}, - time::Duration, -}; -use tokio::{ - io::{AsyncReadExt, AsyncSeekExt}, - sync::mpsc, -}; -use tracing::{debug, info_span, instrument, warn}; -use tvix_castore::proto as castorepb; -use tvix_castore::{ - blobservice::{BlobReader, BlobService}, - directoryservice::DirectoryService, - proto::{node::Node, NamedNode}, - B3Digest, -}; - -use self::root_nodes::RootNodes; -use self::{ - file_attr::{gen_file_attr, ROOT_FILE_ATTR}, - inode_tracker::InodeTracker, - inodes::{DirectoryInodeData, InodeData}, -}; - -/// This implements a read-only FUSE filesystem for a tvix-store -/// with the passed [BlobService], [DirectoryService] and [RootNodes]. -/// -/// Linux uses inodes in filesystems. When implementing FUSE, most calls are -/// *for* a given inode. -/// -/// This means, we need to have a stable mapping of inode numbers to the -/// corresponding store nodes. -/// -/// We internally delegate all inode allocation and state keeping to the -/// inode tracker. -/// We store a mapping from currently "explored" names in the root to their -/// inode. -/// -/// There's some places where inodes are allocated / data inserted into -/// the inode tracker, if not allocated before already: -/// - Processing a `lookup` request, either in the mount root, or somewhere -/// deeper. -/// - Processing a `readdir` request -/// -/// Things pointing to the same contents get the same inodes, irrespective of -/// their own location. -/// This means: -/// - Symlinks with the same target will get the same inode. -/// - Regular/executable files with the same contents will get the same inode -/// - Directories with the same contents will get the same inode. -/// -/// Due to the above being valid across the whole store, and considering the -/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed -/// allocation", aka reserve Directory.size inodes for each directory node we -/// explore. -pub struct TvixStoreFs<BS, DS, RN> { - blob_service: BS, - directory_service: DS, - root_nodes_provider: RN, - - /// Whether to (try) listing elements in the root. - list_root: bool, - - /// This maps a given basename in the root to the inode we allocated for the node. - root_nodes: RwLock<HashMap<Vec<u8>, u64>>, - - /// This keeps track of inodes and data alongside them. - inode_tracker: RwLock<InodeTracker>, - - /// This holds all open file handles - #[allow(clippy::type_complexity)] - file_handles: RwLock<HashMap<u64, Arc<tokio::sync::Mutex<Box<dyn BlobReader>>>>>, - - next_file_handle: AtomicU64, - - tokio_handle: tokio::runtime::Handle, -} - -impl<BS, DS, RN> TvixStoreFs<BS, DS, RN> -where - BS: Deref<Target = dyn BlobService> + Clone + Send, - DS: Deref<Target = dyn DirectoryService> + Clone + Send + 'static, - RN: RootNodes + Clone + 'static, -{ - pub fn new( - blob_service: BS, - directory_service: DS, - root_nodes_provider: RN, - list_root: bool, - ) -> Self { - Self { - blob_service, - directory_service, - root_nodes_provider, - - list_root, - - root_nodes: RwLock::new(HashMap::default()), - inode_tracker: RwLock::new(Default::default()), - - file_handles: RwLock::new(Default::default()), - next_file_handle: AtomicU64::new(1), - tokio_handle: tokio::runtime::Handle::current(), - } - } - - /// Retrieves the inode for a given root node basename, if present. - /// This obtains a read lock on self.root_nodes. - fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> { - self.root_nodes.read().get(name).cloned() - } - - /// For a given inode, look up the given directory behind it (from - /// self.inode_tracker), and return its children. - /// The inode_tracker MUST know about this inode already, and it MUST point - /// to a [InodeData::Directory]. - /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup - /// in self.directory_service is performed, and self.inode_tracker is updated with the - /// [DirectoryInodeData::Populated]. - #[instrument(skip(self), err)] - fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> { - let data = self.inode_tracker.read().get(ino).unwrap(); - match *data { - // if it's populated already, return children. - InodeData::Directory(DirectoryInodeData::Populated( - ref parent_digest, - ref children, - )) => Ok((parent_digest.clone(), children.clone())), - // if it's sparse, fetch data using directory_service, populate child nodes - // and update it in [self.inode_tracker]. - InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { - let directory = self - .tokio_handle - .block_on(self.tokio_handle.spawn({ - let directory_service = self.directory_service.clone(); - let parent_digest = parent_digest.to_owned(); - async move { directory_service.get(&parent_digest).await } - })) - .unwrap()? - .ok_or_else(|| { - warn!(directory.digest=%parent_digest, "directory not found"); - // If the Directory can't be found, this is a hole, bail out. - io::Error::from_raw_os_error(libc::EIO) - })?; - - // Turn the retrieved directory into a InodeData::Directory(DirectoryInodeData::Populated(..)), - // allocating inodes for the children on the way. - let children = { - let mut inode_tracker = self.inode_tracker.write(); - - let children: Vec<(u64, castorepb::node::Node)> = directory - .nodes() - .map(|child_node| { - let child_ino = inode_tracker.put((&child_node).into()); - (child_ino, child_node) - }) - .collect(); - - // replace. - inode_tracker.replace( - ino, - Arc::new(InodeData::Directory(DirectoryInodeData::Populated( - parent_digest.clone(), - children.clone(), - ))), - ); - - children - }; - - Ok((parent_digest.clone(), children)) - } - // if the parent inode was not a directory, this doesn't make sense - InodeData::Regular(..) | InodeData::Symlink(_) => { - Err(io::Error::from_raw_os_error(libc::ENOTDIR)) - } - } - } - - /// This will turn a lookup request for a name in the root to a ino and - /// [InodeData]. - /// It will peek in [self.root_nodes], and then either look it up from - /// [self.inode_tracker], - /// or otherwise fetch from [self.root_nodes], and then insert into - /// [self.inode_tracker]. - /// In the case the name can't be found, a libc::ENOENT is returned. - fn name_in_root_to_ino_and_data( - &self, - name: &std::ffi::CStr, - ) -> io::Result<(u64, Arc<InodeData>)> { - // Look up the inode for that root node. - // If there's one, [self.inode_tracker] MUST also contain the data, - // which we can then return. - if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) { - return Ok(( - inode, - self.inode_tracker - .read() - .get(inode) - .expect("must exist") - .to_owned(), - )); - } - - // We don't have it yet, look it up in [self.root_nodes]. - match self.tokio_handle.block_on({ - let root_nodes_provider = self.root_nodes_provider.clone(); - async move { root_nodes_provider.get_by_basename(name.to_bytes()).await } - }) { - // if there was an error looking up the root node, propagate up an IO error. - Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)), - // the root node doesn't exist, so the file doesn't exist. - Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), - // The root node does exist - Ok(Some(root_node)) => { - // The name must match what's passed in the lookup, otherwise this is also a ENOENT. - if root_node.get_name() != name.to_bytes() { - debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch"); - return Err(io::Error::from_raw_os_error(libc::ENOENT)); - } - - // Let's check if someone else beat us to updating the inode tracker and - // root_nodes map. This avoids locking inode_tracker for writing. - if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) { - return Ok(( - *ino, - self.inode_tracker.read().get(*ino).expect("must exist"), - )); - } - - // Only in case it doesn't, lock [self.root_nodes] and - // [self.inode_tracker] for writing. - let mut root_nodes = self.root_nodes.write(); - let mut inode_tracker = self.inode_tracker.write(); - - // insert the (sparse) inode data and register in - // self.root_nodes. - let inode_data: InodeData = (&root_node).into(); - let ino = inode_tracker.put(inode_data.clone()); - root_nodes.insert(name.to_bytes().into(), ino); - - Ok((ino, Arc::new(inode_data))) - } - } - } -} - -impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN> -where - BS: Deref<Target = dyn BlobService> + Clone + Send + 'static, - DS: Deref<Target = dyn DirectoryService> + Send + Clone + 'static, - RN: RootNodes + Clone + 'static, -{ - type Handle = u64; - type Inode = u64; - - fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> { - Ok(FsOptions::empty()) - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode))] - fn getattr( - &self, - _ctx: &Context, - inode: Self::Inode, - _handle: Option<Self::Handle>, - ) -> io::Result<(stat64, Duration)> { - if inode == ROOT_ID { - return Ok((ROOT_FILE_ATTR.into(), Duration::MAX)); - } - - match self.inode_tracker.read().get(inode) { - None => Err(io::Error::from_raw_os_error(libc::ENOENT)), - Some(node) => { - debug!(node = ?node, "found node"); - Ok((gen_file_attr(&node, inode).into(), Duration::MAX)) - } - } - } - - #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))] - fn lookup( - &self, - _ctx: &Context, - parent: Self::Inode, - name: &std::ffi::CStr, - ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> { - debug!("lookup"); - - // This goes from a parent inode to a node. - // - If the parent is [ROOT_ID], we need to check - // [self.root_nodes] (fetching from a [RootNode] provider if needed) - // - Otherwise, lookup the parent in [self.inode_tracker] (which must be - // a [InodeData::Directory]), and find the child with that name. - if parent == ROOT_ID { - let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?; - - debug!(inode_data=?&inode_data, ino=ino, "Some"); - return Ok(fuse_backend_rs::api::filesystem::Entry { - inode: ino, - attr: gen_file_attr(&inode_data, ino).into(), - attr_timeout: Duration::MAX, - entry_timeout: Duration::MAX, - ..Default::default() - }); - } - // This is the "lookup for "a" inside inode 42. - // We already know that inode 42 must be a directory. - let (parent_digest, children) = self.get_directory_children(parent)?; - - let span = info_span!("lookup", directory.digest = %parent_digest); - let _enter = span.enter(); - - // Search for that name in the list of children and return the FileAttrs. - - // in the children, find the one with the desired name. - if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { - // lookup the child [InodeData] in [self.inode_tracker]. - // We know the inodes for children have already been allocated. - let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap(); - - // Reply with the file attributes for the child. - // For child directories, we still have all data we need to reply. - Ok(fuse_backend_rs::api::filesystem::Entry { - inode: *child_ino, - attr: gen_file_attr(&child_inode_data, *child_ino).into(), - attr_timeout: Duration::MAX, - entry_timeout: Duration::MAX, - ..Default::default() - }) - } else { - // Child not found, return ENOENT. - Err(io::Error::from_raw_os_error(libc::ENOENT)) - } - } - - // TODO: readdirplus? - - #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))] - fn readdir( - &self, - _ctx: &Context, - inode: Self::Inode, - _handle: Self::Handle, - _size: u32, - offset: u64, - add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>, - ) -> io::Result<()> { - debug!("readdir"); - - if inode == ROOT_ID { - if !self.list_root { - return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo - } else { - let root_nodes_provider = self.root_nodes_provider.clone(); - let (tx, mut rx) = mpsc::channel(16); - - // This task will run in the background immediately and will exit - // after the stream ends or if we no longer want any more entries. - self.tokio_handle.spawn(async move { - let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate(); - while let Some(node) = stream.next().await { - if tx.send(node).await.is_err() { - // If we get a send error, it means the sync code - // doesn't want any more entries. - break; - } - } - }); - - while let Some((i, root_node)) = rx.blocking_recv() { - let root_node = match root_node { - Err(e) => { - warn!("failed to retrieve pathinfo: {}", e); - return Err(io::Error::from_raw_os_error(libc::EPERM)); - } - Ok(root_node) => root_node, - }; - - let name = root_node.get_name(); - // obtain the inode, or allocate a new one. - let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| { - // insert the (sparse) inode data and register in - // self.root_nodes. - let ino = self.inode_tracker.write().put((&root_node).into()); - self.root_nodes.write().insert(name.into(), ino); - ino - }); - - let ty = match root_node { - Node::Directory(_) => libc::S_IFDIR, - Node::File(_) => libc::S_IFREG, - Node::Symlink(_) => libc::S_IFLNK, - }; - - let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { - ino, - offset: offset + i as u64 + 1, - type_: ty, - name, - })?; - // If the buffer is full, add_entry will return `Ok(0)`. - if written == 0 { - break; - } - } - - return Ok(()); - } - } - - // lookup the children, or return an error if it's not a directory. - let (parent_digest, children) = self.get_directory_children(inode)?; - - let span = info_span!("lookup", directory.digest = %parent_digest); - let _enter = span.enter(); - - for (i, (ino, child_node)) in children.iter().skip(offset as usize).enumerate() { - // the second parameter will become the "offset" parameter on the next call. - let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { - ino: *ino, - offset: offset + i as u64 + 1, - type_: match child_node { - #[allow(clippy::unnecessary_cast)] - // libc::S_IFDIR is u32 on Linux and u16 on MacOS - Node::Directory(_) => libc::S_IFDIR as u32, - #[allow(clippy::unnecessary_cast)] - // libc::S_IFDIR is u32 on Linux and u16 on MacOS - Node::File(_) => libc::S_IFREG as u32, - #[allow(clippy::unnecessary_cast)] - // libc::S_IFDIR is u32 on Linux and u16 on MacOS - Node::Symlink(_) => libc::S_IFLNK as u32, - }, - name: child_node.get_name(), - })?; - // If the buffer is full, add_entry will return `Ok(0)`. - if written == 0 { - break; - } - } - - Ok(()) - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode))] - fn open( - &self, - _ctx: &Context, - inode: Self::Inode, - _flags: u32, - _fuse_flags: u32, - ) -> io::Result<( - Option<Self::Handle>, - fuse_backend_rs::api::filesystem::OpenOptions, - )> { - if inode == ROOT_ID { - return Err(io::Error::from_raw_os_error(libc::ENOSYS)); - } - - // lookup the inode - match *self.inode_tracker.read().get(inode).unwrap() { - // read is invalid on non-files. - InodeData::Directory(..) | InodeData::Symlink(_) => { - warn!("is directory"); - Err(io::Error::from_raw_os_error(libc::EISDIR)) - } - InodeData::Regular(ref blob_digest, _blob_size, _) => { - let span = info_span!("read", blob.digest = %blob_digest); - let _enter = span.enter(); - - let blob_service = self.blob_service.clone(); - let blob_digest = blob_digest.clone(); - - let task = self - .tokio_handle - .spawn(async move { blob_service.open_read(&blob_digest).await }); - - let blob_reader = self.tokio_handle.block_on(task).unwrap(); - - match blob_reader { - Ok(None) => { - warn!("blob not found"); - Err(io::Error::from_raw_os_error(libc::EIO)) - } - Err(e) => { - warn!(e=?e, "error opening blob"); - Err(io::Error::from_raw_os_error(libc::EIO)) - } - Ok(Some(blob_reader)) => { - // get a new file handle - // TODO: this will overflow after 2**64 operations, - // which is fine for now. - // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 - // for the discussion on alternatives. - let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst); - - debug!("add file handle {}", fh); - self.file_handles - .write() - .insert(fh, Arc::new(tokio::sync::Mutex::new(blob_reader))); - - Ok(( - Some(fh), - fuse_backend_rs::api::filesystem::OpenOptions::empty(), - )) - } - } - } - } - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode, fh = handle))] - fn release( - &self, - _ctx: &Context, - inode: Self::Inode, - _flags: u32, - handle: Self::Handle, - _flush: bool, - _flock_release: bool, - _lock_owner: Option<u64>, - ) -> io::Result<()> { - // remove and get ownership on the blob reader - match self.file_handles.write().remove(&handle) { - // drop it, which will close it. - Some(blob_reader) => drop(blob_reader), - None => { - // These might already be dropped if a read error occured. - debug!("file_handle {} not found", handle); - } - } - - Ok(()) - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset, rq.size = size))] - fn read( - &self, - _ctx: &Context, - inode: Self::Inode, - handle: Self::Handle, - w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter, - size: u32, - offset: u64, - _lock_owner: Option<u64>, - _flags: u32, - ) -> io::Result<usize> { - debug!("read"); - - // We need to take out the blob reader from self.file_handles, so we can - // interact with it in the separate task. - // On success, we pass it back out of the task, so we can put it back in self.file_handles. - let blob_reader = match self.file_handles.read().get(&handle) { - Some(blob_reader) => blob_reader.clone(), - None => { - warn!("file handle {} unknown", handle); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - }; - - let task = self.tokio_handle.spawn(async move { - let mut blob_reader = blob_reader.lock().await; - - // seek to the offset specified, which is relative to the start of the file. - let resp = blob_reader.seek(io::SeekFrom::Start(offset)).await; - - match resp { - Ok(pos) => { - debug_assert_eq!(offset, pos); - } - Err(e) => { - warn!("failed to seek to offset {}: {}", offset, e); - return Err(io::Error::from_raw_os_error(libc::EIO)); - } - } - - // As written in the fuse docs, read should send exactly the number - // of bytes requested except on EOF or error. - - let mut buf: Vec<u8> = Vec::with_capacity(size as usize); - - // copy things from the internal buffer into buf to fill it till up until size - tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?; - - Ok(buf) - }); - - let buf = self.tokio_handle.block_on(task).unwrap()?; - - w.write(&buf) - } - - #[tracing::instrument(skip_all, fields(rq.inode = inode))] - fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> { - if inode == ROOT_ID { - return Err(io::Error::from_raw_os_error(libc::ENOSYS)); - } - - // lookup the inode - match *self.inode_tracker.read().get(inode).unwrap() { - InodeData::Directory(..) | InodeData::Regular(..) => { - Err(io::Error::from_raw_os_error(libc::EINVAL)) - } - InodeData::Symlink(ref target) => Ok(target.to_vec()), - } - } -} diff --git a/tvix/store/src/fs/root_nodes.rs b/tvix/store/src/fs/root_nodes.rs deleted file mode 100644 index e672c6e647f3..000000000000 --- a/tvix/store/src/fs/root_nodes.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::{ops::Deref, pin::Pin}; - -use futures::{Stream, StreamExt}; -use nix_compat::store_path::StorePath; -use tonic::async_trait; -use tvix_castore::{proto::node::Node, Error}; - -use crate::pathinfoservice::PathInfoService; - -/// Provides an interface for looking up root nodes in tvix-castore by given -/// a lookup key (usually the basename), and optionally allow a listing. -/// -#[async_trait] -pub trait RootNodes: Send + Sync { - /// Looks up a root CA node based on the basename of the node in the root - /// directory of the filesystem. - async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>; - - /// Lists all root CA nodes in the filesystem. An error can be returned - /// in case listing is not allowed - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send>>; -} - -/// Implements root node lookup for any [PathInfoService]. This represents a flat -/// directory structure like /nix/store where each entry in the root filesystem -/// directory corresponds to a CA node. -#[async_trait] -impl<T> RootNodes for T -where - T: Deref<Target = dyn PathInfoService> + Send + Sync, -{ - async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> { - let Ok(store_path) = StorePath::from_bytes(name) else { - return Ok(None); - }; - - Ok(self - .deref() - .get(*store_path.digest()) - .await? - .map(|path_info| { - path_info - .node - .expect("missing root node") - .node - .expect("empty node") - })) - } - - fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send>> { - Box::pin(self.deref().list().map(|result| { - result.map(|path_info| { - path_info - .node - .expect("missing root node") - .node - .expect("empty node") - }) - })) - } -} diff --git a/tvix/store/src/fs/tests.rs b/tvix/store/src/fs/tests.rs deleted file mode 100644 index a3977c727505..000000000000 --- a/tvix/store/src/fs/tests.rs +++ /dev/null @@ -1,1171 +0,0 @@ -use futures::StreamExt; -use std::io::Cursor; -use std::os::unix::prelude::MetadataExt; -use std::path::Path; -use std::sync::Arc; -use tokio::{fs, io}; -use tokio_stream::wrappers::ReadDirStream; -use tvix_castore::blobservice::BlobService; -use tvix_castore::directoryservice::DirectoryService; - -use tempfile::TempDir; - -use crate::fs::{fuse::FuseDaemon, TvixStoreFs}; -use crate::pathinfoservice::PathInfoService; -use crate::proto::PathInfo; -use crate::tests::fixtures; -use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; -use tvix_castore::proto as castorepb; - -const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; -const BLOB_B_NAME: &str = "55555555555555555555555555555555-test"; -const HELLOWORLD_BLOB_NAME: &str = "66666666666666666666666666666666-test"; -const SYMLINK_NAME: &str = "11111111111111111111111111111111-test"; -const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test"; -const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test"; -const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test"; - -fn gen_svcs() -> ( - Arc<dyn BlobService>, - Arc<dyn DirectoryService>, - Arc<dyn PathInfoService>, -) { - let blob_service = gen_blob_service(); - let directory_service = gen_directory_service(); - let path_info_service = gen_pathinfo_service(blob_service.clone(), directory_service.clone()); - - (blob_service, directory_service, path_info_service) -} - -fn do_mount<P: AsRef<Path>>( - blob_service: Arc<dyn BlobService>, - directory_service: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, - mountpoint: P, - list_root: bool, -) -> io::Result<FuseDaemon> { - let fs = TvixStoreFs::new( - blob_service, - directory_service, - path_info_service, - list_root, - ); - FuseDaemon::new(fs, mountpoint.as_ref(), 4) -} - -async fn populate_blob_a( - blob_service: &Arc<dyn BlobService>, - _directory_service: &Arc<dyn DirectoryService>, - path_info_service: &Arc<dyn PathInfoService>, -) { - // Upload BLOB_A - let mut bw = blob_service.open_write().await; - tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) - .await - .expect("must succeed uploading"); - bw.close().await.expect("must succeed closing"); - - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(castorepb::Node { - node: Some(castorepb::node::Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u64, - executable: false, - })), - }), - ..Default::default() - }; - path_info_service - .put(path_info) - .await - .expect("must succeed"); -} - -async fn populate_blob_b( - blob_service: &Arc<dyn BlobService>, - _directory_service: &Arc<dyn DirectoryService>, - path_info_service: &Arc<dyn PathInfoService>, -) { - // Upload BLOB_B - let mut bw = blob_service.open_write().await; - tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) - .await - .expect("must succeed uploading"); - bw.close().await.expect("must succeed closing"); - - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(castorepb::Node { - node: Some(castorepb::node::Node::File(castorepb::FileNode { - name: BLOB_B_NAME.into(), - digest: fixtures::BLOB_B_DIGEST.clone().into(), - size: fixtures::BLOB_B.len() as u64, - executable: false, - })), - }), - ..Default::default() - }; - path_info_service - .put(path_info) - .await - .expect("must succeed"); -} - -/// adds a blob containing helloworld and marks it as executable -async fn populate_helloworld_blob( - blob_service: &Arc<dyn BlobService>, - _directory_service: &Arc<dyn DirectoryService>, - path_info_service: &Arc<dyn PathInfoService>, -) { - // Upload BLOB_B - let mut bw = blob_service.open_write().await; - tokio::io::copy( - &mut Cursor::new(fixtures::HELLOWORLD_BLOB_CONTENTS.to_vec()), - &mut bw, - ) - .await - .expect("must succeed uploading"); - bw.close().await.expect("must succeed closing"); - - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(castorepb::Node { - node: Some(castorepb::node::Node::File(castorepb::FileNode { - name: HELLOWORLD_BLOB_NAME.into(), - digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), - size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, - executable: true, - })), - }), - ..Default::default() - }; - path_info_service - .put(path_info) - .await - .expect("must succeed"); -} - -async fn populate_symlink( - _blob_service: &Arc<dyn BlobService>, - _directory_service: &Arc<dyn DirectoryService>, - path_info_service: &Arc<dyn PathInfoService>, -) { - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(castorepb::Node { - node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME.into(), - target: BLOB_A_NAME.into(), - })), - }), - ..Default::default() - }; - path_info_service - .put(path_info) - .await - .expect("must succeed"); -} - -/// This writes a symlink pointing to /nix/store/somewhereelse, -/// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. -async fn populate_symlink2( - _blob_service: &Arc<dyn BlobService>, - _directory_service: &Arc<dyn DirectoryService>, - path_info_service: &Arc<dyn PathInfoService>, -) { - // Create a PathInfo for it - let path_info = PathInfo { - node: Some(castorepb::Node { - node: Some(castorepb::node::Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME2.into(), - target: "/nix/store/somewhereelse".into(), - })), - }), - ..Default::default() - }; - path_info_service - .put(path_info) - .await - .expect("must succeed"); -} - -async fn populate_directory_with_keep( - blob_service: &Arc<dyn BlobService>, - directory_service: &Arc<dyn DirectoryService>, - path_info_service: &Arc<dyn PathInfoService>, -) { - // upload empty blob - let mut bw = blob_service.open_write().await; - assert_eq!( - fixtures::EMPTY_BLOB_DIGEST.as_slice(), - bw.close().await.expect("must succeed closing").as_slice(), - ); - - // upload directory - directory_service - .put(fixtures::DIRECTORY_WITH_KEEP.clone()) - .await - .expect("must succeed uploading"); - - // upload pathinfo - let path_info = PathInfo { - node: Some(castorepb::Node { - node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: fixtures::DIRECTORY_WITH_KEEP.size(), - })), - }), - ..Default::default() - }; - path_info_service - .put(path_info) - .await - .expect("must succeed"); -} - -/// Insert [PathInfo] for DIRECTORY_WITH_KEEP, but don't provide the Directory -/// itself. -async fn populate_pathinfo_without_directory( - _: &Arc<dyn BlobService>, - _: &Arc<dyn DirectoryService>, - path_info_service: &Arc<dyn PathInfoService>, -) { - // upload pathinfo - let path_info = PathInfo { - node: Some(castorepb::Node { - node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), - size: fixtures::DIRECTORY_WITH_KEEP.size(), - })), - }), - ..Default::default() - }; - path_info_service - .put(path_info) - .await - .expect("must succeed"); -} - -/// Insert , but don't provide the blob .keep is pointing to -async fn populate_blob_a_without_blob( - _: &Arc<dyn BlobService>, - _: &Arc<dyn DirectoryService>, - path_info_service: &Arc<dyn PathInfoService>, -) { - // Create a PathInfo for blob A - let path_info = PathInfo { - node: Some(castorepb::Node { - node: Some(castorepb::node::Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), - size: fixtures::BLOB_A.len() as u64, - executable: false, - })), - }), - ..Default::default() - }; - path_info_service - .put(path_info) - .await - .expect("must succeed"); -} - -async fn populate_directory_complicated( - blob_service: &Arc<dyn BlobService>, - directory_service: &Arc<dyn DirectoryService>, - path_info_service: &Arc<dyn PathInfoService>, -) { - // upload empty blob - let mut bw = blob_service.open_write().await; - assert_eq!( - fixtures::EMPTY_BLOB_DIGEST.as_slice(), - bw.close().await.expect("must succeed closing").as_slice(), - ); - - // upload inner directory - directory_service - .put(fixtures::DIRECTORY_WITH_KEEP.clone()) - .await - .expect("must succeed uploading"); - - // uplodad parent directory - directory_service - .put(fixtures::DIRECTORY_COMPLICATED.clone()) - .await - .expect("must succeed uploading"); - - // upload pathinfo - let path_info = PathInfo { - node: Some(castorepb::Node { - node: Some(castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_COMPLICATED_NAME.into(), - digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), - size: fixtures::DIRECTORY_COMPLICATED.size(), - })), - }), - ..Default::default() - }; - path_info_service - .put(path_info) - .await - .expect("must succeed"); -} - -/// Ensure mounting itself doesn't fail -#[tokio::test] -async fn mount() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure listing the root isn't allowed -#[tokio::test] -async fn root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - { - // read_dir succeeds, but getting the first element will fail. - let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); - - let err = it - .next() - .await - .expect("must be some") - .expect_err("must be err"); - assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); - } - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure listing the root is allowed if configured explicitly -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn root_with_listing() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - true, /* allow listing */ - ) - .expect("must succeed"); - - { - // read_dir succeeds, but getting the first element will fail. - let mut it = ReadDirStream::new(fs::read_dir(tmpdir).await.expect("must succeed")); - - let e = it - .next() - .await - .expect("must be some") - .expect("must succeed"); - - let metadata = e.metadata().await.expect("must succeed"); - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); - } - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we can stat a file at the root -#[tokio::test] -async fn stat_file_at_root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_A_NAME); - - // peek at the file metadata - let metadata = fs::metadata(p).await.expect("must succeed"); - - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we can read a file at the root -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn read_file_at_root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_A_NAME); - - // read the file contents - let data = fs::read(p).await.expect("must succeed"); - - // ensure size and contents match - assert_eq!(fixtures::BLOB_A.len(), data.len()); - assert_eq!(fixtures::BLOB_A.to_vec(), data); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we can read a large file at the root -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn read_large_file_at_root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_b(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_B_NAME); - { - // peek at the file metadata - let metadata = fs::metadata(&p).await.expect("must succeed"); - - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - assert_eq!(fixtures::BLOB_B.len() as u64, metadata.len()); - } - - // read the file contents - let data = fs::read(p).await.expect("must succeed"); - - // ensure size and contents match - assert_eq!(fixtures::BLOB_B.len(), data.len()); - assert_eq!(fixtures::BLOB_B.to_vec(), data); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Read the target of a symlink -#[tokio::test] -async fn symlink_readlink() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_symlink(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(SYMLINK_NAME); - - let target = fs::read_link(&p).await.expect("must succeed"); - assert_eq!(BLOB_A_NAME, target.to_str().unwrap()); - - // peek at the file metadata, which follows symlinks. - // this must fail, as we didn't populate the target. - let e = fs::metadata(&p).await.expect_err("must fail"); - assert_eq!(std::io::ErrorKind::NotFound, e.kind()); - - // peeking at the file metadata without following symlinks will succeed. - let metadata = fs::symlink_metadata(&p).await.expect("must succeed"); - assert!(metadata.is_symlink()); - - // reading from the symlink (which follows) will fail, because the target doesn't exist. - let e = fs::read(p).await.expect_err("must fail"); - assert_eq!(std::io::ErrorKind::NotFound, e.kind()); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Read and stat a regular file through a symlink pointing to it. -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn read_stat_through_symlink() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p_symlink = tmpdir.path().join(SYMLINK_NAME); - let p_blob = tmpdir.path().join(SYMLINK_NAME); - - // peek at the file metadata, which follows symlinks. - // this must now return the same metadata as when statting at the target directly. - let metadata_symlink = fs::metadata(&p_symlink).await.expect("must succeed"); - let metadata_blob = fs::metadata(&p_blob).await.expect("must succeed"); - assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type()); - assert_eq!(metadata_blob.len(), metadata_symlink.len()); - - // reading from the symlink (which follows) will return the same data as if - // we were reading from the file directly. - assert_eq!( - fs::read(p_blob).await.expect("must succeed"), - fs::read(p_symlink).await.expect("must succeed"), - ); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Read a directory in the root, and validate some attributes. -#[tokio::test] -async fn read_stat_directory() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); - - // peek at the metadata of the directory - let metadata = fs::metadata(p).await.expect("must succeed"); - assert!(metadata.is_dir()); - assert!(metadata.permissions().readonly()); - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -/// Read a blob inside a directory. This ensures we successfully populate directory data. -async fn read_blob_inside_dir() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); - - // peek at metadata. - let metadata = fs::metadata(&p).await.expect("must succeed"); - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - - // read from it - let data = fs::read(&p).await.expect("must succeed"); - assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -/// Read a blob inside a directory inside a directory. This ensures we properly -/// populate directories as we traverse down the structure. -async fn read_blob_deep_inside_dir() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir - .path() - .join(DIRECTORY_COMPLICATED_NAME) - .join("keep") - .join(".keep"); - - // peek at metadata. - let metadata = fs::metadata(&p).await.expect("must succeed"); - assert!(metadata.is_file()); - assert!(metadata.permissions().readonly()); - - // read from it - let data = fs::read(&p).await.expect("must succeed"); - assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure readdir works. -#[tokio::test] -async fn readdir() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME); - - { - // read_dir should succeed. Collect all elements - let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) - .map(|e| e.expect("must not be err")) - .collect() - .await; - - assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and .. - - // We explicitly look at specific positions here, because we always emit - // them ordered. - - // ".keep", 0 byte file. - let e = &elements[0]; - assert_eq!(".keep", e.file_name()); - assert!(e.file_type().await.expect("must succeed").is_file()); - assert_eq!(0, e.metadata().await.expect("must succeed").len()); - - // "aa", symlink. - let e = &elements[1]; - assert_eq!("aa", e.file_name()); - assert!(e.file_type().await.expect("must succeed").is_symlink()); - - // "keep", directory - let e = &elements[2]; - assert_eq!("keep", e.file_name()); - assert!(e.file_type().await.expect("must succeed").is_dir()); - } - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test] -/// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory. -async fn readdir_deep() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); - - { - // read_dir should succeed. Collect all elements - let elements: Vec<_> = ReadDirStream::new(fs::read_dir(p).await.expect("must succeed")) - .map(|e| e.expect("must not be err")) - .collect() - .await; - - assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and .. - - // ".keep", 0 byte file. - let e = &elements[0]; - assert_eq!(".keep", e.file_name()); - assert!(e.file_type().await.expect("must succeed").is_file()); - assert_eq!(0, e.metadata().await.expect("must succeed").len()); - } - - fuse_daemon.unmount().expect("unmount"); -} - -/// Check attributes match how they show up in /nix/store normally. -#[tokio::test] -async fn check_attributes() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - populate_symlink(&blob_service, &directory_service, &path_info_service).await; - populate_helloworld_blob(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p_file = tmpdir.path().join(BLOB_A_NAME); - let p_directory = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); - let p_symlink = tmpdir.path().join(SYMLINK_NAME); - let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME); - - // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident. - let metadata_file = fs::symlink_metadata(&p_file).await.expect("must succeed"); - let metadata_executable_file = fs::symlink_metadata(&p_executable_file) - .await - .expect("must succeed"); - let metadata_directory = fs::symlink_metadata(&p_directory) - .await - .expect("must succeed"); - let metadata_symlink = fs::symlink_metadata(&p_symlink) - .await - .expect("must succeed"); - - // modes should match. We & with 0o777 to remove any higher bits. - assert_eq!(0o444, metadata_file.mode() & 0o777); - assert_eq!(0o555, metadata_executable_file.mode() & 0o777); - assert_eq!(0o555, metadata_directory.mode() & 0o777); - assert_eq!(0o444, metadata_symlink.mode() & 0o777); - - // files should have the correct filesize - assert_eq!(fixtures::BLOB_A.len() as u64, metadata_file.len()); - // directories should have their "size" as filesize - assert_eq!( - { fixtures::DIRECTORY_WITH_KEEP.size() }, - metadata_directory.size() - ); - - for metadata in &[&metadata_file, &metadata_directory, &metadata_symlink] { - // uid and gid should be 0. - assert_eq!(0, metadata.uid()); - assert_eq!(0, metadata.gid()); - - // all times should be set to the unix epoch. - assert_eq!(0, metadata.atime()); - assert_eq!(0, metadata.mtime()); - assert_eq!(0, metadata.ctime()); - // crtime seems MacOS only - } - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test] -/// Ensure we allocate the same inodes for the same directory contents. -/// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP. -async fn compare_inodes_directories() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_with_keep(&blob_service, &directory_service, &path_info_service).await; - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); - let p_sibling_dir = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); - - // peek at metadata. - assert_eq!( - fs::metadata(p_dir_with_keep) - .await - .expect("must succeed") - .ino(), - fs::metadata(p_sibling_dir) - .await - .expect("must succeed") - .ino() - ); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we allocate the same inodes for the same directory contents. -/// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep -#[tokio::test] -async fn compare_inodes_files() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep"); - let p_keep2 = tmpdir - .path() - .join(DIRECTORY_COMPLICATED_NAME) - .join("keep") - .join(".keep"); - - // peek at metadata. - assert_eq!( - fs::metadata(p_keep1).await.expect("must succeed").ino(), - fs::metadata(p_keep2).await.expect("must succeed").ino() - ); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Ensure we allocate the same inode for symlinks pointing to the same targets. -/// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2. -#[tokio::test] -async fn compare_inodes_symlinks() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_directory_complicated(&blob_service, &directory_service, &path_info_service).await; - populate_symlink2(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa"); - let p2 = tmpdir.path().join(SYMLINK_NAME2); - - // peek at metadata. - assert_eq!( - fs::symlink_metadata(p1).await.expect("must succeed").ino(), - fs::symlink_metadata(p2).await.expect("must succeed").ino() - ); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Check we match paths exactly. -#[tokio::test] -async fn read_wrong_paths_in_root() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - // wrong name - assert!( - fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) - .await - .is_err() - ); - - // invalid hash - assert!( - fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test")) - .await - .is_err() - ); - - // right name, must exist - assert!( - fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test")) - .await - .is_ok() - ); - - // now wrong name with right hash still may not exist - assert!( - fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) - .await - .is_err() - ); - - fuse_daemon.unmount().expect("unmount"); -} - -/// Make sure writes are not allowed -#[tokio::test] -async fn disallow_writes() { - // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_A_NAME); - let e = fs::File::create(p).await.expect_err("must fail"); - - assert_eq!(Some(libc::EROFS), e.raw_os_error()); - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test] -/// Ensure we get an IO error if the directory service does not have the Directory object. -async fn missing_directory() { - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_pathinfo_without_directory(&blob_service, &directory_service, &path_info_service) - .await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); - - { - // `stat` on the path should succeed, because it doesn't trigger the directory request. - fs::metadata(&p).await.expect("must succeed"); - - // However, calling either `readdir` or `stat` on a child should fail with an IO error. - // It fails when trying to pull the first entry, because we don't implement opendir separately - ReadDirStream::new(fs::read_dir(&p).await.unwrap()) - .next() - .await - .expect("must be some") - .expect_err("must be err"); - - // rust currently sets e.kind() to Uncategorized, which isn't very - // helpful, so we don't look at the error more closely than that.. - fs::metadata(p.join(".keep")).await.expect_err("must fail"); - } - - fuse_daemon.unmount().expect("unmount"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -/// Ensure we get an IO error if the blob service does not have the blob -async fn missing_blob() { - if !std::path::Path::new("/dev/fuse").exists() { - eprintln!("skipping test"); - return; - } - let tmpdir = TempDir::new().unwrap(); - - let (blob_service, directory_service, path_info_service) = gen_svcs(); - populate_blob_a_without_blob(&blob_service, &directory_service, &path_info_service).await; - - let mut fuse_daemon = do_mount( - blob_service, - directory_service, - path_info_service, - tmpdir.path(), - false, - ) - .expect("must succeed"); - - let p = tmpdir.path().join(BLOB_A_NAME); - - { - // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service. - fs::metadata(&p).await.expect("must succeed"); - - // However, calling read on the blob should fail. - // rust currently sets e.kind() to Uncategorized, which isn't very - // helpful, so we don't look at the error more closely than that.. - fs::read(p).await.expect_err("must fail"); - } - - fuse_daemon.unmount().expect("unmount"); -} diff --git a/tvix/store/src/fs/virtiofs.rs b/tvix/store/src/fs/virtiofs.rs deleted file mode 100644 index 846270d28568..000000000000 --- a/tvix/store/src/fs/virtiofs.rs +++ /dev/null @@ -1,237 +0,0 @@ -use std::{ - convert, error, fmt, io, - ops::Deref, - path::Path, - sync::{Arc, MutexGuard, RwLock}, -}; - -use fuse_backend_rs::{ - api::{filesystem::FileSystem, server::Server}, - transport::{FsCacheReqHandler, Reader, VirtioFsWriter}, -}; -use tracing::error; -use vhost::vhost_user::{ - Listener, SlaveFsCacheReq, VhostUserProtocolFeatures, VhostUserVirtioFeatures, -}; -use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringMutex, VringState, VringT}; -use virtio_bindings::bindings::virtio_ring::{ - VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC, -}; -use virtio_queue::QueueT; -use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; -use vmm_sys_util::epoll::EventSet; - -const VIRTIO_F_VERSION_1: u32 = 32; -const NUM_QUEUES: usize = 2; -const QUEUE_SIZE: usize = 1024; - -#[derive(Debug)] -enum Error { - /// Failed to handle non-input event. - HandleEventNotEpollIn, - /// Failed to handle unknown event. - HandleEventUnknownEvent, - /// Invalid descriptor chain. - InvalidDescriptorChain, - /// Failed to handle filesystem requests. - HandleRequests(fuse_backend_rs::Error), - /// Failed to construct new vhost user daemon. - NewDaemon, - /// Failed to start the vhost user daemon. - StartDaemon, - /// Failed to wait for the vhost user daemon. - WaitDaemon, -} - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "vhost_user_fs_error: {self:?}") - } -} - -impl error::Error for Error {} - -impl convert::From<Error> for io::Error { - fn from(e: Error) -> Self { - io::Error::new(io::ErrorKind::Other, e) - } -} - -struct VhostUserFsBackend<FS> -where - FS: FileSystem + Send + Sync, -{ - server: Arc<Server<Arc<FS>>>, - event_idx: bool, - guest_mem: GuestMemoryAtomic<GuestMemoryMmap>, - cache_req: Option<SlaveFsCacheReq>, -} - -impl<FS> VhostUserFsBackend<FS> -where - FS: FileSystem + Send + Sync, -{ - fn process_queue(&mut self, vring: &mut MutexGuard<VringState>) -> std::io::Result<bool> { - let mut used_descs = false; - - while let Some(desc_chain) = vring - .get_queue_mut() - .pop_descriptor_chain(self.guest_mem.memory()) - { - let memory = desc_chain.memory(); - let reader = Reader::from_descriptor_chain(memory, desc_chain.clone()) - .map_err(|_| Error::InvalidDescriptorChain)?; - let writer = VirtioFsWriter::new(memory, desc_chain.clone()) - .map_err(|_| Error::InvalidDescriptorChain)?; - - self.server - .handle_message( - reader, - writer.into(), - self.cache_req - .as_mut() - .map(|req| req as &mut dyn FsCacheReqHandler), - None, - ) - .map_err(Error::HandleRequests)?; - - // TODO: Is len 0 correct? - if let Err(error) = vring - .get_queue_mut() - .add_used(memory, desc_chain.head_index(), 0) - { - error!(?error, "failed to add desc back to ring"); - } - - // TODO: What happens if we error out before here? - used_descs = true; - } - - let needs_notification = if self.event_idx { - match vring - .get_queue_mut() - .needs_notification(self.guest_mem.memory().deref()) - { - Ok(needs_notification) => needs_notification, - Err(error) => { - error!(?error, "failed to check if queue needs notification"); - true - } - } - } else { - true - }; - - if needs_notification { - if let Err(error) = vring.signal_used_queue() { - error!(?error, "failed to signal used queue"); - } - } - - Ok(used_descs) - } -} - -impl<FS> VhostUserBackendMut<VringMutex> for VhostUserFsBackend<FS> -where - FS: FileSystem + Send + Sync, -{ - fn num_queues(&self) -> usize { - NUM_QUEUES - } - - fn max_queue_size(&self) -> usize { - QUEUE_SIZE - } - - fn features(&self) -> u64 { - 1 << VIRTIO_F_VERSION_1 - | 1 << VIRTIO_RING_F_INDIRECT_DESC - | 1 << VIRTIO_RING_F_EVENT_IDX - | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() - } - - fn protocol_features(&self) -> VhostUserProtocolFeatures { - VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::SLAVE_REQ - } - - fn set_event_idx(&mut self, enabled: bool) { - self.event_idx = enabled; - } - - fn update_memory(&mut self, _mem: GuestMemoryAtomic<GuestMemoryMmap>) -> std::io::Result<()> { - // This is what most the vhost user implementations do... - Ok(()) - } - - fn set_slave_req_fd(&mut self, cache_req: SlaveFsCacheReq) { - self.cache_req = Some(cache_req); - } - - fn handle_event( - &mut self, - device_event: u16, - evset: vmm_sys_util::epoll::EventSet, - vrings: &[VringMutex], - _thread_id: usize, - ) -> std::io::Result<bool> { - if evset != EventSet::IN { - return Err(Error::HandleEventNotEpollIn.into()); - } - - let mut queue = match device_event { - // High priority queue - 0 => vrings[0].get_mut(), - // Regurlar priority queue - 1 => vrings[1].get_mut(), - _ => { - return Err(Error::HandleEventUnknownEvent.into()); - } - }; - - if self.event_idx { - loop { - queue - .get_queue_mut() - .enable_notification(self.guest_mem.memory().deref()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - if !self.process_queue(&mut queue)? { - break; - } - } - } else { - self.process_queue(&mut queue)?; - } - - Ok(false) - } -} - -pub fn start_virtiofs_daemon<FS, P>(fs: FS, socket: P) -> io::Result<()> -where - FS: FileSystem + Send + Sync + 'static, - P: AsRef<Path>, -{ - let guest_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); - - let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); - - let backend = Arc::new(RwLock::new(VhostUserFsBackend { - server, - guest_mem: guest_mem.clone(), - event_idx: false, - cache_req: None, - })); - - let listener = Listener::new(socket, true).unwrap(); - - let mut fs_daemon = - VhostUserDaemon::new(String::from("vhost-user-fs-tvix-store"), backend, guest_mem) - .map_err(|_| Error::NewDaemon)?; - - fs_daemon.start(listener).map_err(|_| Error::StartDaemon)?; - - fs_daemon.wait().map_err(|_| Error::WaitDaemon)?; - - Ok(()) -} |