diff options
Diffstat (limited to 'tvix')
-rw-r--r-- | tvix/Cargo.lock | 10 | ||||
-rw-r--r-- | tvix/Cargo.nix | 25 | ||||
-rw-r--r-- | tvix/castore/Cargo.toml | 6 | ||||
-rw-r--r-- | tvix/castore/src/fs/fuse/mod.rs | 56 | ||||
-rw-r--r-- | tvix/castore/src/fs/fuse/tests.rs | 44 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 28 |
6 files changed, 113 insertions, 56 deletions
diff --git a/tvix/Cargo.lock b/tvix/Cargo.lock index 7bc0eb232b50..b4ee59031ba9 100644 --- a/tvix/Cargo.lock +++ b/tvix/Cargo.lock @@ -3694,6 +3694,15 @@ dependencies = [ ] [[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] name = "tikv-jemalloc-sys" version = "0.5.4+5.3.0-patched" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -4230,6 +4239,7 @@ dependencies = [ "sled", "tempfile", "thiserror", + "threadpool", "tokio", "tokio-retry", "tokio-stream", diff --git a/tvix/Cargo.nix b/tvix/Cargo.nix index 3b8c45bf48f4..cd8ec667f36d 100644 --- a/tvix/Cargo.nix +++ b/tvix/Cargo.nix @@ -11293,6 +11293,24 @@ rec { ]; features = { }; }; + "threadpool" = rec { + crateName = "threadpool"; + version = "1.8.1"; + edition = "2015"; + sha256 = "1amgfyzvynbm8pacniivzq9r0fh3chhs7kijic81j76l6c5ycl6h"; + authors = [ + "The Rust Project Developers" + "Corey Farwell <coreyf@rwell.org>" + "Stefan Schindler <dns2utf8@estada.ch>" + ]; + dependencies = [ + { + name = "num_cpus"; + packageId = "num_cpus"; + } + ]; + + }; "tikv-jemalloc-sys" = rec { crateName = "tikv-jemalloc-sys"; version = "0.5.4+5.3.0-patched"; @@ -13320,6 +13338,11 @@ rec { packageId = "thiserror"; } { + name = "threadpool"; + packageId = "threadpool"; + optional = true; + } + { name = "tokio"; packageId = "tokio"; features = [ "fs" "macros" "net" "rt" "rt-multi-thread" "signal" ]; @@ -13445,7 +13468,7 @@ rec { features = { "cloud" = [ "dep:bigtable_rs" "object_store/aws" "object_store/azure" "object_store/gcp" ]; "default" = [ "cloud" ]; - "fs" = [ "dep:libc" "dep:fuse-backend-rs" ]; + "fs" = [ "dep:fuse-backend-rs" "dep:threadpool" "dep:libc" ]; "fuse" = [ "fs" ]; "tonic-reflection" = [ "dep:tonic-reflection" ]; "virtiofs" = [ "fs" "dep:vhost" "dep:vhost-user-backend" "dep:virtio-queue" "dep:vm-memory" "dep:vmm-sys-util" "dep:virtio-bindings" "fuse-backend-rs?/vhost-user-fs" "fuse-backend-rs?/virtiofs" ]; diff --git a/tvix/castore/Cargo.toml b/tvix/castore/Cargo.toml index c03dca3ebb34..e82a6d12421d 100644 --- a/tvix/castore/Cargo.toml +++ b/tvix/castore/Cargo.toml @@ -51,6 +51,10 @@ version = "0.11.0" optional = true version = "0.2.144" +[dependencies.threadpool] +version = "1.8.1" +optional = true + [dependencies.tonic-reflection] optional = true version = "0.11.0" @@ -100,7 +104,7 @@ cloud = [ "object_store/azure", "object_store/gcp", ] -fs = ["dep:libc", "dep:fuse-backend-rs"] +fs = ["dep:fuse-backend-rs", "dep:threadpool", "dep:libc"] virtiofs = [ "fs", "dep:vhost", diff --git a/tvix/castore/src/fs/fuse/mod.rs b/tvix/castore/src/fs/fuse/mod.rs index 94b73d422a14..64ef29ed2aa1 100644 --- a/tvix/castore/src/fs/fuse/mod.rs +++ b/tvix/castore/src/fs/fuse/mod.rs @@ -1,6 +1,8 @@ -use std::{io, path::Path, sync::Arc, thread}; +use std::{io, path::Path, sync::Arc}; use fuse_backend_rs::{api::filesystem::FileSystem, transport::FuseSession}; +use parking_lot::Mutex; +use threadpool::ThreadPool; use tracing::{error, instrument}; #[cfg(test)] @@ -49,9 +51,12 @@ where } } +/// Starts a [Filesystem] with the specified number of threads, and provides +/// functions to unmount, and wait for it to have completed. +#[derive(Clone)] pub struct FuseDaemon { - session: FuseSession, - threads: Vec<thread::JoinHandle<()>>, + session: Arc<Mutex<FuseSession>>, + threads: Arc<ThreadPool>, } impl FuseDaemon { @@ -59,7 +64,7 @@ impl FuseDaemon { pub fn new<FS, P>( fs: FS, mountpoint: P, - threads: usize, + num_threads: usize, allow_other: bool, ) -> Result<Self, io::Error> where @@ -76,40 +81,49 @@ impl FuseDaemon { session .mount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - let mut join_handles = Vec::with_capacity(threads); - for _ in 0..threads { + + // construct a thread pool + let threads = threadpool::Builder::new() + .num_threads(num_threads) + .thread_name("fuse_server".to_string()) + .build(); + + for _ in 0..num_threads { + // for each thread requested, create and start a FuseServer accepting requests. let mut server = FuseServer { server: server.clone(), channel: session .new_channel() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?, }; - let join_handle = thread::Builder::new() - .name("fuse_server".to_string()) - .spawn(move || { - let _ = server.start(); - })?; - join_handles.push(join_handle); + + threads.execute(move || { + let _ = server.start(); + }); } Ok(FuseDaemon { - session, - threads: join_handles, + session: Arc::new(Mutex::new(session)), + threads: Arc::new(threads), }) } + /// Waits for all threads to finish. + #[instrument(skip_all)] + pub fn wait(&self) { + self.threads.join() + } + + /// Send the unmount command, and waits for all threads to finish. #[instrument(skip_all, err)] - pub fn unmount(&mut self) -> Result<(), io::Error> { + pub fn unmount(&self) -> Result<(), io::Error> { + // Send the unmount command. self.session + .lock() .umount() .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; - for thread in self.threads.drain(..) { - thread.join().map_err(|_| { - io::Error::new(io::ErrorKind::Other, "failed to join fuse server thread") - })?; - } - + self.wait(); Ok(()) } } diff --git a/tvix/castore/src/fs/fuse/tests.rs b/tvix/castore/src/fs/fuse/tests.rs index bb321f5888f8..bcebcf4a7292 100644 --- a/tvix/castore/src/fs/fuse/tests.rs +++ b/tvix/castore/src/fs/fuse/tests.rs @@ -248,7 +248,7 @@ async fn mount() { let (blob_service, directory_service) = gen_svcs(); - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, BTreeMap::default(), @@ -271,7 +271,7 @@ async fn root() { let tmpdir = TempDir::new().unwrap(); let (blob_service, directory_service) = gen_svcs(); - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, BTreeMap::default(), @@ -305,7 +305,7 @@ async fn root_with_listing() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -349,7 +349,7 @@ async fn stat_file_at_root() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -386,7 +386,7 @@ async fn read_file_at_root() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -423,7 +423,7 @@ async fn read_large_file_at_root() { populate_blob_b(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -468,7 +468,7 @@ async fn symlink_readlink() { populate_symlink(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -515,7 +515,7 @@ async fn read_stat_through_symlink() { populate_blob_a(&blob_service, &mut root_nodes).await; populate_symlink(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -560,7 +560,7 @@ async fn read_stat_directory() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -597,7 +597,7 @@ async fn xattr() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -680,7 +680,7 @@ async fn read_blob_inside_dir() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -720,7 +720,7 @@ async fn read_blob_deep_inside_dir() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -763,7 +763,7 @@ async fn readdir() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -823,7 +823,7 @@ async fn readdir_deep() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -873,7 +873,7 @@ async fn check_attributes() { populate_symlink(&mut root_nodes).await; populate_blob_helloworld(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -948,7 +948,7 @@ async fn compare_inodes_directories() { populate_directory_with_keep(&blob_service, &directory_service, &mut root_nodes).await; populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -992,7 +992,7 @@ async fn compare_inodes_files() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1041,7 +1041,7 @@ async fn compare_inodes_symlinks() { populate_directory_complicated(&blob_service, &directory_service, &mut root_nodes).await; populate_symlink2(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1084,7 +1084,7 @@ async fn read_wrong_paths_in_root() { populate_blob_a(&blob_service, &mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1139,7 +1139,7 @@ async fn disallow_writes() { let (blob_service, directory_service) = gen_svcs(); let root_nodes = BTreeMap::default(); - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1171,7 +1171,7 @@ async fn missing_directory() { populate_directorynode_without_directory(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, @@ -1219,7 +1219,7 @@ async fn missing_blob() { populate_filenode_without_blob(&mut root_nodes).await; - let mut fuse_daemon = do_mount( + let fuse_daemon = do_mount( blob_service, directory_service, root_nodes, diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index dadfa114f72b..4353cbdd036a 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -452,7 +452,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { ) .await?; - let mut fuse_daemon = tokio::task::spawn_blocking(move || { + let fuse_daemon = tokio::task::spawn_blocking(move || { let fs = make_fs( blob_service, directory_service, @@ -466,16 +466,22 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error>> { }) .await??; - // grab a handle to unmount the file system, and register a signal - // handler. - tokio::spawn(async move { - tokio::signal::ctrl_c().await.unwrap(); - info!("interrupt received, unmounting…"); - tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; - info!("unmount occured, terminating…"); - Ok::<_, std::io::Error>(()) - }) - .await??; + // Wait for a ctrl_c and then call fuse_daemon.unmount(). + tokio::spawn({ + let fuse_daemon = fuse_daemon.clone(); + async move { + tokio::signal::ctrl_c().await.unwrap(); + info!("interrupt received, unmounting…"); + tokio::task::spawn_blocking(move || fuse_daemon.unmount()).await??; + info!("unmount occured, terminating…"); + Ok::<_, std::io::Error>(()) + } + }); + + // Wait for the server to finish, which can either happen through it + // being unmounted externally, or receiving a signal invoking the + // handler above. + tokio::task::spawn_blocking(move || fuse_daemon.wait()).await? } #[cfg(feature = "virtiofs")] Commands::VirtioFs { |