about summary refs log tree commit diff
diff options
context:
space:
mode:
authorConnor Brewster <cbrewster@hey.com>2023-09-16T18·54-0500
committerclbot <clbot@tvl.fyi>2023-09-20T14·27+0000
commit7e737fde34260daa477794d63b0b3344b4a1d81b (patch)
tree51f7d4b3c7f18aa78e584b06840fcdd645474895
parent6b7c936bc50934b45df132f659292e2c45256dea (diff)
refactor(tvix/store/fs): Separate FUSE and filesystem code r/6622
In prepration for adding virtiofs support, I thought it would make sense
to split out the filesystem implementation from FUSE itself.

The `fs` module holds the tvix-store filesystem implemetation and the
`fuse` module holds the code to spawn a FUSE daemon backed by multiple
threads.

Change-Id: I8c58447b8c3aa016a613068f8e7ec166554e237c
Reviewed-on: https://cl.tvl.fyi/c/depot/+/9343
Reviewed-by: flokli <flokli@flokli.de>
Tested-by: BuildkiteCI
Autosubmit: Connor Brewster <cbrewster@hey.com>
-rw-r--r--tvix/Cargo.nix5
-rw-r--r--tvix/store/Cargo.toml3
-rw-r--r--tvix/store/src/bin/tvix-store.rs9
-rw-r--r--tvix/store/src/fs/file_attr.rs (renamed from tvix/store/src/fuse/file_attr.rs)0
-rw-r--r--tvix/store/src/fs/fuse.rs113
-rw-r--r--tvix/store/src/fs/inode_tracker.rs (renamed from tvix/store/src/fuse/inode_tracker.rs)2
-rw-r--r--tvix/store/src/fs/inodes.rs (renamed from tvix/store/src/fuse/inodes.rs)0
-rw-r--r--tvix/store/src/fs/mod.rs (renamed from tvix/store/src/fuse/mod.rs)129
-rw-r--r--tvix/store/src/fs/tests.rs (renamed from tvix/store/src/fuse/tests.rs)5
-rw-r--r--tvix/store/src/lib.rs8
10 files changed, 141 insertions, 133 deletions
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix
index cd9570c48ac5..002aaeab0989 100644
--- a/tvix/Cargo.nix
+++ b/tvix/Cargo.nix
@@ -8859,11 +8859,12 @@ rec {
         ];
         features = {
           "default" = [ "fuse" "reflection" ];
-          "fuse" = [ "dep:libc" "dep:fuse-backend-rs" ];
+          "fs" = [ "dep:libc" "dep:fuse-backend-rs" ];
+          "fuse" = [ "fs" ];
           "reflection" = [ "tonic-reflection" ];
           "tonic-reflection" = [ "dep:tonic-reflection" ];
         };
