diff options
Diffstat (limited to 'tvix/store')
-rw-r--r-- | tvix/store/Cargo.toml | 32 | ||||
-rw-r--r-- | tvix/store/src/bin/tvix-store.rs | 54 | ||||
-rw-r--r-- | tvix/store/src/fs/mod.rs | 3 | ||||
-rw-r--r-- | tvix/store/src/fs/virtiofs.rs | 237 |
4 files changed, 325 insertions, 1 deletions
diff --git a/tvix/store/Cargo.toml b/tvix/store/Cargo.toml index a2e143de7014..20909221c524 100644 --- a/tvix/store/Cargo.toml +++ b/tvix/store/Cargo.toml @@ -36,6 +36,35 @@ optional = true # TODO: Switch back to upstream version once https://github.com/cloud-hypervisor/fuse-backend-rs/pull/153 lands. git = "https://github.com/cbrewster/fuse-backend-rs.git" branch = "optional-allow_other" +# Ideally this would only be enabled if virtiofs is enabled +# Ex: virtiofs = [..., "fuse-backend-rs/?vhost-user-fs", ...] +# However, crate2nix doesn't properly understand this syntax and doesn't +# enable this feature properly. +features = ["vhost-user-fs"] + +[dependencies.vhost] +optional = true +version = "0.6" + +[dependencies.vhost-user-backend] +optional = true +version = "0.8" + +[dependencies.virtio-queue] +optional = true +version = "0.7" + +[dependencies.vm-memory] +optional = true +version = "0.10" + +[dependencies.vmm-sys-util] +optional = true +version = "0.11" + +[dependencies.virtio-bindings] +optional = true +version = "0.2.1" [dependencies.tonic-reflection] optional = true @@ -55,7 +84,8 @@ tempfile = "3.3.0" tonic-mock = { git = "https://github.com/brainrake/tonic-mock", branch = "bump-dependencies" } [features] -default = ["fuse", "reflection"] +default = ["fuse", "virtiofs", "reflection"] fs = ["dep:libc", "dep:fuse-backend-rs"] +virtiofs = ["fs", "dep:vhost", "dep:vhost-user-backend", "dep:virtio-queue", "dep:vm-memory", "dep:vmm-sys-util", "dep:virtio-bindings"] fuse = ["fs"] reflection = ["tonic-reflection"] diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs index 474a48c9fd1a..813d62cb129a 100644 --- a/tvix/store/src/bin/tvix-store.rs +++ b/tvix/store/src/bin/tvix-store.rs @@ -29,6 +29,9 @@ use tvix_store::fs::TvixStoreFs; #[cfg(feature = "fuse")] use tvix_store::fs::fuse::FuseDaemon; +#[cfg(feature = "virtiofs")] +use tvix_store::fs::virtiofs::start_virtiofs_daemon; + #[cfg(feature = "reflection")] use tvix_castore::proto::FILE_DESCRIPTOR_SET as CASTORE_FILE_DESCRIPTOR_SET; #[cfg(feature = "reflection")] @@ -111,6 +114,28 @@ enum Commands { #[clap(long, short, action)] list_root: bool, }, + /// Starts a tvix-store virtiofs daemon at the given socket path. + #[cfg(feature = "virtiofs")] + #[command(name = "virtiofs")] + VirtioFs { + #[clap(value_name = "PATH")] + socket: PathBuf, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + blob_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + directory_service_addr: String, + + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + + /// Whether to list elements at the root of the mount point. + /// This is useful if your PathInfoService doesn't provide an + /// (exhaustive) listing. + #[clap(long, short, action)] + list_root: bool, + }, } #[cfg(feature = "fuse")] @@ -328,6 +353,35 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> { }) .await??; } + #[cfg(feature = "virtiofs")] + Commands::VirtioFs { + socket, + blob_service_addr, + directory_service_addr, + path_info_service_addr, + list_root, + } => { + let blob_service = blobservice::from_addr(&blob_service_addr)?; + let directory_service = directoryservice::from_addr(&directory_service_addr)?; + let path_info_service = pathinfoservice::from_addr( + &path_info_service_addr, + blob_service.clone(), + directory_service.clone(), + )?; + + tokio::task::spawn_blocking(move || { + let fs = TvixStoreFs::new( + blob_service, + directory_service, + path_info_service, + list_root, + ); + info!("starting tvix-store virtiofs daemon on {:?}", &socket); + + start_virtiofs_daemon(fs, socket) + }) + .await??; + } }; Ok(()) } diff --git a/tvix/store/src/fs/mod.rs b/tvix/store/src/fs/mod.rs index 59b8f0d0854f..91adfa35f0e0 100644 --- a/tvix/store/src/fs/mod.rs +++ b/tvix/store/src/fs/mod.rs @@ -5,6 +5,9 @@ mod inodes; #[cfg(feature = "fuse")] pub mod fuse; +#[cfg(feature = "virtiofs")] +pub mod virtiofs; + #[cfg(test)] mod tests; diff --git a/tvix/store/src/fs/virtiofs.rs b/tvix/store/src/fs/virtiofs.rs new file mode 100644 index 000000000000..3786a84285cd --- /dev/null +++ b/tvix/store/src/fs/virtiofs.rs @@ -0,0 +1,237 @@ +use std::{ + convert, error, fmt, io, + ops::Deref, + path::Path, + sync::{Arc, MutexGuard, RwLock}, +}; + +use fuse_backend_rs::{ + api::{filesystem::FileSystem, server::Server}, + transport::{FsCacheReqHandler, Reader, VirtioFsWriter}, +}; +use tracing::error; +use vhost::vhost_user::{ + Listener, SlaveFsCacheReq, VhostUserProtocolFeatures, VhostUserVirtioFeatures, +}; +use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringMutex, VringState, VringT}; +use virtio_bindings::bindings::virtio_ring::{ + VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC, +}; +use virtio_queue::QueueT; +use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; +use vmm_sys_util::epoll::EventSet; + +const VIRTIO_F_VERSION_1: u32 = 32; +const NUM_QUEUES: usize = 2; +const QUEUE_SIZE: usize = 1024; + +#[derive(Debug)] +enum Error { + /// Failed to handle non-input event. + HandleEventNotEpollIn, + /// Failed to handle unknown event. + HandleEventUnknownEvent, + /// Invalid descriptor chain. + InvlaidDescriptorChain, + /// Failed to handle filesystem requests. + HandleRequests(fuse_backend_rs::Error), + /// Failed to construct new vhost user daemon. + NewDaemon, + /// Failed to start the vhost user daemon. + StartDaemon, + /// Failed to wait for the vhost user daemon. + WaitDaemon, +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "vhost_user_fs_error: {self:?}") + } +} + +impl error::Error for Error {} + +impl convert::From<Error> for io::Error { + fn from(e: Error) -> Self { + io::Error::new(io::ErrorKind::Other, e) + } +} + +struct VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + server: Arc<Server<Arc<FS>>>, + event_idx: bool, + guest_mem: GuestMemoryAtomic<GuestMemoryMmap>, + cache_req: Option<SlaveFsCacheReq>, +} + +impl<FS> VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + fn process_queue(&mut self, vring: &mut MutexGuard<VringState>) -> std::io::Result<bool> { + let mut used_descs = false; + + while let Some(desc_chain) = vring + .get_queue_mut() + .pop_descriptor_chain(self.guest_mem.memory()) + { + let memory = desc_chain.memory(); + let reader = Reader::from_descriptor_chain(memory, desc_chain.clone()) + .map_err(|_| Error::InvlaidDescriptorChain)?; + let writer = VirtioFsWriter::new(memory, desc_chain.clone()) + .map_err(|_| Error::InvlaidDescriptorChain)?; + + self.server + .handle_message( + reader, + writer.into(), + self.cache_req + .as_mut() + .map(|req| req as &mut dyn FsCacheReqHandler), + None, + ) + .map_err(Error::HandleRequests)?; + + // TODO: Is len 0 correct? + if let Err(error) = vring + .get_queue_mut() + .add_used(memory, desc_chain.head_index(), 0) + { + error!(?error, "failed to add desc back to ring"); + } + + // TODO: What happens if we error out before here? + used_descs = true; + } + + let needs_notification = if self.event_idx { + match vring + .get_queue_mut() + .needs_notification(self.guest_mem.memory().deref()) + { + Ok(needs_notification) => needs_notification, + Err(error) => { + error!(?error, "failed to check if queue needs notification"); + true + } + } + } else { + true + }; + + if needs_notification { + if let Err(error) = vring.signal_used_queue() { + error!(?error, "failed to signal used queue"); + } + } + + Ok(used_descs) + } +} + +impl<FS> VhostUserBackendMut<VringMutex> for VhostUserFsBackend<FS> +where + FS: FileSystem + Send + Sync, +{ + fn num_queues(&self) -> usize { + NUM_QUEUES + } + + fn max_queue_size(&self) -> usize { + QUEUE_SIZE + } + + fn features(&self) -> u64 { + 1 << VIRTIO_F_VERSION_1 + | 1 << VIRTIO_RING_F_INDIRECT_DESC + | 1 << VIRTIO_RING_F_EVENT_IDX + | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() + } + + fn protocol_features(&self) -> VhostUserProtocolFeatures { + VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::SLAVE_REQ + } + + fn set_event_idx(&mut self, enabled: bool) { + self.event_idx = enabled; + } + + fn update_memory(&mut self, _mem: GuestMemoryAtomic<GuestMemoryMmap>) -> std::io::Result<()> { + // This is what most the vhost user implementations do... + Ok(()) + } + + fn set_slave_req_fd(&mut self, cache_req: SlaveFsCacheReq) { + self.cache_req = Some(cache_req); + } + + fn handle_event( + &mut self, + device_event: u16, + evset: vmm_sys_util::epoll::EventSet, + vrings: &[VringMutex], + _thread_id: usize, + ) -> std::io::Result<bool> { + if evset != EventSet::IN { + return Err(Error::HandleEventNotEpollIn.into()); + } + + let mut queue = match device_event { + // High priority queue + 0 => vrings[0].get_mut(), + // Regurlar priority queue + 1 => vrings[1].get_mut(), + _ => { + return Err(Error::HandleEventUnknownEvent.into()); + } + }; + + if self.event_idx { + loop { + queue + .get_queue_mut() + .enable_notification(self.guest_mem.memory().deref()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; + if !self.process_queue(&mut queue)? { + break; + } + } + } else { + self.process_queue(&mut queue)?; + } + + Ok(false) + } +} + +pub fn start_virtiofs_daemon<FS, P>(fs: FS, socket: P) -> io::Result<()> +where + FS: FileSystem + Send + Sync + 'static, + P: AsRef<Path>, +{ + let guest_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); + + let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); + + let backend = Arc::new(RwLock::new(VhostUserFsBackend { + server, + guest_mem: guest_mem.clone(), + event_idx: false, + cache_req: None, + })); + + let listener = Listener::new(socket, true).unwrap(); + + let mut fs_daemon = + VhostUserDaemon::new(String::from("vhost-user-fs-tvix-store"), backend, guest_mem) + .map_err(|_| Error::NewDaemon)?; + + fs_daemon.start(listener).map_err(|_| Error::StartDaemon)?; + + fs_daemon.wait().map_err(|_| Error::WaitDaemon)?; + + Ok(()) +} |