Diffstat (limited to 'tvix/castore/src/fs/mod.rs')
-rw-r--r--  tvix/castore/src/fs/mod.rs  111
1 file changed, 64 insertions, 47 deletions
diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs
index b565ed60ac42..4f50868b8f44 100644
--- a/tvix/castore/src/fs/mod.rs
+++ b/tvix/castore/src/fs/mod.rs
@@ -15,15 +15,13 @@ use self::{
     inode_tracker::InodeTracker,
     inodes::{DirectoryInodeData, InodeData},
 };
-use crate::proto as castorepb;
 use crate::{
     blobservice::{BlobReader, BlobService},
     directoryservice::DirectoryService,
-    proto::{node::Node, NamedNode},
-    B3Digest,
+    path::PathComponent,
+    B3Digest, Node,
 };
 use bstr::ByteVec;
-use bytes::Bytes;
 use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions};
 use fuse_backend_rs::api::filesystem::{
     Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
@@ -89,7 +87,7 @@ pub struct TvixStoreFs<BS, DS, RN> {
     show_xattr: bool,
 
     /// This maps a given basename in the root to the inode we allocated for the node.
-    root_nodes: RwLock<HashMap<Bytes, u64>>,
+    root_nodes: RwLock<HashMap<PathComponent, u64>>,
 
     /// This keeps track of inodes and data alongside them.
     inode_tracker: RwLock<InodeTracker>,
@@ -105,7 +103,7 @@ pub struct TvixStoreFs<BS, DS, RN> {
             u64,
             (
                 Span,
-                Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>,
+                Arc<Mutex<mpsc::Receiver<(usize, Result<(PathComponent, Node), crate::Error>)>>>,
             ),
         >,
     >,
@@ -123,8 +121,8 @@ pub struct TvixStoreFs<BS, DS, RN> {
 
 impl<BS, DS, RN> TvixStoreFs<BS, DS, RN>
 where
-    BS: AsRef<dyn BlobService> + Clone + Send,
-    DS: AsRef<dyn DirectoryService> + Clone + Send + 'static,
+    BS: BlobService + Clone + Send,
+    DS: DirectoryService + Clone + Send + 'static,
     RN: RootNodes + Clone + 'static,
 {
     pub fn new(
@@ -156,7 +154,7 @@ where
 
     /// Retrieves the inode for a given root node basename, if present.
     /// This obtains a read lock on self.root_nodes.
-    fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> {
+    fn get_inode_for_root_name(&self, name: &PathComponent) -> Option<u64> {
         self.root_nodes.read().get(name).cloned()
     }
 
@@ -167,8 +165,12 @@ where
     /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
     /// in self.directory_service is performed, and self.inode_tracker is updated with the
     /// [DirectoryInodeData::Populated].
+    #[allow(clippy::type_complexity)]
     #[instrument(skip(self), err)]
-    fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> {
+    fn get_directory_children(
+        &self,
+        ino: u64,
+    ) -> io::Result<(B3Digest, Vec<(u64, PathComponent, Node)>)> {
         let data = self.inode_tracker.read().get(ino).unwrap();
         match *data {
             // if it's populated already, return children.
@@ -184,7 +186,7 @@ where
                     .block_on({
                         let directory_service = self.directory_service.clone();
                         let parent_digest = parent_digest.to_owned();
-                        async move { directory_service.as_ref().get(&parent_digest).await }
+                        async move { directory_service.get(&parent_digest).await }
                     })?
                     .ok_or_else(|| {
                         warn!(directory.digest=%parent_digest, "directory not found");
@@ -198,13 +200,13 @@ where
                 let children = {
                     let mut inode_tracker = self.inode_tracker.write();
 
-                    let children: Vec<(u64, castorepb::node::Node)> = directory
-                        .nodes()
-                        .map(|child_node| {
-                            let (inode_data, _) = InodeData::from_node(child_node.clone());
+                    let children: Vec<(u64, PathComponent, Node)> = directory
+                        .into_nodes()
+                        .map(|(child_name, child_node)| {
+                            let inode_data = InodeData::from_node(&child_node);
 
                             let child_ino = inode_tracker.put(inode_data);
-                            (child_ino, child_node)
+                            (child_ino, child_name, child_node)
                         })
                         .collect();
 
@@ -238,12 +240,12 @@ where
     /// In the case the name can't be found, a libc::ENOENT is returned.
     fn name_in_root_to_ino_and_data(
         &self,
-        name: &std::ffi::CStr,
+        name: &PathComponent,
     ) -> io::Result<(u64, Arc<InodeData>)> {
         // Look up the inode for that root node.
         // If there's one, [self.inode_tracker] MUST also contain the data,
         // which we can then return.
-        if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) {
+        if let Some(inode) = self.get_inode_for_root_name(name) {
            return Ok((
                 inode,
                 self.inode_tracker
@@ -257,7 +259,8 @@ where
         // We don't have it yet, look it up in [self.root_nodes].
         match self.tokio_handle.block_on({
             let root_nodes_provider = self.root_nodes_provider.clone();
-            async move { root_nodes_provider.get_by_basename(name.to_bytes()).await }
+            let name = name.clone();
+            async move { root_nodes_provider.get_by_basename(&name).await }
         }) {
             // if there was an error looking up the root node, propagate up an IO error.
             Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
@@ -265,15 +268,9 @@ where
             Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
             // The root node does exist
             Ok(Some(root_node)) => {
-                // The name must match what's passed in the lookup, otherwise this is also a ENOENT.
-                if root_node.get_name() != name.to_bytes() {
-                    debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch");
-                    return Err(io::Error::from_raw_os_error(libc::ENOENT));
-                }
-
                 // Let's check if someone else beat us to updating the inode tracker and
                 // root_nodes map. This avoids locking inode_tracker for writing.
-                if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) {
+                if let Some(ino) = self.root_nodes.read().get(name) {
                     return Ok((
                         *ino,
                         self.inode_tracker.read().get(*ino).expect("must exist"),
@@ -287,9 +284,9 @@ where
 
                 // insert the (sparse) inode data and register in
                 // self.root_nodes.
-                let (inode_data, name) = InodeData::from_node(root_node);
+                let inode_data = InodeData::from_node(&root_node);
                 let ino = inode_tracker.put(inode_data.clone());
-                root_nodes.insert(name, ino);
+                root_nodes.insert(name.to_owned(), ino);
 
                 Ok((ino, Arc::new(inode_data)))
             }
@@ -303,10 +300,22 @@ const ROOT_NODES_BUFFER_SIZE: usize = 16;
 const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest";
 const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest";
 
+#[cfg(all(feature = "virtiofs", target_os = "linux"))]
+impl<BS, DS, RN> fuse_backend_rs::api::filesystem::Layer for TvixStoreFs<BS, DS, RN>
+where
+    BS: BlobService + Clone + Send + 'static,
+    DS: DirectoryService + Send + Clone + 'static,
+    RN: RootNodes + Clone + 'static,
+{
+    fn root_inode(&self) -> Self::Inode {
+        ROOT_ID
+    }
+}
+
 impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN>
 where
-    BS: AsRef<dyn BlobService> + Clone + Send + 'static,
-    DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
+    BS: BlobService + Clone + Send + 'static,
+    DS: DirectoryService + Send + Clone + 'static,
     RN: RootNodes + Clone + 'static,
 {
     type Handle = u64;
@@ -345,13 +354,17 @@ where
     ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
         debug!("lookup");
 
+        // convert the CStr to a PathComponent
+        // If it can't be converted, we definitely don't have anything here.
+        let name: PathComponent = name.try_into().map_err(|_| std::io::ErrorKind::NotFound)?;
+
         // This goes from a parent inode to a node.
         // - If the parent is [ROOT_ID], we need to check
         //   [self.root_nodes] (fetching from a [RootNode] provider if needed)
         // - Otherwise, lookup the parent in [self.inode_tracker] (which must be
         //   a [InodeData::Directory]), and find the child with that name.
         if parent == ROOT_ID {
-            let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?;
+            let (ino, inode_data) = self.name_in_root_to_ino_and_data(&name)?;
 
             debug!(inode_data=?&inode_data, ino=ino, "Some");
             return Ok(inode_data.as_fuse_entry(ino));
@@ -364,7 +377,7 @@ where
         // Search for that name in the list of children and return the FileAttrs.
 
         // in the children, find the one with the desired name.
-        if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
+        if let Some((child_ino, _, _)) = children.iter().find(|(_, n, _)| n == &name) {
             // lookup the child [InodeData] in [self.inode_tracker].
             // We know the inodes for children have already been allocated.
             let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap();
@@ -400,8 +413,8 @@ where
             self.tokio_handle.spawn(
                 async move {
                     let mut stream = root_nodes_provider.list().enumerate();
-                    while let Some(node) = stream.next().await {
-                        if tx.send(node).await.is_err() {
+                    while let Some(e) = stream.next().await {
+                        if tx.send(e).await.is_err() {
                             // If we get a send error, it means the sync code
                             // doesn't want any more entries.
                             break;
@@ -463,12 +476,12 @@ where
                     .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
 
                 while let Some((i, n)) = rx.blocking_recv() {
-                    let root_node = n.map_err(|e| {
+                    let (name, node) = n.map_err(|e| {
                         warn!("failed to retrieve root node: {}", e);
                         io::Error::from_raw_os_error(libc::EIO)
                     })?;
 
-                    let (inode_data, name) = InodeData::from_node(root_node);
+                    let inode_data = InodeData::from_node(&node);
 
                     // obtain the inode, or allocate a new one.
                     let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@@ -483,7 +496,7 @@ where
                         ino,
                         offset: offset + (i as u64) + 1,
                         type_: inode_data.as_fuse_type(),
-                        name: &name,
+                        name: name.as_ref(),
                     })?;
                     // If the buffer is full, add_entry will return `Ok(0)`.
                     if written == 0 {
@@ -497,15 +510,17 @@ where
         let (parent_digest, children) = self.get_directory_children(inode)?;
         Span::current().record("directory.digest", parent_digest.to_string());
 
-        for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
-            let (inode_data, name) = InodeData::from_node(child_node);
+        for (i, (ino, child_name, child_node)) in
+            children.into_iter().skip(offset as usize).enumerate()
+        {
+            let inode_data = InodeData::from_node(&child_node);
 
             // the second parameter will become the "offset" parameter on the next call.
             let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
                 ino,
                 offset: offset + (i as u64) + 1,
                 type_: inode_data.as_fuse_type(),
-                name: &name,
+                name: child_name.as_ref(),
             })?;
             // If the buffer is full, add_entry will return `Ok(0)`.
             if written == 0 {
@@ -550,12 +565,12 @@ where
                     .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
 
                 while let Some((i, n)) = rx.blocking_recv() {
-                    let root_node = n.map_err(|e| {
+                    let (name, node) = n.map_err(|e| {
                         warn!("failed to retrieve root node: {}", e);
                         io::Error::from_raw_os_error(libc::EPERM)
                     })?;
 
-                    let (inode_data, name) = InodeData::from_node(root_node);
+                    let inode_data = InodeData::from_node(&node);
 
                     // obtain the inode, or allocate a new one.
                     let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@@ -571,7 +586,7 @@ where
                             ino,
                             offset: offset + (i as u64) + 1,
                             type_: inode_data.as_fuse_type(),
-                            name: &name,
+                            name: name.as_ref(),
                         },
                         inode_data.as_fuse_entry(ino),
                     )?;
@@ -587,8 +602,8 @@ where
         let (parent_digest, children) = self.get_directory_children(inode)?;
         Span::current().record("directory.digest", parent_digest.to_string());
 
-        for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
-            let (inode_data, name) = InodeData::from_node(child_node);
+        for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
+            let inode_data = InodeData::from_node(&child_node);
 
             // the second parameter will become the "offset" parameter on the next call.
             let written = add_entry(
@@ -596,7 +611,7 @@ where
                     ino,
                     offset: offset + (i as u64) + 1,
                     type_: inode_data.as_fuse_type(),
-                    name: &name,
+                    name: name.as_ref(),
                 },
                 inode_data.as_fuse_entry(ino),
             )?;
@@ -641,6 +656,7 @@ where
     ) -> io::Result<(
         Option<Self::Handle>,
         fuse_backend_rs::api::filesystem::OpenOptions,
+        Option<u32>,
     )> {
         if inode == ROOT_ID {
             return Err(io::Error::from_raw_os_error(libc::ENOSYS));
         }
@@ -659,7 +675,7 @@ where
         match self.tokio_handle.block_on({
             let blob_service = self.blob_service.clone();
             let blob_digest = blob_digest.clone();
-            async move { blob_service.as_ref().open_read(&blob_digest).await }
+            async move { blob_service.open_read(&blob_digest).await }
         }) {
             Ok(None) => {
                 warn!("blob not found");
@@ -684,6 +700,7 @@ where
         Ok((
             Some(fh),
             fuse_backend_rs::api::filesystem::OpenOptions::empty(),
+            None,
         ))
     }
 }
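For context on the pattern this diff switches to: names are no longer carried inside the node itself (the old proto Node::get_name()), they travel alongside it as a PathComponent. The sketch below is not part of this commit; it assumes tvix_castore exposes Directory, Node and PathComponent at the crate root, that PathComponent implements TryFrom<&CStr> and that Directory::into_nodes() yields (PathComponent, Node) pairs, as the hunks above rely on. find_child is a hypothetical helper, not crate API.

use std::ffi::CStr;

use tvix_castore::{Directory, Node, PathComponent};

/// Hypothetical helper: resolve a raw FUSE lookup name inside a Directory.
fn find_child(directory: Directory, name: &CStr) -> Option<(PathComponent, Node)> {
    // A name that isn't a valid PathComponent can't exist in the store,
    // so treat the conversion failure as "not found".
    let wanted: PathComponent = name.try_into().ok()?;

    // Children are (name, node) pairs now; compare on the PathComponent key
    // instead of calling get_name() on a proto node.
    directory
        .into_nodes()
        .find(|(child_name, _)| *child_name == wanted)
}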