use std::{ convert, error, fmt, io, ops::Deref, path::Path, sync::{Arc, MutexGuard, RwLock}, }; use fuse_backend_rs::{ api::{filesystem::FileSystem, server::Server}, transport::{FsCacheReqHandler, Reader, VirtioFsWriter}, }; use tracing::error; use vhost::vhost_user::{ Listener, SlaveFsCacheReq, VhostUserProtocolFeatures, VhostUserVirtioFeatures, }; use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringMutex, VringState, VringT}; use virtio_bindings::bindings::virtio_ring::{ VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC, }; use virtio_queue::QueueT; use vm_memory::{GuestAddressSpace, GuestMemoryAtomic, GuestMemoryMmap}; use vmm_sys_util::epoll::EventSet; const VIRTIO_F_VERSION_1: u32 = 32; const NUM_QUEUES: usize = 2; const QUEUE_SIZE: usize = 1024; #[derive(Debug)] enum Error { /// Failed to handle non-input event. HandleEventNotEpollIn, /// Failed to handle unknown event. HandleEventUnknownEvent, /// Invalid descriptor chain. InvalidDescriptorChain, /// Failed to handle filesystem requests. #[allow(dead_code)] HandleRequests(fuse_backend_rs::Error), /// Failed to construct new vhost user daemon. NewDaemon, /// Failed to start the vhost user daemon. StartDaemon, /// Failed to wait for the vhost user daemon. WaitDaemon, } impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "vhost_user_fs_error: {self:?}") } } impl error::Error for Error {} impl convert::From<Error> for io::Error { fn from(e: Error) -> Self { io::Error::new(io::ErrorKind::Other, e) } } struct VhostUserFsBackend<FS> where FS: FileSystem + Send + Sync, { server: Arc<Server<Arc<FS>>>, event_idx: bool, guest_mem: GuestMemoryAtomic<GuestMemoryMmap>, cache_req: Option<SlaveFsCacheReq>, } impl<FS> VhostUserFsBackend<FS> where FS: FileSystem + Send + Sync, { fn process_queue(&mut self, vring: &mut MutexGuard<VringState>) -> std::io::Result<bool> { let mut used_descs = false; while let Some(desc_chain) = vring .get_queue_mut() .pop_descriptor_chain(self.guest_mem.memory()) { let memory = desc_chain.memory(); let reader = Reader::from_descriptor_chain(memory, desc_chain.clone()) .map_err(|_| Error::InvalidDescriptorChain)?; let writer = VirtioFsWriter::new(memory, desc_chain.clone()) .map_err(|_| Error::InvalidDescriptorChain)?; self.server .handle_message( reader, writer.into(), self.cache_req .as_mut() .map(|req| req as &mut dyn FsCacheReqHandler), None, ) .map_err(Error::HandleRequests)?; // TODO: Is len 0 correct? if let Err(error) = vring .get_queue_mut() .add_used(memory, desc_chain.head_index(), 0) { error!(?error, "failed to add desc back to ring"); } // TODO: What happens if we error out before here? used_descs = true; } let needs_notification = if self.event_idx { match vring .get_queue_mut() .needs_notification(self.guest_mem.memory().deref()) { Ok(needs_notification) => needs_notification, Err(error) => { error!(?error, "failed to check if queue needs notification"); true } } } else { true }; if needs_notification { if let Err(error) = vring.signal_used_queue() { error!(?error, "failed to signal used queue"); } } Ok(used_descs) } } impl<FS> VhostUserBackendMut<VringMutex> for VhostUserFsBackend<FS> where FS: FileSystem + Send + Sync, { fn num_queues(&self) -> usize { NUM_QUEUES } fn max_queue_size(&self) -> usize { QUEUE_SIZE } fn features(&self) -> u64 { 1 << VIRTIO_F_VERSION_1 | 1 << VIRTIO_RING_F_INDIRECT_DESC | 1 << VIRTIO_RING_F_EVENT_IDX | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits() } fn protocol_features(&self) -> VhostUserProtocolFeatures { VhostUserProtocolFeatures::MQ | VhostUserProtocolFeatures::SLAVE_REQ } fn set_event_idx(&mut self, enabled: bool) { self.event_idx = enabled; } fn update_memory(&mut self, _mem: GuestMemoryAtomic<GuestMemoryMmap>) -> std::io::Result<()> { // This is what most the vhost user implementations do... Ok(()) } fn set_slave_req_fd(&mut self, cache_req: SlaveFsCacheReq) { self.cache_req = Some(cache_req); } fn handle_event( &mut self, device_event: u16, evset: vmm_sys_util::epoll::EventSet, vrings: &[VringMutex], _thread_id: usize, ) -> std::io::Result<bool> { if evset != EventSet::IN { return Err(Error::HandleEventNotEpollIn.into()); } let mut queue = match device_event { // High priority queue 0 => vrings[0].get_mut(), // Regurlar priority queue 1 => vrings[1].get_mut(), _ => { return Err(Error::HandleEventUnknownEvent.into()); } }; if self.event_idx { loop { queue .get_queue_mut() .enable_notification(self.guest_mem.memory().deref()) .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; if !self.process_queue(&mut queue)? { break; } } } else { self.process_queue(&mut queue)?; } Ok(false) } } pub fn start_virtiofs_daemon<FS, P>(fs: FS, socket: P) -> io::Result<()> where FS: FileSystem + Send + Sync + 'static, P: AsRef<Path>, { let guest_mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); let server = Arc::new(fuse_backend_rs::api::server::Server::new(Arc::new(fs))); let backend = Arc::new(RwLock::new(VhostUserFsBackend { server, guest_mem: guest_mem.clone(), event_idx: false, cache_req: None, })); let listener = Listener::new(socket, true).unwrap(); let mut fs_daemon = VhostUserDaemon::new(String::from("vhost-user-fs-tvix-store"), backend, guest_mem) .map_err(|_| Error::NewDaemon)?; fs_daemon.start(listener).map_err(|_| Error::StartDaemon)?; fs_daemon.wait().map_err(|_| Error::WaitDaemon)?; Ok(()) }