diff options
author | Connor Brewster <cbrewster@hey.com> | 2023-09-16T18·54-0500 |
---|---|---|
committer | clbot <clbot@tvl.fyi> | 2023-09-20T14·27+0000 |
commit | 7e737fde34260daa477794d63b0b3344b4a1d81b (patch) | |
tree | 51f7d4b3c7f18aa78e584b06840fcdd645474895 | |
parent | 6b7c936bc50934b45df132f659292e2c45256dea (diff) |
refactor(tvix/store/fs): Separate FUSE and filesystem code r/6622
In prepration for adding virtiofs support, I thought it would make sense to split out the filesystem implementation from FUSE itself. The `fs` module holds the tvix-store filesystem implemetation and the `fuse` module holds the code to spawn a FUSE daemon backed by multiple threads. Change-Id: I8c58447b8c3aa016a613068f8e7ec166554e237c Reviewed-on: https://cl.tvl.fyi/c/depot/+/9343 Reviewed-by: flokli <flokli@flokli.de> Tested-by: BuildkiteCI Autosubmit: Connor Brewster <cbrewster@hey.com>
-rw-r--r-- | tvix/Cargo.nix | 5 | ||||
-rw-r--r-- | tvix/store/Cargo.toml | 3 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 9 | ||||
-rw-r--r-- | tvix/store/src/fs/file_attr.rs (renamed from tvix/store/src/fuse/file_attr.rs) | 0 | ||||
-rw-r--r-- | tvix/store/src/fs/fuse.rs | 113 | ||||
-rw-r--r-- | tvix/store/src/fs/inode_tracker.rs (renamed from tvix/store/src/fuse/inode_tracker.rs) | 2 | ||||
-rw-r--r-- | tvix/store/src/fs/inodes.rs (renamed from tvix/store/src/fuse/inodes.rs) | 0 | ||||
-rw-r--r-- | tvix/store/src/fs/mod.rs (renamed from tvix/store/src/fuse/mod.rs) | 129 | ||||
-rw-r--r-- | tvix/store/src/fs/tests.rs (renamed from tvix/store/src/fuse/tests.rs) | 5 | ||||
-rw-r--r-- | tvix/store/src/lib.rs | 8 |
10 files changed, 141 insertions, 133 deletions
diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index cd9570c48ac5..002aaeab0989 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -8859,11 +8859,12 @@ rec { ]; features = { "default" = [ "fuse" "reflection" ]; - "fuse" = [ "dep:libc" "dep:fuse-backend-rs" ]; + "fs" = [ "dep:libc" "dep:fuse-backend-rs" ]; + "fuse" = [ "fs" ]; "reflection" = [ "tonic-reflection" ]; "tonic-reflection" = [ "dep:tonic-reflection" ]; }; - resolvedDefaultFeatures = [ "default" "fuse" "reflection" "tonic-reflection" ]; + resolvedDefaultFeatures = [ "default" "fs" "fuse" "reflection" "tonic-reflection" ]; }; "typenum" = rec { crateName = "typenum"; diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index cd71555851ff..ffa4426aef6a 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -58,5 +58,6 @@ tonic-mock = { git = "https://github.com/brainrake/tonic-mock", branch = "bump-d [features] default = ["fuse", "reflection"] -fuse = ["dep:libc", "dep:fuse-backend-rs"] +fs = ["dep:libc", "dep:fuse-backend-rs"] +fuse = ["fs"] reflection = ["tonic-reflection"] diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index f707e3850d4f..1be8b00bd9b8 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -22,7 +22,12 @@ use tvix_store::proto::GRPCPathInfoServiceWrapper; use tvix_store::proto::NamedNode; use tvix_store::proto::NarInfo; use tvix_store::proto::PathInfo; -use tvix_store::{FuseDaemon, FUSE}; + +#[cfg(feature = "fs")] +use tvix_store::fs::TvixStoreFs; + +#[cfg(feature = "fuse")] +use tvix_store::fs::fuse::FuseDaemon; #[cfg(feature = "reflection")] use tvix_store::proto::FILE_DESCRIPTOR_SET; @@ -302,7 +307,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { )?; let mut fuse_daemon = tokio::task::spawn_blocking(move || { - let f = FUSE::new( + let f = TvixStoreFs::new( blob_service, directory_service, path_info_service, diff --git a/tvix/store/src/fuse/file_attr.rs b/tvix/store/src/fs/file_attr.rs index b946aa977a0a..b946aa977a0a 100644 --- a/tvix/store/src/fuse/file_attr.rs +++ b/tvix/store/src/fs/file_attr.rs diff --git a/tvix/store/src/fs/fuse.rs b/tvix/store/src/fs/fuse.rs new file mode 100644 index 000000000000..8535c7858450 --- /dev/null +++ b/tvix/store/src/fs/fuse.rs @@ -0,0 +1,113 @@ +use std::{io, path::Path, sync::Arc, thread}; + +use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use tracing::error; + +struct FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, + channel: fuse_backend_rs::transport::FuseChannel, +} + +impl<FS> FuseServer<FS> +where + FS: FileSystem + Sync + Send, +{ + fn start(&mut self) -> io::Result<()> { + loop { + if let Some((reader, writer)) = self + .channel + .get_request() + .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? + { + if let Err(e) = self + .server + .handle_message(reader, writer.into(), None, None) + { + match e { + // This indicates the session has been shut down. + fuse_backend_rs::Error::EncodeMessage(e) + if e.raw_os_error() == Some(libc::EBADFD) => + { + break; + } + error => { + error!(?error, "failed to handle fuse request"); + continue; + } + } + } + } else { + break; + } + } + Ok(()) + } +} + +pub struct FuseDaemon { + session: FuseSession, + threads: Vec<thread::JoinHandle<()>>, +} + +impl FuseDaemon { + pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error> + where + FS: FileSystem + Sync + Send + 'static, + P: AsRef<Path>, + { + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + session.set_allow_other(false); + session + .mount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + let mut join_handles = Vec::with_capacity(threads); + for _ in 0..threads { + let mut server = FuseServer { + server: server.clone(), + channel: session + .new_channel() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, + }; + let join_handle = thread::Builder::new() + .name("fuse_server".to_string()) + .spawn(move || { + let _ = server.start(); + })?; + join_handles.push(join_handle); + } + + Ok(FuseDaemon { + session, + threads: join_handles, + }) + } + + pub fn unmount(&mut self) -> Result<(), io::Error> { + self.session + .umount() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + + for thread in self.threads.drain(..) { + thread.join().map_err(|_| { + io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") + })?; + } + + Ok(()) + } +} + +impl Drop for FuseDaemon { + fn drop(&mut self) { + if let Err(error) = self.unmount() { + error!(?error, "failed to unmont fuse filesystem") + } + } +} diff --git a/tvix/store/src/fuse/inode_tracker.rs b/tvix/store/src/fs/inode_tracker.rs index 97a9744c31d7..ad1ef859a2f3 100644 --- a/tvix/store/src/fuse/inode_tracker.rs +++ b/tvix/store/src/fs/inode_tracker.rs @@ -197,7 +197,7 @@ impl InodeTracker { #[cfg(test)] mod tests { - use crate::fuse::inodes::DirectoryInodeData; + use crate::fs::inodes::DirectoryInodeData; use crate::proto; use crate::tests::fixtures; diff --git a/tvix/store/src/fuse/inodes.rs b/tvix/store/src/fs/inodes.rs index e8959ce3629b..e8959ce3629b 100644 --- a/tvix/store/src/fuse/inodes.rs +++ b/tvix/store/src/fs/inodes.rs diff --git a/tvix/store/src/fuse/mod.rs b/tvix/store/src/fs/mod.rs index 1a5d884ef7a9..48e605406331 100644 --- a/tvix/store/src/fuse/mod.rs +++ b/tvix/store/src/fs/mod.rs @@ -2,39 +2,37 @@ mod file_attr; mod inode_tracker; mod inodes; +#[cfg(feature = "fuse")] +pub mod fuse; + #[cfg(test)] mod tests; use crate::{ blobservice::{BlobReader, BlobService}, directoryservice::DirectoryService, - fuse::inodes::{DirectoryInodeData, InodeData}, pathinfoservice::PathInfoService, proto::{node::Node, NamedNode}, B3Digest, Error, }; -use fuse_backend_rs::{ - api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}, - transport::FuseSession, -}; +use fuse_backend_rs::api::filesystem::{Context, FileSystem, FsOptions, ROOT_ID}; use nix_compat::store_path::StorePath; use parking_lot::RwLock; use std::{ collections::HashMap, io, - path::Path, str::FromStr, sync::atomic::AtomicU64, sync::{atomic::Ordering, Arc}, - thread, time::Duration, }; use tokio::io::{AsyncBufReadExt, AsyncSeekExt}; -use tracing::{debug, error, info_span, warn}; +use tracing::{debug, info_span, warn}; use self::{ file_attr::{gen_file_attr, ROOT_FILE_ATTR}, inode_tracker::InodeTracker, + inodes::{DirectoryInodeData, InodeData}, }; /// This implements a read-only FUSE filesystem for a tvix-store @@ -71,7 +69,7 @@ use self::{ /// Due to the above being valid across the whole store, and considering the /// merkle structure is a DAG, not a tree, this also means we can't do "bucketed /// allocation", aka reserve Directory.size inodes for each PathInfo. -pub struct FUSE { +pub struct TvixStoreFs { blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, path_info_service: Arc<dyn PathInfoService>, @@ -93,7 +91,7 @@ pub struct FUSE { tokio_handle: tokio::runtime::Handle, } -impl FUSE { +impl TvixStoreFs { pub fn new( blob_service: Arc<dyn BlobService>, directory_service: Arc<dyn DirectoryService>, @@ -224,7 +222,7 @@ impl FUSE { } } -impl FileSystem for FUSE { +impl FileSystem for TvixStoreFs { type Inode = u64; type Handle = u64; @@ -660,112 +658,3 @@ impl FileSystem for FUSE { } } } - -struct FuseServer<FS> -where - FS: FileSystem + Sync + Send, -{ - server: Arc<fuse_backend_rs::api::server::Server<Arc<FS>>>, - channel: fuse_backend_rs::transport::FuseChannel, -} - -impl<FS> FuseServer<FS> -where - FS: FileSystem + Sync + Send, -{ - fn start(&mut self) -> io::Result<()> { - loop { - if let Some((reader, writer)) = self - .channel - .get_request() - .map_err(|_| io::Error::from_raw_os_error(libc::EINVAL))? - { - if let Err(e) = self - .server - .handle_message(reader, writer.into(), None, None) - { - match e { - // This indicates the session has been shut down. - fuse_backend_rs::Error::EncodeMessage(e) - if e.raw_os_error() == Some(libc::EBADFD) => - { - break; - } - error => { - error!(?error, "failed to handle fuse request"); - continue; - } - } - } - } else { - break; - } - } - Ok(()) - } -} - -pub struct FuseDaemon { - session: FuseSession, - threads: Vec<thread::JoinHandle<()>>, -} - -impl FuseDaemon { - pub fn new<FS, P>(fs: FS, mountpoint: P, threads: usize) -> Result<Self, io::Error> - where - FS: FileSystem + Sync + Send + 'static, - P: AsRef<Path>, - { - let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); - - let mut session = FuseSession::new(mountpoint.as_ref(), "tvix-store", "", true) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - - session.set_allow_other(false); - session - .mount() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - let mut join_handles = Vec::with_capacity(threads); - for _ in 0..threads { - let mut server = FuseServer { - server: server.clone(), - channel: session - .new_channel() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, - }; - let join_handle = thread::Builder::new() - .name("fuse_server".to_string()) - .spawn(move || { - let _ = server.start(); - })?; - join_handles.push(join_handle); - } - - Ok(FuseDaemon { - session, - threads: join_handles, - }) - } - - pub fn unmount(&mut self) -> Result<(), io::Error> { - self.session - .umount() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - - for thread in self.threads.drain(..) { - thread.join().map_err(|_| { - io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") - })?; - } - - Ok(()) - } -} - -impl Drop for FuseDaemon { - fn drop(&mut self) { - if let Err(error) = self.unmount() { - error!(?error, "failed to unmont fuse filesystem") - } - } -} diff --git a/tvix/store/src/fuse/tests.rs b/tvix/store/src/fs/tests.rs index 81de8b13de58..30f5ca3f40aa 100644 --- a/tvix/store/src/fuse/tests.rs +++ b/tvix/store/src/fs/tests.rs @@ -8,11 +8,12 @@ use tempfile::TempDir; use crate::blobservice::BlobService; use crate::directoryservice::DirectoryService; +use crate::fs::{fuse::FuseDaemon, TvixStoreFs}; use crate::pathinfoservice::PathInfoService; +use crate::proto; use crate::proto::{DirectoryNode, FileNode, PathInfo}; use crate::tests::fixtures; use crate::tests::utils::{gen_blob_service, gen_directory_service, gen_pathinfo_service}; -use crate::{proto, FuseDaemon, FUSE}; const BLOB_A_NAME: &str = "00000000000000000000000000000000-test"; const BLOB_B_NAME: &str = "55555555555555555555555555555555-test"; @@ -41,7 +42,7 @@ fn do_mount<P: AsRef<Path>>( mountpoint: P, list_root: bool, ) -> io::Result<FuseDaemon> { - let fs = FUSE::new( + let fs = TvixStoreFs::new( blob_service, directory_service, path_info_service, diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs index 252506de5977..6270812d47fc 100644 --- a/tvix/store/src/lib.rs +++ b/tvix/store/src/lib.rs @@ -1,7 +1,8 @@ mod digests; mod errors; -#[cfg(feature = "fuse")] -mod fuse; + +#[cfg(feature = "fs")] +pub mod fs; pub mod blobservice; pub mod directoryservice; @@ -13,8 +14,5 @@ pub mod proto; pub use digests::B3Digest; pub use errors::Error; -#[cfg(feature = "fuse")] -pub use fuse::{FuseDaemon, FUSE}; - #[cfg(test)] mod tests; |