diff options
Diffstat (limited to 'tvix/castore/src/fs')
-rw-r--r-- | tvix/castore/src/fs/mod.rs | 202 | ||||
-rw-r--r-- | tvix/castore/src/fs/tests.rs | 10 |
2 files changed, 144 insertions, 68 deletions
diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs index 9a2df0313869..52355bdfbe60 100644 --- a/tvix/castore/src/fs/mod.rs +++ b/tvix/castore/src/fs/mod.rs @@ -20,7 +20,7 @@ use crate::{ B3Digest, }; use bstr::ByteVec; -use fuse_backend_rs::abi::fuse_abi::stat64; +use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions}; use fuse_backend_rs::api::filesystem::{ Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID, }; @@ -97,6 +97,17 @@ pub struct TvixStoreFs<BS, DS, RN> { /// This keeps track of inodes and data alongside them. inode_tracker: RwLock<InodeTracker>, + // FUTUREWORK: have a generic container type for dir/file handles and handle + // allocation. + /// Maps from the handle returned from an opendir to + /// This holds all opendir handles (for the root inode) + /// They point to the rx part of the channel producing the listing. + #[allow(clippy::type_complexity)] + dir_handles: + RwLock<HashMap<u64, Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>>>, + + next_dir_handle: AtomicU64, + /// This holds all open file handles #[allow(clippy::type_complexity)] file_handles: RwLock<HashMap<u64, Arc<Mutex<Box<dyn BlobReader>>>>>, @@ -130,6 +141,9 @@ where root_nodes: RwLock::new(HashMap::default()), inode_tracker: RwLock::new(Default::default()), + dir_handles: RwLock::new(Default::default()), + next_dir_handle: AtomicU64::new(1), + file_handles: RwLock::new(Default::default()), next_file_handle: AtomicU64::new(1), tokio_handle: tokio::runtime::Handle::current(), @@ -276,6 +290,9 @@ where } } +/// Buffer size of the channel providing nodes in the mount root +const ROOT_NODES_BUFFER_SIZE: usize = 16; + const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest"; const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest"; @@ -368,14 +385,63 @@ where } } - // TODO: readdirplus? + #[tracing::instrument(skip_all, fields(rq.inode = inode))] + fn opendir( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + ) -> io::Result<(Option<Self::Handle>, OpenOptions)> { + // In case opendir on the root is called, we provide the handle, as re-entering that listing is expensive. + // For all other directory inodes we just let readdir take care of it. + if inode == ROOT_ID { + if !self.list_root { + return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo + } + + let root_nodes_provider = self.root_nodes_provider.clone(); + let (tx, rx) = mpsc::channel(ROOT_NODES_BUFFER_SIZE); + + // This task will run in the background immediately and will exit + // after the stream ends or if we no longer want any more entries. + self.tokio_handle.spawn(async move { + let mut stream = root_nodes_provider.list().enumerate(); + while let Some(node) = stream.next().await { + if tx.send(node).await.is_err() { + // If we get a send error, it means the sync code + // doesn't want any more entries. + break; + } + } + }); + + // Put the rx part into [self.dir_handles]. + // TODO: this will overflow after 2**64 operations, + // which is fine for now. + // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1 + // for the discussion on alternatives. + let dh = self.next_dir_handle.fetch_add(1, Ordering::SeqCst); + + debug!("add dir handle {}", dh); + self.dir_handles + .write() + .insert(dh, Arc::new(Mutex::new(rx))); + + return Ok(( + Some(dh), + fuse_backend_rs::api::filesystem::OpenOptions::empty(), // TODO: non-seekable + )); + } + + Ok((None, OpenOptions::empty())) + } - #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))] + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset))] fn readdir( &self, _ctx: &Context, inode: Self::Inode, - _handle: Self::Handle, + handle: Self::Handle, _size: u32, offset: u64, add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>, @@ -385,68 +451,61 @@ where if inode == ROOT_ID { if !self.list_root { return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo - } else { - let root_nodes_provider = self.root_nodes_provider.clone(); - let (tx, mut rx) = mpsc::channel(16); - - // This task will run in the background immediately and will exit - // after the stream ends or if we no longer want any more entries. - self.tokio_handle.spawn(async move { - let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate(); - while let Some(node) = stream.next().await { - if tx.send(node).await.is_err() { - // If we get a send error, it means the sync code - // doesn't want any more entries. - break; - } - } - }); + } - while let Some((i, ref root_node)) = rx.blocking_recv() { - let root_node = match root_node { - Err(e) => { - warn!("failed to retrieve pathinfo: {}", e); - return Err(io::Error::from_raw_os_error(libc::EPERM)); - } - Ok(root_node) => root_node, - }; - - let name = root_node.get_name(); - // obtain the inode, or allocate a new one. - let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| { - // insert the (sparse) inode data and register in - // self.root_nodes. - let ino = self.inode_tracker.write().put(root_node.into()); - self.root_nodes.write().insert(name.into(), ino); - ino - }); - - let ty = match root_node { - Node::Directory(_) => libc::S_IFDIR, - Node::File(_) => libc::S_IFREG, - Node::Symlink(_) => libc::S_IFLNK, - }; - - #[cfg(target_os = "macos")] - let ty = ty as u32; - - let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { - ino, - offset: offset + i as u64 + 1, - type_: ty, - name, - })?; - // If the buffer is full, add_entry will return `Ok(0)`. - if written == 0 { - break; - } + // get the handle from [self.dir_handles] + let rx = match self.dir_handles.read().get(&handle) { + Some(rx) => rx.clone(), + None => { + warn!("dir handle {} unknown", handle); + return Err(io::Error::from_raw_os_error(libc::EIO)); } + }; + + let mut rx = rx + .lock() + .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?; + + while let Some((i, n)) = rx.blocking_recv() { + let root_node = n.map_err(|e| { + warn!("failed to retrieve root node: {}", e); + io::Error::from_raw_os_error(libc::EIO) + })?; + + let name = root_node.get_name(); + let ty = match root_node { + Node::Directory(_) => libc::S_IFDIR, + Node::File(_) => libc::S_IFREG, + Node::Symlink(_) => libc::S_IFLNK, + }; + + // obtain the inode, or allocate a new one. + let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| { + // insert the (sparse) inode data and register in + // self.root_nodes. + let ino = self.inode_tracker.write().put((&root_node).into()); + self.root_nodes.write().insert(name.into(), ino); + ino + }); + + #[cfg(target_os = "macos")] + let ty = ty as u32; - return Ok(()); + let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry { + ino, + offset: offset + i as u64 + 1, + type_: ty, + name, + })?; + // If the buffer is full, add_entry will return `Ok(0)`. + if written == 0 { + break; + } } + return Ok(()); } - // lookup the children, or return an error if it's not a directory. + // Non root-node case: lookup the children, or return an error if it's not a directory. let (parent_digest, children) = self.get_directory_children(inode)?; let span = info_span!("lookup", directory.digest = %parent_digest); @@ -478,6 +537,29 @@ where Ok(()) } + // TODO: readdirplus? + + #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle))] + fn releasedir( + &self, + _ctx: &Context, + inode: Self::Inode, + _flags: u32, + handle: Self::Handle, + ) -> io::Result<()> { + if inode == ROOT_ID { + // drop the rx part of the channel. + match self.dir_handles.write().remove(&handle) { + // drop it, which will close it. + Some(rx) => drop(rx), + None => { + debug!("dir handle not found"); + } + } + } + + Ok(()) + } #[tracing::instrument(skip_all, fields(rq.inode = inode))] fn open( diff --git a/tvix/castore/src/fs/tests.rs b/tvix/castore/src/fs/tests.rs index 226c9975d573..d6eeb8a4113d 100644 --- a/tvix/castore/src/fs/tests.rs +++ b/tvix/castore/src/fs/tests.rs @@ -281,14 +281,8 @@ async fn root() { .expect("must succeed"); { - // read_dir succeeds, but getting the first element will fail. - let mut it = ReadDirStream::new(tokio::fs::read_dir(tmpdir).await.expect("must succeed")); - - let err = it - .next() - .await - .expect("must be some") - .expect_err("must be err"); + // read_dir fails (as opendir fails). + let err = tokio::fs::read_dir(tmpdir).await.expect_err("must fail"); assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind()); } |