diff options
Diffstat (limited to 'tvix/castore/src/fs')
-rw-r--r-- | tvix/castore/src/fs/fuse/tests.rs | 98 | ||||
-rw-r--r-- | tvix/castore/src/fs/inodes.rs | 25 | ||||
-rw-r--r-- | tvix/castore/src/fs/mod.rs | 54 | ||||
-rw-r--r-- | tvix/castore/src/fs/root_nodes.rs | 13 |
4 files changed, 78 insertions, 112 deletions
diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs index 3dad7d83aedd..30bbfc46235d 100644 --- a/tvix/castore/src/fs/fuse/tests.rs +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -68,15 +68,11 @@ async fn populate_blob_a( root_nodes.insert( BLOB_A_NAME.into(), - Node::File( - FileNode::new( - BLOB_A_NAME.into(), - fixtures::BLOB_A_DIGEST.clone(), - fixtures::BLOB_A.len() as u64, - false, - ) - .unwrap(), - ), + Node::File(FileNode::new( + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u64, + false, + )), ); } @@ -92,15 +88,11 @@ async fn populate_blob_b( root_nodes.insert( BLOB_B_NAME.into(), - Node::File( - FileNode::new( - BLOB_B_NAME.into(), - fixtures::BLOB_B_DIGEST.clone(), - fixtures::BLOB_B.len() as u64, - false, - ) - .unwrap(), - ), + Node::File(FileNode::new( + fixtures::BLOB_B_DIGEST.clone(), + fixtures::BLOB_B.len() as u64, + false, + )), ); } @@ -120,22 +112,18 @@ async fn populate_blob_helloworld( root_nodes.insert( HELLOWORLD_BLOB_NAME.into(), - Node::File( - FileNode::new( - HELLOWORLD_BLOB_NAME.into(), - fixtures::HELLOWORLD_BLOB_DIGEST.clone(), - fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, - true, - ) - .unwrap(), - ), + Node::File(FileNode::new( + fixtures::HELLOWORLD_BLOB_DIGEST.clone(), + fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64, + true, + )), ); } async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( SYMLINK_NAME.into(), - Node::Symlink(SymlinkNode::new(SYMLINK_NAME.into(), BLOB_A_NAME.into()).unwrap()), + Node::Symlink(SymlinkNode::new(BLOB_A_NAME.into()).unwrap()), ); } @@ -144,9 +132,7 @@ async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) { async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( SYMLINK_NAME2.into(), - Node::Symlink( - SymlinkNode::new(SYMLINK_NAME2.into(), "/nix/store/somewhereelse".into()).unwrap(), - ), + Node::Symlink(SymlinkNode::new("/nix/store/somewhereelse".into()).unwrap()), ); } @@ -170,14 +156,10 @@ async fn populate_directory_with_keep( root_nodes.insert( DIRECTORY_WITH_KEEP_NAME.into(), - Node::Directory( - DirectoryNode::new( - DIRECTORY_WITH_KEEP_NAME.into(), - fixtures::DIRECTORY_WITH_KEEP.digest(), - fixtures::DIRECTORY_WITH_KEEP.size(), - ) - .unwrap(), - ), + Node::Directory(DirectoryNode::new( + fixtures::DIRECTORY_WITH_KEEP.digest(), + fixtures::DIRECTORY_WITH_KEEP.size(), + )), ); } @@ -186,14 +168,10 @@ async fn populate_directory_with_keep( async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( DIRECTORY_WITH_KEEP_NAME.into(), - Node::Directory( - DirectoryNode::new( - DIRECTORY_WITH_KEEP_NAME.into(), - fixtures::DIRECTORY_WITH_KEEP.digest(), - fixtures::DIRECTORY_WITH_KEEP.size(), - ) - .unwrap(), - ), + Node::Directory(DirectoryNode::new( + fixtures::DIRECTORY_WITH_KEEP.digest(), + fixtures::DIRECTORY_WITH_KEEP.size(), + )), ); } @@ -201,15 +179,11 @@ async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Byte async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) { root_nodes.insert( BLOB_A_NAME.into(), - Node::File( - FileNode::new( - BLOB_A_NAME.into(), - fixtures::BLOB_A_DIGEST.clone(), - fixtures::BLOB_A.len() as u64, - false, - ) - .unwrap(), - ), + Node::File(FileNode::new( + fixtures::BLOB_A_DIGEST.clone(), + fixtures::BLOB_A.len() as u64, + false, + )), ); } @@ -239,14 +213,10 @@ async fn populate_directory_complicated( root_nodes.insert( DIRECTORY_COMPLICATED_NAME.into(), - Node::Directory( - DirectoryNode::new( - DIRECTORY_COMPLICATED_NAME.into(), - fixtures::DIRECTORY_COMPLICATED.digest(), - fixtures::DIRECTORY_COMPLICATED.size(), - ) - .unwrap(), - ), + Node::Directory(DirectoryNode::new( + fixtures::DIRECTORY_COMPLICATED.digest(), + fixtures::DIRECTORY_COMPLICATED.size(), + )), ); } diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs index b04505c9fa98..e49d28f8e52e 100644 --- a/tvix/castore/src/fs/inodes.rs +++ b/tvix/castore/src/fs/inodes.rs @@ -2,9 +2,7 @@ //! about inodes, which present tvix-castore nodes in a filesystem. use std::time::Duration; -use bytes::Bytes; - -use crate::{B3Digest, NamedNode, Node}; +use crate::{B3Digest, Node}; #[derive(Clone, Debug)] pub enum InodeData { @@ -19,24 +17,19 @@ pub enum InodeData { /// lookup and did fetch the data. #[derive(Clone, Debug)] pub enum DirectoryInodeData { - Sparse(B3Digest, u64), // digest, size - Populated(B3Digest, Vec<(u64, Node)>), // [(child_inode, node)] + Sparse(B3Digest, u64), // digest, size + Populated(B3Digest, Vec<(u64, bytes::Bytes, Node)>), // [(child_inode, name, node)] } impl InodeData { /// Constructs a new InodeData by consuming a [Node]. - /// It splits off the orginal name, so it can be used later. - pub fn from_node(node: &Node) -> (Self, Bytes) { + pub fn from_node(node: &Node) -> Self { match node { - Node::Directory(n) => ( - Self::Directory(DirectoryInodeData::Sparse(n.digest().clone(), n.size())), - n.get_name().clone(), - ), - Node::File(n) => ( - Self::Regular(n.digest().clone(), n.size(), n.executable()), - n.get_name().clone(), - ), - Node::Symlink(n) => (Self::Symlink(n.target().clone()), n.get_name().clone()), + Node::Directory(n) => { + Self::Directory(DirectoryInodeData::Sparse(n.digest().clone(), n.size())) + } + Node::File(n) => Self::Regular(n.digest().clone(), n.size(), n.executable()), + Node::Symlink(n) => Self::Symlink(n.target().clone()), } } diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index 90c1f371b546..f819adb549f5 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -18,7 +18,7 @@ use self::{ use crate::{ blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, - {B3Digest, NamedNode, Node}, + {B3Digest, Node}, }; use bstr::ByteVec; use bytes::Bytes; @@ -103,7 +103,7 @@ pub struct TvixStoreFs<BS, DS, RN> { u64, ( Span, - Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>, + Arc<Mutex<mpsc::Receiver<(usize, Result<(Bytes, Node), crate::Error>)>>>, ), >, >, @@ -165,8 +165,12 @@ where /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup /// in self.directory_service is performed, and self.inode_tracker is updated with the /// [DirectoryInodeData::Populated]. + #[allow(clippy::type_complexity)] #[instrument(skip(self), err)] - fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> { + fn get_directory_children( + &self, + ino: u64, + ) -> io::Result<(B3Digest, Vec<(u64, bytes::Bytes, Node)>)> { let data = self.inode_tracker.read().get(ino).unwrap(); match *data { // if it's populated already, return children. @@ -196,13 +200,13 @@ where let children = { let mut inode_tracker = self.inode_tracker.write(); - let children: Vec<(u64, Node)> = directory + let children: Vec<(u64, Bytes, Node)> = directory .nodes() - .map(|child_node| { - let (inode_data, _) = InodeData::from_node(child_node); + .map(|(child_name, child_node)| { + let inode_data = InodeData::from_node(child_node); let child_ino = inode_tracker.put(inode_data); - (child_ino, child_node.clone()) + (child_ino, child_name.to_owned(), child_node.clone()) }) .collect(); @@ -263,12 +267,6 @@ where Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)), // The root node does exist Ok(Some(root_node)) => { - // The name must match what's passed in the lookup, otherwise this is also a ENOENT. - if root_node.get_name() != name.to_bytes() { - debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch"); - return Err(io::Error::from_raw_os_error(libc::ENOENT)); - } - // Let's check if someone else beat us to updating the inode tracker and // root_nodes map. This avoids locking inode_tracker for writing. if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) { @@ -285,9 +283,9 @@ where // insert the (sparse) inode data and register in // self.root_nodes. - let (inode_data, name) = InodeData::from_node(&root_node); + let inode_data = InodeData::from_node(&root_node); let ino = inode_tracker.put(inode_data.clone()); - root_nodes.insert(name, ino); + root_nodes.insert(bytes::Bytes::copy_from_slice(name.to_bytes()), ino); Ok((ino, Arc::new(inode_data))) } @@ -362,7 +360,7 @@ where // Search for that name in the list of children and return the FileAttrs. // in the children, find the one with the desired name. - if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) { + if let Some((child_ino, _, _)) = children.iter().find(|(_, n, _)| n == name.to_bytes()) { // lookup the child [InodeData] in [self.inode_tracker]. // We know the inodes for children have already been allocated. let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap(); @@ -398,8 +396,8 @@ where self.tokio_handle.spawn( async move { let mut stream = root_nodes_provider.list().enumerate(); - while let Some(node) = stream.next().await { - if tx.send(node).await.is_err() { + while let Some(e) = stream.next().await { + if tx.send(e).await.is_err() { // If we get a send error, it means the sync code // doesn't want any more entries. break; @@ -461,12 +459,12 @@ where .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; while let Some((i, n)) = rx.blocking_recv() { - let root_node = n.map_err(|e| { + let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EIO) })?; - let (inode_data, name) = InodeData::from_node(&root_node); + let inode_data = InodeData::from_node(&node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -495,15 +493,17 @@ where let (parent_digest, children) = self.get_directory_children(inode)?; Span::current().record("directory.digest", parent_digest.to_string()); - for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(&child_node); + for (i, (ino, child_name, child_node)) in + children.into_iter().skip(offset as usize).enumerate() + { + let inode_data = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { ino, offset: offset + (i as u64) + 1, type_: inode_data.as_fuse_type(), - name: &name, + name: &child_name, })?; // If the buffer is full, add_entry will return `Ok(0)`. if written == 0 { @@ -548,12 +548,12 @@ where .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; while let Some((i, n)) = rx.blocking_recv() { - let root_node = n.map_err(|e| { + let (name, node) = n.map_err(|e| { warn!("failed to retrieve root node: {}", e); io::Error::from_raw_os_error(libc::EPERM) })?; - let (inode_data, name) = InodeData::from_node(&root_node); + let inode_data = InodeData::from_node(&node); // obtain the inode, or allocate a new one. let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| { @@ -585,8 +585,8 @@ where let (parent_digest, children) = self.get_directory_children(inode)?; Span::current().record("directory.digest", parent_digest.to_string()); - for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() { - let (inode_data, name) = InodeData::from_node(&child_node); + for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() { + let inode_data = InodeData::from_node(&child_node); // the second parameter will become the "offset" parameter on the next call. let written = add_entry( diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs index c237142c8d91..62f0b62196a6 100644 --- a/tvix/castore/src/fs/root_nodes.rs +++ b/tvix/castore/src/fs/root_nodes.rs @@ -12,9 +12,10 @@ pub trait RootNodes: Send + Sync { /// directory of the filesystem. async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>; - /// Lists all root CA nodes in the filesystem. An error can be returned - /// in case listing is not allowed - fn list(&self) -> BoxStream<Result<Node, Error>>; + /// Lists all root CA nodes in the filesystem, as a tuple of (base)name + /// and Node. + /// An error can be returned in case listing is not allowed. + fn list(&self) -> BoxStream<Result<(bytes::Bytes, Node), Error>>; } #[async_trait] @@ -28,9 +29,11 @@ where Ok(self.as_ref().get(name).cloned()) } - fn list(&self) -> BoxStream<Result<Node, Error>> { + fn list(&self) -> BoxStream<Result<(bytes::Bytes, Node), Error>> { Box::pin(tokio_stream::iter( - self.as_ref().iter().map(|(_, v)| Ok(v.clone())), + self.as_ref() + .iter() + .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))), )) } } |