about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--tvix/castore/src/fs/mod.rs202
-rw-r--r--tvix/castore/src/fs/tests.rs10
2 files changed, 144 insertions, 68 deletions
diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs
index 9a2df0313869..52355bdfbe60 100644
--- a/tvix/castore/src/fs/mod.rs
+++ b/tvix/castore/src/fs/mod.rs
@@ -20,7 +20,7 @@ use crate::{
     B3Digest,
 };
 use bstr::ByteVec;
-use fuse_backend_rs::abi::fuse_abi::stat64;
+use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions};
 use fuse_backend_rs::api::filesystem::{
     Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
 };
@@ -97,6 +97,17 @@ pub struct TvixStoreFs<BS, DS, RN> {
     /// This keeps track of inodes and data alongside them.
     inode_tracker: RwLock<InodeTracker>,
 
+    // FUTUREWORK: have a generic container type for dir/file handles and handle
+    // allocation.
+    /// Maps from the handle returned from an opendir to
+    /// This holds all opendir handles (for the root inode)
+    /// They point to the rx part of the channel producing the listing.
+    #[allow(clippy::type_complexity)]
+    dir_handles:
+        RwLock<HashMap<u64, Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>>>,
+
+    next_dir_handle: AtomicU64,
+
     /// This holds all open file handles
     #[allow(clippy::type_complexity)]
     file_handles: RwLock<HashMap<u64, Arc<Mutex<Box<dyn BlobReader>>>>>,
@@ -130,6 +141,9 @@ where
             root_nodes: RwLock::new(HashMap::default()),
             inode_tracker: RwLock::new(Default::default()),
 
+            dir_handles: RwLock::new(Default::default()),
+            next_dir_handle: AtomicU64::new(1),
+
             file_handles: RwLock::new(Default::default()),
             next_file_handle: AtomicU64::new(1),
             tokio_handle: tokio::runtime::Handle::current(),
@@ -276,6 +290,9 @@ where
     }
 }
 
+/// Buffer size of the channel providing nodes in the mount root
+const ROOT_NODES_BUFFER_SIZE: usize = 16;
+
 const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest";
 const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest";
 
@@ -368,14 +385,63 @@ where
         }
     }
 
-    // TODO: readdirplus?
+    #[tracing::instrument(skip_all, fields(rq.inode = inode))]
+    fn opendir(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _flags: u32,
+    ) -> io::Result<(Option<Self::Handle>, OpenOptions)> {
+        // In case opendir on the root is called, we provide the handle, as re-entering that listing is expensive.
+        // For all other directory inodes we just let readdir take care of it.
+        if inode == ROOT_ID {
+            if !self.list_root {
+                return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
+            }
+
+            let root_nodes_provider = self.root_nodes_provider.clone();
+            let (tx, rx) = mpsc::channel(ROOT_NODES_BUFFER_SIZE);
+
+            // This task will run in the background immediately and will exit
+            // after the stream ends or if we no longer want any more entries.
+            self.tokio_handle.spawn(async move {
+                let mut stream = root_nodes_provider.list().enumerate();
+                while let Some(node) = stream.next().await {
+                    if tx.send(node).await.is_err() {
+                        // If we get a send error, it means the sync code
+                        // doesn't want any more entries.
+                        break;
+                    }
+                }
+            });
+
+            // Put the rx part into [self.dir_handles].
+            // TODO: this will overflow after 2**64 operations,
+            // which is fine for now.
+            // See https://cl.tvl.fyi/c/depot/+/8834/comment/a6684ce0_d72469d1
+            // for the discussion on alternatives.
+            let dh = self.next_dir_handle.fetch_add(1, Ordering::SeqCst);
+
+            debug!("add dir handle {}", dh);
+            self.dir_handles
+                .write()
+                .insert(dh, Arc::new(Mutex::new(rx)));
+
+            return Ok((
+                Some(dh),
+                fuse_backend_rs::api::filesystem::OpenOptions::empty(), // TODO: non-seekable
+            ));
+        }
+
+        Ok((None, OpenOptions::empty()))
+    }
 
