about summary refs log tree commit diff
path: root/tvix/castore/src/fs
diff options
context:
space:
mode:
Diffstat (limited to 'tvix/castore/src/fs')
-rw-r--r--tvix/castore/src/fs/fuse/mod.rs (renamed from tvix/castore/src/fs/fuse.rs)59
-rw-r--r--tvix/castore/src/fs/fuse/tests.rs (renamed from tvix/castore/src/fs/tests.rs)168
-rw-r--r--tvix/castore/src/fs/inodes.rs33
-rw-r--r--tvix/castore/src/fs/mod.rs132
-rw-r--r--tvix/castore/src/fs/root_nodes.rs22
5 files changed, 218 insertions, 196 deletions
diff --git a/tvix/castore/src/fs/fuse.rs b/tvix/castore/src/fs/fuse/mod.rs
index cd50618ff5bc..64ef29ed2aa1 100644
--- a/tvix/castore/src/fs/fuse.rs
+++ b/tvix/castore/src/fs/fuse/mod.rs
@@ -1,8 +1,13 @@
-use std::{io, path::Path, sync::Arc, thread};
+use std::{io, path::Path, sync::Arc};
 
 use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
+use parking_lot::Mutex;
+use threadpool::ThreadPool;
 use tracing::{error, instrument};
 
+#[cfg(test)]
+mod tests;
+
 struct FuseServer<FS>
 where
     FS: FileSystem + Sync + Send,
@@ -46,9 +51,12 @@ where
     }
 }
 
