diff options
Diffstat (limited to 'tvix/castore/src/fs')
-rw-r--r-- | tvix/castore/src/fs/file_attr.rs | 29 | ||||
-rw-r--r-- | tvix/castore/src/fs/fuse/mod.rs | 123 | ||||
-rw-r--r-- | tvix/castore/src/fs/fuse/tests.rs | 1245 | ||||
-rw-r--r-- | tvix/castore/src/fs/inode_tracker.rs | 207 | ||||
-rw-r--r-- | tvix/castore/src/fs/inodes.rs | 96 | ||||
-rw-r--r-- | tvix/castore/src/fs/mod.rs | 874 | ||||
-rw-r--r-- | tvix/castore/src/fs/root_nodes.rs | 37 | ||||
-rw-r--r-- | tvix/castore/src/fs/virtiofs.rs | 238 |
8 files changed, 2849 insertions, 0 deletions
diff --git a/tvix/castore/src/fs/file_attr.rs b/tvix/castore/src/fs/file_attr.rs new file mode 100644 index 0000000000..2e0e70e3cd --- /dev/null +++ b/tvix/castore/src/fs/file_attr.rs @@ -0,0 +1,29 @@ +#![allow(clippy::unnecessary_cast)] // libc::S_IFDIR is u32 on Linux and u16 on MacOS + +use fuse_backend_rs::abi::fuse_abi::Attr; + +/// The [Attr] describing the root +pub const ROOT_FILE_ATTR: Attr = Attr { + ino: fuse_backend_rs::api::filesystem::ROOT_ID, + size: 0, + blksize: 1024, + blocks: 0, + mode: libc::S_IFDIR as u32 | 0o555, + atime: 0, + mtime: 0, + ctime: 0, + atimensec: 0, + mtimensec: 0, + ctimensec: 0, + nlink: 0, + uid: 0, + gid: 0, + rdev: 0, + flags: 0, + #[cfg(target_os = "macos")] + crtime: 0, + #[cfg(target_os = "macos")] + crtimensec: 0, + #[cfg(target_os = "macos")] + padding: 0, +}; diff --git a/tvix/castore/src/fs/fuse/mod.rs b/tvix/castore/src/fs/fuse/mod.rs new file mode 100644 index 0000000000..94b73d422a --- /dev/null +++ b/tvix/castore/src/fs/fuse/mod.rs @@ -0,0 +1,123 @@ +use std::{io, path::Path, sync::Arc, thread}; + +use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use tracing::{error, instrument}; + +#[cfg(test)] +mod tests; + +struct FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, + channel: fuse_backend_rs::transport::FuseChannel, +} + +#[cfg(target_os = "macos")] +const BADFD: libc::c_int = libc::EBADF; +#[cfg(target_os = "linux")] +const BADFD: libc::c_int = libc::EBADFD; + +impl<FS> FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + fn start(&mut self) -> io::Result<()> { + while let Some((reader, writer)) = self + .channel + .get_request() + .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? + { + if let Err(e) = self + .server + .handle_message(reader, writer.into(), None, None) + { + match e { + // This indicates the session has been shut down. + fuse_backend_rs::Error::EncodeMessage(e) if e.raw_os_error() == Some(BADFD) => { + break; + } + error => { + error!(?error, "failed to handle fuse request"); + continue; + } + } + } + } + Ok(()) + } +} + +pub struct FuseDaemon { + session: FuseSession, + threads: Vec<thread::JoinHandle<()>>, +} + +impl FuseDaemon { + #[instrument(skip(fs, mountpoint), fields(mountpoint=?mountpoint), err)] + pub fn new<FS, P>( + fs: FS, + mountpoint: P, + threads: usize, + allow_other: bool, + ) -> Result<Self, io::Error> + where + FS: FileSystem + Sync + Send + 'static, + P: AsRef<Path> + std::fmt::Debug, + { + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + #[cfg(target_os = "linux")] + session.set_allow_other(allow_other); + session + .mount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + let mut join_handles = Vec::with_capacity(threads); + for _ in 0..threads { + let mut server = FuseServer { + server: server.clone(), + channel: session + .new_channel() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + }; + let join_handle = thread::Builder::new() + .name("fuse_server".to_string()) + .spawn(move || { + let _ = server.start(); + })?; + join_handles.push(join_handle); + } + + Ok(FuseDaemon { + session, + threads: join_handles, + }) + } + + #[instrument(skip_all, err)] + pub fn unmount(&mut self) -> Result<(), io::Error> { + self.session + .umount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + for thread in self.threads.drain(..) { + thread.join().map_err(|_| { + io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") + })?; + } + + Ok(()) + } +} + +impl Drop for FuseDaemon { + fn drop(&mut self) { + if let Err(error) = self.unmount() { + error!(?error, "failed to unmont fuse filesystem") + } + } +} diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs new file mode 100644 index 0000000000..bb321f5888 --- /dev/null +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -0,0 +1,1245 @@ +use bstr::ByteSlice; +use bytes::Bytes; +use std::{ + collections::BTreeMap, + ffi::{OsStr, OsString}, + io::{self, Cursor}, + os::unix::{ffi::OsStrExt, fs::MetadataExt}, + path::Path, + sync::Arc, +}; +use tempfile::TempDir; +use tokio_stream::{wrappers::ReadDirStream, StreamExt}; + +use super::FuseDaemon; +use crate::fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST}; +use crate::proto as castorepb; +use crate::proto::node::Node; +use crate::{ + blobservice::{BlobService, MemoryBlobService}, + directoryservice::{DirectoryService, MemoryDirectoryService}, + fixtures, +}; + +const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; +const BLOB_B_NAME: &str = "55555555555555555555555555555555-test"; +const HELLOWORLD_BLOB_NAME: &str = "66666666666666666666666666666666-test"; +const SYMLINK_NAME: &str = "11111111111111111111111111111111-test"; +const SYMLINK_NAME2: &str = "44444444444444444444444444444444-test"; +const DIRECTORY_WITH_KEEP_NAME: &str = "22222222222222222222222222222222-test"; +const DIRECTORY_COMPLICATED_NAME: &str = "33333333333333333333333333333333-test"; + +fn gen_svcs() -> (Arc<dyn BlobService>, Arc<dyn DirectoryService>) { + ( + Arc::new(MemoryBlobService::default()) as Arc<dyn BlobService>, + Arc::new(MemoryDirectoryService::default()) as Arc<dyn DirectoryService>, + ) +} + +fn do_mount<P: AsRef<Path>, BS, DS>( + blob_service: BS, + directory_service: DS, + root_nodes: BTreeMap<bytes::Bytes, Node>, + mountpoint: P, + list_root: bool, + show_xattr: bool, +) -> io::Result<FuseDaemon> +where + BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, + DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static, +{ + let fs = TvixStoreFs::new( + blob_service, + directory_service, + Arc::new(root_nodes), + list_root, + show_xattr, + ); + FuseDaemon::new(Arc::new(fs), mountpoint.as_ref(), 4, false) +} + +async fn populate_blob_a( + blob_service: &Arc<dyn BlobService>, + root_nodes: &mut BTreeMap<Bytes, Node>, +) { + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + root_nodes.insert( + BLOB_A_NAME.into(), + Node::File(castorepb::FileNode { + name: BLOB_A_NAME.into(), + digest: fixtures::BLOB_A_DIGEST.clone().into(), + size: fixtures::BLOB_A.len() as u64, + executable: false, + }), + ); +} + +async fn populate_blob_b( + blob_service: &Arc<dyn BlobService>, + root_nodes: &mut BTreeMap<Bytes, Node>, +) { + let mut bw = blob_service.open_write().await; + tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + root_nodes.insert( + BLOB_B_NAME.into(), + Node::File(castorepb::FileNode { + name: BLOB_B_NAME.into(), + digest: fixtures::BLOB_B_DIGEST.clone().into(), + size: fixtures::BLOB_B.len() as u64, + executable: false, + }), + ); +} + +/// adds a blob containing helloworld and marks it as executable +async fn populate_blob_helloworld( + blob_service: &Arc<dyn BlobService>, + root_nodes: &mut BTreeMap<Bytes, Node>, +) { + let mut bw = blob_service.open_write().await; + tokio::io::copy( + &mut Cursor::new(fixtures::HELLOWORLD_BLOB_CONTENTS.to_vec()), + &mut bw, + ) + .await + .expect("must succeed uploading"); + bw.close().await.expect("must succeed closing"); + + root_nodes.insert( + HELLOWORLD_BLOB_NAME.into(), + Node::File(castorepb::FileNode { + name: HELLOWORLD_BLOB_NAME.into(), + digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(), + size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, + executable: true, + }), + ); +} + +async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { + root_nodes.insert( + SYMLINK_NAME.into(), + Node::Symlink(castorepb::SymlinkNode { + name: SYMLINK_NAME.into(), + target: BLOB_A_NAME.into(), + }), + ); +} + +/// This writes a symlink pointing to /nix/store/somewhereelse, +/// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED. +async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) { + root_nodes.insert( + SYMLINK_NAME2.into(), + Node::Symlink(castorepb::SymlinkNode { + name: SYMLINK_NAME2.into(), + target: "/nix/store/somewhereelse".into(), + }), + ); +} + +async fn populate_directory_with_keep( + blob_service: &Arc<dyn BlobService>, + directory_service: &Arc<dyn DirectoryService>, + root_nodes: &mut BTreeMap<Bytes, Node>, +) { + // upload empty blob + let mut bw = blob_service.open_write().await; + assert_eq!( + fixtures::EMPTY_BLOB_DIGEST.as_slice(), + bw.close().await.expect("must succeed closing").as_slice(), + ); + + // upload directory + directory_service + .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await + .expect("must succeed uploading"); + + root_nodes.insert( + DIRECTORY_WITH_KEEP_NAME.into(), + castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: DIRECTORY_WITH_KEEP_NAME.into(), + digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + size: fixtures::DIRECTORY_WITH_KEEP.size(), + }), + ); +} + +/// Create a root node for DIRECTORY_WITH_KEEP, but don't upload the Directory +/// itself. +async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) { + root_nodes.insert( + DIRECTORY_WITH_KEEP_NAME.into(), + castorepb::node::Node::Directory(castorepb::DirectoryNode { + name: DIRECTORY_WITH_KEEP_NAME.into(), + digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(), + size: fixtures::DIRECTORY_WITH_KEEP.size(), + }), + ); +} + +/// Insert BLOB_A, but don't provide the blob .keep is pointing to. +async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) { + root_nodes.insert( + BLOB_A_NAME.into(), + Node::File(castorepb::FileNode { + name: BLOB_A_NAME.into(), + digest: fixtures::BLOB_A_DIGEST.clone().into(), + size: fixtures::BLOB_A.len() as u64, + executable: false, + }), + ); +} + +async fn populate_directory_complicated( + blob_service: &Arc<dyn BlobService>, + directory_service: &Arc<dyn DirectoryService>, + root_nodes: &mut BTreeMap<Bytes, Node>, +) { + // upload empty blob + let mut bw = blob_service.open_write().await; + assert_eq!( + fixtures::EMPTY_BLOB_DIGEST.as_slice(), + bw.close().await.expect("must succeed closing").as_slice(), + ); + + // upload inner directory + directory_service + .put(fixtures::DIRECTORY_WITH_KEEP.clone()) + .await + .expect("must succeed uploading"); + + // upload parent directory + directory_service + .put(fixtures::DIRECTORY_COMPLICATED.clone()) + .await + .expect("must succeed uploading"); + + root_nodes.insert( + DIRECTORY_COMPLICATED_NAME.into(), + Node::Directory(castorepb::DirectoryNode { + name: DIRECTORY_COMPLICATED_NAME.into(), + digest: fixtures::DIRECTORY_COMPLICATED.digest().into(), + size: fixtures::DIRECTORY_COMPLICATED.size(), + }), + ); +} + +/// Ensure mounting itself doesn't fail +#[tokio::test] +async fn mount() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + BTreeMap::default(), + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + fuse_daemon.unmount().expect("unmount"); +} +/// Ensure listing the root isn't allowed +#[tokio::test] +async fn root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + BTreeMap::default(), + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + { + // read_dir fails (as opendir fails). + let err = tokio::fs::read_dir(tmpdir).await.expect_err("must fail"); + assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure listing the root is allowed if configured explicitly +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn root_with_listing() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + true, /* allow listing */ + false, + ) + .expect("must succeed"); + + { + // read_dir succeeds, but getting the first element will fail. + let mut it = ReadDirStream::new(tokio::fs::read_dir(tmpdir).await.expect("must succeed")); + + let e = it + .next() + .await + .expect("must be some") + .expect("must succeed"); + + let metadata = e.metadata().await.expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can stat a file at the root +#[tokio::test] +async fn stat_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + // peek at the file metadata + let metadata = tokio::fs::metadata(p).await.expect("must succeed"); + + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_A.len() as u64, metadata.len()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can read a file at the root +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + // read the file contents + let data = tokio::fs::read(p).await.expect("must succeed"); + + // ensure size and contents match + assert_eq!(fixtures::BLOB_A.len(), data.len()); + assert_eq!(fixtures::BLOB_A.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we can read a large file at the root +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_large_file_at_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_b(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_B_NAME); + { + // peek at the file metadata + let metadata = tokio::fs::metadata(&p).await.expect("must succeed"); + + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + assert_eq!(fixtures::BLOB_B.len() as u64, metadata.len()); + } + + // read the file contents + let data = tokio::fs::read(p).await.expect("must succeed"); + + // ensure size and contents match + assert_eq!(fixtures::BLOB_B.len(), data.len()); + assert_eq!(fixtures::BLOB_B.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read the target of a symlink +#[tokio::test] +async fn symlink_readlink() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_symlink(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(SYMLINK_NAME); + + let target = tokio::fs::read_link(&p).await.expect("must succeed"); + assert_eq!(BLOB_A_NAME, target.to_str().unwrap()); + + // peek at the file metadata, which follows symlinks. + // this must fail, as we didn't populate the target. + let e = tokio::fs::metadata(&p).await.expect_err("must fail"); + assert_eq!(std::io::ErrorKind::NotFound, e.kind()); + + // peeking at the file metadata without following symlinks will succeed. + let metadata = tokio::fs::symlink_metadata(&p).await.expect("must succeed"); + assert!(metadata.is_symlink()); + + // reading from the symlink (which follows) will fail, because the target doesn't exist. + let e = tokio::fs::read(p).await.expect_err("must fail"); + assert_eq!(std::io::ErrorKind::NotFound, e.kind()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read and stat a regular file through a symlink pointing to it. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn read_stat_through_symlink() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + populate_symlink(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p_symlink = tmpdir.path().join(SYMLINK_NAME); + let p_blob = tmpdir.path().join(SYMLINK_NAME); + + // peek at the file metadata, which follows symlinks. + // this must now return the same metadata as when statting at the target directly. + let metadata_symlink = tokio::fs::metadata(&p_symlink).await.expect("must succeed"); + let metadata_blob = tokio::fs::metadata(&p_blob).await.expect("must succeed"); + assert_eq!(metadata_blob.file_type(), metadata_symlink.file_type()); + assert_eq!(metadata_blob.len(), metadata_symlink.len()); + + // reading from the symlink (which follows) will return the same data as if + // we were reading from the file directly. + assert_eq!( + tokio::fs::read(p_blob).await.expect("must succeed"), + tokio::fs::read(p_symlink).await.expect("must succeed"), + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read a directory in the root, and validate some attributes. +#[tokio::test] +async fn read_stat_directory() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + // peek at the metadata of the directory + let metadata = tokio::fs::metadata(p).await.expect("must succeed"); + assert!(metadata.is_dir()); + assert!(metadata.permissions().readonly()); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Read a directory and file in the root, and ensure the xattrs expose blob or +/// directory digests. +#[tokio::test] +async fn xattr() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + populate_blob_a(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + true, /* support xattr */ + ) + .expect("must succeed"); + + // peek at the directory + { + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + let xattr_names: Vec<OsString> = xattr::list(&p).expect("must succeed").collect(); + // There should be 1 key, XATTR_NAME_DIRECTORY_DIGEST. + assert_eq!(1, xattr_names.len(), "there should be 1 xattr name"); + assert_eq!( + XATTR_NAME_DIRECTORY_DIGEST, + xattr_names.first().unwrap().as_encoded_bytes() + ); + + // The key should equal to the string-formatted b3 digest. + let val = xattr::get(&p, OsStr::from_bytes(XATTR_NAME_DIRECTORY_DIGEST)) + .expect("must succeed") + .expect("must be some"); + assert_eq!( + fixtures::DIRECTORY_WITH_KEEP + .digest() + .to_string() + .as_bytes() + .as_bstr(), + val.as_bstr() + ); + + // Reading another xattr key is gonna return None. + let val = xattr::get(&p, OsStr::from_bytes(b"user.cheesecake")).expect("must succeed"); + assert_eq!(None, val); + } + // peek at the file + { + let p = tmpdir.path().join(BLOB_A_NAME); + + let xattr_names: Vec<OsString> = xattr::list(&p).expect("must succeed").collect(); + // There should be 1 key, XATTR_NAME_BLOB_DIGEST. + assert_eq!(1, xattr_names.len(), "there should be 1 xattr name"); + assert_eq!( + XATTR_NAME_BLOB_DIGEST, + xattr_names.first().unwrap().as_encoded_bytes() + ); + + // The key should equal to the string-formatted b3 digest. + let val = xattr::get(&p, OsStr::from_bytes(XATTR_NAME_BLOB_DIGEST)) + .expect("must succeed") + .expect("must be some"); + assert_eq!( + fixtures::BLOB_A_DIGEST.to_string().as_bytes().as_bstr(), + val.as_bstr() + ); + + // Reading another xattr key is gonna return None. + let val = xattr::get(&p, OsStr::from_bytes(b"user.cheesecake")).expect("must succeed"); + assert_eq!(None, val); + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Read a blob inside a directory. This ensures we successfully populate directory data. +async fn read_blob_inside_dir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME).join(".keep"); + + // peek at metadata. + let metadata = tokio::fs::metadata(&p).await.expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + + // read from it + let data = tokio::fs::read(&p).await.expect("must succeed"); + assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Read a blob inside a directory inside a directory. This ensures we properly +/// populate directories as we traverse down the structure. +async fn read_blob_deep_inside_dir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir + .path() + .join(DIRECTORY_COMPLICATED_NAME) + .join("keep") + .join(".keep"); + + // peek at metadata. + let metadata = tokio::fs::metadata(&p).await.expect("must succeed"); + assert!(metadata.is_file()); + assert!(metadata.permissions().readonly()); + + // read from it + let data = tokio::fs::read(&p).await.expect("must succeed"); + assert_eq!(fixtures::EMPTY_BLOB_CONTENTS.to_vec(), data); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure readdir works. +#[tokio::test] +async fn readdir() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME); + + { + // read_dir should succeed. Collect all elements + let elements: Vec<_> = + ReadDirStream::new(tokio::fs::read_dir(p).await.expect("must succeed")) + .map(|e| e.expect("must not be err")) + .collect() + .await; + + assert_eq!(3, elements.len(), "number of elements should be 3"); // rust skips . and .. + + // We explicitly look at specific positions here, because we always emit + // them ordered. + + // ".keep", 0 byte file. + let e = &elements[0]; + assert_eq!(".keep", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); + + // "aa", symlink. + let e = &elements[1]; + assert_eq!("aa", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_symlink()); + + // "keep", directory + let e = &elements[2]; + assert_eq!("keep", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_dir()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Do a readdir deeper inside a directory, without doing readdir or stat in the parent directory. +async fn readdir_deep() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); + + { + // read_dir should succeed. Collect all elements + let elements: Vec<_> = + ReadDirStream::new(tokio::fs::read_dir(p).await.expect("must succeed")) + .map(|e| e.expect("must not be err")) + .collect() + .await; + + assert_eq!(1, elements.len(), "number of elements should be 1"); // rust skips . and .. + + // ".keep", 0 byte file. + let e = &elements[0]; + assert_eq!(".keep", e.file_name()); + assert!(e.file_type().await.expect("must succeed").is_file()); + assert_eq!(0, e.metadata().await.expect("must succeed").len()); + } + + fuse_daemon.unmount().expect("unmount"); +} + +/// Check attributes match how they show up in /nix/store normally. +#[tokio::test] +async fn check_attributes() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + populate_symlink(&mut root_nodes).await; + populate_blob_helloworld(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p_file = tmpdir.path().join(BLOB_A_NAME); + let p_directory = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + let p_symlink = tmpdir.path().join(SYMLINK_NAME); + let p_executable_file = tmpdir.path().join(HELLOWORLD_BLOB_NAME); + + // peek at metadata. We use symlink_metadata to ensure we don't traverse a symlink by accident. + let metadata_file = tokio::fs::symlink_metadata(&p_file) + .await + .expect("must succeed"); + let metadata_executable_file = tokio::fs::symlink_metadata(&p_executable_file) + .await + .expect("must succeed"); + let metadata_directory = tokio::fs::symlink_metadata(&p_directory) + .await + .expect("must succeed"); + let metadata_symlink = tokio::fs::symlink_metadata(&p_symlink) + .await + .expect("must succeed"); + + // modes should match. We & with 0o777 to remove any higher bits. + assert_eq!(0o444, metadata_file.mode() & 0o777); + assert_eq!(0o555, metadata_executable_file.mode() & 0o777); + assert_eq!(0o555, metadata_directory.mode() & 0o777); + assert_eq!(0o444, metadata_symlink.mode() & 0o777); + + // files should have the correct filesize + assert_eq!(fixtures::BLOB_A.len() as u64, metadata_file.len()); + // directories should have their "size" as filesize + assert_eq!( + { fixtures::DIRECTORY_WITH_KEEP.size() }, + metadata_directory.size() + ); + + for metadata in &[&metadata_file, &metadata_directory, &metadata_symlink] { + // uid and gid should be 0. + assert_eq!(0, metadata.uid()); + assert_eq!(0, metadata.gid()); + + // all times should be set to the unix epoch. + assert_eq!(0, metadata.atime()); + assert_eq!(0, metadata.mtime()); + assert_eq!(0, metadata.ctime()); + // crtime seems MacOS only + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Ensure we allocate the same inodes for the same directory contents. +/// $DIRECTORY_COMPLICATED_NAME/keep contains the same data as $DIRECTORY_WITH_KEEP. +async fn compare_inodes_directories() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p_dir_with_keep = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + let p_sibling_dir = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("keep"); + + // peek at metadata. + assert_eq!( + tokio::fs::metadata(p_dir_with_keep) + .await + .expect("must succeed") + .ino(), + tokio::fs::metadata(p_sibling_dir) + .await + .expect("must succeed") + .ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we allocate the same inodes for the same directory contents. +/// $DIRECTORY_COMPLICATED_NAME/keep/,keep contains the same data as $DIRECTORY_COMPLICATED_NAME/.keep +#[tokio::test] +async fn compare_inodes_files() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p_keep1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join(".keep"); + let p_keep2 = tmpdir + .path() + .join(DIRECTORY_COMPLICATED_NAME) + .join("keep") + .join(".keep"); + + // peek at metadata. + assert_eq!( + tokio::fs::metadata(p_keep1) + .await + .expect("must succeed") + .ino(), + tokio::fs::metadata(p_keep2) + .await + .expect("must succeed") + .ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Ensure we allocate the same inode for symlinks pointing to the same targets. +/// $DIRECTORY_COMPLICATED_NAME/aa points to the same target as SYMLINK_NAME2. +#[tokio::test] +async fn compare_inodes_symlinks() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; + populate_symlink2(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p1 = tmpdir.path().join(DIRECTORY_COMPLICATED_NAME).join("aa"); + let p2 = tmpdir.path().join(SYMLINK_NAME2); + + // peek at metadata. + assert_eq!( + tokio::fs::symlink_metadata(p1) + .await + .expect("must succeed") + .ino(), + tokio::fs::symlink_metadata(p2) + .await + .expect("must succeed") + .ino() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Check we match paths exactly. +#[tokio::test] +async fn read_wrong_paths_in_root() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_blob_a(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + // wrong name + assert!( + tokio::fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); + + // invalid hash + assert!( + tokio::fs::metadata(tmpdir.path().join("0000000000000000000000000000000-test")) + .await + .is_err() + ); + + // right name, must exist + assert!( + tokio::fs::metadata(tmpdir.path().join("00000000000000000000000000000000-test")) + .await + .is_ok() + ); + + // now wrong name with right hash still may not exist + assert!( + tokio::fs::metadata(tmpdir.path().join("00000000000000000000000000000000-tes")) + .await + .is_err() + ); + + fuse_daemon.unmount().expect("unmount"); +} + +/// Make sure writes are not allowed +#[tokio::test] +async fn disallow_writes() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let root_nodes = BTreeMap::default(); + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + let e = tokio::fs::File::create(p).await.expect_err("must fail"); + + assert_eq!(Some(libc::EROFS), e.raw_os_error()); + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test] +/// Ensure we get an IO error if the directory service does not have the Directory object. +async fn missing_directory() { + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directorynode_without_directory(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + { + // `stat` on the path should succeed, because it doesn't trigger the directory request. + tokio::fs::metadata(&p).await.expect("must succeed"); + + // However, calling either `readdir` or `stat` on a child should fail with an IO error. + // It fails when trying to pull the first entry, because we don't implement opendir separately + ReadDirStream::new(tokio::fs::read_dir(&p).await.unwrap()) + .next() + .await + .expect("must be some") + .expect_err("must be err"); + + // rust currently sets e.kind() to Uncategorized, which isn't very + // helpful, so we don't look at the error more closely than that.. + tokio::fs::metadata(p.join(".keep")) + .await + .expect_err("must fail"); + } + + fuse_daemon.unmount().expect("unmount"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +/// Ensure we get an IO error if the blob service does not have the blob +async fn missing_blob() { + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_filenode_without_blob(&mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + false, + ) + .expect("must succeed"); + + let p = tmpdir.path().join(BLOB_A_NAME); + + { + // `stat` on the blob should succeed, because it doesn't trigger a request to the blob service. + tokio::fs::metadata(&p).await.expect("must succeed"); + + // However, calling read on the blob should fail. + // rust currently sets e.kind() to Uncategorized, which isn't very + // helpful, so we don't look at the error more closely than that.. + tokio::fs::read(p).await.expect_err("must fail"); + } + + fuse_daemon.unmount().expect("unmount"); +} diff --git a/tvix/castore/src/fs/inode_tracker.rs b/tvix/castore/src/fs/inode_tracker.rs new file mode 100644 index 0000000000..4a8283b6b1 --- /dev/null +++ b/tvix/castore/src/fs/inode_tracker.rs @@ -0,0 +1,207 @@ +use std::{collections::HashMap, sync::Arc}; + +use super::inodes::{DirectoryInodeData, InodeData}; +use crate::B3Digest; + +/// InodeTracker keeps track of inodes, stores data being these inodes and deals +/// with inode allocation. +pub struct InodeTracker { + data: HashMap<u64, Arc<InodeData>>, + + // lookup table for blobs by their B3Digest + blob_digest_to_inode: HashMap<B3Digest, u64>, + + // lookup table for symlinks by their target + symlink_target_to_inode: HashMap<bytes::Bytes, u64>, + + // lookup table for directories by their B3Digest. + // Note the corresponding directory may not be present in data yet. + directory_digest_to_inode: HashMap<B3Digest, u64>, + + // the next inode to allocate + next_inode: u64, +} + +impl Default for InodeTracker { + fn default() -> Self { + Self { + data: Default::default(), + + blob_digest_to_inode: Default::default(), + symlink_target_to_inode: Default::default(), + directory_digest_to_inode: Default::default(), + + next_inode: 2, + } + } +} + +impl InodeTracker { + // Retrieves data for a given inode, if it exists. + pub fn get(&self, ino: u64) -> Option<Arc<InodeData>> { + self.data.get(&ino).cloned() + } + + // Replaces data for a given inode. + // Panics if the inode doesn't already exist. + pub fn replace(&mut self, ino: u64, data: Arc<InodeData>) { + if self.data.insert(ino, data).is_none() { + panic!("replace called on unknown inode"); + } + } + + // Stores data and returns the inode for it. + // In case an inode has already been allocated for the same data, that inode + // is returned, otherwise a new one is allocated. + // In case data is a [InodeData::Directory], inodes for all items are looked + // up + pub fn put(&mut self, data: InodeData) -> u64 { + match data { + InodeData::Regular(ref digest, _, _) => { + match self.blob_digest_to_inode.get(digest) { + Some(found_ino) => { + // We already have it, return the inode. + *found_ino + } + None => self.insert_and_increment(data), + } + } + InodeData::Symlink(ref target) => { + match self.symlink_target_to_inode.get(target) { + Some(found_ino) => { + // We already have it, return the inode. + *found_ino + } + None => self.insert_and_increment(data), + } + } + InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { + // check the lookup table if the B3Digest is known. + match self.directory_digest_to_inode.get(digest) { + Some(found_ino) => { + // We already have it, return the inode. + *found_ino + } + None => { + // insert and return the inode + self.insert_and_increment(data) + } + } + } + // Inserting [DirectoryInodeData::Populated] doesn't normally happen, + // only via [replace]. + InodeData::Directory(DirectoryInodeData::Populated(..)) => { + unreachable!("should never be called with DirectoryInodeData::Populated") + } + } + } + + // Inserts the data and returns the inode it was stored at, while + // incrementing next_inode. + fn insert_and_increment(&mut self, data: InodeData) -> u64 { + let ino = self.next_inode; + // insert into lookup tables + match data { + InodeData::Regular(ref digest, _, _) => { + self.blob_digest_to_inode.insert(digest.clone(), ino); + } + InodeData::Symlink(ref target) => { + self.symlink_target_to_inode.insert(target.clone(), ino); + } + InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _size)) => { + self.directory_digest_to_inode.insert(digest.clone(), ino); + } + // This is currently not used outside test fixtures. + // Usually a [DirectoryInodeData::Sparse] is inserted and later + // "upgraded" with more data. + // However, as a future optimization, a lookup for a PathInfo could trigger a + // [DirectoryService::get_recursive()] request that "forks into + // background" and prepopulates all Directories in a closure. + InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) => { + self.directory_digest_to_inode.insert(digest.clone(), ino); + } + } + // Insert data + self.data.insert(ino, Arc::new(data)); + + // increment inode counter and return old inode. + self.next_inode += 1; + ino + } +} + +#[cfg(test)] +mod tests { + use crate::fixtures; + + use super::InodeData; + use super::InodeTracker; + + /// Getting something non-existent should be none + #[test] + fn get_nonexistent() { + let inode_tracker = InodeTracker::default(); + assert!(inode_tracker.get(1).is_none()); + } + + /// Put of a regular file should allocate a uid, which should be the same when inserting again. + #[test] + fn put_regular() { + let mut inode_tracker = InodeTracker::default(); + let f = InodeData::Regular( + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u64, + false, + ); + + // put it in + let ino = inode_tracker.put(f.clone()); + + // a get should return the right data + let data = inode_tracker.get(ino).expect("must be some"); + match *data { + InodeData::Regular(ref digest, _, _) => { + assert_eq!(&fixtures::BLOB_A_DIGEST.clone(), digest); + } + InodeData::Symlink(_) | InodeData::Directory(..) => panic!("wrong type"), + } + + // another put should return the same ino + assert_eq!(ino, inode_tracker.put(f)); + + // inserting another file should return a different ino + assert_ne!( + ino, + inode_tracker.put(InodeData::Regular( + fixtures::BLOB_B_DIGEST.clone(), + fixtures::BLOB_B.len() as u64, + false, + )) + ); + } + + // Put of a symlink should allocate a uid, which should be the same when inserting again + #[test] + fn put_symlink() { + let mut inode_tracker = InodeTracker::default(); + let f = InodeData::Symlink("target".into()); + + // put it in + let ino = inode_tracker.put(f.clone()); + + // a get should return the right data + let data = inode_tracker.get(ino).expect("must be some"); + match *data { + InodeData::Symlink(ref target) => { + assert_eq!(b"target".to_vec(), *target); + } + InodeData::Regular(..) | InodeData::Directory(..) => panic!("wrong type"), + } + + // another put should return the same ino + assert_eq!(ino, inode_tracker.put(f)); + + // inserting another file should return a different ino + assert_ne!(ino, inode_tracker.put(InodeData::Symlink("target2".into()))); + } +} diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs new file mode 100644 index 0000000000..bdd4595434 --- /dev/null +++ b/tvix/castore/src/fs/inodes.rs @@ -0,0 +1,96 @@ +//! This module contains all the data structures used to track information +//! about inodes, which present tvix-castore nodes in a filesystem. +use std::time::Duration; + +use bytes::Bytes; + +use crate::proto as castorepb; +use crate::B3Digest; + +#[derive(Clone, Debug)] +pub enum InodeData { + Regular(B3Digest, u64, bool), // digest, size, executable + Symlink(bytes::Bytes), // target + Directory(DirectoryInodeData), // either [DirectoryInodeData:Sparse] or [DirectoryInodeData:Populated] +} + +/// This encodes the two different states of [InodeData::Directory]. +/// Either the data still is sparse (we only saw a [castorepb::DirectoryNode], +/// but didn't fetch the [castorepb::Directory] struct yet, or we processed a +/// lookup and did fetch the data. +#[derive(Clone, Debug)] +pub enum DirectoryInodeData { + Sparse(B3Digest, u64), // digest, size + Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)] +} + +impl InodeData { + /// Constructs a new InodeData by consuming a [Node]. + /// It splits off the orginal name, so it can be used later. + pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) { + match node { + castorepb::node::Node::Directory(n) => ( + Self::Directory(DirectoryInodeData::Sparse( + n.digest.try_into().unwrap(), + n.size, + )), + n.name, + ), + castorepb::node::Node::File(n) => ( + Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable), + n.name, + ), + castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name), + } + } + + pub fn as_fuse_file_attr(&self, inode: u64) -> fuse_backend_rs::abi::fuse_abi::Attr { + fuse_backend_rs::abi::fuse_abi::Attr { + ino: inode, + // FUTUREWORK: play with this numbers, as it affects read sizes for client applications. + blocks: 1024, + size: match self { + InodeData::Regular(_, size, _) => *size, + InodeData::Symlink(target) => target.len() as u64, + InodeData::Directory(DirectoryInodeData::Sparse(_, size)) => *size, + InodeData::Directory(DirectoryInodeData::Populated(_, ref children)) => { + children.len() as u64 + } + }, + mode: self.as_fuse_type() | self.mode(), + ..Default::default() + } + } + + fn mode(&self) -> u32 { + match self { + InodeData::Regular(_, _, false) | InodeData::Symlink(_) => 0o444, + InodeData::Regular(_, _, true) | InodeData::Directory(_) => 0o555, + } + } + + pub fn as_fuse_entry(&self, inode: u64) -> fuse_backend_rs::api::filesystem::Entry { + fuse_backend_rs::api::filesystem::Entry { + inode, + attr: self.as_fuse_file_attr(inode).into(), + attr_timeout: Duration::MAX, + entry_timeout: Duration::MAX, + ..Default::default() + } + } + + /// Returns the u32 fuse type + pub fn as_fuse_type(&self) -> u32 { + #[allow(clippy::let_and_return)] + let ty = match self { + InodeData::Regular(_, _, _) => libc::S_IFREG, + InodeData::Symlink(_) => libc::S_IFLNK, + InodeData::Directory(_) => libc::S_IFDIR, + }; + // libc::S_IFDIR is u32 on Linux and u16 on MacOS + #[cfg(target_os = "macos")] + let ty = ty as u32; + + ty + } +} diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs new file mode 100644 index 0000000000..176199f64a --- /dev/null +++ b/tvix/castore/src/fs/mod.rs @@ -0,0 +1,874 @@ +mod file_attr; +mod inode_tracker; +mod inodes; +mod root_nodes; + +#[cfg(feature = "fuse")] +pub mod fuse; + +#[cfg(feature = "virtiofs")] +pub mod virtiofs; + +pub use self::root_nodes::RootNodes; +use self::{ + file_attr::ROOT_FILE_ATTR, + inode_tracker::InodeTracker, + inodes::{DirectoryInodeData, InodeData}, +}; +use crate::proto as castorepb; +use crate::{ + blobservice::{BlobReader, BlobService}, + directoryservice::DirectoryService, + proto::{node::Node, NamedNode}, + B3Digest, +}; +use bstr::ByteVec; +use bytes::Bytes; +use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions}; +use fuse_backend_rs::api::filesystem::{ + Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID, +}; +use futures::StreamExt; +use parking_lot::RwLock; +use std::sync::Mutex; +use std::{ + collections::HashMap, + io, + sync::atomic::AtomicU64, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; +use std::{ffi::CStr, io::Cursor}; +use tokio::{ + io::{AsyncReadExt, AsyncSeekExt}, + sync::mpsc, +}; +use tracing::{debug, error, instrument, warn, Span}; + +/// This implements a read-only FUSE filesystem for a tvix-store +/// with the passed [BlobService], [DirectoryService] and [RootNodes]. +/// +/// Linux uses inodes in filesystems. When implementing FUSE, most calls are +/// *for* a given inode. +/// +/// This means, we need to have a stable mapping of inode numbers to the +/// corresponding store nodes. +/// +/// We internally delegate all inode allocation and state keeping to the +/// inode tracker. +/// We store a mapping from currently "explored" names in the root to their +/// inode. +/// +/// There's some places where inodes are allocated / data inserted into +/// the inode tracker, if not allocated before already: +/// - Processing a `lookup` request, either in the mount root, or somewhere +/// deeper. +/// - Processing a `readdir` request +/// +/// Things pointing to the same contents get the same inodes, irrespective of +/// their own location. +/// This means: +/// - Symlinks with the same target will get the same inode. +/// - Regular/executable files with the same contents will get the same inode +/// - Directories with the same contents will get the same inode. +/// +/// Due to the above being valid across the whole store, and considering the +/// merkle structure is a DAG, not a tree, this also means we can't do "bucketed +/// allocation", aka reserve Directory.size inodes for each directory node we +/// explore. +/// Tests for this live in the tvix-store crate. +pub struct TvixStoreFs<BS, DS, RN> { + blob_service: BS, + directory_service: DS, + root_nodes_provider: RN, + + /// Whether to (try) listing elements in the root. + list_root: bool, + + /// Whether to expose blob and directory digests as extended attributes. + show_xattr: bool, + + /// This maps a given basename in the root to the inode we allocated for the node. + root_nodes: RwLock<HashMap<Bytes, u64>>, + + /// This keeps track of inodes and data alongside them. + inode_tracker: RwLock<InodeTracker>, + + // FUTUREWORK: have a generic container type for dir/file handles and handle + // allocation. + /// Maps from the handle returned from an opendir to + /// This holds all opendir handles (for the root inode) + /// They point to the rx part of the channel producing the listing. + #[allow(clippy::type_complexity)] + dir_handles: RwLock< + HashMap< + u64, + ( + Span, + Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>, + ), + >, + >, + + next_dir_handle: AtomicU64, + + /// This holds all open file handles + #[allow(clippy::type_complexity)] + file_handles: RwLock<HashMap<u64, (Span, Arc<Mutex<Box<dyn BlobReader>>>)>>, + + next_file_handle: AtomicU64, + + tokio_handle: tokio::runtime::Handle, +} + +impl<BS, DS, RN> TvixStoreFs<BS, DS, RN> +where + BS: AsRef<dyn BlobService> + Clone + Send, + DS: AsRef<dyn DirectoryService> + Clone + Send + 'static, + RN: RootNodes + Clone + 'static, +{ + pub fn new( + blob_service: BS, + directory_service: DS, + root_nodes_provider: RN, + list_root: bool, + show_xattr: bool, + ) -> Self { + Self { + blob_service, + directory_service, + root_nodes_provider, + + list_root, + show_xattr, + + root_nodes: RwLock::new(HashMap::default()), + inode_tracker: RwLock::new(Default::default()), + + dir_handles: RwLock::new(Default::default()), + next_dir_handle: AtomicU64::new(1), + + file_handles: RwLock::new(Default::default()), + next_file_handle: AtomicU64::new(1), + tokio_handle: tokio::runtime::Handle::current(), + } + } + + /// Retrieves the inode for a given root node basename, if present. + /// This obtains a read lock on self.root_nodes. + fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> { + self.root_nodes.read().get(name).cloned() + } + + /// For a given inode, look up the given directory behind it (from + /// self.inode_tracker), and return its children. + /// The inode_tracker MUST know about this inode already, and it MUST point + /// to a [InodeData::Directory]. + /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup + /// in self.directory_service is performed, and self.inode_tracker is updated with the + /// [DirectoryInodeData::Populated]. + #[instrument(skip(self), err)] + fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> { + let data = self.inode_tracker.read().get(ino).unwrap(); + match *data { + // if it's populated already, return children. + InodeData::Directory(DirectoryInodeData::Populated( + ref parent_digest, + ref children, + )) => Ok((parent_digest.clone(), children.clone())), + // if it's sparse, fetch data using directory_service, populate child nodes + // and update it in [self.inode_tracker]. + InodeData::Directory(DirectoryInodeData::Sparse(ref parent_digest, _)) => { + let directory = self + .tokio_handle + .block_on({ + let directory_service = self.directory_service.clone(); + let parent_digest = parent_digest.to_owned(); + async move { directory_service.as_ref().get(&parent_digest).await } + })? + .ok_or_else(|| { + warn!(directory.digest=%parent_digest, "directory not found"); + // If the Directory can't be found, this is a hole, bail out. + io::Error::from_raw_os_error(libc::EIO) + })?; + + // Turn the retrieved directory into a InodeData::Directory(DirectoryInodeData::Populated(..)), + // allocating inodes for the children on the way. + // FUTUREWORK: there's a bunch of cloning going on here, which we can probably avoid. + let children = { + let mut inode_tracker = self.inode_tracker.write(); + + let children: Vec<(u64, castorepb::node::Node)> = directory + .nodes() + .map(|child_node| { + let (inode_data, _) = InodeData::from_node(child_node.clone()); + + let child_ino = inode_tracker.put(inode_data); + (child_ino, child_node) + }) + .collect(); + + // replace. + inode_tracker.replace( + ino, + Arc::new(InodeData::Directory(DirectoryInodeData::Populated( + parent_digest.clone(), + children.clone(), + ))), + ); + + children + }; + + Ok((parent_digest.clone(), children)) + } + // if the parent inode was not a directory, this doesn't make sense + InodeData::Regular(..) | InodeData::Symlink(_) => { + Err(io::Error::from_raw_os_error(libc::ENOTDIR)) + } + } + } + + /// This will turn a lookup request for a name in the root to a ino and + /// [InodeData]. + /// It will peek in [self.root_nodes], and then either look it up from + /// [self.inode_tracker], + /// or otherwise fetch from [self.root_nodes], and then insert into + /// [self.inode_tracker]. + /// In the case the name can't be found, a libc::ENOENT is returned. + fn name_in_root_to_ino_and_data( + &self, + name: &std::ffi::CStr, + ) -> io::Result<(u64, Arc<InodeData>)> { + // Look up the inode for that root node. + // If there's one, [self.inode_tracker] MUST also contain the data, + // which we can then return. + if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) { + return Ok(( + inode, + self.inode_tracker + .read() + .get(inode) + .expect("must exist") + .to_owned(), + )); + } + + // We don't have it yet, look it up in [self.root_nodes]. + match self.tokio_handle.block_on({ + let root_nodes_provider = self.root_nodes_provider.clone(); + async move { root_nodes_provider.get_by_basename(name.to_bytes()).await } + }) { + // if there was an error looking up the root node, propagate up an IO error. + Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)), + // the root node doesn't exist, so the file doesn't exist. + Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), + // The root node does exist + Ok(Some(root_node)) => { + // The name must match what's passed in the lookup, otherwise this is also a ENOENT. + if root_node.get_name() != name.to_bytes() { + debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch"); + return Err(io::Error::from_raw_os_error(libc::ENOENT)); + } + + // Let's check if someone else beat us to updating the inode tracker and + // root_nodes map. This avoids locking inode_tracker for writing. + if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) { + return Ok(( + *ino, + self.inode_tracker.read().get(*ino).expect("must exist"), + )); + } + + // Only in case it doesn't, lock [self.root_nodes] and + // [self.inode_tracker] for writing. + let mut root_nodes = self.root_nodes.write(); + let mut inode_tracker = self.inode_tracker.write(); + + // insert the (sparse) inode data and register in + // self.root_nodes. + let (inode_data, name) = InodeData::from_node(root_node); + let ino = inode_tracker.put(inode_data.clone()); + root_nodes.insert(name, ino); + + Ok((ino, Arc::new(inode_data))) + } + } + } +} + +/// Buffer size of the channel providing nodes in the mount root +const ROOT_NODES_BUFFER_SIZE: usize = 16; + +const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest"; +const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest"; + +impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN> +where + BS: AsRef<dyn BlobService> + Clone + Send + 'static, + DS: AsRef<dyn DirectoryService> + Send + Clone + 'static, + RN: RootNodes + Clone + 'static, +{ + type Handle = u64; + type Inode = u64; + + fn init(&self, _capable: FsOptions) -> io::Result<FsOptions> { + Ok(FsOptions::empty()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn getattr( + &self, + _ctx: &Context, + inode: Self::Inode, + _handle: Option<Self::Handle>, + ) -> io::Result<(stat64, Duration)> { + if inode == ROOT_ID { + return Ok((ROOT_FILE_ATTR.into(), Duration::MAX)); + } + + match self.inode_tracker.read().get(inode) { + None => Err(io::Error::from_raw_os_error(libc::ENOENT)), + Some(inode_data) => { + debug!(inode_data = ?inode_data, "found node"); + Ok((inode_data.as_fuse_file_attr(inode).into(), Duration::MAX)) + } + } + } + + #[tracing::instrument(skip_all, fields(rq.parent_inode = parent, rq.name = ?name))] + fn lookup( + &self, + _ctx: &Context, + parent: Self::Inode, + name: &std::ffi::CStr, + ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> { + debug!("lookup"); + + // This goes from a parent inode to a node. + // - If the parent is [ROOT_ID], we need to check + // [self.root_nodes] (fetching from a [RootNode] provider if needed) + // - Otherwise, lookup the parent in [self.inode_tracker] (which must be + // a [InodeData::Directory]), and find the child with that name. + if parent == ROOT_ID { + let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?; + + debug!(inode_data=?&inode_data, ino=ino, "Some"); + return Ok(inode_data.as_fuse_entry(ino)); + } + // This is the "lookup for "a" inside inode 42. + // We already know that inode 42 must be a directory. + let (parent_digest, children) = self.get_directory_children(parent)?; + + Span::current().record("directory.digest", parent_digest.to_string()); + // Search for that name in the list of children and return the FileAttrs. + + // in the children, find the one with the desired name. + if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + // lookup the child [InodeData] in [self.inode_tracker]. + // We know the inodes for children have already been allocated. + let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap(); + + // Reply with the file attributes for the child. + // For child directories, we still have all data we need to reply. + Ok(child_inode_data.as_fuse_entry(*child_ino)) + } else { + // Child not found, return ENOENT. + Err(io::Error::from_raw_os_error(libc::ENOENT)) + } + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn opendir( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + ) -> io::Result<(Option<Self::Handle>, OpenOptions)> { + // In case opendir on the root is called, we provide the handle, as re-entering that listing is expensive. + // For all other directory inodes we just let readdir take care of it. + if inode == ROOT_ID { + if !self.list_root { + return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo + } + + let root_nodes_provider = self.root_nodes_provider.clone(); + let (tx, rx) = mpsc::channel(ROOT_NODES_BUFFER_SIZE); + + // This task will run in the background immediately and will exit + // after the stream ends or if we no longer want any more entries. + self.tokio_handle.spawn(async move { + let mut stream = root_nodes_provider.list().enumerate(); + while let Some(node) = stream.next().await { + if tx.send(node).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } + } + }); + + // Put the rx part into [self.dir_handles]. + // TODO: this will overflow after 2**64 operations, + // which is fine for now. + // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 + // for the discussion on alternatives. + let dh = self.next_dir_handle.fetch_add(1, Ordering::SeqCst); + + self.dir_handles + .write() + .insert(dh, (Span::current(), Arc::new(Mutex::new(rx)))); + + return Ok(( + Some(dh), + fuse_backend_rs::api::filesystem::OpenOptions::empty(), // TODO: non-seekable + )); + } + + Ok((None, OpenOptions::empty())) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))] + fn readdir( + &self, + _ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + _size: u32, + offset: u64, + add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>, + ) -> io::Result<()> { + debug!("readdir"); + + if inode == ROOT_ID { + if !self.list_root { + return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo + } + + // get the handle from [self.dir_handles] + let (_span, rx) = match self.dir_handles.read().get(&handle) { + Some(rx) => rx.clone(), + None => { + warn!("dir handle {} unknown", handle); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + }; + + let mut rx = rx + .lock() + .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; + + while let Some((i, n)) = rx.blocking_recv() { + let root_node = n.map_err(|e| { + warn!("failed to retrieve root node: {}", e); + io::Error::from_raw_os_error(libc::EIO) + })?; + + let (inode_data, name) = InodeData::from_node(root_node); + + // obtain the inode, or allocate a new one. + let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { + // insert the (sparse) inode data and register in + // self.root_nodes. + let ino = self.inode_tracker.write().put(inode_data.clone()); + self.root_nodes.write().insert(name.clone(), ino); + ino + }); + + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + (i as u64) + 1, + type_: inode_data.as_fuse_type(), + name: &name, + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } + } + return Ok(()); + } + + // Non root-node case: lookup the children, or return an error if it's not a directory. + let (parent_digest, children) = self.get_directory_children(inode)?; + Span::current().record("directory.digest", parent_digest.to_string()); + + for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { + let (inode_data, name) = InodeData::from_node(child_node); + + // the second parameter will become the "offset" parameter on the next call. + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + (i as u64) + 1, + type_: inode_data.as_fuse_type(), + name: &name, + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))] + fn readdirplus( + &self, + _ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + _size: u32, + offset: u64, + add_entry: &mut dyn FnMut( + fuse_backend_rs::api::filesystem::DirEntry, + fuse_backend_rs::api::filesystem::Entry, + ) -> io::Result<usize>, + ) -> io::Result<()> { + debug!("readdirplus"); + + if inode == ROOT_ID { + if !self.list_root { + return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo + } + + // get the handle from [self.dir_handles] + let (_span, rx) = match self.dir_handles.read().get(&handle) { + Some(rx) => rx.clone(), + None => { + warn!("dir handle {} unknown", handle); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + }; + + let mut rx = rx + .lock() + .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; + + while let Some((i, n)) = rx.blocking_recv() { + let root_node = n.map_err(|e| { + warn!("failed to retrieve root node: {}", e); + io::Error::from_raw_os_error(libc::EPERM) + })?; + + let (inode_data, name) = InodeData::from_node(root_node); + + // obtain the inode, or allocate a new one. + let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { + // insert the (sparse) inode data and register in + // self.root_nodes. + let ino = self.inode_tracker.write().put(inode_data.clone()); + self.root_nodes.write().insert(name.clone(), ino); + ino + }); + + let written = add_entry( + fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + (i as u64) + 1, + type_: inode_data.as_fuse_type(), + name: &name, + }, + inode_data.as_fuse_entry(ino), + )?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } + } + return Ok(()); + } + + // Non root-node case: lookup the children, or return an error if it's not a directory. + let (parent_digest, children) = self.get_directory_children(inode)?; + Span::current().record("directory.digest", parent_digest.to_string()); + + for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { + let (inode_data, name) = InodeData::from_node(child_node); + + // the second parameter will become the "offset" parameter on the next call. + let written = add_entry( + fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + (i as u64) + 1, + type_: inode_data.as_fuse_type(), + name: &name, + }, + inode_data.as_fuse_entry(ino), + )?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.dir_handles.read().get(&handle).and_then(|x| x.0.id()))] + fn releasedir( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + handle: Self::Handle, + ) -> io::Result<()> { + if inode == ROOT_ID { + // drop the rx part of the channel. + match self.dir_handles.write().remove(&handle) { + // drop it, which will close it. + Some(rx) => drop(rx), + None => { + warn!("dir handle not found"); + } + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn open( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + _fuse_flags: u32, + ) -> io::Result<( + Option<Self::Handle>, + fuse_backend_rs::api::filesystem::OpenOptions, + )> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // lookup the inode + match *self.inode_tracker.read().get(inode).unwrap() { + // read is invalid on non-files. + InodeData::Directory(..) | InodeData::Symlink(_) => { + warn!("is directory"); + Err(io::Error::from_raw_os_error(libc::EISDIR)) + } + InodeData::Regular(ref blob_digest, _blob_size, _) => { + Span::current().record("blob.digest", blob_digest.to_string()); + + match self.tokio_handle.block_on({ + let blob_service = self.blob_service.clone(); + let blob_digest = blob_digest.clone(); + async move { blob_service.as_ref().open_read(&blob_digest).await } + }) { + Ok(None) => { + warn!("blob not found"); + Err(io::Error::from_raw_os_error(libc::EIO)) + } + Err(e) => { + warn!(e=?e, "error opening blob"); + Err(io::Error::from_raw_os_error(libc::EIO)) + } + Ok(Some(blob_reader)) => { + // get a new file handle + // TODO: this will overflow after 2**64 operations, + // which is fine for now. + // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 + // for the discussion on alternatives. + let fh = self.next_file_handle.fetch_add(1, Ordering::SeqCst); + + self.file_handles + .write() + .insert(fh, (Span::current(), Arc::new(Mutex::new(blob_reader)))); + + Ok(( + Some(fh), + fuse_backend_rs::api::filesystem::OpenOptions::empty(), + )) + } + } + } + } + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle), parent = self.file_handles.read().get(&handle).and_then(|x| x.0.id()))] + fn release( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + handle: Self::Handle, + _flush: bool, + _flock_release: bool, + _lock_owner: Option<u64>, + ) -> io::Result<()> { + match self.file_handles.write().remove(&handle) { + // drop the blob reader, which will close it. + Some(blob_reader) => drop(blob_reader), + None => { + // These might already be dropped if a read error occured. + warn!("file handle not found"); + } + } + + Ok(()) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset, rq.size = size), parent = self.file_handles.read().get(&handle).and_then(|x| x.0.id()))] + fn read( + &self, + _ctx: &Context, + inode: Self::Inode, + handle: Self::Handle, + w: &mut dyn fuse_backend_rs::api::filesystem::ZeroCopyWriter, + size: u32, + offset: u64, + _lock_owner: Option<u64>, + _flags: u32, + ) -> io::Result<usize> { + debug!("read"); + + // We need to take out the blob reader from self.file_handles, so we can + // interact with it in the separate task. + // On success, we pass it back out of the task, so we can put it back in self.file_handles. + let (_span, blob_reader) = self + .file_handles + .read() + .get(&handle) + .ok_or_else(|| { + warn!("file handle {} unknown", handle); + io::Error::from_raw_os_error(libc::EIO) + }) + .cloned()?; + + let mut blob_reader = blob_reader + .lock() + .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; + + let buf = self.tokio_handle.block_on(async move { + // seek to the offset specified, which is relative to the start of the file. + let pos = blob_reader + .seek(io::SeekFrom::Start(offset)) + .await + .map_err(|e| { + warn!("failed to seek to offset {}: {}", offset, e); + io::Error::from_raw_os_error(libc::EIO) + })?; + + debug_assert_eq!(offset, pos); + + // As written in the fuse docs, read should send exactly the number + // of bytes requested except on EOF or error. + + let mut buf: Vec<u8> = Vec::with_capacity(size as usize); + + // copy things from the internal buffer into buf to fill it till up until size + tokio::io::copy(&mut blob_reader.as_mut().take(size as u64), &mut buf).await?; + + Ok::<_, std::io::Error>(buf) + })?; + + // We cannot use w.write() here, we're required to call write multiple + // times until we wrote the entirety of the buffer (which is `size`, except on EOF). + let buf_len = buf.len(); + let bytes_written = io::copy(&mut Cursor::new(buf), w)?; + if bytes_written != buf_len as u64 { + error!(bytes_written=%bytes_written, "unable to write all of buf to kernel"); + return Err(io::Error::from_raw_os_error(libc::EIO)); + } + + Ok(bytes_written as usize) + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn readlink(&self, _ctx: &Context, inode: Self::Inode) -> io::Result<Vec<u8>> { + if inode == ROOT_ID { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // lookup the inode + match *self.inode_tracker.read().get(inode).unwrap() { + InodeData::Directory(..) | InodeData::Regular(..) => { + Err(io::Error::from_raw_os_error(libc::EINVAL)) + } + InodeData::Symlink(ref target) => Ok(target.to_vec()), + } + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, name=?name))] + fn getxattr( + &self, + _ctx: &Context, + inode: Self::Inode, + name: &CStr, + size: u32, + ) -> io::Result<GetxattrReply> { + if !self.show_xattr { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // Peek at the inode requested, and construct the response. + let digest_str = match *self + .inode_tracker + .read() + .get(inode) + .ok_or_else(|| io::Error::from_raw_os_error(libc::ENODATA))? + { + InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _)) + | InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) + if name.to_bytes() == XATTR_NAME_DIRECTORY_DIGEST => + { + digest.to_string() + } + InodeData::Regular(ref digest, _, _) if name.to_bytes() == XATTR_NAME_BLOB_DIGEST => { + digest.to_string() + } + _ => { + return Err(io::Error::from_raw_os_error(libc::ENODATA)); + } + }; + + if size == 0 { + Ok(GetxattrReply::Count(digest_str.len() as u32)) + } else if size < digest_str.len() as u32 { + Err(io::Error::from_raw_os_error(libc::ERANGE)) + } else { + Ok(GetxattrReply::Value(digest_str.into_bytes())) + } + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn listxattr( + &self, + _ctx: &Context, + inode: Self::Inode, + size: u32, + ) -> io::Result<ListxattrReply> { + if !self.show_xattr { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // determine the (\0-terminated list) to of xattr keys present, depending on the type of the inode. + let xattrs_names = { + let mut out = Vec::new(); + if let Some(inode_data) = self.inode_tracker.read().get(inode) { + match *inode_data { + InodeData::Directory(_) => { + out.extend_from_slice(XATTR_NAME_DIRECTORY_DIGEST); + out.push_byte(b'\x00'); + } + InodeData::Regular(..) => { + out.extend_from_slice(XATTR_NAME_BLOB_DIGEST); + out.push_byte(b'\x00'); + } + _ => {} + } + } + out + }; + + if size == 0 { + Ok(ListxattrReply::Count(xattrs_names.len() as u32)) + } else if size < xattrs_names.len() as u32 { + Err(io::Error::from_raw_os_error(libc::ERANGE)) + } else { + Ok(ListxattrReply::Names(xattrs_names.to_vec())) + } + } +} diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs new file mode 100644 index 0000000000..6609e049a1 --- /dev/null +++ b/tvix/castore/src/fs/root_nodes.rs @@ -0,0 +1,37 @@ +use std::collections::BTreeMap; + +use crate::{proto::node::Node, Error}; +use bytes::Bytes; +use futures::stream::BoxStream; +use tonic::async_trait; + +/// Provides an interface for looking up root nodes in tvix-castore by given +/// a lookup key (usually the basename), and optionally allow a listing. +#[async_trait] +pub trait RootNodes: Send + Sync { + /// Looks up a root CA node based on the basename of the node in the root + /// directory of the filesystem. + async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>; + + /// Lists all root CA nodes in the filesystem. An error can be returned + /// in case listing is not allowed + fn list(&self) -> BoxStream<Result<Node, Error>>; +} + +#[async_trait] +/// Implements RootNodes for something deref'ing to a BTreeMap of Nodes, where +/// the key is the node name. +impl<T> RootNodes for T +where + T: AsRef<BTreeMap<Bytes, Node>> + Send + Sync, +{ + async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> { + Ok(self.as_ref().get(name).cloned()) + } + + fn list(&self) -> BoxStream<Result<Node, Error>> { + Box::pin(tokio_stream::iter( + self.as_ref().iter().map(|(_, v)| Ok(v.clone())), + )) + } +} diff --git a/tvix/castore/src/fs/virtiofs.rs b/tvix/castore/src/fs/virtiofs.rs new file mode 100644 index 0000000000..d63e2f2bdd --- /dev/null +++ b/tvix/castore/src/fs/virtiofs.rs @@ -0,0 +1,238 @@ +use std::{ + convert, error, fmt, io, + ops::Deref, + path::Path, + sync::{Arc, MutexGuard, RwLock}, +}; + +use fuse_backend_rs::{ + api::{filesystem::FileSystem, server::Server}, + transport::{FsCacheReqHandler, Reader, VirtioFsWriter}, +}; +use tracing::error; +use vhost::vhost_user::{ + Listener, SlaveFsCacheReq, VhostUserProtocolFeatures, VhostUserVirtioFeatures, +}; +use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringMutex, VringState, VringT}; +use virtio_bindings::bindings::virtio_ring::{ + VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC, +}; +use virtio_queue::QueueT; +use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; +use vmm_sys_util::epoll::EventSet; + +const VIRTIO_F_VERSION_1: u32 = 32; +const NUM_QUEUES: usize = 2; +const QUEUE_SIZE: usize = 1024; + +#[derive(Debug)] +enum Error { + /// Failed to handle non-input event. + HandleEventNotEpollIn, + /// Failed to handle unknown event. + HandleEventUnknownEvent, + /// Invalid descriptor chain. + InvalidDescriptorChain, + /// Failed to handle filesystem requests. + #[allow(dead_code)] + HandleRequests(fuse_backend_rs::Error), + /// Failed to construct new vhost user daemon. + NewDaemon, + /// Failed to start the vhost user daemon. + StartDaemon, + /// Failed to wait for the vhost user daemon. + WaitDaemon, +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "vhost_user_fs_error: {self:?}") + } +} + +impl error::Error for Error {} + +impl convert::From<Error> for io::Error { + fn from(e: Error) -> Self { + io::Error::new(io::ErrorKind::Other, e) + } +} + +struct VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + server: Arc<Server<Arc<FS>>>, + event_idx: bool, + guest_mem: GuestMemoryAtomic<GuestMemoryMmap>, + cache_req: Option<SlaveFsCacheReq>, +} + +impl<FS> VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + fn process_queue(&mut self, vring: &mut MutexGuard<VringState>) -> std::io::Result<bool> { + let mut used_descs = false; + + while let Some(desc_chain) = vring + .get_queue_mut() + .pop_descriptor_chain(self.guest_mem.memory()) + { + let memory = desc_chain.memory(); + let reader = Reader::from_descriptor_chain(memory, desc_chain.clone()) + .map_err(|_| Error::InvalidDescriptorChain)?; + let writer = VirtioFsWriter::new(memory, desc_chain.clone()) + .map_err(|_| Error::InvalidDescriptorChain)?; + + self.server + .handle_message( + reader, + writer.into(), + self.cache_req + .as_mut() + .map(|req| req as &mut dyn FsCacheReqHandler), + None, + ) + .map_err(Error::HandleRequests)?; + + // TODO: Is len 0 correct? + if let Err(error) = vring + .get_queue_mut() + .add_used(memory, desc_chain.head_index(), 0) + { + error!(?error, "failed to add desc back to ring"); + } + + // TODO: What happens if we error out before here? + used_descs = true; + } + + let needs_notification = if self.event_idx { + match vring + .get_queue_mut() + .needs_notification(self.guest_mem.memory().deref()) + { + Ok(needs_notification) => needs_notification, + Err(error) => { + error!(?error, "failed to check if queue needs notification"); + true + } + } + } else { + true + }; + + if needs_notification { + if let Err(error) = vring.signal_used_queue() { + error!(?error, "failed to signal used queue"); + } + } + + Ok(used_descs) + } +} + +impl<FS> VhostUserBackendMut<VringMutex> for VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + fn num_queues(&self) -> usize { + NUM_QUEUES + } + + fn max_queue_size(&self) -> usize { + QUEUE_SIZE + } + + fn features(&self) -> u64 { + 1 << VIRTIO_F_VERSION_1 + | 1 << VIRTIO_RING_F_INDIRECT_DESC + | 1 << VIRTIO_RING_F_EVENT_IDX + | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::SLAVE_REQ + } + + fn set_event_idx(&mut self, enabled: bool) { + self.event_idx = enabled; + } + + fn update_memory(&mut self, _mem: GuestMemoryAtomic<GuestMemoryMmap>) -> std::io::Result<()> { + // This is what most the vhost user implementations do... + Ok(()) + } + + fn set_slave_req_fd(&mut self, cache_req: SlaveFsCacheReq) { + self.cache_req = Some(cache_req); + } + + fn handle_event( + &mut self, + device_event: u16, + evset: vmm_sys_util::epoll::EventSet, + vrings: &[VringMutex], + _thread_id: usize, + ) -> std::io::Result<bool> { + if evset != EventSet::IN { + return Err(Error::HandleEventNotEpollIn.into()); + } + + let mut queue = match device_event { + // High priority queue + 0 => vrings[0].get_mut(), + // Regurlar priority queue + 1 => vrings[1].get_mut(), + _ => { + return Err(Error::HandleEventUnknownEvent.into()); + } + }; + + if self.event_idx { + loop { + queue + .get_queue_mut() + .enable_notification(self.guest_mem.memory().deref()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + if !self.process_queue(&mut queue)? { + break; + } + } + } else { + self.process_queue(&mut queue)?; + } + + Ok(false) + } +} + +pub fn start_virtiofs_daemon<FS, P>(fs: FS, socket: P) -> io::Result<()> +where + FS: FileSystem + Send + Sync + 'static, + P: AsRef<Path>, +{ + let guest_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); + + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let backend = Arc::new(RwLock::new(VhostUserFsBackend { + server, + guest_mem: guest_mem.clone(), + event_idx: false, + cache_req: None, + })); + + let listener = Listener::new(socket, true).unwrap(); + + let mut fs_daemon = + VhostUserDaemon::new(String::from("vhost-user-fs-tvix-store"), backend, guest_mem) + .map_err(|_| Error::NewDaemon)?; + + fs_daemon.start(listener).map_err(|_| Error::StartDaemon)?; + + fs_daemon.wait().map_err(|_| Error::WaitDaemon)?; + + Ok(()) +} |