diff options
Diffstat (limited to 'tvix/castore/src/fs')
-rw-r--r-- | tvix/castore/src/fs/fuse/mod.rs (renamed from tvix/castore/src/fs/fuse.rs) | 59 | ||||
-rw-r--r-- | tvix/castore/src/fs/fuse/tests.rs (renamed from tvix/castore/src/fs/tests.rs) | 168 | ||||
-rw-r--r-- | tvix/castore/src/fs/inodes.rs | 33 | ||||
-rw-r--r-- | tvix/castore/src/fs/mod.rs | 132 | ||||
-rw-r--r-- | tvix/castore/src/fs/root_nodes.rs | 22 |
5 files changed, 218 insertions, 196 deletions
diff --git a/tvix/castore/src/fs/fuse.rs b/tvix/castore/src/fs/fuse/mod.rs index cd50618ff5bc..64ef29ed2aa1 100644 --- a/tvix/castore/src/fs/fuse.rs +++ b/tvix/castore/src/fs/fuse/mod.rs @@ -1,8 +1,13 @@ -use std::{io, path::Path, sync::Arc, thread}; +use std::{io, path::Path, sync::Arc}; use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use parking_lot::Mutex; +use threadpool::ThreadPool; use tracing::{error, instrument}; +#[cfg(test)] +mod tests; + struct FuseServer<FS> where FS: FileSystem + Sync + Send, @@ -46,9 +51,12 @@ where } } +/// Starts a [Filesystem] with the specified number of threads, and provides +/// functions to unmount, and wait for it to have completed. +#[derive(Clone)] pub struct FuseDaemon { - session: FuseSession, - threads: Vec<thread::JoinHandle<()>>, + session: Arc<Mutex<FuseSession>>, + threads: Arc<ThreadPool>, } impl FuseDaemon { @@ -56,7 +64,7 @@ impl FuseDaemon { pub fn new<FS, P>( fs: FS, mountpoint: P, - threads: usize, + num_threads: usize, allow_other: bool, ) -> Result<Self, io::Error> where @@ -73,40 +81,49 @@ impl FuseDaemon { session .mount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - let mut join_handles = Vec::with_capacity(threads); - for _ in 0..threads { + + // construct a thread pool + let threads = threadpool::Builder::new() + .num_threads(num_threads) + .thread_name("fuse_server".to_string()) + .build(); + + for _ in 0..num_threads { + // for each thread requested, create and start a FuseServer accepting requests. let mut server = FuseServer { server: server.clone(), channel: session .new_channel() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, }; - let join_handle = thread::Builder::new() - .name("fuse_server".to_string()) - .spawn(move || { - let _ = server.start(); - })?; - join_handles.push(join_handle); + + threads.execute(move || { + let _ = server.start(); + }); } Ok(FuseDaemon { - session, - threads: join_handles, + session: Arc::new(Mutex::new(session)), + threads: Arc::new(threads), }) } + /// Waits for all threads to finish. + #[instrument(skip_all)] + pub fn wait(&self) { + self.threads.join() + } + + /// Send the unmount command, and waits for all threads to finish. #[instrument(skip_all, err)] - pub fn unmount(&mut self) -> Result<(), io::Error> { + pub fn unmount(&self) -> Result<(), io::Error> { + // Send the unmount command. self.session + .lock() .umount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - for thread in self.threads.drain(..) { - thread.join().map_err(|_| { - io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") - })?; - } - + self.wait(); Ok(()) } } diff --git a/tvix/castore/src/fs/tests.rs b/tvix/castore/src/fs/fuse/tests.rs index d6eeb8a4113d..0d68af090daf 100644 --- a/tvix/castore/src/fs/tests.rs +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -1,5 +1,4 @@ use bstr::ByteSlice; -use bytes::Bytes; use std::{ collections::BTreeMap, ffi::{OsStr, OsString}, @@ -11,13 +10,15 @@ use std::{ use tempfile::TempDir; use tokio_stream::{wrappers::ReadDirStream, StreamExt}; -use super::{fuse::FuseDaemon, TvixStoreFs}; -use crate::proto as castorepb; -use crate::proto::node::Node; +use super::FuseDaemon; use crate::{ blobservice::{BlobService, MemoryBlobService}, directoryservice::{DirectoryService, MemoryDirectoryService}, - fixtures, + fixtures, Node, +}; +use crate::{ + fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}, + PathComponent, }; const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; @@ -38,14 +39,14 @@ fn gen_svcs() -> (Arc<dyn BlobService>, Arc<dyn DirectoryService>) { fn do_mount<P: AsRef<Path>, BS, DS>( blob_service: BS, directory_service: DS, - root_nodes: BTreeMap<bytes::Bytes, Node>, + root_nodes: BTreeMap<PathComponent, Node>, mountpoint: P, list_root: bool, show_xattr: bool, ) -> io::Result<FuseDaemon> where - BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, - DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, + BS: BlobService + Send + Sync + Clone + 'static, + DS: DirectoryService + Send + Sync + Clone + 'static, { let fs = TvixStoreFs::new( blob_service, @@ -59,7 +60,7 @@ where async fn populate_blob_a( blob_service: &Arc<dyn BlobService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { let mut bw = blob_service.open_write().await; tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) @@ -68,19 +69,18 @@ async fn populate_blob_a( bw.close().await.expect("must succeed closing"); root_nodes.insert( - BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), + BLOB_A_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::BLOB_A_DIGEST.clone(), size: fixtures::BLOB_A.len() as u64, executable: false, - }), + }, ); } async fn populate_blob_b( blob_service: &Arc<dyn BlobService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { let mut bw = blob_service.open_write().await; tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) @@ -89,20 +89,19 @@ async fn populate_blob_b( bw.close().await.expect("must succeed closing"); root_nodes.insert( - BLOB_B_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_B_NAME.into(), - digest: fixtures::BLOB_B_DIGEST.clone().into(), + BLOB_B_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::BLOB_B_DIGEST.clone(), size: fixtures::BLOB_B.len() as u64, executable: false, - }), + }, ); } /// adds a blob containing helloworld and marks it as executable async fn populate_blob_helloworld( blob_service: &Arc<dyn BlobService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { let mut bw = blob_service.open_write().await; tokio::io::copy( @@ -114,42 +113,39 @@ async fn populate_blob_helloworld( bw.close().await.expect("must succeed closing"); root_nodes.insert( - HELLOWORLD_BLOB_NAME.into(), - Node::File(castorepb::FileNode { - name: HELLOWORLD_BLOB_NAME.into(), - digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), + HELLOWORLD_BLOB_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone(), size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, executable: true, - }), + }, ); } -async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_symlink(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - SYMLINK_NAME.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME.into(), - target: BLOB_A_NAME.into(), - }), + SYMLINK_NAME.try_into().unwrap(), + Node::Symlink { + target: BLOB_A_NAME.try_into().unwrap(), + }, ); } /// This writes a symlink pointing to /nix/store/somewhereelse, /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. -async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_symlink2(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - SYMLINK_NAME2.into(), - Node::Symlink(castorepb::SymlinkNode { - name: SYMLINK_NAME2.into(), - target: "/nix/store/somewhereelse".into(), - }), + SYMLINK_NAME2.try_into().unwrap(), + Node::Symlink { + target: "/nix/store/somewhereelse".try_into().unwrap(), + }, ); } async fn populate_directory_with_keep( blob_service: &Arc<dyn BlobService>, directory_service: &Arc<dyn DirectoryService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { // upload empty blob let mut bw = blob_service.open_write().await; @@ -165,45 +161,42 @@ async fn populate_directory_with_keep( .expect("must succeed uploading"); root_nodes.insert( - DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(), + Node::Directory { + digest: fixtures::DIRECTORY_WITH_KEEP.digest(), size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + }, ); } /// Create a root node for DIRECTORY_WITH_KEEP, but don't upload the Directory /// itself. -async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - DIRECTORY_WITH_KEEP_NAME.into(), - castorepb::node::Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_WITH_KEEP_NAME.into(), - digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(), + Node::Directory { + digest: fixtures::DIRECTORY_WITH_KEEP.digest(), size: fixtures::DIRECTORY_WITH_KEEP.size(), - }), + }, ); } /// Insert BLOB_A, but don't provide the blob .keep is pointing to. -async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) { +async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<PathComponent, Node>) { root_nodes.insert( - BLOB_A_NAME.into(), - Node::File(castorepb::FileNode { - name: BLOB_A_NAME.into(), - digest: fixtures::BLOB_A_DIGEST.clone().into(), + BLOB_A_NAME.try_into().unwrap(), + Node::File { + digest: fixtures::BLOB_A_DIGEST.clone(), size: fixtures::BLOB_A.len() as u64, executable: false, - }), + }, ); } async fn populate_directory_complicated( blob_service: &Arc<dyn BlobService>, directory_service: &Arc<dyn DirectoryService>, - root_nodes: &mut BTreeMap<Bytes, Node>, + root_nodes: &mut BTreeMap<PathComponent, Node>, ) { // upload empty blob let mut bw = blob_service.open_write().await; @@ -225,12 +218,11 @@ async fn populate_directory_complicated( .expect("must succeed uploading"); root_nodes.insert( - DIRECTORY_COMPLICATED_NAME.into(), - Node::Directory(castorepb::DirectoryNode { - name: DIRECTORY_COMPLICATED_NAME.into(), - digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), + DIRECTORY_COMPLICATED_NAME.try_into().unwrap(), + Node::Directory { + digest: fixtures::DIRECTORY_COMPLICATED.digest(), size: fixtures::DIRECTORY_COMPLICATED.size(), - }), + }, ); } @@ -247,7 +239,7 @@ async fn mount() { let (blob_service, directory_service) = gen_svcs(); - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, BTreeMap::default(), @@ -270,7 +262,7 @@ async fn root() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service) = gen_svcs(); - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, BTreeMap::default(), @@ -304,7 +296,7 @@ async fn root_with_listing() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -348,7 +340,7 @@ async fn stat_file_at_root() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -385,7 +377,7 @@ async fn read_file_at_root() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -422,7 +414,7 @@ async fn read_large_file_at_root() { populate_blob_b(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -467,7 +459,7 @@ async fn symlink_readlink() { populate_symlink(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -514,7 +506,7 @@ async fn read_stat_through_symlink() { populate_blob_a(&blob_service, &mut root_nodes).await; populate_symlink(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -559,7 +551,7 @@ async fn read_stat_directory() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -596,7 +588,7 @@ async fn xattr() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -614,12 +606,12 @@ async fn xattr() { // There should be 1 key, XATTR_NAME_DIRECTORY_DIGEST. assert_eq!(1, xattr_names.len(), "there should be 1 xattr name"); assert_eq!( - super::XATTR_NAME_DIRECTORY_DIGEST, + XATTR_NAME_DIRECTORY_DIGEST, xattr_names.first().unwrap().as_encoded_bytes() ); // The key should equal to the string-formatted b3 digest. - let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_DIRECTORY_DIGEST)) + let val = xattr::get(&p, OsStr::from_bytes(XATTR_NAME_DIRECTORY_DIGEST)) .expect("must succeed") .expect("must be some"); assert_eq!( @@ -643,12 +635,12 @@ async fn xattr() { // There should be 1 key, XATTR_NAME_BLOB_DIGEST. assert_eq!(1, xattr_names.len(), "there should be 1 xattr name"); assert_eq!( - super::XATTR_NAME_BLOB_DIGEST, + XATTR_NAME_BLOB_DIGEST, xattr_names.first().unwrap().as_encoded_bytes() ); // The key should equal to the string-formatted b3 digest. - let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_BLOB_DIGEST)) + let val = xattr::get(&p, OsStr::from_bytes(XATTR_NAME_BLOB_DIGEST)) .expect("must succeed") .expect("must be some"); assert_eq!( @@ -679,7 +671,7 @@ async fn read_blob_inside_dir() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -719,7 +711,7 @@ async fn read_blob_deep_inside_dir() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -762,7 +754,7 @@ async fn readdir() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -822,7 +814,7 @@ async fn readdir_deep() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -872,7 +864,7 @@ async fn check_attributes() { populate_symlink(&mut root_nodes).await; populate_blob_helloworld(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -947,7 +939,7 @@ async fn compare_inodes_directories() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -991,7 +983,7 @@ async fn compare_inodes_files() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1040,7 +1032,7 @@ async fn compare_inodes_symlinks() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; populate_symlink2(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1083,7 +1075,7 @@ async fn read_wrong_paths_in_root() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1138,7 +1130,7 @@ async fn disallow_writes() { let (blob_service, directory_service) = gen_svcs(); let root_nodes = BTreeMap::default(); - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1170,7 +1162,7 @@ async fn missing_directory() { populate_directorynode_without_directory(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1218,7 +1210,7 @@ async fn missing_blob() { populate_filenode_without_blob(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs index bdd459543470..2696fdede378 100644 --- a/tvix/castore/src/fs/inodes.rs +++ b/tvix/castore/src/fs/inodes.rs @@ -2,10 +2,7 @@ //! about inodes, which present tvix-castore nodes in a filesystem. use std::time::Duration; -use bytes::Bytes; - -use crate::proto as castorepb; -use crate::B3Digest; +use crate::{path::PathComponent, B3Digest, Node}; #[derive(Clone, Debug)] pub enum InodeData { @@ -20,27 +17,23 @@ pub enum InodeData { /// lookup and did fetch the data. #[derive(Clone, Debug)] pub enum DirectoryInodeData { - Sparse(B3Digest, u64), // digest, size - Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] + Sparse(B3Digest, u64), // digest, size + Populated(B3Digest, Vec<(u64, PathComponent, Node)>), // [(child_inode, name, node)] } impl InodeData { /// Constructs a new InodeData by consuming a [Node]. - /// It splits off the orginal name, so it can be used later. - pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) { + pub fn from_node(node: &Node) -> Self { match node { - castorepb::node::Node::Directory(n) => ( - Self::Directory(DirectoryInodeData::Sparse( - n.digest.try_into().unwrap(), - n.size, - )), - n.name, - ), - castorepb::node::Node::File(n) => ( - Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable), - n.name, - ), - castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name), + Node::Directory { digest, size } => { + Self::Directory(DirectoryInodeData::Sparse(digest.clone(), *size)) + } + Node::File { + digest, + size, + executable, + } => Self::Regular(digest.clone(), *size, *executable), + Node::Symlink { target } => Self::Symlink(target.clone().into()), } } diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index 826523131fbd..4f50868b8f44 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -9,24 +9,19 @@ pub mod fuse; #[cfg(feature = "virtiofs")] pub mod virtiofs; -#[cfg(test)] -mod tests; - pub use self::root_nodes::RootNodes; use self::{ file_attr::ROOT_FILE_ATTR, inode_tracker::InodeTracker, inodes::{DirectoryInodeData, InodeData}, }; -use crate::proto as castorepb; use crate::{ blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, - proto::{node::Node, NamedNode}, - B3Digest, + path::PathComponent, + B3Digest, Node, }; use bstr::ByteVec; -use bytes::Bytes; use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions}; use fuse_backend_rs::api::filesystem::{ Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID, @@ -46,7 +41,7 @@ use tokio::{ io::{AsyncReadExt, AsyncSeekExt}, sync::mpsc, }; -use tracing::{debug, error, instrument, warn, Span}; +use tracing::{debug, error, instrument, warn, Instrument as _, Span}; /// This implements a read-only FUSE filesystem for a tvix-store /// with the passed [BlobService], [DirectoryService] and [RootNodes]. @@ -92,7 +87,7 @@ pub struct TvixStoreFs<BS, DS, RN> { show_xattr: bool, /// This maps a given basename in the root to the inode we allocated for the node. - root_nodes: RwLock<HashMap<Bytes, u64>>, + root_nodes: RwLock<HashMap<PathComponent, u64>>, /// This keeps track of inodes and data alongside them. inode_tracker: RwLock<InodeTracker>, @@ -108,7 +103,7 @@ pub struct TvixStoreFs<BS, DS, RN> { u64, ( Span, - Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>, + Arc<Mutex<mpsc::Receiver<(usize, Result<(PathComponent, Node), crate::Error>)>>>, ), >, >, @@ -126,8 +121,8 @@ pub struct TvixStoreFs<BS, DS, RN> { impl<BS, DS, RN> TvixStoreFs<BS, DS, RN> where - BS: AsRef<dyn BlobService> + Clone + Send, - DS: AsRef<dyn DirectoryService> + Clone + Send + 'static, + BS: BlobService + Clone + Send, + DS: DirectoryService + Clone + Send + 'static, RN: RootNodes + Clone + 'static, { pub fn new( @@ -159,7 +154,7 @@ where /// Retrieves the inode for a given root node basename, if present. /// This obtains a read lock on self.root_nodes. - fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> { + fn get_inode_for_root_name(&self, name: &PathComponent) -> Option<u64> { self.root_nodes.read().get(name).cloned() } @@ -170,8 +165,12 @@ where /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup /// in self.directory_service is performed, and self.inode_tracker is updated with the /// [DirectoryInodeData::Populated]. + #[allow(clippy::type_complexity)] #[instrument(skip(self), err)] - fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> { + fn get_directory_children( + &self, + ino: u64, + ) -> io::Result<(B3Digest, Vec<(u64, PathComponent, Node)>)> { let data = self.inode_tracker.read().get(ino).unwrap(); match *data { // if it's populated already, return children. @@ -187,7 +186,7 @@ where .block_on({ let directory_service = self.directory_service.clone(); let parent_digest = parent_digest.to_owned(); - async move { directory_service.as_ref().get(&parent_digest).await } + async move { directory_service.get(&parent_digest).await } })? .ok_or_else(|| { warn!(directory.digest=%parent_digest, "directory not found"); @@ -201,13 +200,13 @@ where let children = { let mut inode_tracker = self.inode_tracker.write(); - let children: Vec<(u64, castorepb::node::Node)> = directory - .nodes() - .map(|child_node| { - let (inode_data, _) = InodeData::from_node(child_node.clone()); + let children: Vec<(u64, PathComponent, Node)> = directory + .into_nodes() + .map(|(child_name, child_node)| { + let inode_data = InodeData::from_node(&child_node); let child_ino = inode_tracker.put(inode_data); - (child_ino, child_node) + (child_ino, child_name, child_node) }) .collect(); @@ -241,12 +240,12 @@ where /// In the case the name can't be found, a libc::ENOENT is returned. fn name_in_root_to_ino_and_data( &self, - name: &std::ffi::CStr, + name: &PathComponent, ) -> io::Result<(u64, Arc<InodeData>)> { // Look up the inode for that root node. // If there's one, [self.inode_tracker] MUST also contain the data, // which we can then return. - if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) { + if let Some(inode) = self.get_inode_for_root_name(name) { return Ok(( inode, self.inode_tracker @@ -260,7 +259,8 @@ where // We don't have it yet, look it up in [self.root_nodes]. match self.tokio_handle.block_on({ let root_nodes_provider = self.root_nodes_provider.clone(); - async move { root_nodes_provider.get_by_basename(name.to_bytes()).await } + let name = name.clone(); + async move { root_nodes_provider.get_by_basename(&name).await } }) { // if there was an error looking up the root node, propagate up an IO error. Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)), @@ -268,15 +268,9 @@ where Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), // The root node does exist Ok(Some(root_node)) => { - // The name must match what's passed in the lookup, otherwise this is also a ENOENT. - if root_node.get_name() != name.to_bytes() { - debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch"); - return Err(io::Error::from_raw_os_error(libc::ENOENT)); - } - // Let's check if someone else beat us to updating the inode tracker and // root_nodes map. This avoids locking inode_tracker for writing. - if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) { + if let Some(ino) = self.root_nodes.read().get(name) { return Ok(( *ino, self.inode_tracker.read().get(*ino).expect("must exist"), @@ -290,9 +284,9 @@ where // insert the (sparse) inode data and register in // self.root_nodes. - let (inode_data, name) = InodeData::from_node(root_node); + let inode_data = InodeData::from_node(&root_node); let ino = inode_tracker.put(inode_data.clone()); - root_nodes.insert(name, ino); + root_nodes.insert(name.to_owned(), ino); Ok((ino, Arc::new(inode_data))) } @@ -306,10 +300,22 @@ const ROOT_NODES_BUFFER_SIZE: usize = 16; const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest"; const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest"; +#[cfg(all(feature = "virtiofs", target_os = "linux"))] +impl<BS, DS, RN> fuse_backend_rs::api::filesystem::Layer for TvixStoreFs<BS, DS, RN> +where + BS: BlobService + Clone + Send + 'static, + DS: DirectoryService + Send + Clone + 'static, + RN: RootNodes + Clone + 'static, +{ + fn root_inode(&self) -> Self::Inode { + ROOT_ID + } +} + impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN> where - BS: AsRef<dyn BlobService> + Clone + Send + 'static, - DS: AsRef<dyn DirectoryService> + Send + Clone + 'static, + BS: BlobService + Clone + Send + 'static, + DS: DirectoryService + Send + Clone + 'static, RN: RootNodes + Clone + 'static, { type Handle = u64; @@ -348,13 +354,17 @@ where ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> { debug!("lookup"); + // convert the CStr to a PathComponent + // If it can't be converted, we definitely don't have anything here. + let name: PathComponent = name.try_into().map_err(|_| std::io::ErrorKind::NotFound)?; + // This goes from a parent inode to a node. // - If the parent is [ROOT_ID], we need to check // [self.root_nodes] (fetching from a [RootNode] provider if needed) // - Otherwise, lookup the parent in [self.inode_tracker] (which must be // a [InodeData::Directory]), and find the child with that name. if parent == ROOT_ID { - let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?; + let (ino, inode_data) = self.name_in_root_to_ino_and_data(&name)?; debug!(inode_data=?&inode_data, ino=ino, "Some"); return Ok(inode_data.as_fuse_entry(ino)); @@ -367,7 +377,7 @@ where // Search for that name in the list of children and return the FileAttrs. // in the children, find the one with the desired name. - if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + if let Some((child_ino, _, _)) = children.iter().find(|(_, n, _)| n == &name) { // lookup the child [InodeData] in [self.inode_tracker]. // We know the inodes for children have already been allocated. let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap(); @@ -400,16 +410,20 @@ where // This task will run in the background immediately and will exit // after the stream ends or if we no longer want any more entries. - self.tokio_handle.spawn(async move { - let mut stream = root_nodes_provider.list().enumerate(); - while let Some(node) = stream.next().await { - if tx.send(node).await.is_err() { - // If we get a send error, it means the sync code - // doesn't want any more entries. - break; + self.tokio_handle.spawn( + async move { + let mut stream = root_nodes_provider.list().enumerate(); + while let Some(e) = stream.next().await { + if tx.send(e).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } } } - }); + // instrument the task with the current span, this is not done by default + .in_current_span(), + ); // Put the rx part into [self.dir_handles]. // TODO: this will overflow after 2**64 operations, @@ -462,12 +476,12 @@ where .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; while let Some((i, n)) = rx.blocking_recv() { - let root_node = n.map_err(|e| { + let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EIO) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let inode_data = InodeData::from_node(&node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -482,7 +496,7 @@ where ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: name.as_ref(), })?; // If the buffer is full, add_entry will return `Ok(0)`. if written == 0 { @@ -496,15 +510,17 @@ where let (parent_digest, children) = self.get_directory_children(inode)?; Span::current().record("directory.digest", parent_digest.to_string()); - for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + for (i, (ino, child_name, child_node)) in + children.into_iter().skip(offset as usize).enumerate() + { + let inode_data = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: child_name.as_ref(), })?; // If the buffer is full, add_entry will return `Ok(0)`. if written == 0 { @@ -549,12 +565,12 @@ where .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; while let Some((i, n)) = rx.blocking_recv() { - let root_node = n.map_err(|e| { + let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EPERM) })?; - let (inode_data, name) = InodeData::from_node(root_node); + let inode_data = InodeData::from_node(&node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -570,7 +586,7 @@ where ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: name.as_ref(), }, inode_data.as_fuse_entry(ino), )?; @@ -586,8 +602,8 @@ where let (parent_digest, children) = self.get_directory_children(inode)?; Span::current().record("directory.digest", parent_digest.to_string()); - for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(child_node); + for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() { + let inode_data = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry( @@ -595,7 +611,7 @@ where ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: name.as_ref(), }, inode_data.as_fuse_entry(ino), )?; @@ -640,6 +656,7 @@ where ) -> io::Result<( Option<Self::Handle>, fuse_backend_rs::api::filesystem::OpenOptions, + Option<u32>, )> { if inode == ROOT_ID { return Err(io::Error::from_raw_os_error(libc::ENOSYS)); @@ -658,7 +675,7 @@ where match self.tokio_handle.block_on({ let blob_service = self.blob_service.clone(); let blob_digest = blob_digest.clone(); - async move { blob_service.as_ref().open_read(&blob_digest).await } + async move { blob_service.open_read(&blob_digest).await } }) { Ok(None) => { warn!("blob not found"); @@ -683,6 +700,7 @@ where Ok(( Some(fh), fuse_backend_rs::api::filesystem::OpenOptions::empty(), + None, )) } } diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs index 6609e049a1fc..5ed1a4d8d6c0 100644 --- a/tvix/castore/src/fs/root_nodes.rs +++ b/tvix/castore/src/fs/root_nodes.rs @@ -1,7 +1,6 @@ use std::collections::BTreeMap; -use crate::{proto::node::Node, Error}; -use bytes::Bytes; +use crate::{path::PathComponent, Error, Node}; use futures::stream::BoxStream; use tonic::async_trait; @@ -11,11 +10,12 @@ use tonic::async_trait; pub trait RootNodes: Send + Sync { /// Looks up a root CA node based on the basename of the node in the root /// directory of the filesystem. - async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>; + async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error>; - /// Lists all root CA nodes in the filesystem. An error can be returned - /// in case listing is not allowed - fn list(&self) -> BoxStream<Result<Node, Error>>; + /// Lists all root CA nodes in the filesystem, as a tuple of (base)name + /// and Node. + /// An error can be returned in case listing is not allowed. + fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>>; } #[async_trait] @@ -23,15 +23,17 @@ pub trait RootNodes: Send + Sync { /// the key is the node name. impl<T> RootNodes for T where - T: AsRef<BTreeMap<Bytes, Node>> + Send + Sync, + T: AsRef<BTreeMap<PathComponent, Node>> + Send + Sync, { - async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> { + async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> { Ok(self.as_ref().get(name).cloned()) } - fn list(&self) -> BoxStream<Result<Node, Error>> { + fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> { Box::pin(tokio_stream::iter( - self.as_ref().iter().map(|(_, v)| Ok(v.clone())), + self.as_ref() + .iter() + .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))), )) } } |