-        resolvedDefaultFeatures = [ "default" "fuse" "reflection" "tonic-reflection" ];
+        resolvedDefaultFeatures = [ "default" "fs" "fuse" "reflection" "tonic-reflection" ];
       };
       "typenum" = rec {
         crateName = "typenum";
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml
index cd71555851ff..ffa4426aef6a 100644
--- a/tvix/store/Cargo.toml
+++ b/tvix/store/Cargo.toml
@@ -58,5 +58,6 @@ tonic-mock = { git = "https://github.com/brainrake/tonic-mock", branch = "bump-d
 
 [features]
 default = ["fuse", "reflection"]
-fuse = ["dep:libc", "dep:fuse-backend-rs"]
+fs = ["dep:libc", "dep:fuse-backend-rs"]
+fuse = ["fs"]
 reflection = ["tonic-reflection"]
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index f707e3850d4f..1be8b00bd9b8 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -22,7 +22,12 @@ use tvix_store::proto::GRPCPathInfoServiceWrapper;
 use tvix_store::proto::NamedNode;
 use tvix_store::proto::NarInfo;
 use tvix_store::proto::PathInfo;
-use tvix_store::{FuseDaemon, FUSE};
+
+#[cfg(feature = "fs")]
+use tvix_store::fs::TvixStoreFs;
+
+#[cfg(feature = "fuse")]
+use tvix_store::fs::fuse::FuseDaemon;
 
 #[cfg(feature = "reflection")]
 use tvix_store::proto::FILE_DESCRIPTOR_SET;
@@ -302,7 +307,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
             )?;
 
             let mut fuse_daemon = tokio::task::spawn_blocking(move || {
-                let f = FUSE::new(
+                let f = TvixStoreFs::new(
                     blob_service,
                     directory_service,
                     path_info_service,
diff --git a/tvix/store/src/fuse/file_attr.rs b/tvix/store/src/fs/file_attr.rs
index b946aa977a0a..b946aa977a0a 100644
--- a/tvix/store/src/fuse/file_attr.rs
+++ b/tvix/store/src/fs/file_attr.rs
diff --git a/tvix/store/src/fs/fuse.rs b/tvix/store/src/fs/fuse.rs
new file mode 100644
index 000000000000..8535c7858450
--- /dev/null
+++ b/tvix/store/src/fs/fuse.rs
@@ -0,0 +1,113 @@
+use std::{io, path::Path, sync::Arc, thread};
+
+use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession};
+use tracing::error;
+
+struct FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
+    channel: fuse_backend_rs::transport::FuseChannel,
+}
+
+impl<FS> FuseServer<FS>
+where
+    FS: FileSystem + Sync + Send,
+{
+    fn start(&mut self) -> io::Result<()> {
+        loop {
+            if let Some((reader, writer)) = self
+                .channel
+                .get_request()
+                .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
+            {
+                if let Err(e) = self
+                    .server
+                    .handle_message(reader, writer.into(), None, None)
+                {
+                    match e {
+                        // This indicates the session has been shut down.
+                        fuse_backend_rs::Error::EncodeMessage(e)
+                            if e.raw_os_error() == Some(libc::EBADFD) =>
+                        {
+                            break;
+                        }
+                        error => {
+                            error!(?error, "failed to handle fuse request");
+                            continue;
+                        }
+                    }
+                }
+            } else {
+                break;
+            }
+        }
+        Ok(())
+    }
+}
+
+pub struct FuseDaemon {
+    session: FuseSession,
+    threads: Vec<thread::JoinHandle<()>>,
+}
+
+impl FuseDaemon {
+    pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
+    where
+        FS: FileSystem + Sync + Send + 'static,
+        P: AsRef<Path>,
+    {
+        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
+
+        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        session.set_allow_other(false);
+        session
+            .mount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+        let mut join_handles = Vec::with_capacity(threads);
+        for _ in 0..threads {
+            let mut server = FuseServer {
+                server: server.clone(),
+                channel: session
+                    .new_channel()
+                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
+            };
+            let join_handle = thread::Builder::new()
+                .name("fuse_server".to_string())
+                .spawn(move || {
+                    let _ = server.start();
+                })?;
+            join_handles.push(join_handle);
+        }
+
+        Ok(FuseDaemon {
+            session,
+            threads: join_handles,
+        })
+    }
+
+    pub fn unmount(&mut self) -> Result<(), io::Error> {
+        self.session
+            .umount()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
+
+        for thread in self.threads.drain(..) {
+            thread.join().map_err(|_| {
+                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
+            })?;
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for FuseDaemon {
+    fn drop(&mut self) {
+        if let Err(error) = self.unmount() {
+            error!(?error, "failed to unmont fuse filesystem")
+        }
+    }
+}
diff --git a/tvix/store/src/fuse/inode_tracker.rs b/tvix/store/src/fs/inode_tracker.rs
index 97a9744c31d7..ad1ef859a2f3 100644
--- a/tvix/store/src/fuse/inode_tracker.rs
+++ b/tvix/store/src/fs/inode_tracker.rs
@@ -197,7 +197,7 @@ impl InodeTracker {
 
 #[cfg(test)]
 mod tests {
-    use crate::fuse::inodes::DirectoryInodeData;
+    use crate::fs::inodes::DirectoryInodeData;
     use crate::proto;
     use crate::tests::fixtures;
 
diff --git a/tvix/store/src/fuse/inodes.rs b/tvix/store/src/fs/inodes.rs
index e8959ce3629b..e8959ce3629b 100644
--- a/tvix/store/src/fuse/inodes.rs
+++ b/tvix/store/src/fs/inodes.rs
diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fs/mod.rs
index 1a5d884ef7a9..48e605406331 100644
--- a/tvix/store/src/fuse/mod.rs
+++ b/tvix/store/src/fs/mod.rs
@@ -2,39 +2,37 @@ mod file_attr;
 mod inode_tracker;
 mod inodes;
 
+#[cfg(feature = "fuse")]
+pub mod fuse;
+
 #[cfg(test)]
 mod tests;
 
 use crate::{
     blobservice::{BlobReader, BlobService},
     directoryservice::DirectoryService,
-    fuse::inodes::{DirectoryInodeData, InodeData},
     pathinfoservice::PathInfoService,
     proto::{node::Node, NamedNode},
     B3Digest, Error,
 };
-use fuse_backend_rs::{
-    api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID},
-    transport::FuseSession,
-};
+use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID};
 use nix_compat::store_path::StorePath;
 use parking_lot::RwLock;
 use std::{
     collections::HashMap,
     io,
-    path::Path,
     str::FromStr,
     sync::atomic::AtomicU64,
     sync::{atomic::Ordering, Arc},
-    thread,
     time::Duration,
 };
 use tokio::io::{AsyncBufReadExt, AsyncSeekExt};
-use tracing::{debug, error, info_span, warn};
+use tracing::{debug, info_span, warn};
 
 use self::{
     file_attr::{gen_file_attr, ROOT_FILE_ATTR},
     inode_tracker::InodeTracker,
+    inodes::{DirectoryInodeData, InodeData},
 };
 
 /// This implements a read-only FUSE filesystem for a tvix-store
@@ -71,7 +69,7 @@ use self::{
 /// Due to the above being valid across the whole store, and considering the
 /// merkle structure is a DAG, not a tree, this also means we can't do "bucketed
 /// allocation", aka reserve Directory.size inodes for each PathInfo.
-pub struct FUSE {
+pub struct TvixStoreFs {
     blob_service: Arc<dyn BlobService>,
     directory_service: Arc<dyn DirectoryService>,
     path_info_service: Arc<dyn PathInfoService>,
@@ -93,7 +91,7 @@ pub struct FUSE {
     tokio_handle: tokio::runtime::Handle,
 }
 
-impl FUSE {
+impl TvixStoreFs {
     pub fn new(
         blob_service: Arc<dyn BlobService>,
         directory_service: Arc<dyn DirectoryService>,
@@ -224,7 +222,7 @@ impl FUSE {
     }
 }
 
-impl FileSystem for FUSE {
+impl FileSystem for TvixStoreFs {
     type Inode = u64;
     type Handle = u64;
 
@@ -660,112 +658,3 @@ impl FileSystem for FUSE {
         }
     }
 }
-
-struct FuseServer<FS>
-where
-    FS: FileSystem + Sync + Send,
-{
-    server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>,
-    channel: fuse_backend_rs::transport::FuseChannel,
-}
-
-impl<FS> FuseServer<FS>
-where
-    FS: FileSystem + Sync + Send,
-{
-    fn start(&mut self) -> io::Result<()> {
-        loop {
-            if let Some((reader, writer)) = self
-                .channel
-                .get_request()
-                .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))?
-            {
-                if let Err(e) = self
-                    .server
-                    .handle_message(reader, writer.into(), None, None)
-                {
-                    match e {
-                        // This indicates the session has been shut down.
-                        fuse_backend_rs::Error::EncodeMessage(e)
-                            if e.raw_os_error() == Some(libc::EBADFD) =>
-                        {
-                            break;
-                        }
-                        error => {
-                            error!(?error, "failed to handle fuse request");
-                            continue;
-                        }
-                    }
-                }
-            } else {
-                break;
-            }
-        }
-        Ok(())
-    }
-}
-
-pub struct FuseDaemon {
-    session: FuseSession,
-    threads: Vec<thread::JoinHandle<()>>,
-}
-
-impl FuseDaemon {
-    pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error>
-    where
-        FS: FileSystem + Sync + Send + 'static,
-        P: AsRef<Path>,
-    {
-        let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs)));
-
-        let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true)
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-
-        session.set_allow_other(false);
-        session
-            .mount()
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-        let mut join_handles = Vec::with_capacity(threads);
-        for _ in 0..threads {
-            let mut server = FuseServer {
-                server: server.clone(),
-                channel: session
-                    .new_channel()
-                    .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?,
-            };
-            let join_handle = thread::Builder::new()
-                .name("fuse_server".to_string())
-                .spawn(move || {
-                    let _ = server.start();
-                })?;
-            join_handles.push(join_handle);
-        }
-
-        Ok(FuseDaemon {
-            session,
-            threads: join_handles,
-        })
-    }
-
-    pub fn unmount(&mut self) -> Result<(), io::Error> {
-        self.session
-            .umount()
-            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
-
-        for thread in self.threads.drain(..) {
-            thread.join().map_err(|_| {
-                io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread")
-            })?;
-        }
-
-        Ok(())
-    }
-}
-
-impl Drop for FuseDaemon {
-    fn drop(&mut self) {
-        if let Err(error) = self.unmount() {
-            error!(?error, "failed to unmont fuse filesystem")
-        }
-    }
-}
diff --git a/tvix/store/src/fuse/tests.rs b/tvix/store/src/fs/tests.rs
index 81de8b13de58..30f5ca3f40aa 100644
--- a/tvix/store/src/fuse/tests.rs
+++ b/tvix/store/src/fs/tests.rs
@@ -8,11 +8,12 @@ use tempfile::TempDir;
 
 use crate::blobservice::BlobService;
 use crate::directoryservice::DirectoryService;
+use crate::fs::{fuse::FuseDaemon, TvixStoreFs};
 use crate::pathinfoservice::PathInfoService;
+use crate::proto;
 use crate::proto::{DirectoryNode, FileNode, PathInfo};
 use crate::tests::fixtures;
 use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service};
-use crate::{proto, FuseDaemon, FUSE};
 
 const BLOB_A_NAME: &str = "00000000000000000000000000000000-test";
 const BLOB_B_NAME: &str = "55555555555555555555555555555555-test";
@@ -41,7 +42,7 @@ fn do_mount<P: AsRef<Path>>(
     mountpoint: P,
     list_root: bool,
 ) -> io::Result<FuseDaemon> {
-    let fs = FUSE::new(
+    let fs = TvixStoreFs::new(
         blob_service,
         directory_service,
         path_info_service,
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index 252506de5977..6270812d47fc 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,7 +1,8 @@
 mod digests;
 mod errors;
-#[cfg(feature = "fuse")]
-mod fuse;
+
+#[cfg(feature = "fs")]
+pub mod fs;
 
 pub mod blobservice;
 pub mod directoryservice;
@@ -13,8 +14,5 @@ pub mod proto;
 pub use digests::B3Digest;
 pub use errors::Error;
 
-#[cfg(feature = "fuse")]
-pub use fuse::{FuseDaemon, FUSE};
-
 #[cfg(test)]
 mod tests;