-    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.offset = offset))]
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle, rq.offset = offset))]
     fn readdir(
         &self,
         _ctx: &Context,
         inode: Self::Inode,
-        _handle: Self::Handle,
+        handle: Self::Handle,
         _size: u32,
         offset: u64,
         add_entry: &mut dyn FnMut(fuse_backend_rs::api::filesystem::DirEntry) -> io::Result<usize>,
@@ -385,68 +451,61 @@ where
         if inode == ROOT_ID {
             if !self.list_root {
                 return Err(io::Error::from_raw_os_error(libc::EPERM)); // same error code as ipfs/kubo
-            } else {
-                let root_nodes_provider = self.root_nodes_provider.clone();
-                let (tx, mut rx) = mpsc::channel(16);
-
-                // This task will run in the background immediately and will exit
-                // after the stream ends or if we no longer want any more entries.
-                self.tokio_handle.spawn(async move {
-                    let mut stream = root_nodes_provider.list().skip(offset as usize).enumerate();
-                    while let Some(node) = stream.next().await {
-                        if tx.send(node).await.is_err() {
-                            // If we get a send error, it means the sync code
-                            // doesn't want any more entries.
-                            break;
-                        }
-                    }
-                });
+            }
 
-                while let Some((i, ref root_node)) = rx.blocking_recv() {
-                    let root_node = match root_node {
-                        Err(e) => {
-                            warn!("failed to retrieve pathinfo: {}", e);
-                            return Err(io::Error::from_raw_os_error(libc::EPERM));
-                        }
-                        Ok(root_node) => root_node,
-                    };
-
-                    let name = root_node.get_name();
-                    // obtain the inode, or allocate a new one.
-                    let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| {
-                        // insert the (sparse) inode data and register in
-                        // self.root_nodes.
-                        let ino = self.inode_tracker.write().put(root_node.into());
-                        self.root_nodes.write().insert(name.into(), ino);
-                        ino
-                    });
-
-                    let ty = match root_node {
-                        Node::Directory(_) => libc::S_IFDIR,
-                        Node::File(_) => libc::S_IFREG,
-                        Node::Symlink(_) => libc::S_IFLNK,
-                    };
-
-                    #[cfg(target_os = "macos")]
-                    let ty = ty as u32;
-
-                    let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
-                        ino,
-                        offset: offset + i as u64 + 1,
-                        type_: ty,
-                        name,
-                    })?;
-                    // If the buffer is full, add_entry will return `Ok(0)`.
-                    if written == 0 {
-                        break;
-                    }
+            // get the handle from [self.dir_handles]
+            let rx = match self.dir_handles.read().get(&handle) {
+                Some(rx) => rx.clone(),
+                None => {
+                    warn!("dir handle {} unknown", handle);
+                    return Err(io::Error::from_raw_os_error(libc::EIO));
                 }
+            };
+
+            let mut rx = rx
+                .lock()
+                .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
+
+            while let Some((i, n)) = rx.blocking_recv() {
+                let root_node = n.map_err(|e| {
+                    warn!("failed to retrieve root node: {}", e);
+                    io::Error::from_raw_os_error(libc::EIO)
+                })?;
+
+                let name = root_node.get_name();
+                let ty = match root_node {
+                    Node::Directory(_) => libc::S_IFDIR,
+                    Node::File(_) => libc::S_IFREG,
+                    Node::Symlink(_) => libc::S_IFLNK,
+                };
+
+                // obtain the inode, or allocate a new one.
+                let ino = self.get_inode_for_root_name(name).unwrap_or_else(|| {
+                    // insert the (sparse) inode data and register in
+                    // self.root_nodes.
+                    let ino = self.inode_tracker.write().put((&root_node).into());
+                    self.root_nodes.write().insert(name.into(), ino);
+                    ino
+                });
+
+                #[cfg(target_os = "macos")]
+                let ty = ty as u32;
 
-                return Ok(());
+                let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
+                    ino,
+                    offset: offset + i as u64 + 1,
+                    type_: ty,
+                    name,
+                })?;
+                // If the buffer is full, add_entry will return `Ok(0)`.
+                if written == 0 {
+                    break;
+                }
             }
+            return Ok(());
         }
 
-        // lookup the children, or return an error if it's not a directory.
+        // Non root-node case: lookup the children, or return an error if it's not a directory.
         let (parent_digest, children) = self.get_directory_children(inode)?;
 
         let span = info_span!("lookup", directory.digest = %parent_digest);
@@ -478,6 +537,29 @@ where
 
         Ok(())
     }
+    // TODO: readdirplus?
+
+    #[tracing::instrument(skip_all, fields(rq.inode = inode, rq.handle = handle))]
+    fn releasedir(
+        &self,
+        _ctx: &Context,
+        inode: Self::Inode,
+        _flags: u32,
+        handle: Self::Handle,
+    ) -> io::Result<()> {
+        if inode == ROOT_ID {
+            // drop the rx part of the channel.
+            match self.dir_handles.write().remove(&handle) {
+                // drop it, which will close it.
+                Some(rx) => drop(rx),
+                None => {
+                    debug!("dir handle not found");
+                }
+            }
+        }
+
+        Ok(())
+    }
 
     #[tracing::instrument(skip_all, fields(rq.inode = inode))]
     fn open(
diff --git a/tvix/castore/src/fs/tests.rs b/tvix/castore/src/fs/tests.rs
index 226c9975d573..d6eeb8a4113d 100644
--- a/tvix/castore/src/fs/tests.rs
+++ b/tvix/castore/src/fs/tests.rs
@@ -281,14 +281,8 @@ async fn root() {
     .expect("must succeed");
 
     {
-        // read_dir succeeds, but getting the first element will fail.
-        let mut it = ReadDirStream::new(tokio::fs::read_dir(tmpdir).await.expect("must succeed"));
-
-        let err = it
-            .next()
-            .await
-            .expect("must be some")
-            .expect_err("must be err");
+        // read_dir fails (as opendir fails).
+        let err = tokio::fs::read_dir(tmpdir).await.expect_err("must fail");
         assert_eq!(std::io::ErrorKind::PermissionDenied, err.kind());
     }