diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.lock | 1 | ||||
-rw-r--r-- | tvix/Cargo.nix | 4 | ||||
-rw-r--r-- | tvix/castore/Cargo.toml | 1 | ||||
-rw-r--r-- | tvix/castore/src/fs/mod.rs | 95 | ||||
-rw-r--r-- | tvix/castore/src/fs/tests.rs | 117 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 12 | ||||
-rw-r--r-- | tvix/store/src/pathinfoservice/fs/mod.rs | 2 |
7 files changed, 227 insertions, 5 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index d53ced76aeb1..a15c71d26d3f 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -4380,6 +4380,7 @@ dependencies = [ "vm-memory", "vmm-sys-util", "walkdir", + "xattr", "zstd", ] diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 6886685b63aa..48b53a1943b5 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -13953,6 +13953,10 @@ rec { name = "tokio-retry"; packageId = "tokio-retry"; } + { + name = "xattr"; + packageId = "xattr"; + } ]; features = { "cloud" = [ "dep:bigtable_rs" "object_store/aws" "object_store/azure" "object_store/gcp" ]; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index 6fc44f1574c0..447ddea1dc79 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -86,6 +86,7 @@ tempfile = "3.3.0" tokio-retry = "0.3.0" hex-literal = "0.4.1" rstest_reuse = "0.6.0" +xattr = "1.3.1" [features] default = [] diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index 699598b520ff..8731c8374df9 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -19,10 +19,14 @@ use crate::{ proto::{node::Node, NamedNode}, B3Digest, }; +use bstr::ByteVec; use fuse_backend_rs::abi::fuse_abi::stat64; -use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; +use fuse_backend_rs::api::filesystem::{ + Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID, +}; use futures::StreamExt; use parking_lot::RwLock; +use std::ffi::CStr; use std::{ collections::HashMap, io, @@ -83,6 +87,9 @@ pub struct TvixStoreFs<BS, DS, RN> { /// Whether to (try) listing elements in the root. list_root: bool, + /// Whether to expose blob and directory digests as extended attributes. + show_xattr: bool, + /// This maps a given basename in the root to the inode we allocated for the node. root_nodes: RwLock<HashMap<Vec<u8>, u64>>, @@ -109,6 +116,7 @@ where directory_service: DS, root_nodes_provider: RN, list_root: bool, + show_xattr: bool, ) -> Self { Self { blob_service, @@ -116,6 +124,7 @@ where root_nodes_provider, list_root, + show_xattr, root_nodes: RwLock::new(HashMap::default()), inode_tracker: RwLock::new(Default::default()), @@ -267,6 +276,9 @@ where } } +const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest"; +const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest"; + impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN> where BS: AsRef<dyn BlobService> + Clone + Send + 'static, @@ -628,4 +640,85 @@ where InodeData::Symlink(ref target) => Ok(target.to_vec()), } } + + #[tracing::instrument(skip_all, fields(rq.inode = inode, name=?name))] + fn getxattr( + &self, + _ctx: &Context, + inode: Self::Inode, + name: &CStr, + size: u32, + ) -> io::Result<GetxattrReply> { + if !self.show_xattr { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // Peek at the inode requested, and construct the response. + let digest_str = match *self + .inode_tracker + .read() + .get(inode) + .ok_or_else(|| io::Error::from_raw_os_error(libc::ENODATA))? + { + InodeData::Directory(DirectoryInodeData::Sparse(ref digest, _)) + | InodeData::Directory(DirectoryInodeData::Populated(ref digest, _)) + if name.to_bytes() == XATTR_NAME_DIRECTORY_DIGEST => + { + digest.to_string() + } + InodeData::Regular(ref digest, _, _) if name.to_bytes() == XATTR_NAME_BLOB_DIGEST => { + digest.to_string() + } + _ => { + return Err(io::Error::from_raw_os_error(libc::ENODATA)); + } + }; + + if size == 0 { + Ok(GetxattrReply::Count(digest_str.len() as u32)) + } else if size < digest_str.len() as u32 { + Err(io::Error::from_raw_os_error(libc::ERANGE)) + } else { + Ok(GetxattrReply::Value(digest_str.into_bytes())) + } + } + + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn listxattr( + &self, + _ctx: &Context, + inode: Self::Inode, + size: u32, + ) -> io::Result<ListxattrReply> { + if !self.show_xattr { + return Err(io::Error::from_raw_os_error(libc::ENOSYS)); + } + + // determine the (\0-terminated list) to of xattr keys present, depending on the type of the inode. + let xattrs_names = { + let mut out = Vec::new(); + if let Some(inode_data) = self.inode_tracker.read().get(inode) { + match *inode_data { + InodeData::Directory(_) => { + out.extend_from_slice(XATTR_NAME_DIRECTORY_DIGEST); + out.push_byte(b'\x00'); + } + InodeData::Regular(..) => { + out.extend_from_slice(XATTR_NAME_BLOB_DIGEST); + out.push_byte(b'\x00'); + } + _ => {} + } + } + out + }; + + if size == 0 { + Ok(ListxattrReply::Count(xattrs_names.len() as u32)) + } else if size < xattrs_names.len() as u32 { + Err(io::Error::from_raw_os_error(libc::ERANGE)) + } else { + Ok(ListxattrReply::Names(xattrs_names.to_vec())) + } + } } diff --git a/tvix/castore/src/fs/tests.rs b/tvix/castore/src/fs/tests.rs index 924454caa6dd..226c9975d573 100644 --- a/tvix/castore/src/fs/tests.rs +++ b/tvix/castore/src/fs/tests.rs @@ -1,18 +1,19 @@ +use bstr::ByteSlice; +use bytes::Bytes; use std::{ collections::BTreeMap, + ffi::{OsStr, OsString}, io::{self, Cursor}, - os::unix::fs::MetadataExt, + os::unix::{ffi::OsStrExt, fs::MetadataExt}, path::Path, sync::Arc, }; - -use bytes::Bytes; use tempfile::TempDir; use tokio_stream::{wrappers::ReadDirStream, StreamExt}; use super::{fuse::FuseDaemon, TvixStoreFs}; +use crate::proto as castorepb; use crate::proto::node::Node; -use crate::proto::{self as castorepb}; use crate::{ blobservice::{BlobService, MemoryBlobService}, directoryservice::{DirectoryService, MemoryDirectoryService}, @@ -40,6 +41,7 @@ fn do_mount<P: AsRef<Path>, BS, DS>( root_nodes: BTreeMap<bytes::Bytes, Node>, mountpoint: P, list_root: bool, + show_xattr: bool, ) -> io::Result<FuseDaemon> where BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static, @@ -50,6 +52,7 @@ where directory_service, Arc::new(root_nodes), list_root, + show_xattr, ); FuseDaemon::new(Arc::new(fs), mountpoint.as_ref(), 4, false) } @@ -250,6 +253,7 @@ async fn mount() { BTreeMap::default(), tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -272,6 +276,7 @@ async fn root() { BTreeMap::default(), tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -311,6 +316,7 @@ async fn root_with_listing() { root_nodes, tmpdir.path(), true, /* allow listing */ + false, ) .expect("must succeed"); @@ -354,6 +360,7 @@ async fn stat_file_at_root() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -390,6 +397,7 @@ async fn read_file_at_root() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -426,6 +434,7 @@ async fn read_large_file_at_root() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -470,6 +479,7 @@ async fn symlink_readlink() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -516,6 +526,7 @@ async fn read_stat_through_symlink() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -560,6 +571,7 @@ async fn read_stat_directory() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -573,6 +585,91 @@ async fn read_stat_directory() { fuse_daemon.unmount().expect("unmount"); } +/// Read a directory and file in the root, and ensure the xattrs expose blob or +/// directory digests. +#[tokio::test] +async fn xattr() { + // https://plume.benboeckel.net/~/JustAnotherBlog/skipping-tests-in-rust + if !std::path::Path::new("/dev/fuse").exists() { + eprintln!("skipping test"); + return; + } + let tmpdir = TempDir::new().unwrap(); + + let (blob_service, directory_service) = gen_svcs(); + let mut root_nodes = BTreeMap::default(); + + populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; + populate_blob_a(&blob_service, &mut root_nodes).await; + + let mut fuse_daemon = do_mount( + blob_service, + directory_service, + root_nodes, + tmpdir.path(), + false, + true, /* support xattr */ + ) + .expect("must succeed"); + + // peek at the directory + { + let p = tmpdir.path().join(DIRECTORY_WITH_KEEP_NAME); + + let xattr_names: Vec<OsString> = xattr::list(&p).expect("must succeed").collect(); + // There should be 1 key, XATTR_NAME_DIRECTORY_DIGEST. + assert_eq!(1, xattr_names.len(), "there should be 1 xattr name"); + assert_eq!( + super::XATTR_NAME_DIRECTORY_DIGEST, + xattr_names.first().unwrap().as_encoded_bytes() + ); + + // The key should equal to the string-formatted b3 digest. + let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_DIRECTORY_DIGEST)) + .expect("must succeed") + .expect("must be some"); + assert_eq!( + fixtures::DIRECTORY_WITH_KEEP + .digest() + .to_string() + .as_bytes() + .as_bstr(), + val.as_bstr() + ); + + // Reading another xattr key is gonna return None. + let val = xattr::get(&p, OsStr::from_bytes(b"user.cheesecake")).expect("must succeed"); + assert_eq!(None, val); + } + // peek at the file + { + let p = tmpdir.path().join(BLOB_A_NAME); + + let xattr_names: Vec<OsString> = xattr::list(&p).expect("must succeed").collect(); + // There should be 1 key, XATTR_NAME_BLOB_DIGEST. + assert_eq!(1, xattr_names.len(), "there should be 1 xattr name"); + assert_eq!( + super::XATTR_NAME_BLOB_DIGEST, + xattr_names.first().unwrap().as_encoded_bytes() + ); + + // The key should equal to the string-formatted b3 digest. + let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_BLOB_DIGEST)) + .expect("must succeed") + .expect("must be some"); + assert_eq!( + fixtures::BLOB_A_DIGEST.to_string().as_bytes().as_bstr(), + val.as_bstr() + ); + + // Reading another xattr key is gonna return None. + let val = xattr::get(&p, OsStr::from_bytes(b"user.cheesecake")).expect("must succeed"); + assert_eq!(None, val); + } + + fuse_daemon.unmount().expect("unmount"); +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] /// Read a blob inside a directory. This ensures we successfully populate directory data. async fn read_blob_inside_dir() { @@ -594,6 +691,7 @@ async fn read_blob_inside_dir() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -633,6 +731,7 @@ async fn read_blob_deep_inside_dir() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -675,6 +774,7 @@ async fn readdir() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -734,6 +834,7 @@ async fn readdir_deep() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -783,6 +884,7 @@ async fn check_attributes() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -857,6 +959,7 @@ async fn compare_inodes_directories() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -900,6 +1003,7 @@ async fn compare_inodes_files() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -948,6 +1052,7 @@ async fn compare_inodes_symlinks() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -990,6 +1095,7 @@ async fn read_wrong_paths_in_root() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -1044,6 +1150,7 @@ async fn disallow_writes() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -1075,6 +1182,7 @@ async fn missing_directory() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); @@ -1122,6 +1230,7 @@ async fn missing_blob() { root_nodes, tmpdir.path(), false, + false, ) .expect("must succeed"); diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index a5845c74aed0..15f37d301f9d 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -161,6 +161,10 @@ enum Commands { /// (exhaustive) listing. #[clap(long, short, action)] list_root: bool, + + #[arg(long, default_value_t = true)] + /// Whether to expose blob and directory digests as extended attributes. + show_xattr: bool, }, /// Starts a tvix-store virtiofs daemon at the given socket path. #[cfg(feature = "virtiofs")] @@ -183,6 +187,10 @@ enum Commands { /// (exhaustive) listing. #[clap(long, short, action)] list_root: bool, + + #[arg(long, default_value_t = true)] + /// Whether to expose blob and directory digests as extended attributes. + show_xattr: bool, }, } @@ -459,6 +467,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { list_root, threads, allow_other, + show_xattr, } => { let (blob_service, directory_service, path_info_service) = tvix_store::utils::construct_services( @@ -474,6 +483,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { directory_service, Arc::from(path_info_service), list_root, + show_xattr, ); info!(mount_path=?dest, "mounting"); @@ -499,6 +509,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { directory_service_addr, path_info_service_addr, list_root, + show_xattr, } => { let (blob_service, directory_service, path_info_service) = tvix_store::utils::construct_services( @@ -514,6 +525,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { directory_service, Arc::from(path_info_service), list_root, + show_xattr, ); info!(socket_path=?socket, "starting virtiofs-daemon"); diff --git a/tvix/store/src/pathinfoservice/fs/mod.rs b/tvix/store/src/pathinfoservice/fs/mod.rs index 45d59fd0bcb8..aa64b1c01f16 100644 --- a/tvix/store/src/pathinfoservice/fs/mod.rs +++ b/tvix/store/src/pathinfoservice/fs/mod.rs @@ -17,6 +17,7 @@ pub fn make_fs<BS, DS, PS>( directory_service: DS, path_info_service: PS, list_root: bool, + show_xattr: bool, ) -> TvixStoreFs<BS, DS, RootNodesWrapper<PS>> where BS: AsRef<dyn BlobService> + Send + Clone + 'static, @@ -28,6 +29,7 @@ where directory_service, RootNodesWrapper(path_info_service), list_root, + show_xattr, ) } |