diff options
author | Connor Brewster <cbrewster@hey.com> | 2023-09-16T15·20-0500 |
---|---|---|
committer | Connor Brewster <cbrewster@hey.com> | 2023-09-20T14·21+0000 |
commit | 6b7c936bc50934b45df132f659292e2c45256dea (patch) | |
tree | 74b369688e63ed4ff34c9270584ba30b91d7c979 /tvix/store/src/bin/tvix-store.rs | |
parent | 237c0eb415c632ec3480d890084c3d815298ac1f (diff) |
refactor(tvix/store/fuse): Switch from fuser to fuse-backend-rs r/6621
This switches the FUSE implementation from fuser to fuse-backend-rs. fuse-backend-rs is designed to work with both FUSE and virtiofs. Virtiofs support will make it possible to plug the tvix-store into a microvm and have `/nix/store` access without having to setup FUSE inside the guest. Additionally fuse-backend-rs has nice support for running multiple FUSE threads and has some async support. The goal of this commit is to mechanically switch over to fuse-backend-rs with minimal changes. I did have to add some locks here and there because fuse-backend-rs uses `&self` on all methods whereas fuser uses `&mut self`. `&self` is required for concurrent access to the FUSE server, so this makes sense. We can consider switching to concurrent maps and use some other techniques to reduce lock contention and critical section size. Issue: https://b.tvl.fyi/issues/305 Change-Id: Icde5a58c6eef98f8984c1e04e980b756dfb76b47 Reviewed-on: https://cl.tvl.fyi/c/depot/+/9341 Tested-by: BuildkiteCI Reviewed-by: flokli <flokli@flokli.de>
Diffstat (limited to 'tvix/store/src/bin/tvix-store.rs')
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 32 |
1 files changed, 18 insertions, 14 deletions
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 0da264808efb..f707e3850d4f 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -22,7 +22,7 @@ use tvix_store::proto::GRPCPathInfoServiceWrapper; use tvix_store::proto::NamedNode; use tvix_store::proto::NarInfo; use tvix_store::proto::PathInfo; -use tvix_store::FUSE; +use tvix_store::{FuseDaemon, FUSE}; #[cfg(feature = "reflection")] use tvix_store::proto::FILE_DESCRIPTOR_SET; @@ -94,6 +94,10 @@ enum Commands { #[arg(long, env, default_value = "grpc+http://[::1]:8000")] path_info_service_addr: String, + /// Number of FUSE threads to spawn. + #[arg(long, env, default_value_t = default_threads())] + threads: usize, + /// Whether to list elements at the root of the mount point. /// This is useful if your PathInfoService doesn't provide an /// (exhaustive) listing. @@ -102,6 +106,13 @@ enum Commands { }, } +#[cfg(feature = "fuse")] +fn default_threads() -> usize { + std::thread::available_parallelism() + .map(|threads| threads.into()) + .unwrap_or(4) +} + #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let cli = Cli::parse(); @@ -280,6 +291,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { directory_service_addr, path_info_service_addr, list_root, + threads, } => { let blob_service = blobservice::from_addr(&blob_service_addr)?; let directory_service = directoryservice::from_addr(&directory_service_addr)?; @@ -289,35 +301,27 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { directory_service.clone(), )?; - let mut fuse_session = tokio::task::spawn_blocking(move || { + let mut fuse_daemon = tokio::task::spawn_blocking(move || { let f = FUSE::new( blob_service, directory_service, path_info_service, list_root, ); + info!("mounting tvix-store on {:?}", &dest); - fuser::Session::new(f, &dest, &[]) + FuseDaemon::new(f, &dest, threads) }) .await??; // grab a handle to unmount the file system, and register a signal // handler. - let mut fuse_unmounter = fuse_session.unmount_callable(); tokio::spawn(async move { tokio::signal::ctrl_c().await.unwrap(); info!("interrupt received, unmounting…"); - fuse_unmounter.unmount().unwrap(); - }); - - // Start the fuse filesystem and wait for its completion, which - // happens when it's unmounted externally, or via the signal handler - // task. - tokio::task::spawn_blocking(move || -> io::Result<()> { - info!("mounting tvix-store on {:?}", fuse_session.mountpoint()); - fuse_session.run()?; + fuse_daemon.unmount()?; info!("unmount occured, terminating…"); - Ok(()) + Ok::<_, io::Error>(()) }) .await??; } |