+/// Starts a [Filesystem] with the specified number of threads, and provides
+/// functions to unmount, and wait for it to have completed.
+#[derive(Clone)]
 pub struct FuseDaemon {
-    session: FuseSession,
-    threads: Vec<thread::JoinHandle<()>>,
+    session: Arc<Mutex<FuseSession>>,
+    threads: Arc<ThreadPool>,
 }
 
 impl FuseDaemon {
@@ -56,7 +64,7 @@ impl FuseDaemon {
     pub fn new<FS, P>(
         fs: FS,
         mountpoint: P,
-        threads: usize,
+        num_threads: usize,
         allow_other: bool,
     ) -> Result<Self, io::Error>
     where
@@ -73,40 +81,49 @@ impl FuseDaemon {
         session
             .mount()
             .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-        let mut join_handles = Vec::with_capacity(threads);
-        for _ in 0..threads {
+
+        // construct a thread pool
+        let threads = threadpool::Builder::new()
+            .num_threads(num_threads)
+            .thread_name("fuse_server".to_string())
+            .build();
+
+        for _ in 0..num_threads {
+            // for each thread requested, create and start a FuseServer accepting requests.
             let mut server = FuseServer {
                 server: server.clone(),
                 channel: session
                     .new_channel()
                     .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
             };
-            let join_handle = thread::Builder::new()
-                .name("fuse_server".to_string())
-                .spawn(move || {
-                    let _ = server.start();
-                })?;
-            join_handles.push(join_handle);
+
+            threads.execute(move || {
+                let _ = server.start();
+            });
         }
 
         Ok(FuseDaemon {
-            session,
-            threads: join_handles,
+            session: Arc::new(Mutex::new(session)),
+            threads: Arc::new(threads),
         })
     }
 
+    /// Waits for all threads to finish.
+    #[instrument(skip_all)]
+    pub fn wait(&self) {
+        self.threads.join()
+    }
+
+    /// Send the unmount command, and waits for all threads to finish.
     #[instrument(skip_all, err)]
-    pub fn unmount(&mut self) -> Result<(), io::Error> {
+    pub fn unmount(&self) -> Result<(), io::Error> {
+        // Send the unmount command.
         self.session
+            .lock()
             .umount()
             .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
 
-        for thread in self.threads.drain(..) {
-            thread.join().map_err(|_| {
-                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
-            })?;
-        }
-
+        self.wait();
         Ok(())
     }
 }
diff --git a/tvix/castore/src/fs/tests.rs b/tvix/castore/src/fs/fuse/tests.rs
index d6eeb8a4113d..0d68af090daf 100644
--- a/tvix/castore/src/fs/tests.rs
+++ b/tvix/castore/src/fs/fuse/tests.rs
@@ -1,5 +1,4 @@
 use bstr::ByteSlice;
-use bytes::Bytes;
 use std::{
     collections::BTreeMap,
     ffi::{OsStr, OsString},
@@ -11,13 +10,15 @@ use std::{
 use tempfile::TempDir;
 use tokio_stream::{wrappers::ReadDirStream, StreamExt};
 
-use super::{fuse::FuseDaemon, TvixStoreFs};
-use crate::proto as castorepb;
-use crate::proto::node::Node;
+use super::FuseDaemon;
 use crate::{
     blobservice::{BlobService, MemoryBlobService},
     directoryservice::{DirectoryService, MemoryDirectoryService},
-    fixtures,
+    fixtures, Node,
+};
+use crate::{
+    fs::{TvixStoreFs, XATTR_NAME_BLOB_DIGEST, XATTR_NAME_DIRECTORY_DIGEST},
+    PathComponent,
 };
 
 const BLOB_A_NAME: &str = "00000000000000000000000000000000-test";
@@ -38,14 +39,14 @@ fn gen_svcs() -> (Arc<dyn BlobService>, Arc<dyn DirectoryService>) {
 fn do_mount<P: AsRef<Path>, BS, DS>(
     blob_service: BS,
     directory_service: DS,
-    root_nodes: BTreeMap<bytes::Bytes, Node>,
+    root_nodes: BTreeMap<PathComponent, Node>,
     mountpoint: P,
     list_root: bool,
     show_xattr: bool,
 ) -> io::Result<FuseDaemon>
 where
-    BS: AsRef<dyn BlobService> + Send + Sync + Clone + 'static,
-    DS: AsRef<dyn DirectoryService> + Send + Sync + Clone + 'static,
+    BS: BlobService + Send + Sync + Clone + 'static,
+    DS: DirectoryService + Send + Sync + Clone + 'static,
 {
     let fs = TvixStoreFs::new(
         blob_service,
@@ -59,7 +60,7 @@ where
 
 async fn populate_blob_a(
     blob_service: &Arc<dyn BlobService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     let mut bw = blob_service.open_write().await;
     tokio::io::copy(&mut Cursor::new(fixtures::BLOB_A.to_vec()), &mut bw)
@@ -68,19 +69,18 @@ async fn populate_blob_a(
     bw.close().await.expect("must succeed closing");
 
     root_nodes.insert(
-        BLOB_A_NAME.into(),
-        Node::File(castorepb::FileNode {
-            name: BLOB_A_NAME.into(),
-            digest: fixtures::BLOB_A_DIGEST.clone().into(),
+        BLOB_A_NAME.try_into().unwrap(),
+        Node::File {
+            digest: fixtures::BLOB_A_DIGEST.clone(),
             size: fixtures::BLOB_A.len() as u64,
             executable: false,
-        }),
+        },
     );
 }
 
 async fn populate_blob_b(
     blob_service: &Arc<dyn BlobService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     let mut bw = blob_service.open_write().await;
     tokio::io::copy(&mut Cursor::new(fixtures::BLOB_B.to_vec()), &mut bw)
@@ -89,20 +89,19 @@ async fn populate_blob_b(
     bw.close().await.expect("must succeed closing");
 
     root_nodes.insert(
-        BLOB_B_NAME.into(),
-        Node::File(castorepb::FileNode {
-            name: BLOB_B_NAME.into(),
-            digest: fixtures::BLOB_B_DIGEST.clone().into(),
+        BLOB_B_NAME.try_into().unwrap(),
+        Node::File {
+            digest: fixtures::BLOB_B_DIGEST.clone(),
             size: fixtures::BLOB_B.len() as u64,
             executable: false,
-        }),
+        },
     );
 }
 
 /// adds a blob containing helloworld and marks it as executable
 async fn populate_blob_helloworld(
     blob_service: &Arc<dyn BlobService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     let mut bw = blob_service.open_write().await;
     tokio::io::copy(
@@ -114,42 +113,39 @@ async fn populate_blob_helloworld(
     bw.close().await.expect("must succeed closing");
 
     root_nodes.insert(
-        HELLOWORLD_BLOB_NAME.into(),
-        Node::File(castorepb::FileNode {
-            name: HELLOWORLD_BLOB_NAME.into(),
-            digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone().into(),
+        HELLOWORLD_BLOB_NAME.try_into().unwrap(),
+        Node::File {
+            digest: fixtures::HELLOWORLD_BLOB_DIGEST.clone(),
             size: fixtures::HELLOWORLD_BLOB_CONTENTS.len() as u64,
             executable: true,
-        }),
+        },
     );
 }
 
-async fn populate_symlink(root_nodes: &mut BTreeMap<Bytes, Node>) {
+async fn populate_symlink(root_nodes: &mut BTreeMap<PathComponent, Node>) {
     root_nodes.insert(
-        SYMLINK_NAME.into(),
-        Node::Symlink(castorepb::SymlinkNode {
-            name: SYMLINK_NAME.into(),
-            target: BLOB_A_NAME.into(),
-        }),
+        SYMLINK_NAME.try_into().unwrap(),
+        Node::Symlink {
+            target: BLOB_A_NAME.try_into().unwrap(),
+        },
     );
 }
 
 /// This writes a symlink pointing to /nix/store/somewhereelse,
 /// which is the same symlink target as "aa" inside DIRECTORY_COMPLICATED.
-async fn populate_symlink2(root_nodes: &mut BTreeMap<Bytes, Node>) {
+async fn populate_symlink2(root_nodes: &mut BTreeMap<PathComponent, Node>) {
     root_nodes.insert(
-        SYMLINK_NAME2.into(),
-        Node::Symlink(castorepb::SymlinkNode {
-            name: SYMLINK_NAME2.into(),
-            target: "/nix/store/somewhereelse".into(),
-        }),
+        SYMLINK_NAME2.try_into().unwrap(),
+        Node::Symlink {
+            target: "/nix/store/somewhereelse".try_into().unwrap(),
+        },
     );
 }
 
 async fn populate_directory_with_keep(
     blob_service: &Arc<dyn BlobService>,
     directory_service: &Arc<dyn DirectoryService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     // upload empty blob
     let mut bw = blob_service.open_write().await;
@@ -165,45 +161,42 @@ async fn populate_directory_with_keep(
         .expect("must succeed uploading");
 
     root_nodes.insert(
-        DIRECTORY_WITH_KEEP_NAME.into(),
-        castorepb::node::Node::Directory(castorepb::DirectoryNode {
-            name: DIRECTORY_WITH_KEEP_NAME.into(),
-            digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(),
+        DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(),
+        Node::Directory {
+            digest: fixtures::DIRECTORY_WITH_KEEP.digest(),
             size: fixtures::DIRECTORY_WITH_KEEP.size(),
-        }),
+        },
     );
 }
 
 /// Create a root node for DIRECTORY_WITH_KEEP, but don't upload the Directory
 /// itself.
-async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<Bytes, Node>) {
+async fn populate_directorynode_without_directory(root_nodes: &mut BTreeMap<PathComponent, Node>) {
     root_nodes.insert(
-        DIRECTORY_WITH_KEEP_NAME.into(),
-        castorepb::node::Node::Directory(castorepb::DirectoryNode {
-            name: DIRECTORY_WITH_KEEP_NAME.into(),
-            digest: fixtures::DIRECTORY_WITH_KEEP.digest().into(),
+        DIRECTORY_WITH_KEEP_NAME.try_into().unwrap(),
+        Node::Directory {
+            digest: fixtures::DIRECTORY_WITH_KEEP.digest(),
             size: fixtures::DIRECTORY_WITH_KEEP.size(),
-        }),
+        },
     );
 }
 
 /// Insert BLOB_A, but don't provide the blob .keep is pointing to.
-async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<Bytes, Node>) {
+async fn populate_filenode_without_blob(root_nodes: &mut BTreeMap<PathComponent, Node>) {
     root_nodes.insert(
-        BLOB_A_NAME.into(),
-        Node::File(castorepb::FileNode {
-            name: BLOB_A_NAME.into(),
-            digest: fixtures::BLOB_A_DIGEST.clone().into(),
+        BLOB_A_NAME.try_into().unwrap(),
+        Node::File {
+            digest: fixtures::BLOB_A_DIGEST.clone(),
             size: fixtures::BLOB_A.len() as u64,
             executable: false,
-        }),
+        },
     );
 }
 
 async fn populate_directory_complicated(
     blob_service: &Arc<dyn BlobService>,
     directory_service: &Arc<dyn DirectoryService>,
-    root_nodes: &mut BTreeMap<Bytes, Node>,
+    root_nodes: &mut BTreeMap<PathComponent, Node>,
 ) {
     // upload empty blob
     let mut bw = blob_service.open_write().await;
@@ -225,12 +218,11 @@ async fn populate_directory_complicated(
         .expect("must succeed uploading");
 
     root_nodes.insert(
-        DIRECTORY_COMPLICATED_NAME.into(),
-        Node::Directory(castorepb::DirectoryNode {
-            name: DIRECTORY_COMPLICATED_NAME.into(),
-            digest: fixtures::DIRECTORY_COMPLICATED.digest().into(),
+        DIRECTORY_COMPLICATED_NAME.try_into().unwrap(),
+        Node::Directory {
+            digest: fixtures::DIRECTORY_COMPLICATED.digest(),
             size: fixtures::DIRECTORY_COMPLICATED.size(),
-        }),
+        },
     );
 }
 
@@ -247,7 +239,7 @@ async fn mount() {
 
     let (blob_service, directory_service) = gen_svcs();
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         BTreeMap::default(),
@@ -270,7 +262,7 @@ async fn root() {
     let tmpdir = TempDir::new().unwrap();
 
     let (blob_service, directory_service) = gen_svcs();
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         BTreeMap::default(),
@@ -304,7 +296,7 @@ async fn root_with_listing() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -348,7 +340,7 @@ async fn stat_file_at_root() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -385,7 +377,7 @@ async fn read_file_at_root() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -422,7 +414,7 @@ async fn read_large_file_at_root() {
 
     populate_blob_b(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -467,7 +459,7 @@ async fn symlink_readlink() {
 
     populate_symlink(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -514,7 +506,7 @@ async fn read_stat_through_symlink() {
     populate_blob_a(&blob_service, &mut root_nodes).await;
     populate_symlink(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -559,7 +551,7 @@ async fn read_stat_directory() {
 
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -596,7 +588,7 @@ async fn xattr() {
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -614,12 +606,12 @@ async fn xattr() {
         // There should be 1 key, XATTR_NAME_DIRECTORY_DIGEST.
         assert_eq!(1, xattr_names.len(), "there should be 1 xattr name");
         assert_eq!(
-            super::XATTR_NAME_DIRECTORY_DIGEST,
+            XATTR_NAME_DIRECTORY_DIGEST,
             xattr_names.first().unwrap().as_encoded_bytes()
         );
 
         // The key should equal to the string-formatted b3 digest.
-        let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_DIRECTORY_DIGEST))
+        let val = xattr::get(&p, OsStr::from_bytes(XATTR_NAME_DIRECTORY_DIGEST))
             .expect("must succeed")
             .expect("must be some");
         assert_eq!(
@@ -643,12 +635,12 @@ async fn xattr() {
         // There should be 1 key, XATTR_NAME_BLOB_DIGEST.
         assert_eq!(1, xattr_names.len(), "there should be 1 xattr name");
         assert_eq!(
-            super::XATTR_NAME_BLOB_DIGEST,
+            XATTR_NAME_BLOB_DIGEST,
             xattr_names.first().unwrap().as_encoded_bytes()
         );
 
         // The key should equal to the string-formatted b3 digest.
-        let val = xattr::get(&p, OsStr::from_bytes(super::XATTR_NAME_BLOB_DIGEST))
+        let val = xattr::get(&p, OsStr::from_bytes(XATTR_NAME_BLOB_DIGEST))
             .expect("must succeed")
             .expect("must be some");
         assert_eq!(
@@ -679,7 +671,7 @@ async fn read_blob_inside_dir() {
 
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -719,7 +711,7 @@ async fn read_blob_deep_inside_dir() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -762,7 +754,7 @@ async fn readdir() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -822,7 +814,7 @@ async fn readdir_deep() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -872,7 +864,7 @@ async fn check_attributes() {
     populate_symlink(&mut root_nodes).await;
     populate_blob_helloworld(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -947,7 +939,7 @@ async fn compare_inodes_directories() {
     populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await;
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -991,7 +983,7 @@ async fn compare_inodes_files() {
 
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1040,7 +1032,7 @@ async fn compare_inodes_symlinks() {
     populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await;
     populate_symlink2(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1083,7 +1075,7 @@ async fn read_wrong_paths_in_root() {
 
     populate_blob_a(&blob_service, &mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1138,7 +1130,7 @@ async fn disallow_writes() {
     let (blob_service, directory_service) = gen_svcs();
     let root_nodes = BTreeMap::default();
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1170,7 +1162,7 @@ async fn missing_directory() {
 
     populate_directorynode_without_directory(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
@@ -1218,7 +1210,7 @@ async fn missing_blob() {
 
     populate_filenode_without_blob(&mut root_nodes).await;
 
-    let mut fuse_daemon = do_mount(
+    let fuse_daemon = do_mount(
         blob_service,
         directory_service,
         root_nodes,
diff --git a/tvix/castore/src/fs/inodes.rs b/tvix/castore/src/fs/inodes.rs
index bdd459543470..2696fdede378 100644
--- a/tvix/castore/src/fs/inodes.rs
+++ b/tvix/castore/src/fs/inodes.rs
@@ -2,10 +2,7 @@
 //! about inodes, which present tvix-castore nodes in a filesystem.
 use std::time::Duration;
 
-use bytes::Bytes;
-
-use crate::proto as castorepb;
-use crate::B3Digest;
+use crate::{path::PathComponent, B3Digest, Node};
 
 #[derive(Clone, Debug)]
 pub enum InodeData {
@@ -20,27 +17,23 @@ pub enum InodeData {
 /// lookup and did fetch the data.
 #[derive(Clone, Debug)]
 pub enum DirectoryInodeData {
-    Sparse(B3Digest, u64),                                  // digest, size
-    Populated(B3Digest, Vec<(u64, castorepb::node::Node)>), // [(child_inode, node)]
+    Sparse(B3Digest, u64),                                // digest, size
+    Populated(B3Digest, Vec<(u64, PathComponent, Node)>), // [(child_inode, name, node)]
 }
 
 impl InodeData {
     /// Constructs a new InodeData by consuming a [Node].
-    /// It splits off the orginal name, so it can be used later.
-    pub fn from_node(node: castorepb::node::Node) -> (Self, Bytes) {
+    pub fn from_node(node: &Node) -> Self {
         match node {
-            castorepb::node::Node::Directory(n) => (
-                Self::Directory(DirectoryInodeData::Sparse(
-                    n.digest.try_into().unwrap(),
-                    n.size,
-                )),
-                n.name,
-            ),
-            castorepb::node::Node::File(n) => (
-                Self::Regular(n.digest.try_into().unwrap(), n.size, n.executable),
-                n.name,
-            ),
-            castorepb::node::Node::Symlink(n) => (Self::Symlink(n.target), n.name),
+            Node::Directory { digest, size } => {
+                Self::Directory(DirectoryInodeData::Sparse(digest.clone(), *size))
+            }
+            Node::File {
+                digest,
+                size,
+                executable,
+            } => Self::Regular(digest.clone(), *size, *executable),
+            Node::Symlink { target } => Self::Symlink(target.clone().into()),
         }
     }
 
diff --git a/tvix/castore/src/fs/mod.rs b/tvix/castore/src/fs/mod.rs
index 826523131fbd..4f50868b8f44 100644
--- a/tvix/castore/src/fs/mod.rs
+++ b/tvix/castore/src/fs/mod.rs
@@ -9,24 +9,19 @@ pub mod fuse;
 #[cfg(feature = "virtiofs")]
 pub mod virtiofs;
 
-#[cfg(test)]
-mod tests;
-
 pub use self::root_nodes::RootNodes;
 use self::{
     file_attr::ROOT_FILE_ATTR,
     inode_tracker::InodeTracker,
     inodes::{DirectoryInodeData, InodeData},
 };
-use crate::proto as castorepb;
 use crate::{
     blobservice::{BlobReader, BlobService},
     directoryservice::DirectoryService,
-    proto::{node::Node, NamedNode},
-    B3Digest,
+    path::PathComponent,
+    B3Digest, Node,
 };
 use bstr::ByteVec;
-use bytes::Bytes;
 use fuse_backend_rs::abi::fuse_abi::{stat64, OpenOptions};
 use fuse_backend_rs::api::filesystem::{
     Context, FileSystem, FsOptions, GetxattrReply, ListxattrReply, ROOT_ID,
@@ -46,7 +41,7 @@ use tokio::{
     io::{AsyncReadExt, AsyncSeekExt},
     sync::mpsc,
 };
-use tracing::{debug, error, instrument, warn, Span};
+use tracing::{debug, error, instrument, warn, Instrument as _, Span};
 
 /// This implements a read-only FUSE filesystem for a tvix-store
 /// with the passed [BlobService], [DirectoryService] and [RootNodes].
@@ -92,7 +87,7 @@ pub struct TvixStoreFs<BS, DS, RN> {
     show_xattr: bool,
 
     /// This maps a given basename in the root to the inode we allocated for the node.
-    root_nodes: RwLock<HashMap<Bytes, u64>>,
+    root_nodes: RwLock<HashMap<PathComponent, u64>>,
 
     /// This keeps track of inodes and data alongside them.
     inode_tracker: RwLock<InodeTracker>,
@@ -108,7 +103,7 @@ pub struct TvixStoreFs<BS, DS, RN> {
             u64,
             (
                 Span,
-                Arc<Mutex<mpsc::Receiver<(usize, Result<Node, crate::Error>)>>>,
+                Arc<Mutex<mpsc::Receiver<(usize, Result<(PathComponent, Node), crate::Error>)>>>,
             ),
         >,
     >,
@@ -126,8 +121,8 @@ pub struct TvixStoreFs<BS, DS, RN> {
 
 impl<BS, DS, RN> TvixStoreFs<BS, DS, RN>
 where
-    BS: AsRef<dyn BlobService> + Clone + Send,
-    DS: AsRef<dyn DirectoryService> + Clone + Send + 'static,
+    BS: BlobService + Clone + Send,
+    DS: DirectoryService + Clone + Send + 'static,
     RN: RootNodes + Clone + 'static,
 {
     pub fn new(
@@ -159,7 +154,7 @@ where
 
     /// Retrieves the inode for a given root node basename, if present.
     /// This obtains a read lock on self.root_nodes.
-    fn get_inode_for_root_name(&self, name: &[u8]) -> Option<u64> {
+    fn get_inode_for_root_name(&self, name: &PathComponent) -> Option<u64> {
         self.root_nodes.read().get(name).cloned()
     }
 
@@ -170,8 +165,12 @@ where
     /// It is ok if it's a [DirectoryInodeData::Sparse] - in that case, a lookup
     /// in self.directory_service is performed, and self.inode_tracker is updated with the
     /// [DirectoryInodeData::Populated].
+    #[allow(clippy::type_complexity)]
     #[instrument(skip(self), err)]
-    fn get_directory_children(&self, ino: u64) -> io::Result<(B3Digest, Vec<(u64, Node)>)> {
+    fn get_directory_children(
+        &self,
+        ino: u64,
+    ) -> io::Result<(B3Digest, Vec<(u64, PathComponent, Node)>)> {
         let data = self.inode_tracker.read().get(ino).unwrap();
         match *data {
             // if it's populated already, return children.
@@ -187,7 +186,7 @@ where
                     .block_on({
                         let directory_service = self.directory_service.clone();
                         let parent_digest = parent_digest.to_owned();
-                        async move { directory_service.as_ref().get(&parent_digest).await }
+                        async move { directory_service.get(&parent_digest).await }
                     })?
                     .ok_or_else(|| {
                         warn!(directory.digest=%parent_digest, "directory not found");
@@ -201,13 +200,13 @@ where
                 let children = {
                     let mut inode_tracker = self.inode_tracker.write();
 
-                    let children: Vec<(u64, castorepb::node::Node)> = directory
-                        .nodes()
-                        .map(|child_node| {
-                            let (inode_data, _) = InodeData::from_node(child_node.clone());
+                    let children: Vec<(u64, PathComponent, Node)> = directory
+                        .into_nodes()
+                        .map(|(child_name, child_node)| {
+                            let inode_data = InodeData::from_node(&child_node);
 
                             let child_ino = inode_tracker.put(inode_data);
-                            (child_ino, child_node)
+                            (child_ino, child_name, child_node)
                         })
                         .collect();
 
@@ -241,12 +240,12 @@ where
     /// In the case the name can't be found, a libc::ENOENT is returned.
     fn name_in_root_to_ino_and_data(
         &self,
-        name: &std::ffi::CStr,
+        name: &PathComponent,
     ) -> io::Result<(u64, Arc<InodeData>)> {
         // Look up the inode for that root node.
         // If there's one, [self.inode_tracker] MUST also contain the data,
         // which we can then return.
-        if let Some(inode) = self.get_inode_for_root_name(name.to_bytes()) {
+        if let Some(inode) = self.get_inode_for_root_name(name) {
             return Ok((
                 inode,
                 self.inode_tracker
@@ -260,7 +259,8 @@ where
         // We don't have it yet, look it up in [self.root_nodes].
         match self.tokio_handle.block_on({
             let root_nodes_provider = self.root_nodes_provider.clone();
-            async move { root_nodes_provider.get_by_basename(name.to_bytes()).await }
+            let name = name.clone();
+            async move { root_nodes_provider.get_by_basename(&name).await }
         }) {
             // if there was an error looking up the root node, propagate up an IO error.
             Err(_e) => Err(io::Error::from_raw_os_error(libc::EIO)),
@@ -268,15 +268,9 @@ where
             Ok(None) => Err(io::Error::from_raw_os_error(libc::ENOENT)),
             // The root node does exist
             Ok(Some(root_node)) => {
-                // The name must match what's passed in the lookup, otherwise this is also a ENOENT.
-                if root_node.get_name() != name.to_bytes() {
-                    debug!(root_node.name=?root_node.get_name(), found_node.name=%name.to_string_lossy(), "node name mismatch");
-                    return Err(io::Error::from_raw_os_error(libc::ENOENT));
-                }
-
                 // Let's check if someone else beat us to updating the inode tracker and
                 // root_nodes map. This avoids locking inode_tracker for writing.
-                if let Some(ino) = self.root_nodes.read().get(name.to_bytes()) {
+                if let Some(ino) = self.root_nodes.read().get(name) {
                     return Ok((
                         *ino,
                         self.inode_tracker.read().get(*ino).expect("must exist"),
@@ -290,9 +284,9 @@ where
 
                 // insert the (sparse) inode data and register in
                 // self.root_nodes.
-                let (inode_data, name) = InodeData::from_node(root_node);
+                let inode_data = InodeData::from_node(&root_node);
                 let ino = inode_tracker.put(inode_data.clone());
-                root_nodes.insert(name, ino);
+                root_nodes.insert(name.to_owned(), ino);
 
                 Ok((ino, Arc::new(inode_data)))
             }
@@ -306,10 +300,22 @@ const ROOT_NODES_BUFFER_SIZE: usize = 16;
 const XATTR_NAME_DIRECTORY_DIGEST: &[u8] = b"user.tvix.castore.directory.digest";
 const XATTR_NAME_BLOB_DIGEST: &[u8] = b"user.tvix.castore.blob.digest";
 
+#[cfg(all(feature = "virtiofs", target_os = "linux"))]
+impl<BS, DS, RN> fuse_backend_rs::api::filesystem::Layer for TvixStoreFs<BS, DS, RN>
+where
+    BS: BlobService + Clone + Send + 'static,
+    DS: DirectoryService + Send + Clone + 'static,
+    RN: RootNodes + Clone + 'static,
+{
+    fn root_inode(&self) -> Self::Inode {
+        ROOT_ID
+    }
+}
+
 impl<BS, DS, RN> FileSystem for TvixStoreFs<BS, DS, RN>
 where
-    BS: AsRef<dyn BlobService> + Clone + Send + 'static,
-    DS: AsRef<dyn DirectoryService> + Send + Clone + 'static,
+    BS: BlobService + Clone + Send + 'static,
+    DS: DirectoryService + Send + Clone + 'static,
     RN: RootNodes + Clone + 'static,
 {
     type Handle = u64;
@@ -348,13 +354,17 @@ where
     ) -> io::Result<fuse_backend_rs::api::filesystem::Entry> {
         debug!("lookup");
 
+        // convert the CStr to a PathComponent
+        // If it can't be converted, we definitely don't have anything here.
+        let name: PathComponent = name.try_into().map_err(|_| std::io::ErrorKind::NotFound)?;
+
         // This goes from a parent inode to a node.
         // - If the parent is [ROOT_ID], we need to check
         //   [self.root_nodes] (fetching from a [RootNode] provider if needed)
         // - Otherwise, lookup the parent in [self.inode_tracker] (which must be
         //   a [InodeData::Directory]), and find the child with that name.
         if parent == ROOT_ID {
-            let (ino, inode_data) = self.name_in_root_to_ino_and_data(name)?;
+            let (ino, inode_data) = self.name_in_root_to_ino_and_data(&name)?;
 
             debug!(inode_data=?&inode_data, ino=ino, "Some");
             return Ok(inode_data.as_fuse_entry(ino));
@@ -367,7 +377,7 @@ where
         // Search for that name in the list of children and return the FileAttrs.
 
         // in the children, find the one with the desired name.
-        if let Some((child_ino, _)) = children.iter().find(|e| e.1.get_name() == name.to_bytes()) {
+        if let Some((child_ino, _, _)) = children.iter().find(|(_, n, _)| n == &name) {
             // lookup the child [InodeData] in [self.inode_tracker].
             // We know the inodes for children have already been allocated.
             let child_inode_data = self.inode_tracker.read().get(*child_ino).unwrap();
@@ -400,16 +410,20 @@ where
 
             // This task will run in the background immediately and will exit
             // after the stream ends or if we no longer want any more entries.
-            self.tokio_handle.spawn(async move {
-                let mut stream = root_nodes_provider.list().enumerate();
-                while let Some(node) = stream.next().await {
-                    if tx.send(node).await.is_err() {
-                        // If we get a send error, it means the sync code
-                        // doesn't want any more entries.
-                        break;
+            self.tokio_handle.spawn(
+                async move {
+                    let mut stream = root_nodes_provider.list().enumerate();
+                    while let Some(e) = stream.next().await {
+                        if tx.send(e).await.is_err() {
+                            // If we get a send error, it means the sync code
+                            // doesn't want any more entries.
+                            break;
+                        }
                     }
                 }
-            });
+                // instrument the task with the current span, this is not done by default
+                .in_current_span(),
+            );
 
             // Put the rx part into [self.dir_handles].
             // TODO: this will overflow after 2**64 operations,
@@ -462,12 +476,12 @@ where
                 .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
 
             while let Some((i, n)) = rx.blocking_recv() {
-                let root_node = n.map_err(|e| {
+                let (name, node) = n.map_err(|e| {
                     warn!("failed to retrieve root node: {}", e);
                     io::Error::from_raw_os_error(libc::EIO)
                 })?;
 
-                let (inode_data, name) = InodeData::from_node(root_node);
+                let inode_data = InodeData::from_node(&node);
 
                 // obtain the inode, or allocate a new one.
                 let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@@ -482,7 +496,7 @@ where
                     ino,
                     offset: offset + (i as u64) + 1,
                     type_: inode_data.as_fuse_type(),
-                    name: &name,
+                    name: name.as_ref(),
                 })?;
                 // If the buffer is full, add_entry will return `Ok(0)`.
                 if written == 0 {
@@ -496,15 +510,17 @@ where
         let (parent_digest, children) = self.get_directory_children(inode)?;
         Span::current().record("directory.digest", parent_digest.to_string());
 
-        for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
-            let (inode_data, name) = InodeData::from_node(child_node);
+        for (i, (ino, child_name, child_node)) in
+            children.into_iter().skip(offset as usize).enumerate()
+        {
+            let inode_data = InodeData::from_node(&child_node);
 
             // the second parameter will become the "offset" parameter on the next call.
             let written = add_entry(fuse_backend_rs::api::filesystem::DirEntry {
                 ino,
                 offset: offset + (i as u64) + 1,
                 type_: inode_data.as_fuse_type(),
-                name: &name,
+                name: child_name.as_ref(),
             })?;
             // If the buffer is full, add_entry will return `Ok(0)`.
             if written == 0 {
@@ -549,12 +565,12 @@ where
                 .map_err(|_| crate::Error::StorageError("mutex poisoned".into()))?;
 
             while let Some((i, n)) = rx.blocking_recv() {
-                let root_node = n.map_err(|e| {
+                let (name, node) = n.map_err(|e| {
                     warn!("failed to retrieve root node: {}", e);
                     io::Error::from_raw_os_error(libc::EPERM)
                 })?;
 
-                let (inode_data, name) = InodeData::from_node(root_node);
+                let inode_data = InodeData::from_node(&node);
 
                 // obtain the inode, or allocate a new one.
                 let ino = self.get_inode_for_root_name(&name).unwrap_or_else(|| {
@@ -570,7 +586,7 @@ where
                         ino,
                         offset: offset + (i as u64) + 1,
                         type_: inode_data.as_fuse_type(),
-                        name: &name,
+                        name: name.as_ref(),
                     },
                     inode_data.as_fuse_entry(ino),
                 )?;
@@ -586,8 +602,8 @@ where
         let (parent_digest, children) = self.get_directory_children(inode)?;
         Span::current().record("directory.digest", parent_digest.to_string());
 
-        for (i, (ino, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
-            let (inode_data, name) = InodeData::from_node(child_node);
+        for (i, (ino, name, child_node)) in children.into_iter().skip(offset as usize).enumerate() {
+            let inode_data = InodeData::from_node(&child_node);
 
             // the second parameter will become the "offset" parameter on the next call.
             let written = add_entry(
@@ -595,7 +611,7 @@ where
                     ino,
                     offset: offset + (i as u64) + 1,
                     type_: inode_data.as_fuse_type(),
-                    name: &name,
+                    name: name.as_ref(),
                 },
                 inode_data.as_fuse_entry(ino),
             )?;
@@ -640,6 +656,7 @@ where
     ) -> io::Result<(
         Option<Self::Handle>,
         fuse_backend_rs::api::filesystem::OpenOptions,
+        Option<u32>,
     )> {
         if inode == ROOT_ID {
             return Err(io::Error::from_raw_os_error(libc::ENOSYS));
@@ -658,7 +675,7 @@ where
                 match self.tokio_handle.block_on({
                     let blob_service = self.blob_service.clone();
                     let blob_digest = blob_digest.clone();
-                    async move { blob_service.as_ref().open_read(&blob_digest).await }
+                    async move { blob_service.open_read(&blob_digest).await }
                 }) {
                     Ok(None) => {
                         warn!("blob not found");
@@ -683,6 +700,7 @@ where
                         Ok((
                             Some(fh),
                             fuse_backend_rs::api::filesystem::OpenOptions::empty(),
+                            None,
                         ))
                     }
                 }
diff --git a/tvix/castore/src/fs/root_nodes.rs b/tvix/castore/src/fs/root_nodes.rs
index 6609e049a1fc..5ed1a4d8d6c0 100644
--- a/tvix/castore/src/fs/root_nodes.rs
+++ b/tvix/castore/src/fs/root_nodes.rs
@@ -1,7 +1,6 @@
 use std::collections::BTreeMap;
 
-use crate::{proto::node::Node, Error};
-use bytes::Bytes;
+use crate::{path::PathComponent, Error, Node};
 use futures::stream::BoxStream;
 use tonic::async_trait;
 
@@ -11,11 +10,12 @@ use tonic::async_trait;
 pub trait RootNodes: Send + Sync {
     /// Looks up a root CA node based on the basename of the node in the root
     /// directory of the filesystem.
-    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error>;
+    async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error>;
 
-    /// Lists all root CA nodes in the filesystem. An error can be returned
-    /// in case listing is not allowed
-    fn list(&self) -> BoxStream<Result<Node, Error>>;
+    /// Lists all root CA nodes in the filesystem, as a tuple of (base)name
+    /// and Node.
+    /// An error can be returned in case listing is not allowed.
+    fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>>;
 }
 
 #[async_trait]
@@ -23,15 +23,17 @@ pub trait RootNodes: Send + Sync {
 /// the key is the node name.
 impl<T> RootNodes for T
 where
-    T: AsRef<BTreeMap<Bytes, Node>> + Send + Sync,
+    T: AsRef<BTreeMap<PathComponent, Node>> + Send + Sync,
 {
-    async fn get_by_basename(&self, name: &[u8]) -> Result<Option<Node>, Error> {
+    async fn get_by_basename(&self, name: &PathComponent) -> Result<Option<Node>, Error> {
         Ok(self.as_ref().get(name).cloned())
     }
 
-    fn list(&self) -> BoxStream<Result<Node, Error>> {
+    fn list(&self) -> BoxStream<Result<(PathComponent, Node), Error>> {
         Box::pin(tokio_stream::iter(
-            self.as_ref().iter().map(|(_, v)| Ok(v.clone())),
+            self.as_ref()
+                .iter()
+                .map(|(name, node)| Ok((name.to_owned(), node.to_owned()))),
         ))
     }
 }