diff options
Diffstat (limited to 'tvix/store/src/fs')
-rw-r--r-- | tvix/store/src/fs/mod.rs | 134 | ||||
-rw-r--r-- | tvix/store/src/fs/root_nodes.rs | 61 |
2 files changed, 119 insertions, 76 deletions
diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs index c1ae6c9c1901..cb2d2154e78d 100644 --- a/tvix/store/src/fs/mod.rs +++ b/tvix/store/src/fs/mod.rs @@ -1,6 +1,7 @@ mod file_attr; mod inode_tracker; mod inodes; +mod root_nodes; #[cfg(feature = "fuse")] pub mod fuse; @@ -11,12 +12,9 @@ pub mod virtiofs; #[cfg(test)] mod tests; -use crate::pathinfoservice::PathInfoService; - use fuse_backend_rs::abi::fuse_abi::stat64; use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; use futures::StreamExt; -use nix_compat::store_path::StorePath; use parking_lot::RwLock; use std::{ collections::HashMap, @@ -38,6 +36,7 @@ use tvix_castore::{ B3Digest, }; +use self::root_nodes::RootNodes; use self::{ file_attr::{gen_file_attr, ROOT_FILE_ATTR}, inode_tracker::InodeTracker, @@ -45,12 +44,7 @@ use self::{ }; /// This implements a read-only FUSE filesystem for a tvix-store -/// with the passed [BlobService], [DirectoryService] and [PathInfoService]. -/// -/// We don't allow listing on the root mountpoint (inode 0). -/// In the future, this might be made configurable once a listing method is -/// added to [self.path_info_service], and then show all store paths in that -/// store. +/// with the passed [BlobService], [DirectoryService] and [RootNodes]. /// /// Linux uses inodes in filesystems. When implementing FUSE, most calls are /// *for* a given inode. @@ -59,7 +53,7 @@ use self::{ /// corresponding store nodes. /// /// We internally delegate all inode allocation and state keeping to the -/// inode tracker, and store the currently "explored" store paths together with +/// inode tracker, and store the currently "explored" root nodes together with /// root inode of the root. /// /// There's some places where inodes are allocated / data inserted into @@ -78,16 +72,16 @@ use self::{ /// Due to the above being valid across the whole store, and considering the /// merkle structure is a DAG, not a tree, this also means we can't do "bucketed /// allocation", aka reserve Directory.size inodes for each PathInfo. -pub struct TvixStoreFs { +pub struct TvixStoreFs<RN> { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, + root_nodes_provider: RN, /// Whether to (try) listing elements in the root. list_root: bool, /// This maps a given StorePath to the inode we allocated for the root inode. - store_paths: RwLock<HashMap<StorePath, u64>>, + root_nodes: RwLock<HashMap<Vec<u8>, u64>>, /// This keeps track of inodes and data alongside them. inode_tracker: RwLock<InodeTracker>, @@ -101,21 +95,24 @@ pub struct TvixStoreFs { tokio_handle: tokio::runtime::Handle, } -impl TvixStoreFs { +impl<RN> TvixStoreFs<RN> +where + RN: RootNodes + Clone + 'static, +{ pub fn new( blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, - path_info_service: Arc<dyn PathInfoService>, + root_nodes_provider: RN, list_root: bool, ) -> Self { Self { blob_service, directory_service, - path_info_service, + root_nodes_provider, list_root, - store_paths: RwLock::new(HashMap::default()), + root_nodes: RwLock::new(HashMap::default()), inode_tracker: RwLock::new(Default::default()), file_handles: RwLock::new(Default::default()), @@ -124,11 +121,10 @@ impl TvixStoreFs { } } - /// Retrieves the inode for a given StorePath, if present. - /// This obtains a read lock on self.store_paths. - fn get_inode_for_store_path(&self, store_path: &StorePath) -> Option<u64> { - let store_paths = self.store_paths.read(); - store_paths.get(store_path).cloned() + /// Retrieves the inode for a given root node basename, if present. + /// This obtains a read lock on self.root_nodes. + fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> { + self.root_nodes.read().get(name).cloned() } /// For a given inode, look up the given directory behind it (from @@ -200,27 +196,19 @@ impl TvixStoreFs { /// This will turn a lookup request for a name in the root to a ino and /// [InodeData]. - /// It will peek in [self.store_paths], and then either look it up from + /// It will peek in [self.root_nodes], and then either look it up from /// [self.inode_tracker], - /// or otherwise fetch from [self.path_info_service], and then insert into + /// or otherwise fetch from [self.root_nodes], and then insert into /// [self.inode_tracker]. /// In the case the name can't be found, a libc::ENOENT is returned. fn name_in_root_to_ino_and_data( &self, name: &std::ffi::CStr, ) -> io::Result<(u64, Arc<InodeData>)> { - // parse the name into a [StorePath]. - let store_path = StorePath::from_bytes(name.to_bytes()).map_err(|e| { - debug!(e=?e, "unable to parse as store path"); - // This is not an error, but a "ENOENT", as someone can stat - // a file inside the root that's no valid store path - io::Error::from_raw_os_error(libc::ENOENT) - })?; - - // Look up the inode for that store path. + // Look up the inode for that root node. // If there's one, [self.inode_tracker] MUST also contain the data, // which we can then return. - if let Some(inode) = self.get_inode_for_store_path(&store_path) { + if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) { return Ok(( inode, self.inode_tracker @@ -231,46 +219,42 @@ impl TvixStoreFs { )); } - // We don't have it yet, look it up in [self.path_info_service]. + // We don't have it yet, look it up in [self.root_nodes]. match self.tokio_handle.block_on({ - let path_info_service = self.path_info_service.clone(); - let digest = *store_path.digest(); - async move { path_info_service.get(digest).await } + let root_nodes_provider = self.root_nodes_provider.clone(); + async move { root_nodes_provider.get_by_basename(name.to_bytes()).await } }) { - // if there was an error looking up the path_info, propagate up an IO error. + // if there was an error looking up the root node, propagate up an IO error. Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)), - // the pathinfo doesn't exist, so the file doesn't exist. + // the root node doesn't exist, so the file doesn't exist. Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), - // The pathinfo does exist - Ok(Some(path_info)) => { - // There must be a root node (ensured by the validation happening inside clients) - let root_node = path_info.node.unwrap().node.unwrap(); - + // The root node does exist + Ok(Some(root_node)) => { // The name must match what's passed in the lookup, otherwise this is also a ENOENT. - if root_node.get_name() != store_path.to_string().as_bytes() { - debug!(root_node.name=?root_node.get_name(), store_path.name=%store_path.to_string(), "store path mismatch"); + if root_node.get_name() != name.to_bytes() { + debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch"); return Err(io::Error::from_raw_os_error(libc::ENOENT)); } // Let's check if someone else beat us to updating the inode tracker and - // store_paths map. This avoids locking inode_tracker for writing. - if let Some(ino) = self.store_paths.read().get(&store_path) { + // root_nodes map. This avoids locking inode_tracker for writing. + if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) { return Ok(( *ino, self.inode_tracker.read().get(*ino).expect("must exist"), )); } - // Only in case it doesn't, lock [self.store_paths] and + // Only in case it doesn't, lock [self.root_nodes] and // [self.inode_tracker] for writing. - let mut store_paths = self.store_paths.write(); + let mut root_nodes = self.root_nodes.write(); let mut inode_tracker = self.inode_tracker.write(); // insert the (sparse) inode data and register in - // self.store_paths. + // self.root_nodes. let inode_data: InodeData = (&root_node).into(); let ino = inode_tracker.put(inode_data.clone()); - store_paths.insert(store_path, ino); + root_nodes.insert(name.to_bytes().into(), ino); Ok((ino, Arc::new(inode_data))) } @@ -278,7 +262,10 @@ impl TvixStoreFs { } } -impl FileSystem for TvixStoreFs { +impl<RN> FileSystem for TvixStoreFs<RN> +where + RN: RootNodes + Clone + 'static, +{ type Handle = u64; type Inode = u64; @@ -317,7 +304,7 @@ impl FileSystem for TvixStoreFs { // This goes from a parent inode to a node. // - If the parent is [ROOT_ID], we need to check - // [self.store_paths] (fetching from PathInfoService if needed) + // [self.root_nodes] (fetching from a [RootNode] provider if needed) // - Otherwise, lookup the parent in [self.inode_tracker] (which must be // a [InodeData::Directory]), and find the child with that name. if parent == ROOT_ID { @@ -380,15 +367,15 @@ impl FileSystem for TvixStoreFs { if !self.list_root { return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo } else { - let path_info_service = self.path_info_service.clone(); + let root_nodes_provider = self.root_nodes_provider.clone(); let (tx, mut rx) = mpsc::channel(16); // This task will run in the background immediately and will exit // after the stream ends or if we no longer want any more entries. self.tokio_handle.spawn(async move { - let mut stream = path_info_service.list().skip(offset as usize).enumerate(); - while let Some(path_info) = stream.next().await { - if tx.send(path_info).await.is_err() { + let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate(); + while let Some(node) = stream.next().await { + if tx.send(node).await.is_err() { // If we get a send error, it means the sync code // doesn't want any more entries. break; @@ -396,29 +383,24 @@ impl FileSystem for TvixStoreFs { } }); - while let Some((i, path_info)) = rx.blocking_recv() { - let path_info = match path_info { + while let Some((i, root_node)) = rx.blocking_recv() { + let root_node = match root_node { Err(e) => { warn!("failed to retrieve pathinfo: {}", e); return Err(io::Error::from_raw_os_error(libc::EPERM)); } - Ok(path_info) => path_info, + Ok(root_node) => root_node, }; - // We know the root node exists and the store_path can be parsed because clients MUST validate. - let root_node = path_info.node.unwrap().node.unwrap(); - let store_path = StorePath::from_bytes(root_node.get_name()).unwrap(); - + let name = root_node.get_name(); // obtain the inode, or allocate a new one. - let ino = self - .get_inode_for_store_path(&store_path) - .unwrap_or_else(|| { - // insert the (sparse) inode data and register in - // self.store_paths. - let ino = self.inode_tracker.write().put((&root_node).into()); - self.store_paths.write().insert(store_path.clone(), ino); - ino - }); + let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| { + // insert the (sparse) inode data and register in + // self.root_nodes. + let ino = self.inode_tracker.write().put((&root_node).into()); + self.root_nodes.write().insert(name.into(), ino); + ino + }); let ty = match root_node { Node::Directory(_) => libc::S_IFDIR, @@ -430,7 +412,7 @@ impl FileSystem for TvixStoreFs { ino, offset: offset + i as u64 + 1, type_: ty, - name: store_path.to_string().as_bytes(), + name, })?; // If the buffer is full, add_entry will return `Ok(0)`. if written == 0 { diff --git a/tvix/store/src/fs/root_nodes.rs b/tvix/store/src/fs/root_nodes.rs new file mode 100644 index 000000000000..e672c6e647f3 --- /dev/null +++ b/tvix/store/src/fs/root_nodes.rs @@ -0,0 +1,61 @@ +use std::{ops::Deref, pin::Pin}; + +use futures::{Stream, StreamExt}; +use nix_compat::store_path::StorePath; +use tonic::async_trait; +use tvix_castore::{proto::node::Node, Error}; + +use crate::pathinfoservice::PathInfoService; + +/// Provides an interface for looking up root nodes in tvix-castore by given +/// a lookup key (usually the basename), and optionally allow a listing. +/// +#[async_trait] +pub trait RootNodes: Send + Sync { + /// Looks up a root CA node based on the basename of the node in the root + /// directory of the filesystem. + async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>; + + /// Lists all root CA nodes in the filesystem. An error can be returned + /// in case listing is not allowed + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send>>; +} + +/// Implements root node lookup for any [PathInfoService]. This represents a flat +/// directory structure like /nix/store where each entry in the root filesystem +/// directory corresponds to a CA node. +#[async_trait] +impl<T> RootNodes for T +where + T: Deref<Target = dyn PathInfoService> + Send + Sync, +{ + async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> { + let Ok(store_path) = StorePath::from_bytes(name) else { + return Ok(None); + }; + + Ok(self + .deref() + .get(*store_path.digest()) + .await? + .map(|path_info| { + path_info + .node + .expect("missing root node") + .node + .expect("empty node") + })) + } + + fn list(&self) -> Pin<Box<dyn Stream<Item = Result<Node, Error>> + Send>> { + Box::pin(self.deref().list().map(|result| { + result.map(|path_info| { + path_info + .node + .expect("missing root node") + .node + .expect("empty node") + }) + })) + } +} |